仕事でバッチの多重起動を防止するスクリプトを書いたのですが、これだと後続のバッチがキャンセルされてしまって実行されないので、ジョブをQueueに溜めて逐次実行してくれる仕組みが欲しくなったので、Rubyを使ってMQサーバを書いてみました。
RubyにはQueueクラスがあるので、これを使ってdrbからメッセージを投げ込むようにすれば簡単なMQサーバはすぐにできるのですが、これだとメッセージの永続化機能がないので、不慮の事故でマシンの電源が落ちたりするとそのままメッセージが消滅してしまいます。それだと色々と問題があるので、今回はap4rのバックエンドにも使われている永続化機能を持ったメッセージキューであるreliable-msgを使います。
僕がかつて使っていたIBM MQ Series ( Webshere MQ Series )と比べればおもちゃのようなものですが、ちょっと使う分にはこれで十分です。
† reliable-msgのインストールとパッチ当て
まず、gemでreliable-msgをインストールします。
# gem install reliable-msg
gemでインストールされるreliable-msgはUUID 2.0系で動かないなどのいくつかの不具合があるので、下記のパッチを当ててバグを修正します。
/usr/lib/ruby/gems/1.8/gems/reliable-msg-1.1.0/lib/reliable-msg
--- cli.rb.org 2009-04-23 00:31:40.000000000 +0900
+++ cli.rb 2009-04-23 00:52:53.000000000 +0900
@@ -209,8 +209,8 @@
exit
end
drb = Config::DEFAULT_DRB
- drb.merge(config.drb) if config.drb
- drb_uri = "druby://localhost:#{drb['port']}"
+ drb = drb.merge(config.drb) if config.drb
+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}"
else
drb_uri = Queue::DEFAULT_DRB_URI
end
--- message-store.rb.org 2009-04-23 00:31:40.000000000 +0900
+++ mescdsage-store.rb 2009-04-23 00:41:20.000000000 +0900
@@ -305,7 +305,7 @@
free = @mutex.synchronize do
@file_free.shift
end
- name = free ? free[0] : "#{@path}/#{UUID.new}.msg"
+ name = free ? free[0] : "#{@path}/#{UUID.generate}.msg"
file = if free && free[1]
free[1]
else
--- queue-manager.rb.org 2009-04-23 00:31:40.000000000 +0900
+++ queue-manager.rb 2009-04-23 00:52:14.000000000 +0900
@@ -233,8 +233,8 @@
# Get the DRb URI from the configuration, or use the default. Create a DRb server.
drb = Config::DEFAULT_DRB
- drb.merge(@config.drb) if @config.drb
- drb_uri = "druby://localhost:#{drb['port']}"
+ drb = drb.merge(@config.drb) if @config.drb
+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}"
@drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY)
@logger.info format(INFO_ACCEPTING_DRB, drb_uri)
@@ -305,7 +305,7 @@
raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty?
time = Time.new.to_i
# TODO: change this to support the RM delivery protocol.
- id = args[:id] || UUID.new
+ id = args[:id] || UUID.generate
created = args[:created] || time
# Validate and freeze the headers. The cloning ensures that the headers we hold in memory
@@ -478,7 +478,7 @@
message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid]
raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty?
time = Time.new.to_i
- id = args[:id] || UUID.new
+ id = args[:id] || UUID.generate
created = args[:created] || time
# Validate and freeze the headers. The cloning ensures that the headers we hold in memory
@@ -559,7 +559,7 @@
# Called by client to begin a transaction.
def begin timeout
- tid = UUID.new
+ tid = UUID.generate
@transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout}
tid
end
† サーバとクライアントの作成
スクリプトを配置するための適当なディレクトリと、メッセージが永続化されるディレクトリを作ります。
ココでは前者をqmtest、後者をqmtest/queuesと仮定します。
$ makedir -p qmtest/queues
次に、サーバとクライアントのスクリプト、設定ファイルを用意します。
qmtest/queues.cfg(QueueManagerの設定ファイル)
store:
type: disk
drb:
host: 0.0.0.0
port: 6438
acl: allow 127.0.0.1 allow ::1 allow localhost
qmtest/server.rb(サーバ本体)
#!/usr/bin/ruby -Ku
require 'rubygems'
require 'reliable-msg'
include ReliableMsg
require 'logger'
logger = Logger.new(STDOUT)
qm = QueueManager.new(:config => "queues.cfg", :logger => logger)
qm.start
logger.info "QueueManager started."
q = ReliableMsg::Queue.new "queue.test"
queueEnabled = true
Signal.trap(:INT){
logger.info "SIGINT trapped."
queueEnabled = false
}
while queueEnabled
executed = q.get { |msg|
if msg
msg.headers.each do |k, v|
logger.debug("MessageHeader: #{k} => #{v}")
end
logger.debug("MessageObject: #{msg.object}")
end
}
sleep 10 unless executed
end
qm.stop
logger.info "QueueManager terminated."
qmtest/client.rb(QueueManagerにメッセージを送信するクライアント)
#!/usr/bin/ruby -Ku
require 'rubygems'
require 'reliable-msg'
include ReliableMsg
require 'logger'
logger = Logger.new(STDOUT)
q = ReliableMsg::Queue.new "queue.test"
logger.info q.put("hoge")
† 実際に動作させてみる
実際にサーバを起動して、メッセージを放りこんでみます。このサーバはテスト用なので、10秒ごとにQueueを監視してメッセージがあれば取り出して、メッセージをログに出力するだけで特に何も処理をしません。
コンソールを2つ開き、1つ目のコンソールでqmtestディレクトリに入ってサーバを起動します。
下記のようなログが出力されるはずです。
$ cd qmtest
$ ruby server.rb
I, [2009-04-26T02:23:14.215998 #9154] INFO -- : Loaded queues configuration from: /tmp/qmtest/queues.cfg
I, [2009-04-26T02:23:14.216099 #9154] INFO -- : Using message store: disk
I, [2009-04-26T02:23:14.216701 #9154] INFO -- : Accepting requests at: druby://0.0.0.0:6438
I, [2009-04-26T02:23:14.216792 #9154] INFO -- : QueueManager started.
もう一つのコンソールも同様にして、クライアントを起動させると、投入されたメッセージのIDが出力されるはずです。
$ cd qmtest
$ ruby client.rb
I, [2009-04-26T02:23:23.736733 #9157] INFO -- : b43c53d0-13eb-012c-d180-003048d48a0e
そうすると、1つめのコンソールがメッセージを受け取って下記のようなログを出力するはずです。
D, [2009-04-26T02:23:24.219530 #9154] DEBUG -- : MessageHeader: priority => 0
D, [2009-04-26T02:23:24.219609 #9154] DEBUG -- : MessageHeader: delivery => best_effort
D, [2009-04-26T02:23:24.219667 #9154] DEBUG -- : MessageHeader: created => 1240680203
D, [2009-04-26T02:23:24.219722 #9154] DEBUG -- : MessageHeader: max_deliveries => 5
D, [2009-04-26T02:23:24.219775 #9154] DEBUG -- : MessageHeader: id => b43c53d0-13eb-012c-d180-003048d48a0e
D, [2009-04-26T02:23:24.219830 #9154] DEBUG -- : MessageHeader: expires =>
D, [2009-04-26T02:23:24.219882 #9154] DEBUG -- : MessageObject: hoge
サーバを終了させるにはCtrl+Cを押します。
現在のメッセージの処理が完了すると、QueueManagerがシャットダウンされます。
I, [2009-04-26T02:23:30.856519 #9154] INFO -- : SIGINT trapped.
I, [2009-04-26T02:23:34.228837 #9154] INFO -- : Stopped queue manager at: druby://0.0.0.0:6438
I, [2009-04-26T02:23:34.228932 #9154] INFO -- : QueueManager terminated.
あとは、具体的な処理を書いたり、ThreadやProcessを使って多重化するなり自由自在です。
† シェルスクリプトでさらに起動と停止を便利にする
運用の時にサーバの上げ下げが面倒なのは嫌なので、start.shとstop.shで一発で起動と停止が出来るようにしてみました。
qmtest/start.sh
#!/bin/bash
BASENAME=`basename $0 .sh`
DIRNAME=`dirname $0`
PID_FILE="server.pid"
cd $DIRNAME
if [ -f $PID_FILE ]; then
STORED_PID=`cat $PID_FILE`
if (ps -p ${STORED_PID} -o pid= >/dev/null); then
echo "Already started. (PID: $STORED_PID)"
exit
fi
fi
ruby server.rb >> server.log 2>&1 &
QM_PID=$!
echo $QM_PID > $PID_FILE
echo "QueueManager started. (PID: $QM_PID)"
qmtest/stop.sh
#!/bin/bash
BASENAME=`basename $0 .sh`
DIRNAME=`dirname $0`
PID_FILE="server.pid"
cd $DIRNAME
if [ -f $PID_FILE ]; then
STORED_PID=`cat $PID_FILE`
if (ps -p ${STORED_PID} -o pid= >/dev/null); then
kill -2 $STORED_PID
echo -n "Signal sent, waiting for exit..."
fi
while (ps -p ${STORED_PID} -o pid= >/dev/null); do
sleep 3
echo -n "."
done
echo ""
fi
rm -f $PID_FILE
echo "done!"
† 参考
・AP4R で Parallels の壁を越える
・Rails Wiki - ReliableMsg