728x90
반응형

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다.

kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 

 

2022.01.09 - [Infrastructure/Kafka] - kafka란 무엇인가?

 

kafka란 무엇인가?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical ap..

corono.tistory.com

 

현재 개발된 서비스간의 메세지를 주고 받을 수 있도록 설정할 예정이다. 

여기 포스트에서는 간략한 시나리오를 바탕으로 필요한 설정들을 소개할 예정이다. 

 

<시나리오 1>

spring-micro-auth project에서 Jwt token을 redis에 저장하기 위한 message 를 생성하고 JwtToken Cache Creation Topic으로 send (publish)를 하면 spring-micro-session project에서 해당 Topic을 Subscription 하여 처리한다.

 

<시나리오 2>

spring-micro-auth project에서 JwtToken을 확인하기 위하여 redis에 저장된 정보를 요청한다. JwtToken Cache Request Topic으로 messge를 send하면 spring-micro-session project에서 해당 Topic을 Subscription 하고 있다가 JwtToken 정보를 응답으로 전달한다. 

 

시나리오 1과 2의 다른 점은 1의 경우 메세지를 전송하고 Session에서 처리하도록 둔다. 즉 결과를 확인하지 않는다. (asynchronous)

시나리오 2의 경우 Session의 응답을 기다린다. (synchronous)

 

Producer Configuration

Producer를 설정하기 위해서는 Producer의 Property들을 설정하고 그 Property로 ProducerFacotry를 만든 후 KafkaTemplate를 최종적으로 생성하면 된다. KafkaTemplate을 만들 때에는 주고 받는 메세지의 Topic과 Data의 자료형을 기준으로 생성한다. 

 

@Configuration
public class KafkaConfig {
	private final String bootStrapAddresses;
    private final String groupId;

    public KafkaJwtTokenConfig(
            @Value("${kafka.bootstrap.addresses}")
                    String bootStrapAddresses,
            @Value("${kafka.consumer.groupId}")
                    String groupId) {
        this.bootStrapAddresses = bootStrapAddresses;
        this.groupId = groupId;
    }
    
    private Map<String, Object> produceProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
    
    @Bean
    public ProducerFactory<String, Object> jwtTokenProducerFactory() {
        return new DefaultKafkaProducerFactory<>(produceProperties());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaJwtTokenTemplate() {
        return new KafkaTemplate<>(jwtTokenProducerFactory());
    }
}

나의 프로젝트에서는 String type의 Topic과 Object Type의 data를 사용하였다. 

 

Consumer Configuration (with ReplyingKafkaTempate 설정)

위의 시나리오 2에서 Message를 보낸 후 응답을 기다리는 경우에는 다른 KafkaTemplate을 사용하여야 하는데 그 것이 ReplyingKafkaTemplate이다. 이 Template의 경우 ConcurrentMessageListenerContainer를 사용하여 선언해준다.

(보통의 경우에는 아래의 jwtTokenConsumerFactory() 만 설정하면 Consumer로 등록 후 Message 처리가 가능하다. )

@Configuration
public class KafkaConfig {
...
	
    private Map<String, Object> consumeProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddresses);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> jwtTokenConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumeProperties());
    }
    
	@Bean
    public ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate(
            @Qualifier("jwtTokenProducerFactory") ProducerFactory<String, Object> pf,
            @Qualifier("jwtTokenReplyContainer") KafkaMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(pf, container);
    }
    
    @Bean
    public KafkaMessageListenerContainer<String, String> jwtTokenReplyContainer(
            @Qualifier("jwtTokenConsumerFactory") ConsumerFactory<String, String> cf
    ) {
        ContainerProperties containerProperties = new ContainerProperties(JwtTokenTopic.TOPIC_REPLY_JWT_TOKEN);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());
        return factory;
    }
}

ReplyingKafkaTemplate 사용을 위해서는 응답으로 대한 Consumer config가 추가되어야 한다. 

jwtTokenReplayingKafkaTemplate() 함수를 보면 Producer와 Comsumer를 Linsten하고 있는 KafkaMessageListenrContainer를 함께 인자로 받아서 ReplyingkafkaTemplate을 생성한다. 

 

