7 ACE反应堆(Reactor)的设计和使用:用于事件多路分离的面向对象构架

 

Douglas C. Schmidt     Irfan Pyarali

7.1 介绍

 

本文关注ACE构架[2]中的反应堆(Reactor)模式[1]的设计和实现。反应堆模式处理由一或多个客户并发地递送给应用的服务请求。应用的每个服务可由一或多个方法组成,并由一个单独的事件处理器代表;事件处理器负责分派服务特有的请求。

在本论文描述的反应堆模式的实现中,事件处理器分派由ACE_Reactor对象来完成。ACE_Reactor结合了I/O事件,以及其他类型的事件,比如定时器和信号的多路分离。在此实现的核心是一个同步的事件多路分离器,例如,select[3]WaitForMultipleObjects[4]。当事件发生时,ACE_Reactor自动分派预登记的事件处理器的方法,后者随即执行应用指定的服务。

本论文被组织如下:7.2描述ACE_Reactor构架中的主要特性;7.3概述ACE_Reactor实现的OO设计[2]7.4检查若干例子、演示ACE_Reactor怎样简化并发的事件驱动网络应用的开发;7.5描述使用ACE_Reactor开发事件驱动应用时所应遵循的设计规则;7.6给出结束语。

 

7.2 反应堆的特性

 

ACE_Reactor提供OO多路分离和分派构架,它简化了事件驱动应用的开发。下面的段落描述反应堆构架提供给应用开发者的特性:

 

统一的OO多路分离和分派接口:使用ACE_Reacotr的应用并不直接访问诸如selectWaitForMultipleObjects这样的低级事件多路分离系统调用,而是通过从ACE_Event_Handler基类继承来创建具体的事件处理器。该类指定处理多种类型事件的虚方法,比如I/O事件、定时器事件、信号和同步事件。图7-1演示ACE_Reactor中的关键组件。它演示了实现7.4描述的日志服务器的具体事件处理器。使用反应堆构架的应用创建具体事件处理器,并将它们登记到ACE_Reactor

 

使事件处理器分派自动化:当ACE_Reactor管理的处理器上有活动发生时,反应堆自动调用适当的预登记的具体事件处理器的虚方法。C++对象被登记到ACE_Reactor。对象、而不是单独的函数的使用,允许在对具体事件处理器的挂钩方法的调用之间,状态可以方便地保持。这种风格的OO编程对于开发在多次客户调用间保持状态的事件处理器来说很有用。

 

支持透明的可扩展性ACE_Reactor的功能和它的已登记的事件处理器可被透明地扩展,而无须修改或重编译现有代码。为实现这样的扩展性,反应堆构架采用继承和动态绑定来去耦(1)它的较低级的事件多路分离和分派机制与(2)应用定义的处理事件的较高级的策略

ACE_Reactor管理的低级机制包括检测多个I/O句柄上的事件、使定时器到期,以及分派适当的事件处理器方法来处理这些事件。应用特有的具体事件处理器执行的较高级策略包括连接建立策略、数据编码和解码,以及处理来自客户的服务请求。例如,TAO[5]实时CORBA ORB使用反应堆构架来分离它的低级事件多路分离机制和它的GIOP连接管理和协议处理[5]的较高级策略。

 

 

7-1 反应堆组件

 

增加复用ACE_Reactor的多路分离和分派机制可被许多网络应用复用。通过复用,而不是重新发明,开发者可以专注于较高级的应用特有的策略,而不是反复与较低级事件多路分离和分派机制相纠缠。

相反,直接使用像selectWaitForMultipleObject这样的低级事件多路分离操作的程序员必须为每一个应用重新实现、调试和调谐同样的多路分离和分派代码。而所有利用ACE_Reactor的应用自动地复用它的特性,以及将来的增强和优化。

 

增强类型安全性ACE_Reactor将应用开发者和易错的细节(它们与使用像select这样的低级事件多路分离系统调用来编程是相关联的)屏蔽开来。这些细节涉及设置和清除位掩码、处理超时和中断,以及分派挂钩方法。特别地,ACE_Reacotr消除了导致select出错的若干微妙原因,这些错误涉及fd_set位掩码的误用。

 

