2012年8月20日月曜日

apache qpid - exchageの種類と配送制御

apache qpid 概要と初期設定
apache qpid 基本的なメッセージ操作
apache qpid アドレスの指定
apache qpid アドレスのオプションキーワード
apache qpid - exchange/queue/bindの監視

の続き。

参考にしているのは「Programming in Apache Qpid


今までのサンプルで作ったexchageはメッセージのストアを行っていない。メッセージを受け取った時に接続しているクライアントがなければメッセージは破棄されてしまう。

これは exchage - queue の binding を行ってないためこのような動作になっている。

ここでは bind によるキューへの配送制御を行なってみる。

exchangeの種類

exchage - bind - queue の配送の定義には、exchage の 「type」が重要になる。typeの種類には以下がある。

・direct
exchage と queue を1:1で接続する。
メッセージに付与されたrouting key = bindに設定された binding key となるように配送を行う。

・fanout
exchage と queue を1:Nで接続する。bind されたqueue全てに配送を行う。

・topic
direct と fanout を合わせて、binding key にワイルドカードが指定できる。

・headers Headers exchange that matches header fields against the binding keys
binding keyに対してヘッダーがマッチする配送を行う。

・xml
XQueryを利用したメッセージのフィルタリングを行う。


以下では基本的な direct fanout topic の例を紹介する。


fanout

まずは最もわかりやすいfanoutから。

exchange/queueの作成
$ qpid-config add exchange fanout fanout-ex
$ qpid-config add queue queue1
$ qpid-config add queue queue2
$ qpid-config add queue queue3
$ qpid-config add queue queue4
$ qpid-config add queue queue5

exchange/queueのbinding
$ qpid-config bind fanout-ex queue1
$ qpid-config bind fanout-ex queue2
$ qpid-config bind fanout-ex queue3
$ qpid-config bind fanout-ex queue4
$ qpid-config bind fanout-ex queue5

$ qpid-config -r exchanges fanout
Exchange 'amq.fanout' (fanout)
Exchange 'fanout-ex' (fanout)
    bind [] => queue1
    bind [] => queue2
    bind [] => queue3
    bind [] => queue4
    bind [] => queue5


