Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

ex_rq_util.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2001-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: ex_rq_util.c,v 12.5 2005/11/02 22:14:24 alanb Exp $
00008  */
00009 
00010 #include <sys/types.h>
00011 #include <errno.h>
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <string.h>
00015 
00016 #include <db.h>
00017 
00018 #include "ex_repquote.h"
00019 
00020 static int   connect_site __P((DB_ENV *, machtab_t *,
00021                  const char *, repsite_t *, int *, int *, thread_t *));
00022 static void *elect_thread __P((void *));
00023 static void *hm_loop __P((void *));
00024 
00025 typedef struct {
00026         DB_ENV *dbenv;
00027         machtab_t *machtab;
00028 } elect_args;
00029 
00030 typedef struct {
00031         DB_ENV *dbenv;
00032         const char *progname;
00033         const char *home;
00034         socket_t fd;
00035         u_int32_t eid;
00036         machtab_t *tab;
00037 } hm_loop_args;
00038 
00039 /*
00040  * This is a generic message handling loop that is used both by the
00041  * master to accept messages from a client as well as by clients
00042  * to communicate with other clients.
00043  */
00044 static void *
00045 hm_loop(args)
00046         void *args;
00047 {
00048         DB_ENV *dbenv;
00049         DB_LSN permlsn;
00050         DBT rec, control;
00051         const char *c, *home, *progname;
00052         elect_args *ea;
00053         hm_loop_args *ha;
00054         machtab_t *tab;
00055         thread_t elect_thr, *site_thrs, *tmp, tid;
00056         repsite_t self;
00057         u_int32_t timeout;
00058         int eid, n, nsites, newm, nsites_allocd;
00059         int already_open, pri, r, ret, t_ret, tmpid;
00060         socket_t fd;
00061         void *status;
00062 
00063         ea = NULL;
00064         site_thrs = NULL;
00065         nsites_allocd = 0;
00066         nsites = 0;
00067 
00068         ha = (hm_loop_args *)args;
00069         dbenv = ha->dbenv;
00070         fd = ha->fd;
00071         home = ha->home;
00072         eid = ha->eid;
00073         progname = ha->progname;
00074         tab = ha->tab;
00075         free(ha);
00076 
00077         memset(&rec, 0, sizeof(DBT));
00078         memset(&control, 0, sizeof(DBT));
00079 
00080         for (ret = 0; ret == 0;) {
00081                 if ((ret = get_next_message(fd, &rec, &control)) != 0) {
00082                         /*
00083                          * Close this connection; if it's the master call
00084                          * for an election.
00085                          */
00086                         closesocket(fd);
00087                         if ((ret = machtab_rem(tab, eid, 1)) != 0)
00088                                 break;
00089 
00090                         /*
00091                          * If I'm the master, I just lost a client and this
00092                          * thread is done.
00093                          */
00094                         if (master_eid == SELF_EID)
00095                                 break;
00096 
00097                         /*
00098                          * If I was talking with the master and the master
00099                          * went away, I need to call an election; else I'm
00100                          * done.
00101                          */
00102                         if (master_eid != eid)
00103                                 break;
00104 
00105                         master_eid = DB_EID_INVALID;
00106                         machtab_parm(tab, &n, &pri, &timeout);
00107                         if ((ret = dbenv->rep_elect(dbenv,
00108                             n, (n/2+1), pri, timeout, &newm, 0)) != 0)
00109                                 continue;
00110 
00111                         /*
00112                          * Regardless of the results, the site I was talking
00113                          * to is gone, so I have nothing to do but exit.
00114                          */
00115                         if (newm == SELF_EID && (ret =
00116                             dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) == 0)
00117                                 ret = domaster(dbenv, progname);
00118                         break;
00119                 }
00120 
00121                 tmpid = eid;
00122                 switch (r = dbenv->rep_process_message(dbenv,
00123                     &control, &rec, &tmpid, &permlsn)) {
00124                 case DB_REP_NEWSITE:
00125                         /*
00126                          * Check if we got sent connect information and if we
00127                          * did, if this is me or if we already have a
00128                          * connection to this new site.  If we don't,
00129                          * establish a new one.
00130                          */
00131 
00132                         /* No connect info. */
00133                         if (rec.size == 0)
00134                                 break;
00135 
00136                         /* It's me, do nothing. */
00137                         if (strncmp(myaddr, rec.data, rec.size) == 0)
00138                                 break;
00139 
00140                         self.host = (char *)rec.data;
00141                         self.host = strtok(self.host, ":");
00142                         if ((c = strtok(NULL, ":")) == NULL) {
00143                                 dbenv->errx(dbenv, "Bad host specification");
00144                                 goto out;
00145                         }
00146                         self.port = atoi(c);
00147 
00148                         /*
00149                          * We try to connect to the new site.  If we can't,
00150                          * we treat it as an error since we know that the site
00151                          * should be up if we got a message from it (even
00152                          * indirectly).
00153                          */
00154                         if (nsites == nsites_allocd) {
00155                                 /* Need to allocate more space. */
00156                                 if ((tmp = realloc(site_thrs,
00157                                     (10 + nsites) * sizeof(thread_t))) == NULL) {
00158                                         ret = errno;
00159                                         goto out;
00160                                 }
00161                                 site_thrs = tmp;
00162                                 nsites_allocd += 10;
00163                         }
00164                         if ((ret = connect_site(dbenv, tab, progname,
00165                             &self, &already_open, &tmpid, &tid)) != 0)
00166                                 goto out;
00167                         if (!already_open)
00168                                 memcpy(&site_thrs[nsites++], &tid, sizeof(thread_t));
00169                         break;
00170                 case DB_REP_HOLDELECTION:
00171                         if (master_eid == SELF_EID)
00172                                 break;
00173                         /* Make sure that previous election has finished. */
00174                         if (ea != NULL) {
00175                                 if (thread_join(elect_thr, &status) != 0) {
00176                                         dbenv->errx(dbenv,
00177                                             "thread join failure");
00178                                         goto out;
00179                                 }
00180                                 ea = NULL;
00181                         }
00182                         if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
00183                                 dbenv->errx(dbenv, "can't allocate memory");
00184                                 ret = errno;
00185                                 goto out;
00186                         }
00187                         ea->dbenv = dbenv;
00188                         ea->machtab = tab;
00189                         if ((ret = thread_create(&elect_thr,
00190                              NULL, elect_thread, (void *)ea)) != 0) {
00191                                 dbenv->errx(dbenv,
00192                                     "can't create election thread");
00193                         }
00194                         break;
00195                 case DB_REP_NEWMASTER:
00196                         /* Check if it's us. */
00197                         master_eid = tmpid;
00198                         if (tmpid == SELF_EID) {
00199                                 if ((ret = dbenv->rep_start(dbenv,
00200                                     NULL, DB_REP_MASTER)) != 0) {
00201                                         dbenv->err(dbenv, ret,
00202                                             "can't start as master");
00203                                         goto out;
00204                                 }
00205                                 ret = domaster(dbenv, progname);
00206                         }
00207                         break;
00208                 case DB_REP_ISPERM:
00209                         /* FALLTHROUGH */
00210                 case 0:
00211                         break;
00212                 default:
00213                         dbenv->err(dbenv, r, "DB_ENV->rep_process_message");
00214                         break;
00215                 }
00216         }
00217 
00218 out:    if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
00219                 ret = t_ret;
00220 
00221         /* Don't close the environment before any children exit. */
00222         if (ea != NULL && thread_join(elect_thr, &status) != 0)
00223                 dbenv->errx(dbenv, "can't join election thread");
00224 
00225         if (site_thrs != NULL)
00226                 while (--nsites >= 0)
00227                         if (thread_join(site_thrs[nsites], &status) != 0)
00228                                 dbenv->errx(dbenv, "can't join site thread");
00229 
00230         return ((void *)(uintptr_t)ret);
00231 }
00232 
00233 /*
00234  * This is a generic thread that spawns a thread to listen for connections
00235  * on a socket and then spawns off child threads to handle each new
00236  * connection.
00237  */
00238 void *
00239 connect_thread(args)
00240         void *args;
00241 {
00242         DB_ENV *dbenv;
00243         const char *home, *progname;
00244         hm_loop_args *ha;
00245         connect_args *cargs;
00246         machtab_t *machtab;
00247         thread_t hm_thrs[MAX_THREADS];
00248         void *status;
00249         int i, eid, port, ret;
00250         socket_t fd, ns;
00251 
00252         ha = NULL;
00253         cargs = (connect_args *)args;
00254         dbenv = cargs->dbenv;
00255         home = cargs->home;
00256         progname = cargs->progname;
00257         machtab = cargs->machtab;
00258         port = cargs->port;
00259 
00260         /*
00261          * Loop forever, accepting connections from new machines,
00262          * and forking off a thread to handle each.
00263          */
00264         if ((fd = listen_socket_init(progname, port)) < 0) {
00265                 ret = errno;
00266                 goto err;
00267         }
00268 
00269         for (i = 0; i < MAX_THREADS; i++) {
00270                 if ((ns = listen_socket_accept(machtab,
00271                     progname, fd, &eid)) == SOCKET_CREATION_FAILURE) {
00272                         ret = errno;
00273                         goto err;
00274                 }
00275                 if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
00276                         dbenv->errx(dbenv, "can't allocate memory");
00277                         ret = errno;
00278                         goto err;
00279                 }
00280                 ha->progname = progname;
00281                 ha->home = home;
00282                 ha->fd = ns;
00283                 ha->eid = eid;
00284                 ha->tab = machtab;
00285                 ha->dbenv = dbenv;
00286                 if ((ret = thread_create(&hm_thrs[i++], NULL,
00287                     hm_loop, (void *)ha)) != 0) {
00288                         dbenv->errx(dbenv, "can't create thread for site");
00289                         goto err;
00290                 }
00291                 ha = NULL;
00292         }
00293 
00294         /* If we fell out, we ended up with too many threads. */
00295         dbenv->errx(dbenv, "Too many threads");
00296         ret = ENOMEM;
00297 
00298         /* Do not return until all threads have exited. */
00299         while (--i >= 0)
00300                 if (thread_join(hm_thrs[i], &status) != 0)
00301                         dbenv->errx(dbenv, "can't join site thread");
00302 
00303 err:    return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
00304 }
00305 
00306 /*
00307  * Open a connection to everyone that we've been told about.  If we
00308  * cannot open some connections, keep trying.
00309  */
00310 void *
00311 connect_all(args)
00312         void *args;
00313 {
00314         DB_ENV *dbenv;
00315         all_args *aa;
00316         const char *home, *progname;
00317         hm_loop_args *ha;
00318         int failed, i, eid, nsites, open, ret, *success;
00319         machtab_t *machtab;
00320         thread_t *hm_thr;
00321         repsite_t *sites;
00322 
00323         ha = NULL;
00324         aa = (all_args *)args;
00325         dbenv = aa->dbenv;
00326         progname = aa->progname;
00327         home = aa->home;
00328         machtab = aa->machtab;
00329         nsites = aa->nsites;
00330         sites = aa->sites;
00331 
00332         ret = 0;
00333         hm_thr = NULL;
00334         success = NULL;
00335 
00336         /* Some implementations of calloc are sad about allocating 0 things. */
00337         if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) {
00338                 dbenv->err(dbenv, errno, "connect_all");
00339                 ret = 1;
00340                 goto err;
00341         }
00342 
00343         if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) {
00344                 dbenv->err(dbenv, errno, "connect_all");
00345                 ret = 1;
00346                 goto err;
00347         }
00348 
00349         for (failed = nsites; failed > 0;) {
00350                 for (i = 0; i < nsites; i++) {
00351                         if (success[i])
00352                                 continue;
00353 
00354                         ret = connect_site(dbenv, machtab,
00355                             progname, &sites[i], &open, &eid, &hm_thr[i]);
00356 
00357                         /*
00358                          * If we couldn't make the connection, this isn't
00359                          * fatal to the loop, but we have nothing further
00360                          * to do on this machine at the moment.
00361                          */
00362                         if (ret == DB_REP_UNAVAIL)
00363                                 continue;
00364 
00365                         if (ret != 0)
00366                                 goto err;
00367 
00368                         failed--;
00369                         success[i] = 1;
00370 
00371                         /* If the connection is already open, we're done. */
00372                         if (ret == 0 && open == 1)
00373                                 continue;
00374 
00375                 }
00376                 sleep(1);
00377         }
00378 
00379 err:    if (success != NULL)
00380                 free(success);
00381         if (hm_thr != NULL)
00382                 free(hm_thr);
00383         return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS);
00384 }
00385 
00386 static int
00387 connect_site(dbenv, machtab, progname, site, is_open, eidp, hm_thrp)
00388         DB_ENV *dbenv;
00389         machtab_t *machtab;
00390         const char *progname;
00391         repsite_t *site;
00392         int *is_open, *eidp;
00393         thread_t *hm_thrp;
00394 {
00395         int ret;
00396         socket_t s;
00397         hm_loop_args *ha;
00398 
00399         if ((s = get_connected_socket(machtab, progname,
00400             site->host, site->port, is_open, eidp)) < 0)
00401                 return (DB_REP_UNAVAIL);
00402 
00403         if (*is_open)
00404                 return (0);
00405 
00406         if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
00407                 dbenv->errx(dbenv, "can't allocate memory");
00408                 ret = errno;
00409                 goto err;
00410         }
00411 
00412         ha->progname = progname;
00413         ha->fd = s;
00414         ha->eid = *eidp;
00415         ha->tab = machtab;
00416         ha->dbenv = dbenv;
00417 
00418         if ((ret = thread_create(hm_thrp, NULL,
00419             hm_loop, (void *)ha)) != 0) {
00420                 dbenv->errx(dbenv, "can't create thread for connected site");
00421                 goto err1;
00422         }
00423         dbenv->errx(dbenv, "created thread %d\n", *hm_thrp); /* ### */
00424 
00425         return (0);
00426 
00427 err1:   free(ha);
00428 err:
00429         return (ret);
00430 }
00431 
00432 /*
00433  * We need to spawn off a new thread in which to hold an election in
00434  * case we are the only thread listening on for messages.
00435  */
00436 static void *
00437 elect_thread(args)
00438         void *args;
00439 {
00440         DB_ENV *dbenv;
00441         elect_args *eargs;
00442         machtab_t *machtab;
00443         u_int32_t timeout;
00444         int n, ret, pri;
00445 
00446         eargs = (elect_args *)args;
00447         dbenv = eargs->dbenv;
00448         machtab = eargs->machtab;
00449         free(eargs);
00450 
00451         machtab_parm(machtab, &n, &pri, &timeout);
00452         while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), pri, timeout,
00453             &master_eid, 0)) != 0)
00454                 sleep(2);
00455 
00456         /* Check if it's us. */
00457         if (master_eid == SELF_EID)
00458                 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0)
00459                         dbenv->err(dbenv, ret,
00460                             "can't start as master in election thread");
00461 
00462         return (NULL);
00463 }

Generated on Sun Dec 25 12:14:25 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2