728x90
반응형

2022.02.09 - [Java/Spring boot] - [Spring boot & Kafka] Kafka Producer Service 예제

 

[Spring boot & Kafka] Kafka Producer Service 예제

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기 [Spring boot] Kafka 설정하기 나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가..

corono.tistory.com

 

이번엔 Kafka Consumer Service를 작성해 보겠다. 위의 포스트에서 사용한 KafkaTempalate.send() 와 ReplykafkaTemplate.sendAndReceive()의 대응되는 부분을 작성해보겠다. 

 

Kafka Consumer는 KafkaListener를 등록하고 Topic의 partition에 message가 오기를 기다린다. Message가 Topic 의 Partition으로 전달이 되면 Listener의 event가 전달되어 Listener로 등록된 함수를 실행하게 된다. 

 

Consumer의 설정은 앞쪽 Kafka Configuration 포스트에서 작성되어 있다. 

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기

 

[Spring boot] Kafka 설정하기

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 2022.01.09 - [Infrastructure/

corono.tistory.com

 

Consumer 가 있는 Service에서는 Listener 함수부분만 설명하도록 하겠다. 

 

Consumer Service code

Kafka Consumer를 위한 Listener는 @kafkaListener annotation으로 등록한다. 

@KafkaListenr annotation으로 등록되는 Listner에는 몇가지 정보가 등록된다. 이 Listener가 수행되게 되는 Topic을 지정해주고, GroupID, ContainerFactory를 지정해준다. GroupId는 Topic을 Consume하는 Consumer들의 모임이다. 아래의 그림을 보면 이해가 빠를 것이다. 

출처 : https://lankydan.dev/intro-to-kafka-consumer-groups

Topic에 여러 Consumer Group이 붙을 수 있고 이 Group 모두 Message를 다 받을 수 있다. 만약 Group에 Consumer가 추가되게 되면 Rebalancing을 통하 모든 Consumer에게 메세지가 전달될 수 있게 한다. 

 

ContainerFactory를 설정파일에서 Bean으로 등록한 ListenerContainerFactory를 넣어준다. 

public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
    private final JwtTokenService jwtTokenService;
    private final ObjectMapper objectMapper;
    private final ModelMapper modelMapper;

    @Override
    @KafkaListener(
            topics = JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN,
            groupId = "${kafka.consumer.groupId}",
            containerFactory = "kafkaJwtTokenListenerContainerFactory")
    public void listenCreateJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
        LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());

        JwtTokenMessage.CreateMessage createMessage = objectMapper.readValue(record.value(), JwtTokenMessage.CreateMessage.class);
        JwtToken jwtToken = jwtTokenService.create(modelMapper.map(createMessage, JwtToken.class));

        LOGGER.debug("created JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());

        ack.acknowledge();
    }

    @Override
    @KafkaListener(
            topics = JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN,
            groupId = "${kafka.consumer.groupId}",
            containerFactory = "kafkaJwtTokenListenerContainerFactory")
    @SendTo(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN)
    public JwtTokenMessage.JwtTokenInfo listenRetrieveJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
        LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());

        JwtTokenMessage.RequestMessage retrieveMessage = objectMapper.readValue(record.value(), JwtTokenMessage.RequestMessage.class);
        JwtToken jwtToken = jwtTokenService.getToken(retrieveMessage.getAccountId());
        ack.acknowledge();

        LOGGER.debug("requested JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());

        return JwtTokenMessage.JwtTokenInfo.from(jwtToken, modelMapper);
    }
 }

 

첫번째 listenCreateJwtTokenCache() 함수는 kafkaTemplate.send()에 대한 동작을 수행한다. 메세지가 들어올 경우 메세지를 미리 정의된 Object로 mapping 해준 후 Redis에 JwtToken 값을 생성한다. 그리고 ack.acknowlege() 함수로 Producer에게 메세지를 받았다는 응답을 전달한다.  ack.acknowlege() 함수는 Auto-Commit 설정을 false로 하였을 경우 명시적으로 호출해 주어야 한다.

 

두번째 listenRetrieveJwtTokenCache() 함수는 ReplyingKafkaTemplate.sendAndReceive() 에 대한 응답이다. 첫번째 Listener와 다른 점은 @SendTo annotation이 붙어 있고 Return type이 void가 아닌 것이다. @SendTo annotation은 이 Listener는 응답을 보낸다는 것을 나타내고 어떤 Topic으로 그 응답을 보낼지를 정의한다. 그리고 Return type은 Producer쪽으로 보내질 응답의 type을 정의해 준다. 

 

Listener 설정은 크게 어렵지 않고 너무나 직관적이다. 

 

설정 이후 메세지 처리에 집중하면 될 정도로 간단하게 되어 있다. 

728x90
반응형

+ Recent posts