나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다.
kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다.
2022.01.09 - [Infrastructure/Kafka] - kafka란 무엇인가?
현재 개발된 서비스간의 메세지를 주고 받을 수 있도록 설정할 예정이다.
여기 포스트에서는 간략한 시나리오를 바탕으로 필요한 설정들을 소개할 예정이다.
<시나리오 1>
spring-micro-auth project에서 Jwt token을 redis에 저장하기 위한 message 를 생성하고 JwtToken Cache Creation Topic으로 send (publish)를 하면 spring-micro-session project에서 해당 Topic을 Subscription 하여 처리한다.
<시나리오 2>
spring-micro-auth project에서 JwtToken을 확인하기 위하여 redis에 저장된 정보를 요청한다. JwtToken Cache Request Topic으로 messge를 send하면 spring-micro-session project에서 해당 Topic을 Subscription 하고 있다가 JwtToken 정보를 응답으로 전달한다.
시나리오 1과 2의 다른 점은 1의 경우 메세지를 전송하고 Session에서 처리하도록 둔다. 즉 결과를 확인하지 않는다. (asynchronous)
시나리오 2의 경우 Session의 응답을 기다린다. (synchronous)
Producer Configuration
Producer를 설정하기 위해서는 Producer의 Property들을 설정하고 그 Property로 ProducerFacotry를 만든 후 KafkaTemplate를 최종적으로 생성하면 된다. KafkaTemplate을 만들 때에는 주고 받는 메세지의 Topic과 Data의 자료형을 기준으로 생성한다.
@Configuration
public class KafkaConfig {
private final String bootStrapAddresses;
private final String groupId;
public KafkaJwtTokenConfig(
@Value("${kafka.bootstrap.addresses}")
String bootStrapAddresses,
@Value("${kafka.consumer.groupId}")
String groupId) {
this.bootStrapAddresses = bootStrapAddresses;
this.groupId = groupId;
}
private Map<String, Object> produceProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> jwtTokenProducerFactory() {
return new DefaultKafkaProducerFactory<>(produceProperties());
}
@Bean
public KafkaTemplate<String, Object> kafkaJwtTokenTemplate() {
return new KafkaTemplate<>(jwtTokenProducerFactory());
}
}
나의 프로젝트에서는 String type의 Topic과 Object Type의 data를 사용하였다.
Consumer Configuration (with ReplyingKafkaTempate 설정)
위의 시나리오 2에서 Message를 보낸 후 응답을 기다리는 경우에는 다른 KafkaTemplate을 사용하여야 하는데 그 것이 ReplyingKafkaTemplate이다. 이 Template의 경우 ConcurrentMessageListenerContainer를 사용하여 선언해준다.
(보통의 경우에는 아래의 jwtTokenConsumerFactory() 만 설정하면 Consumer로 등록 후 Message 처리가 가능하다. )
@Configuration
public class KafkaConfig {
...
private Map<String, Object> consumeProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> jwtTokenConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumeProperties());
}
@Bean
public ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate(
@Qualifier("jwtTokenProducerFactory") ProducerFactory<String, Object> pf,
@Qualifier("jwtTokenReplyContainer") KafkaMessageListenerContainer<String, String> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}
@Bean
public KafkaMessageListenerContainer<String, String> jwtTokenReplyContainer(
@Qualifier("jwtTokenConsumerFactory") ConsumerFactory<String, String> cf
) {
ContainerProperties containerProperties = new ContainerProperties(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(jwtTokenConsumerFactory());
factory.setReplyTemplate(kafkaJwtTokenTemplate());
return factory;
}
}
ReplyingKafkaTemplate 사용을 위해서는 응답으로 대한 Consumer config가 추가되어야 한다.
jwtTokenReplayingKafkaTemplate() 함수를 보면 Producer와 Comsumer를 Linsten하고 있는 KafkaMessageListenrContainer를 함께 인자로 받아서 ReplyingkafkaTemplate을 생성한다.
이렇게 생성된 Template으로 메세지를 주고 받을 경우에는 내부적으로 Correlation ID를 생성하여 요청한 Publisher에게 응답이 잘 전달될 수 있도록 한다.
그리고 응답을 받기위해 중요한 설정 중에 하나는 jwtTokenkafkaListenerContainerFactory() 함수에 있는 setReplayTemplate()이다.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(jwtTokenConsumerFactory());
factory.setReplyTemplate(kafkaJwtTokenTemplate()); // 이 셋팅이 있어야 응답으로 오는 ConsumerRecord를 처리할 수 있음
return factory;
}
오늘은 kafka configuration에 대해서 간략하게 적어
'Java > Spring Kafka' 카테고리의 다른 글
[Spring boot & Kafka] Kafka Unit test 예제 (0) | 2022.02.16 |
---|---|
[Spring boot & kafka] Kafka Consumer Service 예제 (0) | 2022.02.10 |
[Spring boot & Kafka] Kafka Producer Service 예제 (0) | 2022.02.09 |