改善可移植性ACE_Reactor在若干事件多路分离机制上运行,其中包括Win32 WaitForMultipleObjectsselect(它在UNIXWin32上都可用)。ACE_Reactor将应用和底层事件多路分离机制的移植性差异屏蔽开来。如图7-6所示,ACE_Reactor向应用输出同样的接口,而不管底层系统调用是什么。而且,ACE_Reactor使用像桥接(Bridge[6]这样的设计模式来增强它内部的可移植性。因而,将ACE_Reactorselect移植到WaitForMultipleObjects仅需要对构架进行一些局部变动[7]

 

线程安全性:反应堆构架是完全多线程的。所以多个线程可安全地共享单个ACE_Reactor。同样地,多个ACE_Reactor也可在一个进程的不同线程中运行。反应堆构架提供必要的同步机制来防止条件竞争和类中方法的死锁[8]

 

高效的多路分离ACE_Reactor非常高效地执行它的事件多路分离逻辑。例如,基于selectACE_Reactor使用7.3.2中描述的ACE_Handle_Set类来避免每次只查看fd_set位掩码的一位。这一优化基于一种成熟的算法,使用异或操作符来将运行时复杂度从O总位数)降低到O已置位的位数)。这充分地减少了运行时开销。

 

7.3 反应堆构架的OO设计

 

这一部分描述反应堆构架的OO设计。主要焦点在于它的组件的结构和关键的设计决策。在适当的地方还讨论了实现细节。7.3.1概述OS平台无关的组件,而7.3.2覆盖平台相关的组件。

 

7.3.1 平台无关的类组件

 

这一小部分总结反应堆构架中的平台无关的类,其中包括ACE_ReactorACE_Time_ValueACE_Timer_QueueACE_Event_Handler

 

7.3.1.1 ACE_Reactor

 

ACE_Reactor定义反应堆构架的公共接口。图7-2演示ACE_Reactor类中关键的公共方法。此类中的方法大致被分组为以下几个种类:

 

管理器方法:构造器和open方法通过动态地分配多个实现对象来创建和初始化ACE_Reactor对象;这些实现对象在下面的7.3.2.17.3.2.2中描述。析构器和close方法释放这些对象。

 

基于I/O的方法:派生自ACE_Event_Handler的具体事件处理器通过反应堆的register_handler方法登记到ACE_Reactor;同样地,具体事件处理器可以通过它的remove_handler方法移除。

 

基于定时器的方法:ACE_Reactor的定时器策略使用最终期限来评估被调度的定时器的优先级。ACE_Reactor的定时器策略提供的操作包括(1)登记将在用户指定时间执行的具体事件处理器和(2)取消先前登记的事件处理器。

 

事件循环方法:在登记初始的具体事件处理器之后,应用最终进入一个事件循环,重复调用ACE_Reactorhandle_events方法中的一个。这些方法阻塞应用指定长度的时间,等待各种事件的发生,比如I/O句柄上的同步I/O事件或基于定时器的事件。在事件发生时,ACE_Reactor分派具体事件处理器的适当方法;这些方法已被应用登记以处理这些事件。

 

class ACE_Reactor

{

public:

// = Initialization and termination methods.

enum { DEFAULT_SIZE = FD_SETSIZE };

 

// Initialize a Reactor instance that may

// contain SIZE entries (<restart> indicates

// to restart system calls after interrupts).

ACE_Reactor (int size, int restart = 0);

virtual int open (int size = DEFAULT_SIZE, int restart = 0);

 

// Perform cleanup activities to close down

// an instance of an <ACE_Reactor>.

void close (void);

?ACE_Reactor (void);

 

// = I/O-based event handler methods.

// Register an <ACE_Event_Handler> object according

// to the <ACE_Reactor_Mask>(s) (e.g., READ_MASK,

// WRITE_MASK, etc.).

virtual int register_handler (ACE_Event_Handler *, ACE_Reactor_Mask);

 

// Remove the handler associated with the

// appropriate <ACE_Reactor_Mask>(s).

virtual int remove_handler (ACE_Event_Handler *, ACE_Reactor_Mask);

 

// = Timer-based event handler methods.

// Register a handler to expire at time <delta>.

// When <delta> expires the <handle_timeout>

// method will be called with the current time

// and <act> as parameters. If <interval> is > 0

// then the handler is reinvoked periodically

// at that <interval>. The <delta> is interpreted

// "relative" to the current time of day.

virtual void schedule_timer

(ACE_Event_Handler *,

const void *act,

const ACE_Time_Value &delta,

const ACE_Time_Value &interval);

 

// Locate and cancel timer.

virtual void cancel_timer (ACE_Event_Handler *);

 

// = Event-loop methods

// Block process until I/O events occur or timer

// expires, then dispatch activated handler(s).

virtual int handle_events (void);

 

// Perform a timed event-loop that waits up to TV

// time units for events to occur; if no events

// occur then 0 is returned, otherwise return

// TV - (actual_time_waited).

virtual int handle_events (ACE_Time_Value &tv);

 

private:

// Pointer to the implementation class,

// e.g., <ACE_Select_Reactor> or

// <ACE_WFMO_Reactor>.

Reactor_Impl *reactor_impl_;

};

