구성방법
현재 구성되어 있는 방법은 다음과같다.
1.kafka topic을
-line_info
-most_champion
-user_info
-full_record
와 같이 4개로 나누고 해당 토픽에 대응하는 컨슈머그룹을 총 4개운영한다.
현재 프로젝트 directory 구조는 다음과 같다
하나의 샘플을 예시로 들자면
샘플코드
Producer
@Service
public class FullRecordProducer {
private static final String TOPIC = "full_record";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public FullRecordProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Autowired
public GameRecordRepository gameRecordRepository;
public void sendFullRecordJson(String key,String message) {
this.kafkaTemplate.send(TOPIC,key,message);
}
}
Consumer
@Service
@Slf4j
public class FullRecordConsumer {
@Autowired
public GameRecordRepository gameRecordRepository;
@Autowired
public GameRecordService gameRecordService;
@Autowired
public RecordMlRepository recordMlRepository;
@KafkaListener(topics = "full_record", groupId = "full_record_con_grp")
public void consume( @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload String data) throws IOException {
GameRecord gameRecord = gameRecordRepository.findById(key).get();
gameRecordService.updateFullRecord(gameRecord,data);
}
고려사항
kafka:
bootstrap-servers: 13.209.92.240:9092
consumer:
# consumer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 식별 가능한 Consumer Group Id
group-id: jsonGroup
# Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
# latest: 가장 최근에 생산된 메시지로 offeset reset
# earliest: 가장 오래된 메시지로 offeset reset
# none: offset 정보가 없으면 Exception 발생
auto-offset-reset: latest
# 데이터를 받아올 때, key/value를 역직렬화
# JSON 데이터를 받아올 것이라면 JsonDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# producer bootstrap servers가 따로 존재하면 설정
# bootstrap-servers: 3.34.97.97:9092
# 데이터를 보낼 때, key/value를 직렬화
# JSON 데이터를 보낼 것이라면 JsonDeserializer ,StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
application.yml에 kafka에 해당되는 설정을 하였다.
실제로는 JSON데이터를 받는것이기에 JsonSerializer,JsonDeserializer를 해야하는데 JSON으로 구성하게 되면 에러가 발생한다.
그렇기에 API에서 받은 데이터를 tostring하여 String으로 받았다.
실제 데이터가 잘 담기는것을 확인할 수 있다.
'프로젝트 > 롤 전적검색 프로젝트' 카테고리의 다른 글
[롤 전적검색] NiFi&Kafka를 이용하여 Json 데이터를 RDMBS에 적재(2) (0) | 2022.09.28 |
---|---|
[롤 전적검색] NiFi&Kafka를 이용하여 Json 데이터를 RDMBS에 적재 (1) | 2022.09.26 |
[DOCKER] react-docker시 unable to resolve dependency tree 문제 해결 (0) | 2022.09.16 |
[트러블슈팅] 문제 해결 과정(1)- RDBMS에 JSON데이터 적재하기 (0) | 2022.08.31 |
[트러블 슈팅]라이엇api 호출 delay (0) | 2022.08.29 |