티스토리 뷰
[RabbitMQ] Publish/Subscribe
(using the Bunny Client)
이전 강의에서 work queue 를 만들어봤습니다. work queue 의 내부에서는 각 작업을 정확히 하나씩의 worker 에게 전달해줍니다. 이번 강의에서는 work queue와는 전혀 다른 하나의 메시지를 여러 consumer들에게 전송해볼 것 입니다. 이 패턴은 publish/subscribe 이라 불립니다.
이 패턴을 구현하기 위해 간단한 로그 시스템을 만들 것 입니다. 2개의 프로그램으로 구성되며, 첫 번째 프로그램은 로그 메시지를 생성하고 두 번째는 로그를 받아 출력합니다.
제작할 로그 시스템은 동작 중인 receiver 프로그램이 받는 모든 메시지들을 복사합니다. 이 경우 한 receiver 프로그램은 로그를 디스크에 저장하고 다른 receiver 프로그램의 로그는 화면에 출력해 줄 수 있습니다.
근본적으로, 로그 메시지들은 모든 receiver 들에게 송신됩니다.
전제조건
이 튜토리얼에서 RabbitMQ가 localhost에 설치되어있고 기본 포트인 5672로 동작 중인 것을 가정으로 진행됩니다. 다른 호스트나, 포트를 사용할 경우 적절한 세팅을 하셔야합니다.
Exchanges
이전 강의에서는 queue 로 메시지를 전송하거나 수신하였습니다. 이번 강의에서는 RabbitMQ에서의 메시징 모델 전체를 설명할 것 입니다.
이전 강의에서 학습한 내용을 간단히 요약해 보겠습니다.
> 송신자(producer)는 메시지를 송신하는 사용자 어플리케이션입니다.
> 큐(queue)는 메시지가 저장되는 버퍼입니다.
> 수신자(consumer)는 메시지를 수신하는 사용자 어플리케이션입니다.
RabbitMQ의 메시징 모델의 핵심 개념은 producer가 직접적으로 queue 에게 메시지를 보내지 않는다는 것입니다. 사실 producer는 메시지가 queue 에게 전달되는지 조차 모릅니다.
대신에 producer는 메시지를 exchange에게 전송한다. exchange는 매우 간단한 일을 수행합니다. 메시지를 producer에게 수신받으면 그 메시지를 queue에 전달합니다. exchange는 메시지가 queue 에 정확히 전달되는지 확인할 수 있어야합니다(특정 큐에 전달되는지, 여러 큐에 전달되는지 혹은 메시지가 폐기되었는지 등). 이를 위한 룰들은 exchange type에 정의되어 있습니다.
direct, topic, headers 그리고 fanout 옵션이 exchange type으로 사용 가능합니다. 우리는 마직막 옵션인 fanout을 이용할 것 입니다. 이 옵션을 이용해 exchange를 생성해봅시다. 이름은 logs라 짓겠습니다.
ch.fanout("logs")
fanout exchange는 매우 간단합니다. 이름에서 추측할 수 있듯이, 메시지들을 모든 queue들이 수신할 수 있도록 브로드캐스트합니다. 이는 우리가 제작할 로그 프로그램이 필요한 이유이기도 합니다.
Listing exchanges
exchange의 목록을 보기위해선 rabbitmqctl을 이용하면 됩니다.
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
리스트를 확인해보면 amq.* 이름의 기본 exchange들이 있습니다. 이들은 기본적으로 생성되지만 지금 당장은 필요하진 않습니다.
Nameless exchange
이전 강의에서는 exchange에 대해 아무것도 아는게 없었지만 queue로 메시지를 보낼 수 있었습니다. 이는 우리가 기본으로 있는 exchange들을 사용했기에 가능했습니다.
전에 메시지를 전송한 부분을 다시 보면
ch.default_exchange.publish("hello", :routing_key => "hello");
이 부분에서 기본 혹은 무명의 exchange를 사용했었습니다. 만약 특별히 설정을 해주지않으면 위 처럼 :routing_key를 이용하여 메시지들이 queue로 가는 경로를 지정해줍니다.
이제는 유명의 exchange을 이용하여 전송할 수 있습니다.
x = ch.fanout("logs") x.publish(msg)
임시 queue
이전에 hello 와 task_queue라고 이름붙힌 queue를 이용한 적이 있습니다. 큐에 이름을 붙힐 수 있다는 것은 worker들에게 같은 큐를 사용할 수 있게 해주므로 꽤나 중요한 사안입니다. 큐에 이름을 붙이는 것은 producer와 consumer 사이에 큐를 공유할 수 있게 해줍니다.
하지만 우리가 제작할 로그 시스템에서는 조금 상황이 다릅니다. 저희는 일부가 아니라 모든 로그 메시지들을 확인하길 원합니다. 또한 예전 것이 아닌 현재의 로그상황을 확인하길 원할 것입니다. 이 문제를 해결하기 위해선 두 가지가 필요합니다.
첫째로, RabbitMQ에 접속할 때 마다 새로운 빈 queue가 필요합니다. 이는 랜덤 이름의 queue(더 나은 방법이 있다면 그 방법을)를 만듬으로써 해결할 수 있습니다.
두번째로, 사용자가 consumer와의 연결을 끊으면 해당 queue는 자동으로 삭제되어야 합니다.
Bunny 클라이언트에서는 queue의 이름을 빈 문자열로 지정하면 랜덤이름과 함께 임시의 queue를 생성해줍니다.
q = ch.queue("", :exclusive => true)
이 함수가 리턴될 때, queue 인스턴스는 RabbitMQ로 부터 생성된 임의의 queue 이름을 포함한다. 예를들어 amq.gen-JzTY20BRgKO-HjmUJj0wLg와 같이 생겼다.
연결이 종료될 때, queue는 일회용으로 생성되었기에 삭제됩니다.
Bindings
저희는 위에서 fanout exchange와 queue를 생성해 보았습니다. 이제 이 exchange가 queue로 메시지를 보낼 수 있도록 해야합니다. exchange와 queue의 관계연결을 binding이라 합니다.
q.bind("logs")
이제 logs 이름의 exchange는 queue로 메시지를 전송할 것 입니다.
Listing bindings
rabbitmqctl list_bindings를 이용하면 현재 존재하는 binding들의 목록을 확인할 수 있습니다.
종합하기
로그 메시지를 출력하는 producer 프로그램은 이전의 강의에서의 내용과 별로 차이가 없습니다. 가장 중요한 차이는 무명의 exchange가 아닌 logs이름의 exchange에 메시지를 보내는 것 입니다. 메시지를 보낼때 routingKey를 설정할 필요가 있지만 fanout exchange에서는 이 값이 무시됩니다. 아래는 emit_log.rb 스크립입니다.
#!/usr/bin/env ruby # encoding: utf-8 require "bunny" conn = Bunny.new conn.start ch = conn.create_channel x = ch.fanout("logs") msg = ARGV.empty? ? "Hello World!" : ARGV.join(" ") x.publish(msg) puts " [x] Sent #{msg}" conn.close
아시다싶이 connection을 맺고나서 exchange를 선언해야합니다. 이 부분에서 존재하지 않은 exchange가 생성되는 걸 방지합니다.
아직 queue가 exchange에 binding 되지않았다면 메시지들은 손실될 수 도있지만 현재상황에서는 괜찮습니다. 아직 consumer가 listening하지 않았다면 메시지를 안전하게 삭제할 수 있습니다.
receive_logs.rb 코드입니다.
#!/usr/bin/env ruby # encoding: utf-8 require "bunny" conn = Bunny.new conn.start ch = conn.create_channel x = ch.fanout("logs") q = ch.queue("", :exclusive => true) q.bind(x) puts " [*] Waiting for logs. To exit press CTRL+C" begin q.subscribe(:block => true) do |delivery_info, properties, body| puts " [x] #{body}" end rescue Interrupt => _ ch.close conn.close end
로그를 파일로 저장하길 원한다면, 콘솔을 열어서 다음과 같이 입력하시면 됩니다.
$ ruby -rubygems receive_logs.rb > logs_from_rabbit.log
로그를 화면에서 보길 원한다면 새로운 터미널을 열어 다음과 같이 실행하면 됩니다.
$ ruby -rubygems receive_logs.rb
물론 로그출력 프로그램도 실행시켜야합니다.
$ ruby -rubygems emit_log.rb
rabbitmqctl list_bindings를 이용하면 코드가 binding와 queue들을 생성하는 것을 확인해볼 수 있습니다. 두 개의 receive_logs.rb 프로그램을 실행시키고 다음괌 같이 입력하시면 됩니다.
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
결과는 이해하기 쉬울 정도로 간단명료합니다. 데이터들은 logs 이름의 exchange로 부터 서버로부터 임의의 이름로 붙혀진 두개의 queue로 흘러갑니다. 이는 우리가 의도한대로의 결과입니다.
다음 강의에서는 메시지의 일부를 듣는 방법을 알아볼 것 입니다.