In this section we expand on the weather monitoring example from Section 41.2, demonstrating how to create, subscribe to and publish messages on a topic. We use the following Slice definitions in our example:
struct Measurement {
string tower; // tower id
float windSpeed; // knots
short windDirection; // degrees
float temperature; // degrees Celsius
};
interface Monitor {
void report(Measurement m);
};
Monitor is our topic interface. For the sake of simplicity, it defines just one operation,
report, taking a
Measurement struct as its only parameter.
1. Obtain a proxy for the TopicManager. This is the primary IceStorm object, used by both publishers and subscribers.
2.
Obtain a proxy for the Weather topic, either by creating the topic if it does not exist, or retrieving the proxy for the existing topic.
3.
Obtain a proxy for the Weather topic’s "publisher object." This proxy is provided for the purpose of publishing messages, and therefore is narrowed to the topic interface (
Monitor).
As usual, our C++ example begins by including the necessary header files. The interesting ones are
IceStorm/IceStorm.h, which is generated from the IceStorm Slice definitions, and
Monitor.h, containing the generated code for our monitor definitions shown above.
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>
int main(int argc, char* argv[])
{
...
Ice::ObjectPrx obj = communicator‑>stringToProxy(
"IceStorm/TopicManager:tcp ‑p 9999");
IceStorm::TopicManagerPrx topicManager =
IceStorm::TopicManagerPrx::checkedCast(obj);
IceStorm::TopicPrx topic;
try {
topic = topicManager‑>retrieve("Weather");
}
catch (const IceStorm::NoSuchTopic&) {
topic = topicManager‑>create("Weather");
}
Ice::ObjectPrx pub = topic‑>getPublisher()‑>ice_oneway();
MonitorPrx monitor = MonitorPrx::uncheckedCast(pub);
while (true) {
Measurement m = getMeasurement();
monitor‑>report(m);
}
...
}
Note that this example assumes that IceStorm uses the instance name IceStorm. The actual instance name may differ, and you need to use it as the category when calling
stringToProxy (see
page 1705).
After obtaining a proxy for the topic manager, the collector attempts to retrieve the topic. If the topic does not exist yet, the collector receives a
NoSuchTopic exception and then creates the topic.
IceStorm::TopicPrx topic;
try {
topic = topicManager‑>retrieve("Weather");
}
catch (const IceStorm::NoSuchTopic&) {
topic = topicManager‑>create("Weather");
}
The next step is obtaining a proxy for the publisher object, which the collector narrows to the
Monitor interface. (We create a oneway proxy for the publisher purely for efficiency reasons.)
Ice::ObjectPrx pub = topic‑>getPublisher()‑>ice_oneway();
MonitorPrx monitor = MonitorPrx::uncheckedCast(pub);
while (true) {
Measurement m = getMeasurement();
monitor‑>report(m);
}
public static void main(String[] args)
{
...
Ice.ObjectPrx obj = communicator.stringToProxy(
"IceStorm/TopicManager:tcp ‑p 9999");
IceStorm.TopicManagerPrx topicManager =
IceStorm.TopicManagerPrxHelper.checkedCast(obj);
IceStorm.TopicPrx topic = null;
try {
topic = topicManager.retrieve("Weather");
}
catch (IceStorm.NoSuchTopic ex) {
topic = topicManager.create("Weather");
}
Ice.ObjectPrx pub = topic.getPublisher().ice_oneway();
MonitorPrx monitor = MonitorPrxHelper.uncheckedCast(pub);
while (true) {
Measurement m = getMeasurement();
monitor.report(m);
}
...
}
Note that this example assumes that IceStorm uses the instance name IceStorm. The actual instance name may differ, and you need to use it as the category when calling
stringToProxy (see
page 1705).
After obtaining a proxy for the topic manager, the collector attempts to retrieve the topic. If the topic does not exist yet, the collector receives a
NoSuchTopic exception and then creates the topic.
IceStorm.TopicPrx topic = null;
try {
topic = topicManager.retrieve("Weather");
}
catch (IceStorm.NoSuchTopic ex) {
topic = topicManager.create("Weather");
}
Ice.ObjectPrx pub = topic.getPublisher().ice_oneway();
MonitorPrx monitor = MonitorPrxHelper.uncheckedCast(pub);
while (true) {
Measurement m = getMeasurement();
monitor.report(m);
}
Each topic creates a publisher object for the express purpose of publishing messages. It is a special object in that it implements an Ice interface that allows the object to receive and forward requests (i.e., IceStorm messages) without requiring knowledge of the operation types.
From the publisher’s perspective, the publisher object appears to be an application-specific type. In reality, the publisher object can forward requests for any type, and that introduces a degree of risk: a misbehaving publisher can use
uncheckedCast to narrow the publisher object to any type and invoke any operation; the publisher object unknowingly forwards those requests to the subscribers.
If a publisher sends a request using an incorrect type, the Ice run time in a subscriber typically responds by raising
OperationNotExistException. However, since the subscriber receives its messages as oneway invocations, no response can be sent to the publisher object to indicate this failure, and therefore neither the publisher nor the subscriber is aware of the type-mismatch problem. In short, IceStorm places the burden on the developer to ensure that publishers and subscribers are using it correctly.
IceStorm messages have oneway semantics (see Section 41.3.3), but publishers may use either oneway or twoway invocations when sending messages to the publisher object. Each invocation style has advantages and disadvantages that you should consider when deciding which one to use. The differences between the invocation styles affect a publisher in four ways:
Each publisher can select its own transport for message delivery, therefore the transport used by a publisher to communicate with IceStorm has no effect on how IceStorm delivers messages to its subscribers.
For example, a publisher can use a UDP transport if the possibility of lost messages is acceptable (and if IceStorm provides a UDP endpoint to publishers). However, the TCP or SSL transports are generally recommended for IceStorm’s publisher endpoint in order to ensure that published messages are delivered reliably to IceStorm, even if they may not be delivered reliably to some subscribers.
A request context is an optional argument of all remote invocations (see Section 28.11). If a publisher supplies a request context when publishing a message, IceStorm will forward it intact to subscribers.
Services such as Glacier2 employ request contexts to provide applications with more control over the service’s behavior. For example, if a publisher knows that IceStorm is delivering messages to subscribers via a Glacier2 router, the publisher can influence Glacier2’s behavior by including a request context, as shown in the following C++ example:
Ice::ObjectPrx pub = topic‑>getPublisher();
Ice::Context ctx;
ctx["_fwd"] = "Oz";
MonitorPrx monitor =
MonitorPrx::uncheckedCast(pub‑>ice_context(ctx));
The _fwd context key, when encountered by Glacier2, causes the router to forward the request using compressed batch oneway messages. The
ice_context method is used to obtain a proxy that includes the Glacier2 request context in every invocation, eliminating the need for the publisher to specify it explicitly. See
Section 39.9 for more information on Glacier2’s use of request contexts.
1. Obtain a proxy for the TopicManager. This is the primary IceStorm object, used by both publishers and subscribers.
3.
Instantiate the Monitor servant and activate it with the object adapter.
5.
Process report messages until shutdown.
Our C++ monitor implementation begins by including the necessary header files. The interesting ones are
IceStorm/IceStorm.h, which is generated from the IceStorm Slice definitions, and
Monitor.h, containing the generated code for our monitor definitions shown at the beginning of
Section 41.2.
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>
using namespace std;
class MonitorI : virtual public Monitor {
public:
virtual void report(const Measurement& m,
const Ice::Current&) {
cout << "Measurement report:" << endl
<< " Tower: " << m.tower << endl
<< " W Spd: " << m.windSpeed << endl
<< " W Dir: " << m.windDirection << endl
<< " Temp: " << m.temperature << endl
<< endl;
}
};
int main(int argc, char* argv[])
{
...
Ice::ObjectPrx obj = communicator‑>stringToProxy(
"IceStorm/TopicManager:tcp ‑p 9999");
IceStorm::TopicManagerPrx topicManager =
IceStorm::TopicManagerPrx::checkedCast(obj);
Ice::ObjectAdapterPtr adapter =
communicator‑>createObjectAdapter("MonitorAdapter");
MonitorPtr monitor = new MonitorI;
Ice::ObjectPrx proxy = adapter‑>
addWithUUID(monitor)‑>ice_oneway();
IceStorm::TopicPrx topic;
try {
topic = topicManager‑>retrieve("Weather");
IceStorm::QoS qos;
topic‑>subscribeAndGetPublisher(qos, proxy);
}
catch (const IceStorm::NoSuchTopic&) {
// Error! No topic found!
...
}
adapter‑>activate();
communicator‑>waitForShutdown();
topic‑>unsubscribe(proxy);
...
}
Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.
class MonitorI : virtual public Monitor {
public:
virtual void report(const Measurement& m,
const Ice::Current&) {
cout << "Measurement report:" << endl
<< " Tower: " << m.tower << endl
<< " W Spd: " << m.windSpeed << endl
<< " W Dir: " << m.windDirection << endl
<< " Temp: " << m.temperature << endl
<< endl;
}
};
Ice::ObjectAdapterPtr adapter =
communicator‑>createObjectAdapter("MonitorAdapter");
MonitorPtr monitor = new MonitorI;
Ice::ObjectPrx proxy =
adapter‑>addWithUUID(monitor)‑>ice_oneway();
Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: by subscribing with a oneway proxy, IceStorm will deliver events to the subscriber via oneway messages, instead of via twoway messages.
IceStorm::TopicPrx topic;
try {
topic = topicManager‑>retrieve("Weather");
IceStorm::QoS qos;
topic‑>subscribeAndGetPublisher(qos, proxy);
}
catch (const IceStorm::NoSuchTopic&) {
// Error! No topic found!
...
}
adapter‑>activate();
communicator‑>waitForShutdown();
topic‑>unsubscribe(proxy);
class MonitorI extends _MonitorDisp {
public void report(Measurement m, Ice.Current curr) {
System.out.println(
"Measurement report:\n" +
" Tower: " + m.tower + "\n" +
" W Spd: " + m.windSpeed + "\n" +
" W Dir: " + m.windDirection + "\n" +
" Temp: " + m.temperature + "\n");
}
}
public static void main(String[] args)
{
...
Ice.ObjectPrx obj = communicator.stringToProxy(
"IceStorm/TopicManager:tcp ‑p 9999");
IceStorm.TopicManagerPrx topicManager =
IceStorm.TopicManagerPrxHelper.checkedCast(obj);
Ice.ObjectAdapterPtr adapter =
communicator.createObjectAdapter("MonitorAdapter");
Monitor monitor = new MonitorI();
Ice.ObjectPrx proxy =
adapter.addWithUUID(monitor).ice_oneway();
IceStorm.TopicPrx topic = null;
try {
topic = topicManager.retrieve("Weather");
java.util.Map qos = null;
topic.subscribeAndGetPublisher(qos, proxy);
}
catch (IceStorm.NoSuchTopic ex) {
// Error! No topic found!
...
}
adapter.activate();
communicator.waitForShutdown();
topic.unsubscribe(proxy);
...
}
Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.
class MonitorI extends _MonitorDisp {
public void report(Measurement m, Ice.Current curr) {
System.out.println(
"Measurement report:\n" +
" Tower: " + m.tower + "\n" +
" W Spd: " + m.windSpeed + "\n" +
" W Dir: " + m.windDirection + "\n" +
" Temp: " + m.temperature + "\n");
}
}
Monitor monitor = new MonitorI();
Ice.ObjectPrx proxy =
adapter.addWithUUID(monitor).ice_oneway();
Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: by subscribing with a oneway proxy, IceStorm will deliver events to the subscriber via oneway messages, instead of via twoway messages.
IceStorm.TopicPrx topic = null;
try {
topic = topicManager.retrieve("Weather");
java.util.Map qos = null;
topic.subscribeAndGetPublisher(qos, proxy);
}
catch (IceStorm.NoSuchTopic ex) {
// Error! No topic found!
...
}
adapter.activate();
communicator.waitForShutdown();
topic.unsubscribe(proxy);