규턴의 개발블로그
article thumbnail

SpringBoot에서 Kafka - Producer

 

https://devbksheen.tistory.com/entry/Kafka-Spring-Boot%EC%97%90-Kafka%EB%A5%BC-%EC%97%B0%EB%8F%99?category=1024738 

 

Spring Boot에 Kafka 연동

[Kafka] Local에서 Kafka 명령어 날리기 [Kafka] EC2 생성 후 접속, Kafka 설치 및 설정 AWS EC2 생성 후 Inbound rule 추가 1. AWS Console 로그인 후 EC2에 접속 2. EC2 인스턴스 생성 3. 키 페어 생성 4. Inbou..

devbksheen.tistory.com

springboot와 kafka 연동은 다음을 참고하였음

 

public static String fullRecordGameRecord(String lolName){
        String addUrl = "/dataflow/user/gameRecordMachineLearning"+"?lolName="+lolName;
        ResponseEntity responseEntity = CallApi.GetIncludeParameter(addUrl);
        return responseEntity.getBody().toString();
    }

 

우선 fullrecord json데이터를 얻기위해 url을 호출한다. 

다른 서버에서 구성되고 있는 springboot application을 호출함

 

@Service
public class FullRecordProducer {
    private static final String TOPIC = "full_record";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public FullRecordProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public GameRecordRepository gameRecordRepository;


    public void sendFullRecordJson(String key,String message) {
        this.kafkaTemplate.send(TOPIC,key,message);
    }
}

 

그리고 kafka를 통해 key= lolname, message=유저의 전적(json데이터)를 produce한다.

 

 

 

 public void produceFullSearch(String lolName) throws IOException, InterruptedException {
            String encodedLolName = URLEncoder.encode(lolName, "utf-8");
            String fullGameRecord = GameRecordApi.fullRecordGameRecord(encodedLolName);
            fullRecordProducer.sendFullRecordJson(lolName,fullGameRecord);
    }

           

실제 내가 사용하고 있는 함수는 다음과같다.

1. 롤 네임을 utf-8로 인코딩

2. api호출하여 return값은 '유저의 전적(json)'

3. 해당 전적데이터를 kafka를 통해 produce

 

 

NiFi & Kafka - Consumer

 

 

우선 전체적인 흐름은 다음과 같다.

 

1.kafka consumer를 통해 key,value 데이터 받아오기

2. "gamerecord": [

 {}, {}

]

과 같이 객체 배열로 된 데이터를 나누어 주는 SplitJson 사용 -> 총 20개의 json데이터로 나누어줌

3. matchId를 attribute로 추출 -> 추후 전적저장시에 중복된 matchId는 저장하지 않기위함

4. itemarray, players는 값 자체가 json데이터로 이루어져 있는데 NiFi에서 'convert to json' 사용시에 json데이터 자체를 column의 데이터 값으로 바꾸어주지 못한다. 그렇기에 우회해서 사용하는 방법인 해당 json데이터를 우선 attribute로 추가하여 추후에 attribute를 사용하여 insert한다.

5. json데이터를 내가 만든 table에 매핑시켜 자동으로 sql문으로 만들어준다.

6. 이후 4번에서 attribute로 만들었던것을 sql문에 따로 저장한다.

7. sql문을 실행시킨다.(이때 PK값으로 uuid를 사용하기에 sql문을 조금 수정한다)

 

실제로 해당 과정을 통해 자동으로 json데이터를 rdbms에 저장한다.

profile

규턴의 개발블로그

@규턴이

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!