00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <sys/types.h>
00011 #include <assert.h>
00012 #include <errno.h>
00013 #include <stdio.h>
00014 #include <stdlib.h>
00015 #include <string.h>
00016
00017 #include <db.h>
00018 #include "ex_repquote.h"
00019 #ifndef _SYS_QUEUE_H
00020
00021
00022
00023
00024
00025 #include <dbinc/queue.h>
00026 #endif
00027
00028 int machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *));
00029 #ifdef DIAGNOSTIC
00030 void machtab_print __P((machtab_t *));
00031 #endif
00032 ssize_t readn __P((socket_t, void *, size_t));
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 #define MACHID_INVALID 0
00065 #define MACHID_SELF 1
00066
00067 struct __machtab {
00068 LIST_HEAD(__machlist, __member) machlist;
00069 int nextid;
00070 mutex_t mtmutex;
00071 u_int32_t timeout_time;
00072 int current;
00073 int max;
00074 int nsites;
00075 int priority;
00076 };
00077
00078
00079 struct __member {
00080 u_int32_t hostaddr;
00081 int port;
00082 int eid;
00083 socket_t fd;
00084 LIST_ENTRY(__member) links;
00085
00086 };
00087
00088 static int quote_send_broadcast __P((machtab_t *,
00089 const DBT *, const DBT *, u_int32_t));
00090 static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t));
00091
00092
00093
00094
00095
00096
00097
00098
00099 int
00100 machtab_init(machtabp, pri, nsites)
00101 machtab_t **machtabp;
00102 int pri, nsites;
00103 {
00104 int ret;
00105 machtab_t *machtab;
00106
00107 if ((machtab = malloc(sizeof(machtab_t))) == NULL) {
00108 fprintf(stderr, "can't allocate memory\n");
00109 return (ENOMEM);
00110 }
00111
00112 LIST_INIT(&machtab->machlist);
00113
00114
00115 machtab->nextid = 2;
00116 machtab->timeout_time = 2 * 1000000;
00117 machtab->current = machtab->max = 0;
00118 machtab->priority = pri;
00119 machtab->nsites = nsites;
00120
00121 ret = mutex_init(&machtab->mtmutex, NULL);
00122 *machtabp = machtab;
00123
00124 return (ret);
00125 }
00126
00127
00128
00129
00130
00131
00132 int
00133 machtab_add(machtab, fd, hostaddr, port, idp)
00134 machtab_t *machtab;
00135 socket_t fd;
00136 u_int32_t hostaddr;
00137 int port, *idp;
00138 {
00139 int ret;
00140 member_t *m, *member;
00141
00142 ret = 0;
00143 if ((member = malloc(sizeof(member_t))) == NULL) {
00144 fprintf(stderr, "can't allocate memory\n");
00145 return (ENOMEM);
00146 }
00147
00148 member->fd = fd;
00149 member->hostaddr = hostaddr;
00150 member->port = port;
00151
00152 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00153 fprintf(stderr, "can't lock mutex");
00154 return (ret);
00155 }
00156
00157 for (m = LIST_FIRST(&machtab->machlist);
00158 m != NULL; m = LIST_NEXT(m, links))
00159 if (m->hostaddr == hostaddr && m->port == port)
00160 break;
00161
00162 if (m == NULL) {
00163 member->eid = machtab->nextid++;
00164 LIST_INSERT_HEAD(&machtab->machlist, member, links);
00165 } else
00166 member->eid = m->eid;
00167
00168 if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00169 fprintf(stderr, "can't unlock mutex\n");
00170 return (ret);
00171 }
00172
00173 if (idp != NULL)
00174 *idp = member->eid;
00175
00176 if (m == NULL) {
00177 if (++machtab->current > machtab->max)
00178 machtab->max = machtab->current;
00179 } else {
00180 free(member);
00181 ret = EEXIST;
00182 }
00183 #ifdef DIAGNOSTIC
00184 printf("Exiting machtab_add\n");
00185 machtab_print(machtab);
00186 #endif
00187 return (ret);
00188 }
00189
00190
00191
00192
00193
00194 int
00195 machtab_getinfo(machtab, eid, hostp, portp)
00196 machtab_t *machtab;
00197 int eid;
00198 u_int32_t *hostp;
00199 int *portp;
00200 {
00201 int ret;
00202 member_t *member;
00203
00204 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00205 fprintf(stderr, "can't lock mutex\n");
00206 return (ret);
00207 }
00208
00209 for (member = LIST_FIRST(&machtab->machlist);
00210 member != NULL;
00211 member = LIST_NEXT(member, links))
00212 if (member->eid == eid) {
00213 *hostp = member->hostaddr;
00214 *portp = member->port;
00215 break;
00216 }
00217
00218 if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00219 fprintf(stderr, "can't unlock mutex\n");
00220 return (ret);
00221 }
00222
00223 return (member != NULL ? 0 : EINVAL);
00224 }
00225
00226
00227
00228
00229
00230
00231
00232 int
00233 machtab_rem(machtab, eid, lock)
00234 machtab_t *machtab;
00235 int eid;
00236 int lock;
00237 {
00238 int found, ret;
00239 member_t *member;
00240
00241 ret = 0;
00242 if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) {
00243 fprintf(stderr, "can't lock mutex\n");
00244 return (ret);
00245 }
00246
00247 for (found = 0, member = LIST_FIRST(&machtab->machlist);
00248 member != NULL;
00249 member = LIST_NEXT(member, links))
00250 if (member->eid == eid) {
00251 found = 1;
00252 LIST_REMOVE(member, links);
00253 (void)closesocket(member->fd);
00254 free(member);
00255 machtab->current--;
00256 break;
00257 }
00258
00259 if (LIST_FIRST(&machtab->machlist) == NULL)
00260 machtab->nextid = 2;
00261
00262 if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0)
00263 fprintf(stderr, "can't unlock mutex\n");
00264
00265 #ifdef DIAGNOSTIC
00266 printf("Exiting machtab_rem\n");
00267 machtab_print(machtab);
00268 #endif
00269 return (ret);
00270 }
00271
00272 void
00273 machtab_parm(machtab, nump, prip, timeoutp)
00274 machtab_t *machtab;
00275 int *nump, *prip;
00276 u_int32_t *timeoutp;
00277 {
00278 if (machtab->nsites == 0)
00279 *nump = machtab->max;
00280 else
00281 *nump = machtab->nsites;
00282 *prip = machtab->priority;
00283 *timeoutp = machtab->timeout_time;
00284 }
00285
00286 #ifdef DIAGNOSTIC
00287 void
00288 machtab_print(machtab)
00289 machtab_t *machtab;
00290 {
00291 member_t *m;
00292
00293 if (mutex_lock(&machtab->mtmutex) != 0) {
00294 fprintf(stderr, "can't lock mutex\n");
00295 abort();
00296 }
00297
00298 for (m = LIST_FIRST(&machtab->machlist);
00299 m != NULL; m = LIST_NEXT(m, links)) {
00300
00301 printf("IP: %lx Port: %6d EID: %2d FD: %3d\n",
00302 (long)m->hostaddr, m->port, m->eid, m->fd);
00303 }
00304
00305 if (mutex_unlock(&machtab->mtmutex) != 0) {
00306 fprintf(stderr, "can't unlock mutex\n");
00307 abort();
00308 }
00309 }
00310 #endif
00311
00312
00313
00314
00315
00316
00317 socket_t
00318 listen_socket_init(progname, port)
00319 const char *progname;
00320 int port;
00321 {
00322 socket_t s;
00323 int sockopt;
00324 struct sockaddr_in si;
00325
00326 COMPQUIET(progname, NULL);
00327
00328 if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00329 perror("can't create listen socket");
00330 return (-1);
00331 }
00332
00333 memset(&si, 0, sizeof(si));
00334 si.sin_family = AF_INET;
00335 si.sin_addr.s_addr = htonl(INADDR_ANY);
00336 si.sin_port = htons((unsigned short)port);
00337
00338
00339
00340
00341
00342
00343 sockopt = 1;
00344 setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
00345 (const char *)&sockopt, sizeof (sockopt));
00346
00347 if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) {
00348 perror("can't bind listen socket");
00349 goto err;
00350 }
00351
00352 if (listen(s, 5) != 0) {
00353 perror("can't establish listen queue");
00354 goto err;
00355 }
00356
00357 return (s);
00358
00359 err: closesocket(s);
00360 return (-1);
00361 }
00362
00363
00364
00365
00366
00367
00368 socket_t
00369 listen_socket_accept(machtab, progname, s, eidp)
00370 machtab_t *machtab;
00371 const char *progname;
00372 socket_t s;
00373 int *eidp;
00374 {
00375 struct sockaddr_in si;
00376 int si_len;
00377 int host, ret;
00378 socket_t ns;
00379 u_int16_t port;
00380
00381 COMPQUIET(progname, NULL);
00382
00383 wait: memset(&si, 0, sizeof(si));
00384 si_len = sizeof(si);
00385 ns = accept(s, (struct sockaddr *)&si, &si_len);
00386 if (ns == SOCKET_CREATION_FAILURE) {
00387 fprintf(stderr, "can't accept incoming connection\n");
00388 return ns;
00389 }
00390 host = ntohl(si.sin_addr.s_addr);
00391
00392
00393
00394
00395
00396
00397 if (readn(ns, &port, 2) != 2)
00398 goto err;
00399 port = ntohs(port);
00400
00401 ret = machtab_add(machtab, ns, host, port, eidp);
00402 if (ret == EEXIST) {
00403 closesocket(ns);
00404 goto wait;
00405 } else if (ret != 0)
00406 goto err;
00407 printf("Connected to host %x port %d, eid = %d\n", host, port, *eidp);
00408 return (ns);
00409
00410 err: closesocket(ns);
00411 return SOCKET_CREATION_FAILURE;
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 socket_t
00423 get_connected_socket(machtab, progname, remotehost, port, is_open, eidp)
00424 machtab_t *machtab;
00425 const char *progname, *remotehost;
00426 int port, *is_open, *eidp;
00427 {
00428 int ret;
00429 socket_t s;
00430 struct hostent *hp;
00431 struct sockaddr_in si;
00432 u_int32_t addr;
00433 u_int16_t nport;
00434
00435 *is_open = 0;
00436
00437 if ((hp = gethostbyname(remotehost)) == NULL) {
00438 fprintf(stderr, "%s: host not found: %s\n", progname,
00439 strerror(net_errno));
00440 return (-1);
00441 }
00442
00443 if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
00444 perror("can't create outgoing socket");
00445 return (-1);
00446 }
00447 memset(&si, 0, sizeof(si));
00448 memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length);
00449 addr = ntohl(si.sin_addr.s_addr);
00450 ret = machtab_add(machtab, s, addr, port, eidp);
00451 if (ret == EEXIST) {
00452 *is_open = 1;
00453 closesocket(s);
00454 return (0);
00455 } else if (ret != 0) {
00456 closesocket(s);
00457 return (-1);
00458 }
00459
00460 si.sin_family = AF_INET;
00461 si.sin_port = htons((unsigned short)port);
00462 if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) {
00463 fprintf(stderr, "%s: connection failed: %s",
00464 progname, strerror(net_errno));
00465 (void)machtab_rem(machtab, *eidp, 1);
00466 return (-1);
00467 }
00468
00469
00470
00471
00472
00473
00474 nport = htons(myport);
00475 writesocket(s, &nport, 2);
00476
00477 return (s);
00478 }
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488 int
00489 get_next_message(fd, rec, control)
00490 socket_t fd;
00491 DBT *rec, *control;
00492 {
00493 size_t nr;
00494 u_int32_t rsize, csize;
00495 u_int8_t *recbuf, *controlbuf;
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507 nr = readn(fd, &rsize, 4);
00508 if (nr != 4)
00509 return (1);
00510
00511
00512 if (rsize > 0) {
00513 if (rec->size < rsize)
00514 rec->data = realloc(rec->data, rsize);
00515 recbuf = rec->data;
00516 nr = readn(fd, recbuf, rsize);
00517 } else {
00518 if (rec->data != NULL)
00519 free(rec->data);
00520 rec->data = NULL;
00521 }
00522 rec->size = rsize;
00523
00524
00525 nr = readn(fd, &csize, 4);
00526 if (nr != 4)
00527 return (1);
00528
00529
00530 if (csize > 0) {
00531 controlbuf = control->data;
00532 if (control->size < csize)
00533 controlbuf = realloc(controlbuf, csize);
00534 nr = readn(fd, controlbuf, csize);
00535 if (nr != csize)
00536 return (1);
00537 } else {
00538 if (control->data != NULL)
00539 free(control->data);
00540 controlbuf = NULL;
00541 }
00542 control->data = controlbuf;
00543 control->size = csize;
00544
00545 return (0);
00546 }
00547
00548
00549
00550
00551
00552
00553 ssize_t
00554 readn(fd, vptr, n)
00555 socket_t fd;
00556 void *vptr;
00557 size_t n;
00558 {
00559 size_t nleft;
00560 ssize_t nread;
00561 char *ptr;
00562
00563 ptr = vptr;
00564 nleft = n;
00565 while (nleft > 0) {
00566 if ((nread = readsocket(fd, ptr, nleft)) < 0) {
00567
00568
00569
00570
00571 if (net_errno == EINTR)
00572 nread = 0;
00573 else {
00574 perror("can't read from socket");
00575 return (-1);
00576 }
00577 } else if (nread == 0)
00578 break;
00579
00580 nleft -= nread;
00581 ptr += nread;
00582 }
00583
00584 return (n - nleft);
00585 }
00586
00587
00588
00589
00590
00591 int
00592 quote_send(dbenv, control, rec, lsnp, eid, flags)
00593 DB_ENV *dbenv;
00594 const DBT *control, *rec;
00595 const DB_LSN *lsnp;
00596 int eid;
00597 u_int32_t flags;
00598 {
00599 int n, ret, t_ret;
00600 socket_t fd;
00601 machtab_t *machtab;
00602 member_t *m;
00603
00604 COMPQUIET(lsnp, NULL);
00605 machtab = (machtab_t *)dbenv->app_private;
00606
00607 if (eid == DB_EID_BROADCAST) {
00608
00609
00610
00611
00612
00613 n = quote_send_broadcast(machtab, rec, control, flags);
00614 if (n < 0 )
00615 return (DB_REP_UNAVAIL);
00616 return (0);
00617 }
00618
00619 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00620 dbenv->errx(dbenv, "can't lock mutex");
00621 return (ret);
00622 }
00623
00624 fd = 0;
00625 for (m = LIST_FIRST(&machtab->machlist); m != NULL;
00626 m = LIST_NEXT(m, links)) {
00627 if (m->eid == eid) {
00628 fd = m->fd;
00629 break;
00630 }
00631 }
00632
00633 if (fd == 0) {
00634 dbenv->err(dbenv, DB_REP_UNAVAIL,
00635 "quote_send: cannot find machine ID %d", eid);
00636 return (DB_REP_UNAVAIL);
00637 }
00638
00639 if ((ret = quote_send_one(rec, control, fd, flags)) != 0)
00640 fprintf(stderr, "socket write error in send() function\n");
00641
00642 if ((t_ret = mutex_unlock(&machtab->mtmutex)) != 0) {
00643 dbenv->errx(dbenv, "can't unlock mutex");
00644 if (ret == 0)
00645 ret = t_ret;
00646 }
00647
00648 return (ret);
00649 }
00650
00651
00652
00653
00654
00655
00656
00657 static int
00658 quote_send_broadcast(machtab, rec, control, flags)
00659 machtab_t *machtab;
00660 const DBT *rec, *control;
00661 u_int32_t flags;
00662 {
00663 int ret, sent;
00664 member_t *m, *next;
00665
00666 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
00667 fprintf(stderr, "can't lock mutex\n");
00668 return (ret);
00669 }
00670
00671 sent = 0;
00672 for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) {
00673 next = LIST_NEXT(m, links);
00674 if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) {
00675 fprintf(stderr, "socket write error in broadcast\n");
00676 (void)machtab_rem(machtab, m->eid, 0);
00677 } else
00678 sent++;
00679 }
00680
00681 if (mutex_unlock(&machtab->mtmutex) != 0) {
00682 fprintf(stderr, "can't unlock mutex\n");
00683 return (-1);
00684 }
00685
00686 return (sent);
00687 }
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699 static int
00700 quote_send_one(rec, control, fd, flags)
00701 const DBT *rec, *control;
00702 socket_t fd;
00703 u_int32_t flags;
00704
00705 {
00706 int retry;
00707 ssize_t bytes_left, nw;
00708 u_int8_t *wp;
00709
00710 COMPQUIET(flags, 0);
00711
00712
00713
00714
00715
00716 nw = writesocket(fd, (const char *)&rec->size, 4);
00717 if (nw != 4)
00718 return (DB_REP_UNAVAIL);
00719
00720 if (rec->size > 0) {
00721 nw = writesocket(fd, rec->data, rec->size);
00722 if (nw < 0)
00723 return (DB_REP_UNAVAIL);
00724 if (nw != (ssize_t)rec->size) {
00725
00726 wp = (u_int8_t *)rec->data + nw;
00727 bytes_left = rec->size - nw;
00728 for (retry = 0; bytes_left > 0 && retry < 3; retry++) {
00729 nw = writesocket(fd, wp, bytes_left);
00730 if (nw < 0)
00731 return (DB_REP_UNAVAIL);
00732 bytes_left -= nw;
00733 wp += nw;
00734 }
00735 if (bytes_left > 0)
00736 return (DB_REP_UNAVAIL);
00737 }
00738 }
00739
00740 nw = writesocket(fd, (const char *)&control->size, 4);
00741 if (nw != 4)
00742 return (DB_REP_UNAVAIL);
00743 if (control->size > 0) {
00744 nw = writesocket(fd, control->data, control->size);
00745 if (nw != (ssize_t)control->size)
00746 return (DB_REP_UNAVAIL);
00747 }
00748 return (0);
00749 }