7-2 ACE反应堆接口

 

7.3.1.2 ACE_Event_Handler

 

该基类指定ACE_Reactor用以控制和协调具体事件处理器的多路分离和分派接口。ACE_Event_Handler接口中的虚方法如图7-3所示。

 

// Handle portability issues.

#if defined (UNIX)

typedef int ACE_HANDLE;

#elif defined (WIN32)

typedef HANDLE ACE_HANDLE;

#endif /* UNIX */

class ACE_Event_Handler

{

public:

// These values can be bitwise "or’d" together to

// instruct the <ACE_Reactor> to check for

// multiple I/O activities on a single handle.

enum {

READ_MASK = 01,

WRITE_MASK = 02,

EXCEPT_MASK = 04,

RWE_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK,

DONT_CALL // Don’t callback to handle_close().

};

 

// Returns the I/O handle associated with the

// derived object (must be supplied by a subclass).

virtual ACE_HANDLE get_handle (void) const = 0;

 

// Called when event handler is removed from

// an <ACE_Reactor>.

virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);

 

// Called when input becomes available.

virtual int handle_input (ACE_HANDLE);

 

// Called when output is possible.

virtual int handle_output (ACE_HANDLE);

 

// Called when urgent data is available.

virtual int handle_except (ACE_HANDLE);

 

// Called when timer expires (<tv> stores the

// current time and <act> is the argument given

// when the handler was originally scheduled).

virtual int handle_timeout (const ACE_Time_Value &tv, const void *act = 0);

 

// Called when signal is triggered by OS.

virtual int handle_signal (int signum);

};

7-3 ACE事件处理器接口

 

ACE_Reactor通过派生自ACE_Event_Handler的具体事件处理器来实现它的事件驱动回调机制。具体事件处理器可以有选择地重定义ACE_Event_Handler中的虚方法,以执行应用定义的处理来响应多种类型的事件。这些事件包括(1)同步I/O,例如,异常;(2)定时器;(3)信号;以及(4)同步,例如,将Win32互斥体从复位切换到置位[4]

