728x90
반응형

Kafka에 대한 설정 및 Producer와 Consumer의 예제에 대해서 알아보았다. 그럼 이 코드들이 잘 동작하는지는 어떻게 확인해야할까?

실제 서비스들 간의 메세지 전송 및 결과를 확인하기 위해서는 테스트 환경을 구축하여 서비스를 띄운 후에 테스트 해보는 것이 가장 정확하다. Docker compose로 Kafka broker와 Zookeeper를 띄우고 Bootstarp address에 Kafka broker 주소를 넣어준 뒤 서비스를 띄워서 테스트를 하면된다. 

 

하지만, 이러한 환경은 귀찮은 면이 있다. 항상 테스트를 위해서 서비스를 띄워두는 것은 리소스 낭비이다. 그래서 Kafka의 메세지 전송을 테스트 할 수 있는 환경이 필요하다. Spring boot 환경에서는 이를 테스트 할 수 있는 라이브러리를 제공하고 있다. 

아래의 라이브러리가 그것이며, 이름은 Spring kafka test library 이다. 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

 

위의 Configuration을 추가하면 Embedded kafka 를 사용할 수 있게 되고, 이 환경을 통해서 Message 전송을 임의로 만들 수 있게 된다. 

단, Container 환경에서 Embedded kafka로 테스트를 진행하면 정상적으로 진행되지 않다. 
현재, 나의 환경이 Jenkins를 Kubernetes 내부에 띄우고 Pod agent로 Docker Image build 및 Release Job을 수행하고 있고, 이때 Unit test를 수행하여 코드의 안정성을 확인하고 있다. 하지만, Embedded kafka관련 Test는 수행하고 못하고 있다. 

 

Test 예제

application-test.yaml

Embedeed kafka를 사용하기 위해서는 Embedded kafka에서 띄우는 서버를 사용하여야 한다. 

# kafka
kafka:
  bootstrap:
    addresses: ${spring.embedded.kafka.brokers:localhost:9092}

 

Essential Configuration (JUnit5)

Embedded kafka를 사용하기 위해서는 필수적으로 넣어주어야 하는 Configuration 이 있다. 

우선 @EmbeddedKafka annotation으로 Embedded Kafka를 추가해 준다. 그리고 이 Embedded kafka가 동작하기 위해서는 Integration Test 환경으로 수행되어야 한다. 그 말은 Spring 의 Dependency가 사용되는 환경에서 수행되어야 하기 때문에 @SpringbootTest annotation이 추가되어야 하고, 그와 더불어 @ExtendWith(SpringExtension.class)가 추가되어야 정상적인 동작이 된다. 이러한 셋팅은 JUnit5 기반으로 테스트 될때이며 JUnit4는 그 방법이 다르다. 

@Slf4j
@Tag("embedded-kafka-test")
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
...
}

 

Kafka Broker Configuration

Embedded Kafka의 Broker를 셋팅해주는 부분이다. @Autowired annotation으로 Dependency Injection을 해주면 된다. 

Broker count는 2개, Partition은 2개로 설정해 주었다. 마지막은 설정과 동시에 생성할 Topic인데, Topic이 여러 개 일 때에는 뒤쪽으로 ,(콤마)로 구분하여 계속 넣어주면 된다. 

Intellij를 사용한다면 이 부분에서 @Autowired 로 Dependency Injection시 Dependecy를 찾을 수 없다는 Inspection Message를 받을 수 있다. 이 부부은 Intellij의 문제로 난 이 Inspection의 level을 warning으로 바꾸었다. 

2022.02.05 - [Tool/IntelliJ] - [Intellij] @Autowired inspection 오류 무시하는 방법

...
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
        2,
        true,
        2,
        JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
);
...

 

Producer configuration

Producer configuration은 간단하다. Produecer Property는 Default 셋팅으로 설정하였다. 그리고 Key Serializer는 String, Value Serialilzer는 Json으로 설정하였다. 이 부분의 서비스에서 사용하는 설정과 동일하게 하면 된다. 

private Producer<String, Object> configEmbeddedKafkaProducer() {
    Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
}

 

Consumer Configuration

Consumer의 설정은 복잡하다. 왜냐하면 Kafka의 Consumer의 설정에서 보았듯이 KafkaLinstener의 설정이 추가되어야 하기 때문이다. Embedded Kafka도 마찬가지로 Produer가 보내는 메세지가 Topic에 들어오면 Listener가 해당 메세지를 처리하도록 해야하기 때문이다. 

 

