728x90
반응형

Kafka에 대한 설정 및 Producer와 Consumer의 예제에 대해서 알아보았다. 그럼 이 코드들이 잘 동작하는지는 어떻게 확인해야할까?

실제 서비스들 간의 메세지 전송 및 결과를 확인하기 위해서는 테스트 환경을 구축하여 서비스를 띄운 후에 테스트 해보는 것이 가장 정확하다. Docker compose로 Kafka broker와 Zookeeper를 띄우고 Bootstarp address에 Kafka broker 주소를 넣어준 뒤 서비스를 띄워서 테스트를 하면된다. 

 

하지만, 이러한 환경은 귀찮은 면이 있다. 항상 테스트를 위해서 서비스를 띄워두는 것은 리소스 낭비이다. 그래서 Kafka의 메세지 전송을 테스트 할 수 있는 환경이 필요하다. Spring boot 환경에서는 이를 테스트 할 수 있는 라이브러리를 제공하고 있다. 

아래의 라이브러리가 그것이며, 이름은 Spring kafka test library 이다. 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

 

위의 Configuration을 추가하면 Embedded kafka 를 사용할 수 있게 되고, 이 환경을 통해서 Message 전송을 임의로 만들 수 있게 된다. 

단, Container 환경에서 Embedded kafka로 테스트를 진행하면 정상적으로 진행되지 않다. 
현재, 나의 환경이 Jenkins를 Kubernetes 내부에 띄우고 Pod agent로 Docker Image build 및 Release Job을 수행하고 있고, 이때 Unit test를 수행하여 코드의 안정성을 확인하고 있다. 하지만, Embedded kafka관련 Test는 수행하고 못하고 있다. 

 

Test 예제

application-test.yaml

Embedeed kafka를 사용하기 위해서는 Embedded kafka에서 띄우는 서버를 사용하여야 한다. 

# kafka
kafka:
  bootstrap:
    addresses: ${spring.embedded.kafka.brokers:localhost:9092}

 

Essential Configuration (JUnit5)

Embedded kafka를 사용하기 위해서는 필수적으로 넣어주어야 하는 Configuration 이 있다. 

우선 @EmbeddedKafka annotation으로 Embedded Kafka를 추가해 준다. 그리고 이 Embedded kafka가 동작하기 위해서는 Integration Test 환경으로 수행되어야 한다. 그 말은 Spring 의 Dependency가 사용되는 환경에서 수행되어야 하기 때문에 @SpringbootTest annotation이 추가되어야 하고, 그와 더불어 @ExtendWith(SpringExtension.class)가 추가되어야 정상적인 동작이 된다. 이러한 셋팅은 JUnit5 기반으로 테스트 될때이며 JUnit4는 그 방법이 다르다. 

@Slf4j
@Tag("embedded-kafka-test")
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
...
}

 

Kafka Broker Configuration

Embedded Kafka의 Broker를 셋팅해주는 부분이다. @Autowired annotation으로 Dependency Injection을 해주면 된다. 

Broker count는 2개, Partition은 2개로 설정해 주었다. 마지막은 설정과 동시에 생성할 Topic인데, Topic이 여러 개 일 때에는 뒤쪽으로 ,(콤마)로 구분하여 계속 넣어주면 된다. 

Intellij를 사용한다면 이 부분에서 @Autowired 로 Dependency Injection시 Dependecy를 찾을 수 없다는 Inspection Message를 받을 수 있다. 이 부부은 Intellij의 문제로 난 이 Inspection의 level을 warning으로 바꾸었다. 

2022.02.05 - [Tool/IntelliJ] - [Intellij] @Autowired inspection 오류 무시하는 방법

...
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
        2,
        true,
        2,
        JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
);
...

 

Producer configuration

Producer configuration은 간단하다. Produecer Property는 Default 셋팅으로 설정하였다. 그리고 Key Serializer는 String, Value Serialilzer는 Json으로 설정하였다. 이 부분의 서비스에서 사용하는 설정과 동일하게 하면 된다. 

private Producer<String, Object> configEmbeddedKafkaProducer() {
    Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
}

 

Consumer Configuration