7.3.1.4中描述的ACE_Timer_Queue使用具体事件处理器来处理基于定时器的事件。当此队列管理的定时器到期时,先前排定的(scheduled)事件处理器的handle_timeout方法被调用。有两个参数被传递给该方法:(1)当前时间和(2void * 异步完成令牌ACT[9],当事件处理器最初被排定时,此令牌也作为参数被传递给schedule_timer

当具体事件处理器中的任何方法的返回值<0时,反应堆都自动调用处理器的handle_close清扫方法。应用可对该方法进行编程以执行终止活动,比如关闭日志文件或删除对象分配的动态内存。当handle_close方法返回时,ACE_Reactor从它的内部表中将与之相关联的具体事件处理器移除掉。

具体事件处理器通常要提供一个I/O句柄。当应用调用ACE_Reactorregister_handler方法时,该方法回调具体事件处理器的get_handle方法,以获取底层的I/O句柄。

当应用调用ACE_Reactorhandle_events方法时,所有已登记的具体事件处理器的句柄都被传递给底层的OS事件多路分离调用,例如,selectWaitForMultipleObjects。随后,当I/O事件变为“就绪”时,OS激活这些句柄。在此时,ACE_Reactor通过调用被定义用来处理该事件的方法,通知适当的具体事件处理器。

 

7.3.1.3 ACE_Time_Value

 

C++包装封装底层OS平台的日期和时间结构,例如,在大多数UNIX平台上定义的struct timeval类型。timeval结构包括两个域,根据微秒来表示时间。其他OS平台,比如POSIXWin32,使用稍稍不同的时间表示法。为此,ACE_Time_Value类封装这些细节以提供可移植的C++接口。

ACE_Time_Value类的主要方法在图7-4中演示。ACE_Time_Value包装使用操作符重载来简化基于时间的比较。这样的重载允许标准的算术语法被用于涉及时间比较的关系表达式。

 

// Time value structure from /usr/include/sys/time.h

// struct timeval { long secs; long usecs; };

class ACE_Time_Value

{

public:

ACE_Time_Value (long sec = 0, long usec = 0);

ACE_Time_Value (timeval t);

 

// Returns sum of two <ACE_Time_Value>s.

friend ACE_Time_Value operator +

(const ACE_Time_Value &lhs,

const ACE_Time_Value &rhs);

 

// Returns difference between two

// <ACE_Time_Value>s.

friend ACE_Time_Value operator -(

const ACE_Time_Value &lhs,

const ACE_Time_Value &rhs);

 

// Relational and comparison operators for

// normalized <ACE_Time_Value>s.

friend int operator <

(const ACE_Time_Value &lhs,

const ACE_Time_Value &rhs);

 

// Other relation operators...

 

private:

// ...

};

7-4 ACE时间值接口

 

ACE_Time_Value的方法被实现用来“规格化”时间数量。规格化调整timeval结构中的两个域,使之使用能确保精确比较的规范的编码方式。例如,在规格化之后,数量ACE_Time_Value(1, 1000000)将与ACE_Time_Value(2)相等。相反,直接按位比较这些非规格化的类值将检测不到它们的相等。

下面的代码创建两个ACE_Time_Value对象,它们由用户提供的命令行参数加上当前时间来构造。随后显示两个对象之间的正确顺序关系:

 

int main (int argc, char *argv[])

{

if (argc != 3)

ACE_ERROR_RETURN ((LM_ERROR, "usage: %d" "time1 time2\n"), 1);

 

ACE_Time_Value time = ACE_OS::gettimeofday ();

ACE_Time_Value timer1 = time + ACE_Time_Value (ACE_OS::atoi (argv[1]));

ACE_Time_Value timer2 = time + ACE_Time_Value (ACE_OS::atoi (argv[2]));

 

if (timer1 > timer2)

ACE_DEBUG ((LM_DEBUG, "timer 1 is greater\n"));

else if (timer2 > timer1)

ACE_DEBUG ((LM_DEBUG, "timer 2 is greater\n"));

else

ACE_DEBUG ((LM_DEBUG, "timers are equal\n"));

 

return 0;

}

 

上面所示的代码对所有OS平台都是完全可移植的。注意像操作符重载和类这样的C++特性的使用是怎样简化与时间相关操作的使用的。

 

7.3.1.4 ACE_Timer_Queue

 

ACE_Reactor的基于定时器的机制对于需要定时器支持的应用来说是很有用的。例如,WWW服务器需要看门狗定时器来释放资源,如果客户在它们连接后不在指定的时间间隔内发送HTTP请求的话。同样地,像Windows NT Service Control Manager[4]这样的看守配置构架要求在它们控制之下的服务周期性地报告它们的当前状态。这些“心跳”消息用于确认服务没有异常地终止。

ACE_Timer_Queue类提供的机制允许应用登记派生自ACE_Event_Handler的、基于定时器的具体事件处理器。ACE_Timer_Queue确保这些事件处理器中的handle_timout方法将来在应用指定的时间被调用。图7-5中所示的ACE_Timer_Queue类的方法使得应用可以调度、取消,和调用定时器对象。

 

class ACE_Timer_Queue

{

public:

ACE_Timer_Queue (void);

 

// True if queue is empty, else false.

int is_empty (void) const;

 

// Returns earliest time in queue.

const ACE_Time_Value &earliest_time (void) const;

 

// Schedule a <handler> to be dispatched at

// the <future_time> and at subsequent

// <interval>s.

int virtual schedule

(ACE_Event_Handler *handler,

const void *act,

const ACE_Time_Value &future_time,

const ACE_Time_Value &interval);

 

// Cancel all registered <ACE_Event_Handlers>

// that match the address of <handler>, which

// can be registered multiple times.

int virtual cancel(ACE_Event_Handler *handler);

 

// Cancel the single <ACE_Event_Handler>

// matching the <timer_id> value returned

// from <schedule>.

int virtual cancel (int timer_id, const void **act = 0);

 

// Expire all timers <= <expire_time>

// (note, this method must be called manually

// since it is not invoked asynchronously).

void virtual expire(const ACE_Time_Value &expire_time);

 

private:

// ...

};

7-5 ACE_Timer_Queue接口

 

应用调度具体事件处理器,在延迟一定数量的时间之后到期。如果它到期了,act作为值被传递给事件处理器的handle_timeout挂钩方法。如果interval不等于ACE_Time_Value::zero,该值就被用于自动重调度事件处理器。

Schedule方法返回一个定时器id,唯一地标识每个事件处理器在定时器队列的内部表中的登记。定时器idcancel方法用于在事件处理器到期之前将其移除。如果一个非NULL act被传给cancel,这个act就被设置为应用在定时器最初被排定时所传入的异步完成令牌ACT[9] 这使得程序可以释放动态分配的ACT,以避免内存泄漏。

缺省地,ACE_Reactor所用的ACE_Timer_Queue被作为堆来实现。堆是一种“部分有序的、几乎完全的二进制树”,它确保插入和删除一个具体事件处理器的平均和最坏情况的时间复杂度为O(lg n)。堆表示法对大多数应用实例、特别是实时应用来说都是很适用的。

ACE_Timer_Queue堆由含有ACE_Time_ValueACE_Event_Handler *,和void *的三元组组成。ACE_Event_Handler * 域指向被排定的定时器对象,该对象的运行时间由ACE_Time_Value域指定。void * 域是在具体事件处理器被最初被排定时所提供的参数。当定时器到期时,这个参数被自动传给7.3.1.2中描述的handle_timeout方法。堆中的每个ACE_Time_Value都以绝对时间单元来存储,例如,按照UNIX gettimeofday系统调用所生成的时间。

ACE_Timer_Queue接口中使用了虚方法。因而,应用可以扩展缺省的ACE实现来支持其他可选的数据结构,如delta[11]定时轮timing wheel[12]delta表将时间存储为“相对的”单元,即相对于表的最前面的ACE_Time_Value的偏移或“delta”。定时轮使用循环缓冲区,使得程序有可能在O(1)时间内启动、停止和维护定时器。ACE构架提供了若干可选的定时器队列的实现。

 

7.3.2 平台相关的类组件

 

ACE_Reactor类是应用用以访问ACE反应堆构架的公共接口。ACE_Reactor接口由一些虚方法组成。因此,它可以通过继承来扩展。

但是,扩展ACE_Reactor最常用的方法并不是派生它的子类。相反,如图7-6所示,桥接模式[6]被用来使ACE_Reactor接口与它的ACE_Reactor_Impl子类实现去耦合。ACE构架提供的两个子类包括ACE_Select_ReactorACE_WMFO_Reactor,它们分别封装了selectWaitForMultipleObjects OS事件多路分离调用。

在不同OS平台上,ACE_Reactor_Impl子类的实现也不同。但是,ACE_Reactor接口提供的方法的名字和总的功能保持不变。这种统一性源于ACE_Reactor的设计的模块性,该设计还增强了反应堆的复用、可移植性和可维护性。ACE_ReactorWaitForMultipleObjectsselect版本概述如下。

 

7.3.2.1 ACE_Select_Reactor

 

如图7-6(1)所示,基于selectACE_Reactor包含有三个ACE_Event_Handler * 数组。这些数组存储的指针指向已登记用来处理各种类型事件的具体事件处理器。

ACE_Handle_Set类为底层的fd_set位掩码数据类型提供高效的C++包装。fd_setI/O句柄名字空间映射到紧凑的位向量表示,并提供若干操作来置位、复位和测试与I/O句柄相对应的位。可传给select调用一或多个fd_set

ACE_Handle_Set类通过以下方法来优化若干常用的fd_set操作:(1)使用“全字”(full-word)比较,以使不必要的位操作最少化, 2)缓存某些值,以避免每次调用都计算位偏移,以及(3)使用一种异或算法,它与fd_set中的活动句柄的数目、而非潜在的活动句柄的数目线性相关。

 

