Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

ex_rq_net.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2001-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: ex_rq_net.c,v 12.7 2005/11/03 18:29:21 bostic Exp $
00008  */
00009 
00010 #include <sys/types.h>
00011 #include <assert.h>
00012 #include <errno.h>
00013 #include <stdio.h>
00014 #include <stdlib.h>
00015 #include <string.h>
00016 
00017 #include <db.h>
00018 #include "ex_repquote.h"
00019 #ifndef _SYS_QUEUE_H
00020 /*
00021  * Some *BSD Unix variants include the Queue macros in their libraries and
00022  * these might already have been included.  In that case, it would be bad
00023  * to include them again.
00024  */
00025 #include <dbinc/queue.h>                /* !!!: for the LIST_XXX macros. */
00026 #endif
00027 
00028 int machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *));
00029 #ifdef DIAGNOSTIC
00030 void machtab_print __P((machtab_t *));
00031 #endif
00032 ssize_t readn __P((socket_t, void *, size_t));
00033 
00034 /*
00035  * This file defines the communication infrastructure for the ex_repquote
00036  * sample application.
00037  *
00038  * This application uses TCP/IP for its communication.  In an N-site
00039  * replication group, this means that there are N * N communication
00040  * channels so that every site can communicate with every other site
00041  * (this allows elections to be held when the master fails).  We do
00042  * not require that anyone know about all sites when the application
00043  * starts up.  In order to communicate, the application should know
00044  * about someone, else it has no idea how to ever get in the game.
00045  *
00046  * Communication is handled via a number of different threads.  These
00047  * thread functions are implemented in rep_util.c.  In this file, we
00048  * define the data structures that maintain the state that describes
00049  * the comm infrastructure, the functions that manipulate this state
00050  * and the routines used to actually send and receive data over the
00051  * sockets.
00052  */
00053 
00054 /*
00055  * The communication infrastructure is represented by a machine table,
00056  * machtab_t, which is essentially a mutex-protected linked list of members
00057  * of the group.  The machtab also contains the parameters that are needed
00058  * to call for an election.  We hardwire values for these parameters in the
00059  * init function, but these could be set via some configuration setup in a
00060  * real application.  We reserve the machine-id 1 to refer to ourselves and
00061  * make the machine-id 0 be invalid.
00062  */
00063 
00064 #define MACHID_INVALID  0
00065 #define MACHID_SELF     1
00066 
00067 struct __machtab {              /* Machine table: the list of known sites plus election parameters. */
00068         LIST_HEAD(__machlist, __member) machlist;       /* Head of the member list. */
00069         int nextid;             /* Next eid to hand out; starts at 2, reset to 2 when the list empties. */
00070         mutex_t mtmutex;        /* Taken around every traversal or change of machlist. */
00071         u_int32_t timeout_time; /* Timeout reported by machtab_parm; microseconds. */
00072         int current;            /* Number of members currently on the list. */
00073         int max;                /* Largest value current has reached. */
00074         int nsites;             /* Site count given at init; 0 means "report max instead". */
00075         int priority;           /* Priority given at init, reported by machtab_parm. */
00076 };
00077 
00078 /* Data structure that describes each entry in the machtab (one entry per remote site). */
00079 struct __member {
00080         u_int32_t hostaddr;     /* Host IP address, in host byte order. */
00081         int port;               /* Port number the site listens on. */
00082         int eid;                /* Application-specific machine id. */
00083         socket_t fd;            /* Socket for this site; closed by machtab_rem. */
00084         LIST_ENTRY(__member) links;
00085                                 /* For linked list of all members we know of. */
00086 };
00087 
00088 static int quote_send_broadcast __P((machtab_t *,
00089     const DBT *, const DBT *, u_int32_t));
00090 static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t));
00091 
00092 /*
00093  * machtab_init --
00094  *      Initialize the machine ID table.
00095  * XXX Right now we treat the number of sites as the maximum
00096  * number we've ever had on the list at one time.  We probably
00097  * want to make that smarter.
00098  */
00099 int
00100 machtab_init(machtabp, pri, nsites)
00101         machtab_t **machtabp;
00102         int pri, nsites;
00103 {
00104         int ret;
00105         machtab_t *machtab;
00106 
00107         if ((machtab = malloc(sizeof(machtab_t))) == NULL) {
00108                 fprintf(stderr, "can't allocate memory\n");
00109                 return (ENOMEM);
00110         }
00111 
00112         LIST_INIT(&machtab->machlist);
00113 
00114         /* Reserve eid's 0 and 1. */
00115         machtab->nextid = 2;
00116         machtab->timeout_time = 2 * 1000000;            /* 2 seconds. */
00117         machtab->current = machtab->max = 0;
00118         machtab->priority = pri;
00119         machtab->nsites = nsites;
00120 
00121         ret = mutex_init(&machtab->mtmutex, NULL);
00122         *machtabp = machtab;
00123 
00124         return (ret);
00125 }
00126 
00127 /*
00128  * machtab_add --
00129  *      Add a file descriptor to the table of machines, returning
00130  *  a new machine ID.
00131  */
00132 int
00133 machtab_add(machtab, fd, hostaddr, port, idp)
00134         machtab_t *machtab;
00135         socket_t fd;
00136         u_int32_t hostaddr;
00137         int port, *idp;
00138 {
00139         int ret;
00140         member_t *m, *member;
00141 
00142         ret = 0;
00143         if ((member = malloc(sizeof(member_t))) == NULL) {
00144                 fprintf(stderr, "can't allocate memory\n");
00145                 return (ENOMEM);
00146         }
00147 
00148         member->fd = fd;
00149         member->hostaddr = hostaddr;
00150         member->port = port;
00151 
00152         if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00153                 fprintf(stderr, "can't lock mutex");
00154                 return (ret);
00155         }
00156 
00157         for (m = LIST_FIRST(&machtab->machlist);
00158             m != NULL; m = LIST_NEXT(m, links))
00159                 if (m->hostaddr == hostaddr && m->port == port)
00160                         break;
00161 
00162         if (m == NULL) {
00163                 member->eid = machtab->nextid++;
00164                 LIST_INSERT_HEAD(&machtab->machlist, member, links);
00165         } else
00166                 member->eid = m->eid;
00167 
00168         if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00169                 fprintf(stderr, "can't unlock mutex\n");
00170                 return (ret);
00171         }
00172 
00173         if (idp != NULL)
00174                 *idp = member->eid;
00175 
00176         if (m == NULL) {
00177                 if (++machtab->current > machtab->max)
00178                         machtab->max = machtab->current;
00179         } else {
00180                 free(member);
00181                 ret = EEXIST;
00182         }
00183 #ifdef DIAGNOSTIC
00184         printf("Exiting machtab_add\n");
00185         machtab_print(machtab);
00186 #endif
00187         return (ret);
00188 }
00189 
00190 /*
00191  * machtab_getinfo --
00192  *      Return host and port information for a particular machine id.
00193  */
00194 int
00195 machtab_getinfo(machtab, eid, hostp, portp)
00196         machtab_t *machtab;
00197         int eid;
00198         u_int32_t *hostp;
00199         int *portp;
00200 {
00201         int ret;
00202         member_t *member;
00203 
00204         if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00205                 fprintf(stderr, "can't lock mutex\n");
00206                 return (ret);
00207         }
00208 
00209         for (member = LIST_FIRST(&machtab->machlist);
00210             member != NULL;
00211             member = LIST_NEXT(member, links))
00212                 if (member->eid == eid) {
00213                         *hostp = member->hostaddr;
00214                         *portp = member->port;
00215                         break;
00216                 }
00217 
00218         if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00219                 fprintf(stderr, "can't unlock mutex\n");
00220                 return (ret);
00221         }
00222 
00223         return (member != NULL ? 0 : EINVAL);
00224 }
00225 
00226 /*
00227  * machtab_rem --
00228  *      Remove a mapping from the table of machines.  Lock indicates
00229  * whether we need to lock the machtab or not (0 indicates we do not
00230  * need to lock; non-zero indicates that we do need to lock).
00231  */
00232 int
00233 machtab_rem(machtab, eid, lock)
00234         machtab_t *machtab;
00235         int eid;
00236         int lock;
00237 {
00238         int found, ret;
00239         member_t *member;
00240 
00241         ret = 0;
00242         if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) {
00243                 fprintf(stderr, "can't lock mutex\n");
00244                 return (ret);
00245         }
00246 
00247         for (found = 0, member = LIST_FIRST(&machtab->machlist);
00248             member != NULL;
00249             member = LIST_NEXT(member, links))
00250                 if (member->eid == eid) {
00251                         found = 1;
00252                         LIST_REMOVE(member, links);
00253                         (void)closesocket(member->fd);
00254                         free(member);
00255                         machtab->current--;
00256                         break;
00257                 }
00258 
00259         if (LIST_FIRST(&machtab->machlist) == NULL)
00260                 machtab->nextid = 2;
00261 
00262         if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0)
00263                 fprintf(stderr, "can't unlock mutex\n");
00264 
00265 #ifdef DIAGNOSTIC
00266         printf("Exiting machtab_rem\n");
00267         machtab_print(machtab);
00268 #endif
00269         return (ret);
00270 }
00271 
00272 void
00273 machtab_parm(machtab, nump, prip, timeoutp)
00274         machtab_t *machtab;
00275         int *nump, *prip;
00276         u_int32_t *timeoutp;
00277 {
00278         if (machtab->nsites == 0)
00279                 *nump = machtab->max;
00280         else
00281                 *nump = machtab->nsites;
00282         *prip = machtab->priority;
00283         *timeoutp = machtab->timeout_time;
00284 }
00285 
00286 #ifdef DIAGNOSTIC
00287 void
00288 machtab_print(machtab)
00289         machtab_t *machtab;
00290 {
00291         member_t *m;
00292 
00293         if (mutex_lock(&machtab->mtmutex) != 0) {
00294                 fprintf(stderr, "can't lock mutex\n");
00295                 abort();
00296         }
00297 
00298         for (m = LIST_FIRST(&machtab->machlist);
00299             m != NULL; m = LIST_NEXT(m, links)) {
00300 
00301             printf("IP: %lx Port: %6d EID: %2d FD: %3d\n",
00302                 (long)m->hostaddr, m->port, m->eid, m->fd);
00303         }
00304 
00305         if (mutex_unlock(&machtab->mtmutex) != 0) {
00306                 fprintf(stderr, "can't unlock mutex\n");
00307                 abort();
00308         }
00309 }
00310 #endif
00311 /*
00312  * listen_socket_init --
00313  *      Initialize a socket for listening on the specified port.  Returns
00314  *      a file descriptor for the socket, ready for an accept() call
00315  *      in a thread that we're happy to let block.
00316  */
00317 socket_t
00318 listen_socket_init(progname, port)
00319         const char *progname;
00320         int port;
00321 {
00322         socket_t s;
00323         int sockopt;
00324         struct sockaddr_in si;
00325 
00326         COMPQUIET(progname, NULL);
00327 
00328         if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00329                 perror("can't create listen socket");
00330                 return (-1);
00331         }
00332 
00333         memset(&si, 0, sizeof(si));
00334         si.sin_family = AF_INET;
00335         si.sin_addr.s_addr = htonl(INADDR_ANY);
00336         si.sin_port = htons((unsigned short)port);
00337 
00338         /*
00339          * When using this example for testing, it's common to kill and restart
00340          * regularly.  On some systems, this causes bind to fail with "address
00341          * in use" errors unless this option is set.
00342          */
00343         sockopt = 1;
00344         setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
00345             (const char *)&sockopt, sizeof (sockopt));
00346 
00347         if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) {
00348                 perror("can't bind listen socket");
00349                 goto err;
00350         }
00351 
00352         if (listen(s, 5) != 0) {
00353                 perror("can't establish listen queue");
00354                 goto err;
00355         }
00356 
00357         return (s);
00358 
00359 err:    closesocket(s);
00360         return (-1);
00361 }
00362 
00363 /*
00364  * listen_socket_accept --
00365  *      Accept a connection on a socket.  This is essentially just a wrapper
00366  *      for accept(3).
00367  */
00368 socket_t
00369 listen_socket_accept(machtab, progname, s, eidp)
00370         machtab_t *machtab;
00371         const char *progname;
00372         socket_t s;
00373         int *eidp;
00374 {
00375         struct sockaddr_in si;
00376         int si_len;
00377         int host, ret;
00378         socket_t ns;
00379         u_int16_t port;
00380 
00381         COMPQUIET(progname, NULL);
00382 
00383 wait:   memset(&si, 0, sizeof(si));
00384         si_len = sizeof(si);
00385         ns = accept(s, (struct sockaddr *)&si, &si_len);
00386         if (ns == SOCKET_CREATION_FAILURE) {
00387                 fprintf(stderr, "can't accept incoming connection\n");
00388                 return ns;
00389         }
00390         host = ntohl(si.sin_addr.s_addr);
00391 
00392         /*
00393          * Sites send their listening port when connections are first
00394          * established, as it will be different from the outgoing port
00395          * for this connection.
00396          */
00397         if (readn(ns, &port, 2) != 2)
00398                 goto err;
00399         port = ntohs(port);
00400 
00401         ret = machtab_add(machtab, ns, host, port, eidp);
00402         if (ret == EEXIST) {
00403                 closesocket(ns);
00404                 goto wait;
00405         } else if (ret != 0)
00406                 goto err;
00407         printf("Connected to host %x port %d, eid = %d\n", host, port, *eidp);
00408         return (ns);
00409 
00410 err:    closesocket(ns);
00411         return SOCKET_CREATION_FAILURE;
00412 }
00413 
00414 /*
00415  * get_connected_socket --
00416  *      Connect to the specified port of the specified remote machine,
00417  *      and return a file descriptor when we have accepted a connection on it.
00418  *      Add this connection to the machtab.  If we already have a connection
00419  *      open to this machine, then don't create another one, return the eid
00420  *      of the connection (in *eidp) and set is_open to 1.  Return 0.
00421  */
00422 socket_t
00423 get_connected_socket(machtab, progname, remotehost, port, is_open, eidp)
00424         machtab_t *machtab;
00425         const char *progname, *remotehost;
00426         int port, *is_open, *eidp;
00427 {
00428         int ret;
00429         socket_t s;
00430         struct hostent *hp;
00431         struct sockaddr_in si;
00432         u_int32_t addr;
00433         u_int16_t nport;
00434 
00435         *is_open = 0;
00436 
00437         if ((hp = gethostbyname(remotehost)) == NULL) {
00438                 fprintf(stderr, "%s: host not found: %s\n", progname,
00439                     strerror(net_errno));
00440                 return (-1);
00441         }
00442 
00443         if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00444                 perror("can't create outgoing socket");
00445                 return (-1);
00446         }
00447         memset(&si, 0, sizeof(si));
00448         memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length);
00449         addr = ntohl(si.sin_addr.s_addr);
00450         ret = machtab_add(machtab, s, addr, port, eidp);
00451         if (ret == EEXIST) {
00452                 *is_open = 1;
00453                 closesocket(s);
00454                 return (0);
00455         } else if (ret != 0) {
00456                 closesocket(s);
00457                 return (-1);
00458         }
00459 
00460         si.sin_family = AF_INET;
00461         si.sin_port = htons((unsigned short)port);
00462         if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) {
00463                 fprintf(stderr, "%s: connection failed: %s",
00464                     progname, strerror(net_errno));
00465                 (void)machtab_rem(machtab, *eidp, 1);
00466                 return (-1);
00467         }
00468 
00469         /*
00470          * The first thing we send on the socket is our (listening) port
00471          * so the site we are connecting to can register us correctly in
00472          * its machtab.
00473          */
00474         nport = htons(myport);
00475         writesocket(s, &nport, 2);
00476 
00477         return (s);
00478 }
00479 
00480 /*
00481  * get_next_message --
00482  *      Read a single message from the specified file descriptor, and
00483  * return it in the format used by rep functions (two DBTs and a type).
00484  *
00485  * This function is called in a loop by both clients and masters, and
00486  * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message().
00487  */
00488 int
00489 get_next_message(fd, rec, control)
00490         socket_t fd;
00491         DBT *rec, *control;
00492 {
00493         size_t nr;
00494         u_int32_t rsize, csize;
00495         u_int8_t *recbuf, *controlbuf;
00496 
00497         /*
00498          * The protocol we use on the wire is dead simple:
00499          *
00500          *      4 bytes         - rec->size
00501          *      (# read above)  - rec->data
00502          *      4 bytes         - control->size
00503          *      (# read above)  - control->data
00504          */
00505 
00506         /* Read rec->size. */
00507         nr = readn(fd, &rsize, 4);
00508         if (nr != 4)
00509                 return (1);
00510 
00511         /* Read the record itself. */
00512         if (rsize > 0) {
00513                 if (rec->size < rsize)
00514                         rec->data = realloc(rec->data, rsize);
00515                 recbuf = rec->data;
00516                 nr = readn(fd, recbuf, rsize);
00517         } else {
00518                 if (rec->data != NULL)
00519                         free(rec->data);
00520                 rec->data = NULL;
00521         }
00522         rec->size = rsize;
00523 
00524         /* Read control->size. */
00525         nr = readn(fd, &csize, 4);
00526         if (nr != 4)
00527                 return (1);
00528 
00529         /* Read the control struct itself. */
00530         if (csize > 0) {
00531                 controlbuf = control->data;
00532                 if (control->size < csize)
00533                         controlbuf = realloc(controlbuf, csize);
00534                 nr = readn(fd, controlbuf, csize);
00535                 if (nr != csize)
00536                         return (1);
00537         } else {
00538                 if (control->data != NULL)
00539                         free(control->data);
00540                 controlbuf = NULL;
00541         }
00542         control->data = controlbuf;
00543         control->size = csize;
00544 
00545         return (0);
00546 }
00547 
00548 /*
00549  * readn --
00550  *     Read a full n characters from a file descriptor, unless we get an error
00551  * or EOF.
00552  */
00553 ssize_t
00554 readn(fd, vptr, n)
00555         socket_t fd;
00556         void *vptr;
00557         size_t n;
00558 {
00559         size_t nleft;
00560         ssize_t nread;
00561         char *ptr;
00562 
00563         ptr = vptr;
00564         nleft = n;
00565         while (nleft > 0) {
00566                 if ((nread = readsocket(fd, ptr, nleft)) < 0) {
00567                         /*
00568                          * Call read() again on interrupted system call;
00569                          * on other errors, bail.
00570                          */
00571                         if (net_errno == EINTR)
00572                                 nread = 0;
00573                         else {
00574                                 perror("can't read from socket");
00575                                 return (-1);
00576                         }
00577                 } else if (nread == 0)
00578                         break;  /* EOF */
00579 
00580                 nleft -= nread;
00581                 ptr   += nread;
00582         }
00583 
00584         return (n - nleft);
00585 }
00586 
00587 /*
00588  * quote_send --
00589  * The f_send function for DB_ENV->set_rep_transport.
00590  */
00591 int
00592 quote_send(dbenv, control, rec, lsnp, eid, flags)
00593         DB_ENV *dbenv;
00594         const DBT *control, *rec;
00595         const DB_LSN *lsnp;
00596         int eid;
00597         u_int32_t flags;
00598 {
00599         int n, ret, t_ret;
00600         socket_t fd;
00601         machtab_t *machtab;
00602         member_t *m;
00603 
00604         COMPQUIET(lsnp, NULL);
00605         machtab = (machtab_t *)dbenv->app_private;
00606 
00607         if (eid == DB_EID_BROADCAST) {
00608                 /*
00609                  * Right now, we do not require successful transmission.
00610                  * I'd like to move this requiring at least one successful
00611                  * transmission on PERMANENT requests.
00612                  */
00613                 n = quote_send_broadcast(machtab, rec, control, flags);
00614                 if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/)
00615                         return (DB_REP_UNAVAIL);
00616                 return (0);
00617         }
00618 
00619         if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00620                 dbenv->errx(dbenv, "can't lock mutex");
00621                 return (ret);
00622         }
00623 
00624         fd = 0;
00625         for (m = LIST_FIRST(&machtab->machlist); m != NULL;
00626             m = LIST_NEXT(m, links)) {
00627                 if (m->eid == eid) {
00628                         fd = m->fd;
00629                         break;
00630                 }
00631         }
00632 
00633         if (fd == 0) {
00634                 dbenv->err(dbenv, DB_REP_UNAVAIL,
00635                     "quote_send: cannot find machine ID %d", eid);
00636                 return (DB_REP_UNAVAIL);
00637         }
00638 
00639         if ((ret = quote_send_one(rec, control, fd, flags)) != 0)
00640                 fprintf(stderr, "socket write error in send() function\n");
00641 
00642         if ((t_ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00643                 dbenv->errx(dbenv, "can't unlock mutex");
00644                 if (ret == 0)
00645                         ret = t_ret;
00646         }
00647 
00648         return (ret);
00649 }
00650 
00651 /*
00652  * quote_send_broadcast --
00653  *      Send a message to everybody.
00654  * Returns the number of sites to which this message was successfully
00655  * communicated.  A -1 indicates a fatal error.
00656  */
00657 static int
00658 quote_send_broadcast(machtab, rec, control, flags)
00659         machtab_t *machtab;
00660         const DBT *rec, *control;
00661         u_int32_t flags;
00662 {
00663         int ret, sent;
00664         member_t *m, *next;
00665 
00666         if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00667                 fprintf(stderr, "can't lock mutex\n");
00668                 return (ret);
00669         }
00670 
00671         sent = 0;
00672         for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) {
00673                 next = LIST_NEXT(m, links);
00674                 if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) {
00675                         fprintf(stderr, "socket write error in broadcast\n");
00676                         (void)machtab_rem(machtab, m->eid, 0);
00677                 } else
00678                         sent++;
00679         }
00680 
00681         if (mutex_unlock(&machtab->mtmutex) != 0) {
00682                 fprintf(stderr, "can't unlock mutex\n");
00683                 return (-1);
00684         }
00685 
00686         return (sent);
00687 }
00688 
00689 /*
00690  * quote_send_one --
00691  *      Send a message to a single machine, given that machine's file
00692  * descriptor.
00693  *
00694  * !!!
00695  * Note that the machtab mutex should be held through this call.
00696  * It doubles as a synchronizer to make sure that two threads don't
00697  * intersperse writes that are part of two single messages.
00698  */
00699 static int
00700 quote_send_one(rec, control, fd, flags)
00701         const DBT *rec, *control;
00702         socket_t fd;
00703         u_int32_t flags;
00704 
00705 {
00706         int retry;
00707         ssize_t bytes_left, nw;
00708         u_int8_t *wp;
00709 
00710         COMPQUIET(flags, 0);
00711 
00712         /*
00713          * The protocol is simply: write rec->size, write rec->data,
00714          * write control->size, write control->data.
00715          */
00716         nw = writesocket(fd, (const char *)&rec->size, 4);
00717         if (nw != 4)
00718                 return (DB_REP_UNAVAIL);
00719 
00720         if (rec->size > 0) {
00721                 nw = writesocket(fd, rec->data, rec->size);
00722                 if (nw < 0)
00723                         return (DB_REP_UNAVAIL);
00724                 if (nw != (ssize_t)rec->size) {
00725                         /* Try a couple of times to finish the write. */
00726                         wp = (u_int8_t *)rec->data + nw;
00727                         bytes_left = rec->size - nw;
00728                         for (retry = 0; bytes_left > 0 && retry < 3; retry++) {
00729                                 nw = writesocket(fd, wp, bytes_left);
00730                                 if (nw < 0)
00731                                         return (DB_REP_UNAVAIL);
00732                                 bytes_left -= nw;
00733                                 wp += nw;
00734                         }
00735                         if (bytes_left > 0)
00736                                 return (DB_REP_UNAVAIL);
00737                 }
00738         }
00739 
00740         nw = writesocket(fd, (const char *)&control->size, 4);
00741         if (nw != 4)
00742                 return (DB_REP_UNAVAIL);
00743         if (control->size > 0) {
00744                 nw = writesocket(fd, control->data, control->size);
00745                 if (nw != (ssize_t)control->size)
00746                         return (DB_REP_UNAVAIL);
00747         }
00748         return (0);
00749 }

Generated on Sun Dec 25 12:14:25 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2