티스토리 뷰

프로그래밍/RabbitMQ

[RabbitMQ] Routing

kkd927 2015. 7. 9. 18:39

[RabbitMQ] Routing

(using the Bunny client)


이전 강의에서 간단한 로그 시스템을 만들어봤습니다. 로그 메시지들을 여러 receiver들에게 브로드캐스트할 수 있었습니다.


이번 강의에서는 메시지의 일부를 수신할 수 있게 해볼 것 입니다. 예를들어, 콘솔에 전체 로그 메시지들을 출력하는 동안 치명적인 에러 메시지들은 디스크 공간에 저장될 수 있게 로그 파일에 기록하게 할 수 있습니다.


전제조건


이 튜토리얼에서 RabbitMQ가 localhost에 설치되어있고 기본 포트인 5672로 동작 중인 것을 가정으로 진행됩니다. 다른 호스트나, 포트를 사용할 경우 적절한 세팅을 하셔야합니다.


Bindings

이전 예제에서 bindings을 이미 만들었습니다. 다음 코드와 같습니다.

q.bind(exchange_name)

binding은 exchange와 queue와의 결합입니다. 요약하자면, 해당 queue는 exchange로 부터 오는 메시지들을 수신할 수 있습니다.


Bindings는 :routing_key 라는 추가 인자을 가질 수 있습니다. Bunny::Exchange#publish의 인자값과의 혼란을 피하기 위해 :routing_key 인자를 binding key라고 부르겠습니다. 아래는 key를 이용하여 binding를 생성하는 방법입니다.

q.bind(exchange_name, :routing_key => "black");

binding key는 exchange 종류에 따라 약간 다릅니다. 이전 강의에서 다루었던 fanout exchange 에서는 이 값이 무시됩니다.


Direct exchange

저번 강의에서 제작한 로그 시스템은 모든 메시지들을 모든 consumer에게 브로드캐스트했었습니다. 이번 강의에서는 메시지들을 필터링할 수 있게 확장시킬 것 입니다. 예를들어, warning 혹은 info 로그 메시지들은 디스크에 저장하지 않고 critical error 메시지들은 디스크에 저장하도록 스크립트를 제작할 수 있습니다.


저번에는 단순히 브로드캐스팅을 하는 다루기 어렵지 않은 fanout exchange를 사용 했었습니다. 이번에는 대신에 direct exchange를 사용할 것 입니다. direct exchange 내부의 라우팅 알고리즘은 단순합니다. 메시지는 해당 메시지의 routing key와 queue의 bidning key가 일치하는 queue에게만 전송 되어집니다.


이를 설명하기위해, 다음과 같은 상황을 설정 해보겠습니다.



위의 설정에서 X 이름의 direct exchange에 두개의 queue가 bind되어있는 것을 확인할 수 있습니다. 첫번째 queue는 orange 이름의 binding key에 bind 되어있고, 두번째 queue는 black, green 두 개의 binding key에 bind 되어 있습니다.


여기서 orange 이름의 routing key로 설정된 메시지들은 Q1 queue에 전달될 것 입니다. black 혹은 green 이름의 routing key로 설정된 메시지들은 Q2 queue에 전달될 것 입니다. 다른 모든 메시지들은 전송되지 않고 버려지게 됩니다.



다중 binding



똑같은 binding key를 이용하여 여러 queue들과의 bind도 허용됩니다. 우리의 예제에서 X 와 Q1 사이에 black 이름의 binding key를 추가할 수 있습니다. 이 경우에 direct exchange는 fanout 처럼 모든 메시지들을 브로드캐스트 하는 셈이 됩니다. 즉, black routing key의 메시지들은 Q1, Q2 둘 다에게 전송되어 집니다.



Emitting logs

우리는 fanout 대신 direct exchange를 이용하여 메시지를 송신할 것 입니다. 로그 메시지들을 routing key를 통해 분류할 것 입니다. 이를 통해 수신 시에 원하는 것만 수신할 수 있도록 할 수 있습니다. 우선 로그를 출력하는 것을 살펴 봅시다.


항사 그래왔듯이, 우선 exchange를 생성해야 합니다.

ch.direct("logs");

그러면 메시지를 전송할 준비가 갖춰집니다.

x = ch.direct("logs")
x.publish(msg, :routing_key => severity)

간단히 설명하자만 severity 부분에 info, warning 또는 error가 들어가게 됩니다.



구독하기

메시지 수신은 이전 강의와 한가지만 빼고 동일합니다. severity 부분에 맞게 새로운 binding을 만들 것 입니다.

q = ch.queue("")
ARGV.each do |severity|
  q.bind("logs", :routing_key => severity)
end



종합하기



emit_log_direct.rb의 스크립트 코드입니다.

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch       = conn.create_channel
x        = ch.direct("direct_logs")
severity = ARGV.shift || "info"
msg      = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

x.publish(msg, :routing_key => severity)
puts " [x] Sent '#{msg}'"

conn.close

(emit_log_direct.rb 소스코드)


receive_logs_direct.rb 코드입니다.

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

if ARGV.empty?
  abort "Usage: #{$0} [info] [warning] [error]"
end

conn = Bunny.new
conn.start

ch  = conn.create_channel
x   = ch.direct("direct_logs")
q   = ch.queue("", :exclusive => true)

ARGV.each do |severity|
  q.bind(x, :routing_key => severity)
end

puts " [*] Waiting for logs. To exit press CTRL+C"

begin
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts " [x] #{delivery_info.routing_key}:#{body}"
  end
rescue Interrupt => _
  ch.close
  conn.close
end

(receive_logs_direct.rb 소스코드)


info를 제외한 오직 warning 혹은 error 로그 메시지들을 저장하고 싶다면, 콘솔을 열고 다음과 같이 입력하시면 됩니다.

$ ruby -rubygems receive_logs_direct.rb warning error > logs_from_rabbit.log


화면에서 모든 로그 메시지들을 확인하고 싶으시면, 새로운 콘솔을 열고 다음과 같이 하면 됩니다.

$ ruby -rubygems receive_logs_direct.rb info warning error
 [*] Waiting for logs. To exit press CTRL+C


그리고 예를들어, error 로그 메시지를 출력하고 싶으면 다음과 같이 입력하시면 됩니다.

$ ruby -rubygems emit_log_direct.rb error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'


다음 5번째 강의에서는 패턴 기반의 메시지를 listen 하는 법을 살펴볼 것 입니다.

댓글