Developer's Guide

1. Twisted Network Programming Overview

Zenoss relies heavily on the Twisted networking libraries for Python. Twisted provides an asynchronous, layered networking stack that Zenoss uses both for daemon communications and for contacting devices. The main Twisted documentation provides more detailed background.

Twisted is built around an asynchronous design rather than a multi-threaded one. This means that it is event-driven (the next function to be called depends on what data is received) and uses co-operative multi-tasking (so a badly behaved function that sleeps or takes a long time to execute can stall the entire application). The unit of co-operative multi-tasking is a deferred object. In simplified terms, a Twisted program starts a number of deferred tasks and then waits for timers to expire and network events to happen.
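To make co-operative multi-tasking concrete, here is a toy round-robin scheduler in plain Python, with no Twisted involved. The names run_cooperatively and worker are hypothetical, and this is not how the Twisted reactor is implemented. It only shows that each task runs until it voluntarily gives up control, so a task that blocks (for example, by calling time.sleep() between yields) would stop every other task from making progress.

```python
from collections import deque


def run_cooperatively(tasks):
    """Toy scheduler: round-robin over generator tasks.

    Each task runs until its next yield, then goes to the back of the queue.
    Nothing can preempt a task, so a task that never yields blocks all others.
    """
    ready = deque(tasks)
    order = []
    while ready:
        task = ready.popleft()
        try:
            order.append(next(task))  # run the task until it yields
        except StopIteration:
            continue                  # task finished; drop it
        ready.append(task)            # re-queue it behind the other tasks
    return order


def worker(name, steps):
    """Hypothetical task that does `steps` small units of work."""
    for i in range(steps):
        yield "%s:%d" % (name, i)     # hand control back after each unit
```

With two workers of two and three steps, run_cooperatively([worker("a", 2), worker("b", 3)]) interleaves them as a:0, b:0, a:1, b:1, b:2.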

Daemons communicate with ZenHub via Twisted Perspective Broker (PB), which is a library for transferring objects over the network. The most important PB concepts for our purposes are these:

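  • Methods on the server-side object whose names start with remote_ are callable from the daemons, which invoke them with callRemote() and the name without the prefix (for example, callRemote('add', ...) calls remote_add).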

  • There are restrictions on what types of objects can be passed back and forth between the service and the daemon. Native Python types are supported, and there is limited support for simple objects (classes without methods). Such classes can be registered with the PB function pb.setUnjellyableForClass() so that the receiving side can reconstruct them.

1.1. Understanding NJobs, Driver and DeferredList

Writing scalable, single-threaded communications servers requires an event-driven programming approach. Small, simple I/O steps are connected by callbacks rather than by normal control flow. For example, instead of just sending a request and waiting for the response, you have to create the request, queue it for delivery, send it when network flow control says there is space, wait for the response, read it piecemeal as it arrives, and then correlate it with the sent message.

Fortunately, we use a comprehensive library that performs many of these steps for us, so the steps we write ourselves are not as small. However, once you have queued your request, you must return to the main event loop so that I/O from many different parts of your application can complete in a reactive manner. The fundamental callback mechanism is the Twisted library's Deferred. Our data collectors commonly perform three kinds of tasks in this asynchronous environment:

  1. Perform these tasks, in any order, and report to me when they are complete (DeferredList).

  2. Perform this long list of tasks, but do not do more than N of them at a time (NJobs).

  3. Perform a sequence of related activities in the correct order (Driver).

1.1.1. DeferredList

Let's say you need to perform I/O requests in parallel, and you don't care which finishes first, so long as they all complete before the next step. For this problem, we gather up the deferreds from each step as we initiate it and hand them to a DeferredList. Once they have all fired (with callbacks or errbacks), the DeferredList fires its own callback with a list of (success, result) tuples, one per input deferred and in the same order as the input list, where success is a boolean indicating whether that deferred succeeded.

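from twisted.internet.defer import DeferredList

def printResults(results):
   # DeferredList passes a list of (success, value) tuples
   for success, value in results:
      if success:
         print "Callback successful:", value
      else:
         # on failure, value is a twisted.python.failure.Failure
         print "Errback:", value

# task1(), task2() and task3() each start a request and return a deferred
d1 = task1()
d2 = task2()
d3 = task3()
d = DeferredList([d1, d2, d3])
d.addCallback(printResults)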

Each task proceeds concurrently, completing at its own pace. This approach is useful for knowing when a number of unrelated requests have completed. For example, fetching the initial configuration may involve several requests that are not interrelated. These may be done concurrently, so long as they all complete before collection begins.
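The following toy sketch, written in plain Python without Twisted, shows the bookkeeping a DeferredList performs: it waits for every input to fire and stores each outcome by input position, so the result order matches the input order even when the deferreds fire out of order. ToyDeferred and toy_deferred_list are hypothetical names. ToyDeferred supports only a single success or failure and none of the callback chaining of a real Deferred, and real Twisted passes a Failure object rather than the bare exception on error.

```python
class ToyDeferred(object):
    """Hypothetical stand-in for a Twisted Deferred: fires once, no chaining."""

    def __init__(self):
        self._handlers = []
        self._fired = None            # becomes (succeeded, value) once fired

    def addCallbacks(self, on_ok, on_fail):
        if self._fired is None:
            self._handlers.append((on_ok, on_fail))
        else:
            self._dispatch(on_ok, on_fail)  # already fired: call immediately
        return self

    def callback(self, value):
        self._fire(True, value)

    def errback(self, exc):
        self._fire(False, exc)

    def _fire(self, succeeded, value):
        self._fired = (succeeded, value)
        handlers, self._handlers = self._handlers, []
        for on_ok, on_fail in handlers:
            self._dispatch(on_ok, on_fail)

    def _dispatch(self, on_ok, on_fail):
        succeeded, value = self._fired
        (on_ok if succeeded else on_fail)(value)


def toy_deferred_list(deferreds, on_all_done):
    """Call on_all_done([(success, value), ...]) once every deferred has fired.

    Outcomes are stored by input position, so the list order matches
    `deferreds` regardless of the order in which they fire.
    """
    results = [None] * len(deferreds)
    remaining = [len(deferreds)]      # a list so the closures can update it

    def record(index, succeeded, value):
        results[index] = (succeeded, value)
        remaining[0] -= 1
        if remaining[0] == 0:
            on_all_done(results)

    for index, d in enumerate(deferreds):
        d.addCallbacks(lambda v, i=index: record(i, True, v),
                       lambda e, i=index: record(i, False, e))
    if not deferreds:
        on_all_done(results)


# Usage: the deferreds fire in the order d3, d1, d2.
d1, d2, d3 = ToyDeferred(), ToyDeferred(), ToyDeferred()
collected = []
toy_deferred_list([d1, d2, d3], collected.append)
d3.callback("third")
err = IOError("device unreachable")
d1.errback(err)
d2.callback("second")
```

After the last deferred fires, collected holds a single list whose entries follow the input order d1, d2, d3: a failure for d1, then the values for d2 and d3.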

1.1.2. NJobs

A collector can exhaust local resources if it does not limit itself. For example, the number of open file descriptors per process is normally limited to around a thousand (1024 is a common default on Linux). Unless you change the operating system's default, it is not possible to talk to more than about a thousand devices at one time if each requires its own file descriptor. So we want to talk to as many devices as we can concurrently, but not so many that we run out of local resources. NJobs takes a limit N, a callable that accepts a single argument and returns a deferred, and a sequence of items. It calls the callable on each item, keeping no more than N of those calls outstanding at any time.
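To check the descriptor limit that actually applies to a collector process, Python's standard resource module (Unix only) reports the soft and hard limits. This is a general Python check, not something Zenoss provides.

```python
import resource

# (soft, hard) limits on open file descriptors for the current process.
# The soft limit is what the process actually runs into; an unprivileged
# process can raise it up to the hard limit.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("open file descriptor limit: soft=%s hard=%s" % (soft, hard))
```

From a shell, ulimit -n shows the same soft limit.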

from Products.ZenUtils.NJobs import NJobs

def printResults(results):
    for result in results:
        print "Result is", result

# collectDevice(device) returns a deferred; at most 10 calls run at once
jobs = NJobs(10, collectDevice, devices)
d = jobs.start()
d.addCallback(printResults)

The callable is called on the items in the order given, but each call may complete out of order. Therefore, the results may also be in a different order than the input sequence. NJobs saves us from having to build a concurrency limit into each type of asynchronous collector.
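A minimal sketch of the N-at-a-time idea in plain Python, assuming a callback-style job API rather than deferreds. ToyNJobs, fake_collect and the done callback are hypothetical, and this is not the Zenoss NJobs source. It starts jobs until N are outstanding, then starts the next one each time a job finishes. The simulation completes the newest request first, so results come back in a different order than the input.

```python
class ToyNJobs(object):
    """Hypothetical N-at-a-time limiter (not the Zenoss NJobs source).

    func(item, done) starts one asynchronous job and must call done(result)
    exactly once when that job finishes.
    """

    def __init__(self, n, func, items, on_all_done):
        self.n = n
        self.func = func
        self.items = list(items)
        self.on_all_done = on_all_done
        self.outstanding = 0
        self.max_outstanding = 0
        self.results = []
        self.finished = False

    def start(self):
        self._fill()

    def _fill(self):
        # Start jobs until the limit is reached or the input is exhausted.
        while self.items and self.outstanding < self.n:
            item = self.items.pop(0)
            self.outstanding += 1
            self.max_outstanding = max(self.max_outstanding, self.outstanding)
            self.func(item, self._job_done)
        # The flag guards against reporting twice if a job completes
        # synchronously and re-enters _fill.
        if not self.items and self.outstanding == 0 and not self.finished:
            self.finished = True
            self.on_all_done(self.results)

    def _job_done(self, result):
        # Results are recorded in completion order, not input order.
        self.outstanding -= 1
        self.results.append(result)
        self._fill()


pending = []                          # jobs "in flight" in this simulation


def fake_collect(device, done):
    pending.append((device, done))    # pretend to send a request


finished = []
jobs = ToyNJobs(2, fake_collect, ["a", "b", "c", "d", "e"], finished.append)
jobs.start()
while pending:
    device, done = pending.pop()      # newest request answers first
    done(device.upper())
```

At no point are more than two fake requests in flight, and the collected results end up in completion order (B, C, D, E, A) rather than input order.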

1.1.3. Driver

Driver is the most difficult of these Deferred-based tools to understand. First, let's look at the basic problem. We have a sequence of asynchronous activities we want to link together, but each step requires some intervening computation or organization. If the activities were synchronous, they might look like this:

config = readConfig()
self.updateConfig(config)
for d in self.config:
    clearStatus(d.id)
collect(self.config)
sendHeartbeat()

Each of these steps must be completed in order. Using just deferreds, we might write something like this:

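d = readConfig()
d.addCallback(self.updateConfig)

def clearStatuses(result):
    # Clear every status, then collect and send a heartbeat
    dl = DeferredList([clearStatus(dev.id) for dev in self.config])
    dl.addCallback(lambda _: collect(self.config))
    dl.addCallback(lambda _: sendHeartbeat())
    return dl

d.addCallback(clearStatuses)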

The interleaving of synchronous calls (the for loop) and asynchronous calls twists the code around the callback mechanism. Python generators can straighten out such a convoluted sequence of actions. Just as a tokenizer uses yield to produce each token as it is discovered in an input stream, Driver uses yield to produce deferreds as they come up. Driver consumes the deferreds and resumes the computation when they complete. So let's see what this code looks like when we yield a deferred whenever we have one:

yield readConfig()
self.updateConfig(results)
for d in self.config:
    yield clearStatus(d.id)
yield collect(self.config)
yield sendHeartbeat()

What remains is very much like the normal synchronous control flow, except that the results from the deferreds are missing. The name results on the second line of the example is a stand-in for some mechanism to get the result of the last deferred that was returned by yield.

Here's the example in a more complete fragment:

from Products.ZenUtils.Driver import drive

# self refers to the enclosing collector object
def cycle(driver):
    yield readConfig()
    self.updateConfig(driver.next())
    for d in self.config:
        yield clearStatus(d.id)
        driver.next()
    yield collect(self.config)
    driver.next()
    yield sendHeartbeat()
    driver.next()

drive(cycle)

When we drive one of these deferred-generating sequences, drive() passes the generator a reference to the driver. The driver keeps the last value produced by a deferred, so that it is available to the generator. How Driver is built internally is difficult to understand, but you do not need to understand it to use Driver. If you have a sequence of code where deferreds keep cropping up and preventing your workflow from, well, flowing, you can use Driver to make it flow like the synchronous version.
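For readers who want a peek at the mechanism anyway, here is a toy re-implementation of the idea in plain Python. It is a sketch under simplifying assumptions, not the Zenoss Driver source: ToyDeferred, ToyDriver, toy_drive and fetch are hypothetical names, ToyDeferred fires only once with no callback chaining, and toy_drive reports completion through plain callables instead of returning a deferred. Each time a yielded deferred fires, the driver records the outcome and resumes the generator; driver.next() returns that outcome, or re-raises it if the deferred failed.

```python
class ToyDeferred(object):
    """Hypothetical stand-in for a Twisted Deferred: fires once, no chaining."""

    def __init__(self):
        self._handlers, self._fired = [], None

    def addCallbacks(self, on_ok, on_fail):
        if self._fired is None:
            self._handlers.append((on_ok, on_fail))
        else:
            self._dispatch(on_ok, on_fail)

    def callback(self, value):
        self._fire(True, value)

    def errback(self, exc):
        self._fire(False, exc)

    def _fire(self, succeeded, value):
        self._fired = (succeeded, value)
        handlers, self._handlers = self._handlers, []
        for on_ok, on_fail in handlers:
            self._dispatch(on_ok, on_fail)

    def _dispatch(self, on_ok, on_fail):
        succeeded, value = self._fired
        (on_ok if succeeded else on_fail)(value)


class ToyDriver(object):
    """Holds the outcome of the most recently fired deferred."""

    def __init__(self):
        self._succeeded, self._value = True, None

    def next(self):
        # Return the last result, or re-raise it if that deferred failed.
        if not self._succeeded:
            raise self._value
        return self._value


def toy_drive(gen_func, on_done, on_error):
    """Resume a deferred-yielding generator each time a yielded deferred fires."""
    driver = ToyDriver()
    gen = gen_func(driver)

    def step():
        try:
            d = next(gen)              # run the generator to its next yield
        except StopIteration:
            # Finished: report the last deferred's outcome (a toy design choice).
            (on_done if driver._succeeded else on_error)(driver._value)
            return
        except Exception as exc:       # e.g. re-raised by driver.next()
            on_error(exc)
            return
        d.addCallbacks(resume_ok, resume_fail)

    def resume_ok(value):
        driver._succeeded, driver._value = True, value
        step()

    def resume_fail(exc):
        driver._succeeded, driver._value = False, exc
        step()

    step()


pending = []                           # simulated in-flight requests


def fetch(value):
    """Hypothetical async call whose deferred the loop below fires later."""
    d = ToyDeferred()
    pending.append((d, value))
    return d


def cycle(driver):
    yield fetch(1)
    total = driver.next()
    yield fetch(total + 41)
    driver.next()


results, errors = [], []
toy_drive(cycle, results.append, errors.append)
while pending:
    d, value = pending.pop(0)
    d.callback(value)
```

When cycle finishes, the toy reports the value of the last deferred (42 here), in line with the behavior this section describes for drive(). A failed deferred surfaces as an exception at the next driver.next() call inside the generator.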

First, you need a generator function that takes a single argument, the driver. If you don't have one, you can define one right in the body of the function:

def f(a, b, c, d):
    def inner(driver):
        yield g(a, b, c, d)
        driver.next()
    return drive(inner)

Next, just yield the deferreds as they come up, and get each result with driver.next(). It's good to call driver.next() even if you don't use the result, because if the result was an exception, driver.next() will raise it.

Finally, drive returns a deferred, so be sure to perform callback handling on it. The callback value of the deferred is the last value from the last deferred.

drive(function).addBoth(self.handleResult)

1.1.4. A Simple Example

The following program is a simple example of a Twisted PB client and server that also uses the Zenoss drive() function.

#! /usr/bin/env python

__doc__= """
Simple example of using ZenUtils Driver and Twisted Perspective Broker (PB).
Sums all of the numbers that are given as command line arguments by repeatedly
calling a remote add method on the server-side object.
"""

from twisted.spread import pb
from twisted.internet import reactor
import Globals
from Products.ZenUtils.Driver import drive

class Server(pb.Root):
    """
    This is the server-side object.
    """

    def __init__(self, port):
        """
        Listen on the specified port.

        @param port: the TCP/IP port to listen on
        @type port: positive integer
        """
        reactor.listenTCP(port, pb.PBServerFactory(self))

    def remote_add(self, x, y):
        """
        Add the two parameters together and return the result.

        @param x: first operand
        @type x: number
        @param y: second operand
        @type y: number
        @return: the sum of x and y
        @rtype: number
        """
        return x + y


class Client(object):
    """
    This is the client-side object.
    """

    def __init__(self, port, numbers, callback):
        """
        Connect to the server and drive the sum method.

        @param port: TCP/IP port number on which a server is listening
        @type port: positive integer
        @param numbers: numbers to add
        @type numbers: list of numbers
        @param callback: a callable that accepts an argument
        @type callback: callable
        """
        self.numbers = [int(n) for n in numbers]
        self.clientFactory = pb.PBClientFactory()
        drive(self.sum).addCallback(callback)
        reactor.connectTCP('localhost', port, self.clientFactory)

    def sum(self, driver):
        """
        Get the root object. Call the remote add method repeatedly keeping
        track of the total.
        This is a generator function; drive() runs it.

        @param driver: holds the result of the most recently fired deferred
        @type driver: Zenoss Driver object
        @return: yields deferreds; drive() fires with the final total
        @rtype: generator of Twisted deferred objects
        """
        yield self.clientFactory.getRootObject()
        root = driver.next()
        total = 0
        for n in self.numbers:
            yield root.callRemote('add', total, n)
            total = driver.next()


def main(numbers):
    """
    Assign a port. Create the client and server. Run the reactor.

    @param numbers: numbers to add
    @type numbers: list of numbers
    """
    port = 7691

    # Add the server to the reactor
    Server(port)

    def callback(total):
        """
        A simple callback to print the total and stop the reactor

        @param total: the total, as returned by the server
        @type total: number
        """
        print total
        reactor.stop()

    # Add the client to the reactor
    Client(port, numbers, callback)
    
    reactor.run()


if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1:
        main(sys.argv[1:])
    else:
        print 'Usage: %s <number> [number...]' % __file__