メッセージの配信
$ ./spout fanout-ex msg1111
Message(properties={'spout-id': '08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

メッセージの受信
$ ./drain queue1
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

$ ./drain queue2
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

$ ./drain queue3
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

$ ./drain queue4
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

$ ./drain queue5
Message(properties={u'spout-id': u'08bc6430-dc8c-4d47-9407-20010b1f406e:0'}, content='msg1111')

このようにbindされたqueue全てでメッセージが受け取れるようになる。


direct

direct は メッセージに設定されたrouting key = binding key となる配送を行う。1:1 配送が基本の考え方だが、複数のbindを与えてやることで1:Nも可能。

exchageの作成
$ qpid-config add exchange direct direct-ex

$ qpid-config bind direct-ex queue1 binding_key_1000
$ qpid-config bind direct-ex queue2 binding_key_2000
$ qpid-config bind direct-ex queue3 binding_key_3000
$ qpid-config bind direct-ex queue4 binding_key_3000
$ qpid-config bind direct-ex queue5 binding_key_3000

$ qpid-config -r exchanges direct
Exchange 'amq.direct' (direct)
Exchange 'direct-ex' (direct)
    bind [binding_key_1000] => queue1
    bind [binding_key_2000] => queue2
    bind [binding_key_3000] => queue3
    bind [binding_key_3000] => queue4
    bind [binding_key_3000] => queue5
Exchange 'qmf.default.direct' (direct)

メッセージの配送
$ ./spout -S binding_key_1000 direct-ex msg1000
Message(subject='binding_key_1000', properties={'qpid.subject': 'binding_key_1000', 'spout-id': '7ae5b0c5-41a5-40e2-ac3b-02a77b2c2636:0'}, content='msg1000')

$ ./spout -S binding_key_2000 direct-ex msg2000
Message(subject='binding_key_2000', properties={'qpid.subject': 'binding_key_2000', 'spout-id': 'e36f0d34-c0ce-4636-be88-ec01963816a7:0'}, content='msg2000')

$ ./spout -S binding_key_3000 direct-ex msg3000
Message(subject='binding_key_3000', properties={'qpid.subject': 'binding_key_3000', 'spout-id': '1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')

メッセージの受信
$ ./drain queue1
Message(subject=u'binding_key_1000', properties={u'qpid.subject': u'binding_key_1000', 'x-amqp-0-10.routing-key': u'binding_key_1000', u'spout-id': u'7ae5b0c5-41a5-40e2-ac3b-02a77b2c2636:0'}, content='msg1000')

$ ./drain queue2
Message(subject=u'binding_key_2000', properties={u'qpid.subject': u'binding_key_2000', 'x-amqp-0-10.routing-key': u'binding_key_2000', u'spout-id': u'e36f0d34-c0ce-4636-be88-ec01963816a7:0'}, content='msg2000')

$ ./drain queue3
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')

$ ./drain queue4
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')

$ ./drain queue5
Message(subject=u'binding_key_3000', properties={u'qpid.subject': u'binding_key_3000', 'x-amqp-0-10.routing-key': u'binding_key_3000', u'spout-id': u'1721fb7c-40d4-46d0-830c-bb13e31560cf:0'}, content='msg3000')


サブジェクトに指定されたキーがbindingキーとして使われる。そのためこのようにも指定できる。

$ ./spout direct-ex/binding_key_1000 msg1010
Message(properties={'qpid.subject': 'binding_key_1000', 'spout-id': '8e50dea5-f93f-41f7-9432-54d5b72657ad:0'}, content='msg1010')

$ ./spout direct-ex/binding_key_1000.direct-ex/binding_key_1000.binding_key_2000 msg1020
Message(properties={'qpid.subject': 'binding_key_1000.direct-ex/binding_key_1000.binding_key_2000', 'spout-id': '7fca2c8a-e0a3-4026-878c-e0e3fa663a5e:0'}, content='msg1020')


$ ./drain queue1
Message(subject=u'binding_key_1000', properties={u'qpid.subject': u'binding_key_1000', 'x-amqp-0-10.routing-key': u'binding_key_1000', u'spout-id': u'8e50dea5-f93f-41f7-9432-54d5b72657ad:0'}, content='msg1010')

$ ./drain queue2
$ ./drain queue3
$ ./drain queue4
$ ./drain queue5

2つ目に指定している ./spout direct-ex/binding_key_1000.direct-ex/binding_key_1000.binding_key_2000 は2つのサブジェクトを設定しているが、directの場合はこれではマッチしない。こういった場合にマッチさせるには次の topic を使う。


topic

ワイルドカードマッチを使った配送を行いたい場合は topic を利用する。

$ qpid-config add exchange topic topic-ex

$ qpid-config bind topic-ex queue1 'news'
$ qpid-config bind topic-ex queue2 '#.news'
$ qpid-config bind topic-ex queue3 '*.news'
$ qpid-config bind topic-ex queue4 '#.asia.news'
$ qpid-config bind topic-ex queue5 '*.asia.news'

$ qpid-config -r exchanges topic-ex
Exchange 'topic-ex' (topic)
    bind [news] => queue1
    bind [#.news] => queue2
    bind [*.news] => queue3
    bind [#.asia.news] => queue4
    bind [*.asia.news] => queue5

メッセージの配送
$ ./spout topic-ex/news msg1000
Message(properties={'qpid.subject': 'news', 'spout-id': 'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000')

$ ./spout topic-ex/asia.news msg2000
Message(properties={'qpid.subject': 'asia.news', 'spout-id': '53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000')

$ ./spout topic-ex/euro.news msg3000
Message(properties={'qpid.subject': 'euro.news', 'spout-id': '19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000')

$ ./spout topic-ex/japan.asia.news msg4000
Message(properties={'qpid.subject': 'japan.asia.news', 'spout-id': '9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')

$ ./spout topic-ex/japan.euro.news msg5000
Message(properties={'qpid.subject': 'japan.euro.news', 'spout-id': '585df0ec-0566-4da8-b7ea-4714a91412b9:0'}, content='msg5000')

$ ./spout topic-ex/tokyo.japan.asia.news msg6000
Message(properties={'qpid.subject': 'tokyo.japan.asia.news', 'spout-id': '23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000')

$ ./spout topic-ex/akiba.tokyo.japan.asia.news msg7000
Message(properties={'qpid.subject': 'akiba.tokyo.japan.asia.news', 'spout-id': 'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')


メッセージの受信
$ ./drain queue1
Message(subject=u'news', properties={u'qpid.subject': u'news', 'x-amqp-0-10.routing-key': u'news', u'spout-id': u'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000')

$ ./drain queue2
Message(subject=u'news', properties={u'qpid.subject': u'news', 'x-amqp-0-10.routing-key': u'news', u'spout-id': u'ba583ea3-1513-41a7-ba7f-3e5f9ad319c8:0'}, content='msg1000')
Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000')
Message(subject=u'euro.news', properties={u'qpid.subject': u'euro.news', 'x-amqp-0-10.routing-key': u'euro.news', u'spout-id': u'19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000')
Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')
Message(subject=u'japan.euro.news', properties={u'qpid.subject': u'japan.euro.news', 'x-amqp-0-10.routing-key': u'japan.euro.news', u'spout-id': u'585df0ec-0566-4da8-b7ea-4714a91412b9:0'}, content='msg5000')
Message(subject=u'tokyo.japan.asia.news', properties={u'qpid.subject': u'tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'tokyo.japan.asia.news', u'spout-id': u'23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000')
Message(subject=u'akiba.tokyo.japan.asia.news', properties={u'qpid.subject': u'akiba.tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'akiba.tokyo.japan.asia.news', u'spout-id': u'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')

$ ./drain queue3
Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000')
Message(subject=u'euro.news', properties={u'qpid.subject': u'euro.news', 'x-amqp-0-10.routing-key': u'euro.news', u'spout-id': u'19176e2a-e047-476b-ae26-2570bdc9f343:0'}, content='msg3000')

$ ./drain queue4
Message(subject=u'asia.news', properties={u'qpid.subject': u'asia.news', 'x-amqp-0-10.routing-key': u'asia.news', u'spout-id': u'53760a6a-541a-4e9b-8dd1-99033177938f:0'}, content='msg2000')
Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')
Message(subject=u'tokyo.japan.asia.news', properties={u'qpid.subject': u'tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'tokyo.japan.asia.news', u'spout-id': u'23e408a4-016c-4365-a35e-787ec6ebdc63:0'}, content='msg6000')
Message(subject=u'akiba.tokyo.japan.asia.news', properties={u'qpid.subject': u'akiba.tokyo.japan.asia.news', 'x-amqp-0-10.routing-key': u'akiba.tokyo.japan.asia.news', u'spout-id': u'd27646c5-b5ce-4c44-99af-0565a387b567:0'}, content='msg7000')

$ ./drain queue5
Message(subject=u'japan.asia.news', properties={u'qpid.subject': u'japan.asia.news', 'x-amqp-0-10.routing-key': u'japan.asia.news', u'spout-id': u'9c33afbb-3753-415d-bd42-3a7a08499150:0'}, content='msg4000')

組み合わせ

exchage:bind:queueの関係は L:M:N で定義できる。
上のサンプルを実行するとキューに対して複数のbindが定義されていることが確認できる。

$ qpid-config -r queues queue
Queue 'queue1'
    bind [queue1] => ''
    bind [binding_key_1000] => direct-ex
    bind [] => fanout-queue
    bind [news] => topic-ex
Queue 'queue2'
    bind [queue2] => ''
    bind [binding_key_2000] => direct-ex
    bind [] => fanout-queue
    bind [#.news] => topic-ex
Queue 'queue3'
    bind [queue3] => ''
    bind [binding_key_3000] => direct-ex
    bind [] => fanout-queue
    bind [*.news] => topic-ex
Queue 'queue4'
    bind [queue4] => ''
    bind [binding_key_3000] => direct-ex
    bind [] => fanout-queue
    bind [#.asia.news] => topic-ex
Queue 'queue5'
    bind [queue5] => ''
    bind [binding_key_3000] => direct-ex
    bind [] => fanout-queue
    bind [*.asia.news] => topic-ex


★メッセージの配送設計は非常に重要。

0 件のコメント:

コメントを投稿