1. 소개

이 빠른 사용방법(예제)에서는 연결 및 채널이라는 두 가지 핵심 개념과 관련된 RabbitMQ의 API를 사용하는 방법을 보여줍니다.

2. RabbitMQ 빠른 요약

RabbitMQ는 메시징 요구를 처리하기 위해 모든 규모의 회사에서 널리 사용되는 AMQP (Advanced Messaging Queue Protocol)의 대중적인 구현입니다.

응용 프로그램의 관점에서 우리는 일반적으로 AMQP의 주요 엔터티인 가상 호스트, 교환 및 Queue에 관심이 있습니다. 이전 기사에서 이러한 개념을 이미 다루었으므로 여기서는 덜 논의된 두 개념인 연결 및 채널에 대한 세부 정보에 중점을 둘 것입니다.

3. 연결

클라이언트가 RabbitMQ 브로커와 상호 작용하기 위해 취해야 하는 첫 번째 단계는 연결을 설정하는 것입니다. AMPQ는 애플리케이션 수준 프로토콜이므로 이 연결은 전송 수준 프로토콜 위에서 발생합니다. 이는 일반 TCP 연결이거나 TLS를 사용하는 암호화된 연결일 수 있습니다. Connection의 주요 역할은 클라이언트가 브로커와 상호 작용할 수 있는 Security 통로를 제공하는 것입니다.

이것은 연결을 설정하는 동안 클라이언트가 서버에 유효한 자격 증명을 제공해야 함을 의미합니다. 서버는 일반 사용자 이름/암호, SASL, X.509 암호 또는 지원되는 메커니즘을 포함하여 다양한 자격 증명 유형을 지원할 수 있습니다.

Security 외에도 연결 설정 단계는 AMPQ 프로토콜의 일부 측면을 협상하는 역할도 합니다. 이때 클라이언트 및/또는 서버가 프로토콜 버전 또는 조정 매개변수 값에 동의하지 않으면 연결이 설정되지 않고 전송 수준 연결이 닫힙니다.

3.1. Java 애플리케이션에서 연결 생성

Java를 사용할 때 RabbitMQ 브라우저와 통신하는 표준 방법은 amqp-client Java 라이브러리를 사용하는 것입니다. 해당 Maven 의존성을 추가하여 이 라이브러리를 프로젝트에 추가할 수 있습니다.

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

이 아티팩트의 최신 버전은 Maven Central 에서 사용할 수 있습니다 .

이 라이브러리는 Factory 패턴을 사용하여 새 연결을 생성합니다. 먼저 새 ConnectionFactory 인스턴스를 만들고 연결을 만드는 데 필요한 모든 매개변수를 설정합니다. 최소한 RabbitMQ 호스트의 주소를 알려야 합니다.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("amqp.example.com");

이러한 매개변수 설정이 완료되면 newConnection() 팩토리 메소드를 사용하여 새 Connection 인스턴스를 생성합니다.

Connection conn = factory.newConnection();

4. 채널

간단히 말해서 AMQP 채널은 단일 연결 위에 여러 논리 흐름을 다중화할 수 있는 메커니즘입니다. 연결 설정은 상대적으로 비용이 많이 드는 작업이기 때문에 클라이언트와 서버 측 모두에서 리소스를 더 잘 사용할 수 있습니다.

클라이언트는 브로커에 명령을 보낼 수 있도록 하나 이상의 채널을 만듭니다. 여기에는 메시지 보내기 및/또는 받기와 관련된 명령이 포함됩니다.

채널은 또한 프로토콜 논리와 관련하여 몇 가지 추가 보증을 제공합니다.

  • 주어진 채널에 대한 명령은 항상 전송된 것과 동일한 순서로 실행됩니다.
  • 클라이언트가 단일 연결을 통해 여러 채널을 여는 시나리오가 주어지면 구현에서 사용 가능한 대역폭을 둘 사이에 분배할 수 있습니다.
  • 양 당사자는 메시지 전송을 중지해야 한다고 피어에게 알리는 흐름 제어 명령을 실행할 수 있습니다.

채널의 주요 측면은 채널의 수명 주기가 채널을 만드는 데 사용된 연결에 바인딩된다는 것입니다. 즉, 연결을 닫으면 연결된 모든 채널도 닫힙니다.

4.1. Java 애플리케이션에서 채널 생성

amqp-client 라이브러리 를 사용하는 Java 애플리케이션 은 이전의 createChannel() 메소드를 사용하여 기존 연결 에서 새 채널 을 생성합니다.

