[RabbitMQ] 메시지 유실 방지 방법(mandatory, publisher confirm, alternate exchange)
들어가기 전
메시지 발행은 메시징 기반 아키텍처의 핵심 동작 중 하나로 RabbitMQ에는 메시지 발행에 대한 여러 설정이 있습니다.
애플리케이션에서 사용 가능한 다양한 메시지 발행 옵션은 애플리케이션의 성능과 안정성에 많은 영향을 줍니다.
메시지 브로커는 높은 처리량과 빠른 성능도 중요하지만, 무엇보다 메시지를 신뢰할 수 있게 전달하는 것이 핵심 역할입니다.
의도한 대로 메시지가 발행되었음에도 불구하고 메시지가 누락된다면 서비스 전반에 치명적인 문제가 발생할 수 있습니다.
RabbitMQ와 같은 소프트웨어는 대기 중인 메시지를 확실히 전달하는 것이 중요합니다.
AMQP 스펙에서는 메시지를 발행할 때 트랜잭션을 제공하고 있으며 메시지를 디스크에 저장하는 경우 일반적인 메시지 발행보다 높은 수준의 안정적인 메시지 환경을 제공할 수 있습니다.
그래서 이번 포스팅에서는 메시지 발행 성능과 배달 보장의 성능 절충을 살펴보고 RabbitMQ가 자동으로 발행자의 메시지를 조절하는 방식에 대해서 알아보겠습니다.
메시지 발행속도와 배달 보장의 균형
RabbitMQ 서버를 재부팅하더라도 메시지를 유지하기 위한 기능들은 일부 애플리케이션에서는 성능 저하로 인해 적합하지 않을 수 있습니다.
반면 추가적인 배달 보장 없이 메시지를 발행하면 처리 속도는 훨씬 빨라지지만 메시지 유실이 치명적인 애플리케이션에는 안전한 환경을 제공하지 못하는 트레이드오프가 존재합니다.
RabbitMQ에서 배달 보장을 달성하기 위해 설계된 각 메커니즘은 성능 영향을 미칩니다.
처리량에 큰 차이 없다고 느낄 수 있지만 각 메커니즘을 조합해 사용하면 메시지 처리량에 상당한 영향을 미칠 수 있습니다.
올바른 솔루션을 위한 고성능 메시지 배달 보장 사이에 적절한 균형을 찾는 과정에서 아래 질문들을 염두하시는 걸 바랍니다.
- 발행 시에 메시지를 큐에 넣는 것이 얼마나 중요한가
- 메시지를 라우팅 할 수 없는 경우 발행자에게 메시지를 보내야 하는가
- 메시지를 라우팅 할 수 없는 경우 차후에 조정하는 다른 곳으로 메시지를 보내야 하는가
- RabbitMQ 서버에 장애가 발생했을 때 메시지가 손실돼도 괜찮은가
- RabbitMQ가 새 메시지를 처리할 때 요청한 모든 메시지를 라우팅 한 후 디스크에 저장하는 작업이 정상적으로 수행했는지 발행자가 확인해야 하는가
- 발행자가 메시지를 한꺼번에 전달하면 RabbitMQ는 메시지를 라우팅 하고 디스크에 저장한 후 작업이 정상적으로 실행됐는지를 발행자에게 다시 알려야 하는가
- 다수 메시지를 라우팅 한 후 디스크에 정상적으로 저장됐는지 확인하는 작업을 일괄 처리하는 경우 메시지를 저장할 큐에 원자 커밋이 필요한가
- 발행자가 적절한 성능과 메시지 처리량을 달성하는데 메시지 배달 보장 기능 간에 절충점이 있는가
- 메시지 발행의 다른 측면이 메시지 처리량 및 성능에 영향을 미치는가
mandatory 플래그를 설정한 메시지를 라우팅 할 수 없을 때
서버 모니터링 데이터가 항상 RabbitMQ로 배달되도록 보장하려면 RabbitMQ에 발행하는 메시지의 mandatory를 설정합니다.
mandatory 플래그는 Basic.Publish RPC 명령과 함께 전달되는 인수입니다.
메시지를 라우팅 할 수 없으면 Basic.Return RPC를 통해 RabbitMQ가 메시지를 발행자에게 다시 보내도록 지시합니다.
mandatory 플래그는 오류 감지 모드로 사용하는 게 아니라 메시지 라우팅 실패를 알리는 데 사용합니다.
메시지 라우팅이 올바르게 처리되면 발행자에게 별도의 메시지를 전송하지 않습니다.
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import study.rabbitmq.config.RabbitMqConfig;
public class MandatoryFlag {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMqConfig.connectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("BasicReturnExchange", BuiltinExchangeType.DIRECT);
try {
byte[] message = "BasicReturnMessage".getBytes();
channel.basicPublish("BasicReturnExchange", "routingKey", true, new BasicProperties(),
message);
}catch (Exception e){
throw new RuntimeException(e);
}
}
}
mandatory 플래그를 설정하고 메시지 라우팅이 실패했을 때 오류가 발생하는 게 아니라서 위와 같이 할 경우에는 애플리케이션은 아무런 행동을 수행하지 않습니다.
basicPulibsh는 비동기 방식으로 수행되기 때문에 라우팅이 실패 여부는 서버가 나중에 판단하여 콜백으로 알려줍니다.
아래 이미지를 보면 많은 시간이 지났음에도 예외가 발생하지 않는 것을 확인할 수 있습니다.

