티스토리 뷰

Work Queues

(using Bunny)



첫번째 강의에서 queue로 송/수신하는 프로그램을 작성했습니다. 이번 강의에서는 여러 작업 프로세스들 사이에서 일정 시간이 소모되는 작업들을 분배하는 Work Queue 를 만들것 입니다.


Work Queue(Task Queues 라 불리는)의 핵심 개념은 자원소모가 많은 작업을 즉시 처리하지 않고 작업이 완료될 때까지 기다리는 것입니다. 대신 이 작업이 나중에 처리되어지도록 스케쥴해야합니다. 작업을 메시지로 캡슐화하여 queue에 전송할 것입니다. 작업 프로세스는 백그라운드로 실행되면서 작업을 queue로 부터 꺼내 수행합니다. 여러 작업 프로세스들을 실행시킬 경우 작업들은 프로세스들 사이에서 분배되어 실행될 것입니다.


이 개념은 웹 어플리케이션에서 복잡한 작업을 다룰 때 특히 유용합니다.


전제조건


이 튜토리얼에서 RabbitMQ가 localhost에 설치되어있고 기본 포트인 5672로 동작 중인 것을 가정으로 진행됩니다. 다른 호스트나, 포트를 사용할 경우 적절한 세팅을 하셔야합니다.



준비 과정


이전 강의에서는 "Hello World"라는 메시지를 전송했습니다. 이제는 복잡한 잡업들 표현하는 문자열을 전송할 것입니다. 실제로 수행될 마땅한 작업들이 없기 때문에 kernel#sleep 함수를 통해 일정 시간을 지연시킬 것입니다. 마침표의 갯수로 작업의 복잡성을 나타낼 것입니다. 마침표 하나당 1초의 시간소요시킬 것입 니다. 예를들어 Hello... 라고 묘사된 작업은 3초가 소요될 작업입니다.


이전 예제의 send.rb에서 커맨드라인에서 입력한 문자열을 전송할 수 있도록 수정할 것 입니다. 이 프로그램은 work queue를 이용해 작업들을 스케쥴할 것입니다. 프로그램명을 new_task.rb라 하겠습니다.

msg  = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

q.publish(msg, :persistent => true)
puts " [x] Sent #{msg}"


마찬가지로 전에 작성했던 receive.rb 파일도 약간의 수정이 필요합니다. 메시지 본문에 포함된 마침표 하나당 1초의 시간을 지연 시킬 것입니다. 이 소비자 프로그램(receiver)은 queue로 부터 메시지들을 꺼내 작업을 수행합니다. 프로그램명을 worker.rb라 하겠습니다.

q.subscribe(:ack => true, :block => true) do |delivery_info, properties, body|
  puts " [x] Received #{body}"
  # imitate some work
  sleep body.count(".").to_i
  puts " [x] Done"

  ch.ack(delivery_info.delivery_tag)
end


위의 (가짜) 작업은 실행시간을 표현합니다.


첫번째 강의에서 실행한 것 처럼 이 프로그램들을 실행시킵니다.

shell1$ ruby -rubygems worker.rb
shell2$ ruby -rubygems new_task.rb



Round-robin을 이용한 관리, 운용


Task Queue의 장점들 중 하나는 병렬작업을 처리하기 쉽다는 점이다. 만약 자원이 부족해 작업들이 밀린다면, 단지 작업 프로세스들을 추가하면 되므로 확장성이 좋습니다. 


우선 worker.rb 프로그램을 동시에 2개를 돌려 봅시다. 이 두 프로그램들은 queue로 부터 메시지를 같이 받아올 것이다. 어떻게 동작되는지 살펴 봅시다.


3개의 콘솔 창이 필요합니다. 두 개는 worker.rb 스크립트를 실행 시킬 것이다(consumer 역활을 수행합니다 - C1, C2라 명명).

shell1$ ruby -rubygems worker.rb
 [*] Waiting for messages. To exit press CTRL+C
shell2$ ruby -rubygems worker.rb
 [*] Waiting for messages. To exit press CTRL+C


3번째 콘솔은 새로운 작업들을 만들 것이다. consumer들을 시작시켰다면 여러 메시지들을 전송시킬 수 있습니다.

shell3$ ruby -rubygems new_task.rb First message.
shell3$ ruby -rubygems new_task.rb Second message..
shell3$ ruby -rubygems new_task.rb Third message...
shell3$ ruby -rubygems new_task.rb Fourth message....
shell3$ ruby -rubygems new_task.rb Fifth message.....