7.3.2.2 ACE_WMFO_Reactor

 

WaitForMultipleObjects接口比select更为通用,它允许应用等待更广泛的事件,比如同步事件。因此,基于WaitForMultipleObjectsACE_Reactor既不需要三个ACE_Event_Handler * 数组,也不需要ACE_Handle_Set类,而是在内部分配和使用单个的ACE_Event_Handler指针数组和句柄数组,以存储已登记的具体事件处理器。

 

 

7-6 将桥接模式用于反应堆实现

7.4 分布式日志服务例子

 

反应堆构架意在简化事件驱动应用的开发,比如Web服务器[13, 14]CORBA对象请求代理[15]。这一部分描述一个分布式日志服务(distributed logging service)的设计和实现,以演示ACE_Reactor在实践中的应用。

 

7.4.1 综述

 

日志提供一种“只追加”的存储服务,记录由一或多个应用发来的诊断信息。日志的主要单元是记录。到来的记录被追加到日志的末尾,而所有其它类型的写访问都是被禁止的。

如图7-7所示,下面检查的日志服务使用客户/服务器体系结构,以使通过TCP/IP网络连接的工作站和服务器能够记录事件。日志服务将ACE_Reactor的多路分离与分派特性和[16]中描述的C++ IPC包装库提供的BSD sockets与系统V传输层接口(TLI)的OO接口结合在了一起。

 

 