위처럼 mandatory 플래그는 예외가 발생하는 게 아니라 Basic.Return을 반환합니다.
그래서 아래 코드와 같이 ReturnListener을 등록하여 라우팅 실패된 내용에 대해서 직접 받아서 처리하여야 합니다.
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import study.rabbitmq.config.RabbitMqConfig;
public class MandatoryFlag {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMqConfig.connectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("BasicReturnExchange", BuiltinExchangeType.DIRECT);
channel.addReturnListener((replyCode, replyText, exchange,
routingKey, properties, body) -> {
System.out.println("=== Message Returned ===");
System.out.println("replyCode : " + replyCode);
System.out.println("replyText : " + replyText);
System.out.println("exchange : " + exchange);
System.out.println("routingKey : " + routingKey);
System.out.println("body : " + new String(body));
});
byte[] message = "BasicReturnMessage".getBytes();
channel.basicPublish("BasicReturnExchange", "routingKey", true, new BasicProperties(),message);
}
}
channel.addReturnListener을 통해 라우팅 실패 했을 때 처리할 수 있는 ReturnListener을 등록하였습니다.
그래서 라우팅이 실패하면 아래와 같이 지정한 출력값이 나오는 것을 확인할 수 있습니다.

replyCode의 312 값은 라우팅 실패를 의미합니다.

발행자가 메시지 잘 전달되었는지 확인하는 방법
AMQP 스펙의 확장 기능으로, RabbitMQ 확장을 지원하는 클라이언트 라이브러리를 통해 메시지 발행 결과를 확인할 수 있습니다.
디스크에 메시지를 저장하는 것만으로는 메시지 손실을 방지할 수는 있지만, 이것만으로 발행자와 RabbitMQ 사이에 메시지가 정상적으로 전달되었음을 보장할 수는 없습니다.
이를 보완하기 위해 발행자는 메시지를 전송하기 전에 채널을 confirm 모드로 전환하기 위해 Confirm.Select RPC 요청을 전송하며, 이후 발행된 각 메시지에 대해 RabbitMQ는 Basic.Ack 또는 Basic.Nack으로 처리 결과를 반환합니다.
이때 각 메시지는 채널 단위로 증가하는 delivery tag(시퀀스 번호)를 가지며, 서버는 해당 번호를 기준으로 메시지 수신 여부를 확인합니다.