이렇게 생성된 Template으로 메세지를 주고 받을 경우에는 내부적으로 Correlation ID를 생성하여 요청한 Publisher에게 응답이 잘 전달될 수 있도록 한다. 

 

그리고 응답을 받기위해 중요한 설정 중에 하나는 jwtTokenkafkaListenerContainerFactory() 함수에 있는 setReplayTemplate()이다.

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> jwtTokenKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jwtTokenConsumerFactory());
        factory.setReplyTemplate(kafkaJwtTokenTemplate());			// 이 셋팅이 있어야 응답으로 오는 ConsumerRecord를 처리할 수 있음
        return factory;
    }

 

오늘은 kafka configuration에 대해서 간략하게 적어

728x90
반응형
728x90
반응형

Kafka를 이용하는 Spring boot project에서 unit test를 수행할 때에 여러가지 어려움을 직면하게 된다. Kafka message send and receive 관련 Unit test를 추가하기 위해서는 Embedded kafka를 추가한 후 메세지가 제대로 전달되는지를 확인한다. 

 

이때, Kafka bootstrap server address를 아래와 같이 Embedded Kafka 의 Broker 주소 값으로 수행한다.

${spring.embedded.kafka.brokers}

하지만, 이 설정은 아래의 Annotation이 설정된 Spring boot test에서만 사용이 가능하다.

@EmbeddedKafka

그렇지 않은 Unit test에서는 아래와 같은 Exception이 발생하게 된다. Embedded Kafka의 AutoConfiguration이 적용되지 않아서 broker address를 찾을 수 없기 때문이다. 

 

이러한 문제가 발생할 경우 application.yaml에 추가되어 있는 Embedded Kafka 의 broker 주소에 Default 값을 넣어주면 문제가 해결된다. 

 

즉, @EmbeddedKafka가 없는 Spring boot test의 경우 Default 주소로 동작을 하게 되고, 이 때 해당 주소에 실제 Kafka가 없더라도 Integration Test를 수행하는데는 문제가 발생하지 않는다. 

 

나의 경우 Kafka broker의 기본 Port를 사용하여 Default 값을 추가해 주었다. 

kafka:
  bootstrap:
    addresses: ${spring.embedded.kafka.brokers:localhost:9092}

(위 설정은 Config 파일을 통해서 직접 Bean을 등록하여 Spring에서 기본으로 사용하는 Configuration과는 차이가 있다. Default 값을 넣은 부분만 참고하다록 하자.)

728x90
반응형
728x90
반응형

k8s persistent volume을 셋팅하다 보면 Terminating state 에서 stuck 되는 경우가 많다. 

NAME         CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS        CLAIM               STORAGECLASS            REASON   AGE
jenkins-pv   5Gi        RWO            Retain           Terminating   default/redis-pvc   default-local-storage            25m

 

이럴 경우 아래와 같으 명령어로 쉽게 해결이 가능하다.

$ kubectl patch pv jenkins-pv -p '{"metadata":{"finalizers":null}}'

 

728x90
반응형

'Infrastructure > kubernetes' 카테고리의 다른 글

kubernetes 설치하기 with Docker desktop  (0) 2022.01.12
728x90
반응형

https://docs.jboss.org/hibernate/stable/validator/reference/en-US/html_single/#validator-customconstraints

 

Hibernate Validator 7.0.2.Final - Jakarta Bean Validation Reference Implementation: Reference Guide

Validating data is a common task that occurs throughout all application layers, from the presentation to the persistence layer. Often the same validation logic is implemented in each layer which is time consuming and error-prone. To avoid duplication of th

docs.jboss.org

Spring Project에서 Validation 설정 후에 사용을 하다보면 내가 원하는 형태의 Validation이 필요할 때가 있다. 기본적으로 제공하는 것으로 부족할 때가 반드시 발생하게 된다. 그때에는 Custom Annotation을 만들고, Validator를 연결하여 Contraint Annotation을 만들어서 사용하면 된다. 

 

나의 경우에 가장 먼저 필요한 Custom Validation Contraint는 Request Body에 전달되는 Session Values가 Json String format인지 확인이 필요하였다. (일반적으로 Session data는 Client가 입력하고 싶은 데이터를 입력하여야 하기 때문에 그 값을 Json 형태로 저장해 두면 활용성이 올라갈 것 같아 이렇게 만들었다.)

 

