2012年8月20日月曜日

apache qpid アドレスのオプションキーワード

apache qpid 概要と初期設定
apache qpid 基本的なメッセージ操作
apache qpid アドレスの指定

の続き。

参考にしているのは「Programming in Apache Qpid


オプションキーワード

アドレスにはいくつかの便利なオプションが指定できる。

address_string ::= <address> [ / <subject> ] [ ; <options> ]
options ::= { <key> : <value>, ... }

・assert
接続先がqueueやexchangeでなければならない場合に指定する。

・create
接続時にアドレスに指定したキューが存在しない場合は作成する。

・browse
キューにメッセージを残したままメッセージを受け取る。

・x-bindings


assert

assertは接続時に相手がexhangesなのかqueueなのかを宣言できる。宣言したタイプと違うモノに接続された場合はエラーになる。

$ qpid-config add queue my-queue
$ qpid-config add exchange topic my-topic
$ qpid-config exchanges
Type      Exchange Name       Attributes
==================================================
direct                       
direct    amq.direct          --durable
fanout    amq.fanout          --durable
headers   amq.match           --durable
topic     amq.topic           --durable
topic     hello-world        
topic     my-topic           
topic     news-service       
direct    qmf.default.direct 
topic     qmf.default.topic  
topic     qpid.management    

$ qpid-config queues
Queue Name                                Attributes
=================================================================
6c973ec3-cd13-4469-b354-9645942fb810:0.0  auto-del excl 
my-queue                                  
qmfc-v2-hb-openstack.4254.1               auto-del excl --limit-policy=ring 
qmfc-v2-openstack.4254.1                  auto-del excl 
qmfc-v2-ui-openstack.4254.1               auto-del excl --limit-policy=ring 
que-test1                                 
reply-openstack.4254.1                    auto-del excl 
topic-openstack.4254.1                    auto-del excl --limit-policy=ring


$ ./drain 'my-queue; {assert: always, node:{ type: queue }}'
$ ./drain 'my-queue; {assert: always, node:{ type: topic }}'
Traceback (most recent call last):
  File "./drain", line 81, in <module>
    rcv = ssn.receiver(addr)
  File "<string>", line 6, in receiver
  File "/usr/lib/python2.7/site-packages/qpid/messaging/endpoints.py", line 619, in receiver
    raise e
qpid.messaging.exceptions.AssertionFailed: expected topic, got queue


$ ./drain 'my-topic; {assert: always, node:{ type: topic }}'
$ ./drain 'my-topic; {assert: always, node:{ type: queue }}'
Traceback (most recent call last):
  File "./drain", line 81, in <module>
    rcv = ssn.receiver(addr)
  File "<string>", line 6, in receiver
  File "/usr/lib/python2.7/site-packages/qpid/messaging/endpoints.py", line 619, in receiver
    raise e
qpid.messaging.exceptions.AssertionFailed: expected queue, got topic

いずれの場合も2回目の接続は宣言と、実際の相手のタイプが違うのでエラーとなっている。
自分が何からデータを受け取るのか明確に指定できるので、必ずしてしたほうが良さそう?


create

createは接続先に指定したキューが存在しない場合に自動的に作成する。

$ qpid-config
Total Exchanges: 11
          topic: 6
        headers: 1
         fanout: 1
         direct: 3

   Total Queues: 8
        durable: 0
    non-durable: 8

$ qpid-config exchanges
Type      Exchange Name       Attributes
==================================================
direct
direct    amq.direct          --durable
fanout    amq.fanout          --durable
headers   amq.match           --durable
topic     amq.topic           --durable
topic     hello-world
topic     my-topic
topic     news-service
direct    qmf.default.direct
topic     qmf.default.topic
topic     qpid.management

$ qpid-config queues
Queue Name                                Attributes
=================================================================
de73c8ff-6a74-4288-bf0d-9b94d1cfe064:0.0  auto-del excl
my-queue
qmfc-v2-hb-openstack.4254.1               auto-del excl --limit-policy=ring
qmfc-v2-openstack.4254.1                  auto-del excl
qmfc-v2-ui-openstack.4254.1               auto-del excl --limit-policy=ring
que-test1
reply-openstack.4254.1                    auto-del excl
topic-openstack.4254.1                    auto-del excl --limit-policy=ring



$ ./drain -t 30 "xoxox ; {create: always}"



$ qpid-config
Total Exchanges: 11
          topic: 6
        headers: 1
         fanout: 1
         direct: 3

   Total Queues: 9
        durable: 0
    non-durable: 9

$ qpid-config exchanges
Type      Exchange Name       Attributes
==================================================
direct                       
direct    amq.direct          --durable
fanout    amq.fanout          --durable
headers   amq.match           --durable
topic     amq.topic           --durable
topic     hello-world        
topic     my-topic           
topic     news-service       
direct    qmf.default.direct 
topic     qmf.default.topic  
topic     qpid.management    

$ qpid-config queues
Queue Name                                Attributes
=================================================================
b4fc1aae-820b-4262-9c68-ce89700bc4a4:0.0  auto-del excl 
my-queue                                  
qmfc-v2-hb-openstack.4254.1               auto-del excl --limit-policy=ring 
qmfc-v2-openstack.4254.1                  auto-del excl 
qmfc-v2-ui-openstack.4254.1               auto-del excl --limit-policy=ring 
que-test1                                 
reply-openstack.4254.1                    auto-del excl 
topic-openstack.4254.1                    auto-del excl --limit-policy=ring 
xoxox


browse

browseはキューにメッセージを残したままメッセージを受け取ることができる。

通常は

$ ./spout my-queue one
Message(properties={'spout-id': '86f0195a-ec1c-4efb-a1ad-42c60cb83b92:0'}, content='one')

$ ./spout my-queue two
Message(properties={'spout-id': '31e057d6-4cb6-4797-a715-f21310f9331c:0'}, content='two')

$ ./spout my-queue three
Message(properties={'spout-id': '2ded7074-3d21-4d14-a774-6a1c93f03dbf:0'}, content='three')

$ ./drain my-queue
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'86f0195a-ec1c-4efb-a1ad-42c60cb83b92:0'}, content='one')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'31e057d6-4cb6-4797-a715-f21310f9331c:0'}, content='two')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'2ded7074-3d21-4d14-a774-6a1c93f03dbf:0'}, content='three')

$ ./drain my-queue

このように一旦受信してしまうとキューは削除されてしまうが、browseを指定すると。


$ ./spout my-queue one
Message(properties={'spout-id': 'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one')

$ ./spout my-queue two
Message(properties={'spout-id': '8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two')

$ ./spout my-queue three
Message(properties={'spout-id': 'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')

$ ./drain 'my-queue; {mode: browse}'
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')

$ ./drain 'my-queue; {mode: browse}'
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')

$ ./drain my-queue
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'de3c4003-59ac-4567-b5c9-546cff782dd9:0'}, content='one')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'8d96cad4-7372-41fc-9b50-91e18f7b3980:0'}, content='two')
Message(properties={'x-amqp-0-10.routing-key': u'my-queue', u'spout-id': u'dd84221c-2c9f-4970-b754-377889e28d03:0'}, content='three')

$ ./drain my-queue

このようにメッセージが削除されなくなる。

以下のように指定するとキューを監視することができる。
$ ./drain -t 600 'my-queue; {mode: browse}'


x-bindings

ちょっと長くなるので別記載で掲載予定


複数のオプション指定

これらキーワードは複数指定することができる。

$ ./drain -t 600 'not-exist-queue; {mode: browse, type: queue}'

0 件のコメント:

コメントを投稿