[RabbitMQ] RabbitMQ 개념 및 Spring 연동
들어가기 전
이번 포스팅에서는 RabbitMQ에 대해서 알아보고 RabbitMQ와 Spring을 연동하는 방법에 대해서 알아보겠습니다.
RabbitMQ
RabbitMQ는 클라우드 환경, 온프레미스에 쉽게 배포할 수 있는 안정적인 메시징 및 스토리밍 브로커입니다.
메시지 큐*(Message Queue)를 통해 여러 애플리케이션에 데이터를 주고받을 수 있도록 해주기 위한 AMQP*의 구현체입니다.
AMQP란?
Advanced Message Queuing Protocol의 약자로 생산자(Producer)와 수신자(Consumer) 사이에서 메시지를 안전하게 교환하는 메시지 지향 미들웨어 개방형 프로토콜입니다.
메시지 큐(Message Queue)란?
프로세스 또는 프로그램 간에 데이터를 교환할 때 사용하는 통신방법으로 메시지 지향 미들웨어를 구현한 시스템을 의미합니다.
메시지 지향 미들웨어는 비동기 메시지를 사용하는 응용프로그램 사이에서 데이터를 송수신하는 것을 의미합니다.
여기서 메시지는 요청, 응답 및 오류 등 다양한 정보 등의 데이터가 될 수 있습니다.
메시지 큐를 사용하는 경우
메시지 큐는 생산자가 메시지를 보낼 때 소비자는 생산자가 보낸 메시즈를 언제 처리할 수 있는지 보장하지 않습니다.
사용자가 요청을 하고 응답을 기다려야 끝나는 프로세스에는 사용하지 않습니다.
예시로 회원가입, 비밀번호 변경 등 클라이언트의 요청이 들어오고 서버로부터의 응답에 따라 결과가 달라지는 경우에는 메시지 큐를 사용하지 않습니다.
대신 회원가입 시 회원가입에 대한 알림 메일, 메시지, 비밀번호 변경에 대한 알림 메일 등 클라이언트의 요청과 서버의 응답이 상호작용 안 하는 상황에서 메시지 큐를 사용할 수 있습니다.
RabbitMQ 처리 과정
- Producer : 메시지를 발행하는 생산자
- Exchange : 생산자가 발행한 메시지를 보관하고 있다가 알맞은 큐에 전달하는 매개체
- Queue : 생산자가 발행한 메시지를 보관하고 있다가 소비자가 소비할 때 소비자에게 전달
- Consumer : 생산자가 발행한 메시지를 구독하고 사용하는 소비자
- Binding : Exchange에게 알맞은 큐에 메시지를 라우팅 할 때 규칙을 지정하는 행위, Exchange의 종류에 따라 지정하는 방식이 달라집니다.
Exchange Type
- Direct : Routing Key와 일치하는 큐에 메시지를 전달합니다.
- FangOut : 브로드 캐스트 방식으로 Exchange에 바인딩된 모든 Queue에 메시지를 전달합니다.
- Topic : Routing Key 패턴과 일치하는 큐에 메시지를 전달합니다.
지금까지 RabbitMQ의 개념과 처리과정, Exchange Type 종류에 대해서 알아보았습니다. 이제 스프링과 연동하여 설정하는 방법과 Exchange Type 별로 어떻게 동작하는지 예제를 통해 확인해 보겠습니다.
(이번 포스팅에서는 3가지의 Exchange Type에 대해서 다루겠습니다.)
예제를 시작하기 전에 로컬에 RabbitMQ를 설치하고 실행하는 방법에 대해 먼저 알아보겠습니다.
필자는 도커를 사용하여 로컬에 RabbitMQ를 설치하고 실행시킬 것입니다.
(이번 포스팅에서는 도커를 다루는 게 아니기 때문에 도커 설치 방법, 명령어 설명은 생략하겠습니다.)
RabbitMQ 공식문서를 확인해 보면 도커로 설치하는 방법에 대해 나와있습니다.
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
- 5672:5672 : rabbitMQ의 포트
- 15672: 15672 : rabbitMQ의 Management 포트
위와 같이 명령어를 입력하고 현재 실행 중인 컨테이너를 확인해 보면 RabbitMQ가 있는 것을 확인할 수 있습니다.
rabbitMQ가 실행 중인 것을 확인하였다면 도커를 실행할 때 정의한 management 포트를 사용하여 Management 사이트를 확인할 수 있습니다.
기본 username과 password는 guest입니다. guest를 입력하면 아래와 같은 화면을 확인할 수 있습니다.
카테고리를 자세히 보면 Exchange 카테고리가 존재합니다. 해당 카테고리로 들어가면 아래와 이미지와 같이 위에서 간략하게 소개한 Exchange Type이 존재합니다.
지금까지 도커를 활용하여 RabbitMQ를 설치하는 방법에 대해서 알아보았습니다. 이제 설치된 RabbitMQ와 Spring Boot와 연동하는 방법에 대해서 알아보겠습니다.
1. 메시지를 생산하는 Producer와 소비하는 Consumer 프로젝트 생성
2. Producer 프로젝트와 Consumer 프로젝트 공통 설정
build.gradle
implementation 'org.springframework.boot:spring-boot-starter-amqp'
application.yml
spring:
rabbitmq:
port: 5672
host: localhost
3. Consumer 프로젝트 설정
application.yml
server:
port: 8000
@Configuration
public class RabbitMqConfig {
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setPort(Integer.parseInt(port));
connectionFactory.setHost(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory());
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- 도커에서 실행한 RabbitMQ에 연결하기 위해 필요한 port, host, username, password값을 매핑해 주어 연결을 해줍니다.
지금까지 Producer와 Consumer에 필요한 설정에 대해 알아보았습니다. 이제 Exchange Type 별로 예제를 통해 알아보겠습니다.
Direct Exchange
Producer
@RestController
@RequiredArgsConstructor
public class RabbitMQController {
private final DirectExchangeService directExchangeService;
@GetMapping("/direct")
public void sendDirectExchange() {
directExchangeService.send();
}
}
@Service
@RequiredArgsConstructor
public class DirectExchangeService {
private final RabbitTemplate rabbitTemplate;
private final String DIRECT_EXCHANGE_NAME = "direct_exchange";
private final String DIRECT_QUEUE_ROUTING_KEY = "direct_routing_key";
public void send() {
SendMessage sendMessage = new SendMessage("https://hoestory.tistory.com/", "hoestory",
"RabbitMQ");
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,DIRECT_QUEUE_ROUTING_KEY,sendMessage);
}
}
- Producer는 Consumer에서 설정해 두었던 Exchange, 라우팅 키와 Consumer에 전달할 메시지를 발행합니다. 발행하는 Exchange와 라우팅키와 동일한 Consumer가 있을 경우 해당 메시지를 처리합니다.
@AllArgsConstructor
@Getter
public class SendMessage {
private String blogAddress;
private String blogName;
private String blogSubject;
}
Consumer
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitMqConfig {
private final String DIRECT_QUEUE_NAME = "direct_queue";
private final String DIRECT_EXCHANGE_NAME = "direct_exchange";
private final String DIRECT_QUEUE_ROUTING_KEY = "direct_routing_key";
@Bean
public Queue queue() {
return new Queue(DIRECT_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE_NAME);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with(DIRECT_QUEUE_ROUTING_KEY);
}
}
- queue() : 큐 이름을 설정합니다.
- directExchange() : Exchange 이름을 설정합니다.
- binding() : Exchange가 어떤 Queue에 메시지를 보낼지 라우팅 키와 함께 설정을 합니다.
@Component
public class DirectListener {
@RabbitListener(queues = "direct_queue")
public void getMessage(SendMessage sendMessage) {
System.out.println(sendMessage);
}
}
- Producer가 Consumer의 Config 파일에서 설정했던 라우팅 키와 Queue이름으로 메시지가 발행될 경우 발행된 메시지의 큐 이름과 동일한 @RabbitListener를 찾아서 처리합니다.
@AllArgsConstructor
@Getter
public class SendMessage {
private String blogAddress;
private String blogName;
private String blogSubject;
@Override
public String toString() {
return "" +
"블로그 주소='" + blogAddress + '\'' +
", 블로그 이름='" + blogName + '\'' +
", 블로그 주제='" + blogSubject + '\'' +
'}';
}
}
위에서 간략하게 설명한 DirectExchange는 Producer가 발행한 이벤트의 라우팅 키와 Consumer에서 구독하는 큐의 라우팅 키가 동일할 경우에만 정상적으로 동작합니다.
만약 Consumer에서 설정한 라우팅 키와 Producer에서 발행을 위해 설정한 라우팅키와 일치하지 않을 경우는 Consumer에서는 아무런 처리를 하지 않습니다.
그리고 위와 같이 Exchange, Queue를 설정을 하면 Management에서 아래와 같이 확인할 수 있습니다.
Fanout Exchange
Producer
@RestController
@RequiredArgsConstructor
public class RabbitMQController {
private final FanOutExchangeService fanOutExchangeService;
@GetMapping("/fanout")
public void sendFanoutExchange() {
fanOutExchangeService.send();
}
}
@Service
@RequiredArgsConstructor
public class FanoutExchangeService {
private final RabbitTemplate rabbitTemplate;
private final String DIRECT_EXCHANGE_NAME = "fanout_exchange";
public void send() {
SendMessage sendMessage = new SendMessage("https://hoestory.tistory.com/", "hoestory",
"RabbitMQ");
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME,null, sendMessage);
}
}
- fanout은 브로드캐스트 방식으로 동작하기 때문에 이벤트를 구독하고 있는 모든 큐에 메시지를 발행하기 때문에 라우팅 키는 Null을 가집니다.
Consumer
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitMqConfig {
private final String EXCHANGE_NAME = "fanout_exchange";
private final String QUEUE_NAME = "fanout_queue";
@Bean
public Queue fanoutQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
- Direct 방식과 달리 라우팅 키 설정하는 부분이 존재하지 않습니다.
@Component
public class FanoutListener {
@RabbitListener(queues = "fanout_queue")
public void getMessage(SendMessage sendMessage){
System.out.println(sendMessage);
}
}
Direct와 동일하게 Fanout Exchange와 Queue가 Management에 노출되는 것을 확인할 수 있습니다.
Topic Exchange
Producer
@RestController
@RequiredArgsConstructor
public class RabbitMQController {
private final TopicExchangeService topicExchangeService;
@GetMapping("/topic")
public void sendTopicExchange() {
topicExchangeService.send();
}
}
@Service
@RequiredArgsConstructor
public class TopicExchangeService {
private final RabbitTemplate rabbitTemplate;
private final String TOPIC_EXCHANGE_NAME = "direct_exchange";
private final String TOPIC_QUEUE_ROUTING_KEY = "direct_routing_key.sendMessage";
public void send() {
SendMessage sendMessage = new SendMessage("https://hoestory.tistory.com/", "hoestory",
"RabbitMQ");
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME,TOPIC_QUEUE_ROUTING_KEY, sendMessage);
}
}
Consumer
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitMqConfig {
private final String TOPIC_QUEUE_NAME = "topic_queue";
private final String TOPIC_EXCHANGE_NAME = "topic_exchange";
private final String TOPIC_QUEUE_ROUTING_KEY = "topic_routing_key.#";
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME);
}
@Bean
public Binding topicBinding() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_QUEUE_ROUTING_KEY);
}
}
- Topic Exchange는 Producer로부터 발행된 이벤트의 라우팅 키가 Consumer에서 정의한 라우팅 키의 패턴에 적합할 경우 정의한 큐 이름과 동일한 @RabbitListener을 찾아서 이벤트를 처리합니다.
- Producer에서는 라우팅 키를 topic_routing_key.sendMessage로 정의하였고 Consumer에서는 topic_routing_key.#으로 패턴을 정의하였습니다. Producer에서 정의한 라우팅 키가 Consumer에 정의한 패턴에 적합하여 정상 동작합니다.
@Component
public class TopicListener {
@RabbitListener(queues = "topic_queue")
public void getMessage(SendMessage sendMessage){
System.out.println(sendMessage);
}
}
Direct, Fanout과 동일하게 Topic Exchange와 Queue가 Management에 노출되는 것을 확인할 수 있습니다.
Github
예제에 사용된 코드는 아래 링크에서 자세히 살펴볼 수 있습니다.
Producer : https://github.com/cousim46/study-collection/tree/main/rabbitMQ-sender/sender
Consumer : https://github.com/cousim46/study-collection/tree/main/rabbitMQ-receiver/receiver
'Spring Boot' 카테고리의 다른 글
[Spring Boot] 트랜잭션 전파과정에 대하여 (0) | 2024.11.10 |
---|---|
[Spring] Checked Exception, Unchecked Exception의 트랜잭션 처리 방식 (2) | 2024.06.26 |
[Spring] @DataJpaTest에서 Auditing 적용 안되는 현상 (0) | 2024.05.21 |
[Spring] 나태지옥에 빠지지 않기 위한 스케줄러 (1) | 2024.04.07 |
[Spring Boot] Spring Boot + Kotlin + AWS S3를 이용한 이미지 다루는 방법 (0) | 2024.03.18 |
댓글
이 글 공유하기
다른 글
-
[Spring Boot] 트랜잭션 전파과정에 대하여
[Spring Boot] 트랜잭션 전파과정에 대하여
2024.11.10 -
[Spring] Checked Exception, Unchecked Exception의 트랜잭션 처리 방식
[Spring] Checked Exception, Unchecked Exception의 트랜잭션 처리 방식
2024.06.26 -
[Spring] @DataJpaTest에서 Auditing 적용 안되는 현상
[Spring] @DataJpaTest에서 Auditing 적용 안되는 현상
2024.05.21 -
[Spring] 나태지옥에 빠지지 않기 위한 스케줄러
[Spring] 나태지옥에 빠지지 않기 위한 스케줄러
2024.04.07