728x90
반응형

Unit test를 구현할 때에 간혹 특정 Function의 동작을 무시하고 싶을 때가 있다. 

나의 경우를 예를 들면 Restful API로 들어온 요청이 있고, 이 중 특정 데이터를 Message Broker를 통해 다른 Server로 전달 후 Redis에 저장을 하는 경우에 필요하였다. 

Message Broker 동작은 단순히 전달만 하고 끝나는 작업이기도 하고 Message Broker에 대한 Unit test는 별도로 수행하기 때문에 Restful API 동작에 대한 Unit test에서는 Message Broker의 동작은 무시하여도 되는 상황이었다. 

 

이 때 사용할 수 있는 것이 doNothing()이다. 사용법은 아래와 같다. 

doNothing().when(jwtTokenMessageService).creteJwtTokenCache(any());

JwtTokenMessageService는 Message Broker(kafka)로 메세지를 전달하는 역할을 하고 이 서비스는 별도의 Unit test로 테스트가 된다. (이 부분은 차후에 포스팅 하려고 한다.)

 

미래의 나를 위해 남겨두는 팁이다. 

728x90
반응형
728x90
반응형

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다.

kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 

 

2022.01.09 - [Infrastructure/Kafka] - kafka란 무엇인가?

 

kafka란 무엇인가?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical ap..

corono.tistory.com

 

현재 개발된 서비스간의 메세지를 주고 받을 수 있도록 설정할 예정이다. 

여기 포스트에서는 간략한 시나리오를 바탕으로 필요한 설정들을 소개할 예정이다. 

 

<시나리오 1>

spring-micro-auth project에서 Jwt token을 redis에 저장하기 위한 message 를 생성하고 JwtToken Cache Creation Topic으로 send (publish)를 하면 spring-micro-session project에서 해당 Topic을 Subscription 하여 처리한다.

 

<시나리오 2>

spring-micro-auth project에서 JwtToken을 확인하기 위하여 redis에 저장된 정보를 요청한다. JwtToken Cache Request Topic으로 messge를 send하면 spring-micro-session project에서 해당 Topic을 Subscription 하고 있다가 JwtToken 정보를 응답으로 전달한다. 

 

시나리오 1과 2의 다른 점은 1의 경우 메세지를 전송하고 Session에서 처리하도록 둔다. 즉 결과를 확인하지 않는다. (asynchronous)

시나리오 2의 경우 Session의 응답을 기다린다. (synchronous)

 

Producer Configuration

Producer를 설정하기 위해서는 Producer의 Property들을 설정하고 그 Property로 ProducerFacotry를 만든 후 KafkaTemplate를 최종적으로 생성하면 된다. KafkaTemplate을 만들 때에는 주고 받는 메세지의 Topic과 Data의 자료형을 기준으로 생성한다. 

 

@Configuration
public class KafkaConfig {
	private final String bootStrapAddresses;
    private final String groupId;

    public KafkaJwtTokenConfig(
            @Value("${kafka.bootstrap.addresses}")
                    String bootStrapAddresses,
            @Value("${kafka.consumer.groupId}")
                    String groupId) {
        this.bootStrapAddresses = bootStrapAddresses;
        this.groupId = groupId;
    }
    
    private Map<String, Object> produceProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
    
    @Bean
    public ProducerFactory<String, Object> jwtTokenProducerFactory() {
        return new DefaultKafkaProducerFactory<>(produceProperties());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaJwtTokenTemplate() {
        return new KafkaTemplate<>(jwtTokenProducerFactory());
    }
}

나의 프로젝트에서는 String type의 Topic과 Object Type의 data를 사용하였다. 

 

Consumer Configuration (with ReplyingKafkaTempate 설정)

위의 시나리오 2에서 Message를 보낸 후 응답을 기다리는 경우에는 다른 KafkaTemplate을 사용하여야 하는데 그 것이 ReplyingKafkaTemplate이다. 이 Template의 경우 ConcurrentMessageListenerContainer를 사용하여 선언해준다.

(보통의 경우에는 아래의 jwtTokenConsumerFactory() 만 설정하면 Consumer로 등록 후 Message 처리가 가능하다. )

@Configuration
public class KafkaConfig {
...
	
