2012年8月19日日曜日

apache qpid 基本的なメッセージ操作


このエントリーをはてなブックマークに追加


apache qpid 概要と初期設定 の続き。


サンプルプログラムの準備

まずサンプルを実行するために以下のパッケージインストールする。

$ sudo yum install -y python-qpid

サンプルコードは以下にインストールされる。

$ tree /usr/share/doc/python-qpid-0.16/
/usr/share/doc/python-qpid-0.16/
├── LICENSE.txt
├── NOTICE.txt
├── README.txt
└── examples
    ├── README.txt
    ├── api
    │   ├── drain
    │   ├── hello
    │   ├── hello_xml
    │   ├── server
    │   └── spout
    └── reservations
        ├── common.py
        ├── inventory
        ├── machine-agent
        └── reserve


サンプルの実行

キューを追加する。

$ qpid-config add queue hello-world
$ qpid-config
Total Exchanges: 8
          topic: 3
        headers: 1
         fanout: 1
         direct: 3

   Total Queues: 2
        durable: 0
    non-durable: 2

$ qpid-config queues
Queue Name                                Attributes
=================================================================
0098efa6-acea-4a23-acd7-28f41d0a9c46:0.0  auto-del excl
hello-world


作成したキューにメッセージを送付する。exapmes/api配下のspoutを利用する。

$ cd /usr/share/doc/python-qpid-0.16/examples/api
$ ./spout -h
Usage: spout [options] ADDRESS [ CONTENT ... ]

Send messages to the supplied address.

Options:
  -h, --help            show this help message and exit
  -b BROKER, --broker=BROKER
                        connect to specified BROKER (default localhost)
  -r, --reconnect       enable auto reconnect
  -i RECONNECT_INTERVAL, --reconnect-interval=RECONNECT_INTERVAL
                        interval between reconnect attempts
  -l RECONNECT_LIMIT, --reconnect-limit=RECONNECT_LIMIT
                        maximum number of reconnect attempts
  -c COUNT, --count=COUNT
                        stop after count messages have been sent, zero
                        disables (default 1)
  -t TIMEOUT, --timeout=TIMEOUT
                        exit after the specified time
  -I ID, --id=ID        use the supplied id instead of generating one
  -S SUBJECT, --subject=SUBJECT
                        specify a subject
  -R REPLY_TO, --reply-to=REPLY_TO
                        specify reply-to address
  -P NAME=VALUE, --property=NAME=VALUE
                        specify message property
  -M KEY=VALUE, --map=KEY=VALUE
                        specify map entry for message body
  -v                    enable logging

$ ./spout -I testid -S testsubject -P NAME=testname -M KEY=testkey hello-world
Message(subject='testsubject', properties={'qpid.subject': 'testsubject', 'NAME': 'testname', 'spout-id': 'testid:0'}, content={'KEY': 'testkey'})

メッセージを取り出すには、同一ディレクトリ内のdrainを使用する。

$ ./drain -h
Usage: drain [options] ADDRESS ...

Drain messages from the supplied address.

Options:
  -h, --help            show this help message and exit
  -b BROKER, --broker=BROKER
                        connect to specified BROKER (default localhost)
  -c COUNT, --count=COUNT
                        number of messages to drain
  -f, --forever         ignore timeout and wait forever
  -r, --reconnect       enable auto reconnect
  -i RECONNECT_INTERVAL, --reconnect-interval=RECONNECT_INTERVAL
                        interval between reconnect attempts
  -l RECONNECT_LIMIT, --reconnect-limit=RECONNECT_LIMIT
                        maximum number of reconnect attempts
  -t TIMEOUT, --timeout=TIMEOUT
                        timeout in seconds to wait before exiting (default 0)
  -p FORMAT, --print=FORMAT
                        format string for printing messages (default %(M)s)
  -v                    enable logging

