00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <sys/types.h>
00011 #include <errno.h>
00012 #include <signal.h>
00013 #include <stdlib.h>
00014 #include <string.h>
00015
00016 #include <db.h>
00017
00018 #include "ex_repquote.h"
00019
00020
00021
00022
00023 int master_eid;
00024 char *myaddr;
00025 unsigned short myport;
00026
00027 static int env_init
00028 __P((const char *, const char *, DB_ENV **, machtab_t *, u_int32_t));
00029 static void usage __P((const char *));
00030
00031 int
00032 main(argc, argv)
00033 int argc;
00034 char *argv[];
00035 {
00036 extern char *optarg;
00037 DB_ENV *dbenv;
00038 DBT local;
00039 enum { MASTER, CLIENT, UNKNOWN } whoami;
00040 all_args aa;
00041 connect_args ca;
00042 machtab_t *machtab;
00043 thread_t all_thr, conn_thr;
00044 void *astatus, *cstatus;
00045 #ifdef _WIN32
00046 WSADATA wsaData;
00047 #else
00048 struct sigaction sigact;
00049 #endif
00050 repsite_t site, *sitep, self, *selfp;
00051 int maxsites, nsites, ret, priority, totalsites, verbose;
00052 char *c, ch;
00053 const char *home, *progname;
00054
00055 master_eid = DB_EID_INVALID;
00056
00057 dbenv = NULL;
00058 whoami = UNKNOWN;
00059 machtab = NULL;
00060 selfp = sitep = NULL;
00061 maxsites = nsites = ret = totalsites = verbose = 0;
00062 priority = 100;
00063 home = "TESTDIR";
00064 progname = "ex_repquote";
00065
00066 while ((ch = getopt(argc, argv, "Ch:Mm:n:o:p:v")) != EOF)
00067 switch (ch) {
00068 case 'M':
00069 whoami = MASTER;
00070 master_eid = SELF_EID;
00071 break;
00072 case 'C':
00073 whoami = CLIENT;
00074 break;
00075 case 'h':
00076 home = optarg;
00077 break;
00078 case 'm':
00079 if ((myaddr = strdup(optarg)) == NULL) {
00080 fprintf(stderr,
00081 "System error %s\n", strerror(errno));
00082 goto err;
00083 }
00084 self.host = optarg;
00085 self.host = strtok(self.host, ":");
00086 if ((c = strtok(NULL, ":")) == NULL) {
00087 fprintf(stderr, "Bad host specification.\n");
00088 goto err;
00089 }
00090 myport = self.port = (unsigned short)atoi(c);
00091 selfp = &self;
00092 break;
00093 case 'n':
00094 totalsites = atoi(optarg);
00095 break;
00096 case 'o':
00097 site.host = optarg;
00098 site.host = strtok(site.host, ":");
00099 if ((c = strtok(NULL, ":")) == NULL) {
00100 fprintf(stderr, "Bad host specification.\n");
00101 goto err;
00102 }
00103 site.port = atoi(c);
00104 if (sitep == NULL || nsites >= maxsites) {
00105 maxsites = maxsites == 0 ? 10 : 2 * maxsites;
00106 if ((sitep = realloc(sitep,
00107 maxsites * sizeof(repsite_t))) == NULL) {
00108 fprintf(stderr, "System error %s\n",
00109 strerror(errno));
00110 goto err;
00111 }
00112 }
00113 sitep[nsites++] = site;
00114 break;
00115 case 'p':
00116 priority = atoi(optarg);
00117 break;
00118 case 'v':
00119 verbose = 1;
00120 break;
00121 case '?':
00122 default:
00123 usage(progname);
00124 }
00125
00126
00127 if (whoami == UNKNOWN) {
00128 fprintf(stderr, "Must specify -M or -C.\n");
00129 goto err;
00130 }
00131
00132 if (selfp == NULL)
00133 usage(progname);
00134
00135 if (home == NULL)
00136 usage(progname);
00137
00138 #ifdef _WIN32
00139
00140 if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
00141 fprintf(stderr,
00142 "Unable to initialize Windows sockets: %d\n", ret);
00143 goto err;
00144 }
00145 #else
00146
00147
00148
00149
00150 memset(&sigact, 0, sizeof(sigact));
00151 sigact.sa_handler = SIG_IGN;
00152 if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) {
00153 fprintf(stderr,
00154 "Unable to turn off SIGPIPE: %s\n", strerror(ret));
00155 goto err;
00156 }
00157 #endif
00158
00159
00160
00161
00162
00163
00164 if ((ret =
00165 machtab_init(&machtab, priority, totalsites)) != 0)
00166 goto err;
00167
00168
00169
00170
00171
00172
00173 if ((ret = env_init(progname, home, &dbenv, machtab, DB_RECOVER)) != 0)
00174 goto err;
00175 if (verbose &&
00176 (ret = dbenv->set_verbose(dbenv, DB_VERB_REPLICATION, 1)) != 0)
00177 goto err;
00178
00179
00180
00181
00182
00183
00184
00185 ca.dbenv = dbenv;
00186 ca.home = home;
00187 ca.progname = progname;
00188 ca.machtab = machtab;
00189 ca.port = selfp->port;
00190 if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) {
00191 dbenv->errx(dbenv, "can't create connect thread");
00192 goto err;
00193 }
00194
00195 aa.dbenv = dbenv;
00196 aa.progname = progname;
00197 aa.home = home;
00198 aa.machtab = machtab;
00199 aa.sites = sitep;
00200 aa.nsites = nsites;
00201 if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) {
00202 dbenv->errx(dbenv, "can't create connect-all thread");
00203 goto err;
00204 }
00205
00206
00207
00208
00209
00210 if (whoami == MASTER) {
00211 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) {
00212 dbenv->err(dbenv, ret, "dbenv->rep_start failed");
00213 goto err;
00214 }
00215 if ((ret = domaster(dbenv, progname)) != 0) {
00216 dbenv->err(dbenv, ret, "Master failed");
00217 goto err;
00218 }
00219 } else {
00220 memset(&local, 0, sizeof(local));
00221 local.data = myaddr;
00222 local.size = (u_int32_t)strlen(myaddr) + 1;
00223 if ((ret =
00224 dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) {
00225 dbenv->err(dbenv, ret, "dbenv->rep_start failed");
00226 goto err;
00227 }
00228
00229 sleep(5);
00230 if ((ret = doclient(dbenv, progname, machtab)) != 0) {
00231 dbenv->err(dbenv, ret, "Client failed");
00232 goto err;
00233 }
00234
00235 }
00236
00237
00238 if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus))
00239 ret = -1;
00240 if (ret == 0 &&
00241 ((uintptr_t)astatus != EXIT_SUCCESS ||
00242 (uintptr_t)cstatus != EXIT_SUCCESS))
00243 ret = -1;
00244
00245 err: if (machtab != NULL)
00246 free(machtab);
00247 if (dbenv != NULL)
00248 (void)dbenv->close(dbenv, 0);
00249 #ifdef _WIN32
00250
00251 (void)WSACleanup();
00252 #endif
00253 return (ret);
00254 }
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
/*
 * usage --
 *	Print the command-line synopsis to stderr and terminate the process
 *	with EXIT_FAILURE.  Does not return.
 *
 *	progname: program name shown at the start of the synopsis.
 *
 * The flag list mirrors the getopt string used in main(), including -v.
 */
static void
usage(progname)
	const char *progname;
{
	fprintf(stderr, "usage: %s ", progname);
	fprintf(stderr, "[-CMv][-h home][-o host:port][-m host:port]%s",
	    "[-n nsites][-p priority]\n");
	exit(EXIT_FAILURE);
}
00287
00288
00289 static int
00290 env_init(progname, home, dbenvp, machtab, flags)
00291 const char *progname, *home;
00292 DB_ENV **dbenvp;
00293 machtab_t *machtab;
00294 u_int32_t flags;
00295 {
00296 DB_ENV *dbenv;
00297 int ret;
00298 char *prefix;
00299
00300 if ((prefix = malloc(strlen(progname) + 2)) == NULL) {
00301 fprintf(stderr,
00302 "%s: System error: %s\n", progname, strerror(errno));
00303 return (errno);
00304 }
00305 sprintf(prefix, "%s:", progname);
00306
00307 if ((ret = db_env_create(&dbenv, 0)) != 0) {
00308 fprintf(stderr, "%s: env create failed: %s\n",
00309 progname, db_strerror(ret));
00310 return (ret);
00311 }
00312 dbenv->set_errfile(dbenv, stderr);
00313 dbenv->set_errpfx(dbenv, prefix);
00314 (void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0);
00315
00316
00317 dbenv->app_private = machtab;
00318 (void)dbenv->set_rep_transport(dbenv, SELF_EID, quote_send);
00319
00320 flags |= DB_CREATE | DB_THREAD | DB_INIT_REP |
00321 DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN;
00322
00323 if ((ret = dbenv->open(dbenv, home, flags, 0)) == 0)
00324 *dbenvp = dbenv;
00325 else
00326 fprintf(stderr, "can't open DB environment: %s\n",
00327 db_strerror(ret));
00328
00329 return (ret);
00330 }