하지만, 기본으로 제공하는 Validation에서는 없는 기능이라 Custom Contraint를 작성하게 되었다. 

 

Annotation Interface

우선 Custom Annotation을 만들기 위해서는 Annotation Interface가 필요하다. 

@Documented			// Javadoc 문서에 Annotation이 포함된다.
@Constraint(validatedBy = JsonStringValidator.class)	// Contraint를 수행하는 Validator class
@Target({ElementType.FIELD})			// Target은 Field에서만 가능하다. (DTO field)
@Retention(RetentionPolicy.RUNTIME)		// 이 Annotation이 동작되는 범위 - Source(컴파일 이후 없어짐), Class(클래스 참조시까지), Runtime(컴파일 이후에도 가능)
public @interface JsonStringConstraint {
    String message() default "Invalid Json String type";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}

위와 같이 JsonStringConstraint Annotation Interface를 작성하였다. 엄밀히 얘기하면 Custom Contraint용 Annotation이다. 

Contraint를 작성하기 위해서는 3가지 요소가 반드시 필요한데, message(), groups(), payload()가 그것이다. 

 

  • message - Validation이 실패하였을 경우 표시하는 메세지이다. 이것은 Message Source Accessor과 함께 사용할 수 있다. 
  • groups - Contraint를 Groupping 하는 기능이다. 그룹별로 Message Source를 다르게 사용할 때 등에 사용된다. 
  • payload - 이 값은 Validator에 전달하고 싶은 값을 넣는 곳이다. 예를 들면, Contraint 의 심각도 등을 보내어 심각도에 따라 다른 행위를 하도록 할 수 있다. 

 

JsonStringValidator class

@Slf4j
public class JsonStringValidator implements ConstraintValidator<JsonStringConstraint, String> {

    @Override
    public void initialize(JsonStringConstraint constraintAnnotation) {
        ConstraintValidator.super.initialize(constraintAnnotation);
    }

    @Override
    public boolean isValid(String s, ConstraintValidatorContext constraintValidatorContext) {
        try {
            if (s != null) {
                final ObjectMapper mapper = new ObjectMapper();
                mapper.readTree(s);
            }
            return true;
        } catch (IOException e) {
            LOGGER.debug("String is not json format. {}", s);
            return false;
        }
    }
}

Validator class 는 ContraintValidator Interface를 구현한다. Override되는 메서드는 initailize() 와 isValid() 가 있다. 

여기에서 중요한 것은 isValid() 이다. Contraint가 값을 확인 후에 Validation 결과를 Return 해주어야 한다. 

 

Json String이 유요한지 여부는 Jackson 에 있는 ObjectMapper를 이용하여 확인하였다. 

이 Contraint는 String Field에서만 사용이 가능하다. 

 

사용방법

사용방법은 다른 Validation annotation과 동일하게 사용가능하다. 

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "Session Creation Request")
public static class SessionCreateRequest {
    @Schema(example = "1L")
    @NotNull(message = "{account.id.empty}")
    private Long accountId;
    @Schema(example = "{\"orderCount\":1}")
    @NotBlank(message = "{session.value.empty}")
    @JsonStringConstraint(message = "{session.value.not.json}")
    private String values;
}

Validation messsage는 Message Source 기능을 사용하여 가져오도록 하였다. 그리고, 다른 Validation Annotation과 함께 사용할 수 있는 것을 볼 수 있다. 

 

아래와 같이 Validation Fail 일 경우 Error response를 받을 수 있다. 

...
"errors": [
        {
            "codes": [
                "JsonStringConstraint.sessionCreateRequest.values",
                "JsonStringConstraint.values",
                "JsonStringConstraint.java.lang.String",
                "JsonStringConstraint"
            ],
            "arguments": [
                {
                    "codes": [
                        "sessionCreateRequest.values",
                        "values"
                    ],
                    "arguments": null,
                    "defaultMessage": "values",
                    "code": "values"
                }
            ],
            "defaultMessage": "String is not JSON format.",
            "objectName": "sessionCreateRequest",
            "field": "values",
            "rejectedValue": "test",
            "bindingFailure": false,
            "code": "JsonStringConstraint"
        }
    ],
    "path": "/api/v1/session"
...

 

728x90
반응형

+ Recent posts