在Vovida的基础上实现自己的SIP协议栈(二) 卢政 2003/08/04 2. 5 HeartLessProxy Run方法的实现HeartLessProxy::run() { myWorkerThread->run(); mySipThread->run(); } 通过上面可以看到有两个Run方法的调用,第一个是WorkThread的Run方法,它的主要作用是处理UaBuilder的Process方法,主要用来处理Sptr < Fifo < Sptr < SipProxyEvent > > > myFifo中的各种事件,前面已经详细的介绍了SipProxyEvent类的作用,这个类已经在前面介绍了,其实简单的说,它就是一个本地的各种事件的集合。 现在我们来看一下两个Run方法的实现: 2.5.1 WorkerThread的Run方法: UaBuilder::process( const Sptr < SipProxyEvent > nextEvent ) { //处理以下的四种事件 /// SipEvent Sptr < SipEvent > sipEvent; sipEvent.dynamicCast( nextEvent ); if ( sipEvent != 0 ) { //处理本地的SIP的事件,包括对状态机的设置和命令/状态队列返回的操作在下面将//对它做详细的介绍 if( processSipEvent( sipEvent ) ) { return; } //向消息队列myCallContainer中插入相应的事件信息。 sendEvent( nextEvent ); return; } /// UaDeviceEvent Sptr < UaDeviceEvent > uaDeviceEvent; uaDeviceEvent.dynamicCast( nextEvent ); if ( uaDeviceEvent != 0 ) { //处理本地的设备事件,最主要的就是处理摘机信号; if( processUaDeviceEvent( uaDeviceEvent ) ) { return; } sendEvent( nextEvent ); return; } /// UaDigitEvent Sptr < UaDigitTimerEvent > uaDigitEvent; uaDigitEvent.dynamicCast( nextEvent ); if ( uaDigitEvent != 0 ) { //处理在规定的时间间隔(Kickstart)主动呼叫事件的触发。 if( processUaDigitEvent( uaDigitEvent ) ) { return; } sendEvent( nextEvent ); return; } /// UaTimerEvent Sptr < UaTimerEvent > uaTimerEvent; uaTimerEvent.dynamicCast( nextEvent ); if ( uaTimerEvent != 0 ) { //在各种SIP命令的回应产生了超时事件后,系统的事件触发。例如: //在StateTrying()中addEntryOperator( new OpStartTimer )在myEntryOperators队列中加入 //该Operator(指一个操作,例如呼叫或者是进入等待),这里我们这个Operator在时间到达 //以后户会被OpTimeout::process的方法检测到(isTimeout(event)进行检测,对StateTrying //整个状态进行检测,也就是Trying事件),最后如果UaTimerEvent事件被触发,那么,//就会调用:stateMachine->findState( "StateError" )这个状态,进入错误状态,实施错误的 //处理机制,同时向myEntryOperators队列中加入一个新的Operator--OpStartErrorTone, //从而被processUaTimerEvent过程扑捉到,最后通过SendEvent发送到执行队列里去。 if( processUaTimerEvent( uaTimerEvent ) ) { return; } sendEvent( nextEvent ); return; } assert( 0 ); } 2.5.1.1 processSipEvent 顾名思义,processSipEvent方法是对队列中的SIP消息进行处理,我们来看下面的程序: bool UaBuilder::processSipEvent( const Sptr < SipEvent > sipEvent ) { Sptr < StatusMsg > statusMsg; statusMsg.dynamicCast( sipEvent->getSipMsg() ); // 检验是否为返回的状态码(主要是对Notify,Subscribe,Register三种状态进行单独处理) //下面做详细介绍 if ( statusMsg != 0 ) { if( handleStatusMsg( sipEvent ) ) { return true; } } //在这里表示接收到一个SIP的消息, //检验是否为一个SIP的消息而不是一个状态(例如是否为Invite命令) /// Let's check the call info, now callId = sipEvent->getSipCallLeg()->getCallId(); callInfo = calls->findCall( callId ); if ( callInfo == 0 ) { //下面分成两种状况进行讨论,一种是接受到Invite的消息,一种是接收到一个普通的 //命令,例如 Sptr < InviteMsg > inviteMsg; inviteMsg.dynamicCast( sipEvent->getSipMsg() ); if ( inviteMsg == 0 ) { //如果大家在这里有什么奇怪的话没有必要,为什么除了inviteMsg以外的所有的消 //息都不处理呢?其实这些消息都在SipThread这个程序中处理了,在Ua这个大状态 //机中所有的状态都是以Invite这个消息作为启动的。每一个INVITE启动一个系列的//消息和状态。 return true; } else { //收到一个Invite消息,这个时候我们就要进入相应的处理机制中了; callInfo = calls->newCall ( sipEvent->getSipCallLeg()->getCallId() ); assert( callInfo != 0 ); callInfo->setFeature( stateMachine ); //如果进入的状态是自动呼叫(Auto Call)或者是自动应答(Auto Answer)状态(这 //两种状态的确定要在CFG文件中体现) if ( UaConfiguration::instance()->getLoadGenOn() ) { /// Assume this is a new call... /// Also assume that we are not in use. callInfo->setState( stateMachine->findState( "StateAutoIdle" ) ); //StateAutoIdle这个状态是一个自动应答和自动呼叫(按照呼叫列表)时候的状态,这里我 //们不做介绍,它本身和手动呼叫是非常相似的。 } else // LoadGen is off { //下面这个程序会进入等待远端SIP事件和本地呼叫事件的状态StateIdle if( handleCallWaiting( callInfo ) ) { cpLog( LOG_ERR, "Returned from handleCallWaiting\n" ); return true; } } } // lots of brackets! } return false; } /// UaBuilder::processSipEvent handleStatusMsg在做什么? 前面我们已经作了简单的介绍,这个函数的主要目的是在处理Rgister,Notify,和Subscribe等几个状态,并且分别调用他们的处理机; Rgister调用它的处理机: handleRegistrationResponse他的主要作用是处理返回的各种Rgister状态,例如200,4XX或者是100等状态,另外它还负责在作为Mashal Server的时候转发各种状态时候,重新设定Expire的值;另外要注意的是在Register中增加了一个新的返回--Trying这个是非常合理的,特别是大型网络中,对服务器端的性能判定很有效,所以使用协议栈的同志能好好利用这个机制;另外如果发挥的值是401/407状态(未授权),还需要调用authenticateMessage做相应的处理,以返回的(401/407)状态中所带的密钥加密新的Rgister消息,发送给Register服务器重新进行授权判定;有兴趣的可以看看BaseAuthentication中的addAuthorization函数。在介绍UaMarshal和Redirect Server的时候会着重讨论这个问题。 注明:Subscribe的处理机在Feature Server章节里面在再详细介绍)。 2.5.1.2 processUaDeviceEvent 前面说了,processUaDeviceEvent主要是用来处理本地的设备事件,最主要就是处理摘机信号,在这里程序的流程我就不详细的列出,不过我们从主要的程序主体部分可以看出: 在uaDeviceEvent->type == DeviceEventHookUp也就是检测了摘机以后,程序会采取某些必要的方式取得CallID(主要是通过CFG文件),最后让程序进入状态机的StateIdle状态,这个状态是接收和发送消息的初始状态,我们可以在后面将会重点介绍这个状态; 2.5.1.3 processUaDigitEvent 也是主要通过判定CFG文件中的LoadGen_On的参数是On或者是Off来决定是否进入StateAutoIdle状态,或者是StateAutoRS状态(自动通过Marshal Server进行中转所有的SIP的消息和状态,在Marshal Server的时候会做详细的介绍)。 2.5.1.4 processUaTimerEvent 这个的流程也实在没有什么好说的,前面也有了一定的介绍,如果大家对这些还有不明白的话,可以看一下SIP协议中Trying过程的走势,主要是对超时处理部分的介绍,就会明白(按照前面所说的UaBuilder::Process中关于SIP命令消息超时的介绍部分)。 2.5.2 SipThread的Run方法: Void SipThread::thread() { … … while ( true ) { try { //接收所发送的消息,并且准备置入相关的队列中; Sptr < SipMsgQueue > sipRcv( mySipStack->receive(1000) ); if ( sipRcv != 0 ) { Sptr < SipMsg > sipMsg = sipRcv->back(); if ( sipMsg != 0 ) { //根据本地的地址来检查是否发生了路由环路 if ( discardMessage(sipMsg) ) { continue; } // 在这里的myOutputFifo就是 myCallProcessingQueue(异地输入消息的队 //列),在Workthread构建的时候会把这个队列带入作为处理参量 Sptr < SipEvent > nextEvent = new SipEvent(myOutputFifo); if ( nextEvent != 0 ) { //以下就是把新收到的消息载入队列当中。 nextEvent->setSipReceive(sipRcv); nextEvent->setSipStack(mySipStack); if(myCallLegHistory) nextEvent->setCallLeg(); myOutputFifo->add(nextEvent); } } } else { … … } } catch ( VException& v) { … … } catch ( ... ) { … … } if ( isShutdown() == true ) { return; } } } 2.5.2.1 SIP消息的接收/发送缓冲技术 a. 负责接收的主要程序体: Sptr < SipMsgQueue > sipRcv( mySipStack->receive(1000) );这个方法就是利用SipTransceiver的receive方法接收SIP的消息; Sptr < SipMsgQueue > SipTransceiver::receive(int timeOut) { Sptr < SipMsgQueue > msgQPtr = 0; //以下是设立超时参数,如果发生超时,那么就让该命令无效; timeval start, now; if ( timeOut >= 0 ) { gettimeofday(&start, 0); } while (msgQPtr == 0) { int timePassed = 0; if ( timeOut >= 0 ) { gettimeofday(&now, 0); timePassed = ( now.tv_sec - start.tv_sec ) * 1000 + ( now.tv_usec - start.tv_usec ) / 1000; if (timePassed >= timeOut) { return 0; } } recvdMsgsFifo.block(timeOut); if ( !recvdMsgsFifo.messageAvailable() ) { continue; } SipMsgContainer *msgPtr = recvdMsgsFifo.getNext(); if ( msgPtr == 0) { assert(0); cpLog(LOG_CRIT, "received NULL"); continue; } #if 1 if ( natOn == true) { //这里是一个非常有意思的地方,虽然再程序主体中将它设定为False,也就是我们就 //不能采用NAT转换了,不过我还是想介绍一下,它主要是用在如果UA是一个标准 //的网关,或者是路由器设备的情况之下,在这个时候,它主要做各个消息包的转 //译工作,把路由(Via List)改成下一跳的IP地址和端口地址; SipVia natVia = msgPtr->msg.in->getVia(0); LocalScopeAllocator lo; string addr1 = natVia.getHost().getData(lo); string addr2 = msgPtr->msg.in->getReceivedIPName().getData(lo); NetworkAddress netaddr1(addr1); NetworkAddress netaddr2(addr2); if ( netaddr1.getHostName() != netaddr2.getHostName()) { natVia.setReceivedhost(msgPtr->msg.in->getReceivedIPName()); natVia.setReceivedport(msgPtr->msg.in->getReceivedIPPort()); //remove the first item from the via list msgPtr->msg.in->removeVia(0); //insert natvia in the vector via list msgPtr->msg.in->setVia(natVia, 0); } } #endif //---NAT /* *********************************************************************/ SipMsgQueue *msgQ = 0; Sptr if(msgPtr->msg.in->getType() == SIP_STATUS) //这两个是处理返回消息队列的函数,下面将重点介绍 msgQ = sentRequestDB.processRecv(msgPtr); else msgQ = sentResponseDB.processRecv(msgPtr); //更新SNMP命令队列,并向SNMP网管中心发送接收的消息队列; if(msgQ) { msgQPtr = msgQ; //need to have snmpDetails for this. if (sipAgent != 0) { updateSnmpData(sipPtr, INS); } } else if(msgPtr->msg.in != 0) { send(msgPtr); } else if(msgPtr->msg.out.length()) { send(msgPtr); } else … … } } b.描述接收/发送SIP消息队列的主要类: SipSentRequestDB:: processRecv和SipSentRequestDB::processSend是一对相互的方法, 另外还有SipSentResponseDB:: processRecv和SipSentResponseDB::processSend是用来记忆状态/消息的发送和接受的,在这里和Request的结构基本相同,就不做累述了;前者处理发送的SIP消息队列,后者处理接收的SIP消息队列,为了实现高效率的处理SIP的队列,在程序中大量采用了HASH表的方法,由于这个部分的程序非常的多,我不想一一把他们罗列出来,在这里就做一下简单的一个浏览: HASH队列的抽象:在这里有三个用于表示HASH表的类: SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node; 第一个是表的入口,它的组成由:目的地址 NameAddress源地址 From以及CallID三个部分叠加而成; 第二个是表的索引,包括CSeq和Via 路由表 第三个就是具体的消息对了,也就是一个呼叫命令组的列表;详见下图: 我们下面一一个简单的例子来描述一下一个INVITE消息的处理过程: A. 接收到一个Invite Message/发送一个180状态的情况的情况: 1. 在UDP通道收到一个INVITE消息 2. 创建了一个InvMsg,同时发送到SipSentResponseDB中做备份,我们要检查在这里有没有重复的副本; 3.如果没有重复,那么InvMsg就放入RecvFifo中,准备让应用层进行处理; 4.应用层通过SipTransciever接收到了InvMsg并且做出相应的处理; 5.应用层产生了180回应到SipSentResponseDB中备份, 6.180在SndFifo中排队,并且调用SipTransceiver中的SendReply方法回送消息 B.从对方接收到一个100(Trying)状态的作为向对方发送Invite消息回应的情况: 1. 在UDP通道收到一个INVITE的状态; 2. 创建了一个StatusMsg,同时发送到SipSentResquestDB中做备份,我们要检查在这里有没有重复的副本; 3.如果没有重复,那么StatusMsg就放入RecvFifo中,准备让应用层进行处理; 4.应用层通过SipTransciever接收到了StatusMsg并且做出相应的处理; 5.应用层产生了ACK回应到SipSentResquestDB中备份, 6.180在SndFifo中排队,并且调用SipTransceiver中的SendAsync方法回送ACK消息, c.在存在一个呼叫重新定向的情况: *我们下面来看一个更加复杂一点的情况: 1>SipSendtRequestDB::processSend方法: 我们可以做一个很简单的举例,大家就对这两个方法有比较深入的了解了,可以以上面的Diagram1来做一个很好的例子比如,Marshal Server开始发送一个Invite的消息,由SipSendtRequestDB::processSend来进行处理,同时并且把这个消息装入SipMsgContainer中,然后消息被插入到SipTransactionList队列中: topNode->findOrInsert(id)->val->findOrInsert(id) 最后放在SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node形成一个新的节点。 2>SipSentRequestDB:: processRecv方法: 例如我们接收了一个回应100 Trying这个回应的处理自然落在下面的这个部分: int statusCode = response->getStatusLine().getStatusCode(); if((statusCode < 200) || ((SipTransceiver::myAppContext == APP_CONTEXT_PROXY) && (statusCode == 200) && (response->getCSeq().getMethod() == INVITE_METHOD) ) ) … … retVal = new SipMsgQueue; retVal->push_back(msgContainer->msg.in) 单纯的把消息队列返回上面的应用层; 后续的180(Ringing)也是如此直接返回应用层; 但是到了接受到200(OK),那么处理的方式就大不一样了因为OK以后命令交互阶段已经告一段落,那么我们通过SipTransactionGC::instance()-> collect的后台方法处理(Thread线程),根据Delay的时间的变化:如invCleanupDelay等等,删除当前的一些队列中消息所占用的内存(垃圾处理),(具体处理机制可以参看SipTransactionGC::thread()这个后台处理掉一些孤独的消息,例如有Request没有Response的等等,并且根据各个消息所占用的Delay时间来释放他们); 但是如果没有收到200呢?假设我们收到了302(呼叫转移)呢?(例如在上面Diagram 1中所表现的那样) 答案在这里: else if(response->getStatusLine().getStatusCode() >= 200) { if(level3Node->val->msgs.response)//这里是检验在消息队列中是否有应答 //产生,也就是Diagram 1中的Second Phase的情况,(第二个Invite消息) { SipTransactionList curr = 0; if(level3Node->val->myKey == INVITE_METHOD) { curr = level2Node->val->level3.getLast(); while(curr) { // look for the ACK message if(curr->val->myKey == ACK_METHOD && curr->val->msgs.request) { cpLog(DEBUG_NEW_STACK,"duplicate message: %s", msgContainer->msg.out.logData()); //通过第一个ACK来复制第二个ACK,使用上二者完全相同, msgContainer->msg.in = 0; msgContainer->msg.out = curr->val->msgs.request->msg.out; msgContainer->msg.type = curr->val->msgs.request->msg.type; msgContainer->msg.transport = curr->val->msgs.request->msg.transport; msgContainer->msg.netAddr = curr->val->msgs.request->msg.netAddr; msgContainer->retransCount = FILTER_RETRANS_COUNT; break; } curr = level2Node->val->level3.getPrev(curr); } 很明显复制一个ACK消息准备进行下一个新的Invite的发送,当然这个是要在有ACK发送以后才可以进行,如果没有那么我们可以假定ACK正处在Processing状态; if(!curr) { msgContainer->msg.in = 0; msgContainer->msg.out = ""; msgContainer->retransCount = 0; } } else … … 在这个else下面所表示的处理机制是在第一个Message发送出去以后回应大于200的情况,也就是在Diagram 1中First Phase的情况,也就是发出第一个302的情况,在下面有一行语句: msgContainer->msg.out=msgContainer->msg.in->encode() 它的主要目的是用于形成ACK应答, 另外后面介绍Marshal Server的时候向异地发送Invite的时候返回4XX的回应,一般都是4XX等恶名招著的Response不会有其他的,本地一般采取的处理就是向应用层汇报,并且消除Hash队列里的所有驻留的消息。 大家可以根据上面介绍的方法实验一下其他的情况,基本上都是合适的. 目前来说这个处理机制并不使最优的,特别是在服务器的状态,某些情况事实上并没有 一个具体的处理方法:例如4XX的回应,可能会造成超时等待过长。 2.6 在User Agent中的四个重要实例的Run方法: HeartLessProxy的两个Run方法都介绍完毕了,现在我们来看下面将要启动的四个Run过程: 2.6.1 媒体设备启动 DeviceThread->run(); //调用SoundcardDevice::hardwareMain(0) 第一个是调用Sound Card的处理进程,它最主要的用处是返回各种按键的处理信息,他的具体的作用可以参看程序,和具体的操作手册,非常的简单易懂,不用详细介绍,不过要注意的一点是,在程序中,启动按键事件的检测是通过RTP/RTCP的事件触发的,(很明显,例如在通话的时候按下z表示挂机,必须是在有RTP/RTCP事件),说简单了,没有设备,键盘事件无法触发。 2.6.2 启动RTP线程,用于对RTP/RTCP包的接收和发送管理; rtpThread->run //调用SoundCardDevice::processRTP() 参看RtpThread实例化的过程可以看出,实际上就是调用SoundCardDevice的processRTP过程。 SoundCardDevice::processRTP () { … … if (audioStack == 0) { … … return; } bool bNothingDo = true; RtpSessionState sessionState = audioStack->getSessionState(); if ( sessionState == rtp_session_undefined ) { deviceMutex.unlock(); … … return; } if( sessionState == rtp_session_recvonly || sessionState == rtp_session_sendrecv ) { // audioStack就是RtpSession,在这里它是在构建这个声音设备的时候,就创建它了。 //这里表示从一个创建好的RTP会话中接收一帧数据, inRtpPkt = audioStack->receive(); if( inRtpPkt ) { //这里的声卡目前只能接受一种压缩方式PCM,所以只能解析这一种最常用的, if( inRtpPkt->getPayloadType() != rtpPayloadPCMU || //RTP的采样频率是否为要求的频率,例如为20ms inRtpPkt->getPayloadUsage() != NETWORK_RTP_RATE ) { cpLog(LOG_ERR,"Received from RTP stack incorrect payload type"); } //将数据输出到声卡, writeToSoundCard( (unsigned char*) inRtpPkt->getPayloadLoc(), inRtpPkt->getPayloadUsage() ); bNothingDo = false; … … } } // 这里是发送一帧数据; if( sessionState == rtp_session_sendonly || sessionState == rtp_session_sendrecv ) { int cc; if( audioStack->getRtcpTran() ) { //如果有发送零声的情况,例如零声回送被叫端,这里在OpRing里通过//sendRemoteRingback过程来实现向远端回送零声(sendRingback=True) if( sendRingback ) { cc = getRingbackTone( dataBuffer, RESID_RTP_RATE ); #ifdef WIN32 Sleep(15); #endif } else {//从声卡中读入一帧数据,按照cfg文件中规定的采样标准 cc = readFromSoundCard( dataBuffer, RESID_RTP_RATE ); } if ((cc > 0) && audioStack) {//将这帧数据(毛数据,未压缩的作成RTP包发送出去); audioStack->transmitRaw( (char*)dataBuffer, cc ); bNothingDo = false; } … … } } … … deviceMutex.unlock(); return; } 2.6.3 合法用户列表的获取(Redirection Server专用) 第三个过程是featureThread->run,,这个过程主要是用在向重定向服务器(Redirection Server)和Provisioning Server中的Feature线程,它实质上是调用 subscribe -Manager->subscribeMain,主体程序部分是向Provisioning Server发送Subscribe消息,在这个循环中会反复的发送SubScribe消息到Provision Server中去,稍后我们要介绍的UaBuilder::handleStatusMsg(UaBuilder::processSipEvent中)过程会将会处理从Provision Server 返回的Notify消息,关于Subscribe/Notify消息对的介绍我们可以参看在Vocal中的相关介绍,它的作用范围是在一个普通的UA向Marshal Server进行注册或者是证实的时候,Marshal Server同时向Redirection Server发出Register消息,并且由Redirection Server向Provisioning Server发送Subscribe消息,对用户列表进行检测;我们可以举一个例子来说明这个过程: 我们来看Diagram.7 1> 在A阶段当启动Redirection Server(RS)的时候,RS向Provisioning Server(PS)发送SubScribe消息,取得合法的用户列表; 2> 在B阶段,UA端向Marshal Server发送Register消息,以确认自己是否在合法用户列表内; 3> 在C阶段,RS将通过Subscribe/Notify命令对把该用户的呼叫特性列表(呼叫等待,呼叫转接,语音邮件,呼叫前转,禁止呼叫等信息)得到该用户的呼叫特性; 我们在Redirection Server这一章内将详细介绍Subscribe/Notify命令对。 2.6.4 监测线程: 一个调用的RUN方法loadGenThread->run是一个监测线程,检查各种回应和请求消息,并记录在LOG文件中。 2.6.5 自动呼叫 在loadGenThread->run后面的程序实现了一个自动在预定时间内发送INVITE消息的过程,大家有兴趣可以参看OpAutoCall类,当在UserAgent::Run()中通过检测Cfg文件,通过setLoadGenSignalType(LoadGenStartCall)设定了一个公共变量以后,我们可以发现系统将自动进入OpAutoCall操作,并且启动INVITE开始呼叫。 好了,通过上面的介绍后我们需要知道如何让系统进入Idle状态,在这个状态中系统处于一种"等待"的状态,接收本地的命令输入,和远端的消息;这个状态是所有后续状态的一个初始阶段,在上述程序中我们可以在processSipEvent过程中找到handleCallWaiting子程序,就在该过程中让系统进入Idle状态;见下面的程序: … … if ( UaConfiguration::instance()->getLoadGenOn() ) { callInfo->setState ( stateMachine->findState( "StateAutoIdle" ) ); } else // LoadGen is off { if( handleCallWaiting( callInfo ) ) { return true; } … … (未完待续) 在Vovida的基础上实现自己的SIP协议栈(三) 作者供稿 CTI论坛编辑 作者联系方法:[email protected] |