1) 우선 Consumer Property 셋팅을 한다. 기본 셋팅을 가지고 가되, Auto commit만 false로 셋팅하였다. 나의 코드에서 Commit을 수동을 하고 있다. 

2) ConsumerFactory를 만들고, Container Property를 topic과 함께 생성하고, KafkaMessageListenerContainer를 만든다. 

3) CusumerRecord를 순차적으로 저장시키고, 순차적으로 처리하기 위해, BlockingQueue로 선언한다. 

4) 이후, 생성된 KafkaMessageListenerContainer에 Listener 함수를 추가한다. 이 함수에서는 Log로 Message를 출력하고, BlockQueue에 record를 추가한다. 이 record를 필요한 시점에 뽑아서 확인한다.

private KafkaMessageListenerContainer<String, String> container;

private BlockingQueue<ConsumerRecord<String, String>> records;

private void configEmbeddedKafkaConsumer(String topic) {
    Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));

    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    ContainerProperties containerProperties = new ContainerProperties(topic);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    records = new LinkedBlockingDeque<>();

    container.setupMessageListener((MessageListener<String, String>) record -> {
                LOGGER.debug("test-listener received message='{}'",
                        record.value());
                records.add(record);
    });
    container.start();

    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}

 

전체 Configuration 부분 코드

@Slf4j
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
    @Autowired
    private JwtTokenMessageService jwtTokenMessageService;

    @MockBean
    private JwtTokenProvider jwtTokenProvider;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
            2,
            true,
            2,
            JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
    );

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @BeforeEach
    void setUp() {}

    private void configEmbeddedKafkaConsumer(String topic) {
        Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

        ContainerProperties containerProperties = new ContainerProperties(topic);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        records = new LinkedBlockingDeque<>();

        container.setupMessageListener((MessageListener<String, String>) record -> {
                    LOGGER.debug("test-listener received message='{}'",
                            record.value());
                    records.add(record);
        });
        container.start();

        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    private Producer<String, Object> configEmbeddedKafkaProducer() {
        Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }
}

 

Producer Test Code예제

void testCreateMessageForJwtTokenCache() throws InterruptedException, JsonProcessingException {
	// Consumer 생성
    configEmbeddedKafkaConsumer(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN);
	
    Account account = TestAccountBuilder.accountWithToken();
	
    // 전송할 테스트 메세지 생성
    JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, null);
    String expectedMessage = objectMapper.writeValueAsString(createMessage);

	// 확인하지 않을 함수에 대한 Mocking
    when(jwtTokenProvider.getExpiredDate(any())).thenReturn(null);

	// Embedded kafka로 Message 전송
    jwtTokenMessageService.creteJwtTokenCache(account);

	// BlockingQueue에 메세지를 가져옴. 10초 동안 메세지가 들어오지 않으면 Error 발생
    ConsumerRecord<String, String> record = records.poll(10, TimeUnit.SECONDS);
    
    
	// 수신한 메세지 확인
    assertThat(record).isNotNull();
    assertThat(record.value()).isEqualTo(expectedMessage);
    assertThat(record).has(key(null));
}

 

Consumer Test Code 예제

void testCreateMessageForJwtTokenCache() {
	// 전송할 메세지 생성
    JwtTokenMessage.CreateMessage createMessage = new JwtTokenMessage.CreateMessage();
    createMessage.setAccountId(1L);
    createMessage.setJwtToken("test-jwtToken");
    createMessage.setExpiration(120L);

	// 메세지를 사용하여 ProducerRecord를 생성한다.
    ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
    // 메세지 전송
    producer.send(producerRecord);
    producer.flush();

	// Mocking
    when(jwtTokenService.create(any())).thenReturn(TestJwtTokenBuilder.defaultJwtToken());

	// Consumer가 메세지를 잘 받아서 Listener가 수행되는지를 확인 (Listener의 특정 Function이 수행되는지를 확인)
    verify(jwtTokenService, timeout(10000L).times(1)).create(any());
}

 

이 정도면 간단한 메세지 전송 여부는 확인이 가능하다. 

728x90
반응형
728x90
반응형

https://docs.jboss.org/hibernate/stable/validator/reference/en-US/html_single/#validator-customconstraints

 

Hibernate Validator 7.0.2.Final - Jakarta Bean Validation Reference Implementation: Reference Guide

