Huihoo.org - Open Enterprise Foundation

ACE Monitor对象:一个用于并发程序的对象行为模式

(作者:Douglas C. Schmidt ,by huihoo.org Thzhang 译)

目的

Monitor对象模式通过同步方法执行来确保一个对象在一个时刻只有一个方法在被运行。它也允许一个对象的方法能够协同地安排它们执行的顺序。

别名

线程安全的被动对象

例子

让我们再次考虑在活动对象模式中曾经描述过的通讯网关的设计,参考图一。网关进程包含多个运行在分离的线程上的供给者(suppliers)处理对象和消费者(consumer)处理对象,这些对象分别的将来自一个或多个远端消息供给者(suppliers)的消息路由给一个或多个远端的消息消费者



(consumer)。当一个供给者处理对象从一个远端消息供给者接收到消息后,它使用存在消息体中的一个地址域来确定该消息对应的消费者处理对象,这个消费者处理对象最终将消息发送给它的远端消息消费者。

当消息供给者和消息消费者驻留在不同的主机上时,网关使用面向连接的TCP协议来提供可靠的消息发送和端到端的流控。当发送者发送消息的速度远大于接受者处理消息的速度时,TCP的流控算法将阻塞速度快的发送者。但是整个网关系统应该是非阻塞的,特别是当等待流控去减少TCP连接的外出流量时。为了最小化阻塞,每一个消费者处理对象应该包含一个线程安全的消息队列用于缓存从供给处理者线程接受来的新消息。

实现线程安全的消息队列的一种方法是使用活动对象模式,这种模式将调用方法线程和执行方法线程分割开来(译者:通常情况下方法调用和执行都由调用线程来完成)。如图二所示,每一个消息队列活动对象包含一个有边界的缓冲和用于维护待处理消息队列的控制线程。使用活动对象模式实现线程安全的消息队列可以使供给处理对象线程和消费处理对象线程相互分离,于是所有的线程都可以并发运行,当在任意TCP连接上发生流控时,能够被单独阻塞而不影响其他线程的工作。



虽然活动对象模式能够用于实现一个功能化的网关,但是它存在如下的缺陷:

1、 性能载荷
活动对象模式提供了一种强有力的并发模型。它不但能够同步对一个对象上的并发方法请求,而且能够实现完善的调度决策来确定请求的执行顺序。但是当调度和执行方法请求时,这些特性将引发大量的、重量级的上下文切换、同步、动态内存管理和数据移动带来的负载。

2、 编程载荷
活动对象模式要求程序员实现至少6个组件:代理、方法请求、活动队列、调度器、一个服务和用于每个代理方法的Future。虽然一些组件比如活动队列和方法请求可以被重用,但是程序员在每次应用该模式的时候可能还是不得不重新实现或客户化这些组件。

一般来说,如果一个应用并不需要活动对象模式中的所有特性,特别是它的"老练的"调度能力,上面提到的性能和编程方面的负载未必会很多。然而一个开发并发应用的程序员必须保证对一个对象的方法请求应该是被适当的同步或是调度。

语境

存在需要对多线程并发访问对象进行控制的应用。

问题

许多应用都包含能够被多个客户线程并发访问的对象。因此为了确保并发应用的正确执行,对这些对象的访问经常需要同步或调度。在解决这个问题之前,下面的三个需求必须被满足:

1、同步的边界应该对应于对象的方法。面向对象的程序员经常习惯于通过访问对象的接口方法来访问对象,目的是保护对象内部的数据不被任意的改变。可以直接扩展这个面向对象的编程模式,用于保护对象内部的数据不会被任意的并发改变,这就是通常说的竞争条件。因此一个对象的接口方法应该被定义成它的同步边界。
2、对象本身,而不是客户,应该实现其自身的方法同步。并发应用编程将变得更加困难,如果客户端必须清晰的实现获取或释放低层次的同步机制,比如信号量、互斥量以及条件变量。因此对象本身就有责任确保任何针对它的方法请求的同步应该被透明的进行,而不需要客户清晰的介入。
3、对象能够协同的调度它自身的方法。如果一个对象的方法在执行过程中必须被阻塞,他们应该自愿的放弃他们的线程控制来保证来自其他客户端线程的方法调用可以访问该对象。这个特性可以帮助防止死锁,使得影响(或调节)硬件/软件平台上可获得的并发性成为可能。

