BLOGTIMES
2009/04/25

Rubyでお手軽MQ

  ruby  sh  mom 
このエントリーをはてなブックマークに追加

仕事でバッチの多重起動を防止するスクリプトを書いたのですが、これだと後続のバッチがキャンセルされてしまって実行されないので、ジョブをQueueに溜めて逐次実行してくれる仕組みが欲しくなったので、Rubyを使ってMQサーバを書いてみました。

RubyにはQueueクラスがあるので、これを使ってdrbからメッセージを投げ込むようにすれば簡単なMQサーバはすぐにできるのですが、これだとメッセージの永続化機能がないので、不慮の事故でマシンの電源が落ちたりするとそのままメッセージが消滅してしまいます。それだと色々と問題があるので、今回はap4rのバックエンドにも使われている永続化機能を持ったメッセージキューであるreliable-msgを使います。

僕がかつて使っていたIBM MQ Series ( Webshere MQ Series )と比べればおもちゃのようなものですが、ちょっと使う分にはこれで十分です。

reliable-msgのインストールとパッチ当て

まず、gemでreliable-msgをインストールします。

# gem install reliable-msg

gemでインストールされるreliable-msgはUUID 2.0系で動かないなどのいくつかの不具合があるので、下記のパッチを当ててバグを修正します。

/usr/lib/ruby/gems/1.8/gems/reliable-msg-1.1.0/lib/reliable-msg

--- cli.rb.org 2009-04-23 00:31:40.000000000 +0900 +++ cli.rb 2009-04-23 00:52:53.000000000 +0900 @@ -209,8 +209,8 @@ exit end drb = Config::DEFAULT_DRB - drb.merge(config.drb) if config.drb - drb_uri = "druby://localhost:#{drb['port']}" + drb = drb.merge(config.drb) if config.drb + drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" else drb_uri = Queue::DEFAULT_DRB_URI end
--- message-store.rb.org 2009-04-23 00:31:40.000000000 +0900 +++ mescdsage-store.rb 2009-04-23 00:41:20.000000000 +0900 @@ -305,7 +305,7 @@ free = @mutex.synchronize do @file_free.shift end - name = free ? free[0] : "#{@path}/#{UUID.new}.msg" + name = free ? free[0] : "#{@path}/#{UUID.generate}.msg" file = if free && free[1] free[1] else
--- queue-manager.rb.org 2009-04-23 00:31:40.000000000 +0900 +++ queue-manager.rb 2009-04-23 00:52:14.000000000 +0900 @@ -233,8 +233,8 @@ # Get the DRb URI from the configuration, or use the default. Create a DRb server. drb = Config::DEFAULT_DRB - drb.merge(@config.drb) if @config.drb - drb_uri = "druby://localhost:#{drb['port']}" + drb = drb.merge(@config.drb) if @config.drb + drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY) @logger.info format(INFO_ACCEPTING_DRB, drb_uri) @@ -305,7 +305,7 @@ raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty? time = Time.new.to_i # TODO: change this to support the RM delivery protocol. - id = args[:id] || UUID.new + id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory @@ -478,7 +478,7 @@ message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid] raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty? time = Time.new.to_i - id = args[:id] || UUID.new + id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory @@ -559,7 +559,7 @@ # Called by client to begin a transaction. def begin timeout - tid = UUID.new + tid = UUID.generate @transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout} tid end

サーバとクライアントの作成

スクリプトを配置するための適当なディレクトリと、メッセージが永続化されるディレクトリを作ります。
ココでは前者をqmtest、後者をqmtest/queuesと仮定します。

$ makedir -p qmtest/queues

次に、サーバとクライアントのスクリプト、設定ファイルを用意します。

qmtest/queues.cfg(QueueManagerの設定ファイル)

store: type: disk drb: host: 0.0.0.0 port: 6438 acl: allow 127.0.0.1 allow ::1 allow localhost

qmtest/server.rb(サーバ本体)

#!/usr/bin/ruby -Ku require 'rubygems' require 'reliable-msg' include ReliableMsg require 'logger' logger = Logger.new(STDOUT) qm = QueueManager.new(:config => "queues.cfg", :logger => logger) qm.start logger.info "QueueManager started." q = ReliableMsg::Queue.new "queue.test" queueEnabled = true Signal.trap(:INT){ logger.info "SIGINT trapped." queueEnabled = false } while queueEnabled executed = q.get { |msg| if msg msg.headers.each do |k, v| logger.debug("MessageHeader: #{k} => #{v}") end logger.debug("MessageObject: #{msg.object}") end } sleep 10 unless executed end qm.stop logger.info "QueueManager terminated."

qmtest/client.rb(QueueManagerにメッセージを送信するクライアント)