Validating data is a common task that occurs throughout all application layers, from the presentation to the persistence layer. Often the same validation logic is implemented in each layer which is time consuming and error-prone. To avoid duplication of th

docs.jboss.org

Spring Project에서 Validation 설정 후에 사용을 하다보면 내가 원하는 형태의 Validation이 필요할 때가 있다. 기본적으로 제공하는 것으로 부족할 때가 반드시 발생하게 된다. 그때에는 Custom Annotation을 만들고, Validator를 연결하여 Contraint Annotation을 만들어서 사용하면 된다. 

 

나의 경우에 가장 먼저 필요한 Custom Validation Contraint는 Request Body에 전달되는 Session Values가 Json String format인지 확인이 필요하였다. (일반적으로 Session data는 Client가 입력하고 싶은 데이터를 입력하여야 하기 때문에 그 값을 Json 형태로 저장해 두면 활용성이 올라갈 것 같아 이렇게 만들었다.)

 

하지만, 기본으로 제공하는 Validation에서는 없는 기능이라 Custom Contraint를 작성하게 되었다. 

 

Annotation Interface

우선 Custom Annotation을 만들기 위해서는 Annotation Interface가 필요하다. 

@Documented			// Javadoc 문서에 Annotation이 포함된다.
@Constraint(validatedBy = JsonStringValidator.class)	// Contraint를 수행하는 Validator class
@Target({ElementType.FIELD})			// Target은 Field에서만 가능하다. (DTO field)
@Retention(RetentionPolicy.RUNTIME)		// 이 Annotation이 동작되는 범위 - Source(컴파일 이후 없어짐), Class(클래스 참조시까지), Runtime(컴파일 이후에도 가능)
public @interface JsonStringConstraint {
    String message() default "Invalid Json String type";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}

위와 같이 JsonStringConstraint Annotation Interface를 작성하였다. 엄밀히 얘기하면 Custom Contraint용 Annotation이다. 

Contraint를 작성하기 위해서는 3가지 요소가 반드시 필요한데, message(), groups(), payload()가 그것이다. 

 

  • message - Validation이 실패하였을 경우 표시하는 메세지이다. 이것은 Message Source Accessor과 함께 사용할 수 있다. 
  • groups - Contraint를 Groupping 하는 기능이다. 그룹별로 Message Source를 다르게 사용할 때 등에 사용된다. 
  • payload - 이 값은 Validator에 전달하고 싶은 값을 넣는 곳이다. 예를 들면, Contraint 의 심각도 등을 보내어 심각도에 따라 다른 행위를 하도록 할 수 있다. 

 

JsonStringValidator class

@Slf4j
public class JsonStringValidator implements ConstraintValidator<JsonStringConstraint, String> {

    @Override
    public void initialize(JsonStringConstraint constraintAnnotation) {
        ConstraintValidator.super.initialize(constraintAnnotation);
    }

    @Override
    public boolean isValid(String s, ConstraintValidatorContext constraintValidatorContext) {
        try {
            if (s != null) {
                final ObjectMapper mapper = new ObjectMapper();
                mapper.readTree(s);
            }
            return true;
        } catch (IOException e) {
            LOGGER.debug("String is not json format. {}", s);
            return false;
        }
    }
}

Validator class 는 ContraintValidator Interface를 구현한다. Override되는 메서드는 initailize() 와 isValid() 가 있다. 

여기에서 중요한 것은 isValid() 이다. Contraint가 값을 확인 후에 Validation 결과를 Return 해주어야 한다. 

 

Json String이 유요한지 여부는 Jackson 에 있는 ObjectMapper를 이용하여 확인하였다. 

이 Contraint는 String Field에서만 사용이 가능하다. 

 

사용방법

사용방법은 다른 Validation annotation과 동일하게 사용가능하다. 

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "Session Creation Request")
public static class SessionCreateRequest {
    @Schema(example = "1L")
    @NotNull(message = "{account.id.empty}")
    private Long accountId;
    @Schema(example = "{\"orderCount\":1}")
    @NotBlank(message = "{session.value.empty}")
    @JsonStringConstraint(message = "{session.value.not.json}")
    private String values;
}

Validation messsage는 Message Source 기능을 사용하여 가져오도록 하였다. 그리고, 다른 Validation Annotation과 함께 사용할 수 있는 것을 볼 수 있다. 

 

아래와 같이 Validation Fail 일 경우 Error response를 받을 수 있다. 

