들어가기 전

이번 포스팅에서는 RabbitMQ에서 메시지 발행과 소비하는 과정에 대해서 알아보고 Java와 RabbitMQ 라이브러리를 이용해서 익스체인지와 큐를 연결하는 방법에 대해서 알아보겠습니다.

 

메시지를 발행하고 소비하기 전에 선행되어야 할 작업이 있습니다.
메시지를 발행하고 소비하려면 익스체인지와 큐를 각각 선언한 후 서로 연결해야 합니다.
먼저 익스체인지와 큐를 선언하고 연결하는 방법에 대해 알아보겠습니다.

 

익스체인지 선언하는 방법

Exchange.Declare 명령에 익스체인지 이름, 유형, 그리고 메시지 처리에 필요한 메타데이터를 전달하면 RabbitMQ는 익스체인지를 선언합니다.

익스체인지를 생성한 후 Exchange.DeclareOk 메서드 프레임을 응답으로 전송합니다.

특정 이유로 Exchange.Declare 명령이 실패하는 경우 RabbitMQ는 익스체인지를 생성하는 것을 실패하고 채널이 닫힌 이유를 나타내는 숫자 응답코드와 텍스트 값을 Channel.Close 명령에 포함하고 Exchangle.Declare 명령이 전송된 채널을 닫습니다.

 

 

 

 

큐 선언하는 방법

익스체인지 생성한 후 RabbitMQ에 Queue.Declare 명령을 보내 큐를 생성합니다.

익스체인지 선언하는 방법과 유사하게 큐를 생성한 후 Queue.DeclareOk 메서드 프레임을 응답으로 전송합니다.

문제가 발생하면  채널을 닫습니다.

큐를 선언할 때 동일한 Queue.Declare 명령을 두 번 이상 전송해도 문제가 발생하지 않습니다.

RabbitMQ는 중복된 큐 선언을 감지해 큐에 대기 중인 메시지의 수와 구독 중인 구독자의 수와 같이 큐에 대한 유용한 상태를 반환합니다.

반면에 이미 생성한 큐와 같은 이름이지만 속성이 다른 큐를 선언하려고 시도하면 RabbitMQ는 RPC 요청을 발행한 채널을 닫습니다.

 

 

위와 같이 정상적으로 익스체인지와 큐를 생성이 되는 과정에 대해서 알아보았습니다.

이제 문제가 발생하여 채널이 닫힐 때 에러를 처리하는 방법에 대해서 알아보겠습니다.

 

에러 처리

 

클라이언트 애플리케이션이 서버에서 전송하는 이벤트를 수신하지 않거나 적절하게 처리하지 않으면 메시지 손실이 될 수 있습니다.

존재하지 않거나 이미 닫힌 채널에 메시지를 발행하는 경우 RabbitMQ는 연결을 종료합니다.

메시지를 소비하는 애플리케이션이 RabbitMQ 채널이 닫힌 사실을 모르는 경우 빈 큐를 구독하고 있다고 판단하여 문제가 발생합니다.

이와 같은 문제를 정상적으로 처리하려면 클라이언트 애플리케이션이  RabbitMQ로부터 Channel.Close 명령을 전달받아 적절하게 응답해야 합니다.

특정 클라이언트 라이브러리는 에러 응답을 애플리케이션이 처리할 수 있는 예외로 처리하며 다른 유형의 라이브러리는 사용자가 메서드를 등록할 때 콜백을 추가하도록 하고 Channel.Close 명령을 보낼 때 콜백을 호출하는 식으로 처리하기도 합니다.

 

 

익스체인지와 큐 연결하는 방법

 

익스체인지와 큐가 생성되면 연결해야 합니다.

큐를 익스체인지에 연결하는 Queue.Bind 명령은 한 번에 하나의 큐만 바인딩할 수 있습니다.