Basic.Ack 요청은 발행된 메시지를 라우팅 한 모든 큐의 소비자 애플리케이션이 직접 사용하거나 메시지를 큐의 디스크에 저장할 때 발행자에게 전송됩니다.
메시지를 라우팅 할 수 없는 경우 메시지 브로커는 오류를 뜻하는 Basic.Nack RPC 요청을 반환합니다.
이후 발행자는 메시지를 어떻게 처리할지 결정합니다.
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import study.rabbitmq.config.RabbitMqConfig;
public class MessageConfirmDeliveryProducer {
public static void main(String[] args)
throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = RabbitMqConfig.connectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DeclareOk exchange = channel.exchangeDeclare("exchange",
BuiltinExchangeType.DIRECT);//익스체인지 선언
SelectOk selectOk = channel.confirmSelect();
channel.basicPublish("exchange","routingKey", new BasicProperties(), new String("메시지").getBytes() );
channel.waitForConfirmsOrDie(); // 또는 channel.waitForConfirms()
}
}
- channel.ConfirmSelect : 해당 채널에 대해서 confirm mode(확인모드)로 설정
- SelectOk : 채널에 대해서 confirm mode로 설정된 것을 의미
- channel.waitFOrmConfirmsOrDie : 마지막 호출 이후 발행된 모든 메시지에 대해 브로커로부터 ACK 또는 NACK 응답이 올 때까지 대기합니다. 이 중 하나라도 NACK이 반환되면 IOException을 발생되고,
채널이 Confirm 모드가 아닐 경우 IllegalStateException 예외가 발생합니다.
라우팅 할 수 없는 메시지를 위한 대체 익스체인지 사용
익스체인지를 선언할 때 해당 익스체인지에서 라우팅 할 수 없는 메시지를 처리하기 위해 대체 익스체인지를 지정할 수 있습니다.
익스체인지에 바인딩된 큐가 없어서 라우팅 되지 못하는 메시지를 RabbitMQ에서 대체 익스체인지로 전달하도록 설정할 때 사용합니다.
기존 익스체인지에서 발행한 메시지에 설정된 mandatory 값은 유지되며 대체 익스체인지에서도 라우팅에 실패할 경우 mandatory=true 설정에 따라 메시지가 발행자에게 반환됩니다.
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import study.rabbitmq.config.RabbitMqConfig;
public class AlterExchangeProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMqConfig.connectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String alterExchangeName = "alterExchangeName";
channel.exchangeDeclare(alterExchangeName, BuiltinExchangeType.FANOUT);
Map<String, Object> argMap = new HashMap<>();
argMap.put("alternate-exchange", alterExchangeName);
channel.exchangeDeclare("baseExchangeName", BuiltinExchangeType.DIRECT, false,
false, argMap);
channel.basicPublish("baseExchangeName","routingKey", new BasicProperties(), "대체 익스체인지에서 처리됨".getBytes());
}
}
발행자가 메시지를 발행하기 전에 대체 익스체인지를 선언하고 해당 값을 기존 익스체인지 설정값으로 설정합니다.
(참고 : 대체 익스체인지는 익스체인지를 선언할 때만 설정할 수 있어 이후에는 변경이 불가능합니다.)
그리고 기존 익스체인지로 메시지를 발행합니다.
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import study.rabbitmq.config.RabbitMqConfig;
public class AlterExchangeConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMqConfig.connectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String alterExchangeName = "alterExchangeName";
channel.exchangeDeclare(alterExchangeName, BuiltinExchangeType.FANOUT);
DeclareOk declareOk = channel.queueDeclare();
channel.queueBind(declareOk.getQueue(), alterExchangeName, "");
channel.basicConsume(declareOk.getQueue(), false,
(consumerTag, delivery) -> {
System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {
System.out.println("소비자 취소됨");
});
}
}
메시지를 소비하는 소비자 측에서는 대체 익스체인지에 바인딩하고 있는 큐를 선언합니다.
그리고 메시지 발행자와 소비자 코드를 수행해 보면 기존 익스체인지를 바인딩하고 있는 큐가 존재하지 않다 보니 대체 익스체인지를 바인딩하고 있는 큐에서 메시지를 소비하는 것을 확인할 수 있습니다.
'RabbitMQ' 카테고리의 다른 글
| [RabbitMQ] 메시지 속성에 대하여 (0) | 2026.01.18 |
|---|---|
| [RabbitMQ] 익스체인지, 큐 생성 및 연결하는 방법과 메시지 발행 및 소비하는 과정 (0) | 2026.01.13 |
| [RabbitMQ] AMQP 프레임 유형 (0) | 2026.01.11 |
| [RabbitMQ] RabbitMQ 기능과 장점 (0) | 2026.01.05 |
댓글
이 글 공유하기
다른 글
-
[RabbitMQ] 메시지 속성에 대하여
[RabbitMQ] 메시지 속성에 대하여
2026.01.18 -
[RabbitMQ] 익스체인지, 큐 생성 및 연결하는 방법과 메시지 발행 및 소비하는 과정
[RabbitMQ] 익스체인지, 큐 생성 및 연결하는 방법과 메시지 발행 및 소비하는 과정
2026.01.13 -
[RabbitMQ] AMQP 프레임 유형
[RabbitMQ] AMQP 프레임 유형
2026.01.11 -
[RabbitMQ] RabbitMQ 기능과 장점
[RabbitMQ] RabbitMQ 기능과 장점
2026.01.05