Consumer의 설정은 복잡하다. 왜냐하면 Kafka의 Consumer의 설정에서 보았듯이 KafkaLinstener의 설정이 추가되어야 하기 때문이다. Embedded Kafka도 마찬가지로 Produer가 보내는 메세지가 Topic에 들어오면 Listener가 해당 메세지를 처리하도록 해야하기 때문이다. 

 

1) 우선 Consumer Property 셋팅을 한다. 기본 셋팅을 가지고 가되, Auto commit만 false로 셋팅하였다. 나의 코드에서 Commit을 수동을 하고 있다. 

2) ConsumerFactory를 만들고, Container Property를 topic과 함께 생성하고, KafkaMessageListenerContainer를 만든다. 

3) CusumerRecord를 순차적으로 저장시키고, 순차적으로 처리하기 위해, BlockingQueue로 선언한다. 

4) 이후, 생성된 KafkaMessageListenerContainer에 Listener 함수를 추가한다. 이 함수에서는 Log로 Message를 출력하고, BlockQueue에 record를 추가한다. 이 record를 필요한 시점에 뽑아서 확인한다.

private KafkaMessageListenerContainer<String, String> container;

private BlockingQueue<ConsumerRecord<String, String>> records;

private void configEmbeddedKafkaConsumer(String topic) {
    Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));

    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    ContainerProperties containerProperties = new ContainerProperties(topic);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    records = new LinkedBlockingDeque<>();

    container.setupMessageListener((MessageListener<String, String>) record -> {
                LOGGER.debug("test-listener received message='{}'",
                        record.value());
                records.add(record);
    });
    container.start();

    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}

 

전체 Configuration 부분 코드

@Slf4j
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
    @Autowired
    private JwtTokenMessageService jwtTokenMessageService;

    @MockBean
    private JwtTokenProvider jwtTokenProvider;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
            2,
            true,
            2,
            JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
    );

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @BeforeEach
    void setUp() {}

    private void configEmbeddedKafkaConsumer(String topic) {
        Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

        ContainerProperties containerProperties = new ContainerProperties(topic);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        records = new LinkedBlockingDeque<>();

        container.setupMessageListener((MessageListener<String, String>) record -> {
                    LOGGER.debug("test-listener received message='{}'",
                            record.value());
                    records.add(record);
        });
        container.start();

        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    private Producer<String, Object> configEmbeddedKafkaProducer() {
        Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }
}

 

Producer Test Code예제

void testCreateMessageForJwtTokenCache() throws InterruptedException, JsonProcessingException {
	// Consumer 생성
    configEmbeddedKafkaConsumer(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN);
	
    Account account = TestAccountBuilder.accountWithToken();
	
    // 전송할 테스트 메세지 생성
    JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, null);
    String expectedMessage = objectMapper.writeValueAsString(createMessage);

	// 확인하지 않을 함수에 대한 Mocking
    when(jwtTokenProvider.getExpiredDate(any())).thenReturn(null);

	// Embedded kafka로 Message 전송
    jwtTokenMessageService.creteJwtTokenCache(account);

	// BlockingQueue에 메세지를 가져옴. 10초 동안 메세지가 들어오지 않으면 Error 발생
    ConsumerRecord<String, String> record = records.poll(10, TimeUnit.SECONDS);
    
    
	// 수신한 메세지 확인
    assertThat(record).isNotNull();
    assertThat(record.value()).isEqualTo(expectedMessage);
    assertThat(record).has(key(null));
}

 

Consumer Test Code 예제

void testCreateMessageForJwtTokenCache() {
	// 전송할 메세지 생성
    JwtTokenMessage.CreateMessage createMessage = new JwtTokenMessage.CreateMessage();
    createMessage.setAccountId(1L);
    createMessage.setJwtToken("test-jwtToken");
    createMessage.setExpiration(120L);

	// 메세지를 사용하여 ProducerRecord를 생성한다.
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
    // 메세지 전송
    producer.send(producerRecord);
    producer.flush();

	// Mocking
    when(jwtTokenService.create(any())).thenReturn(TestJwtTokenBuilder.defaultJwtToken());

	// Consumer가 메세지를 잘 받아서 Listener가 수행되는지를 확인 (Listener의 특정 Function이 수행되는지를 확인)
    verify(jwtTokenService, timeout(10000L).times(1)).create(any());
}

 

