2022.02.09 - [Java/Spring boot] - [Spring boot & Kafka] Kafka Producer Service 예제
이번엔 Kafka Consumer Service를 작성해 보겠다. 위의 포스트에서 사용한 KafkaTempalate.send() 와 ReplykafkaTemplate.sendAndReceive()의 대응되는 부분을 작성해보겠다.
Kafka Consumer는 KafkaListener를 등록하고 Topic의 partition에 message가 오기를 기다린다. Message가 Topic 의 Partition으로 전달이 되면 Listener의 event가 전달되어 Listener로 등록된 함수를 실행하게 된다.
Consumer의 설정은 앞쪽 Kafka Configuration 포스트에서 작성되어 있다.
2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기
Consumer 가 있는 Service에서는 Listener 함수부분만 설명하도록 하겠다.
Consumer Service code
Kafka Consumer를 위한 Listener는 @kafkaListener annotation으로 등록한다.
@KafkaListenr annotation으로 등록되는 Listner에는 몇가지 정보가 등록된다. 이 Listener가 수행되게 되는 Topic을 지정해주고, GroupID, ContainerFactory를 지정해준다. GroupId는 Topic을 Consume하는 Consumer들의 모임이다. 아래의 그림을 보면 이해가 빠를 것이다.
Topic에 여러 Consumer Group이 붙을 수 있고 이 Group 모두 Message를 다 받을 수 있다. 만약 Group에 Consumer가 추가되게 되면 Rebalancing을 통하 모든 Consumer에게 메세지가 전달될 수 있게 한다.
ContainerFactory를 설정파일에서 Bean으로 등록한 ListenerContainerFactory를 넣어준다.
public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
private final JwtTokenService jwtTokenService;
private final ObjectMapper objectMapper;
private final ModelMapper modelMapper;
@Override
@KafkaListener(
topics = JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN,
groupId = "${kafka.consumer.groupId}",
containerFactory = "kafkaJwtTokenListenerContainerFactory")
public void listenCreateJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());
JwtTokenMessage.CreateMessage createMessage = objectMapper.readValue(record.value(), JwtTokenMessage.CreateMessage.class);
JwtToken jwtToken = jwtTokenService.create(modelMapper.map(createMessage, JwtToken.class));
LOGGER.debug("created JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());
ack.acknowledge();
}
@Override
@KafkaListener(
topics = JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN,
groupId = "${kafka.consumer.groupId}",
containerFactory = "kafkaJwtTokenListenerContainerFactory")
@SendTo(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN)
public JwtTokenMessage.JwtTokenInfo listenRetrieveJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());
JwtTokenMessage.RequestMessage retrieveMessage = objectMapper.readValue(record.value(), JwtTokenMessage.RequestMessage.class);
JwtToken jwtToken = jwtTokenService.getToken(retrieveMessage.getAccountId());
ack.acknowledge();
LOGGER.debug("requested JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());
return JwtTokenMessage.JwtTokenInfo.from(jwtToken, modelMapper);
}
}
첫번째 listenCreateJwtTokenCache() 함수는 kafkaTemplate.send()에 대한 동작을 수행한다. 메세지가 들어올 경우 메세지를 미리 정의된 Object로 mapping 해준 후 Redis에 JwtToken 값을 생성한다. 그리고 ack.acknowlege() 함수로 Producer에게 메세지를 받았다는 응답을 전달한다. ack.acknowlege() 함수는 Auto-Commit 설정을 false로 하였을 경우 명시적으로 호출해 주어야 한다.
두번째 listenRetrieveJwtTokenCache() 함수는 ReplyingKafkaTemplate.sendAndReceive() 에 대한 응답이다. 첫번째 Listener와 다른 점은 @SendTo annotation이 붙어 있고 Return type이 void가 아닌 것이다. @SendTo annotation은 이 Listener는 응답을 보낸다는 것을 나타내고 어떤 Topic으로 그 응답을 보낼지를 정의한다. 그리고 Return type은 Producer쪽으로 보내질 응답의 type을 정의해 준다.
Listener 설정은 크게 어렵지 않고 너무나 직관적이다.
설정 이후 메세지 처리에 집중하면 될 정도로 간단하게 되어 있다.
'Java > Spring Kafka' 카테고리의 다른 글
[Spring boot & Kafka] Kafka Unit test 예제 (0) | 2022.02.16 |
---|---|
[Spring boot & Kafka] Kafka Producer Service 예제 (0) | 2022.02.09 |
[Spring boot] Kafka 설정하기 (0) | 2022.02.04 |