channel = conn.createChannel();

채널 이 있으면 서버에 명령을 보낼 수 있습니다. 예를 들어 Queue을 생성하기 위해 queueDeclare() 메서드를 사용합니다.

channel.queueDeclare("example.queue", true, false, true, null);

이 코드는 Queue을 “선언”합니다. 이것은 AMQP가 “아직 존재하지 않는 경우 생성”을 말하는 방식입니다. Queue 이름 뒤의 추가 인수는 추가 특성을 정의합니다.

  • 내구성 : 이 선언은 영구적입니다. 즉, 서버를 다시 시작해도 살아남을 수 있습니다.
  • 배타적: 이 큐는 그것을 선언하는 채널과 관련된 연결로 제한됩니다.
  • autodelete: 더 이상 사용하지 않으면 서버가 Queue을 삭제합니다.
  • args: Queue 동작을 조정하는 데 사용되는 인수가 있는 선택적 맵. 예를 들어 이러한 인수를 사용하여 메시지 및 배달 못한 편지 동작에 대한 TTL을 정의할 수 있습니다.

이제 기본 교환을 사용하여 이 Queue에 메시지를 게시하려면 basicPublish() 메서드를 사용합니다.

channel.basicPublish("", queue, null, payload);

이 코드는 큐 이름을 라우팅 키로 사용하여 기본 교환에 메시지를 보냅니다.

5. 채널 할당 전략

메시징 시스템인 CQRS(명령 쿼리 책임 분리) 응용 프로그램을 사용하는 시나리오를 고려해 보겠습니다. 간단히 말해서 CQRS 기반 응용 프로그램에는 명령과 쿼리라는 두 개의 독립적인 경로가 있습니다. 명령은 데이터를 변경할 수 있지만 값을 반환하지 않습니다. 반면 쿼리는 값을 반환하지만 절대 수정하지 않습니다.

명령 경로는 데이터를 반환하지 않으므로 서비스는 데이터를 비동기적으로 실행할 수 있습니다. 일반적인 구현에서는 내부적으로 메시지를 작성하고 나중에 처리하기 위해 Queue로 보내는 HTTP POST 끝점이 있습니다.

이제 수십 또는 수백 개의 동시 요청을 처리해야 하는 서비스의 경우 매번 연결 및 채널을 여는 것은 현실적인 옵션이 아닙니다 . 대신 더 나은 접근 방식은 채널 풀을 사용하는 것입니다.

물론 이것은 다음 문제로 이어집니다. 단일 연결을 만들고 이 연결에서 채널을 만들어야 할까요 아니면 여러 연결을 사용해야 할까요?

5.1. 단일 연결/다중 채널

이 전략에서는 단일 연결을 사용하고 서비스가 관리할 수 있는 최대 동시 연결 수와 동일한 용량으로 채널 풀을 생성합니다. 기존 요청당 스레드 모델의 경우 요청 처리기 스레드 풀과 동일한 크기로 설정해야 합니다.

이 전략의 단점은 부하가 높을 때 관련 채널을 통해 한 번에 하나씩 명령을 보내야 한다는 사실이 동기화 메커니즘을 사용해야 한다는 것을 의미한다는 것입니다. 이것은 차례로 우리가 최소화하고자 하는 명령 경로에 추가 대기 시간을 추가합니다.

5.2. 스레드당 연결 전략

또 다른 옵션은 다른 극단적인 방법으로 연결 풀을 사용하여 채널에 대한 경합이 없도록 하는 것입니다. 연결 에 대해 핸들러 스레드가 서버에 명령을 실행하는 데 사용할 단일 채널 을 만듭니다.

그러나 클라이언트 측에서 동기화를 제거한다는 사실에는 비용이 따릅니다. 브로커는 소켓 설명자 및 상태 정보와 같은 각 연결에 대해 추가 리소스를 할당해야 합니다. 또한 서버는 클라이언트 간에 사용 가능한 처리량을 분할해야 합니다.

6. 벤치마킹 전략

이러한 후보 전략을 평가하기 위해 각각에 대해 간단한 벤치마크를 실행해 보겠습니다. 벤치마크는 각각 4KB의 메시지 1,000개를 보내는 여러 작업자를 병렬로 실행하는 것으로 구성됩니다. 구성 시 작업자는 명령을 보낼 채널 을 생성할  연결 을 받습니다. 또한 테스트 실행자에게 메시지 전송이 완료되었음을 알리는 데 사용되는 반복 횟수, 페이로드 크기 및 CountDownLatch 를 수신합니다.