위에서 익스체인지와 큐를 생성할 때 사용했던 Exchange.Declare와 Queue.Declare 명령과 유사하게 Queue.Bind 명령은 실행이 성공적으로 처리되는 경우 클라이언트 애플리케이션에 Queue.BindOk 메서드 프레임을 전송합니다.

 

 

메시지 발행

 

RabbitMQ에 메시지를 발행할 때 여러 종류의 프레임들이 서버로 전송되는 메시지의 데이터를 캡슐화합니다.

실제 메시지 본문을 RabbitMQ에 전달하기 전에 클라이언트 애플리케이션은 Basic.Publish 메서드 프레임, 콘텐츠 헤더 프레임, 하나 이상의 바디 프레임을 전송합니다.

RabbitMQ는 Basic.Publish 메서드 프레임에 포함된 정보를 기반으로 익스체인지와 라우팅 키의 유효성을 검증합니다.

그리고 내부에 저장된 익스체인지 정보와 Basic.Publish 메서드 프레임에 포함된 익스체인지 이름 및 라우팅 키를 비교합니다.

그리고 Basic.Publish 메서드 프레임의 익스체인지 이름과 일치하는 익스체인지를 발견한 후 내부 바인딩을 평가하며 라우팅 키와 일치하는 큐를 찾습니다.

일치하는 큐가 존재하면 RabbitMQ서버는 선입선출(FIFO) 순서로 메시지를 큐에 삽입합니다. 여기서 RabbitMQ는 실제 메시지의 값 대신 메시지를 참조할 수 있는 형태로 관리합니다. 메시지를 전달할 준비가 되면 RabbitMQ는 메시지를 AMQP 프레임 형태로 직렬화하여 소비자에게 전송합니다.

하나의 메시지가 여러 큐로 라우팅 되는 경우에도 RabbitMQ는 메시지 본문을 불필요하게 중복 저장하지 않도록 내부적으로 관리하여 메모리를 적게 사용할 수 있습니다.

특정 큐에 존재하던 메시지의 값이 소비되거나 만료 또는 유휴로 인해 제거될 때 다른 큐의 메시지 처리에 영향을 주지 않습니다.

메시지가 모든 큐에서 제거되면 RabbitMQ는 해당 메시지와 관련된 내부 리소스를 정리합니다.

메시지의 콘텐츠 헤더 프레임에 지정된 Delivery Mode에 따라 메시지를 메모리에 보관하거나 디스크에 기록하는데 큐를 구독하는 소비자가 없을 경우 메시지를 소비하지 않고 큐에 계속 보관하고 있기 때문에 메시지가 추가될수록 큐의 크기가 커져서 메모리 또는 디스크 사용량이 늘어납니다.

 

 

메시지 소비

메시지가 발행되면 큐에 삽입을 합니다.

이때 소비자 애플리케이션은 발행된 메시지를 소비하기 위해 Basic.Consume명령을 실행해서 RabbitMQ의 큐를 구독합니다.

RabbitMQ 서버에서는 해당 명령에 대해 Basic.ConsumeOk로 응답해 클라이언트가 연속해서 메시지를 받을 준비를 하도록 알립니다.

RabbitMQ는 소비자 등록 상태와 QoS 설정(prefetch count)에 따라 Basic.Deliver 메서드 프레임, 콘텐츠 헤더, 바디 프레임으로 메시지를 전달합니다.

Basic.Consume이 실행된 순간부터 특정 상황이 발생하기 전까지 활성상태를 유지하고  메시지 수신을 중지하려면 Basic.Cancel 명령을 발행해야 합니다.

RabbitMQ가 계속해서 메시지를 보내는 동안에 명령이 비동기적으로 실행되는데 소비자는 Basic.CancelOk 응답 프레임을 받기 전에 RabbitMQ가 미리 할당한 메시지 수만큼 메시지를 받을 수 있습니다.

RabbitMQ는 소비자가 메시지를 소비할 때 수신하는 방식에 대해 알 수 몇 가지 설정 있습니다. 

