Kafka를 이용하는 Spring boot project에서 unit test를 수행할 때에 여러가지 어려움을 직면하게 된다. Kafka message send and receive 관련 Unit test를 추가하기 위해서는 Embedded kafka를 추가한 후 메세지가 제대로 전달되는지를 확인한다.
이때, Kafka bootstrap server address를 아래와 같이 Embedded Kafka 의 Broker 주소 값으로 수행한다.
${spring.embedded.kafka.brokers}
하지만, 이 설정은 아래의 Annotation이 설정된 Spring boot test에서만 사용이 가능하다.
@EmbeddedKafka
그렇지 않은 Unit test에서는 아래와 같은 Exception이 발생하게 된다. Embedded Kafka의 AutoConfiguration이 적용되지 않아서 broker address를 찾을 수 없기 때문이다.
이러한 문제가 발생할 경우 application.yaml에 추가되어 있는 Embedded Kafka 의 broker 주소에 Default 값을 넣어주면 문제가 해결된다.
즉, @EmbeddedKafka가 없는 Spring boot test의 경우 Default 주소로 동작을 하게 되고, 이 때 해당 주소에 실제 Kafka가 없더라도 Integration Test를 수행하는데는 문제가 발생하지 않는다.
나의 경우 Kafka broker의 기본 Port를 사용하여 Default 값을 추가해 주었다.
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
kafka는 위에 보는 것과 같이 분산 이벤트 스트리밍 플랫폼이다. 이것이 무엇인고 하니, 보통 microservice architecture에서 각 service가 독립된 처리를 하게 된다. 즉, 클라이언트의 요청이 들어오면 이 요청을 처리하기 위해서 여러 service가 특정 행위를 처리해야 한다. 하지만 이러한 동작이 동기화되어 있다면 모든 서비스가 처리될 때 동안 다른 처리를 하지 못하는 경우가 발생되게 되는데 이러한 처리를 비동기로 할 수 있게 도와주는 플랫폼이라고 보면 된다. 요즘은 메세지 브로커, 메세지 큐 등 다양하게 불려지고 있다.
kafka는 Publish - Subscript 를 기반으로 되어 있다. Publisher를 메세지를 생산하고, Subscriper 는 그 메세지를 소비한다.
예를 들어, auth service가 session service에 특정 정보를 저장하고 싶을 때, Publisher : auth - Subscriper : session이 되는 특정 Topic을 생성하고 이 Topic으로 메세지를 보내고, session이 메세지가 가져가서 처리하게 된다. 아주 간단하게 설명한 것이다.
kafka에서는 여러가지 용어가 있다.
Producer : those client applications that publish (write) events to Kafka (이벤트, 즉, 메세지를 생산하는 자)
Consumer : those that subscribe to (read and process) these events. (이벤트를 소비하는 자)
Topic : place which everts are organized and durably stored (이벤트가 관리되고 저장되는 장소, 큐)
Partition : Topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. (Topic은 여러개의 Partition으로 나뉘어져 있고, 이벤트 키에 따라서 저장되는 파티션이 결정된다.)
Replication : To make your data fault-tolerant and highly-available, every topic can be replicated, even across geo-regions or datacenters, so that there are always multiple brokers that have a copy of the data just in case things go wrong, you want to do maintenance on the brokers, and so on. (이벤트는 Replication을 가질 수 있다. 다른 지역에 있는 개발환경 또는 Data center등 자연재해로 인해 손실될 수 있는 것을 방지하기 위함이다. - 뭔가 거대하다...)
Kafka는 여러가지 API를 가지게 되는데, Admin API, Producer API, Consumer API, and Kafka Streams API 등이 있다.
이것은 역할에 맞는 API 들이 있는 것이기 때문에 당연해 보인다. (아래는 참고용이다.)
The Admin API to manage and inspect topics, brokers, and other Kafka objects.
The Producer API to publish (write) a stream of events to one or more Kafka topics.
The Consumer API to subscribe to (read) one or more topics and to process the stream of events produced to them.
The Kafka Streams API to implement stream processing applications and microservices. It provides higher-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event-time, and more. Input is read from one or more topics in order to generate output to one or more topics, effectively transforming the input streams to output streams. The Kafka Connect API to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications so they can integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables. However, in practice, you typically don't need to implement your own connectors because the Kafka community already provides hundreds of ready-to-use connectors.
앞으로 kafka를 공부해 가면서 나의 서버 개발에 사용을 해보고자 한다. 이론적인 부분이 추가될 수도 있고 실용적인 코드 위주로만 추가될 수도 있을 것 같다. 아마 내가 기억해야하는 부분이 주로 추가되지 않을까 싶다.