BLOGTIMES
2020/12/25

Python で非同期の STOMP クライアントを書く

  python 
このエントリーをはてなブックマークに追加

以前、Python で非同期の STOMP クライアントを書くために stomp.pythreading を書いてみたことがあったのですが、どうしても安定して動作させることができなくて結局 node.js で実装したことがありましたが、今回は stompest.async という違うライブラリを使って再チャレンジしてみました。

nikipore/stompest: STOMP client library for Python including both synchronous and Twisted implementations.

stompest is a full-featured STOMP 1.0, 1.1, and 1.2 implementation for Python 2.7 and Python 3 (versions 3.3 and higher) including both synchronous and asynchronous clients:

stompest.async は Twisted という Python のイベントドリブンなフレームワークを使って書かれており、動作自体は非常に安定しています。欠点としては async という名前が Python 3.7 から予約語になった*1ので、現在のリリースされているパッケージはそのままでは Python 3.6 でしか動かすことができないということでしょうか。

今回書いたコードはサンプルほぼそのままですが、 Heart Beat を有効化したかったので、プロトコルを STOMP v1.2 にするために以下のような感じにしました。

Consumer.py

# -*- coding: utf-8 -*- import json import logging from twisted.internet import defer, reactor from stompest.config import StompConfig from stompest.protocol import StompSpec from stompest.async import Stomp from stompest.async.listener import SubscriptionListener class Consumer(object): QUEUE = '/queue/testOut' ERROR_QUEUE = '/queue/testConsumerError' def __init__(self, config=None): if config is None: config = StompConfig('tcp://localhost:61613', version=StompSpec.VERSION_1_2) self.config = config self.amq_id = 1 @defer.inlineCallbacks def run(self): client = Stomp(self.config) yield client.connect(heartBeats = (600000, 600000)) headers = { # client-individual mode is necessary for concurrent processing # (requires ActiveMQ >= 5.2) StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, StompSpec.ID_HEADER: str(self.amq_id), # the maximal number of messages the broker will let you work on at the same time 'activemq.prefetchSize': '100', } self.amq_id += 1 client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE)) def consume(self, client, frame): """ NOTE: you can return a Deferred here """ data = json.loads(frame.body.decode()) print('Received frame with count %d' % data['count']) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Consumer().run() reactor.run()

トラックバックについて
Trackback URL:
お気軽にどうぞ。トラックバック前にポリシーをお読みください。[policy]
このエントリへのTrackbackにはこのURLが必要です→https://blog.cles.jp/item/12190
Trackbacks
このエントリにトラックバックはありません
Comments
愛のあるツッコミをお気軽にどうぞ。[policy]
古いエントリについてはコメント制御しているため、即時に反映されないことがあります。
コメントはありません
Comments Form

コメントは承認後の表示となります。
OpenIDでログインすると、即時に公開されます。

OpenID を使ってログインすることができます。

Identity URL: Yahoo! JAPAN IDでログイン