apache qpid 基本的なメッセージ操作
apache qpid アドレスの指定
の続き。
参考にしているのは「Programming in Apache Qpid」
オプションキーワード
アドレスにはいくつかの便利なオプションが指定できる。
address_string ::= <address> [ / <subject> ] [ ; <options> ]
options ::= { <key> : <value>, ... }
・assert
接続先がqueueやexchangeでなければならない場合に指定する。
・create
接続時にアドレスに指定したキューが存在しない場合は作成する。
・browse
キューにメッセージを残したままメッセージを受け取る。
・x-bindings
assert
assertは接続時に相手がexhangesなのかqueueなのかを宣言できる。宣言したタイプと違うモノに接続された場合はエラーになる。
$ qpid-config add queue my-queue
$ qpid-config add exchange topic my-topic
$ qpid-config exchanges
Type Exchange Name Attributes ================================================== direct direct amq.direct --durable fanout amq.fanout --durable headers amq.match --durable topic amq.topic --durable topic hello-world topic my-topic topic news-service direct qmf.default.direct topic qmf.default.topic topic qpid.management
$ qpid-config queues
Queue Name Attributes ================================================================= 6c973ec3-cd13-4469-b354-9645942fb810:0.0 auto-del excl my-queue qmfc-v2-hb-openstack.4254.1 auto-del excl --limit-policy=ring qmfc-v2-openstack.4254.1 auto-del excl qmfc-v2-ui-openstack.4254.1 auto-del excl --limit-policy=ring que-test1 reply-openstack.4254.1 auto-del excl topic-openstack.4254.1 auto-del excl --limit-policy=ring
$ ./drain 'my-queue; {assert: always, node:{ type: queue }}'
$ ./drain 'my-queue; {assert: always, node:{ type: topic }}'
Traceback (most recent call last): File "./drain", line 81, in <module> rcv = ssn.receiver(addr) File "<string>", line 6, in receiver File "/usr/lib/python2.7/site-packages/qpid/messaging/endpoints.py", line 619, in receiver raise e qpid.messaging.exceptions.AssertionFailed: expected topic, got queue
$ ./drain 'my-topic; {assert: always, node:{ type: topic }}'
$ ./drain 'my-topic; {assert: always, node:{ type: queue }}'
Traceback (most recent call last): File "./drain", line 81, in <module> rcv = ssn.receiver(addr) File "<string>", line 6, in receiver File "/usr/lib/python2.7/site-packages/qpid/messaging/endpoints.py", line 619, in receiver raise e qpid.messaging.exceptions.AssertionFailed: expected queue, got topic
いずれの場合も2回目の接続は宣言と、実際の相手のタイプが違うのでエラーとなっている。
自分が何からデータを受け取るのか明確に指定できるので、必ずしてしたほうが良さそう?
create
createは接続先に指定したキューが存在しない場合に自動的に作成する。
$ qpid-config
Total Exchanges: 11 topic: 6 headers: 1 fanout: 1 direct: 3 Total Queues: 8 durable: 0 non-durable: 8
$ qpid-config exchanges
Type Exchange Name Attributes ================================================== direct direct amq.direct --durable fanout amq.fanout --durable headers amq.match --durable topic amq.topic --durable topic hello-world topic my-topic topic news-service direct qmf.default.direct topic qmf.default.topic topic qpid.management
$ qpid-config queues
Queue Name Attributes ================================================================= de73c8ff-6a74-4288-bf0d-9b94d1cfe064:0.0 auto-del excl my-queue qmfc-v2-hb-openstack.4254.1 auto-del excl --limit-policy=ring qmfc-v2-openstack.4254.1 auto-del excl qmfc-v2-ui-openstack.4254.1 auto-del excl --limit-policy=ring que-test1 reply-openstack.4254.1 auto-del excl topic-openstack.4254.1 auto-del excl --limit-policy=ring
$ ./drain -t 30 "xoxox ; {create: always}"
$ qpid-config
Total Exchanges: 11 topic: 6 headers: 1 fanout: 1 direct: 3 Total Queues: 9 durable: 0 non-durable: 9
$ qpid-config exchanges
Type Exchange Name Attributes ================================================== direct direct amq.direct --durable fanout amq.fanout --durable headers amq.match --durable topic amq.topic --durable topic hello-world topic my-topic topic news-service direct qmf.default.direct topic qmf.default.topic topic qpid.management
$ qpid-config queues
Queue Name Attributes ================================================================= b4fc1aae-820b-4262-9c68-ce89700bc4a4:0.0 auto-del excl my-queue qmfc-v2-hb-openstack.4254.1 auto-del excl --limit-policy=ring qmfc-v2-openstack.4254.1 auto-del excl qmfc-v2-ui-openstack.4254.1 auto-del excl --limit-policy=ring que-test1 reply-openstack.4254.1 auto-del excl topic-openstack.4254.1 auto-del excl --limit-policy=ring xoxox
browse
browseはキューにメッセージを残したままメッセージを受け取ることができる。
通常は
$ ./spout my-queue one
Message(properties={'spout-id': '86f0195a-ec1c-4efb-a1ad-42c60cb83b92:0'}, content='one')
$ ./spout my-queue two
Message(properties={'spout-id': '31e057d6-4cb6-4797-a715-f21310f9331c:0'}, content='two')
$ ./spout my-queue three
Message(properties={'spout-id': '2ded7074-3d21-4d14-a774-6a1c93f03dbf:0'}, content='three')
$ ./drain my-queue
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'86f0195a-ec1c-4efb-a1ad-42c60cb83b92:0'}, content='one') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'31e057d6-4cb6-4797-a715-f21310f9331c:0'}, content='two') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'2ded7074-3d21-4d14-a774-6a1c93f03dbf:0'}, content='three')
$ ./drain my-queue
このように一旦受信してしまうとキューは削除されてしまうが、browseを指定すると。
$ ./spout my-queue one
Message(properties={'spout-id': 'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one')
$ ./spout my-queue two
Message(properties={'spout-id': '8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two')
$ ./spout my-queue three
Message(properties={'spout-id': 'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')
$ ./drain 'my-queue; {mode: browse}'
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')
$ ./drain 'my-queue; {mode: browse}'
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')
$ ./drain my-queue
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two') Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')
$ ./drain my-queue
このようにメッセージが削除されなくなる。
以下のように指定するとキューを監視することができる。
$ ./drain -t 600 'my-queue; {mode: browse}'
x-bindings
ちょっと長くなるので別記載で掲載予定
複数のオプション指定
これらキーワードは複数指定することができる。
$ ./drain -t 600 'not-exist-queue; {mode: browse, type: queue}'
0 件のコメント:
コメントを投稿