2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기
Kafka 의 설정을 마무리하고 나면 이제 Kafka로 Message를 보낼 수 있게 된다. 이 포스트에서는 Kafka Topic으로 Message를 Publish 하는 예제를 공유하고자 한다.
Kafka에서 메세지를 생산하고 publish 하는 주체를 Producer라고 한다. 나의 Project에서 auth service는 JwtToken을 만들고 이 값을 redis에 저장하고 token을 확인할 때마다 redis에서 값을 가져와서 비교하고자 한다.
Kafka 설정시에 2가지의 시나리오가 있어서 KafkaTemplate과 ReplyingKafkaTemplate을 설정하였다. 이 2가지에 대한 Message Publish를 알아보도록 하자.
KafkaTempate & ReplyingKafkaTemplate
KafkaTemplate 을 이용해서 메세지를 보낼 때 primitive한 type으로 보낼 수 있지만, 각 서비스에서 데이터를 관리하기 위해서는 보통 Obejct로 많이 보내게 된다. 특정 Object 형태로 KafkaTemplate을 설정해서 사용할 수 있지만 메세지의 종류가 많아질 경우 이 설정 또한 방대해져서 관리가 힘들 수도 있다.
그래서 난 범용 Object type을 Json을 Serialilze를 하여 Consumer 측에서 해당 Message로 Object Mapping을 하여 사용할 수 있도록 하였다. 이렇게 할 경우 공용 설정을 통해서 설정 부분의 코드를 줄일 수 있다. 하지만, Cosumer쪽에서 Message에 따라 Casting을 해줘야 하는 불편함은 존재한다. 선택의 문제니 필요에 따라 사용하면 될 것 같다.
예제를 위해서는 몇가지 파일이 필요하다.
1. Topic : 사용하는 Topic 값을 정의
(이 Topic의 경우 사용하는 Producer와 Consumer가 함께 가지고 있어야 하는데 어떻게 하면 잘 관리할 수 있을지 고민중이다. )
public class JwtTokenTopic {
public static final String TOPIC_CREATE_JWT_TOKEN = "topic.create.jwt.token";
public static final String TOPIC_REQUEST_JWT_TOKEN = "topic.request.jwt.token";
public static final String TOPIC_REPLY_JWT_TOKEN = "topic.reply.jwt.token";
}
2. Message Object : 메세지로 사용되는 Object들이다, Publish에 사용되는 2가지 간단한 메세지와 응답으로 받는 값을 Mapping 해줄 Object를 정의한다.
@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Creation Message")
public static class CreateMessage {
@Schema(example = "1L")
private Long accountId;
@Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
private String jwtToken;
@Schema(example = "10L")
private Long expiration = -1L;
public static CreateMessage from(Account account, Date expireDate) {
Long expiration = -1L;
if (expireDate != null) {
expiration = DateTimeUtils.getSecondsBetweenDates(new Date(), expireDate);
}
return new CreateMessage()
.setAccountId(account.getId())
.setJwtToken(account.getJwtToken())
.setExpiration(expiration);
}
}
@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Request Message")
public static class RequestMessage {
@Schema(example = "1L")
private Long accountId;
}
@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@JsonInclude(NON_NULL)
@Schema(description = "JwtToken Cache Info")
public static class JwtTokenInfo {
@Schema(example = "ff6681f0-50f8-4110-bf96-ef6cec45780e")
private String id;
@Schema(example = "1L")
private Long accountId;
@Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
private String jwtToken;
@Schema(example = "2021-01-01T00:00:00")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss")
private LocalDateTime updatedAt;
}
3. Callback 함수 파일 : 이 파일은 Service layer에서 Message을 보내고 난 후 ack를 확인하기 위한 함수를 정의해 두었다.
@Slf4j
public class JwtTokenFutureCallback implements ListenableFutureCallback<SendResult<String, Object>> {
private final Object message;
public JwtTokenFutureCallback(Object message) {
this.message = message;
}
@Override
public void onFailure(Throwable ex) {
LOGGER.debug("Unable to send message=[{}] due to : {}", message, ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
LOGGER.debug("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
}
}
이제 중요한 Service layer의 코드이다.
@Slf4j
@RequiredArgsConstructor
@Service
public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
private final KafkaTemplate<String, Object> kafkaJwtTokenTemplate;
private final ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate;
private final ObjectMapper objectMapper;
private final JwtTokenProvider jwtTokenProvider;
@Override
public void creteJwtTokenCache(Account account) {
Date expireDate = jwtTokenProvider.getExpiredDate(account.getJwtToken());
JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, expireDate);
LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
listenableFuture.addCallback(new JwtTokenFutureCallback(createMessage));
}
@Override
public JwtTokenMessage.JwtTokenInfo getJwtTokenCache(JwtTokenMessage.RequestMessage message) throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);
LOGGER.debug("JwtTokenInfo : {}", jwtTokenInfo);
return jwtTokenInfo;
}
}
우선 KafkaTemplate의 경우 Dependency Injection을 통해 Configuration에서 등록된 Bean이 매핑되고, 이 template을 통해
Send() 함수를 호출해서 메세지를 보내준다. 필요한 부분은 Topic과 메세지를 넣는 부분이다. 메세지는 각 메세지의 Object이고, Object를 넣어서 보내면 Configuration에서 설정해 둔 Serializer가 Json String 형태로 변경하여 보내게 된다.
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
그리고 ReplayingKafkaTemplate으로 메세지를 보낼 경우에는 추가적인 작업이 필요하다.
1. ProducerRecord를 생성하여 Topic과 Message 정보를 추가한다.
ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
2. ReplyingKafkaTemplate의 sendAndReceive() 함수를 통해서 생성한 ProducerRecord를 보낸다.
RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);
3. 보낸 후에는 Async하게 응답을 기다리도록 ReplyFuture 를 리턴받아서 응답을 확인한다.
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
4. 응답으로 받는 데이터는 보내는 형태와 마찬가지로 Object를 Json으로 변경한 값이므로 약속된 형태로 변경하여 사용한다.
JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);
처음에 이 부분을 이해하고 코드를 추가하고 테스트를 하는데 어려움을 겪었다. 양쪽 서비스에 구현을 한 후 테스트를 하면서 문제 부분을 확인 후에 수정을 하다보니 시간이 많이 걸렸다. 처음부터 Embedded kafka를 설정해서 Unit test 만들면서 했으면 더 쉬웠을 것 같다. 하지만, Embedded kafka를 적용한 것도 시간이 걸렸으니 쌤쌤인가? ㅋㅋ
다음 포스트에서는 이 메세지들을 받아서 처리하는 부분을 공유해보고자 한다.
'Java > Spring Kafka' 카테고리의 다른 글
[Spring boot & Kafka] Kafka Unit test 예제 (0) | 2022.02.16 |
---|---|
[Spring boot & kafka] Kafka Consumer Service 예제 (0) | 2022.02.10 |
[Spring boot] Kafka 설정하기 (0) | 2022.02.04 |