728x90
반응형

오늘은 Spring MVC(Sevlet)과 Webflux의 차이에 대해서 간략하게 정리해 보고자 한다. 

최근 API Gateway를 만들기 위해 Spring Cloud Gateway를 사용하였는데, 이 환경이 Webflux로 작성되어 있어서 기존에 내가 사용한 Spring MVC의 Servelt과 어떠한 차이가 있는지 궁금해서 이런저런 포스트를 찾아보았고, 이해한 내용을 정리해두고자 한다. 

 

우리의 서비스가 Concurrent하게 동작하기 위헤서는 서버 내부에서 동시성을 보장하는 기술이 필요하다. 간단하게 우리는 Thread라고 하는 녀석을 여러 개 만들어서 이것을 사용하여 여러 작업을 동시에 처리하게 할 수 있다. 이것은 Spring MVC에서 채택하고 있는 방법이고 이를 위해서 Thread pool를 만들어서 이 많은 Thread를 관리한다. Spring boot web에서 Request가 들어오면 해당 Request를 처리하기 위해서 Thread하나가 사용된다고 보면 될 것 같다. 

 

그에 반해, 적은 Thread 갯수로 이를 수행할 수 있는 방법이 존재한다. Thread 갯수보다 더 많은 작업이 있더라도 각 작업은 굉장히 적은 시간으로 쪼개어 하나씩 수행하도록 하는 방법이다. 이것을 Event-loop 방식이라하고, 이 방법을 채택해서 유명해 진 것이 우리가 아는 Node.js 이다. JavaScript의 거의 대부분의 수행은 메인 스레드 하나에서 실행된다. 그래서 Node.js는 싱글스레드 기반이라고 얘기하곤 한다. Spring Webflux는 이러한 컨셉을 가져와서 만들 것이라고 보면 된다. 

Webflux는 위에서 보는 것과 같이 기존 Servelt과 동일한 라이브리러리 이외에 Non-servlet으로 구현이 가능한 Netty와 Undertow 라이브러리를 가지고 있다. 이것으로 Non-blocking한 동작을 가능하게 한다. 둘다 동일하게 @Controller annotation을 사용하지만 내부적으로 수행되는 것은 다르다.

 

그럼 언제 Servlet을 사용하고 언제 Reactive를 사용해야 할까?

Spring boot의 Rossen Stoyanchev는 현재의 서비스가 Servlet으로 잘 구동되고 있다면 굳이 바꿀 필요가 없다고 말한다. 

그리고 Blocking Dependency를 사용하는 경우에는 Servlet이 더 어울린다. 그리고 Servlet API를 사용하게 된다면 Servlet이 더 어울린다. 왜나하면 Servlet API를 사용하는 순간 Blocking으로 바뀌게 되기 때문이다. 아래 그림을 보면 이해가 될 것이다. 

Servlet에서 Reactive를 사용하는 방법도 있다. 예를 들어 Microservice 환경에서 Restful API 요청이 들어왔을 때 다른 Service로 요청을 보내거나 데이터를 받아올 때 사용하면 좋을 것이다. 

 

Reactive가 반드시 Servlet 보다 빠른 것은 아니지만 Scalibility를 중시허가나 Resource를 효율적으로 사용하고 싶은 경우에는 고려하면 좋은 것임에는 틀림없다. 

 

미래의 나를 위해...

728x90
반응형
728x90
반응형

2022.02.04 - [Java/Spring boot] - [Spring boot] Kafka 설정하기

 

[Spring boot] Kafka 설정하기

