Copyright © 2003-2008 ZeroC, Inc.

41.5 Using IceStorm

In this section we expand on the weather monitoring example from Section 41.2, demonstrating how to create, subscribe to and publish messages on a topic. We use the following Slice definitions in our example:
struct Measurement {
    string tower; // tower id
    float windSpeed; // knots
    short windDirection; // degrees
    float temperature; // degrees Celsius
};

interface Monitor {
    void report(Measurement m);
};
Monitor is our topic interface. For the sake of simplicity, it defines just one operation, report, taking a Measurement struct as its only parameter.

41.5.1 Implementing a Publisher

The implementation of our collector application can be summarized easily:
1. Obtain a proxy for the TopicManager. This is the primary IceStorm object, used by both publishers and subscribers.
2. Obtain a proxy for the Weather topic, either by creating the topic if it does not exist, or retrieving the proxy for the existing topic.
3. Obtain a proxy for the Weather topic’s "publisher object." This proxy is provided for the purpose of publishing messages, and therefore is narrowed to the topic interface (Monitor).
4. Collect and report measurements.
In the sections below, we present collector implementations in C++ and Java.

C++ Example

As usual, our C++ example begins by including the necessary header files. The interesting ones are IceStorm/IceStorm.h, which is generated from the IceStorm Slice definitions, and Monitor.h, containing the generated code for our monitor definitions shown above.
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>

int main(int argc, char* argv[])
{
    ...
    Ice::ObjectPrx obj = communicator->stringToProxy(
        "IceStorm/TopicManager:tcp -p 9999");
    IceStorm::TopicManagerPrx topicManager =
        IceStorm::TopicManagerPrx::checkedCast(obj);
    IceStorm::TopicPrx topic;
    try {
        topic = topicManager->retrieve("Weather");
    }
    catch (const IceStorm::NoSuchTopic&) {
        topic = topicManager->create("Weather");
    }

    Ice::ObjectPrx pub = topic->getPublisher()->ice_oneway();
    MonitorPrx monitor = MonitorPrx::uncheckedCast(pub);
    while (true) {
        Measurement m = getMeasurement();
        monitor->report(m);
    }
    }
    ...
}
Note that this example assumes that IceStorm uses the instance name IceStorm. The actual instance name may differ, and you need to use it as the category when calling stringToProxy (see page 1705).
After obtaining a proxy for the topic manager, the collector attempts to retrieve the topic. If the topic does not exist yet, the collector receives a NoSuchTopic exception and then creates the topic.
    IceStorm::TopicPrx topic;
    try {
        topic = topicManager->retrieve("Weather");
    }
    catch (const IceStorm::NoSuchTopic&) {
        topic = topicManager->create("Weather");
    }
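The retrieve-then-create sequence leaves a small window in which another client can create the topic first; in that case the create operation raises TopicExists. The following is a minimal sketch of one way to handle this, assuming a retry is acceptable. FakeTopic, FakeTopicManager, and retrieveOrCreate are hypothetical in-memory stand-ins written for this illustration; they are not part of the IceStorm API, and only the exception names mirror those of IceStorm.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins; only the names mirror the IceStorm exceptions.
struct NoSuchTopic : std::runtime_error {
    explicit NoSuchTopic(const std::string& n)
        : std::runtime_error("no such topic: " + n) {}
};
struct TopicExists : std::runtime_error {
    explicit TopicExists(const std::string& n)
        : std::runtime_error("topic exists: " + n) {}
};

struct FakeTopic {
    std::string name;
};

// In-memory substitute for the topic manager (an assumption, not Ice code).
class FakeTopicManager {
public:
    std::shared_ptr<FakeTopic> retrieve(const std::string& name) const {
        auto it = topics_.find(name);
        if (it == topics_.end()) throw NoSuchTopic(name);
        return it->second;
    }
    std::shared_ptr<FakeTopic> create(const std::string& name) {
        if (topics_.count(name) != 0) throw TopicExists(name);
        auto topic = std::make_shared<FakeTopic>(FakeTopic{name});
        topics_[name] = topic;
        return topic;
    }
private:
    std::map<std::string, std::shared_ptr<FakeTopic>> topics_;
};

// Retrieve the topic, creating it if it does not exist. If another client
// wins the race and creates it first, go back and retrieve it.
std::shared_ptr<FakeTopic> retrieveOrCreate(FakeTopicManager& manager,
                                            const std::string& name,
                                            int maxAttempts = 3) {
    assert(maxAttempts > 0);
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        try {
            return manager.retrieve(name);
        } catch (const NoSuchTopic&) {
            try {
                return manager.create(name);
            } catch (const TopicExists&) {
                // Someone else created the topic in the meantime; retry.
            }
        }
    }
    throw std::runtime_error("could not retrieve or create topic: " + name);
}
```

With a real topic manager proxy, the same control flow would apply to topicManager->retrieve and topicManager->create; the retry limit of three is an arbitrary choice for this sketch.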
The next step is obtaining a proxy for the publisher object, which the collector narrows to the Monitor interface. (We create a oneway proxy for the publisher purely for efficiency reasons.)
    Ice::ObjectPrx pub = topic->getPublisher()->ice_oneway();
    MonitorPrx monitor = MonitorPrx::uncheckedCast(pub);
Finally, the collector enters its main loop, collecting measurements and publishing them via the IceStorm publisher object.
    while (true) {
        Measurement m = getMeasurement();
        monitor->report(m);
    }

Java Example

The equivalent Java version is shown below.
public static void main(String[] args)
{
    ...
    Ice.ObjectPrx obj = communicator.stringToProxy(
        "IceStorm/TopicManager:tcp -p 9999");
    IceStorm.TopicManagerPrx topicManager =
        IceStorm.TopicManagerPrxHelper.checkedCast(obj);
    IceStorm.TopicPrx topic = null;
    try {
        topic = topicManager.retrieve("Weather");
    }
    catch (IceStorm.NoSuchTopic ex) {
        topic = topicManager.create("Weather");
    }

    Ice.ObjectPrx pub = topic.getPublisher().ice_oneway();
    MonitorPrx monitor = MonitorPrxHelper.uncheckedCast(pub);
    while (true) {
        Measurement m = getMeasurement();
        monitor.report(m);
    }
    ...
}
Note that this example assumes that IceStorm uses the instance name IceStorm. The actual instance name may differ, and you need to use it as the category when calling stringToProxy (see page 1705).
After obtaining a proxy for the topic manager, the collector attempts to retrieve the topic. If the topic does not exist yet, the collector receives a NoSuchTopic exception and then creates the topic.
    IceStorm.TopicPrx topic = null;
    try {
        topic = topicManager.retrieve("Weather");
    }
    catch (IceStorm.NoSuchTopic ex) {
        topic = topicManager.create("Weather");
    }
The next step is obtaining a proxy for the publisher object, which the collector narrows to the Monitor interface.
    Ice.ObjectPrx pub = topic.getPublisher().ice_oneway();
    MonitorPrx monitor = MonitorPrxHelper.uncheckedCast(pub);
Finally, the collector enters its main loop, collecting measurements and publishing them via the IceStorm publisher object.
    while (true) {
        Measurement m = getMeasurement();
        monitor.report(m);
    }

41.5.2 Using a Publisher Object

Each topic creates a publisher object for the express purpose of publishing messages. It is a special object in that it implements an Ice interface that allows the object to receive and forward requests (i.e., IceStorm messages) without requiring knowledge of the operation types.

Type Safety

From the publisher’s perspective, the publisher object appears to be an application-specific type. In reality, the publisher object can forward requests for any type, and that introduces a degree of risk: a misbehaving publisher can use uncheckedCast to narrow the publisher object to any type and invoke any operation; the publisher object unknowingly forwards those requests to the subscribers.
If a publisher sends a request using an incorrect type, the Ice run time in a subscriber typically responds by raising OperationNotExistException. However, since the subscriber receives its messages as oneway invocations, no response can be sent to the publisher object to indicate this failure, and therefore neither the publisher nor the subscriber is aware of the type-mismatch problem. In short, IceStorm places the burden on the developer to ensure that publishers and subscribers are using it correctly.
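To make the failure mode concrete, here is a small self-contained sketch of an untyped forwarder. All of the types (FakeSubscriber, FakePublisherObject, OperationNotExist) are hypothetical stand-ins invented for this illustration and do not use the Ice run time. The dropped counter exists only so that the demonstration can observe the silent failure; in a real deployment nothing reports it back to the publisher.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the error raised on an unknown operation.
struct OperationNotExist : std::runtime_error {
    explicit OperationNotExist(const std::string& op)
        : std::runtime_error("operation does not exist: " + op) {}
};

// A subscriber that understands only the operations it has registered.
class FakeSubscriber {
public:
    using Handler = std::function<void(const std::string&)>;

    void on(const std::string& operation, Handler handler) {
        handlers_[operation] = std::move(handler);
    }
    void dispatch(const std::string& operation, const std::string& payload) {
        auto it = handlers_.find(operation);
        if (it == handlers_.end()) throw OperationNotExist(operation);
        it->second(payload);
    }
private:
    std::map<std::string, Handler> handlers_;
};

// An untyped forwarder: it passes along any operation name without
// checking it against the topic interface.
class FakePublisherObject {
public:
    void subscribe(FakeSubscriber* subscriber) {
        subscribers_.push_back(subscriber);
    }
    // Oneway-style delivery: a failure in a subscriber is swallowed and
    // never reaches the caller of forward().
    void forward(const std::string& operation, const std::string& payload) {
        for (FakeSubscriber* s : subscribers_) {
            try {
                s->dispatch(operation, payload);
            } catch (const OperationNotExist&) {
                ++dropped_;  // visible only to this demonstration
            }
        }
    }
    int dropped() const { return dropped_; }
private:
    std::vector<FakeSubscriber*> subscribers_;
    int dropped_ = 0;
};
```

A publisher that forwards a misspelled operation such as "reprot" returns normally from forward(), and the subscriber's "report" handler simply never runs.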

Oneway or Twoway?

IceStorm messages have oneway semantics (see Section 41.3.3), but publishers may use either oneway or twoway invocations when sending messages to the publisher object. Each invocation style has advantages and disadvantages that you should consider when deciding which one to use. The differences between the invocation styles affect a publisher in four ways:
• Efficiency
Oneway invocations have the advantage in efficiency because the Ice run time in the publisher does not await a reply to each message (and, of course, no reply is sent by IceStorm on the wire).
• Ordering
The use of oneway invocations by a publisher may affect the order in which subscribers receive messages. If ordering is important, use twoway invocations with a reliability QoS of ordered, or use a single thread in the subscriber (see also Section 41.10.1).
• Reliability
Oneway invocations can be lost under certain circumstances, even when they are sent over a reliable transport such as TCP (see Section 28.13). If the loss of messages is unacceptable, or you are unable to address the potential causes of lost oneway messages, then twoway invocations are recommended.
• Delays
A publisher may experience network-related delays when sending messages to IceStorm if subscribers are slow in processing messages. Twoway invocations are more susceptible to these delays than oneway invocations.

Transports

Each publisher can select its own transport for sending messages to IceStorm. The transport used by a publisher to communicate with IceStorm has no effect on how IceStorm delivers messages to its subscribers.
For example, a publisher can use a UDP transport if the possibility of lost messages is acceptable (and if IceStorm provides a UDP endpoint to publishers). However, the TCP or SSL transports are generally recommended for IceStorm’s publisher endpoint in order to ensure that published messages are delivered reliably to IceStorm, even if they may not be delivered reliably to some subscribers.

Request Contexts

A request context is an optional argument of all remote invocations (see Section 28.11). If a publisher supplies a request context when publishing a message, IceStorm will forward it intact to subscribers.
Services such as Glacier2 employ request contexts to provide applications with more control over the service’s behavior. For example, if a publisher knows that IceStorm is delivering messages to subscribers via a Glacier2 router, the publisher can influence Glacier2’s behavior by including a request context, as shown in the following C++ example:
    Ice::ObjectPrx pub = topic->getPublisher();
    Ice::Context ctx;
    ctx["_fwd"] = "Oz";
    MonitorPrx monitor =
        MonitorPrx::uncheckedCast(pub->ice_context(ctx));
The _fwd context key, when encountered by Glacier2, causes the router to forward the request using compressed batch oneway messages. The ice_context method is used to obtain a proxy that includes the Glacier2 request context in every invocation, eliminating the need for the publisher to specify it explicitly. See Section 39.9 for more information on Glacier2’s use of request contexts.

41.5.3 Implementing a Subscriber

Our subscriber implementation takes the following steps:
1. Obtain a proxy for the TopicManager. This is the primary IceStorm object, used by both publishers and subscribers.
2. Create an object adapter to host our Monitor servant.
3. Instantiate the Monitor servant and activate it with the object adapter.
4. Subscribe to the Weather topic.
5. Process report messages until shutdown.
6. Unsubscribe from the Weather topic.
In the sections below, we present monitor implementations in C++ and Java.

C++ Example

Our C++ monitor implementation begins by including the necessary header files. The interesting ones are IceStorm/IceStorm.h, which is generated from the IceStorm Slice definitions, and Monitor.h, containing the generated code for our monitor definitions shown at the beginning of this section.
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>

using namespace std;

class MonitorI : virtual public Monitor {
public:
    virtual void report(const Measurement& m,
                        const Ice::Current&) {
        cout << "Measurement report:" << endl
             << "  Tower: " << m.tower << endl
             << "  W Spd: " << m.windSpeed << endl
             << "  W Dir: " << m.windDirection << endl
             << "   Temp: " << m.temperature << endl
             << endl;
    }
};

int main(int argc, char* argv[])
{
    ...
    Ice::ObjectPrx obj = communicator->stringToProxy(
        "IceStorm/TopicManager:tcp -p 9999");
    IceStorm::TopicManagerPrx topicManager =
        IceStorm::TopicManagerPrx::checkedCast(obj);

    Ice::ObjectAdapterPtr adapter =
        communicator->createObjectAdapter("MonitorAdapter");

    MonitorPtr monitor = new MonitorI;
    Ice::ObjectPrx proxy =
        adapter->addWithUUID(monitor)->ice_oneway();

    IceStorm::TopicPrx topic;
    try {
        topic = topicManager->retrieve("Weather");
        IceStorm::QoS qos;
        topic->subscribeAndGetPublisher(qos, proxy);
    }
    catch (const IceStorm::NoSuchTopic&) {
        // Error! No topic found!
        ...
    }

    adapter->activate();
    communicator->waitForShutdown();

    topic->unsubscribe(proxy);
    ...
}
Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.
class MonitorI : virtual public Monitor {
public:
    virtual void report(const Measurement& m,
                        const Ice::Current&) {
        cout << "Measurement report:" << endl
             << "  Tower: " << m.tower << endl
             << "  W Spd: " << m.windSpeed << endl
             << "  W Dir: " << m.windDirection << endl
             << "   Temp: " << m.temperature << endl
             << endl;
    }
};
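As an illustration of the "ongoing calculation" mentioned above, the following sketch keeps a running mean of the wind speed for each tower. The Measurement struct here is a plain stand-in with the same fields as the Slice definition, not the generated type, and WindSpeedStats is a hypothetical helper written for this example. A servant could call add from its report operation; if the object adapter dispatches requests on more than one thread, access would also need to be synchronized, which this sketch omits.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Plain stand-in mirroring the fields of the Slice struct Measurement.
struct Measurement {
    std::string tower;     // tower id
    float windSpeed;       // knots
    short windDirection;   // degrees
    float temperature;     // degrees Celsius
};

// Hypothetical helper: incremental mean of wind speed, keyed by tower id.
class WindSpeedStats {
public:
    void add(const Measurement& m) {
        Entry& e = byTower_[m.tower];
        ++e.count;
        // Incremental update avoids storing every sample.
        e.mean += (m.windSpeed - e.mean) / e.count;
    }
    bool has(const std::string& tower) const {
        return byTower_.count(tower) != 0;
    }
    // Precondition: at least one measurement was added for this tower.
    double mean(const std::string& tower) const {
        auto it = byTower_.find(tower);
        assert(it != byTower_.end());
        return it->second.mean;
    }
private:
    struct Entry {
        std::size_t count = 0;
        double mean = 0.0;
    };
    std::map<std::string, Entry> byTower_;
};
```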
After obtaining a proxy for the topic manager, the program creates an object adapter, instantiates the Monitor servant and activates it.
    Ice::ObjectAdapterPtr adapter =
        communicator->createObjectAdapter("MonitorAdapter");

    MonitorPtr monitor = new MonitorI;
    Ice::ObjectPrx proxy =
        adapter->addWithUUID(monitor)->ice_oneway();
Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: by subscribing with a oneway proxy, IceStorm will deliver events to the subscriber via oneway messages, instead of via twoway messages.
Next, the monitor subscribes to the topic.
    IceStorm::TopicPrx topic;
    try {
        topic = topicManager->retrieve("Weather");
        IceStorm::QoS qos;
        topic->subscribeAndGetPublisher(qos, proxy);
    }
    catch (const IceStorm::NoSuchTopic&) {
        // Error! No topic found!
        ...
    }
Finally, the monitor activates its object adapter and waits to be shut down. After waitForShutdown returns, the monitor cleans up by unsubscribing from the topic.
    adapter->activate();
    communicator->waitForShutdown();

    topic->unsubscribe(proxy);
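Because the unsubscribe step is easy to skip when the program leaves early (for example, on an exception), one option is to tie it to scope exit. The sketch below is one possible approach, not something the example above requires. SubscriptionGuard is a hypothetical helper that works with any pointer-like topic type offering an unsubscribe member; FakeTopic exists only so that the sketch is self-contained.

```cpp
#include <string>
#include <utility>

// Hypothetical scope guard: calls unsubscribe on the topic when destroyed.
// TopicT must be pointer-like (usable with ->), as a topic proxy is.
template <typename TopicT, typename ProxyT>
class SubscriptionGuard {
public:
    // Construct only after the subscription has succeeded.
    SubscriptionGuard(TopicT topic, ProxyT subscriber)
        : topic_(std::move(topic)), subscriber_(std::move(subscriber)) {}

    SubscriptionGuard(const SubscriptionGuard&) = delete;
    SubscriptionGuard& operator=(const SubscriptionGuard&) = delete;

    ~SubscriptionGuard() {
        try {
            topic_->unsubscribe(subscriber_);
        } catch (...) {
            // Destructors must not throw; a failed unsubscribe is ignored.
        }
    }
private:
    TopicT topic_;
    ProxyT subscriber_;
};

// Stand-in topic used only to exercise the guard in this sketch.
struct FakeTopic {
    int unsubscribed = 0;
    void unsubscribe(const std::string&) { ++unsubscribed; }
};
```

With the types from the example above, a guard declared as SubscriptionGuard<IceStorm::TopicPrx, Ice::ObjectPrx> immediately after the subscription succeeds would take the place of the explicit unsubscribe call.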

Java Example

The Java implementation of the monitor is shown below.
class MonitorI extends _MonitorDisp {
    public void report(Measurement m, Ice.Current curr) {
        System.out.println(
            "Measurement report:\n" +
            "  Tower: " + m.tower + "\n" +
            "  W Spd: " + m.windSpeed + "\n" +
            "  W Dir: " + m.windDirection + "\n" +
            "   Temp: " + m.temperature + "\n");
    }
}

public static void main(String[] args)
{
    ...
    Ice.ObjectPrx obj = communicator.stringToProxy(
        "IceStorm/TopicManager:tcp -p 9999");
    IceStorm.TopicManagerPrx topicManager =
        IceStorm.TopicManagerPrxHelper.checkedCast(obj);

    Ice.ObjectAdapter adapter =
        communicator.createObjectAdapter("MonitorAdapter");

    Monitor monitor = new MonitorI();
    Ice.ObjectPrx proxy =
        adapter.addWithUUID(monitor).ice_oneway();

    IceStorm.TopicPrx topic = null;
    try {
        topic = topicManager.retrieve("Weather");
        java.util.Map qos = null;
        topic.subscribeAndGetPublisher(qos, proxy);
    }
    catch (IceStorm.NoSuchTopic ex) {
        // Error! No topic found!
        ...
    }

    adapter.activate();
    communicator.waitForShutdown();

    topic.unsubscribe(proxy);
    ...
}
Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.
class MonitorI extends _MonitorDisp {
    public void report(Measurement m, Ice.Current curr) {
        System.out.println(
            "Measurement report:\n" +
            "  Tower: " + m.tower + "\n" +
            "  W Spd: " + m.windSpeed + "\n" +
            "  W Dir: " + m.windDirection + "\n" +
            "   Temp: " + m.temperature + "\n");
    }
}
After obtaining a proxy for the topic manager, the program creates an object adapter, instantiates the Monitor servant and activates it.
    Monitor monitor = new MonitorI();
    Ice.ObjectPrx proxy =
        adapter.addWithUUID(monitor).ice_oneway();
Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: by subscribing with a oneway proxy, IceStorm will deliver events to the subscriber via oneway messages, instead of via twoway messages.
Next, the monitor subscribes to the topic.
    IceStorm.TopicPrx topic = null;
    try {
        topic = topicManager.retrieve("Weather");
        java.util.Map qos = null;
        topic.subscribeAndGetPublisher(qos, proxy);
    }
    catch (IceStorm.NoSuchTopic ex) {
        // Error! No topic found!
        ...
    }
Finally, the monitor activates its object adapter and waits to be shut down. After waitForShutdown returns, the monitor cleans up by unsubscribing from the topic.
    adapter.activate();
    communicator.waitForShutdown();

    topic.unsubscribe(proxy);