728x90
반응형

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기

 

[Spring boot] Kafka 설정하기

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 2022.01.09 - [Infrastructure/

corono.tistory.com

 

Kafka 의 설정을 마무리하고 나면 이제 Kafka로 Message를 보낼 수 있게 된다. 이 포스트에서는 Kafka Topic으로 Message를 Publish 하는 예제를 공유하고자 한다. 

 

Kafka에서 메세지를 생산하고 publish 하는 주체를 Producer라고 한다. 나의 Project에서 auth service는 JwtToken을 만들고 이 값을 redis에 저장하고 token을 확인할 때마다 redis에서 값을 가져와서 비교하고자 한다.

 

Kafka 설정시에 2가지의 시나리오가 있어서 KafkaTemplate과 ReplyingKafkaTemplate을 설정하였다. 이 2가지에 대한 Message Publish를 알아보도록 하자. 

 

KafkaTempate & ReplyingKafkaTemplate

KafkaTemplate 을 이용해서 메세지를 보낼 때 primitive한 type으로 보낼 수 있지만, 각 서비스에서 데이터를 관리하기 위해서는 보통 Obejct로 많이 보내게 된다. 특정 Object 형태로 KafkaTemplate을 설정해서 사용할 수 있지만 메세지의 종류가 많아질 경우 이 설정 또한 방대해져서 관리가 힘들 수도 있다. 

그래서 난 범용 Object type을 Json을 Serialilze를 하여 Consumer 측에서 해당 Message로 Object Mapping을 하여 사용할 수 있도록 하였다. 이렇게 할 경우 공용 설정을 통해서 설정 부분의 코드를 줄일 수 있다. 하지만, Cosumer쪽에서 Message에 따라 Casting을 해줘야 하는 불편함은 존재한다. 선택의 문제니 필요에 따라 사용하면 될 것 같다. 

 

예제를 위해서는 몇가지 파일이 필요하다. 

1. Topic : 사용하는 Topic 값을 정의

    (이 Topic의 경우 사용하는 Producer와 Consumer가 함께 가지고 있어야 하는데 어떻게 하면 잘 관리할 수 있을지 고민중이다. )

public class JwtTokenTopic {
    public static final String TOPIC_CREATE_JWT_TOKEN = "topic.create.jwt.token";
    public static final String TOPIC_REQUEST_JWT_TOKEN = "topic.request.jwt.token";
    public static final String TOPIC_REPLY_JWT_TOKEN = "topic.reply.jwt.token";
}

2. Message Object : 메세지로 사용되는 Object들이다, Publish에 사용되는 2가지 간단한 메세지와 응답으로 받는 값을 Mapping 해줄 Object를 정의한다.

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Creation Message")
public static class CreateMessage {
    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "10L")
    private Long expiration = -1L;

    public static CreateMessage from(Account account, Date expireDate) {
        Long expiration = -1L;

        if (expireDate != null) {
            expiration = DateTimeUtils.getSecondsBetweenDates(new Date(), expireDate);
        }

        return new CreateMessage()
                .setAccountId(account.getId())
                .setJwtToken(account.getJwtToken())
                .setExpiration(expiration);
    }
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Request Message")
public static class RequestMessage {
    @Schema(example = "1L")
    private Long accountId;
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@JsonInclude(NON_NULL)
@Schema(description = "JwtToken Cache Info")
public static class JwtTokenInfo {
    @Schema(example = "ff6681f0-50f8-4110-bf96-ef6cec45780e")
    private String id;

    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "2021-01-01T00:00:00")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime updatedAt;
}

3. Callback 함수 파일 : 이 파일은 Service layer에서 Message을 보내고 난 후 ack를 확인하기 위한 함수를 정의해 두었다.

@Slf4j
public class JwtTokenFutureCallback implements ListenableFutureCallback<SendResult<String, Object>> {
    private final Object message;

    public JwtTokenFutureCallback(Object message) {
        this.message = message;
    }

    @Override
    public void onFailure(Throwable ex) {
        LOGGER.debug("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        LOGGER.debug("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
    }
}

 

이제 중요한 Service layer의 코드이다. 

@Slf4j
@RequiredArgsConstructor
@Service
public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
    private final KafkaTemplate<String, Object> kafkaJwtTokenTemplate;
    private final ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate;
    private final ObjectMapper objectMapper;
    private final JwtTokenProvider jwtTokenProvider;

    @Override
    public void creteJwtTokenCache(Account account) {
        Date expireDate = jwtTokenProvider.getExpiredDate(account.getJwtToken());

        JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, expireDate);

        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        listenableFuture.addCallback(new JwtTokenFutureCallback(createMessage));
    }

    @Override
    public JwtTokenMessage.JwtTokenInfo getJwtTokenCache(JwtTokenMessage.RequestMessage message) throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

        ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
        RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

        JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

        LOGGER.debug("JwtTokenInfo : {}", jwtTokenInfo);
        return jwtTokenInfo;
    }
}

 

우선 KafkaTemplate의 경우 Dependency Injection을 통해 Configuration에서 등록된 Bean이 매핑되고, 이 template을 통해 

Send() 함수를 호출해서 메세지를 보내준다. 필요한 부분은 Topic과 메세지를 넣는 부분이다. 메세지는 각 메세지의 Object이고, Object를 넣어서 보내면 Configuration에서 설정해 둔 Serializer가 Json String 형태로 변경하여 보내게 된다. 

ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

 

그리고 ReplayingKafkaTemplate으로 메세지를 보낼 경우에는 추가적인 작업이 필요하다. 

1. ProducerRecord를 생성하여 Topic과 Message 정보를 추가한다. 

ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

2. ReplyingKafkaTemplate의 sendAndReceive() 함수를 통해서 생성한 ProducerRecord를 보낸다. 

RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

3. 보낸 후에는 Async하게 응답을 기다리도록 ReplyFuture 를 리턴받아서 응답을 확인한다. 

SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

4. 응답으로 받는 데이터는 보내는 형태와 마찬가지로 Object를 Json으로 변경한 값이므로 약속된 형태로 변경하여 사용한다. 

JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

 

처음에 이 부분을 이해하고 코드를 추가하고 테스트를 하는데 어려움을 겪었다. 양쪽 서비스에 구현을 한 후 테스트를 하면서 문제 부분을 확인 후에 수정을 하다보니 시간이 많이 걸렸다. 처음부터 Embedded kafka를 설정해서 Unit test 만들면서 했으면 더 쉬웠을 것 같다. 하지만, Embedded kafka를 적용한 것도 시간이 걸렸으니 쌤쌤인가? ㅋㅋ 

 

다음 포스트에서는 이 메세지들을 받아서 처리하는 부분을 공유해보고자 한다. 

 

728x90
반응형

+ Recent posts