千万级邮件系统的设计及数据库中间件部分实现


(作者:艺龙公司技术研发经理 高鹏)

  近日,艺龙公司推出了“艺龙五星级企业邮箱系统”和“艺龙个人5秒邮”的收费邮件系统,这两套系统都采用的是艺龙公司与全世界最大的邮件运营商mail.com合作设计和开发的千万级邮件系统:“ComConnect”。笔者是该系统的主要设计人员,本文将介绍这套系统的技术特色,以及数据库中间件部分实现。

  艺龙公司的“ComConnect”系统可以运行在大部分的unix系统平台,包括solaris、linux等,采用模块化的设计,整个系统分为下面八大模块:

  1.数据库中间件模块+缓存模块

    这是整套系统的主线,贯穿于系统的其他所有模块。

  在大规模邮件系统中,为了管理的方便和系统的分布式部署,都需要采用SQL数据库保存一些信息,例如帐号信息等。例如,当用户通过POP3登陆时,服务器端会启动一个程序,与SQL数据库连接,并查询该用户身份的合法性,当用户退出时,该程序会断开与SQL数据库的连接。这样,大量用户的登陆、退出,都会对应于SQL数据库的连接、断开操作,降低了系统的效率,也增加了SQL数据库的负载。


  在ComConnect系统中,所有数据库的操作,都是通过“数据库中间件”模块完成的。该模块会保持与SQL数据库的连接,永不断开。当有程序需要查询数据库时,会通过unix domain socket与本地的数据库中间件服务器相连接,中间件会取得查询需求,并进行实际的SQL查询,并将结果通过unix domain socket返回给应用程序。同时,对于所得的查询,中间件服务器会进行本地缓存,保存到内存数据库BerkeleyDB3(http://www.sleepycat.com)中,这样下次同样的查询请求,就不必SQL查询了,因为结果已经被保存在本地缓存了。这种设计,一方面,避免了频繁的SQL数据库的连接、断开的操作,另一方面,由于本地缓存的存在,也大大降低了SQL数据库的负载,提高了系统的响应效率。

   2.收信模块


  该模块对应于DNS中的MX记录。

  传统的收信模块运行方式是,在得到一封邮件后,保存到本地,或通过SMTP协议进行转发。而在ComConnect系统中,采用LMTP(local mail transport protocol)协议连接“收信模块”和“邮件存储模块”,在这两个模块之间,永久保持若干个通道,当收信模块得到新的邮件时,就会利用这些已经存在的通道将邮件内容传输给后台的邮件存储模块。这种设计,大大减少了SMTP服务器频繁的fork、exit过程,从而提高了效率。

   3.发信模块

   该模块是整个系统中最独立的模块。为了提高安全性,ComConnect系统通过cyrus-sasl实现了发信服务器的用户身份认证功能。认证部分通过“认证模块”完成。

   4.中央数据库模块


  与其他大规模邮件系统一样,ComConnect系统也需要中央数据库(支持SQL标准)模块。但与其他系统不同的是,ComConnect系统的中央数据库保存的内容非常少,仅仅保存用户帐号信息。而且,由于“数据库中间件”和“缓存”模块的存在,在理想状态下,中央数据库根本不需要存在。

  这种设计,极大地提高了系统性能,更提高了系统的可扩展性。与此相对应,其他的邮件系统将尽可能多的信息都保存在中央数据库中,随着用户数目的增大,效率会越来越低,最终成为系统的瓶颈。

   5.HTTP session模块

  由于HTTP协议的无状态性质,所以当用户登陆后需要保存用户的session(会话)信息。传统的session模块都是通过数据库完成的,这样,如果用户量非常大,会对数据库造成很大的负载,最终形成整套系统的瓶颈。ComConnect系统对session的处理,采用了自行开发的专门的session server处理,系统拿出一台单独的服务器充当session server,内部定义了一套session协议来维护每个session的状态,并在一定时间客户端没有访问时自动删除session记录以实现session垃圾回收机制。该模块的数据库采用了高效的内存数据库:BerkeleyDB3。在这种设计下,web服务器作为session模块的客户端,session server作为session模块的服务器端。当用户登陆时,Web服务器会通过session协议访问后台的session server,以记录该次session的信息。

  与传统的session管理机制相比,这种方式减少了中央数据库的负载,又由于内存数据库的高效性,以及session协议的简单性,大大提高系统的响应速度。

   6.认证模块

  系统需要认证的部分有:Web登陆、发信时的用户身份认证、POP3登陆。这些认证都通过本地的认证模块实现,该模块工作机制与“数据库中间件”类似,通过unix domain socket进行进程间通讯,并与数据库持续连接,以及维护本地的缓存。

   7.Web模块

  这是用户使用的Web界面部分。该模块通过IMAP协议,与后台的“邮件存储模块”进行通讯。其中的session,是通过“HTTP session模块”完成的。


   8.邮件存储模块

  该部分是整个系统最为核心的模块,所有用户的邮件最终都保存在该模块上。每个用户的邮箱就是一个目录,每封邮件就是一个文件。


  有些大规模邮件系统将邮件的信头(header)保存到数据库中,而邮件的正文(body)保存到文件中。这种设计,可以提高用户在访问邮箱时的速度,尤其是邮箱中有很多封邮件的时候,另外,在实际实现的时候也有代码简单的优点。但它的缺点也是突出的,首先,“收信模块”要对进入的每封邮件进行处理,以提取出信头和正文,这会降低收信的效率;另一方面,由于每封邮件都会引发对数据库的INSERT操作,因此会加大数据库的负担;还有,随着系统接收邮件数目的增大,数据库中的记录数也会相应地增大,最终可能出现瓶颈。

  与上面的解决办法不同,ComConnect系统在文件系统中建立了索引机制,而不采用数据库的索引。每个用户的邮箱(对应一个目录)下,都有该用户所有邮件的索引。这样,当用户操作邮箱时,速度可以非常快;另外,对每封进入的邮件,“收信模块”不需要做任何处理,可以直接通过LMTP通道传输给“邮件存储模块”,而“邮件存储模块”直接将内容写入对应用户的邮箱目录下。


  这八大模块构成了ComConnect整个系统,其中每个模块都具有很好的扩展机制,可以通过增加计算机数目来提高性能。更详细的信息,请访问: http://bms.elong.com/ads/


下面是数据库中间件的部分代码,读者可以从中了解其运行机制:


dbServer dbserver = db_unknown;

int midInited = 0;


/*

*--------------------------------------------------------------

*

* mid_init

*

*--------------------------------------------------------------

*/

void

mid_init(const char *conf)

{

  const char fname[] = "mid_init";

  const char *server ;


  if ( !conf || !*conf ) {

    myconfig_read(MIDWARE_CONFIG_FIENAME);

  } else {

    myconfig_read(conf);

  }

  server = myconfig_getstring("backend_server",NULL);

  if ( !server || !*server ) {

    syslog(LOG_ERR,"[%s]invalid 'backend_server' in conf file",fname);

exit(1); /* yes, we exit directly */

  }


  /* judge what backend server is */

  if ( !strcasecmp(server,BACKEND_MYSQL) ) {

    syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_MYSQL);

    dbserver = db_mysql;

  } else if ( !strcasecmp(server,BACKEND_ORACLE) ) {

    syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_ORACLE);

    dbserver = db_oracle;

  } else if ( !strcasecmp(server,BACKEND_BERKELEYDB) ) {

    syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_BERKELEYDB);

    dbserver = db_berkeleydb;

  } else {

    dbserver = db_unknown;

    syslog(LOG_CRIT,"[%s]unknown backend server: %s",fname,server);

exit(1);

  }


  midInited = 1;


} /* mid_init */


/*

*------------------------------------------------------------------------

*

* sendCmd

*

* -- midware protocol implementation on midwared client side

*

* ask midware server something via unix domain socket

*

* RET:

* CMD_ERR_SYS: system error

* CMD_INVALID: null cmd sent to midwared server

* CMD_OK : ok

*

*------------------------------------------------------------------------

*/

static cmdResult

sendCmd(char *cmd,char *sqlcmd,const char *param2,const char *param3,

      const char *param4,char **reply)

{

  const char fname[]="sendCmd";

  int s; int r,iovcount=0;

  struct sockaddr_un srvaddr;

  struct iovec iov[6];

  static char response[MAX_REP_LEN];

  char sockfile[1024];

  int start, n;

  if (reply) *reply = NULL;

  if ( !cmd || !*cmd ) {

    syslog(LOG_ERR,"[%s]null cmd specified",fname);

    return CMD_INVALID;

  }

  /* create socket and connect to midware server */

  s = socket(AF_UNIX, SOCK_STREAM, 0);

  if (s == -1) {

    syslog(LOG_CRIT,"[%s]create socket failed: %m",fname);

return CMD_ERR_SYS;

  }

  memset(sockfile,0,sizeof(sockfile));

  strncpy(sockfile, myconfig_getstring("unixsock_dir",DEFAULT_MIDWARED_DIR) ,

    sizeof(sockfile));

  strcat(sockfile,"/");

  strcat(sockfile,SOCKET_FILENAME);

  syslog(LOG_DEBUG3,"[%s]socket filename: %s",fname,sockfile);

  memset((char *)&srvaddr, 0, sizeof(srvaddr));

  srvaddr.sun_family = AF_UNIX;

  strncpy(srvaddr.sun_path, sockfile, sizeof(srvaddr.sun_path));

  r = connect(s, (struct sockaddr *)&srvaddr, sizeof(srvaddr));

  if (r == -1) {

    if (reply) *reply="Cannot connect to midwared server";

    syslog(LOG_ERR,"[%s]:connect: %m",fname);

    return CMD_ERR_SYS;

  }

  /*

   * connected with the unix-domain socket ,next

   * prepare the parameters for midwared server

   */

  iov[iovcount].iov_base = (char *)cmd;

  iov[iovcount].iov_len = strlen(cmd)+1;

  /* check sqlcmd */

  if ( sqlcmd && *sqlcmd ){

iovcount++;

    iov[iovcount].iov_base = (char *)sqlcmd;

    iov[iovcount].iov_len = strlen(sqlcmd)+1;

  } else {

    goto startsend;

  }

  /* check parameter2 */

  if ( param2 && *param2 ){

iovcount++;

    iov[iovcount].iov_base = (char *)param2;

    iov[iovcount].iov_len = strlen(param2)+1;

  } else {

    goto startsend;

  }

  /* check parameter3 */

  if ( param3 && *param3 ){

iovcount++;

    iov[iovcount].iov_base = (char *)param3;

    iov[iovcount].iov_len = strlen(param3)+1;

  } else {

    goto startsend;

  }

  /* check parameter4 */

  if ( param4 && *param4 ){

iovcount++;

    iov[iovcount].iov_base = (char *)param4;

    iov[iovcount].iov_len = strlen(param4)+1;

  }


startsend:

  retry_writev(s, iov, iovcount+1);

  /* get reply from midwared server */

  start = 0;

  while (start     n = read(s, response+start, sizeof(response) - 1 - start);

    if (n <1) break;

    start += n;

  } /* while */

  close(s);

  /* marshell the reply for client call */

  if ( start <) =1 {

    /* failurely got the reply */

    if ( reply ) {

*reply=response;

}

    return CMD_ERR_SYS;

  } else {

    /* successfully got the reply */

    response[start] = '\0';

    if (reply) {

*reply=response;

}

    return CMD_OK;

  }


} /* sendCmd */


/*

*--------------------------------------------------------------

*

* mid_query

* - exported for end user directly

*

* ARG -

* key: used for cache

*

* PRECON:

* argument 'result' must be initialized before this call

*

*--------------------------------------------------------------

*/

midQueryResult

mid_query(char *sqlcmd,const char *key,int useCache,char result[])

{

  extern char *cache_mapstr(int);

  const char fname[]="mid_query";

  char *reply;

  int r = 0;


  if ( midInited == 0 ) {

    mid_init(NULL);

  }

                

  switch ( dbserver ) {

    case db_mysql:

      r = sendCmd(REQ_MYSQL_QUERY,sqlcmd,key,cache_mapstr(useCache),

        NULL,(char **)&reply);

      break;

    case db_oracle:

      r = sendCmd(REQ_ORACLEL_QUERY,sqlcmd,key,cache_mapstr(useCache),

        NULL,(char **)&reply);

      break;

    case db_berkeleydb:

      break;

    default:

      syslog(LOG_CRIT,"[%s]unknown backend server",fname);

      break;

  } /* switch */


  /* let midware client know what happened */

  if ( result && *result ) strcpy(result,reply);

  if ( r != CMD_OK ) {

    syslog(LOG_DEBUG3,"[%s]no reply,system error",fname);

return QUERY_SYS_ERR;

  }


  if ( !strcmp(reply,REP_DB_QUERY_FAIL) ) {

    return QUERY_FAIL;

  } else if ( !strcmp(reply,REP_DB_QUERY_NOTFOUND) ) {

    return QUERY_NOTFOUND;

  } else if ( !strcmp(reply,REP_DB_QUERY_MANYRECORD) ) {

    return QUERY_FOUNDMANY;

  } else {

    return QUERY_OK;

  }


} /* mid_query */


/*

*--------------------------------------------------------------

*

* mid_insert

*

*--------------------------------------------------------------

*/

midInsertResult

mid_insert(char *sqlcmd)

{

  const char fname[]="mid_insert";

  char *reply;

  int r = 0;

                

  if ( midInited == 0 ) {

    mid_init(NULL);

  }

                

  switch ( dbserver ) {

    case db_mysql:

      r = sendCmd(REQ_MYSQL_INSERT,sqlcmd,NULL,NULL,NULL,(char **)&reply);

      break;

    case db_oracle:

      r = sendCmd(REQ_ORACLE_INSERT,sqlcmd,NULL,NULL,NULL,

(char **)&reply);

      break;

    case db_berkeleydb:

      break;

    default:

      syslog(LOG_CRIT,"[%s]unknown backend server",fname);

      break;

  } /* switch */

  if ( r != CMD_OK ) {

    syslog(LOG_DEBUG3,"[%s]no reply,system error",fname);

return INSERT_SYS_ERR;

  }


  if ( !strcmp(reply,REP_DB_INSERT_FAIL) ) {

    return INSERT_FAIL;

  } else {

    return INSERT_OK;

  }


} /* mid_insert */