00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <sys/types.h>
00011 #include <errno.h>
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <string.h>
00015
00016 #include <db.h>
00017
00018 #include "ex_repquote.h"
00019
/*
 * File-local helpers defined later in this file.  The __P() wrapper is
 * defined outside this file (presumably a prototype-compatibility
 * macro -- confirm in the included headers).
 */
static int connect_site __P((DB_ENV *, machtab_t *,
    const char *, repsite_t *, int *, int *, thread_t *));
static void *elect_thread __P((void *));
static void *hm_loop __P((void *));
00024
/*
 * Argument bundle for elect_thread.  hm_loop allocates it when it is
 * asked to hold an election; elect_thread copies the fields out and
 * frees it.
 */
typedef struct {
	DB_ENV *dbenv;		/* Environment the election runs in. */
	machtab_t *machtab;	/* Site table; machtab_parm() reads the
				 * election parameters from it. */
} elect_args;
00029
/*
 * Argument bundle for an hm_loop thread.  It is allocated by the code
 * that starts the thread (connect_thread, connect_site); hm_loop copies
 * the fields out and frees it.
 */
typedef struct {
	DB_ENV *dbenv;		/* Replication environment. */
	const char *progname;	/* Program name, passed on to helpers. */
	const char *home;	/* Environment home; set by connect_thread,
				 * left NULL by connect_site, and not used
				 * by hm_loop. */
	socket_t fd;		/* Connected socket to the remote site. */
	u_int32_t eid;		/* The remote site's ID in tab (the key
				 * passed to machtab_rem). */
	machtab_t *tab;		/* Table of known sites. */
} hm_loop_args;
00038
00039
00040
00041
00042
00043
00044 static void *
00045 hm_loop(args)
00046 void *args;
00047 {
00048 DB_ENV *dbenv;
00049 DB_LSN permlsn;
00050 DBT rec, control;
00051 const char *c, *home, *progname;
00052 elect_args *ea;
00053 hm_loop_args *ha;
00054 machtab_t *tab;
00055 thread_t elect_thr, *site_thrs, *tmp, tid;
00056 repsite_t self;
00057 u_int32_t timeout;
00058 int eid, n, nsites, newm, nsites_allocd;
00059 int already_open, pri, r, ret, t_ret, tmpid;
00060 socket_t fd;
00061 void *status;
00062
00063 ea = NULL;
00064 site_thrs = NULL;
00065 nsites_allocd = 0;
00066 nsites = 0;
00067
00068 ha = (hm_loop_args *)args;
00069 dbenv = ha->dbenv;
00070 fd = ha->fd;
00071 home = ha->home;
00072 eid = ha->eid;
00073 progname = ha->progname;
00074 tab = ha->tab;
00075 free(ha);
00076
00077 memset(&rec, 0, sizeof(DBT));
00078 memset(&control, 0, sizeof(DBT));
00079
00080 for (ret = 0; ret == 0;) {
00081 if ((ret = get_next_message(fd, &rec, &control)) != 0) {
00082
00083
00084
00085
00086 closesocket(fd);
00087 if ((ret = machtab_rem(tab, eid, 1)) != 0)
00088 break;
00089
00090
00091
00092
00093
00094 if (master_eid == SELF_EID)
00095 break;
00096
00097
00098
00099
00100
00101
00102 if (master_eid != eid)
00103 break;
00104
00105 master_eid = DB_EID_INVALID;
00106 machtab_parm(tab, &n, &pri, &timeout);
00107 if ((ret = dbenv->rep_elect(dbenv,
00108 n, (n/2+1), pri, timeout, &newm, 0)) != 0)
00109 continue;
00110
00111
00112
00113
00114
00115 if (newm == SELF_EID && (ret =
00116 dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) == 0)
00117 ret = domaster(dbenv, progname);
00118 break;
00119 }
00120
00121 tmpid = eid;
00122 switch (r = dbenv->rep_process_message(dbenv,
00123 &control, &rec, &tmpid, &permlsn)) {
00124 case DB_REP_NEWSITE:
00125
00126
00127
00128
00129
00130
00131
00132
00133 if (rec.size == 0)
00134 break;
00135
00136
00137 if (strncmp(myaddr, rec.data, rec.size) == 0)
00138 break;
00139
00140 self.host = (char *)rec.data;
00141 self.host = strtok(self.host, ":");
00142 if ((c = strtok(NULL, ":")) == NULL) {
00143 dbenv->errx(dbenv, "Bad host specification");
00144 goto out;
00145 }
00146 self.port = atoi(c);
00147
00148
00149
00150
00151
00152
00153
00154 if (nsites == nsites_allocd) {
00155
00156 if ((tmp = realloc(site_thrs,
00157 (10 + nsites) * sizeof(thread_t))) == NULL) {
00158 ret = errno;
00159 goto out;
00160 }
00161 site_thrs = tmp;
00162 nsites_allocd += 10;
00163 }
00164 if ((ret = connect_site(dbenv, tab, progname,
00165 &self, &already_open, &tmpid, &tid)) != 0)
00166 goto out;
00167 if (!already_open)
00168 memcpy(&site_thrs[nsites++], &tid, sizeof(thread_t));
00169 break;
00170 case DB_REP_HOLDELECTION:
00171 if (master_eid == SELF_EID)
00172 break;
00173
00174 if (ea != NULL) {
00175 if (thread_join(elect_thr, &status) != 0) {
00176 dbenv->errx(dbenv,
00177 "thread join failure");
00178 goto out;
00179 }
00180 ea = NULL;
00181 }
00182 if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
00183 dbenv->errx(dbenv, "can't allocate memory");
00184 ret = errno;
00185 goto out;
00186 }
00187 ea->dbenv = dbenv;
00188 ea->machtab = tab;
00189 if ((ret = thread_create(&elect_thr,
00190 NULL, elect_thread, (void *)ea)) != 0) {
00191 dbenv->errx(dbenv,
00192 "can't create election thread");
00193 }
00194 break;
00195 case DB_REP_NEWMASTER:
00196
00197 master_eid = tmpid;
00198 if (tmpid == SELF_EID) {
00199 if ((ret = dbenv->rep_start(dbenv,
00200 NULL, DB_REP_MASTER)) != 0) {
00201 dbenv->err(dbenv, ret,
00202 "can't start as master");
00203 goto out;
00204 }
00205 ret = domaster(dbenv, progname);
00206 }
00207 break;
00208 case DB_REP_ISPERM:
00209
00210 case 0:
00211 break;
00212 default:
00213 dbenv->err(dbenv, r, "DB_ENV->rep_process_message");
00214 break;
00215 }
00216 }
00217
00218 out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
00219 ret = t_ret;
00220
00221
00222 if (ea != NULL && thread_join(elect_thr, &status) != 0)
00223 dbenv->errx(dbenv, "can't join election thread");
00224
00225 if (site_thrs != NULL)
00226 while (--nsites >= 0)
00227 if (thread_join(site_thrs[nsites], &status) != 0)
00228 dbenv->errx(dbenv, "can't join site thread");
00229
00230 return ((void *)(uintptr_t)ret);
00231 }
00232
00233
00234
00235
00236
00237
00238 void *
00239 connect_thread(args)
00240 void *args;
00241 {
00242 DB_ENV *dbenv;
00243 const char *home, *progname;
00244 hm_loop_args *ha;
00245 connect_args *cargs;
00246 machtab_t *machtab;
00247 thread_t hm_thrs[MAX_THREADS];
00248 void *status;
00249 int i, eid, port, ret;
00250 socket_t fd, ns;
00251
00252 ha = NULL;
00253 cargs = (connect_args *)args;
00254 dbenv = cargs->dbenv;
00255 home = cargs->home;
00256 progname = cargs->progname;
00257 machtab = cargs->machtab;
00258 port = cargs->port;
00259
00260
00261
00262
00263
00264 if ((fd = listen_socket_init(progname, port)) < 0) {
00265 ret = errno;
00266 goto err;
00267 }
00268
00269 for (i = 0; i < MAX_THREADS; i++) {
00270 if ((ns = listen_socket_accept(machtab,
00271 progname, fd, &eid)) == SOCKET_CREATION_FAILURE) {
00272 ret = errno;
00273 goto err;
00274 }
00275 if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
00276 dbenv->errx(dbenv, "can't allocate memory");
00277 ret = errno;
00278 goto err;
00279 }
00280 ha->progname = progname;
00281 ha->home = home;
00282 ha->fd = ns;
00283 ha->eid = eid;
00284 ha->tab = machtab;
00285 ha->dbenv = dbenv;
00286 if ((ret = thread_create(&hm_thrs[i++], NULL,
00287 hm_loop, (void *)ha)) != 0) {
00288 dbenv->errx(dbenv, "can't create thread for site");
00289 goto err;
00290 }
00291 ha = NULL;
00292 }
00293
00294
00295 dbenv->errx(dbenv, "Too many threads");
00296 ret = ENOMEM;
00297
00298
00299 while (--i >= 0)
00300 if (thread_join(hm_thrs[i], &status) != 0)
00301 dbenv->errx(dbenv, "can't join site thread");
00302
00303 err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
00304 }
00305
00306
00307
00308
00309
00310 void *
00311 connect_all(args)
00312 void *args;
00313 {
00314 DB_ENV *dbenv;
00315 all_args *aa;
00316 const char *home, *progname;
00317 hm_loop_args *ha;
00318 int failed, i, eid, nsites, open, ret, *success;
00319 machtab_t *machtab;
00320 thread_t *hm_thr;
00321 repsite_t *sites;
00322
00323 ha = NULL;
00324 aa = (all_args *)args;
00325 dbenv = aa->dbenv;
00326 progname = aa->progname;
00327 home = aa->home;
00328 machtab = aa->machtab;
00329 nsites = aa->nsites;
00330 sites = aa->sites;
00331
00332 ret = 0;
00333 hm_thr = NULL;
00334 success = NULL;
00335
00336
00337 if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) {
00338 dbenv->err(dbenv, errno, "connect_all");
00339 ret = 1;
00340 goto err;
00341 }
00342
00343 if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) {
00344 dbenv->err(dbenv, errno, "connect_all");
00345 ret = 1;
00346 goto err;
00347 }
00348
00349 for (failed = nsites; failed > 0;) {
00350 for (i = 0; i < nsites; i++) {
00351 if (success[i])
00352 continue;
00353
00354 ret = connect_site(dbenv, machtab,
00355 progname, &sites[i], &open, &eid, &hm_thr[i]);
00356
00357
00358
00359
00360
00361
00362 if (ret == DB_REP_UNAVAIL)
00363 continue;
00364
00365 if (ret != 0)
00366 goto err;
00367
00368 failed--;
00369 success[i] = 1;
00370
00371
00372 if (ret == 0 && open == 1)
00373 continue;
00374
00375 }
00376 sleep(1);
00377 }
00378
00379 err: if (success != NULL)
00380 free(success);
00381 if (hm_thr != NULL)
00382 free(hm_thr);
00383 return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS);
00384 }
00385
00386 static int
00387 connect_site(dbenv, machtab, progname, site, is_open, eidp, hm_thrp)
00388 DB_ENV *dbenv;
00389 machtab_t *machtab;
00390 const char *progname;
00391 repsite_t *site;
00392 int *is_open, *eidp;
00393 thread_t *hm_thrp;
00394 {
00395 int ret;
00396 socket_t s;
00397 hm_loop_args *ha;
00398
00399 if ((s = get_connected_socket(machtab, progname,
00400 site->host, site->port, is_open, eidp)) < 0)
00401 return (DB_REP_UNAVAIL);
00402
00403 if (*is_open)
00404 return (0);
00405
00406 if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
00407 dbenv->errx(dbenv, "can't allocate memory");
00408 ret = errno;
00409 goto err;
00410 }
00411
00412 ha->progname = progname;
00413 ha->fd = s;
00414 ha->eid = *eidp;
00415 ha->tab = machtab;
00416 ha->dbenv = dbenv;
00417
00418 if ((ret = thread_create(hm_thrp, NULL,
00419 hm_loop, (void *)ha)) != 0) {
00420 dbenv->errx(dbenv, "can't create thread for connected site");
00421 goto err1;
00422 }
00423 dbenv->errx(dbenv, "created thread %d\n", *hm_thrp);
00424
00425 return (0);
00426
00427 err1: free(ha);
00428 err:
00429 return (ret);
00430 }
00431
00432
00433
00434
00435
00436 static void *
00437 elect_thread(args)
00438 void *args;
00439 {
00440 DB_ENV *dbenv;
00441 elect_args *eargs;
00442 machtab_t *machtab;
00443 u_int32_t timeout;
00444 int n, ret, pri;
00445
00446 eargs = (elect_args *)args;
00447 dbenv = eargs->dbenv;
00448 machtab = eargs->machtab;
00449 free(eargs);
00450
00451 machtab_parm(machtab, &n, &pri, &timeout);
00452 while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), pri, timeout,
00453 &master_eid, 0)) != 0)
00454 sleep(2);
00455
00456
00457 if (master_eid == SELF_EID)
00458 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0)
00459 dbenv->err(dbenv, ret,
00460 "can't start as master in election thread");
00461
00462 return (NULL);
00463 }