解决之道

将每一个被客户线程并发访问的对象定义为一个monitor对象。客户仅仅通过monitor对象的同步方法才能访问monitor对象定义的服务。为了防止monitor对象的状态陷入竞争条件,在一个时刻只能有一个monitor的同步方法被执行。每一个monitor对象包含一个monitor锁,被同步方法用于串行访问对象的行为和状态。此外,同步方法可以根据一个或多个与monitor对象相关的monitor conditions来决定在何种环境下挂起或恢复他们的执行。

结构

在一个monitor对象模式中存在四个参与者:
1、monitor对象。一个monitor对象向客户暴露一个或多个接口方法。为了保护monitor对象的内部状态不受任意修改和竞争条件的破坏,所有的客户必须通过这些方法访问monitor对象。因为monitor本身不包含自己的控制线程,所以每个方法在调用它的客户线程上执行。
2、同步方法。同步方法实现线程安全的被monitor对象暴露的服务。为了防止竞争条件,无论是否同时有多个线程并发调用同步方法,还是monitor对象类含有多个同步方法,在一个monitor对象内,在任意时间点只有一个同步方法能够被执行。
3、monitor锁。每一个monitor对象包含自己的monitor锁。同步方法使用这个monitor锁来实现每个对象基础上的方法调用串行化。当方法进入/离开对象时,每个同步方法必须分别的获取/释放monitor锁。这个协议保证无论什时候一个方法访问或修改对象的状态时都应该先获取monitor锁。
4、monitor条件。运行在分离线程上的多个同步方法可以经由monitor条件来相互等待和通知以实现协同地调度它们执行的顺序。同步方法可以使用monitor 条件来决定在何种环境下挂起或恢复他们的执行。

动态特征

在monitor对象模式中,在参与者之间将发生如下的协作过程:
1、同步方法的调用和串行化。当一个客户调用monitor对象的同步方法时,这个方法必须首先获取monitor对象的monitor锁。只要在monitor对象中有其他同步方法正在被执行,获取monitor锁便不会成功。在这种情况下,客户线程将被阻塞直到它获取monitor锁,在这个点上同步方法将获取monitor锁,进入临界区,执行方法实现的服务。一旦同步方法完成执行,monitor锁必须被释放,目的是使其他同步方法可以访问monitor对象。
2、同步方法线程挂起。如果一个同步方法必须被阻塞或是有其他原因不能立刻进行,它能够在一个monitor条件上等待,这将导致同步方法暂时"离开"monitor对象。当一个同步方法离开monitor对象,被同步方法获取的monitor锁将自动被释放,客户调用线程将被挂起在monitor条件上。
3、方法条件通知。一个同步方法能够通知一个monitor条件,目的是为了让一个前期使自己挂起在一个monitor条件上的同步方法线程恢复运行。此外,一个同步方法能够通知所有的前期使自己挂起在一个monitor条件上的同步方法线程。
4、同步方法线程恢复。一旦一个早先被挂起在monitor条件上的同步方法线程获取通知,它将继续在最初的等待monitor条件的点上执行。在被通知线程"重入"monitor对象,恢复执行同步方法之前,monitor锁将自动被获取。



实现

下面的步骤展示了如何实现monitor对象模式。
1、定义monitor对象接口方法。Monitor对象接口向用户暴露一个方法集合。典型的接口方法是同步的,在特定的monitor对象中,在任一时刻它们中只有一个能够被执行。在我们网关的例子中,每一个消费者处理器都包含一个消息队列和TCP连接。消息队列可以被定义为一个monitor对象,用于缓冲从供给处理器线程接受的消息。无论什么时候消费者处理器线程遇到发生在连接远端消费者的TCP连接的流控,monitor对象总能够帮助整个网关进程不会被阻塞。 下面的c++程序定义了消息队列monitor对象的接口:

class Message_Queue
{
public:
enum {
MAX_MESSAGES = /* ... */;
};
// The constructor defines the maximum number
// of messages in the queue. This determines
// when the queue is 'full.'
Message_Queue (size_t max_messages = MAX_MESSAGES);
// = Message queue synchronized methods.
// Put the  at the tail of the queue.
// If the queue is full, block until the queue
// is not full.
void put (const Message &msg);
// Get the  at the head of the queue.
// If the queue is empty, block until the queue
// is not empty.
Message get (void);
// True if the queue is full, else false.
// Does not block.
bool empty (void) const;
// True if the queue is empty, else false.
// Does not block.
bool full (void) const;
private:
// ...
};

Message_Queue monitor对象接口暴露了四个同步方法。Empty和full方法是谓词,客户可以用来区分三个不同的状态:空、满、既不空也不满。Put 和get方法分别用于完成从消息队列中插入和提取消息,如果消息队列为满或空将发生阻塞。
2、定义接口方法的对象实现方法。一个monitor对象经常包含实现方法用于简化其接口方法。这个分离的概念有助于实现monitor对象的功能和同步及调度逻辑之间的解耦合,同时也避免了对象内部的死锁和不必要的加锁载荷。
下面的惯例是基于线程安全的接口的习惯用法,可以用于构建接口方法和实现方法相分离的概念。
a)接口方法仅仅是获取/释放monitor锁,等待/通知特定的monitor条件,将monitor对象的功能转交给实现方法来完成。
b)当实现方法被接口方法调用时,仅仅实现其特定的功能,不再进行获取/释放monitor锁,等待/通知特定的monitor条件。此外,为了避免对象内部的死锁和不必要的加锁载荷,实现方法不应该调用任何monitor对象接口中定义的同步方法。 在我们网关的例子中,Message_Queue类定义了四个实现方法:put_i、get_i、empty_i和full_i,分别对应于相应的接口方法。这些函数的签名如下:

class Message_Queue
{
public:
// ... See above ....
private:
// = Private helper methods (non-synchronized
// and do not block).
// Put the  at the tail of the queue.
void put_i (const Message &msg);
// Get the  at the head of the queue.
Message get_i (void);
// True if the queue is full, else false.
// Assumes locks are held.
bool empty_i (void) const;
// True if the queue is empty, else false.
// Assumes locks are held.
bool full_i (void) const;
// ...

典型的这些方法既不是同步的,也不会被阻塞,刚好符合上面提到的线程安全接口的习惯用法。
3、定义方法的对象内部状态。一个monitor对象包含定义在其内部状态的数据成员。此外,一个monitor对象包含一个用于串行化它的同步方法执行的monitor锁,一个或多个用于调度同步方法执行的monitor条件。对于每一个类型的状况都有一个单独的monitor条件,在这里同步方法必须使它们被挂起或恢复其他被同步方法挂起的线程。
一个monitor锁可以用mutex来实现。当获得锁的线程正在执行临界区中的代码时,mutex用来使其他试图进入临界区的线程等待。Monitor条件可以用条件变量来实现。与mutex不同的是,条件变量被线程用来使自己被阻塞直到一个涉及共享数据的任意复杂条件表达式达到某个特殊的状态。
一个条件变量经常和mutex结合起来被使用,客户线程在进行表达式评估之前必须先获取mutex。如果条件表达式返回flase,客户线程将自动在条件变量上把自己挂起,同时释放mutex,从而使其他线程可以获取mutex来修改共享数据。当相应的线程更改了共享数据后,它可以通知条件变量,条件变量将自动的恢复早先在其上面挂起的线程并再次获取mutex。获取mutex的恢复线程再次评估条件表达式,如果共享数据达到希望的状态,线程将继续执行下去。否则,线程将重新使自己挂起在条件变量上等待它再次被恢复。这个过程将一直重复到条件表达式返回true.
一般情况下,条件变量比mutex更适合用于处理包含复杂表达式的状态或是调度行为。举例来说,条件变量可以被用来实现线程安全的消息队列。在这个用例中,一对条件变量可以协作的用来分别阻塞供给者线程,当消息队列满的时候和阻塞消费者线程,当消息队列空的时候。 在我们网关的例子中,Message_Queue类定义了它的内部状态,如下所示:

class Message_Queue
{
// ... See above ....
private:
// Internal Queue representation. ...
// Current number of s in the queue.
size_t message_count_;
// The maximum number s that can be
// in a queue before it's considered 'full.'
size_t max_messages_;
// = Mechanisms required to implement the
// monitor object's synchronization policies.
// Mutex that protect the queue's internal state
// from race conditions during concurrent access.
mutable Thread_Mutex monitor_lock_;
// Condition variable used to make synchronized
// method threads wait until the queue is no longer empty.
Thread_Condition not_empty_;
// Condition variable used to make synchronized
// method threads wait until the queue is no longer full.
Thread_Condition not_full_;
}


一个Message_Queue monitor对象定义了三种类型的状态:
a)队列表现数据成员。这些数据成员定义了内部消息队列的表现形式。这个表现使用循环数组或是链表来存储队列中的内容,同时还包含用于确定队列是否空、满、非空非满状态的记帐信息。内部消息队列的表现仅仅通过get_i、put_i、full_i和empty_i四个实现方法来访问和操纵。
b) monitor锁数据成员。Monitor锁被Message_Queue的同步方法用于串行化对monitor对象的访问。Monitor锁使用在包装门面模式(wrapper fa?ade pattern)中定义的 Thread Mutex类来实现。这个类提供了与平台无关的API接口。
c)monitor条件数据成员。当Message_Queue在空和满的边界条件转化时,Monitor条件被put和get同步方法分别用于使自己挂起和恢复。这些monitor条件使用如下定义的Thread Condition包装门面类来实现:


class Thread_Condition
{
public:
// Initialize the condition variable and
// associate it with the .
Thread_Condition (const Thread_Mutex &m)
// Implicitly destroy the condition variable.
Thread_Condition (void);
// Wait for the  to be,
// notified or until  has elapsed.
// If  == 0 wait indefinitely.
int wait (Time_Value *timeout = 0) const;
// Notify one thread waiting on the .
int notify (void) const;
// Notify *all* threads waiting on the .
int notify_all (void) const;
private:
#if defined (_POSIX_PTHREAD_SEMANTICS)
pthread_cond_t cond_;
#else
// Condition variable emulations.
#endif /* _POSIX_PTHREAD_SEMANTICS */
// Reference to mutex lock.
const Thread_Mutex &mutex_;
};



构造函数初始化条件变量,并使它与通过参数传递的Thread_Mutex相互关联。析构函数销毁条件变量,释放任何由构造函数分配的资源。需要注意的是mutex_本身不被Thread_Condition所拥有,所以在析构函数中不要销毁mutex_对象。
当被客户线程调用的时候,wait方法将自动释放mutex_对象,并使线程挂起一直到超过等待其他线程通知Thread_Condition对象的时间。Notify方法恢复一个等待在Thread_Condition对象上的线程,notifyall同时所有并发等待在Thread_Condition对象上的线程。在wait方法或者因为条件变量被通知,或者因为超时而返回客户线程之前,mutex_对象将被再次获取。
4、实现所有的monitor对象的方法和数据成员。最后这一步包括实现所有monitor对象的方法和上面定义的内部状态。
a)初始化数据成员。这个子步骤实现对象指定数据成员的初始化,包括monitor锁和所有的monitor条件。

例如:Message_Queue的构造函数创建一个空的消息队列,初始化monitor条件not_empty_和not_full_
Message_Queue::Message_Queue (size_t max_messages)
: not_full_ (monitor_lock_),
not_empty_ (monitor_lock_),
max_messages_ (max_messages),
message_count_ (0)
{
// ...
}

在这个例子中,展示了monitor条件如何共享同一个monitor锁。当多个线程同时通过put和get方法访问消息队列时,这个设计保证了Message_Queue对象的状态,如message_count,能够被串行化访问用于防止竞争条件的发生。
b)使用线程安全接口模式。在这个子步骤中,根据线程安全接口模式,接口方法和实现方法被分别的实现。 例如,下面的Message_Queue方法用于检测队列是否是空,也就是队列中没有任何消息,或是满,也就是队列中包含了超过max messages数量的消息。我们先看看接口方法的实现:


bool
Message_Queue::empty (void) const
{
Guard guard (monitor_lock_);
return empty_i ();
}
bool
Message_Queue::full (void) const
{
Guard guard (monitor_lock_);
return full_i ();
}


