第6章 主动对象(Active Object):用于并发编程的对象行为模式
R. Greg Lavender Douglas C. Schmidt
本论文描述主动对象(Active Object)模式。该模式使方法执行与方法调用去耦合,以简化对驻留在它自己的线程控制中的对象的同步访问。主动对象模式允许一或多个交错访问数据的独立执行的线程被建模为单个对象。这一并发模式能良好地适用于广泛的生产者/消费者和读者/作者应用类。该模式通常用于需要多线程服务器的分布式系统中。此外,客户应用,比如窗口系统和网络浏览器,采用主动对象来简化并发和异步的网络操作。
主动对象设计模式使方法执行与方法调用去耦合,以增强并发、并简化对驻留在它自己的线程控制中的对象的同步访问。
并发对象和Actor。
为演示主动对象模式,考虑一个通信网关[1]的设计。网关使协作的组件去耦合,并允许它们进行交互,而无需彼此直接依赖[2]。图6-1中所示的网关在分布式系统中将来自一或多个供应者进程的消息路由到一或多个消费者进程[3]。
在我们的例子中,网关、供应者和消费者在面向连接的协议TCP[4]之上进行通信。因此,当网关软件尝试向远地消费者发送数据时,可能会遇到来自TCP传输层的流控制。TCP使用流控制来确保快速的生产者或网关不会过快地生产出数据,以致慢速消费者或拥挤的网络不能缓冲和处理这些数据。
为了改善所有供应者和消费者的端到端服务质量(QoS),整个网关进程不能在任何到消费者的连接上阻塞以等待流控制缓解。此外,当供应者和消费者的数目增加时,网关还必须能高效地扩展,
防止阻塞并提高性能的一种有效的方法是在网关设计中引入并发。并发应用允许执行对象O的方法的线程控制与调用O的方法的线程控制去耦合。而且,在网关中使用并发还使TCP连接被流控制的线程的阻塞不会阻碍TCP连接未被流控制的线程的执行。

图6-1 通信网关
对运行在相互分离的线程控制中的对象进行访问的客户。
许多应用受益于使用并发对象来改善它们的QoS,例如,通过允许应用并行地处理多个客户请求。并发对象驻留在它们自己的线程控制中,而不是使用单线程被动对象棗这些对象在调用其方法的客户的线程控制中执行它们的方法。但是,如果对象并发执行,且这些对象被多个客户线程共享,我们必须同步对它们的方法和数据的访问。在存在这样的问题时,会产生三种压力:
对于每个需要并发执行的对象,使对对象方法的请求与方法执行去耦合。这样的去耦合被设计用于使客户线程看起来像是调用一个平常的方法。该方法被自动转换为方法请求对象,并传递给另一个线程控制,在其中它又被转换回方法,并在对象实现上被执行。
主动对象由以下组件组成:代理(Proxy)[5, 2]表示对象的接口,仆人(Servant)提供对象的实现。代理和仆人运行在分离的线程中,以使方法调用和方法执行能并发运行:代理在客户线程中运行,而仆人在不同的线程中运行。在运行时,代理将客户的方法调用(Method Invocation)转换为方法请求(Method Request),并由调度者(Scheduler)将其存储在启用队列(Activation Queue)中。调度者持续地运行在与仆人相同的线程中,当启用队列中的方法请求变得可运行时,就将它们出队,并分派给实现主动对象的仆人。客户可通过代理返回的“期货”(future)获取方法执行的结果。
主动对象模式的结构在下面的Booch类图中演示:

在主动对象模式中有六个关键的参与者:
代理(Proxy)
方法请求(Method
Request)
启用队列(Activation
Queue)
调度者(Scheduler)
仆人(Servant)
期货(Future)
下图演示主动对象模式中的协作的三个阶段:

这一部分解释使用主动对象模式构建并发应用所涉及的步骤。使用主动对象模式实现的应用是6.3网关的一部分。图6-2演示该例子的结构和参与者。这一部分中的例子使用ACE构架[9]的可复用组件。ACE提供了一组丰富的可复用C++包装和构架组件,可跨越广泛的OS平台执行常见的通信软件任务。
在我们的网关例子中,仆人是一个消息队列,缓冲待处理的递送给消费者的消息。对于每一个远地消费者,都有一个Consumer Handler(消费者处理器),其中含有一个到消费者进程的TCP连接。此外,Consumer Handler含有一个被建模为主动对象的消息队列,并通过MQ_Servant来实现。当从供应者传递到网关的消息在等待被发送到它们的远地消费者时,每个Consumer Handler的主动对象消息队列就存储这些消息。下面的类提供了这个仆人的接口:
class MQ_Servant
{
public:
MQ_Servant (size_t mq_size);
// Message queue implementation operations.
void put_i (const Message &msg);
Message get_i (void);
// Predicates.
bool empty_i (void) const;
bool full_i (void) const;
private:
// Internal Queue representation, e.g., a
// circular array or a linked list, etc.
};

图6-2 将消费者处理器的消息队列实现为主动对象
put_i和get_i方法分别实现队列的插入和移除操作。此外,仆人还定义了两个断言(Predicate):empty_i和full_i,可区分三种内部状态(1)空,(2)满,以及(3)既不为空也不为满。这些断言用于方法请求的看守方法的实现,后者允许调度者强制实施运行时同步约束;这些同步约束规定仆人的put_i和get_i的调用顺序。
注意MQ_Servant类是怎样设计,以使同步机制始终外在于仆人。例如,在我们的网关例子中,MQ_Servant类中的方法并不包括任何实现同步的代码。该类仅仅提供实现仆人功能和检查它的内部状态的方法。这样的设计避免了“继承异常”[10, 11, 12, 13]问题;如果子类需要不同的同步策略,该问题将会妨碍仆人实现的复用。因而,对主动对象的同步约束的改变不需要影响它的仆人实现。
在我们的网关例子中,MQ_Proxy提供步骤1中定义的MQ_Servant的抽象接口。该消息队列被Consumer Handler用于排队递送给消费者的消息,如图6-2所示。此外,MQ_Proxy还是一个工厂,它构造方法请求的实例,并将它们传递给调度者,后者将它们排队,用于后面在分离的线程中的执行。MQ_Proxy的C++实现如下所示:
class MQ_Proxy
{
public:
// Bound the message queue size.
enum { MAX_SIZE = 100 };
MQ_Proxy (size_t size = MAX_SIZE)
: scheduler_ (new MQ_Scheduler (size)),
servant_ (new MQ_Servant (size)) {}
// Schedule <put> to execute on the active object.
void put (const Message &m)
{
Method_Request *method_request = new Put (servant_, m);
scheduler_->enqueue (method_request);
}
// Return a Message_Future as the ‘‘future’’
// result of an asynchronous <get>
// method on the active object.
Message_Future get (void)
{
Message_Future result;
Method_Request *method_request = new Get (servant_,
result);
scheduler_->enqueue (method_request);
return result;
}
// ... empty() and full() predicate implementations ...
protected:
// The Servant that implements the
// Active Object methods.
MQ_Servant *servant_;
// A scheduler for the Message Queue.
MQ_Scheduler *scheduler_;
};
MQ_Proxy的每个方法都将它的调用转换为方法请求,并将其传递给它的MQ_Scheduler,后者将请求入队,用于后续的启用。Method_Request基类定义虚guard和call方法,分别被它的调度者用于决定方法请求是否可被执行和在它的仆人上执行方法请求。如下所示:
class Method_Request
{
public:
// Evaluate the synchronization constraint.
virtual bool guard (void) const = 0;
// Implement the method.
virtual void call (void) = 0;
};
该类中的方法必须被子类定义,代理中定义的每个方法都有一个相应的Method_Request子类。定义这两个方法的原因是为调度者提供一个统一接口来计算和执行具体Method_Request。因而,调度者就得以与怎样计算同步约束、或是触发具体Method_Request执行的特定知识去耦合。
例如,在我们的网关例子中,当客户调用代理上的put方法时,该方法被转换为Put子类的实例;该子类继承自Method_Request,并含有指向MQ_Servant的指针。如下所示:
class Put : public Method_Request
{
public:
Put (MQ_Servant *rep, Message arg)
: servant_ (rep), arg_ (arg) {}
virtual bool guard (void) const
{
// Synchronization constraint: only allow
// <put_i> calls when the queue is not full.
return !servant_->full_i ();
}
virtual void call (void)
{
// Insert message into the servant.
servant_->put_i (arg_);
}
private:
MQ_Servant *servant_;
Message arg_;
};
注意guard方法怎样使用MQ_Servant的full_I断言来实现同步约束,以允许调度者确定Put方法请求何时可以执行。当Put方法请求可被执行时,调度者调用它的call挂钩方法。该方法使用它的MQ_Servant运行时绑定来调用仆人的put_i方法。put_i方法在仆人的上下文中执行,并且不需要任何显式的序列化机制,因为调度者通过方法请求guard来强制实施所有必要的同步约束。
代理还将get方法转换为Get类的实例;Get类定义如下:
class Get : public Method_Request
{
public:
Get (MQ_Servant *rep, const Message_Future &f)
: servant_ (rep), result_ (f) {}
bool guard (void) const
{
// Synchronization constraint:
// cannot call a <get_i> method until
// the queue is not empty.
return !servant_->empty_i ();
}
virtual void call (void)
{
// Bind the dequeued message to the
// future result object.
result_ = servant_->get_i ();
}
private:
MQ_Servant *servant_;
// Message_Future result value.
Message_Future result_;
};
对于代理中所有返回值的两路方法,比如在我们的网关例子中的get_i方法,Message_Future被返回给调用该方法的客户线程,如下面的实现步骤4所示。客户可以选择立即对Message_Future的值进行求值,在这样的情况下,客户会阻塞、直到方法请求被调度者执行为止。相反,对主动对象方法调用的返回结果的求值也可被延期,在这样的情况下,客户线程和执行该方法的线程可以异步地执行。
下面的C++代码演示Activation_Queue是怎样被用于网关中的:
class Activation_Queue
{
public:
// Block for an "infinite" amount of time
// waiting for <enqueue> and <dequeue> methods
// to complete.
const int INFINITE = -1;
// Define a "trait".
typedef Activation_Queue_Iterator iterator;
// Constructor creates the queue with the
// specified high water mark that determines
// its capacity.
Activation_Queue (size_t high_water_mark);
// Insert <method_request> into the queue, waiting
// up to <msec_timeout> amount of time for space
// to become available in the queue.
void enqueue (Method_Request *method_request,
long msec_timeout = INFINITE);
// Remove <method_request> from the queue, waiting
// up to <msec_timeout> amount of time for a
// <method_request> to appear in the queue.
void dequeue (Method_Request *method_request,
long msec_timeout = INFINITE);
private:
// Synchronization mechanisms, e.g., condition
// variables and mutexes, and the queue
// implementation, e.g., an array or a linked
// list, go here.
// ...
};
enqueue和dequeue方法提供一种“有界缓冲区生产者/消费者”并发模式,允许多个线程同时插入和移除Method_Request,而不会破坏Activation_Queue的内部状态。一或多个客户线程扮演生产者角色,通过代理将Method_Request入队。调度者线程扮演消费者角色,当Method_Request的guard方法求值为“真”时,将它们出队,并调用它们的call挂钩来执行仆人方法。
Activation_Queue被设计为使用条件变量和互斥体[14]的有界缓冲区。因此,当试图从空的Activation_Queue中移除Method_Request时,调度者将会阻塞msec_timeout长度的时间。同样地,当试图在满的Activation_Queue中(也就是,当前Method_Request数目等于其高水位标的队列)插入时,客户线程将会阻塞最多msec_timeout长度的时间。如果enqueue方法超时了,控制会返回给客户线程,而方法没有被执行。
在我们的网关例子中,我们定义MQ_Scheduler类如下:
class MQ_Scheduler
{
public:
// Initialize the Activation_Queue to have the
// specified capacity and make the Scheduler
// run in its own thread of control.
MQ_Scheduler (size_t high_water_mark);
// ... Other constructors/destructors, etc.,
// Insert the Method Request into
// the Activation_Queue. This method
// runs in the thread of its client, i.e.,
// in the Proxy’s thread.
void enqueue (Method_Request *method_request)
{
act_queue_->enqueue (method_request);
}
// Dispatch the Method Requests on their Servant
// in the Scheduler’s thread.
virtual void dispatch (void);
protected:
// Queue of pending Method_Requests.
Activation_Queue *act_queue_;
// Entry point into the new thread.
static void *svc_run (void *arg);
};
调度者在与它的客户线程不同的线程控制中执行它的dispatch方法。这些客户线程驱使代理将方法请求放入调度者的Activation_Queue中。调度者在它自己的线程中监控它的Activation_Queue,选择其guard求值所得为“真”(也就是,同步约束已被满足)的Method_Request。于是这个Method_Request就通过调用它的call挂钩方法被执行。注意多个客户线程可以共享同一个代理。代理方法无需是线程安全的,因为调度者和启用队列会处理并发控制。
例如,在我们的网关例子中,MQ_Scheduler的构造器初始化Activation_Queue,并派生一个新线程来运行MQ_Scheduler的dispatch方法。如下所示:
MQ_Scheduler (size_t high_water_mark)
: act_queue_ (new Activation_Queue (high_water_mark))
{
// Spawn a separate thread to dispatch
// method requests.
Thread_Manager::instance ()->spawn (svc_run, this);
}
这个新线程执行svc_run静态方法,后者仅仅是一个调用dispatch方法的适配器。如下所示:
void *MQ_Scheduler::svc_run (void *args)
{
MQ_Scheduler *this_obj = reinterpret_cast<MQ_Scheduler
*> (args);
this_obj->dispatch ();
}
dispatch方法基于底层的MQ_Servant断言empty_i和full_i来决定Put和Get方法请求的处理顺序。这些断言反映仆人的状态,比如消息队列是否为空、满,或都不是。通过经由方法请求的guard方法对这些断言约束进行求值,调度者可以确保对MQ_Servant的公平的共享访问。如下所示:
virtual void MQ_Scheduler::dispatch (void)
{
// Iterate continuously in a
// separate thread.
for (;;)
{
Activation_Queue::iterator i;
// The iterator’s <begin> call blocks
// when the <Activation_Queue> is empty.
for (i = act_queue_->begin (); i != act_queue_->end
(); i++)
{
// Select a Method Request ‘mr’
// whose guard evaluates to true.
Method_Request *mr = *i;
if (mr->guard ())
{
// Remove <mr> from the queue first
// in case <call> throws an exception.
act_queue_->dequeue (mr);
mr->call ();
delete mr;
}
}
}
}
在我们的网关例子中,MQ_Scheduler类的dispatch的实现持续地执行下一个其guard求值为真的Method_Request。但是,调度者实现还可以更为成熟,并且可以含有表示仆人同步状态的变量。例如,要实现一个多读者/单作者的同步策略,可在调度者中存储若干计数器变量,以跟踪读和写请求的数目。调度者可使用这些计数器来确定一个单个的作者何时可以继续执行,也就是,当目前的读者数目为0,而目前又没有其他作者在运行时。注意计数器的值独立于仆人的状态,因为后者仅仅被调度者用来代表仆人强制实施正确的同步策略。
期货构造允许进行两路异步调用,它们返回值给客户。当仆人完成方法执行时,它获取期货上的写锁,用结果值更新期货。任何正阻塞等待该结果值的线程都被唤醒,并可以并发地访问结果值。期货对象可在作者和所有读者都不再引用它后被垃圾回收。在像C++这样不直接支持垃圾回收的语言里,期货对象可以在它们不再被使用时通过像计数器指针[2]这样的习语来进行回收。
在我们的网关例子中,在MQ_Proxy上调用get方法最终导致Get::call方法被MQ_Scheduler分派,如上面的步骤2所示。因为MQ_Proxy get方法返回一个值,当客户调用它时会返回一个Message_Future。Message_Future被定义如下:
class
Message_Future
{
public:
// Copy
constructor binds <this> and <f> to the
// same
<Message_Future_Rep>, which is created if
// necessary.
Message_Future
(const Message_Future &f);
// Constructor
that initializes <Message_Future> to
// point to
<Message> <m> immediately.
Message_Future
(const Message &m);
// Assignment
operator that binds <this> and <f>
// to the same
<Message_Future_Rep>, which is
// created if
necessary.
void operator= (const
Message_Future &f);
// ... other
constructors/destructors, etc.,
// Type
conversion, which blocks
// waiting to
obtain the result of the
// asynchronous
method invocation.
operator
Message ();
};
Message_Future使用计数指针习语[2]来实现。通过使用引用计数的Message_Future_Rep体(通过Message_Future句柄单独进行访问),这种习语简化了动态分配的C++对象的内存管理。
一般而言,客户可通过下面两种方法中的一种来从Message_Future对象那里获取消息的结果值:
MQ_Proxy mq;
// ...
// Conversion of
Message_Future from the
// get() method
into a Message causes the
// thread to
block until a message is
// available.
Message msg =
mq.get ();
// Transmit
message to the consumer.
send (msg);
// Obtain a
future (does not block the client).
Message_Future
future = mq.get ();
// Do something
else here...
// Evaluate
future in the conversion operator;
// may block if
the result is not available yet.
Message msg =
Message (future);