이 정도면 간단한 메세지 전송 여부는 확인이 가능하다. 

728x90
반응형
728x90
반응형

2022.02.09 - [Java/Spring boot] - [Spring boot & Kafka] Kafka Producer Service 예제

 

[Spring boot & Kafka] Kafka Producer Service 예제

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기 [Spring boot] Kafka 설정하기 나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가..

corono.tistory.com

 

이번엔 Kafka Consumer Service를 작성해 보겠다. 위의 포스트에서 사용한 KafkaTempalate.send() 와 ReplykafkaTemplate.sendAndReceive()의 대응되는 부분을 작성해보겠다. 

 

Kafka Consumer는 KafkaListener를 등록하고 Topic의 partition에 message가 오기를 기다린다. Message가 Topic 의 Partition으로 전달이 되면 Listener의 event가 전달되어 Listener로 등록된 함수를 실행하게 된다. 

 

Consumer의 설정은 앞쪽 Kafka Configuration 포스트에서 작성되어 있다. 

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기

 

[Spring boot] Kafka 설정하기

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 2022.01.09 - [Infrastructure/

corono.tistory.com

 

Consumer 가 있는 Service에서는 Listener 함수부분만 설명하도록 하겠다. 

 

Consumer Service code

Kafka Consumer를 위한 Listener는 @kafkaListener annotation으로 등록한다. 

@KafkaListenr annotation으로 등록되는 Listner에는 몇가지 정보가 등록된다. 이 Listener가 수행되게 되는 Topic을 지정해주고, GroupID, ContainerFactory를 지정해준다. GroupId는 Topic을 Consume하는 Consumer들의 모임이다. 아래의 그림을 보면 이해가 빠를 것이다. 

출처 :&nbsp;https://lankydan.dev/intro-to-kafka-consumer-groups

Topic에 여러 Consumer Group이 붙을 수 있고 이 Group 모두 Message를 다 받을 수 있다. 만약 Group에 Consumer가 추가되게 되면 Rebalancing을 통하 모든 Consumer에게 메세지가 전달될 수 있게 한다. 

 

ContainerFactory를 설정파일에서 Bean으로 등록한 ListenerContainerFactory를 넣어준다. 

public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
    private final JwtTokenService jwtTokenService;
    private final ObjectMapper objectMapper;
    private final ModelMapper modelMapper;

    @Override
    @KafkaListener(
            topics = JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN,
            groupId = "${kafka.consumer.groupId}",
            containerFactory = "kafkaJwtTokenListenerContainerFactory")
    public void listenCreateJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
        LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());

        JwtTokenMessage.CreateMessage createMessage = objectMapper.readValue(record.value(), JwtTokenMessage.CreateMessage.class);
        JwtToken jwtToken = jwtTokenService.create(modelMapper.map(createMessage, JwtToken.class));

        LOGGER.debug("created JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());

        ack.acknowledge();
    }

    @Override
    @KafkaListener(
            topics = JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN,
            groupId = "${kafka.consumer.groupId}",
            containerFactory = "kafkaJwtTokenListenerContainerFactory")
    @SendTo(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN)
    public JwtTokenMessage.JwtTokenInfo listenRetrieveJwtTokenCache(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
        LOGGER.debug("received message from '{}' : {}", record.topic(), record.value());

        JwtTokenMessage.RequestMessage retrieveMessage = objectMapper.readValue(record.value(), JwtTokenMessage.RequestMessage.class);
        JwtToken jwtToken = jwtTokenService.getToken(retrieveMessage.getAccountId());
        ack.acknowledge();

        LOGGER.debug("requested JwtToken Cache '{}' : {}", jwtToken.getId(), jwtToken.getJwtToken());

        return JwtTokenMessage.JwtTokenInfo.from(jwtToken, modelMapper);
    }
 }

 