$ ./drain hello-world
Message(subject=u'testsubject', properties={u'qpid.subject': u'testsubject', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname', u'spout-id': u'testid:0'}, content={u'KEY': u'testkey'})


もう一度実行してもキューが空なので何も取得できない。
$ ./drain hello-world
none



複数のメッセージを投入
$ for i in {1..100}; do ./spout -I testid$i -S testsubject$i -P NAME=testname$i -M KEY=testkey$i hello-world; done

メッセージの取り出し
$ ./drain hello-world
  ・
  ・
  ・
Message(subject=u'testsubject95', properties={u'qpid.subject': u'testsubject95', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname95', u'spout-id': u'testid95:0'}, content={u'KEY': u'testkey95'})
Message(subject=u'testsubject96', properties={u'qpid.subject': u'testsubject96', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname96', u'spout-id': u'testid96:0'}, content={u'KEY': u'testkey96'})
Message(subject=u'testsubject97', properties={u'qpid.subject': u'testsubject97', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname97', u'spout-id': u'testid97:0'}, content={u'KEY': u'testkey97'})
Message(subject=u'testsubject98', properties={u'qpid.subject': u'testsubject98', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname98', u'spout-id': u'testid98:0'}, content={u'KEY': u'testkey98'})
Message(subject=u'testsubject99', properties={u'qpid.subject': u'testsubject99', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname99', u'spout-id': u'testid99:0'}, content={u'KEY': u'testkey99'})
Message(subject=u'testsubject100', properties={u'qpid.subject': u'testsubject100', 'x-amqp-0-10.routing-key': u'hello-world', u'NAME': u'testname100', u'spout-id': u'testid100:0'}, content={u'KEY': u'testkey100'})

これが最も基本的な操作となる。


exchangeを使ったメッセージ

前のサンプルで使ったキューを削除する。
$ qpid-config del queue hello-world
$ qpid-config
Total Exchanges: 8
          topic: 3
        headers: 1
         fanout: 1
         direct: 3

   Total Queues: 1
        durable: 0
    non-durable: 1


exchangeの追加
$ qpid-config add exchange topic hello-world
$ qpid-config
Total Exchanges: 9
          topic: 4
        headers: 1
         fanout: 1
         direct: 3

   Total Queues: 1
        durable: 0
    non-durable: 1

$ qpid-config exchanges
Type      Exchange Name       Attributes
==================================================
direct                       
direct    amq.direct          --durable
fanout    amq.fanout          --durable
headers   amq.match           --durable
topic     amq.topic           --durable
topic     hello-world        
direct    qmf.default.direct 
topic     qmf.default.topic  
topic     qpid.management    
[openstack@openstack api]$ qpid-config queues
Queue Name                                Attributes
=================================================================
6e367cc2-e89b-4cc2-a741-8e635a555ccf:0.0  auto-del excl 

$ ./spout hello-world
Message(properties={'spout-id': '596d5b81-0d23-46e4-a55a-338d2308d75c:0'})

$ ./drain hello-world
$ ./drain hello-world
$ ./drain hello-world
→ 何も取得できない。

これはexchangeを作成する際に「topic」というタイプを選んでいるため。
topic exchange は接続されたクライアントへ即座にメッセージを配信し、配信できない場合は破棄する。

ターミナルを別に開き、以下のコマンドを実行する。これは60秒間メッセージを待ち受ける状態になる。
terminal2$ ./drain -t 60 hello-world

メッセージの送信
terminal1$ ./spout hello-world
Message(properties={'spout-id': 'd4796b9d-1db4-4e55-8571-de873f871b89:0'})

drain側が受信する。
terminal2$
Message(properties={u'spout-id': u'd4796b9d-1db4-4e55-8571-de873f871b89:0'})

更に別のターミナルを開いてdrainからqpidへ接続する。
terminal3$ ./drain -t 120 hello-world

メッセージの送信
$ ./spout hello-world
Message(properties={'spout-id': 'd039a3ed-3c37-4a33-a230-27d21a479a66:0'})

terminal2$
Message(properties={u'spout-id': u'd039a3ed-3c37-4a33-a230-27d21a479a66:0'})

terminal3$
Message(properties={u'spout-id': u'd039a3ed-3c37-4a33-a230-27d21a479a66:0'})





0 件のコメント:

コメントを投稿