#!/usr/bin/ruby -Ku require 'rubygems' require 'reliable-msg' include ReliableMsg require 'logger' logger = Logger.new(STDOUT) q = ReliableMsg::Queue.new "queue.test" logger.info q.put("hoge")

実際に動作させてみる

実際にサーバを起動して、メッセージを放りこんでみます。このサーバはテスト用なので、10秒ごとにQueueを監視してメッセージがあれば取り出して、メッセージをログに出力するだけで特に何も処理をしません。

コンソールを2つ開き、1つ目のコンソールでqmtestディレクトリに入ってサーバを起動します。
下記のようなログが出力されるはずです。

$ cd qmtest $ ruby server.rb I, [2009-04-26T02:23:14.215998 #9154] INFO -- : Loaded queues configuration from: /tmp/qmtest/queues.cfg I, [2009-04-26T02:23:14.216099 #9154] INFO -- : Using message store: disk I, [2009-04-26T02:23:14.216701 #9154] INFO -- : Accepting requests at: druby://0.0.0.0:6438 I, [2009-04-26T02:23:14.216792 #9154] INFO -- : QueueManager started.

もう一つのコンソールも同様にして、クライアントを起動させると、投入されたメッセージのIDが出力されるはずです。

$ cd qmtest $ ruby client.rb I, [2009-04-26T02:23:23.736733 #9157] INFO -- : b43c53d0-13eb-012c-d180-003048d48a0e

そうすると、1つめのコンソールがメッセージを受け取って下記のようなログを出力するはずです。

D, [2009-04-26T02:23:24.219530 #9154] DEBUG -- : MessageHeader: priority => 0 D, [2009-04-26T02:23:24.219609 #9154] DEBUG -- : MessageHeader: delivery => best_effort D, [2009-04-26T02:23:24.219667 #9154] DEBUG -- : MessageHeader: created => 1240680203 D, [2009-04-26T02:23:24.219722 #9154] DEBUG -- : MessageHeader: max_deliveries => 5 D, [2009-04-26T02:23:24.219775 #9154] DEBUG -- : MessageHeader: id => b43c53d0-13eb-012c-d180-003048d48a0e D, [2009-04-26T02:23:24.219830 #9154] DEBUG -- : MessageHeader: expires => D, [2009-04-26T02:23:24.219882 #9154] DEBUG -- : MessageObject: hoge

サーバを終了させるにはCtrl+Cを押します。
現在のメッセージの処理が完了すると、QueueManagerがシャットダウンされます。

I, [2009-04-26T02:23:30.856519 #9154] INFO -- : SIGINT trapped. I, [2009-04-26T02:23:34.228837 #9154] INFO -- : Stopped queue manager at: druby://0.0.0.0:6438 I, [2009-04-26T02:23:34.228932 #9154] INFO -- : QueueManager terminated.

あとは、具体的な処理を書いたり、ThreadやProcessを使って多重化するなり自由自在です。

シェルスクリプトでさらに起動と停止を便利にする

運用の時にサーバの上げ下げが面倒なのは嫌なので、start.shとstop.shで一発で起動と停止が出来るようにしてみました。

qmtest/start.sh

#!/bin/bash BASENAME=`basename $0 .sh` DIRNAME=`dirname $0` PID_FILE="server.pid" cd $DIRNAME if [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then echo "Already started. (PID: $STORED_PID)" exit fi fi ruby server.rb >> server.log 2>&1 & QM_PID=$! echo $QM_PID > $PID_FILE echo "QueueManager started. (PID: $QM_PID)"

qmtest/stop.sh

#!/bin/bash BASENAME=`basename $0 .sh` DIRNAME=`dirname $0` PID_FILE="server.pid" cd $DIRNAME if [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then kill -2 $STORED_PID echo -n "Signal sent, waiting for exit..." fi while (ps -p ${STORED_PID} -o pid= >/dev/null); do sleep 3 echo -n "." done echo "" fi rm -f $PID_FILE echo "done!"

参考
 ・AP4R で Parallels の壁を越える
 ・Rails Wiki - ReliableMsg


    トラックバックについて
    Trackback URL:
    お気軽にどうぞ。トラックバック前にポリシーをお読みください。[policy]
    このエントリへのTrackbackにはこのURLが必要です→https://blog.cles.jp/item/3010
    Trackbacks
    このエントリにトラックバックはありません
    Comments
    愛のあるツッコミをお気軽にどうぞ。[policy]
    古いエントリについてはコメント制御しているため、即時に反映されないことがあります。
    コメントはありません
    Comments Form

    コメントは承認後の表示となります。
    OpenIDでログインすると、即時に公開されます。

    OpenID を使ってログインすることができます。

    Identity URL: Yahoo! JAPAN IDでログイン