첫번째 listenCreateJwtTokenCache() 함수는 kafkaTemplate.send()에 대한 동작을 수행한다. 메세지가 들어올 경우 메세지를 미리 정의된 Object로 mapping 해준 후 Redis에 JwtToken 값을 생성한다. 그리고 ack.acknowlege() 함수로 Producer에게 메세지를 받았다는 응답을 전달한다.  ack.acknowlege() 함수는 Auto-Commit 설정을 false로 하였을 경우 명시적으로 호출해 주어야 한다.

 

두번째 listenRetrieveJwtTokenCache() 함수는 ReplyingKafkaTemplate.sendAndReceive() 에 대한 응답이다. 첫번째 Listener와 다른 점은 @SendTo annotation이 붙어 있고 Return type이 void가 아닌 것이다. @SendTo annotation은 이 Listener는 응답을 보낸다는 것을 나타내고 어떤 Topic으로 그 응답을 보낼지를 정의한다. 그리고 Return type은 Producer쪽으로 보내질 응답의 type을 정의해 준다. 

 

Listener 설정은 크게 어렵지 않고 너무나 직관적이다. 

 

설정 이후 메세지 처리에 집중하면 될 정도로 간단하게 되어 있다. 

728x90
반응형
728x90
반응형

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기

 

[Spring boot] Kafka 설정하기

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 2022.01.09 - [Infrastructure/

corono.tistory.com

 

Kafka 의 설정을 마무리하고 나면 이제 Kafka로 Message를 보낼 수 있게 된다. 이 포스트에서는 Kafka Topic으로 Message를 Publish 하는 예제를 공유하고자 한다. 

 

Kafka에서 메세지를 생산하고 publish 하는 주체를 Producer라고 한다. 나의 Project에서 auth service는 JwtToken을 만들고 이 값을 redis에 저장하고 token을 확인할 때마다 redis에서 값을 가져와서 비교하고자 한다.

 

Kafka 설정시에 2가지의 시나리오가 있어서 KafkaTemplate과 ReplyingKafkaTemplate을 설정하였다. 이 2가지에 대한 Message Publish를 알아보도록 하자. 

 

KafkaTempate & ReplyingKafkaTemplate

KafkaTemplate 을 이용해서 메세지를 보낼 때 primitive한 type으로 보낼 수 있지만, 각 서비스에서 데이터를 관리하기 위해서는 보통 Obejct로 많이 보내게 된다. 특정 Object 형태로 KafkaTemplate을 설정해서 사용할 수 있지만 메세지의 종류가 많아질 경우 이 설정 또한 방대해져서 관리가 힘들 수도 있다. 

그래서 난 범용 Object type을 Json을 Serialilze를 하여 Consumer 측에서 해당 Message로 Object Mapping을 하여 사용할 수 있도록 하였다. 이렇게 할 경우 공용 설정을 통해서 설정 부분의 코드를 줄일 수 있다. 하지만, Cosumer쪽에서 Message에 따라 Casting을 해줘야 하는 불편함은 존재한다. 선택의 문제니 필요에 따라 사용하면 될 것 같다. 

 

예제를 위해서는 몇가지 파일이 필요하다. 

1. Topic : 사용하는 Topic 값을 정의

    (이 Topic의 경우 사용하는 Producer와 Consumer가 함께 가지고 있어야 하는데 어떻게 하면 잘 관리할 수 있을지 고민중이다. )

public class JwtTokenTopic {
    public static final String TOPIC_CREATE_JWT_TOKEN = "topic.create.jwt.token";
    public static final String TOPIC_REQUEST_JWT_TOKEN = "topic.request.jwt.token";
    public static final String TOPIC_REPLY_JWT_TOKEN = "topic.reply.jwt.token";
}

2. Message Object : 메세지로 사용되는 Object들이다, Publish에 사용되는 2가지 간단한 메세지와 응답으로 받는 값을 Mapping 해줄 Object를 정의한다.

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Creation Message")
public static class CreateMessage {
    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "10L")
    private Long expiration = -1L;

    public static CreateMessage from(Account account, Date expireDate) {
        Long expiration = -1L;

        if (expireDate != null) {
            expiration = DateTimeUtils.getSecondsBetweenDates(new Date(), expireDate);
        }

        return new CreateMessage()
                .setAccountId(account.getId())
                .setJwtToken(account.getJwtToken())
                .setExpiration(expiration);
    }
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Request Message")
public static class RequestMessage {
    @Schema(example = "1L")
    private Long accountId;
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@JsonInclude(NON_NULL)
@Schema(description = "JwtToken Cache Info")
public static class JwtTokenInfo {
    @Schema(example = "ff6681f0-50f8-4110-bf96-ef6cec45780e")
    private String id;

    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "2021-01-01T00:00:00")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime updatedAt;
}

3. Callback 함수 파일 : 이 파일은 Service layer에서 Message을 보내고 난 후 ack를 확인하기 위한 함수를 정의해 두었다.

@Slf4j
public class JwtTokenFutureCallback implements ListenableFutureCallback<SendResult<String, Object>> {
    private final Object message;

    public JwtTokenFutureCallback(Object message) {
        this.message = message;
    }

    @Override
    public void onFailure(Throwable ex) {
        LOGGER.debug("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        LOGGER.debug("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
    }
}

 

이제 중요한 Service layer의 코드이다. 

@Slf4j
@RequiredArgsConstructor
@Service
public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
    private final KafkaTemplate<String, Object> kafkaJwtTokenTemplate;
    private final ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate;
    private final ObjectMapper objectMapper;
    private final JwtTokenProvider jwtTokenProvider;

    @Override
    public void creteJwtTokenCache(Account account) {
        Date expireDate = jwtTokenProvider.getExpiredDate(account.getJwtToken());

        JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, expireDate);

        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        listenableFuture.addCallback(new JwtTokenFutureCallback(createMessage));
    }

    @Override
    public JwtTokenMessage.JwtTokenInfo getJwtTokenCache(JwtTokenMessage.RequestMessage message) throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

        ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
        RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

        JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

        LOGGER.debug("JwtTokenInfo : {}", jwtTokenInfo);
        return jwtTokenInfo;
    }
}

 