나의 프로젝트에서 가장 복잡한 부분을 차지하게 될 메세지 브로커의 설정에 대해서 알아보도록 하겠다. kafka가 어떤 것인지는 아래 포스트에서 간략하게 확인할 수 있다. 2022.01.09 - [Infrastructure/

corono.tistory.com

 

Kafka 의 설정을 마무리하고 나면 이제 Kafka로 Message를 보낼 수 있게 된다. 이 포스트에서는 Kafka Topic으로 Message를 Publish 하는 예제를 공유하고자 한다. 

 

Kafka에서 메세지를 생산하고 publish 하는 주체를 Producer라고 한다. 나의 Project에서 auth service는 JwtToken을 만들고 이 값을 redis에 저장하고 token을 확인할 때마다 redis에서 값을 가져와서 비교하고자 한다.

 

Kafka 설정시에 2가지의 시나리오가 있어서 KafkaTemplate과 ReplyingKafkaTemplate을 설정하였다. 이 2가지에 대한 Message Publish를 알아보도록 하자. 

 

KafkaTempate & ReplyingKafkaTemplate

KafkaTemplate 을 이용해서 메세지를 보낼 때 primitive한 type으로 보낼 수 있지만, 각 서비스에서 데이터를 관리하기 위해서는 보통 Obejct로 많이 보내게 된다. 특정 Object 형태로 KafkaTemplate을 설정해서 사용할 수 있지만 메세지의 종류가 많아질 경우 이 설정 또한 방대해져서 관리가 힘들 수도 있다. 

그래서 난 범용 Object type을 Json을 Serialilze를 하여 Consumer 측에서 해당 Message로 Object Mapping을 하여 사용할 수 있도록 하였다. 이렇게 할 경우 공용 설정을 통해서 설정 부분의 코드를 줄일 수 있다. 하지만, Cosumer쪽에서 Message에 따라 Casting을 해줘야 하는 불편함은 존재한다. 선택의 문제니 필요에 따라 사용하면 될 것 같다. 

 

예제를 위해서는 몇가지 파일이 필요하다. 

1. Topic : 사용하는 Topic 값을 정의

    (이 Topic의 경우 사용하는 Producer와 Consumer가 함께 가지고 있어야 하는데 어떻게 하면 잘 관리할 수 있을지 고민중이다. )

public class JwtTokenTopic {
    public static final String TOPIC_CREATE_JWT_TOKEN = "topic.create.jwt.token";
    public static final String TOPIC_REQUEST_JWT_TOKEN = "topic.request.jwt.token";
    public static final String TOPIC_REPLY_JWT_TOKEN = "topic.reply.jwt.token";
}

2. Message Object : 메세지로 사용되는 Object들이다, Publish에 사용되는 2가지 간단한 메세지와 응답으로 받는 값을 Mapping 해줄 Object를 정의한다.

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Creation Message")
public static class CreateMessage {
    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "10L")
    private Long expiration = -1L;

    public static CreateMessage from(Account account, Date expireDate) {
        Long expiration = -1L;

        if (expireDate != null) {
            expiration = DateTimeUtils.getSecondsBetweenDates(new Date(), expireDate);
        }

        return new CreateMessage()
                .setAccountId(account.getId())
                .setJwtToken(account.getJwtToken())
                .setExpiration(expiration);
    }
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Schema(description = "JwtToken Cache Request Message")
public static class RequestMessage {
    @Schema(example = "1L")
    private Long accountId;
}

@Getter
@Setter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
@JsonInclude(NON_NULL)
@Schema(description = "JwtToken Cache Info")
public static class JwtTokenInfo {
    @Schema(example = "ff6681f0-50f8-4110-bf96-ef6cec45780e")
    private String id;

    @Schema(example = "1L")
    private Long accountId;

    @Schema(example = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.....")
    private String jwtToken;

    @Schema(example = "2021-01-01T00:00:00")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime updatedAt;
}

3. Callback 함수 파일 : 이 파일은 Service layer에서 Message을 보내고 난 후 ack를 확인하기 위한 함수를 정의해 두었다.

@Slf4j
public class JwtTokenFutureCallback implements ListenableFutureCallback<SendResult<String, Object>> {
    private final Object message;

    public JwtTokenFutureCallback(Object message) {
        this.message = message;
    }

    @Override
    public void onFailure(Throwable ex) {
        LOGGER.debug("Unable to send message=[{}] due to : {}", message, ex.getMessage());
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        LOGGER.debug("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
    }
}

 

이제 중요한 Service layer의 코드이다. 

@Slf4j
@RequiredArgsConstructor
@Service
public class JwtTokenMessageServiceImpl implements JwtTokenMessageService {
    private final KafkaTemplate<String, Object> kafkaJwtTokenTemplate;
    private final ReplyingKafkaTemplate<String, Object, String> jwtTokenReplyingKafkaTemplate;
    private final ObjectMapper objectMapper;
    private final JwtTokenProvider jwtTokenProvider;

    @Override
    public void creteJwtTokenCache(Account account) {
        Date expireDate = jwtTokenProvider.getExpiredDate(account.getJwtToken());

        JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, expireDate);

        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

        listenableFuture.addCallback(new JwtTokenFutureCallback(createMessage));
    }

    @Override
    public JwtTokenMessage.JwtTokenInfo getJwtTokenCache(JwtTokenMessage.RequestMessage message) throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
        LOGGER.debug("topic = {}, payload = {}", JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

        ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);
        RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

        JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

        LOGGER.debug("JwtTokenInfo : {}", jwtTokenInfo);
        return jwtTokenInfo;
    }
}

 

우선 KafkaTemplate의 경우 Dependency Injection을 통해 Configuration에서 등록된 Bean이 매핑되고, 이 template을 통해 

Send() 함수를 호출해서 메세지를 보내준다. 필요한 부분은 Topic과 메세지를 넣는 부분이다. 메세지는 각 메세지의 Object이고, Object를 넣어서 보내면 Configuration에서 설정해 둔 Serializer가 Json String 형태로 변경하여 보내게 된다. 

ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaJwtTokenTemplate.send(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);

 

그리고 ReplayingKafkaTemplate으로 메세지를 보낼 경우에는 추가적인 작업이 필요하다. 

1. ProducerRecord를 생성하여 Topic과 Message 정보를 추가한다. 

ProducerRecord<String, Object> record = new ProducerRecord<>(JwtTokenTopic.TOPIC_REQUEST_JWT_TOKEN, message);

2. ReplyingKafkaTemplate의 sendAndReceive() 함수를 통해서 생성한 ProducerRecord를 보낸다. 

RequestReplyFuture<String, Object, String> replyFuture = jwtTokenReplyingKafkaTemplate.sendAndReceive(record);

3. 보낸 후에는 Async하게 응답을 기다리도록 ReplyFuture 를 리턴받아서 응답을 확인한다. 

SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

4. 응답으로 받는 데이터는 보내는 형태와 마찬가지로 Object를 Json으로 변경한 값이므로 약속된 형태로 변경하여 사용한다. 

JwtTokenMessage.JwtTokenInfo jwtTokenInfo = objectMapper.readValue(consumerRecord.value(), JwtTokenMessage.JwtTokenInfo.class);

 

처음에 이 부분을 이해하고 코드를 추가하고 테스트를 하는데 어려움을 겪었다. 양쪽 서비스에 구현을 한 후 테스트를 하면서 문제 부분을 확인 후에 수정을 하다보니 시간이 많이 걸렸다. 처음부터 Embedded kafka를 설정해서 Unit test 만들면서 했으면 더 쉬웠을 것 같다. 하지만, Embedded kafka를 적용한 것도 시간이 걸렸으니 쌤쌤인가? ㅋㅋ 

 

다음 포스트에서는 이 메세지들을 받아서 처리하는 부분을 공유해보고자 한다. 

 

728x90
반응형

+ Recent posts