그중 하나는 Basic.Consume 명령의 no_ack 인수입니다. no_ack를 true로 설정하면 RabbitMQ는 메시지에 대한 명시적인 확인(Ack)을 기다리지 않고 메시지를 전송합니다.

false로 설정하면 소비자는 메시지를 수신한 후 Basic.Deliver 프레임에 포함된 delivery-tag를 사용하여 Basic.Ack 명령을 RabbitMQ로 전송해야 합니다.

Basic.Ack 응답프레임이 전송되면 소비자는 Basic.Deliver 메서드 프레임에 Delivery tag 인수를 전달해야 합니다. RabbitMQ는 배달 태그를 고유한 식별자로 사용해서 메시지 수신을 확인하거나 거절 또는 부정적인 수신을 확인합니다.

 

 

지금까지 익스체인지, 큐 생성 및 연결하는 방법과 메시지 발행과 소비하는 과정에 대해서 알아보았습니다.
이제 익스체인지, 큐 생성과 연결하는 방법을 Java와 RabbitMQ 라이브러리를 이용하여 예제를 만들면서 알아보겠습니다.

 

 

익스체인지 생성

 

import com.rabbitmq.client.AMQP.Exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ExchangeCreate {
    private static final String EXCHANGE_NAME = "exchangeName";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare(EXCHANGE_NAME,
            BuiltinExchangeType.DIRECT);//익스체인지 선언 및 생성
    }

}

 

 

channel.exchangeDeclare에 익스체인지 이름과 타입을 지정하고 반환값으로 DeclareOk가 반환된 것을 확인할 수 있습니다.

위 코드는 익스체인지 선언하는 방법에서 설명했듯 클라이언트는 RabbitMQ 서버에 Exchange.Declare 요청을 전송하고, 서버는 Exchange.DeclareOk 응답을 반환합니다. 이를 통해 익스체인지가 선언되며, 존재하지 않을 경우 생성됩니다.

 

 

큐 생성

 

import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class QueueCreate {
    private static final String QUEUE_NAME = "queueName";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Queue.DeclareOk queueDeclareOk = channel.queueDeclare(QUEUE_NAME, false,false,false, Map.of());//큐 선언 및 생성
    }
}

 

익스체인지 생성하는 방법과 유사하게 channel.queueDeclare를 통해 큐를 생성하는 것을 확인할 수 있습니다.

 

익스체인지와 큐 연결

 

import com.rabbitmq.client.AMQP.Exchange;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.AMQP.Queue.BindOk;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ExchangeQueueConnection {
    private static final String QUEUE_NAME = "queueName";
    private static final String EXCHANGE_NAME = "exchangeName";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Queue.DeclareOk queueDeclareOk = channel.queueDeclare(QUEUE_NAME, false,false,false, Map.of());//큐 선언 및 생성
        Exchange.DeclareOk exchangeDeclareOk = channel.exchangeDeclare(EXCHANGE_NAME,
            BuiltinExchangeType.DIRECT);//익스체인지 선언 및 생성
        BindOk queueBindOk = channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingKey");
    }
}

 

 

 

아래 이미지는 위 코드를 실행한 후 RabbitMQ Management 콘솔의 Exchanges 탭과 Queues 탭에서 생성된 익스체인지와 큐, 그리고 두 리소스 간의 바인딩 상태를 확인한 화면입니다.

 

익스체인지 및 큐 생성

 

 

 

익스체인지와 큐 연결

 

 

참고 사이트

 

RabbitMQ tutorial - Routing | RabbitMQ

<!--

www.rabbitmq.com

 

 

 

'RabbitMQ' 카테고리의 다른 글

[RabbitMQ] 메시지 속성에 대하여  (0) 2026.01.18
[RabbitMQ] AMQP 프레임 유형  (0) 2026.01.11
[RabbitMQ] RabbitMQ 기능과 장점  (0) 2026.01.05