규턴의 개발블로그
article thumbnail

구성방법

 

현재 구성되어 있는 방법은 다음과같다.

 

1.kafka topic을
-line_info

-most_champion

-user_info

-full_record

와 같이 4개로 나누고 해당 토픽에 대응하는 컨슈머그룹을 총 4개운영한다.

 

현재 프로젝트 directory 구조는 다음과 같다

 

하나의 샘플을 예시로 들자면

 

샘플코드

Producer

@Service
public class FullRecordProducer {
    private static final String TOPIC = "full_record";
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public FullRecordProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public GameRecordRepository gameRecordRepository;


    public void sendFullRecordJson(String key,String message) {
        this.kafkaTemplate.send(TOPIC,key,message);
    }
}

 

Consumer


@Service
@Slf4j
public class FullRecordConsumer {
    @Autowired
    public GameRecordRepository gameRecordRepository;
    @Autowired
    public GameRecordService gameRecordService;
    @Autowired
    public RecordMlRepository recordMlRepository;

    

    @KafkaListener(topics = "full_record", groupId = "full_record_con_grp")
    public void consume( @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                         @Payload String data) throws IOException {
        GameRecord gameRecord = gameRecordRepository.findById(key).get();
        gameRecordService.updateFullRecord(gameRecord,data);
    }

 

 

구성도

 

 

고려사항

kafka:
  bootstrap-servers: 13.209.92.240:9092
  consumer:
    # consumer bootstrap servers가 따로 존재하면 설정
    # bootstrap-servers: 3.34.97.97:9092

    # 식별 가능한 Consumer Group Id
    group-id: jsonGroup
    # Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
    # latest: 가장 최근에 생산된 메시지로 offeset reset
    # earliest: 가장 오래된 메시지로 offeset reset
    # none: offset 정보가 없으면 Exception 발생
    auto-offset-reset: latest
    # 데이터를 받아올 때, key/value를 역직렬화
    # JSON 데이터를 받아올 것이라면 JsonDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producer:
    # producer bootstrap servers가 따로 존재하면 설정
    # bootstrap-servers: 3.34.97.97:9092

    # 데이터를 보낼 때, key/value를 직렬화
    # JSON 데이터를 보낼 것이라면 JsonDeserializer ,StringSerializer
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

application.yml에 kafka에 해당되는 설정을 하였다.

 

실제로는 JSON데이터를 받는것이기에 JsonSerializer,JsonDeserializer를 해야하는데 JSON으로 구성하게 되면 에러가 발생한다.

 

그렇기에 API에서 받은 데이터를 tostring하여 String으로 받았다.

 

실제 데이터가 잘 담기는것을 확인할 수 있다.

profile

규턴의 개발블로그

@규턴이

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!