우선 KafkaTemplate의 경우 Dependency Injection을 통해 Configuration에서 등록된 Bean이 매핑되고, 이 template을 통해 

Send() 함수를 호출해서 메세지를 보내준다. 필요한 부분은 Topic과 메세지를 넣는 부분이다. 메세지는 각 메세지의 Object이고, Object를 넣어서 보내면 Configuration에서 설정해 둔 Serializer가 Json String 형태로 변경하여 보내게 된다. 

ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

 

그리고 ReplayingKafkaTemplate으로 메세지를 보낼 경우에는 추가적인 작업이 필요하다. 

1. ProducerRecord를 생성하여 Topic과 Message 정보를 추가한다. 

ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

2. ReplyingKafkaTemplate의 sendAndReceive() 함수를 통해서 생성한 ProducerRecord를 보낸다. 

RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

3. 보낸 후에는 Async하게 응답을 기다리도록 ReplyFuture 를 리턴받아서 응답을 확인한다. 

SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

4. 응답으로 받는 데이터는 보내는 형태와 마찬가지로 Object를 Json으로 변경한 값이므로 약속된 형태로 변경하여 사용한다. 

JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

 

처음에 이 부분을 이해하고 코드를 추가하고 테스트를 하는데 어려움을 겪었다. 양쪽 서비스에 구현을 한 후 테스트를 하면서 문제 부분을 확인 후에 수정을 하다보니 시간이 많이 걸렸다. 처음부터 Embedded kafka를 설정해서 Unit test 만들면서 했으면 더 쉬웠을 것 같다. 하지만, Embedded kafka를 적용한 것도 시간이 걸렸으니 쌤쌤인가? ㅋㅋ 

 

다음 포스트에서는 이 메세지들을 받아서 처리하는 부분을 공유해보고자 한다. 

 

728x90
반응형
728x90
반응형

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다.

kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 

 

2022.01.09 - [Infrastructure/Kafka] - kafka란 무엇인가?

 

kafka란 무엇인가?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical ap..

corono.tistory.com

 

현재 개발된 서비스간의 메세지를 주고 받을 수 있도록 설정할 예정이다. 

여기 포스트에서는 간략한 시나리오를 바탕으로 필요한 설정들을 소개할 예정이다. 

 

<시나리오 1>