작업프로세스들에게 무엇이 전송되는 지 확인해보자.

shell1$ ruby -rubygems worker.rb
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ ruby -rubygems worker.rb
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'


기본적으로 RabbitMQ는 각 메시지를 순차적으로 다음 consumer에게 전송합니다. 대체로 모든 consumer들은 같은 갯수의 메시지들을 받는다. 이 분산 메시지 방법은 round-robin이라 불립니. 이 방법을 세개 혹은 더 많은 작업 프로세스들로 시도해보길 바랍니다.



메시지 확인응답(Message acknowledgment)


작업을 수행할 때 몇 초가 소요될 수 있습니다. 사용자는 consumer들 중 하나가 긴 작업을 시작했는지, 죽었는지 혹은 부분적으로 완료했는지 등을 궁금해할 수 있습니다. 현재 코드로는 RabbitMQ가 메시지를 customer에게 전달하는 순간 메모리에서 메시지를 삭제합니다. 이 경우에 worker 프로세스를 죽일 경우 막 처리한 메시지를 손실할 수 도 있습니다. 이 경우에 또한 특정한 worker에 보내진 모든 메시지들을 잃을 수 있지만 아직 이 부분을 처리하지 않았습니다.


우리는 어느 작업이든 손실이 되기 원하지 않습니다. 만약 worker가 죽는다면 다른 worker에게 메시지가 전달되어지길 바랄 것입니다.


메시지를 절대 손실하지 않기 위해서 RabbitMQ는 메시지 확인응답을 제공합니다. ack(응답)은 특정 메시지가 수신되거나 처리될 때 consumer로 부터 RabbitMQ에게 보내집니다. 이제 RabbitMQ 시스템에서 이 worker을 삭제할 수 있습니다.


만약 ack 응답을 보내지 않고 consumer가 죽게되면, RabbitMQ는 메시지가 완전히 처리되지 않았다고 인식하고 다른 consumer에게 메시지를 전달합니다. 이 방법은 가끔 worker가 예기치않게 죽더라도 메시지가 손실되지 않습니다.


메시지 타임아웃은 없습니다. RabbitMQ는 오직 worker와의 연결이 죽었을때만 메시지를 재전송합니다. 이렇게되면 메시지를 처리하는데 엄청 오랜 시간이 걸리더라도 아무상관 없습니다.


메시지 확인응답은 기본설정으로 꺼져있습니다. 이제 :manual_ack 옵션을 통해 이 설정을 켜고 작업이 끝났을 때 worker로 부터 적절한 확인응답을 전송할 것 입니다.

q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body|
  puts " [x] Received '#{body}'"
  # imitate some work
  sleep 1.0
  puts " [x] Done"
  ch.ack(delivery_info.delivery_tag)
end


위의 코드를 통해 worker 가 메시지를 처리하는 도중에 CTRL+C를 눌려 죽일지라도 메시지를 손실하지 않을 수 있습니다. 


확인응답 처리를 잊어버렸을 경우


ack 처리를 까먹고 잊어버릴 수 도 있습니다. 이는 흔한 실수이지만, 그 결과는 심각할 수 있습니다. client가 종료될 때 메시지들은 계속 재전송 되어지고 이는 RabbitMQ에서 계속 메모리를 차지하여 메시지들을 정상적으로 처리하지 못할 수 도 있습니다.


이런 종류의 실수를 디버그하기 위해서 rabbitmqctl을 이용하여 message_unacknowleged 정보를 출력할 수 있습니다.

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.



메시지 내구성(Message durability)


consumer가 죽더라도 작업이 손실되지 않도록 어떻게 처리하는지 배웠습니다. 하지만 작업들은 여전히 RabbitMQ 서버가 중지될 때 손실 될 수 있습니다.


RabbitMQ가 종료되거나 충돌이 일어날 경우 예외처리를 하지 않으면 queue 와 메시지들을 손실 할 수 있습니다. 메시지들을 손실하지 않기 위해선 두 가지가 요구됩니다. 큐와 메시지들이 내구성을 가질 수 있도록 처리할 필요가 있습니다.


우선, RabbitMQ가 절대 queue를 손실하지 않도록 처리할 필요가 있습니다. 이를 처리하기 위해서 durable 옵션을 통해 선언해야합니다.

ch.queue("hello", :durable => true)


