千万级邮件系统的设计及数据库中间件部分实现(作者:艺龙公司技术研发经理 高鹏) 近日,艺龙公司推出了“艺龙五星级企业邮箱系统”和“艺龙个人5秒邮”的收费邮件系统,这两套系统都采用的是艺龙公司与全世界最大的邮件运营商mail.com合作设计和开发的千万级邮件系统:“ComConnect”。笔者是该系统的主要设计人员,本文将介绍这套系统的技术特色,以及数据库中间件部分实现。 艺龙公司的“ComConnect”系统可以运行在大部分的unix系统平台,包括solaris、linux等,采用模块化的设计,整个系统分为下面八大模块: 1.数据库中间件模块+缓存模块 这是整套系统的主线,贯穿于系统的其他所有模块。 在大规模邮件系统中,为了管理的方便和系统的分布式部署,都需要采用SQL数据库保存一些信息,例如帐号信息等。例如,当用户通过POP3登陆时,服务器端会启动一个程序,与SQL数据库连接,并查询该用户身份的合法性,当用户退出时,该程序会断开与SQL数据库的连接。这样,大量用户的登陆、退出,都会对应于SQL数据库的连接、断开操作,降低了系统的效率,也增加了SQL数据库的负载。
2.收信模块
传统的收信模块运行方式是,在得到一封邮件后,保存到本地,或通过SMTP协议进行转发。而在ComConnect系统中,采用LMTP(local mail transport protocol)协议连接“收信模块”和“邮件存储模块”,在这两个模块之间,永久保持若干个通道,当收信模块得到新的邮件时,就会利用这些已经存在的通道将邮件内容传输给后台的邮件存储模块。这种设计,大大减少了SMTP服务器频繁的fork、exit过程,从而提高了效率。 3.发信模块 该模块是整个系统中最独立的模块。为了提高安全性,ComConnect系统通过cyrus-sasl实现了发信服务器的用户身份认证功能。认证部分通过“认证模块”完成。 4.中央数据库模块
这种设计,极大地提高了系统性能,更提高了系统的可扩展性。与此相对应,其他的邮件系统将尽可能多的信息都保存在中央数据库中,随着用户数目的增大,效率会越来越低,最终成为系统的瓶颈。
由于HTTP协议的无状态性质,所以当用户登陆后需要保存用户的session(会话)信息。传统的session模块都是通过数据库完成的,这样,如果用户量非常大,会对数据库造成很大的负载,最终形成整套系统的瓶颈。ComConnect系统对session的处理,采用了自行开发的专门的session server处理,系统拿出一台单独的服务器充当session server,内部定义了一套session协议来维护每个session的状态,并在一定时间客户端没有访问时自动删除session记录以实现session垃圾回收机制。该模块的数据库采用了高效的内存数据库:BerkeleyDB3。在这种设计下,web服务器作为session模块的客户端,session server作为session模块的服务器端。当用户登陆时,Web服务器会通过session协议访问后台的session server,以记录该次session的信息。 与传统的session管理机制相比,这种方式减少了中央数据库的负载,又由于内存数据库的高效性,以及session协议的简单性,大大提高系统的响应速度。 6.认证模块 系统需要认证的部分有:Web登陆、发信时的用户身份认证、POP3登陆。这些认证都通过本地的认证模块实现,该模块工作机制与“数据库中间件”类似,通过unix domain socket进行进程间通讯,并与数据库持续连接,以及维护本地的缓存。 7.Web模块 这是用户使用的Web界面部分。该模块通过IMAP协议,与后台的“邮件存储模块”进行通讯。其中的session,是通过“HTTP session模块”完成的。
该部分是整个系统最为核心的模块,所有用户的邮件最终都保存在该模块上。每个用户的邮箱就是一个目录,每封邮件就是一个文件。
与上面的解决办法不同,ComConnect系统在文件系统中建立了索引机制,而不采用数据库的索引。每个用户的邮箱(对应一个目录)下,都有该用户所有邮件的索引。这样,当用户操作邮箱时,速度可以非常快;另外,对每封进入的邮件,“收信模块”不需要做任何处理,可以直接通过LMTP通道传输给“邮件存储模块”,而“邮件存储模块”直接将内容写入对应用户的邮箱目录下。
int midInited = 0;
*-------------------------------------------------------------- * * mid_init * *-------------------------------------------------------------- */ void mid_init(const char *conf) { const char fname[] = "mid_init"; const char *server ;
myconfig_read(MIDWARE_CONFIG_FIENAME); } else { myconfig_read(conf); } server = myconfig_getstring("backend_server",NULL); if ( !server || !*server ) { syslog(LOG_ERR,"[%s]invalid 'backend_server' in conf file",fname); exit(1); /* yes, we exit directly */ }
if ( !strcasecmp(server,BACKEND_MYSQL) ) { syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_MYSQL); dbserver = db_mysql; } else if ( !strcasecmp(server,BACKEND_ORACLE) ) { syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_ORACLE); dbserver = db_oracle; } else if ( !strcasecmp(server,BACKEND_BERKELEYDB) ) { syslog(LOG_DEBUG3,"[%s]backend server is %s",fname,BACKEND_BERKELEYDB); dbserver = db_berkeleydb; } else { dbserver = db_unknown; syslog(LOG_CRIT,"[%s]unknown backend server: %s",fname,server); exit(1); }
*------------------------------------------------------------------------ * * sendCmd * * -- midware protocol implementation on midwared client side * * ask midware server something via unix domain socket * * RET: * CMD_ERR_SYS: system error * CMD_INVALID: null cmd sent to midwared server * CMD_OK : ok * *------------------------------------------------------------------------ */ static cmdResult sendCmd(char *cmd,char *sqlcmd,const char *param2,const char *param3, const char *param4,char **reply) { const char fname[]="sendCmd"; int s; int r,iovcount=0; struct sockaddr_un srvaddr; struct iovec iov[6]; static char response[MAX_REP_LEN]; char sockfile[1024]; int start, n;
if (reply) *reply = NULL; if ( !cmd || !*cmd ) { syslog(LOG_ERR,"[%s]null cmd specified",fname); return CMD_INVALID; }
/* create socket and connect to midware server */ s = socket(AF_UNIX, SOCK_STREAM, 0); if (s == -1) { syslog(LOG_CRIT,"[%s]create socket failed: %m",fname); return CMD_ERR_SYS; } memset(sockfile,0,sizeof(sockfile)); strncpy(sockfile, myconfig_getstring("unixsock_dir",DEFAULT_MIDWARED_DIR) , sizeof(sockfile)); strcat(sockfile,"/"); strcat(sockfile,SOCKET_FILENAME); syslog(LOG_DEBUG3,"[%s]socket filename: %s",fname,sockfile); memset((char *)&srvaddr, 0, sizeof(srvaddr)); srvaddr.sun_family = AF_UNIX; strncpy(srvaddr.sun_path, sockfile, sizeof(srvaddr.sun_path)); r = connect(s, (struct sockaddr *)&srvaddr, sizeof(srvaddr)); if (r == -1) { if (reply) *reply="Cannot connect to midwared server"; syslog(LOG_ERR,"[%s]:connect: %m",fname); return CMD_ERR_SYS; }
/* * connected with the unix-domain socket ,next * prepare the parameters for midwared server */ iov[iovcount].iov_base = (char *)cmd; iov[iovcount].iov_len = strlen(cmd)+1; /* check sqlcmd */ if ( sqlcmd && *sqlcmd ){ iovcount++; iov[iovcount].iov_base = (char *)sqlcmd; iov[iovcount].iov_len = strlen(sqlcmd)+1; } else { goto startsend; } /* check parameter2 */ if ( param2 && *param2 ){ iovcount++; iov[iovcount].iov_base = (char *)param2; iov[iovcount].iov_len = strlen(param2)+1; } else { goto startsend; } /* check parameter3 */ if ( param3 && *param3 ){ iovcount++; iov[iovcount].iov_base = (char *)param3; iov[iovcount].iov_len = strlen(param3)+1; } else { goto startsend; } /* check parameter4 */ if ( param4 && *param4 ){ iovcount++; iov[iovcount].iov_base = (char *)param4; iov[iovcount].iov_len = strlen(param4)+1; }
retry_writev(s, iov, iovcount+1);
/* get reply from midwared server */ start = 0; while (start n = read(s, response+start, sizeof(response) - 1 - start); if (n <1) break; start += n; } /* while */ close(s);
/* marshell the reply for client call */ if ( start <) =1 { /* failurely got the reply */ if ( reply ) { *reply=response; } return CMD_ERR_SYS; } else { /* successfully got the reply */ response[start] = '\0'; if (reply) { *reply=response; } return CMD_OK; }
*-------------------------------------------------------------- * * mid_query * - exported for end user directly * * ARG - * key: used for cache * * PRECON: * argument 'result' must be initialized before this call * *-------------------------------------------------------------- */ midQueryResult mid_query(char *sqlcmd,const char *key,int useCache,char result[]) { extern char *cache_mapstr(int); const char fname[]="mid_query"; char *reply; int r = 0;
mid_init(NULL); }
switch ( dbserver ) { case db_mysql: r = sendCmd(REQ_MYSQL_QUERY,sqlcmd,key,cache_mapstr(useCache), NULL,(char **)&reply); break;
case db_oracle: r = sendCmd(REQ_ORACLEL_QUERY,sqlcmd,key,cache_mapstr(useCache), NULL,(char **)&reply); break;
case db_berkeleydb: break;
default: syslog(LOG_CRIT,"[%s]unknown backend server",fname); break; } /* switch */
if ( result && *result ) strcpy(result,reply);
if ( r != CMD_OK ) { syslog(LOG_DEBUG3,"[%s]no reply,system error",fname); return QUERY_SYS_ERR; }
return QUERY_FAIL; } else if ( !strcmp(reply,REP_DB_QUERY_NOTFOUND) ) { return QUERY_NOTFOUND; } else if ( !strcmp(reply,REP_DB_QUERY_MANYRECORD) ) { return QUERY_FOUNDMANY; } else { return QUERY_OK; }
*-------------------------------------------------------------- * * mid_insert * *-------------------------------------------------------------- */ midInsertResult mid_insert(char *sqlcmd) { const char fname[]="mid_insert"; char *reply; int r = 0;
if ( midInited == 0 ) { mid_init(NULL); }
switch ( dbserver ) { case db_mysql: r = sendCmd(REQ_MYSQL_INSERT,sqlcmd,NULL,NULL,NULL,(char **)&reply); break;
case db_oracle: r = sendCmd(REQ_ORACLE_INSERT,sqlcmd,NULL,NULL,NULL, (char **)&reply); break;
case db_berkeleydb: break;
default: syslog(LOG_CRIT,"[%s]unknown backend server",fname); break; } /* switch */
if ( r != CMD_OK ) { syslog(LOG_DEBUG3,"[%s]no reply,system error",fname); return INSERT_SYS_ERR; }
return INSERT_FAIL; } else { return INSERT_OK; }
|