spring-micro-auth project에서 Jwt token을 redis에 저장하기 위한 message 를 생성하고 JwtToken Cache Creation Topic으로 send (publish)를 하면 spring-micro-session project에서 해당 Topic을 Subscription 하여 처리한다.

 

<시나리오 2>

spring-micro-auth project에서 JwtToken을 확인하기 위하여 redis에 저장된 정보를 요청한다. JwtToken Cache Request Topic으로 messge를 send하면 spring-micro-session project에서 해당 Topic을 Subscription 하고 있다가 JwtToken 정보를 응답으로 전달한다. 

 

시나리오 1과 2의 다른 점은 1의 경우 메세지를 전송하고 Session에서 처리하도록 둔다. 즉 결과를 확인하지 않는다. (asynchronous)

시나리오 2의 경우 Session의 응답을 기다린다. (synchronous)

 

Producer Configuration

Producer를 설정하기 위해서는 Producer의 Property들을 설정하고 그 Property로 ProducerFacotry를 만든 후 KafkaTemplate를 최종적으로 생성하면 된다. KafkaTemplate을 만들 때에는 주고 받는 메세지의 Topic과 Data의 자료형을 기준으로 생성한다. 

 

@Configuration
public class KafkaConfig {
	private final String bootStrapAddresses;
    private final String groupId;

    public KafkaJwtTokenConfig(
            @Value("${kafka.bootstrap.addresses}")
                    String bootStrapAddresses,
            @Value("${kafka.consumer.groupId}")
                    String groupId) {
        this.bootStrapAddresses = bootStrapAddresses;
        this.groupId = groupId;
    }
    
    private Map<String, Object> produceProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
    
    @Bean
    public ProducerFactory<String, Object> jwtTokenProducerFactory() {
        return new DefaultKafkaProducerFactory<>(produceProperties());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaJwtTokenTemplate() {
        return new KafkaTemplate<>(jwtTokenProducerFactory());
    }
}

나의 프로젝트에서는 String type의 Topic과 Object Type의 data를 사용하였다. 

 

Consumer Configuration (with ReplyingKafkaTempate 설정)

위의 시나리오 2에서 Message를 보낸 후 응답을 기다리는 경우에는 다른 KafkaTemplate을 사용하여야 하는데 그 것이 ReplyingKafkaTemplate이다. 이 Template의 경우 ConcurrentMessageListenerContainer를 사용하여 선언해준다.

(보통의 경우에는 아래의 jwtTokenConsumerFactory() 만 설정하면 Consumer로 등록 후 Message 처리가 가능하다. )

@Configuration
public class KafkaConfig {
...
	
    private Map<String, Object> consumeProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> jwtTokenConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumeProperties());
    }
    
	@Bean
    public ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate(
            @Qualifier("jwtTokenProducerFactory") ProducerFactory<String, Object> pf,
            @Qualifier("jwtTokenReplyContainer") KafkaMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(pf, container);
    }
    
    @Bean
    public KafkaMessageListenerContainer<String, String> jwtTokenReplyContainer(
            @Qualifier("jwtTokenConsumerFactory") ConsumerFactory<String, String> cf
    ) {
        ContainerProperties containerProperties = new ContainerProperties(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());
        return factory;
    }
}

ReplyingKafkaTemplate 사용을 위해서는 응답으로 대한 Consumer config가 추가되어야 한다. 

jwtTokenReplayingKafkaTemplate() 함수를 보면 Producer와 Comsumer를 Linsten하고 있는 KafkaMessageListenrContainer를 함께 인자로 받아서 ReplyingkafkaTemplate을 생성한다. 

 

이렇게 생성된 Template으로 메세지를 주고 받을 경우에는 내부적으로 Correlation ID를 생성하여 요청한 Publisher에게 응답이 잘 전달될 수 있도록 한다. 

 

그리고 응답을 받기위해 중요한 설정 중에 하나는 jwtTokenkafkaListenerContainerFactory() 함수에 있는 setReplayTemplate()이다.

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());			// 이 셋팅이 있어야 응답으로 오는 ConsumerRecord를 처리할 수 있음
        return factory;
    }

 

오늘은 kafka configuration에 대해서 간략하게 적어

728x90
반응형

+ Recent posts