비록 이 명령은 올바르지만, 현재 튜토리얼의 상태에서는 작동하지 않습니다. 왜나하면 이미 hello 라는 queue 가 not durable로 정의되었기 때문입니다. RabbitMQ는 이미 존재하는 queue에 다른 옵션을 재정의를 허락하지 않는다. 하지만 task_queue 와 같은 다른 이름의 queue를 선언함으로써 이 문제를 해결할 수 있다.

ch.queue("task_queue", :durable => true)


:durable 옵션은 producer 와 consumer  모두 적용시켜야 합니다.


이렇게 옵션을 설정하면 RabbitMQ가 재시작하더라고 task_queue 큐는 손실되지 않습니다. 이제 메시지들의 내구성을 Bunny::Exchange#publish 에 :persistent 옵션을 통해 설정해줘야 합니다.

x.publish(msg, :persistent => true)


메시지 지속성에 대해 알아두기


메시지들을 persistent 옵션으로 설정해준다고 완전히 비손실 보장은 되지 않습니다. 비록 RabbitMQ에서 디스크에 해당 메시지 저장을 지시하지만 RabbitMQ에서 메시지를 수령하고 저장하는 찰나의 시간에 문제가 생길 수 도 있습니다. 또한 RabbitMQ는 모든 메시지에 fysnc(2)를 실행하지 않습니다(캐시에 메시지를 저장하고 실제 디스크에 쓰지 않는다). 이 지속성의 효과는 강력하진 않지만 simple task queue 보단 월등히 효과적입니다. 만약 더 강력한 지속성을 원한다면 publisher confirms 을 이용하면 됩니다.



공평한 전송(Fair dispatch)


메시지가 우리가 원하는대로 전송되지는 않습니다. 예를들어 두 개의 worker 가 있는 상황에서, 모든 홀수번 째 메시지는 처리량이 많고 짝수번 째 메시지는 처리량이 적은 경우, 한 worker는 끊임없이 바쁠테고 나머지 worker는 간간히 작업을 수행할 것입니다. RabbitMQ는 모든 것을 아는 것이 아니기 때문에 골고루 메시지를 전송하도록 처리를 해줘야합니다.


이 현상은 RabbitMQ는 메시지가 queue 에 들어오면 그 즉시 메시지를 전송하기 때문에 일어납니다. RabbitMQ는 consumer 의 unacknowledged message 들을 살펴보진 않습니다. 단순히 n번째 메시지를 n번째 consumer에게 전송할 뿐입니다.



이를 해결하기 위해 prefetch 함수를 1이라는 값과 함께 사용해줘야 합니다. 이는 RabbitMQ가 한 번에 한 worker에 한개 그 이상의 메시지를 전송하지 않게 합니다. 다른말로, 한 worker 가 일을 완료하거나 확인응답을 보내기 전까지 새로운 메시지를 전송하지 않습니다. 대신에 바쁘지 않는 다음 worker에게 메시지를 전송하게 됩니.

n = 1;
ch.prefetch(n);


Queue 크기에 대해 알아보기


만약 모든 worker 들이 바쁘다면 queue 는 포화상태가 됩니다. 메시지들을 놓치지 싫다면, 더 많은 worker 들을 추가하거나 다른 계획이 필요합니다.



위에서 살펴본 내용 종합하기


new_task.rb의 최종 코드는 다음과 같습니다.

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel
q    = ch.queue("task_queue", :durable => true)

msg  = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

q.publish(msg, :persistent => true)
puts " [x] Sent #{msg}"

sleep 1.0
conn.close

(new_task.rb source)


worker.rb의 최종 코드는 다음과 같습니다.

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel
q    = ch.queue("task_queue", :durable => true)

ch.prefetch(1)
puts " [*] Waiting for messages. To exit press CTRL+C"

begin
  q.subscribe(:ack => true, :block => true) do |delivery_info, properties, body|
    puts " [x] Received '#{body}'"
    # imitate some work
    sleep 1.0
    puts " [x] Done"
    ch.ack(delivery_info.delivery_tag)
  end
rescue Interrupt => _
  conn.close
end

(worker.rb source)


메시지 확인응답과 prefetch를 사용하면 work queue를 제공할 수 있습니다. 내구성 옵션은 RabbitMQ가 재시작 되더라도 작업들이 손실되지 않게 해줍니다.


Bunny::Channel 함수들과 메시지 속성들에 대해 더 자세히 알아보길 원한다면 Bunny API reference를 참고하면 됩니다.


다음 강의에서는 같은 메시지를 동시에 여러 consumer들에게 보내는 방법을 배울 것 입니다.

댓글