public class Worker implements Callable<Worker.WorkerResult> {
    
    // ... field and constructor omitted
    @Override
    public WorkerResult call() throws Exception {

        try {
            long start = System.currentTimeMillis();
            for (int i = 0; i < iterations; i++) {
                channel.basicPublish("", queue, null, payload);
            }

            long elapsed = System.currentTimeMillis() - start;
            channel.queueDelete(queue);
            return new WorkerResult(elapsed);
        } finally {
            counter.countDown();
        }
    }
    
    public static class WorkerResult {
        public final long elapsed;

        WorkerResult(long elapsed) {
            this.elapsed = elapsed;
        }
    }
}

작업자는 래치를 감소시켜 작업을 완료했음을 나타내는 것 외에도 모든 메시지를 보내는 데 경과된 시간과 함께 WorkerResult 인스턴스를 반환합니다. 여기에는 값만 있지만 확장을 사용하여 더 자세한 정보를 반환할 수 있습니다.

컨트롤러는 평가 중인 전략에 따라 연결 팩토리와 작업자를 생성합니다. 단일 연결의 경우 Connection 인스턴스를 만들고 모든 작업자에게 전달합니다.

@Override
public Long call() {
    
    try {
        Connection connection = factory.newConnection();
        CountDownLatch counter = new CountDownLatch(workerCount);
        List<Worker> workers = new ArrayList<>();
        
        for( int i = 0 ; i < workerCount ; i++ ) {
            workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize));
        }

        ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0,
          TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
        long start = System.currentTimeMillis();
        executor.invokeAll(workers);
        
        if( counter.await(5, TimeUnit.MINUTES)) {
            long elapsed = System.currentTimeMillis() - start;
            return throughput(workerCount,iterations,elapsed);
        }
        else {
            throw new RuntimeException("Timeout waiting workers to complete");
        }        
    }
    catch(Exception ex) {
        throw new RuntimeException(ex);
    }
}

다중 연결 전략의 경우 각 작업자에 대해 새 연결 을 만듭니다.

for (int i = 0; i < workerCount; i++) {
    Connection conn = factory.newConnection();
    workers.add(new Worker("queue_" + i, conn, iterations, counter, payloadSize));
}

처리량 함수 는 모든 작업자를 완료하는 데 필요한 총 시간을 작업자 수로 나눈 벤치마크 측정값을 계산합니다.

private static long throughput(int workerCount, int iterations, long elapsed) {
    return (iterations * workerCount * 1000) / elapsed;
}

메시지의 처리량을 초 단위로 얻으려면 분자에 1000을 곱해야 합니다.

7. 벤치마크 실행

이는 두 전략 모두에 대한 벤치마크 결과입니다. 각 작업자 수에 대해 벤치마크를 10번 실행하고 평균 값을 특정 작업자/전략에 대한 처리량 측정값으로 사용했습니다. 오늘날의 표준에 따르면 환경 자체는 겸손합니다.

  • CPU: 듀얼 코어 i7 Dell 노트북 @ 3.0GHz
  • 총 RAM: 16GB
  • RabbitMQ: Docker에서 실행되는 3.10.7(4GB RAM이 있는 docker-machine)
기준

이 특정 환경의 경우 단일 연결 전략에서 약간의 이점이 있습니다. 이 이점은 150명의 작업자 시나리오에서 증가하는 것으로 보입니다.

8. 전략 선택

벤치마크 결과를 고려할 때 확실한 승자를 가리킬 수 없습니다. 5에서 100 사이의 작업자 수의 경우 결과는 거의 동일합니다. 그 후에는 다중 연결과 관련된 오버헤드가 단일 연결에서 다중 채널을 처리하는 것보다 더 높은 것 같습니다.

또한 테스트 작업자가 큐에 고정 메시지를 보내는 한 가지만 수행한다는 점을 고려해야 합니다. 우리가 언급한 CQRS와 같은 실제 응용 프로그램은 일반적으로 메시지를 보내기 전후에 몇 가지 추가 작업을 수행합니다. 따라서 최상의 전략을 선택하려면 프로덕션 환경에 최대한 가까운 구성을 사용하여 자체 벤치마크를 실행하는 것이 좋습니다 .

9. 결론

이 기사에서는 RabbitMQ의 채널 및 연결 개념과 이를 다양한 방식으로 사용하는 방법을 살펴보았습니다. 평소와 같이 전체 코드는 GitHub 에서 사용할 수 있습니다 .

Generic footer banner