apache qpid 基本的なメッセージ操作
apache qpid アドレスの指定
apache qpid アドレスのオプションキーワード
apache qpid - exchange/queue/bindの監視
の続き。
参考にしているのは「Programming in Apache Qpid」
今までのサンプルで作ったexchageはメッセージのストアを行っていない。メッセージを受け取った時に接続しているクライアントがなければメッセージは破棄されてしまう。
これは exchage - queue の binding を行ってないためこのような動作になっている。
ここでは bind によるキューへの配送制御を行なってみる。
exchangeの種類
exchage - bind - queue の配送の定義には、exchage の 「type」が重要になる。typeの種類には以下がある。
・direct
exchage と queue を1:1で接続する。
メッセージに付与されたrouting key = bindに設定された binding key となるように配送を行う。
・fanout
exchage と queue を1:Nで接続する。bind されたqueue全てに配送を行う。
・topic
direct と fanout を合わせて、binding key にワイルドカードが指定できる。
・headers Headers exchange that matches header fields against the binding keys
binding keyに対してヘッダーがマッチする配送を行う。
・xml
XQueryを利用したメッセージのフィルタリングを行う。
以下では基本的な direct fanout topic の例を紹介する。
fanout
まずは最もわかりやすいfanoutから。
exchange/queueの作成
$ qpid-config add exchange fanout fanout-ex
$ qpid-config add queue queue1
$ qpid-config add queue queue2
$ qpid-config add queue queue3
$ qpid-config add queue queue4
$ qpid-config add queue queue5
exchange/queueのbinding
$ qpid-config bind fanout-ex queue1
$ qpid-config bind fanout-ex queue2
$ qpid-config bind fanout-ex queue3
$ qpid-config bind fanout-ex queue4
$ qpid-config bind fanout-ex queue5
$ qpid-config -r exchanges fanout
Exchange 'amq.fanout' (fanout) Exchange 'fanout-ex' (fanout) bind [] => queue1 bind [] => queue2 bind [] => queue3 bind [] => queue4 bind [] => queue5
メッセージの配信
$ ./spout fanout-ex msg1111
Message(properties={'spout-id': '08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
メッセージの受信
$ ./drain queue1
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
$ ./drain queue2
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
$ ./drain queue3
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
$ ./drain queue4
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
$ ./drain queue5
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')
このようにbindされたqueue全てでメッセージが受け取れるようになる。
direct
direct は メッセージに設定されたrouting key = binding key となる配送を行う。1:1 配送が基本の考え方だが、複数のbindを与えてやることで1:Nも可能。
exchageの作成
$ qpid-config add exchange direct direct-ex
$ qpid-config bind direct-ex queue1 binding_key_1000
$ qpid-config bind direct-ex queue2 binding_key_2000
$ qpid-config bind direct-ex queue3 binding_key_3000
$ qpid-config bind direct-ex queue4 binding_key_3000
$ qpid-config bind direct-ex queue5 binding_key_3000
$ qpid-config -r exchanges direct
Exchange 'amq.direct' (direct) Exchange 'direct-ex' (direct) bind [binding_key_1000] => queue1 bind [binding_key_2000] => queue2 bind [binding_key_3000] => queue3 bind [binding_key_3000] => queue4 bind [binding_key_3000] => queue5 Exchange 'qmf.default.direct' (direct)
メッセージの配送
$ ./spout -S binding_key_1000 direct-ex msg1000
Message(subject='binding_key_1000', properties={'qpid.subject': 'binding_key_1000', 'spout-id': '7ae5b0c5-41a5-40e2-ac3b-02a77b2c2636:0'}, content='msg1000')
$ ./spout -S binding_key_2000 direct-ex msg2000
Message(subject='binding_key_2000', properties={'qpid.subject': 'binding_key_2000', 'spout-id': 'e36f0d34-c0ce-4636-be88-ec01963816a7:0'}, content='msg2000')
$ ./spout -S binding_key_3000 direct-ex msg3000
Message(subject='binding_key_3000', properties={'qpid.subject': 'binding_key_3000', 'spout-id': '1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')
メッセージの受信
$ ./drain queue1
Message(subject=u'binding_key_1000', properties={u'qpid.subject': u'binding_key_1000', 'x-amqp-0-10.routing-key': u'binding_key_1000', u'spout-id': u'7ae5b0c5-41a5-40e2-ac3b-02a77b2c2636:0'}, content='msg1000')
$ ./drain queue2
Message(subject=u'binding_key_2000', properties={u'qpid.subject': u'binding_key_2000', 'x-amqp-0-10.routing-key': u'binding_key_2000', u'spout-id': u'e36f0d34-c0ce-4636-be88-ec01963816a7:0'}, content='msg2000')
$ ./drain queue3
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')
$ ./drain queue4
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')
$ ./drain queue5
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')
サブジェクトに指定されたキーがbindingキーとして使われる。そのためこのようにも指定できる。
$ ./spout direct-ex/binding_key_1000 msg1010
Message(properties={'qpid.subject': 'binding_key_1000', 'spout-id': '8e50dea5-f93f-41f7-9432-54d5b72657ad:0'}, content='msg1010')
$ ./spout direct-ex/binding_key_1000.direct-ex/binding_key_1000.binding_key_2000 msg1020
Message(properties={'qpid.subject': 'binding_key_1000.direct-ex/binding_key_1000.binding_key_2000', 'spout-id': '7fca2c8a-e0a3-4026-878c-e0e3fa663a5e:0'}, content='msg1020')
$ ./drain queue1
Message(subject=u'binding_key_1000', properties={u'qpid.subject': u'binding_key_1000', 'x-amqp-0-10.routing-key': u'binding_key_1000', u'spout-id': u'8e50dea5-f93f-41f7-9432-54d5b72657ad:0'}, content='msg1010')
$ ./drain queue2
$ ./drain queue3
$ ./drain queue4
$ ./drain queue5
2つ目に指定している ./spout direct-ex/binding_key_1000.direct-ex/binding_key_1000.binding_key_2000 は2つのサブジェクトを設定しているが、directの場合はこれではマッチしない。こういった場合にマッチさせるには次の topic を使う。
topic
ワイルドカードマッチを使った配送を行いたい場合は topic を利用する。
$ qpid-config add exchange topic topic-ex
$ qpid-config bind topic-ex queue1 'news'
$ qpid-config bind topic-ex queue2 '#.news'
$ qpid-config bind topic-ex queue3 '*.news'
$ qpid-config bind topic-ex queue4 '#.asia.news'
$ qpid-config bind topic-ex queue5 '*.asia.news'
$ qpid-config -r exchanges topic-ex
Exchange 'topic-ex' (topic) bind [news] => queue1 bind [#.news] => queue2 bind [*.news] => queue3 bind [#.asia.news] => queue4 bind [*.asia.news] => queue5
メッセージの配送
$ ./spout topic-ex/news msg1000
Message(properties={'qpid.subject': 'news', 'spout-id': 'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000')
$ ./spout topic-ex/asia.news msg2000
Message(properties={'qpid.subject': 'asia.news', 'spout-id': '53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000')
$ ./spout topic-ex/euro.news msg3000
Message(properties={'qpid.subject': 'euro.news', 'spout-id': '19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000')
$ ./spout topic-ex/japan.asia.news msg4000
Message(properties={'qpid.subject': 'japan.asia.news', 'spout-id': '9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')
$ ./spout topic-ex/japan.euro.news msg5000
Message(properties={'qpid.subject': 'japan.euro.news', 'spout-id': '585df0ec-0566-4da8-b7ea-4714a91412b9:0'}, content='msg5000')
$ ./spout topic-ex/tokyo.japan.asia.news msg6000
Message(properties={'qpid.subject': 'tokyo.japan.asia.news', 'spout-id': '23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000')
$ ./spout topic-ex/akiba.tokyo.japan.asia.news msg7000
Message(properties={'qpid.subject': 'akiba.tokyo.japan.asia.news', 'spout-id': 'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')
メッセージの受信
$ ./drain queue1
Message(subject=u'news', properties={u'qpid.subject': u'news', 'x-amqp-0-10.routing-key': u'news', u'spout-id': u'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000')
$ ./drain queue2
Message(subject=u'news', properties={u'qpid.subject': u'news', 'x-amqp-0-10.routing-key': u'news', u'spout-id': u'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000') Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000') Message(subject=u'euro.news', properties={u'qpid.subject': u'euro.news', 'x-amqp-0-10.routing-key': u'euro.news', u'spout-id': u'19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000') Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000') Message(subject=u'japan.euro.news', properties={u'qpid.subject': u'japan.euro.news', 'x-amqp-0-10.routing-key': u'japan.euro.news', u'spout-id': u'585df0ec-0566-4da8-b7ea-4714a91412b9:0'}, content='msg5000') Message(subject=u'tokyo.japan.asia.news', properties={u'qpid.subject': u'tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'tokyo.japan.asia.news', u'spout-id': u'23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000') Message(subject=u'akiba.tokyo.japan.asia.news', properties={u'qpid.subject': u'akiba.tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'akiba.tokyo.japan.asia.news', u'spout-id': u'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')
$ ./drain queue3
Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000') Message(subject=u'euro.news', properties={u'qpid.subject': u'euro.news', 'x-amqp-0-10.routing-key': u'euro.news', u'spout-id': u'19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000')
$ ./drain queue4
Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000') Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000') Message(subject=u'tokyo.japan.asia.news', properties={u'qpid.subject': u'tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'tokyo.japan.asia.news', u'spout-id': u'23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000') Message(subject=u'akiba.tokyo.japan.asia.news', properties={u'qpid.subject': u'akiba.tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'akiba.tokyo.japan.asia.news', u'spout-id': u'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')
$ ./drain queue5
Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')
組み合わせ
exchage:bind:queueの関係は L:M:N で定義できる。
上のサンプルを実行するとキューに対して複数のbindが定義されていることが確認できる。
$ qpid-config -r queues queue
Queue 'queue1' bind [queue1] => '' bind [binding_key_1000] => direct-ex bind [] => fanout-queue bind [news] => topic-ex Queue 'queue2' bind [queue2] => '' bind [binding_key_2000] => direct-ex bind [] => fanout-queue bind [#.news] => topic-ex Queue 'queue3' bind [queue3] => '' bind [binding_key_3000] => direct-ex bind [] => fanout-queue bind [*.news] => topic-ex Queue 'queue4' bind [queue4] => '' bind [binding_key_3000] => direct-ex bind [] => fanout-queue bind [#.asia.news] => topic-ex Queue 'queue5' bind [queue5] => '' bind [binding_key_3000] => direct-ex bind [] => fanout-queue bind [*.asia.news] => topic-ex
★メッセージの配送設計は非常に重要。
0 件のコメント:
コメントを投稿