图6-3 通信网关
网关软件在内部包含有Supplier和Consumer Handler,分别用作远地供应者和消费者的本地代理[2, 5]。如图6-3所示,Supplier Handler接收来自远地供应者的消息,检查消息中的地址域,并将该地址用作Routing Table中的关键字;该关键字标识哪一个远地消费者应该接收该消息。Routing Table维护Consumer Handler的一个映射表;每一个Consumer Handler负责在各自的TCP连接上递送消息给它的远地消费者。
为在多个TCP连接上处理流控制,每个Consumer Handler都含有一个使用6.9中描述的主动对象来实现的消息队列。Consumer_Handler类被定义如下:
class
Consumer_Handler
{
public:
Consumer_Handler
(void);
// Put the
message into the queue.
void put (const
Message &msg)
{
message_queue_.put (msg);
}
private:
// Proxy to the
Active Object.
MQ_Proxy
message_queue_;
// Connection
to the remote consumer.
SOCK_Stream
connection_;
// Entry point
into the new thread.
static void
*svc_run (void *arg);
};
在它们自己的线程中运行的Supplier Handler将消息放进适当的Consumer Handler的消息队列中。如下所示:
Supplier_Handler::route_message
(const Message &msg)
{
// Locate the
appropriate consumer based on the
// address
information in the Message.
Consumer_Handler
*ch = routing_table_.find (msg.address ());
// Put the Message
into the Consumer Handler’s queue.
ch->put
(msg);
};
为处理放置在消息队列中的消息,每个Consumer_Handler在它的构造器中都派生出一个单独的线程控制。如下所示:
Consumer_Handler::Consumer_Handler
(void)
{
// Spawn a
separate thread to get messages
// from the
message queue and send them to
// the
consumer.
Thread_Manager::instance
()->spawn (svc_run, this);
}
这个新线程执行svc_run方法,取得由Supplier Handler线程放置在队列中的消息,并在TCP连接上将它们发送给消费者。如下所示:
void
*Consumer_Handler::svc_run (void *args)
{
Consumer_Handler
*this_obj =
reinterpret_cast<Consumer_Handler
*> (args);
for (;;)
{
// Conversion of Message_Future
from the
// get() method into a Message
causes the
// thread to block until a message
is
// available.
Message msg =
this_obj->message_queue_.get ();
// Transmit message to the
consumer.
this_obj->connection_.send
(msg);
}
}
因为消息队列被实现成主动对象,send操作可以在任何给定的Consumer_Handler对象中阻塞,而不会影响其他Consumer_Handler的服务质量。
下面是主动对象模式的一些变种:
集成调度者:为减少实现主动对象模式所需组件的数目,代理和仆人的角色常常被集成进调度者组件中,尽管仆人仍然在与代理不同的线程中执行。而且,将方法调用转换为方法请求也可被集成进调度者。例如,下面是使用集成调度者来实现消息队列的另一种方法:
class
MQ_Scheduler
{
public:
MQ_Scheduler
(size_t size)
: act_queue_ (new Activation_Queue
(size))
{}
// ... other
constructors/destructors, etc.,
void put (const
Message &msg)
{
Method_Request *method_request =
// The <MQ_Scheduler> is the servant.
new Put (this, msg);
act_queue_->enqueue
(method_request);
}
Message_Future
get (void)
{
Message_Future result;
Method_Request *method_request =
// The <MQ_Scheduler> is the
servant.
new Get (this, result);
act_queue_->enqueue (method_request);
return result;
}
// ...
private:
// Message
queue servant operations.
void put_i
(const Message &msg);
Message get_i
(void);
// Predicates.
bool empty_i
(void) const;
bool full_i
(void) const;
Activation_Queue
*act_queue_;
// ...
};
通过使生成方法请求的地方集中化,可以简化模式的实现,因为组件变少了。当然,缺点是调度者必须知道仆人和代理的类型,这使得开发者难以将调度者复用于不同类型的主动对象。
消息传递:一种更为精致的集成调度者变种是将代理和仆人一起去除,并在客户线程和调度者线程间使用直接的消息传递。如下所示:
class Scheduler
{
public:
Scheduler
(size_t size)
: act_queue_ (new Activation_Queue
(size))
{}
// ... other
constructors/destructors, etc.,
// Enqueue a
Message Request in the thread of
// the client.
void enqueue
(Message_Request *message_request)
{
act_queue_->enqueue
(message_request);
}
// Dispatch
Message Requests in the thread of
// the
Scheduler.
virtual void
dispatch (void)
{
Message_Request *mr;
// Block waiting for next request
to arrive.
while (act_queue_->dequeue (mr))
{
// Process the message request
<mr>.
}
}
protected:
Activation_Queue
*act_queue_;
// ...
};
在此设计中没有代理,于是客户简单地创建适当类型的Message_Request,并调用enqueue,将请求插入到Activation_Queue。同样地,也没有仆人,于是运行在调度者线程中的dispatch方法简单地使下一个Message_Request出队,并根据它的类型来进行处理。
一般而言,开发一种消息传递机制要比开发主动对象容易,因为需要开发的组件较少。但是,消息传递通常更为麻烦而易错,因为应用开发者要负责对代理和仆人逻辑进行编程,而不是让主动对象开发者来编写此代码。
多态期货:多态期货[15]允许对期货所代表的最终结果类型进行参数化,并会采取必要的同步。特别地,多态期货结果值提供一次写、多次读的同步。客户是否会阻塞在期货上取决于结果值是否已被计算。因此,多态期货部分地是一种读者/作者条件同步模式,部分地是生产者/消费者同步模式。
下面的类演示一种C++多态期货模板:
template
<class T>
class Future
{
// This class
implements a ‘single write, multiple
// read’
pattern that can be used to return results
// from
asynchronous method invocations.
public:
// Constructor.
Future (void);
// Copy
constructor that binds <this> and <r> to
// the same
<Future> representation
Future (const
Future<T> &r);
// Destructor.
?Future (void);
// Assignment
operator that binds <this> and
// <r> to
the same <Future>.
void operator =
(const Future<T> &r);
// Cancel a
<Future>. Put the future into its
// initial
state. Returns 0 on success and -1
// on failure.
int cancel
(void);
// Type
conversion, which obtains the result
// of the
asynchronous method invocation.
// Will block
forever until the result is
// obtained.
operator T ();
// Check if the
result is available.
int ready
(void);
private:
Future_Rep<T>
*future_rep_;
// Future representation
implemented using
// the Counted
Pointer idiom.
};
客户可以这样来使用多态