这些方法展示了上面提到的线程安全接口模式的一个简单的例子。它们使用范围锁习惯用法来获取/释放monitor锁,然后立刻调用相应的实现方法。正像下面所要展示的,这些实现方法都是在假定monitor锁已经被获取前提下,进行简单的队列边界条件检测。

bool
Message_Queue::empty_i (void) const
{
return message_count_ <= 0;
}
bool
Message_Queue::full_i (void) const
{
return message_count_ > max_messages_;
}


put方法在消息队列的尾部添加一个新的消息。它是一个同步方法,下面展示了更加老练的使用线程安全接口模式的方法实现。

Message_Queue::put (const Message &msg)
{
// Use the Scoped Locking idiom to
// acquire/release the  upon
// entry/exit to the synchronized method.
Guard guard (monitor_lock_);
// Wait while the queue is full.
while (full_i ()) {
// Release  and suspend our
// thread waiting for space to become available
// in the queue. The  is
// reacquired automatically when  returns.
not_full_.wait ();
}
// Enqueue the  at the tail of
// the queue and update .
put_i (new_item);
// Notify any thread waiting in  that
// the queue has at least one .
not_empty_.notify ();
// Destructor of  releases .
}



一旦消息队列中有多余的空间,它就调用put_i方法,这个方法完成将消息插入消息队列同时更新消息对应的记帐信息。此外put_i方法不再需要被同步,因为put方法在获取monitor锁之前是不会调用它的。同样,put_i方法不需要检测消息队列是否已满,因为只要full_i返回ture,它就永远不会被调用。 get方法从消息队列前端移走一个消息,并把它返回给调用者。

Message_Queue::get (void)
{
// Use the Scoped Locking idiom to
// acquire/release the  upon
// entry/exit to the synchronized method.
Guard guard (monitor_lock_);
// Wait while the queue is empty.
while (empty_i ()) {
// Release  and wait for a new
//  to be placed in the queue. The
//  is reacquired automatically
// when  returns.
not_empty_.wait ();
}
// Dequeue the first  in the queue
// and update the .
Message m = get_i ();
// Notify any thread waiting in  that the
// queue has room for at least one .
not_full_.notify ();
return m;
// Destructor of  releases .
}

变化

下面是monitor对象模式的几种变化:
1、定时的同步方法调用。一些应用受益于定时的同步方法调用。定时的调用使客户可以绑定一定数量的等待时间,用于等待同步方法进入monitor对象的临界区。早期定义的Message_Queue monitor对象可以被改写成支持定时的同步方法调用。

class Message_Queue
{
public:
// Message queue synchronized methods.
// Put the  at the tail of the queue.
// If the queue is full, block until the queue
// is not full. If  is 0 then block
// until the  is inserted into the queue.
// Otherwise, if  expires before the
//  is enqueued, the  exception
// is thrown.
void put (const Message &msg,
Time_Value *timeout = 0)
throw (Timedout);
// Get the  at the head of the queue.
// If the queue is empty, block until the queue
// is not empty. If  is 0 then block
// until the  is inserted into the queue.
// Otherwise, if  expires before the
//  is enqueued, the  exception
// is thrown.
Message get (Time_Value *timeout = 0)
throw (Timedout);
// ...
};


如果timeout的值是0,那么put和get方法都将永远阻塞直到一个消息被移出或插入Message_Queue monitor对象。否则,如果时间超期,将抛出超时异常,客户端必须做好处理此异常的准备。
下面展示了,使用定时等待特性的put方法是如何实现的:

void Message_Queue::put (const Message &msg,
Time_Value *timeout)
throw (Timedout)
{
// ... Same as before ...
// Wait while the queue is full.
while (full_i ()) {
// Release  and suspend our
// thread waiting for space to become available
// in the queue or for  to elapse.
// The  is reacquired automatically
// when  returns, regardless of whether
// a timeout occurred or not.
if (not_full_.wait (timeout) == -1
&& errno = ETIMEDOUT)
throw Timedout ();
}
// ... Same as before ...
}


3、策略化锁机制。策略化的锁模式可以使monitor对象更加灵活,更具有可重用性。例如,下面的模板类参数化了Message_Queue的同步方面特性。

