00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 #include "postgres.h"
00070
00071 #include <signal.h>
00072 #include <fcntl.h>
00073 #include <grp.h>
00074 #include <unistd.h>
00075 #include <sys/file.h>
00076 #include <sys/socket.h>
00077 #include <sys/stat.h>
00078 #include <sys/time.h>
00079 #include <netdb.h>
00080 #include <netinet/in.h>
00081 #ifdef HAVE_NETINET_TCP_H
00082 #include <netinet/tcp.h>
00083 #endif
00084 #include <arpa/inet.h>
00085 #ifdef HAVE_UTIME_H
00086 #include <utime.h>
00087 #endif
00088 #ifdef WIN32_ONLY_COMPILER
00089 #include <mstcpip.h>
00090 #endif
00091
00092 #include "libpq/ip.h"
00093 #include "libpq/libpq.h"
00094 #include "miscadmin.h"
00095 #include "storage/ipc.h"
00096 #include "utils/guc.h"
00097 #include "utils/memutils.h"
00098
00099
00100
00101
00102 int Unix_socket_permissions;
00103 char *Unix_socket_group;
00104
00105
00106
00107 static List *sock_paths = NIL;
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117 #define PQ_SEND_BUFFER_SIZE 8192
00118 #define PQ_RECV_BUFFER_SIZE 8192
00119
00120 static char *PqSendBuffer;
00121 static int PqSendBufferSize;
00122 static int PqSendPointer;
00123 static int PqSendStart;
00124
00125 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
00126 static int PqRecvPointer;
00127 static int PqRecvLength;
00128
00129
00130
00131
00132 static bool PqCommBusy;
00133 static bool DoingCopyOut;
00134
00135
00136
00137 static void pq_close(int code, Datum arg);
00138 static int internal_putbytes(const char *s, size_t len);
00139 static int internal_flush(void);
00140 static void pq_set_nonblocking(bool nonblocking);
00141
00142 #ifdef HAVE_UNIX_SOCKETS
00143 static int Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
00144 static int Setup_AF_UNIX(char *sock_path);
00145 #endif
00146
00147
00148
00149
00150
00151
00152 void
00153 pq_init(void)
00154 {
00155 PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
00156 PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
00157 PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
00158 PqCommBusy = false;
00159 DoingCopyOut = false;
00160 on_proc_exit(pq_close, 0);
00161 }
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 void
00172 pq_comm_reset(void)
00173 {
00174
00175 PqCommBusy = false;
00176
00177 pq_endcopyout(true);
00178 }
00179
00180
00181
00182
00183
00184
00185
00186
00187 static void
00188 pq_close(int code, Datum arg)
00189 {
00190 if (MyProcPort != NULL)
00191 {
00192 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
00193 #ifdef ENABLE_GSS
00194 OM_uint32 min_s;
00195
00196
00197 if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
00198 gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
00199
00200 if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
00201 gss_release_cred(&min_s, &MyProcPort->gss->cred);
00202 #endif
00203
00204
00205 free(MyProcPort->gss);
00206 #endif
00207
00208
00209 secure_close(MyProcPort);
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 MyProcPort->sock = PGINVALID_SOCKET;
00222 }
00223 }
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239 #ifdef HAVE_UNIX_SOCKETS
00240 static void
00241 StreamDoUnlink(int code, Datum arg)
00242 {
00243 ListCell *l;
00244
00245
00246 foreach(l, sock_paths)
00247 {
00248 char *sock_path = (char *) lfirst(l);
00249
00250 unlink(sock_path);
00251 }
00252
00253 sock_paths = NIL;
00254 }
00255 #endif
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271 int
00272 StreamServerPort(int family, char *hostName, unsigned short portNumber,
00273 char *unixSocketDir,
00274 pgsocket ListenSocket[], int MaxListen)
00275 {
00276 pgsocket fd;
00277 int err;
00278 int maxconn;
00279 int ret;
00280 char portNumberStr[32];
00281 const char *familyDesc;
00282 char familyDescBuf[64];
00283 char *service;
00284 struct addrinfo *addrs = NULL,
00285 *addr;
00286 struct addrinfo hint;
00287 int listen_index = 0;
00288 int added = 0;
00289
00290 #ifdef HAVE_UNIX_SOCKETS
00291 char unixSocketPath[MAXPGPATH];
00292 #endif
00293 #if !defined(WIN32) || defined(IPV6_V6ONLY)
00294 int one = 1;
00295 #endif
00296
00297
00298 MemSet(&hint, 0, sizeof(hint));
00299 hint.ai_family = family;
00300 hint.ai_flags = AI_PASSIVE;
00301 hint.ai_socktype = SOCK_STREAM;
00302
00303 #ifdef HAVE_UNIX_SOCKETS
00304 if (family == AF_UNIX)
00305 {
00306
00307
00308
00309
00310 UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
00311 if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
00312 {
00313 ereport(LOG,
00314 (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
00315 unixSocketPath,
00316 (int) (UNIXSOCK_PATH_BUFLEN - 1))));
00317 return STATUS_ERROR;
00318 }
00319 if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
00320 return STATUS_ERROR;
00321 service = unixSocketPath;
00322 }
00323 else
00324 #endif
00325 {
00326 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
00327 service = portNumberStr;
00328 }
00329
00330 ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
00331 if (ret || !addrs)
00332 {
00333 if (hostName)
00334 ereport(LOG,
00335 (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
00336 hostName, service, gai_strerror(ret))));
00337 else
00338 ereport(LOG,
00339 (errmsg("could not translate service \"%s\" to address: %s",
00340 service, gai_strerror(ret))));
00341 if (addrs)
00342 pg_freeaddrinfo_all(hint.ai_family, addrs);
00343 return STATUS_ERROR;
00344 }
00345
00346 for (addr = addrs; addr; addr = addr->ai_next)
00347 {
00348 if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
00349 {
00350
00351
00352
00353
00354 continue;
00355 }
00356
00357
00358 for (; listen_index < MaxListen; listen_index++)
00359 {
00360 if (ListenSocket[listen_index] == PGINVALID_SOCKET)
00361 break;
00362 }
00363 if (listen_index >= MaxListen)
00364 {
00365 ereport(LOG,
00366 (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
00367 MaxListen)));
00368 break;
00369 }
00370
00371
00372 switch (addr->ai_family)
00373 {
00374 case AF_INET:
00375 familyDesc = _("IPv4");
00376 break;
00377 #ifdef HAVE_IPV6
00378 case AF_INET6:
00379 familyDesc = _("IPv6");
00380 break;
00381 #endif
00382 #ifdef HAVE_UNIX_SOCKETS
00383 case AF_UNIX:
00384 familyDesc = _("Unix");
00385 break;
00386 #endif
00387 default:
00388 snprintf(familyDescBuf, sizeof(familyDescBuf),
00389 _("unrecognized address family %d"),
00390 addr->ai_family);
00391 familyDesc = familyDescBuf;
00392 break;
00393 }
00394
00395 if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
00396 {
00397 ereport(LOG,
00398 (errcode_for_socket_access(),
00399
00400 errmsg("could not create %s socket: %m",
00401 familyDesc)));
00402 continue;
00403 }
00404
00405 #ifndef WIN32
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418 if (!IS_AF_UNIX(addr->ai_family))
00419 {
00420 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
00421 (char *) &one, sizeof(one))) == -1)
00422 {
00423 ereport(LOG,
00424 (errcode_for_socket_access(),
00425 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
00426 closesocket(fd);
00427 continue;
00428 }
00429 }
00430 #endif
00431
00432 #ifdef IPV6_V6ONLY
00433 if (addr->ai_family == AF_INET6)
00434 {
00435 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
00436 (char *) &one, sizeof(one)) == -1)
00437 {
00438 ereport(LOG,
00439 (errcode_for_socket_access(),
00440 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
00441 closesocket(fd);
00442 continue;
00443 }
00444 }
00445 #endif
00446
00447
00448
00449
00450
00451
00452
00453 err = bind(fd, addr->ai_addr, addr->ai_addrlen);
00454 if (err < 0)
00455 {
00456 ereport(LOG,
00457 (errcode_for_socket_access(),
00458
00459 errmsg("could not bind %s socket: %m",
00460 familyDesc),
00461 (IS_AF_UNIX(addr->ai_family)) ?
00462 errhint("Is another postmaster already running on port %d?"
00463 " If not, remove socket file \"%s\" and retry.",
00464 (int) portNumber, service) :
00465 errhint("Is another postmaster already running on port %d?"
00466 " If not, wait a few seconds and retry.",
00467 (int) portNumber)));
00468 closesocket(fd);
00469 continue;
00470 }
00471
00472 #ifdef HAVE_UNIX_SOCKETS
00473 if (addr->ai_family == AF_UNIX)
00474 {
00475 if (Setup_AF_UNIX(service) != STATUS_OK)
00476 {
00477 closesocket(fd);
00478 break;
00479 }
00480 }
00481 #endif
00482
00483
00484
00485
00486
00487
00488 maxconn = MaxBackends * 2;
00489 if (maxconn > PG_SOMAXCONN)
00490 maxconn = PG_SOMAXCONN;
00491
00492 err = listen(fd, maxconn);
00493 if (err < 0)
00494 {
00495 ereport(LOG,
00496 (errcode_for_socket_access(),
00497
00498 errmsg("could not listen on %s socket: %m",
00499 familyDesc)));
00500 closesocket(fd);
00501 continue;
00502 }
00503 ListenSocket[listen_index] = fd;
00504 added++;
00505 }
00506
00507 pg_freeaddrinfo_all(hint.ai_family, addrs);
00508
00509 if (!added)
00510 return STATUS_ERROR;
00511
00512 return STATUS_OK;
00513 }
00514
00515
00516 #ifdef HAVE_UNIX_SOCKETS
00517
00518
00519
00520
00521 static int
00522 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
00523 {
00524
00525
00526
00527
00528
00529
00530
00531
00532 CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
00533
00534
00535
00536
00537
00538 unlink(unixSocketPath);
00539
00540
00541
00542
00543
00544
00545 if (sock_paths == NIL)
00546 on_proc_exit(StreamDoUnlink, 0);
00547
00548 sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
00549
00550 return STATUS_OK;
00551 }
00552
00553
00554
00555
00556
00557 static int
00558 Setup_AF_UNIX(char *sock_path)
00559 {
00560
00561
00562
00563
00564
00565 Assert(Unix_socket_group);
00566 if (Unix_socket_group[0] != '\0')
00567 {
00568 #ifdef WIN32
00569 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
00570 #else
00571 char *endptr;
00572 unsigned long val;
00573 gid_t gid;
00574
00575 val = strtoul(Unix_socket_group, &endptr, 10);
00576 if (*endptr == '\0')
00577 {
00578 gid = val;
00579 }
00580 else
00581 {
00582 struct group *gr;
00583
00584 gr = getgrnam(Unix_socket_group);
00585 if (!gr)
00586 {
00587 ereport(LOG,
00588 (errmsg("group \"%s\" does not exist",
00589 Unix_socket_group)));
00590 return STATUS_ERROR;
00591 }
00592 gid = gr->gr_gid;
00593 }
00594 if (chown(sock_path, -1, gid) == -1)
00595 {
00596 ereport(LOG,
00597 (errcode_for_file_access(),
00598 errmsg("could not set group of file \"%s\": %m",
00599 sock_path)));
00600 return STATUS_ERROR;
00601 }
00602 #endif
00603 }
00604
00605 if (chmod(sock_path, Unix_socket_permissions) == -1)
00606 {
00607 ereport(LOG,
00608 (errcode_for_file_access(),
00609 errmsg("could not set permissions of file \"%s\": %m",
00610 sock_path)));
00611 return STATUS_ERROR;
00612 }
00613 return STATUS_OK;
00614 }
00615 #endif
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 int
00629 StreamConnection(pgsocket server_fd, Port *port)
00630 {
00631
00632 port->raddr.salen = sizeof(port->raddr.addr);
00633 if ((port->sock = accept(server_fd,
00634 (struct sockaddr *) & port->raddr.addr,
00635 &port->raddr.salen)) < 0)
00636 {
00637 ereport(LOG,
00638 (errcode_for_socket_access(),
00639 errmsg("could not accept new connection: %m")));
00640
00641
00642
00643
00644
00645
00646
00647
00648 pg_usleep(100000L);
00649 return STATUS_ERROR;
00650 }
00651
00652 #ifdef SCO_ACCEPT_BUG
00653
00654
00655
00656
00657
00658 if (port->raddr.addr.ss_family == 0)
00659 port->raddr.addr.ss_family = AF_UNIX;
00660 #endif
00661
00662
00663 port->laddr.salen = sizeof(port->laddr.addr);
00664 if (getsockname(port->sock,
00665 (struct sockaddr *) & port->laddr.addr,
00666 &port->laddr.salen) < 0)
00667 {
00668 elog(LOG, "getsockname() failed: %m");
00669 return STATUS_ERROR;
00670 }
00671
00672
00673 if (!IS_AF_UNIX(port->laddr.addr.ss_family))
00674 {
00675 int on;
00676
00677 #ifdef TCP_NODELAY
00678 on = 1;
00679 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
00680 (char *) &on, sizeof(on)) < 0)
00681 {
00682 elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
00683 return STATUS_ERROR;
00684 }
00685 #endif
00686 on = 1;
00687 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
00688 (char *) &on, sizeof(on)) < 0)
00689 {
00690 elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
00691 return STATUS_ERROR;
00692 }
00693
00694 #ifdef WIN32
00695
00696
00697
00698
00699
00700 on = PQ_SEND_BUFFER_SIZE * 4;
00701 if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &on,
00702 sizeof(on)) < 0)
00703 {
00704 elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
00705 return STATUS_ERROR;
00706 }
00707 #endif
00708
00709
00710
00711
00712
00713
00714
00715
00716 (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
00717 (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
00718 (void) pq_setkeepalivescount(tcp_keepalives_count, port);
00719 }
00720
00721 return STATUS_OK;
00722 }
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734 void
00735 StreamClose(pgsocket sock)
00736 {
00737 closesocket(sock);
00738 }
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749 void
00750 TouchSocketFiles(void)
00751 {
00752 ListCell *l;
00753
00754
00755 foreach(l, sock_paths)
00756 {
00757 char *sock_path = (char *) lfirst(l);
00758
00759
00760
00761
00762
00763
00764
00765
00766 #ifdef HAVE_UTIME
00767 utime(sock_path, NULL);
00768 #else
00769 #ifdef HAVE_UTIMES
00770 utimes(sock_path, NULL);
00771 #endif
00772 #endif
00773 }
00774 }
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792 static void
00793 pq_set_nonblocking(bool nonblocking)
00794 {
00795 if (MyProcPort->noblock == nonblocking)
00796 return;
00797
00798 #ifdef WIN32
00799 pgwin32_noblock = nonblocking ? 1 : 0;
00800 #else
00801
00802
00803
00804
00805
00806
00807 if (nonblocking)
00808 {
00809 if (!pg_set_noblock(MyProcPort->sock))
00810 ereport(COMMERROR,
00811 (errmsg("could not set socket to nonblocking mode: %m")));
00812 }
00813 else
00814 {
00815 if (!pg_set_block(MyProcPort->sock))
00816 ereport(COMMERROR,
00817 (errmsg("could not set socket to blocking mode: %m")));
00818 }
00819 #endif
00820 MyProcPort->noblock = nonblocking;
00821 }
00822
00823
00824
00825
00826
00827
00828
00829 static int
00830 pq_recvbuf(void)
00831 {
00832 if (PqRecvPointer > 0)
00833 {
00834 if (PqRecvLength > PqRecvPointer)
00835 {
00836
00837 memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
00838 PqRecvLength - PqRecvPointer);
00839 PqRecvLength -= PqRecvPointer;
00840 PqRecvPointer = 0;
00841 }
00842 else
00843 PqRecvLength = PqRecvPointer = 0;
00844 }
00845
00846
00847 pq_set_nonblocking(false);
00848
00849
00850 for (;;)
00851 {
00852 int r;
00853
00854 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
00855 PQ_RECV_BUFFER_SIZE - PqRecvLength);
00856
00857 if (r < 0)
00858 {
00859 if (errno == EINTR)
00860 continue;
00861
00862
00863
00864
00865
00866
00867 ereport(COMMERROR,
00868 (errcode_for_socket_access(),
00869 errmsg("could not receive data from client: %m")));
00870 return EOF;
00871 }
00872 if (r == 0)
00873 {
00874
00875
00876
00877
00878 return EOF;
00879 }
00880
00881 PqRecvLength += r;
00882 return 0;
00883 }
00884 }
00885
00886
00887
00888
00889
00890 int
00891 pq_getbyte(void)
00892 {
00893 while (PqRecvPointer >= PqRecvLength)
00894 {
00895 if (pq_recvbuf())
00896 return EOF;
00897 }
00898 return (unsigned char) PqRecvBuffer[PqRecvPointer++];
00899 }
00900
00901
00902
00903
00904
00905
00906
00907 int
00908 pq_peekbyte(void)
00909 {
00910 while (PqRecvPointer >= PqRecvLength)
00911 {
00912 if (pq_recvbuf())
00913 return EOF;
00914 }
00915 return (unsigned char) PqRecvBuffer[PqRecvPointer];
00916 }
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926 int
00927 pq_getbyte_if_available(unsigned char *c)
00928 {
00929 int r;
00930
00931 if (PqRecvPointer < PqRecvLength)
00932 {
00933 *c = PqRecvBuffer[PqRecvPointer++];
00934 return 1;
00935 }
00936
00937
00938 pq_set_nonblocking(true);
00939
00940 r = secure_read(MyProcPort, c, 1);
00941 if (r < 0)
00942 {
00943
00944
00945
00946
00947
00948 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
00949 r = 0;
00950 else
00951 {
00952
00953
00954
00955
00956
00957 ereport(COMMERROR,
00958 (errcode_for_socket_access(),
00959 errmsg("could not receive data from client: %m")));
00960 r = EOF;
00961 }
00962 }
00963 else if (r == 0)
00964 {
00965
00966 r = EOF;
00967 }
00968
00969 return r;
00970 }
00971
00972
00973
00974
00975
00976
00977
00978 int
00979 pq_getbytes(char *s, size_t len)
00980 {
00981 size_t amount;
00982
00983 while (len > 0)
00984 {
00985 while (PqRecvPointer >= PqRecvLength)
00986 {
00987 if (pq_recvbuf())
00988 return EOF;
00989 }
00990 amount = PqRecvLength - PqRecvPointer;
00991 if (amount > len)
00992 amount = len;
00993 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
00994 PqRecvPointer += amount;
00995 s += amount;
00996 len -= amount;
00997 }
00998 return 0;
00999 }
01000
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010 static int
01011 pq_discardbytes(size_t len)
01012 {
01013 size_t amount;
01014
01015 while (len > 0)
01016 {
01017 while (PqRecvPointer >= PqRecvLength)
01018 {
01019 if (pq_recvbuf())
01020 return EOF;
01021 }
01022 amount = PqRecvLength - PqRecvPointer;
01023 if (amount > len)
01024 amount = len;
01025 PqRecvPointer += amount;
01026 len -= amount;
01027 }
01028 return 0;
01029 }
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046 int
01047 pq_getstring(StringInfo s)
01048 {
01049 int i;
01050
01051 resetStringInfo(s);
01052
01053
01054 for (;;)
01055 {
01056 while (PqRecvPointer >= PqRecvLength)
01057 {
01058 if (pq_recvbuf())
01059 return EOF;
01060 }
01061
01062 for (i = PqRecvPointer; i < PqRecvLength; i++)
01063 {
01064 if (PqRecvBuffer[i] == '\0')
01065 {
01066
01067 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
01068 i - PqRecvPointer + 1);
01069 PqRecvPointer = i + 1;
01070 return 0;
01071 }
01072 }
01073
01074
01075 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
01076 PqRecvLength - PqRecvPointer);
01077 PqRecvPointer = PqRecvLength;
01078 }
01079 }
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098 int
01099 pq_getmessage(StringInfo s, int maxlen)
01100 {
01101 int32 len;
01102
01103 resetStringInfo(s);
01104
01105
01106 if (pq_getbytes((char *) &len, 4) == EOF)
01107 {
01108 ereport(COMMERROR,
01109 (errcode(ERRCODE_PROTOCOL_VIOLATION),
01110 errmsg("unexpected EOF within message length word")));
01111 return EOF;
01112 }
01113
01114 len = ntohl(len);
01115
01116 if (len < 4 ||
01117 (maxlen > 0 && len > maxlen))
01118 {
01119 ereport(COMMERROR,
01120 (errcode(ERRCODE_PROTOCOL_VIOLATION),
01121 errmsg("invalid message length")));
01122 return EOF;
01123 }
01124
01125 len -= 4;
01126
01127 if (len > 0)
01128 {
01129
01130
01131
01132
01133
01134 PG_TRY();
01135 {
01136 enlargeStringInfo(s, len);
01137 }
01138 PG_CATCH();
01139 {
01140 if (pq_discardbytes(len) == EOF)
01141 ereport(COMMERROR,
01142 (errcode(ERRCODE_PROTOCOL_VIOLATION),
01143 errmsg("incomplete message from client")));
01144 PG_RE_THROW();
01145 }
01146 PG_END_TRY();
01147
01148
01149 if (pq_getbytes(s->data, len) == EOF)
01150 {
01151 ereport(COMMERROR,
01152 (errcode(ERRCODE_PROTOCOL_VIOLATION),
01153 errmsg("incomplete message from client")));
01154 return EOF;
01155 }
01156 s->len = len;
01157
01158 s->data[len] = '\0';
01159 }
01160
01161 return 0;
01162 }
01163
01164
01165
01166
01167
01168
01169
01170
01171 int
01172 pq_putbytes(const char *s, size_t len)
01173 {
01174 int res;
01175
01176
01177 Assert(DoingCopyOut);
01178
01179 if (PqCommBusy)
01180 return 0;
01181 PqCommBusy = true;
01182 res = internal_putbytes(s, len);
01183 PqCommBusy = false;
01184 return res;
01185 }
01186
01187 static int
01188 internal_putbytes(const char *s, size_t len)
01189 {
01190 size_t amount;
01191
01192 while (len > 0)
01193 {
01194
01195 if (PqSendPointer >= PqSendBufferSize)
01196 {
01197 pq_set_nonblocking(false);
01198 if (internal_flush())
01199 return EOF;
01200 }
01201 amount = PqSendBufferSize - PqSendPointer;
01202 if (amount > len)
01203 amount = len;
01204 memcpy(PqSendBuffer + PqSendPointer, s, amount);
01205 PqSendPointer += amount;
01206 s += amount;
01207 len -= amount;
01208 }
01209 return 0;
01210 }
01211
01212
01213
01214
01215
01216
01217
01218 int
01219 pq_flush(void)
01220 {
01221 int res;
01222
01223
01224 if (PqCommBusy)
01225 return 0;
01226 PqCommBusy = true;
01227 pq_set_nonblocking(false);
01228 res = internal_flush();
01229 PqCommBusy = false;
01230 return res;
01231 }
01232
01233
01234
01235
01236
01237
01238
01239
01240 static int
01241 internal_flush(void)
01242 {
01243 static int last_reported_send_errno = 0;
01244
01245 char *bufptr = PqSendBuffer + PqSendStart;
01246 char *bufend = PqSendBuffer + PqSendPointer;
01247
01248 while (bufptr < bufend)
01249 {
01250 int r;
01251
01252 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
01253
01254 if (r <= 0)
01255 {
01256 if (errno == EINTR)
01257 continue;
01258
01259
01260
01261
01262
01263 if (errno == EAGAIN ||
01264 errno == EWOULDBLOCK)
01265 {
01266 return 0;
01267 }
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278 if (errno != last_reported_send_errno)
01279 {
01280 last_reported_send_errno = errno;
01281 ereport(COMMERROR,
01282 (errcode_for_socket_access(),
01283 errmsg("could not send data to client: %m")));
01284 }
01285
01286
01287
01288
01289
01290
01291
01292 PqSendStart = PqSendPointer = 0;
01293 ClientConnectionLost = 1;
01294 InterruptPending = 1;
01295 return EOF;
01296 }
01297
01298 last_reported_send_errno = 0;
01299 bufptr += r;
01300 PqSendStart += r;
01301 }
01302
01303 PqSendStart = PqSendPointer = 0;
01304 return 0;
01305 }
01306
01307
01308
01309
01310
01311
01312
01313 int
01314 pq_flush_if_writable(void)
01315 {
01316 int res;
01317
01318
01319 if (PqSendPointer == PqSendStart)
01320 return 0;
01321
01322
01323 if (PqCommBusy)
01324 return 0;
01325
01326
01327 pq_set_nonblocking(true);
01328
01329 PqCommBusy = true;
01330 res = internal_flush();
01331 PqCommBusy = false;
01332 return res;
01333 }
01334
01335
01336
01337
01338
01339 bool
01340 pq_is_send_pending(void)
01341 {
01342 return (PqSendStart < PqSendPointer);
01343 }
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378 int
01379 pq_putmessage(char msgtype, const char *s, size_t len)
01380 {
01381 if (DoingCopyOut || PqCommBusy)
01382 return 0;
01383 PqCommBusy = true;
01384 if (msgtype)
01385 if (internal_putbytes(&msgtype, 1))
01386 goto fail;
01387 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
01388 {
01389 uint32 n32;
01390
01391 n32 = htonl((uint32) (len + 4));
01392 if (internal_putbytes((char *) &n32, 4))
01393 goto fail;
01394 }
01395 if (internal_putbytes(s, len))
01396 goto fail;
01397 PqCommBusy = false;
01398 return 0;
01399
01400 fail:
01401 PqCommBusy = false;
01402 return EOF;
01403 }
01404
01405
01406
01407
01408
01409
01410
01411 void
01412 pq_putmessage_noblock(char msgtype, const char *s, size_t len)
01413 {
01414 int res PG_USED_FOR_ASSERTS_ONLY;
01415 int required;
01416
01417
01418
01419
01420
01421 required = PqSendPointer + 1 + 4 + len;
01422 if (required > PqSendBufferSize)
01423 {
01424 PqSendBuffer = repalloc(PqSendBuffer, required);
01425 PqSendBufferSize = required;
01426 }
01427 res = pq_putmessage(msgtype, s, len);
01428 Assert(res == 0);
01429
01430 }
01431
01432
01433
01434
01435
01436
01437
01438 void
01439 pq_startcopyout(void)
01440 {
01441 DoingCopyOut = true;
01442 }
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454 void
01455 pq_endcopyout(bool errorAbort)
01456 {
01457 if (!DoingCopyOut)
01458 return;
01459 if (errorAbort)
01460 pq_putbytes("\n\n\\.\n", 5);
01461
01462 DoingCopyOut = false;
01463 }
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473
01474
01475
01476 #if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
01477 static int
01478 pq_setkeepaliveswin32(Port *port, int idle, int interval)
01479 {
01480 struct tcp_keepalive ka;
01481 DWORD retsize;
01482
01483 if (idle <= 0)
01484 idle = 2 * 60 * 60;
01485 if (interval <= 0)
01486 interval = 1;
01487
01488 ka.onoff = 1;
01489 ka.keepalivetime = idle * 1000;
01490 ka.keepaliveinterval = interval * 1000;
01491
01492 if (WSAIoctl(port->sock,
01493 SIO_KEEPALIVE_VALS,
01494 (LPVOID) &ka,
01495 sizeof(ka),
01496 NULL,
01497 0,
01498 &retsize,
01499 NULL,
01500 NULL)
01501 != 0)
01502 {
01503 elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %ui",
01504 WSAGetLastError());
01505 return STATUS_ERROR;
01506 }
01507 if (port->keepalives_idle != idle)
01508 port->keepalives_idle = idle;
01509 if (port->keepalives_interval != interval)
01510 port->keepalives_interval = interval;
01511 return STATUS_OK;
01512 }
01513 #endif
01514
01515 int
01516 pq_getkeepalivesidle(Port *port)
01517 {
01518 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
01519 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01520 return 0;
01521
01522 if (port->keepalives_idle != 0)
01523 return port->keepalives_idle;
01524
01525 if (port->default_keepalives_idle == 0)
01526 {
01527 #ifndef WIN32
01528 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
01529
01530 #ifdef TCP_KEEPIDLE
01531 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
01532 (char *) &port->default_keepalives_idle,
01533 &size) < 0)
01534 {
01535 elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
01536 port->default_keepalives_idle = -1;
01537 }
01538 #else
01539 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
01540 (char *) &port->default_keepalives_idle,
01541 &size) < 0)
01542 {
01543 elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
01544 port->default_keepalives_idle = -1;
01545 }
01546 #endif
01547 #else
01548
01549 port->default_keepalives_idle = -1;
01550 #endif
01551 }
01552
01553 return port->default_keepalives_idle;
01554 #else
01555 return 0;
01556 #endif
01557 }
01558
01559 int
01560 pq_setkeepalivesidle(int idle, Port *port)
01561 {
01562 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01563 return STATUS_OK;
01564
01565 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
01566 if (idle == port->keepalives_idle)
01567 return STATUS_OK;
01568
01569 #ifndef WIN32
01570 if (port->default_keepalives_idle <= 0)
01571 {
01572 if (pq_getkeepalivesidle(port) < 0)
01573 {
01574 if (idle == 0)
01575 return STATUS_OK;
01576 else
01577 return STATUS_ERROR;
01578 }
01579 }
01580
01581 if (idle == 0)
01582 idle = port->default_keepalives_idle;
01583
01584 #ifdef TCP_KEEPIDLE
01585 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
01586 (char *) &idle, sizeof(idle)) < 0)
01587 {
01588 elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
01589 return STATUS_ERROR;
01590 }
01591 #else
01592 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
01593 (char *) &idle, sizeof(idle)) < 0)
01594 {
01595 elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
01596 return STATUS_ERROR;
01597 }
01598 #endif
01599
01600 port->keepalives_idle = idle;
01601 #else
01602 return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
01603 #endif
01604 #else
01605 if (idle != 0)
01606 {
01607 elog(LOG, "setting the keepalive idle time is not supported");
01608 return STATUS_ERROR;
01609 }
01610 #endif
01611 return STATUS_OK;
01612 }
01613
01614 int
01615 pq_getkeepalivesinterval(Port *port)
01616 {
01617 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
01618 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01619 return 0;
01620
01621 if (port->keepalives_interval != 0)
01622 return port->keepalives_interval;
01623
01624 if (port->default_keepalives_interval == 0)
01625 {
01626 #ifndef WIN32
01627 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
01628
01629 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
01630 (char *) &port->default_keepalives_interval,
01631 &size) < 0)
01632 {
01633 elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
01634 port->default_keepalives_interval = -1;
01635 }
01636 #else
01637
01638 port->default_keepalives_interval = -1;
01639 #endif
01640 }
01641
01642 return port->default_keepalives_interval;
01643 #else
01644 return 0;
01645 #endif
01646 }
01647
01648 int
01649 pq_setkeepalivesinterval(int interval, Port *port)
01650 {
01651 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01652 return STATUS_OK;
01653
01654 #if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
01655 if (interval == port->keepalives_interval)
01656 return STATUS_OK;
01657
01658 #ifndef WIN32
01659 if (port->default_keepalives_interval <= 0)
01660 {
01661 if (pq_getkeepalivesinterval(port) < 0)
01662 {
01663 if (interval == 0)
01664 return STATUS_OK;
01665 else
01666 return STATUS_ERROR;
01667 }
01668 }
01669
01670 if (interval == 0)
01671 interval = port->default_keepalives_interval;
01672
01673 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
01674 (char *) &interval, sizeof(interval)) < 0)
01675 {
01676 elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
01677 return STATUS_ERROR;
01678 }
01679
01680 port->keepalives_interval = interval;
01681 #else
01682 return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
01683 #endif
01684 #else
01685 if (interval != 0)
01686 {
01687 elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
01688 return STATUS_ERROR;
01689 }
01690 #endif
01691
01692 return STATUS_OK;
01693 }
01694
01695 int
01696 pq_getkeepalivescount(Port *port)
01697 {
01698 #ifdef TCP_KEEPCNT
01699 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01700 return 0;
01701
01702 if (port->keepalives_count != 0)
01703 return port->keepalives_count;
01704
01705 if (port->default_keepalives_count == 0)
01706 {
01707 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
01708
01709 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
01710 (char *) &port->default_keepalives_count,
01711 &size) < 0)
01712 {
01713 elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
01714 port->default_keepalives_count = -1;
01715 }
01716 }
01717
01718 return port->default_keepalives_count;
01719 #else
01720 return 0;
01721 #endif
01722 }
01723
01724 int
01725 pq_setkeepalivescount(int count, Port *port)
01726 {
01727 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
01728 return STATUS_OK;
01729
01730 #ifdef TCP_KEEPCNT
01731 if (count == port->keepalives_count)
01732 return STATUS_OK;
01733
01734 if (port->default_keepalives_count <= 0)
01735 {
01736 if (pq_getkeepalivescount(port) < 0)
01737 {
01738 if (count == 0)
01739 return STATUS_OK;
01740 else
01741 return STATUS_ERROR;
01742 }
01743 }
01744
01745 if (count == 0)
01746 count = port->default_keepalives_count;
01747
01748 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
01749 (char *) &count, sizeof(count)) < 0)
01750 {
01751 elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
01752 return STATUS_ERROR;
01753 }
01754
01755 port->keepalives_count = count;
01756 #else
01757 if (count != 0)
01758 {
01759 elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
01760 return STATUS_ERROR;
01761 }
01762 #endif
01763
01764 return STATUS_OK;
01765 }