    private Map<String, Object> consumeProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> jwtTokenConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumeProperties());
    }
    
	@Bean
    public ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate(
            @Qualifier("jwtTokenProducerFactory") ProducerFactory<String, Object> pf,
            @Qualifier("jwtTokenReplyContainer") KafkaMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(pf, container);
    }
    
    @Bean
    public KafkaMessageListenerContainer<String, String> jwtTokenReplyContainer(
            @Qualifier("jwtTokenConsumerFactory") ConsumerFactory<String, String> cf
    ) {
        ContainerProperties containerProperties = new ContainerProperties(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());
        return factory;
    }
}

ReplyingKafkaTemplate 사용을 위해서는 응답으로 대한 Consumer config가 추가되어야 한다. 

jwtTokenReplayingKafkaTemplate() 함수를 보면 Producer와 Comsumer를 Linsten하고 있는 KafkaMessageListenrContainer를 함께 인자로 받아서 ReplyingkafkaTemplate을 생성한다. 

 

이렇게 생성된 Template으로 메세지를 주고 받을 경우에는 내부적으로 Correlation ID를 생성하여 요청한 Publisher에게 응답이 잘 전달될 수 있도록 한다. 

 

그리고 응답을 받기위해 중요한 설정 중에 하나는 jwtTokenkafkaListenerContainerFactory() 함수에 있는 setReplayTemplate()이다.

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());			// 이 셋팅이 있어야 응답으로 오는 ConsumerRecord를 처리할 수 있음
        return factory;
    }

 

오늘은 kafka configuration에 대해서 간략하게 적어

728x90
반응형
728x90
반응형

Kafka를 이용하는 Spring boot project에서 unit test를 수행할 때에 여러가지 어려움을 직면하게 된다. Kafka message send and receive 관련 Unit test를 추가하기 위해서는 Embedded kafka를 추가한 후 메세지가 제대로 전달되는지를 확인한다. 

 

이때, Kafka bootstrap server address를 아래와 같이 Embedded Kafka 의 Broker 주소 값으로 수행한다.

${spring.embedded.kafka.brokers}

하지만, 이 설정은 아래의 Annotation이 설정된 Spring boot test에서만 사용이 가능하다.

@EmbeddedKafka

그렇지 않은 Unit test에서는 아래와 같은 Exception이 발생하게 된다. Embedded Kafka의 AutoConfiguration이 적용되지 않아서 broker address를 찾을 수 없기 때문이다. 

 

이러한 문제가 발생할 경우 application.yaml에 추가되어 있는 Embedded Kafka 의 broker 주소에 Default 값을 넣어주면 문제가 해결된다. 

 

즉, @EmbeddedKafka가 없는 Spring boot test의 경우 Default 주소로 동작을 하게 되고, 이 때 해당 주소에 실제 Kafka가 없더라도 Integration Test를 수행하는데는 문제가 발생하지 않는다. 

 

나의 경우 Kafka broker의 기본 Port를 사용하여 Default 값을 추가해 주었다. 

kafka:
  bootstrap:
    addresses: ${spring.embedded.kafka.brokers:localhost:9092}

(위 설정은 Config 파일을 통해서 직접 Bean을 등록하여 Spring에서 기본으로 사용하는 Configuration과는 차이가 있다. Default 값을 넣은 부분만 참고하다록 하자.)

728x90
반응형
728x90
반응형

k8s persistent volume을 셋팅하다 보면 Terminating state 에서 stuck 되는 경우가 많다. 

NAME         CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS        CLAIM               STORAGECLASS            REASON   AGE
jenkins-pv   5Gi        RWO            Retain           Terminating   default/redis-pvc   default-local-storage            25m

 

이럴 경우 아래와 같으 명령어로 쉽게 해결이 가능하다.

$ kubectl patch pv jenkins-pv -p '{"metadata":{"finalizers":null}}'

 

728x90
반응형

'Infrastructure > kubernetes' 카테고리의 다른 글

kubernetes 설치하기 with Docker desktop  (0) 2022.01.12

+ Recent posts