template 
class Message_Queue
	{	
// ...
private:
typename SYNCH_STRATEGY::MUTEX monitor_lock_;
typename SYNCH_STRATEGY::CONDITION not_empty_;
typename SYNCH_STRATEGY::CONDITION not_full_;
// ...
};


于是每一个同步方法就被修改成类似下面展示的empty方法一样:

template  bool
Message_Queue::empty (void) const
{
Guard guard (monitor_lock_);
return empty_i ();
}


为了参数化和Message_Queue相关联的同步特性,我们可以定义一对类:MT_SYNC和NULL_SYNC,这两个类适当的定义了一些c++ traits,如下所示:

class MT_SYNCH {
public:
// Synchronization traits.
typedef Thread_Mutex MUTEX;
typedef Thread_Condition CONDITION;
};
class NULL_SYNCH {
public:
// Synchronization traits.
typedef Null_Mutex MUTEX;
typedef Null_Thread_Condition CONDITION;
};

这样,为了定义一个线程安全的Message_Queue,我们使用MT_SYNCH策略来参数化它:
Message_Queue message_queue;
同样,为了创建非线程安全的Message_Queue,我们使用NULL_SYNCH策略来参数化它:
Message_Queue message_queue;
需要注意的是当在C++程序中使用策略化锁模式的时候,在特定的用例下,组件类是不可能知道被参数化的锁的类型。因此,使用线程安全接口模式确保对象内方法互调用,如put方法调用full_和put_i,不会发生自死锁/最小化加锁载荷是非常重要的。

结论

使用monitor对象模式可以带来如下好处:
1、简化在一个对象上方法并发调用的同步过程。当调用monitor对象的方法时,客户不必关系同步的控制。如果编程语言没有将monitor对象作为语言特性,开发者可以象使用范围锁习惯用法一样使用它,来简化和自动处理monitor锁的加锁和解锁过程,从而实现monitor对象方法和状态的串行化访问。
2、同步方法可以协同的调度它们执行的顺序。同步方法使用monitor条件来决定在何种环境下它们应该挂起或是恢复执行。例如,方法能够使自己挂起,等待任意复杂的条件发生时的通知,而不是采取的低效的轮询处理。这个特性可以使monitor对象协同的在分离的线程中调度它们的方法。
使用monitor对象会带来如下的缺点:
1、在对象的功能和同步机制之间建立了紧耦合关系。一个活动对象的功能和同步策略是松耦合的,因为活动对象有分离的调度器。作为对比,一个monitor对象的同步和调度逻辑是紧耦合在它的方法功能中。虽然这样使monitor对象比活动对象更有效率,但是在不直接改变monitor对象方法实现的基础上很难改变它的同步策略和机制。正象线程安全接口和策略化锁模式所描述的那样,一种可以减少monitor对象中同步和功能之间耦合的方法是使用面向方面的编程(aspect-oriented programming)。
2、 嵌套monitor的怠工(lockout)。这个问题发生在当一个monitor对象嵌套在另一个monitor对象中时。考虑下面的JAVA代码:

class Inner {
protected boolean cond_ = false;
public synchronized void awaitCondition () {
while (!cond)
try { wait (); }
catch (InterruptedException e) {}
// Any other code.
}
public synchronized
void notifyCondition (boolean c) {
cond_ = c;
notifyAll ();
}
}
class Outer {
protected Inner inner_ = new Inner ();
public synchronized void process () {
inner_.awaitCondition ();
}
public synchronized
void set (boolean c) {
inner_.notifyCondition (c);
}
}



上面用JAVA代码展示了一个典型的monitor lockout问题的表现形式。当一个JAVA 线程阻塞在一个monitor对象的等待队列上时,除了这个被放在等待队列上的对象外的所有锁都将被正常持有。想象一下将会发生什么,如果线程T1调用了outer.process方法,将导致被阻塞在inner.awaitCondition方法中的wait方法上。Inner和outer类并不共享它们的monitor锁。这样一来,awaitCondition方法调用导致释放inner monitor锁而持有outer monitor锁。但是,另一个线程T2不能获取outer monitor对象,因为它将被同步方法阻塞。于是outer.set条件永远不会编程true,T1将永远被阻塞在wait方法上。