티스토리 뷰

지난 시간에는 kafka를 설치하여 topic을 발행하고 producer가 메세지를 보내고 consumer가 메세지를 받는 것을 예제를 통해 알아보았다. 이번에는 kafka를 테스트를 했던 CLI 환경이 아닌 springboot app에서 어떻게 사용하는지에 대해 알아보도록 하겠다.


일단 zookeeper 및 kafka 설치는 위의 링크를 따라가서 기본적인 kafka 환경을 구축해 두도록 하자. 

 

예제까지 한번 다 해봤다면 다음은 STS에서 프로젝트를 만드는 것부터 해보도록 하자. 이미 프로젝트가 있고 kafka 연동 추가를 해야 하는 사람은 아래의 kafka dependency만 추가해주면 된다. 

 

1. Spring Starter Project 생성

spring starter project 생성
kafka sample project 생성
kafka dependency 추가

Spring for Apache Kafka, Spring Web starter를 추가해준다. Finish

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

그럼 위와 같이 spring-boot-starter-web, spring-kafka dependency가 추가되는것을 확인할 수 있다. 

 

2. Springboot에서의 Kafka 설정

일단 설치한 kafka의 서버 정보는 localhost:9092이고 발행한 topic은 oingdaddy였다. 이 정보를 springboot의 application.yml 에서 연결을 시켜줘야 한다. 

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id-oing
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

연결을 하기 위해 application.yml 에 위와 같은 정보를 작성해준다. springboot application이 consumer의 역할도, producer의 역할도 할 수 있으므로 둘다 설정을 하였다. bootstrap-servers에 localhost:9092를 설정해주고 consumer 설정에서 group-id는 유일하게 식별할 수 있는 값을 넣어준다. consumer는 받는 입장이기에 key-value를 역직렬화해서 받고 producer는 직렬화를 해서 내보내주는 설정이다. kafka property에 대한 설명은 이 글을 참조하도록 하자. 

 

3. Consumer 코드 작성 및 테스트 

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaSampleConsumerService {

    @KafkaListener(topics = "oingdaddy", groupId = "group-id-oing")
    public void consume(String message) throws IOException {
        System.out.println("receive message : " + message);
    }
}

위와 같이 서비스 클래스를 하나 만든다. 메소드 위에 @KafkaListener라는 설정을 해주고 여기에 내가 받을 topic에 대한 정보와 application.yml 에서 설정했던 groupId를 넣어주도록 하자. 끝이다. 그리고 서버를 기동해준다.

Consumer라는것은 Producer가 보내는걸 받아주는 역할을 하기 때문에 만약 Producer가 보낸 메세지가 있다면 @KafkaListener에서 이를 받아준다. 

 

아직 Producer 부분은 구현을 하지 않았으므로 CLI에서 Producer로 메세지를 다음과 같이 보내보도록 하겠다. 

그럼 springboot application console에는 다음과 같이 메세지를 받아서 출력을 해줄것이다. 

위와 같이 보인다면 정상적으로 Consumer 구성이 완료가 되었음을 알 수 있다. 

 

4. Producer 코드 작성 및 테스트

Producer Service Class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSampleProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String message) {
        System.out.println("send message : " +  message);
        this.kafkaTemplate.send("oingdaddy", message);
    }
}

Service 클래스에서 봐야 할점은 KafkaTemplate 이라는 녀석이다. RestTemplate처럼 application.yml에 설정해놓은 Kafka 서버로 바로 통신할 수 있게 해주는 역할을 한다. KafkaTemplate 을 주입받고 이것을 통해 message를 보낸다. 그리고 send() 에서는 첫번째 파라미터로 topic을 두번째 파라미터로는 실제로 보낼 메세지를 넣어주면 된다. 

 

Producer Controller Class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.service.KafkaSampleProducerService;

@RestController
public class KafkaSampleProducerController {
    
    @Autowired
    private KafkaSampleProducerService kafkaSampleProducerService;

    @PostMapping(value = "/sendMessage")
    public void sendMessage(String message) {
    	kafkaSampleProducerService.sendMessage(message);
    }
}
	

Controller에서는 위에서 만든 Service를 연결해 주는 역할을 하면 된다. Kafka와는 관계 없이 일반적인 RestController의 모습이라고 보면 된다. 화면으로부터 넘어온 message를 받아서 Service로 전달해주는 역할을 한다. 

 

준비가 다 됐으면 서버를 기동시켜보자. 그리고 포스트맨이든 부메랑을 켜서 post 방식으로 메세지를 보내보자. 

 

부메랑

message에 oing is cute라는 값을 넣었다. 그리고 전송을 해보자. 

send message : oing is cute
2021-04-15 23:38:37.794  INFO 22672 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    ...
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-04-15 23:38:37.812  INFO 22672 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-04-15 23:38:37.812  INFO 22672 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-04-15 23:38:37.812  INFO 22672 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1618497517812
2021-04-15 23:38:37.817  INFO 22672 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: Sqcbuk8cS4m-V_Htq7CJqA
receive message : oing is cute

1라인에서 보이는건 Producer가 보낸 메세지이고 15라인에서 보이는것은 Consumer가 받은 메세지이다. Producer가 보낸 메세지 (pub) 를 Kafka가 잘 받아주었고 이를 다시 Consumer가 가져간 것 (sub) 이다. pub은 publish, sub은 subscribe를 뜻한다. 즉 발행하고 구독하는 관계라고 보면 된다. 

 

놀랄만큼 간단하게 연동이 가능했다. 거기다가 성능까지 좋으니 사람들이 많이 사용하는데는 다 이유가 있는것 같다. 

 

끝!

댓글
최근에 올라온 글
최근에 달린 댓글
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31