7-7 分布式日志服务中的组件

 

日志服务中的关键组件描述如下:

 

应用日志接口(Application logging interface):在客户主机上运行的应用进程,例如,P1P2P3,使用ACE_Log_Msg C++类来生成日志记录,比如LM_ERRORLM_DEBUGACE_Log_Msg::log方法提供一种printf风格的接口。图7-8概述了在应用接口、客户和服务器日志看守之间交换的记录的若干优先级和数据格式。当被应用调用时,日志接口格式化日志记录,打上时间戳,将它们写到周知的STREAM管道[17]中去。如下所述,客户日志看守负责处理这些记录。

 

// The following enum indicates the relative

// priorities of the logging messages.

enum Log_Priority

{

// Messages that contain information normally

// used only when debugging a program.

LM_DEBUG,

 

// Critical conditions, e.g., hard device errors

LM_ERROR

 

// ...

};

 

struct Log_Record

{

enum {

// Maximum number of bytes in logging record.

MAXLOGMSGLEN = 1024

};

 

// Type of logging record.

Log_Priority type_;

 

// length of the logging record.

long length_;

 

// Time logging record generated.

long time_stamp_;

 

// Id of process that generated the record.

long pid_;

 

// Logging record data.

char rec_data_[MAXLOGMSGLEN];

};

7-8 日志记录格式

 

客户日志看守(Client logging daemon):客户日志看守运行在所有参与分布式日志服务的主机上。每个客户日志看守与STREAM管道的读端相连,后者用于从这台机器上的应用那里接收日志记录。使用STREAM管道是因为它们是只用于本地主机的IPC的高效形式。此外,STREAM管道的语义允许“结合优先级”的消息,可按照“重要性顺序”,以及“到达顺序”接收[18]

客户日志看守从应用进程那里持续地接收日志记录。它随后将多字节的记录头字段转换为网络字节顺序。最后,它使用TCP将记录转发给服务器日志看守。服务器通常运行在远地主机上。

 

服务器日志看守(Server logging daemon):服务器日志看守持续地收集、重格式化和输出到来的日志记录。这一部分的余下部分专注于服务器日志看守。在此例中演示和描述了多种ACE_ReactorACE C++ socket包装机制。

 

7.4.2 服务器日志看守

 

下面描述用来构造服务器日志看守的类的接口和实现。日志服务器在单独的进程中运行,并发地处理来自客户的日志记录。并发是由ACE_Reactor提供的,它将它的注意力以循环方式“分时”给每个活动的客户。

每次应用调用ACE_Reactorhandle_events方法,就从每个I/O句柄变为活动的客户那里读取一条日志记录。日志记录被写到服务器日志看守的标准输出。该输出可被重定向到多种设备,比如打印机、持久存储仓库或日志管理控制台。

 

 

7-9 服务器日志看守中的组件

 

有若干C++类组件出现在日志服务体系中。在图7-9中使用Booch表示法[19]演示了多种组件间的继承和模板参数化关系。为增强复用和可扩展性,图中所示的组件被设计用以使服务器日志看守体系结构的以下方面去耦合:

 

反应堆构架组件(Reactor framework component):反应堆构架中的组件封装执行I/O多路分离和具体事件处理器分派的最底层机制。这些组件已在7.3中讨论。

 

连接相关组件(Connection-related component):这些通用模板实现接受器(Acceptor)模式[20],提供可复用的连接工厂组件。ACE_Acceptor是一个接受来自远地客户的网络连接、创建ACE_Svc_Handler的模板。ACE_Svc_Handler是与相连的客户交换数据的模板。这些组件在7.4.2.1中讨论。

 