...
"errors": [
        {
            "codes": [
                "JsonStringConstraint.sessionCreateRequest.values",
                "JsonStringConstraint.values",
                "JsonStringConstraint.java.lang.String",
                "JsonStringConstraint"
            ],
            "arguments": [
                {
                    "codes": [
                        "sessionCreateRequest.values",
                        "values"
                    ],
                    "arguments": null,
                    "defaultMessage": "values",
                    "code": "values"
                }
            ],
            "defaultMessage": "String is not JSON format.",
            "objectName": "sessionCreateRequest",
            "field": "values",
            "rejectedValue": "test",
            "bindingFailure": false,
            "code": "JsonStringConstraint"
        }
    ],
    "path": "/api/v1/session"
...

 

728x90
반응형
728x90
반응형

Spring Project를 진행하다 보면 `SLF4J: Class path contains multiple SLF4J bindings.` 라는 Warning 메세지를 볼 수 있다. 

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/kimseunghwan/.m2/repository/ch/qos/logback/logback-classic/1.2.9/logback-classic-1.2.9.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/kimseunghwan/.m2/repository/org/slf4j/slf4j-simple/1.7.32/slf4j-simple-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

 

이런 Warning 메세지는 Dependecy 내부에서 같은 Dependecy지만 다른 버전을 Dependency 내부에서 참조를 하고 있으면 발생하게 된다. 이를 해결하기 위해서는 아래와 같은 방법을 사용하면 된다. 

 

Dependecy tree 확인

현재 어떤 Dependecy가 충돌되는 Dependency를 참조하고 있는지 모르기 때문에 `mvn dependecy:tree`로 현재 내가 사용하고 있는 Dependecy를 확인한다. 

$ mvn dependecy:tree

IDE에서 확인하는 방법 (Intellij)

 

나의 경우에는 아래와 같이 Spring boot 와 Embedded-redis에서 충돌이 발생하고 있다. 

[INFO] io.coolexplorer:spring-boot-session:war:0.0.1-SNAPSHOT
[INFO] +- org.springframework.boot:spring-boot-starter-web:jar:2.6.2:compile
[INFO] |  +- org.springframework.boot:spring-boot-starter:jar:2.6.2:compile
[INFO] |  |  +- org.springframework.boot:spring-boot-starter-logging:jar:2.6.2:compile
[INFO] |  |  |  +- ch.qos.logback:logback-classic:jar:1.2.9:compile
[INFO] |  |  |  |  \- ch.qos.logback:logback-core:jar:1.2.9:compile
[INFO] |  |  |  +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.17.0:compile
[INFO] |  |  |  |  \- org.apache.logging.log4j:log4j-api:jar:2.17.0:compile
[INFO] |  |  |  \- org.slf4j:jul-to-slf4j:jar:1.7.32:compile
...
[INFO] \- it.ozimov:embedded-redis:jar:0.7.3:test
[INFO]    +- com.google.guava:guava:jar:21.0:test
[INFO]    +- commons-io:commons-io:jar:2.5:test
[INFO]    +- org.slf4j:slf4j-simple:jar:1.7.32:test
[INFO]    \- commons-logging:commons-logging:jar:1.2:test

 

Dependency 제외

해결 방법은 충돌하고 있는 두 Dependency중 한 부분에서 제외를 시켜주는 것이다. 나의 경우에는 Embedded-redis에서 제외를 시켜 주었다. 

변경된 pom.xml - <exclusions> 블럭이 추가되었다. 

<dependency>
    <groupId>it.ozimov</groupId>
    <artifactId>embedded-redis</artifactId>
    <version>${embedded.redis.version}</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
        </exclusion>
    </exclusions>
</dependency>

 

간단하게 해결!

728x90
반응형
728x90
반응형

Server API의 Request 나 Response 의 String이 모두 영어로 되어 있을 경우에는 문제가 없지만, 에러메세지나 Validation 메세지 등을 한글로 보낼 경우에는 `UTP-8` Encoding 을 사용해야 한다. 그럼 언어를 Character Encoding을 설정하는 방법에 대해서 알아보자. 

 

Spring에서 Character Encoding을 설정하기 위해서는 Configuration annotation을 활용하여 Bean으로 Character Encoding Filter를 등록시키면 된다. 

 

내 개인 프로젝트에서는 아래와 같이 간단하게 AppConfig class를 만들어 두었다. 

 

 

 

728x90
반응형

+ Recent posts