티스토리 뷰
지난 시간에는 kafka를 설치하여 topic을 발행하고 producer가 메세지를 보내고 consumer가 메세지를 받는 것을 예제를 통해 알아보았다. 이번에는 kafka를 테스트를 했던 CLI 환경이 아닌 springboot app에서 어떻게 사용하는지에 대해 알아보도록 하겠다.
일단 zookeeper 및 kafka 설치는 위의 링크를 따라가서 기본적인 kafka 환경을 구축해 두도록 하자.
예제까지 한번 다 해봤다면 다음은 STS에서 프로젝트를 만드는 것부터 해보도록 하자. 이미 프로젝트가 있고 kafka 연동 추가를 해야 하는 사람은 아래의 kafka dependency만 추가해주면 된다.
1. Spring Starter Project 생성
Spring for Apache Kafka, Spring Web starter를 추가해준다. Finish
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
그럼 위와 같이 spring-boot-starter-web, spring-kafka dependency가 추가되는것을 확인할 수 있다.
2. Springboot에서의 Kafka 설정
일단 설치한 kafka의 서버 정보는 localhost:9092이고 발행한 topic은 oingdaddy였다. 이 정보를 springboot의 application.yml 에서 연결을 시켜줘야 한다.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id-oing
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
연결을 하기 위해 application.yml 에 위와 같은 정보를 작성해준다. springboot application이 consumer의 역할도, producer의 역할도 할 수 있으므로 둘다 설정을 하였다. bootstrap-servers에 localhost:9092를 설정해주고 consumer 설정에서 group-id는 유일하게 식별할 수 있는 값을 넣어준다. consumer는 받는 입장이기에 key-value를 역직렬화해서 받고 producer는 직렬화를 해서 내보내주는 설정이다. kafka property에 대한 설명은 이 글을 참조하도록 하자.
3. Consumer 코드 작성 및 테스트
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaSampleConsumerService {
@KafkaListener(topics = "oingdaddy", groupId = "group-id-oing")
public void consume(String message) throws IOException {
System.out.println("receive message : " + message);
}
}
위와 같이 서비스 클래스를 하나 만든다. 메소드 위에 @KafkaListener라는 설정을 해주고 여기에 내가 받을 topic에 대한 정보와 application.yml 에서 설정했던 groupId를 넣어주도록 하자. 끝이다. 그리고 서버를 기동해준다.
Consumer라는것은 Producer가 보내는걸 받아주는 역할을 하기 때문에 만약 Producer가 보낸 메세지가 있다면 @KafkaListener에서 이를 받아준다.
아직 Producer 부분은 구현을 하지 않았으므로 CLI에서 Producer로 메세지를 다음과 같이 보내보도록 하겠다.
그럼 springboot application console에는 다음과 같이 메세지를 받아서 출력을 해줄것이다.
위와 같이 보인다면 정상적으로 Consumer 구성이 완료가 되었음을 알 수 있다.
4. Producer 코드 작성 및 테스트
Producer Service Class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaSampleProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
System.out.println("send message : " + message);
this.kafkaTemplate.send("oingdaddy", message);
}
}
Service 클래스에서 봐야 할점은 KafkaTemplate 이라는 녀석이다. RestTemplate처럼 application.yml에 설정해놓은 Kafka 서버로 바로 통신할 수 있게 해주는 역할을 한다. KafkaTemplate 을 주입받고 이것을 통해 message를 보낸다. 그리고 send() 에서는 첫번째 파라미터로 topic을 두번째 파라미터로는 실제로 보낼 메세지를 넣어주면 된다.
Producer Controller Class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.service.KafkaSampleProducerService;
@RestController
public class KafkaSampleProducerController {
@Autowired
private KafkaSampleProducerService kafkaSampleProducerService;
@PostMapping(value = "/sendMessage")
public void sendMessage(String message) {
kafkaSampleProducerService.sendMessage(message);
}
}
Controller에서는 위에서 만든 Service를 연결해 주는 역할을 하면 된다. Kafka와는 관계 없이 일반적인 RestController의 모습이라고 보면 된다. 화면으로부터 넘어온 message를 받아서 Service로 전달해주는 역할을 한다.
준비가 다 됐으면 서버를 기동시켜보자. 그리고 포스트맨이든 부메랑을 켜서 post 방식으로 메세지를 보내보자.
message에 oing is cute라는 값을 넣었다. 그리고 전송을 해보자.
send message : oing is cute
[2m2021-04-15 23:38:37.794[0;39m [32m INFO[0;39m [35m22672[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.a.k.clients.producer.ProducerConfig [0;39m [2m:[0;39m ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
...
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[2m2021-04-15 23:38:37.812[0;39m [32m INFO[0;39m [35m22672[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.a.kafka.common.utils.AppInfoParser [0;39m [2m:[0;39m Kafka version: 2.6.0
[2m2021-04-15 23:38:37.812[0;39m [32m INFO[0;39m [35m22672[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.a.kafka.common.utils.AppInfoParser [0;39m [2m:[0;39m Kafka commitId: 62abe01bee039651
[2m2021-04-15 23:38:37.812[0;39m [32m INFO[0;39m [35m22672[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.a.kafka.common.utils.AppInfoParser [0;39m [2m:[0;39m Kafka startTimeMs: 1618497517812
[2m2021-04-15 23:38:37.817[0;39m [32m INFO[0;39m [35m22672[0;39m [2m---[0;39m [2m[ad | producer-1][0;39m [36morg.apache.kafka.clients.Metadata [0;39m [2m:[0;39m [Producer clientId=producer-1] Cluster ID: Sqcbuk8cS4m-V_Htq7CJqA
receive message : oing is cute
1라인에서 보이는건 Producer가 보낸 메세지이고 15라인에서 보이는것은 Consumer가 받은 메세지이다. Producer가 보낸 메세지 (pub) 를 Kafka가 잘 받아주었고 이를 다시 Consumer가 가져간 것 (sub) 이다. pub은 publish, sub은 subscribe를 뜻한다. 즉 발행하고 구독하는 관계라고 보면 된다.
놀랄만큼 간단하게 연동이 가능했다. 거기다가 성능까지 좋으니 사람들이 많이 사용하는데는 다 이유가 있는것 같다.
끝!
'Framework > Spring' 카테고리의 다른 글
Springboot + Redis 연동하는 예제 (2) 세션 클러스터링 (0) | 2021.04.16 |
---|---|
Springboot + Redis 연동하는 예제 (1) 기본 (1) | 2021.04.16 |
Springboot Kafka 설정 application.properties 목록 (0) | 2021.04.15 |
Springboot application.properties (or .yml) 속성 종류 (0) | 2021.04.14 |
Spring Message Default Value 설정하기 (0) | 2021.04.12 |