应用特有组件(Application-specific component):这些组件实现分布式日志服务的应用特有的部分。Logging_Acceptor类给ACE_Acceptor提供具体的参数化类型,后者创建专用于日志应用的连接处理实例。同样地,Logging_Handler类也通过具体类型来实例化,这种具体类型提供必需的应用特有的功能,以接收和处理来自远地客户的日志记录。这些组件在7.4.2.2中讨论。

 

使用这样的高度去耦合的OO分解极大地增强了服务器日志看守的开发和可扩展性。这些组件的每一个被描述如下。

 

7.4.2.1 连接相关组件

 

下面的类用于实现Acceptor模式[20]。该模式用于使(1被动连接建立与(2)一旦服务的两端连接和初始化后、服务所进行的处理去耦合。

 

ACE_Acceptor类:该类为一族类提供一种通用模板,使从客户接受网络连接请求所必需的步骤标准化和自动化。图7-10演示了ACE_Acceptor类的接口。

 

// A template class that handles connection

// requests from a remote client.

 

template <class SVC_HANDLER,

class PEER_ACCEPTOR>

class ACE_Acceptor : public ACE_Event_Handler

{

public:

ACE_Acceptor (ACE_Reactor *r, const PEER_ACCEPTOR::PEER_ADDR &a);

ACE_Acceptor (void);

protected:

virtual ACE_HANDLE get_handle (void) const;

virtual int handle_input (ACE_HANDLE);

virtual int handle_close

(ACE_HANDLE = ACE_INVALID_HANDLE,

ACE_Reactor_Mask = ACE_Event_Handler::READ_MASK);

 

private:

// Accept connections.

PEER_ACCEPTOR acceptor_;

};

7-10 Acceptor类接口

 

ACE_Acceptor模板类继承自ACE_Event_Handler。这一派生使得ACE_Acceptor可以与反应堆构架无缝地交互。此外,该模板类通过具体的SVC_HANDLER(它知道怎样执行与客户的I/O)和PEER_ACCEPTOR类来参数化(它知道怎样接受客户连接)。

ACE_Acceptor实例化的类有能力完成以下工作:

 

  1. 接受远地客户发送的连接请求。
  2. 动态分配SVC_HANDLER子类的对象。
  3. 将此对象登记到ACE_Reactor的一个实例。随后,SVC_HANDLER类必须知道怎样处理与客户交换的数据。

 

ACE_Acceptor类的实现如图7-11所示。当一或多个连接请求到达时,handle_input方法被Reactor自动分派。该方法的行为如下:首先,它动态创建新的SVC_HANDLER对象,负责处理发送数据和接收来自新客户的数据。其次,它将一个到达的连接接受进SVC_HANDLER。最后,它调用新SVC_HANDLERopen挂钩。如下所示,该挂钩可以将新创建的SVC_HANDLER登记到ACE_Reactor,或是生成一个独立的线程控制,等等。

 

// Shorthand names

#define SH SVC_HANDLER

#define PA PEER_ACCEPTOR

 

template <class SH, class PA>

ACE_Acceptor<SH, PA>::ACE_Acceptor

(ACE_Reactor *reactor,

const PA::PEER_ADDR &addr)

: acceptor_ (addr)

{

// Register to accept connections.

reactor->register_handler

(this,

ACE_Event_Handler::ACCEPT_MASK);

}

 

template <class SH, class PA> ACE_HANDLE

ACE_Acceptor<SH, PA>::get_handle (void) const

{

// Return the underlying I/O handle

// when called by Reactor during

// registration.

return this->acceptor_.get_handle ();

}

 

template <class SH, class PA> int

ACE_Acceptor<SH, PA>::handle_close

(ACE_HANDLE, ACE_Reactor_Mask)

{

// Close down the Acceptor and

// release the handle resources.

return this->acceptor_.close ();

}

 

template <class SH, class PA>

ACE_Acceptor<SH, PA>::?ACE_Acceptor (void)

{

this->handle_close ();

}

 

// Template Method that accepts connections

// from client hosts, creates and activates

// a service handler.

 

template <class SH, class PA> int

ACE_Acceptor<SH, PA>::handle_input

(ACE_HANDLE)

{

// Create a new Svc_Handler.

SH *svc_handler = new SH;

 

// Accept connection into the handler.

this->acceptor_.accept (svc_handler->peer ());

 

// Activate the handler.

svc_handler->open (0);

}

7-11 Aceptor类实现

 

ACE_Svc_Handler类:该参数化类型为处理与客户交换的数据提供一种通用模板。例如,在分布式日志服务中,I/O格式牵涉到日志记录。但是,也可以很容易地为其他应用换用不同的格式。ACE_Svc_Handler类的接口在图7-12中描述。和ACE_Acceptor类一样,该类继承ACE_Event_Handler基类的功能。这使得从ACE_Svc_Handler实例化的具体事件处理器可被动态创建并登记到ACE_ReactorACE_Acceptor类中的handle_input方法自动完成这一行为。

 

// Receive client message from the remote clients.

template <class PEER_STREAM>

class ACE_Svc_Handler : public ACE_Event_Handler

{

public:

ACE_Svc_Handler (void);

 

// Must be filled in by subclass

virtual int open (void *) = 0;

PEER_STREAM &peer (void);

 

// Demultiplexing hooks.

virtual ACE_HANDLE get_handle (void) const;

 

protected:

// Connection open to the client.

PEER_STREAM peer_stream_;

};

7-12 Svc_Handler类接口

 

7-13演示ACE_Svc_Handler类实现。注意继承、动态绑定和参数化类型的结合是怎样使构架的通用部分(例如,连接建立)与应用特有的功能(例如,接收日志记录)去耦合的。

 

#define PS PEER_STREAM

 

// Extract the underlying PS (e.g., for

// use by accept()).

 

template <class PS> PS &

ACE_Svc_Handler<PS>::peer (void)

{

return this->peer_stream_;

}

 

template <class PS> ACE_HANDLE

ACE_Svc_Handler<PS>::get_handle (void) const

{

// Return the underlying I/O handle

// when called by Reactor during

// registration.

return this->peer_stream_.get_handle ();

}

7-13 Svc_Handler类实现

 

ACE_Reactor被指示从它的内部表中移除ACE_Svc_Handler时,它自动调用具体事件处理器的handle_close方法。缺省地,该方法释放处理器的内存;该内存原来是由ACE_Acceptor类中的handle_input方法分配的。具体事件处理器通常在客户进程关闭或发生严重传输错误时被移除。

 

7.4.2.2应用特有的服务

 

下面的类实现服务的应用特有部分,在此例中也就是日志服务器看守。

 

Logging_Acceptor:为实现分布式日志应用的服务器看守部分,Logging_Acceptor类从通用的ACE_Acceptor模板实例化,如下所示:

 

typedef ACE_Acceptor <Logging_Handler, ACE_SOCK_Acceptor>Logging_Acceptor;

 

SVC_HANDLER模板参数通过下面描述的Logging_Handler类实例化。同样地,PEER_ACCEPTOR模板参数被ACE_SOCK_Acceptor类替换。ACE_SOCK_* 实例化类型是称为SOCK_SAPC++包装的一部分[16]SOCK_SAP封装socket接口,以在客户和服务器上的进程间可靠地传输数据。

通过使用参数化类型,用于IPC的类可以是任何遵从参数化类中所用API的网络编程接口。例如,取决于底层OS平台的特定属性(比如是UNIXBSD还是系统V变种),日志应用可以使用SOCK_SAPTLI_SAP(后者是系统V传输层接口(TLI)的ACE C++包装)来实例化ACE_Svc_Handler类。这种技术在下面演示:

 

// Logging application.

 

#if defined (USE_SOCKETS)

typedef ACE_SOCK_Stream PEER_STREAM;

#elif defined (USE_TLI)

typedef ACE_TLI_Stream PEER_STREAM;

#endif /* USE_SOCKETS */

 

class Logging_Handler

: public ACE_Svc_Handler<PEER_STREAM>

{

// ...

};

 

在开发必须跨越多种OS平台运行的应用时,基于模板的可扩展性所提供的灵活性十分有用。事实上,通过传输接口来参数化应用的能力对于跨越OS平台的变种也很有用。例如,Solaris的某些版本不提供线程安全的socket实现。

 

Logging_Handler类:该类通过实例化ACE_Svc_Handler类创建:

 

class Logging_Handler :

public ACE_Svc_Handler<ACE_SOCK_Stream>

{

public:

// Initialization hook called by

// the <ACE_Acceptor>.

virtual int open (void *)

{

ACE_SOCK_Stream::PEER_ADDR addr;

 

// Cache remove host name.

peer ().get_remote_addr (addr);

ACE_OS: