BLOGTIMES
2009/04/25

Rubyでお手軽MQ

 

仕事でバッチの多重起動を防止するスクリプトを書いたのですが、これだと後続のバッチがキャンセルされてしまって実行されないので、ジョブをQueueに溜めて逐次実行してくれる仕組みが欲しくなったので、Rubyを使ってMQサーバを書いてみました。

RubyにはQueueクラスがあるので、これを使ってdrbからメッセージを投げ込むようにすれば簡単なMQサーバはすぐにできるのですが、これだとメッセージの永続化機能がないので、不慮の事故でマシンの電源が落ちたりするとそのままメッセージが消滅してしまいます。それだと色々と問題があるので、今回はap4rのバックエンドにも使われている永続化機能を持ったメッセージキューであるreliable-msgを使います。

僕がかつて使っていたIBM MQ Series ( Webshere MQ Series )と比べればおもちゃのようなものですが、ちょっと使う分にはこれで十分です。

reliable-msgのインストールとパッチ当て

まず、gemでreliable-msgをインストールします。

# gem install reliable-msg

gemでインストールされるreliable-msgはUUID 2.0系で動かないなどのいくつかの不具合があるので、下記のパッチを当ててバグを修正します。

/usr/lib/ruby/gems/1.8/gems/reliable-msg-1.1.0/lib/reliable-msg

--- cli.rb.org  2009-04-23 00:31:40.000000000 +0900
+++ cli.rb      2009-04-23 00:52:53.000000000 +0900
@@ -209,8 +209,8 @@
                     exit
                 end
                 drb = Config::DEFAULT_DRB
-                drb.merge(config.drb) if config.drb
-                drb_uri = "druby://localhost:#{drb['port']}"
+                drb = drb.merge(config.drb) if config.drb
+                drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}"
             else
                 drb_uri = Queue::DEFAULT_DRB_URI
             end
--- message-store.rb.org        2009-04-23 00:31:40.000000000 +0900
+++ mescdsage-store.rb    2009-04-23 00:41:20.000000000 +0900
@@ -305,7 +305,7 @@
                     free = @mutex.synchronize do
                         @file_free.shift
                     end
-                    name = free ? free[0] : "#{@path}/#{UUID.new}.msg"
+                    name = free ? free[0] : "#{@path}/#{UUID.generate}.msg"
                     file = if free && free[1]
                         free[1]
                     else
--- queue-manager.rb.org        2009-04-23 00:31:40.000000000 +0900
+++ queue-manager.rb    2009-04-23 00:52:14.000000000 +0900
@@ -233,8 +233,8 @@

                     # Get the DRb URI from the configuration, or use the default. Create a DRb server.
                     drb = Config::DEFAULT_DRB
-                    drb.merge(@config.drb) if @config.drb
-                    drb_uri = "druby://localhost:#{drb['port']}"
+                    drb = drb.merge(@config.drb) if @config.drb
+                    drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}"
                     @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY)
                     @logger.info format(INFO_ACCEPTING_DRB, drb_uri)

@@ -305,7 +305,7 @@
             raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty?
             time = Time.new.to_i
             # TODO: change this to support the RM delivery protocol.
-            id = args[:id] || UUID.new
+            id = args[:id] || UUID.generate
             created = args[:created] || time

             # Validate and freeze the headers. The cloning ensures that the headers we hold in memory
@@ -478,7 +478,7 @@
             message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid]
             raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty?
             time = Time.new.to_i
-            id = args[:id] || UUID.new
+            id = args[:id] || UUID.generate
             created = args[:created] || time

             # Validate and freeze the headers. The cloning ensures that the headers we hold in memory
@@ -559,7 +559,7 @@

         # Called by client to begin a transaction.
         def begin timeout
-            tid = UUID.new
+            tid = UUID.generate
             @transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout}
             tid
         end

サーバとクライアントの作成

スクリプトを配置するための適当なディレクトリと、メッセージが永続化されるディレクトリを作ります。
ココでは前者をqmtest、後者をqmtest/queuesと仮定します。

$ makedir -p qmtest/queues

次に、サーバとクライアントのスクリプト、設定ファイルを用意します。

qmtest/queues.cfg(QueueManagerの設定ファイル)

store:
  type: disk
drb:
  host: 0.0.0.0
  port: 6438
  acl: allow 127.0.0.1 allow ::1 allow localhost

qmtest/server.rb(サーバ本体)

#!/usr/bin/ruby -Ku

require 'rubygems'
require 'reliable-msg'
include ReliableMsg

require 'logger'
logger = Logger.new(STDOUT)

qm = QueueManager.new(:config => "queues.cfg", :logger => logger)
qm.start
logger.info "QueueManager started."

q = ReliableMsg::Queue.new "queue.test"
queueEnabled = true
Signal.trap(:INT){
  logger.info "SIGINT trapped."
  queueEnabled = false
}

while queueEnabled
  executed = q.get { |msg|
    if msg
      msg.headers.each do |k, v|
        logger.debug("MessageHeader: #{k} => #{v}")
      end
      logger.debug("MessageObject: #{msg.object}")
    end
  }
  sleep 10 unless executed
end

qm.stop
logger.info "QueueManager terminated."

qmtest/client.rb(QueueManagerにメッセージを送信するクライアント)

#!/usr/bin/ruby -Ku

require 'rubygems'
require 'reliable-msg'
include ReliableMsg

require 'logger'
logger = Logger.new(STDOUT)

q = ReliableMsg::Queue.new "queue.test"
logger.info q.put("hoge")

実際に動作させてみる

実際にサーバを起動して、メッセージを放りこんでみます。このサーバはテスト用なので、10秒ごとにQueueを監視してメッセージがあれば取り出して、メッセージをログに出力するだけで特に何も処理をしません。

コンソールを2つ開き、1つ目のコンソールでqmtestディレクトリに入ってサーバを起動します。
下記のようなログが出力されるはずです。

$ cd qmtest
$ ruby server.rb
I, [2009-04-26T02:23:14.215998 #9154]  INFO -- : Loaded queues configuration from: /tmp/qmtest/queues.cfg
I, [2009-04-26T02:23:14.216099 #9154]  INFO -- : Using message store: disk
I, [2009-04-26T02:23:14.216701 #9154]  INFO -- : Accepting requests at: druby://0.0.0.0:6438
I, [2009-04-26T02:23:14.216792 #9154]  INFO -- : QueueManager started.

もう一つのコンソールも同様にして、クライアントを起動させると、投入されたメッセージのIDが出力されるはずです。

$ cd qmtest
$ ruby client.rb
I, [2009-04-26T02:23:23.736733 #9157]  INFO -- : b43c53d0-13eb-012c-d180-003048d48a0e

そうすると、1つめのコンソールがメッセージを受け取って下記のようなログを出力するはずです。

D, [2009-04-26T02:23:24.219530 #9154] DEBUG -- : MessageHeader: priority => 0
D, [2009-04-26T02:23:24.219609 #9154] DEBUG -- : MessageHeader: delivery => best_effort
D, [2009-04-26T02:23:24.219667 #9154] DEBUG -- : MessageHeader: created => 1240680203
D, [2009-04-26T02:23:24.219722 #9154] DEBUG -- : MessageHeader: max_deliveries => 5
D, [2009-04-26T02:23:24.219775 #9154] DEBUG -- : MessageHeader: id => b43c53d0-13eb-012c-d180-003048d48a0e
D, [2009-04-26T02:23:24.219830 #9154] DEBUG -- : MessageHeader: expires =>
D, [2009-04-26T02:23:24.219882 #9154] DEBUG -- : MessageObject: hoge

サーバを終了させるにはCtrl+Cを押します。
現在のメッセージの処理が完了すると、QueueManagerがシャットダウンされます。

I, [2009-04-26T02:23:30.856519 #9154]  INFO -- : SIGINT trapped.
I, [2009-04-26T02:23:34.228837 #9154]  INFO -- : Stopped queue manager at: druby://0.0.0.0:6438
I, [2009-04-26T02:23:34.228932 #9154]  INFO -- : QueueManager terminated.

あとは、具体的な処理を書いたり、ThreadやProcessを使って多重化するなり自由自在です。

シェルスクリプトでさらに起動と停止を便利にする

運用の時にサーバの上げ下げが面倒なのは嫌なので、start.shとstop.shで一発で起動と停止が出来るようにしてみました。

qmtest/start.sh

#!/bin/bash

BASENAME=`basename $0 .sh`
DIRNAME=`dirname $0`

PID_FILE="server.pid"
cd $DIRNAME

if [ -f $PID_FILE ]; then
        STORED_PID=`cat $PID_FILE`
        if (ps -p ${STORED_PID} -o pid= >/dev/null); then
                echo "Already started. (PID: $STORED_PID)"
                exit
        fi
fi

ruby server.rb >> server.log 2>&1 &
QM_PID=$!
echo $QM_PID > $PID_FILE

echo "QueueManager started. (PID: $QM_PID)"

qmtest/stop.sh

#!/bin/bash

BASENAME=`basename $0 .sh`
DIRNAME=`dirname $0`

PID_FILE="server.pid"
cd $DIRNAME

if [ -f $PID_FILE ]; then
        STORED_PID=`cat $PID_FILE`
        if (ps -p ${STORED_PID} -o pid= >/dev/null); then
                kill -2 $STORED_PID
                echo -n "Signal sent, waiting for exit..."
        fi
        while (ps -p ${STORED_PID} -o pid= >/dev/null); do
                sleep 3
                echo -n "."
        done
        echo ""
fi

rm -f $PID_FILE
echo "done!"

参考
 ・AP4R で Parallels の壁を越える
 ・Rails Wiki - ReliableMsg

このエントリは役に立ちましたか?
     
トラックバックについて
Trackback URL:
お気軽にどうぞ。トラックバック前にポリシーをお読みください。[policy]
このエントリへのTrackbackにはこのURLが必要です→http://blog.cles.jp/item/3010
Trackbacks
このエントリにトラックバックはありません
Comments
愛のあるツッコミをお気軽にどうぞ。[policy]
古いエントリについてはコメント制御しているため、即時に反映されないことがあります。
コメントはありません
Comments Form

コメントは承認後の表示となります。
OpenIDでログインすると、即時に公開されます。

OpenID を使ってログインすることができます。

Identity URL: Yahoo! JAPAN IDでログイン

★下記に2つの英単語をスペースで区切って入力してください