Last month I showed how to send a message using the
C++ client in Apache Qpid. This article looks at the
flip side of that example: how to receive a
message.
Apache Qpid offers client APIs in a
number of languages, but I work mostly with the C++
version, so I will use that language for this
example.
To review the groundwork from last
month, the players in AMQP that we're concerned with
are:
- Client: an entity that sends and receives
messages. The example in this article is a client
program.
- Exchange: destination for messages that a client
sends; there are a number of different exchange types
which provide various message distribution mechanisms
such as direct, fan-out, and publish-subscribe
(pub-sub).
- Queue: holds messages that a client will
receive.
- Broker: the traffic cop; manages exchanges,
queues, and the routing between them.
- Connection: a data connection to the broker.
- Session: an abstraction for managing a set of
message transfers; Sessions operate over
Connections.
The message-sending example
sends a message to a direct exchange with a routing key.
The broker uses the routing key to route the message
from the exchange to any queues that are bound to the
exchange using that key. Therefore, this
message-receiving example will create a queue and bind
it with the same routing key the sending example used.
That will cause a copy of the message to be routed to
the new queue where the example will retrieve
it.
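To make the routing rule concrete, here is a toy model of a direct exchange in plain C++. It is not Qpid code and never talks to a broker; the DirectExchange class and its bind, publish, and get methods are names I made up for illustration. It only sketches the rule described above: a message published with a routing key is copied to every queue bound with that same key.

```cpp
#include <cstddef>
#include <map>
#include <queue>
#include <set>
#include <string>

// Toy, in-memory model of a direct exchange. Hypothetical class, NOT the Qpid API.
class DirectExchange {
public:
    // Bind a named queue to the exchange under a binding key.
    // Binding the same queue with the same key twice has no extra effect.
    void bind(const std::string& queue_name, const std::string& binding_key) {
        bindings_[binding_key].insert(queue_name);
        queues_[queue_name];  // create the queue if it does not exist yet
    }

    // Copy the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received a copy.
    std::size_t publish(const std::string& routing_key, const std::string& body) {
        auto it = bindings_.find(routing_key);
        if (it == bindings_.end()) return 0;  // nothing bound with this key
        for (const std::string& name : it->second) queues_[name].push(body);
        return it->second.size();
    }

    // Remove the oldest message from a queue into 'out'.
    // Returns false if the queue is unknown or empty.
    bool get(const std::string& queue_name, std::string& out) {
        auto it = queues_.find(queue_name);
        if (it == queues_.end() || it->second.empty()) return false;
        out = it->second.front();
        it->second.pop();
        return true;
    }

private:
    std::map<std::string, std::set<std::string>> bindings_;  // key -> queue names
    std::map<std::string, std::queue<std::string>> queues_;  // name -> messages
};
```

In this model, binding my_queue with my_key and then publishing with my_key leaves one copy of the message on my_queue; publishing with a key that nothing is bound to reaches no queue at all.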
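OK, let's get into it. First, the header
files we need:
    #include <qpid/client/Connection.h>
    #include <qpid/client/Session.h>
    #include <qpid/client/Message.h>
    #include <qpid/client/SubscriptionManager.h>
    #include <qpid/client/LocalQueue.h>
    #include <iostream>
    #include <string>
As with the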
sending example, we need a connection to the broker and
a session to use. We'll assume there's a broker on the
local system and that it's listening on TCP port 5672
(the default for Qpid). After setting up the session,
this example does the following:
- Declare a queue (my_queue) that will receive the
messages this program is interested in.
- Bind the new queue to the amq.direct exchange
using the routing key that the message sender used
(my_key).
- Use a SubscriptionManager and LocalQueue to manage
receiving messages from the new queue. There are a
number of ways to receive messages, but this one is
simple and needs only the single, main
thread.
    using namespace qpid::client;

    int main() {
        qpid::client::Connection connection;
        try {
            // Connect to the broker on the local system.
            connection.open("localhost", 5672);
            qpid::client::Session session = connection.newSession();

            // Declare the queue and bind it with the sender's routing key.
            session.queueDeclare(arg::queue="my_queue");
            session.exchangeBind(arg::exchange="amq.direct",
                                 arg::queue="my_queue",
                                 arg::bindingKey="my_key");

            // Receive from my_queue through a LocalQueue on this thread.
            SubscriptionManager subscriptions(session);
            LocalQueue local_queue;
            subscriptions.subscribe(local_queue, std::string("my_queue"));

            Message message;
            local_queue.get(message, 10000);
            std::cout << message.getData() << std::endl;

            connection.close();
            return 0;
        }
        catch(const std::exception& error) {
            std::cout << error.what() << std::endl;
            return 1;
        }
    }
That's
it. Again, this is a simple case and there are many
options and alternatives. One particular thing to note
in the above example is that the Message::getData()
method returns a reference to a std::string. It is easy
to assume that this means the message is text. This
would be a bad assumption in general. A std::string can
contain any set of bytes, text or not. What those bytes
are is up to the application designers.
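Here is a small sketch of that point in plain C++, with no Qpid involved. The encode_u32 and decode_u32 helpers and the 4-byte, most-significant-byte-first layout are assumptions of mine for illustration; your application's message format may look nothing like this. The sketch just shows a std::string carrying bytes that are not text, including zero bytes.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: pack a 32-bit value into a 4-byte payload,
// most significant byte first.
std::string encode_u32(std::uint32_t value) {
    std::string bytes(4, '\0');
    for (int i = 0; i < 4; ++i)
        bytes[i] = static_cast<char>((value >> (24 - 8 * i)) & 0xFF);
    return bytes;
}

// Hypothetical helper: read the value back from a payload.
// Returns false if the payload is not exactly 4 bytes long.
bool decode_u32(const std::string& bytes, std::uint32_t& value) {
    if (bytes.size() != 4) return false;
    value = 0;
    for (std::size_t i = 0; i < 4; ++i)
        value = (value << 8) | static_cast<unsigned char>(bytes[i]);
    return true;
}
```

A payload built with encode_u32(258) holds the bytes 0, 0, 1, 2. Its size() is 4, but treated as a C string it looks empty because the first byte is zero, so a receiver that assumes text would misread it.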
Keep an
eye on my
blog for more how-to articles on Qpid (and
ACE). Is there something particular you want to see an
example of? Let me
know!