#include "postgres.h"#include <signal.h>#include <fcntl.h>#include <grp.h>#include <unistd.h>#include <sys/file.h>#include <sys/socket.h>#include <sys/stat.h>#include <sys/time.h>#include <netdb.h>#include <netinet/in.h>#include <arpa/inet.h>#include "libpq/ip.h"#include "libpq/libpq.h"#include "miscadmin.h"#include "storage/ipc.h"#include "utils/guc.h"#include "utils/memutils.h"
Go to the source code of this file.
Defines | |
| #define | PQ_SEND_BUFFER_SIZE 8192 |
| #define | PQ_RECV_BUFFER_SIZE 8192 |
Functions | |
| static void | pq_close (int code, Datum arg) |
| static int | internal_putbytes (const char *s, size_t len) |
| static int | internal_flush (void) |
| static void | pq_set_nonblocking (bool nonblocking) |
| void | pq_init (void) |
| void | pq_comm_reset (void) |
| int | StreamServerPort (int family, char *hostName, unsigned short portNumber, char *unixSocketDir, pgsocket ListenSocket[], int MaxListen) |
| int | StreamConnection (pgsocket server_fd, Port *port) |
| void | StreamClose (pgsocket sock) |
| void | TouchSocketFiles (void) |
| static int | pq_recvbuf (void) |
| int | pq_getbyte (void) |
| int | pq_peekbyte (void) |
| int | pq_getbyte_if_available (unsigned char *c) |
| int | pq_getbytes (char *s, size_t len) |
| static int | pq_discardbytes (size_t len) |
| int | pq_getstring (StringInfo s) |
| int | pq_getmessage (StringInfo s, int maxlen) |
| int | pq_putbytes (const char *s, size_t len) |
| int | pq_flush (void) |
| int | pq_flush_if_writable (void) |
| bool | pq_is_send_pending (void) |
| int | pq_putmessage (char msgtype, const char *s, size_t len) |
| void | pq_putmessage_noblock (char msgtype, const char *s, size_t len) |
| void | pq_startcopyout (void) |
| void | pq_endcopyout (bool errorAbort) |
| int | pq_getkeepalivesidle (Port *port) |
| int | pq_setkeepalivesidle (int idle, Port *port) |
| int | pq_getkeepalivesinterval (Port *port) |
| int | pq_setkeepalivesinterval (int interval, Port *port) |
| int | pq_getkeepalivescount (Port *port) |
| int | pq_setkeepalivescount (int count, Port *port) |
Variables | |
| int | Unix_socket_permissions |
| char * | Unix_socket_group |
| static List * | sock_paths = NIL |
| static char * | PqSendBuffer |
| static int | PqSendBufferSize |
| static int | PqSendPointer |
| static int | PqSendStart |
| static char | PqRecvBuffer [PQ_RECV_BUFFER_SIZE] |
| static int | PqRecvPointer |
| static int | PqRecvLength |
| static bool | PqCommBusy |
| static bool | DoingCopyOut |
| #define PQ_RECV_BUFFER_SIZE 8192 |
Definition at line 118 of file pqcomm.c.
Referenced by pq_recvbuf().
| #define PQ_SEND_BUFFER_SIZE 8192 |
Definition at line 117 of file pqcomm.c.
Referenced by StreamConnection().
| static int internal_flush | ( | void | ) | [static] |
Definition at line 1241 of file pqcomm.c.
References ClientConnectionLost, COMMERROR, EAGAIN, EINTR, ereport, errcode_for_socket_access(), errmsg(), EWOULDBLOCK, InterruptPending, MyProcPort, PqSendBuffer, PqSendPointer, PqSendStart, and secure_write().
Referenced by internal_putbytes(), pq_flush(), and pq_flush_if_writable().
{
static int last_reported_send_errno = 0;
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
while (bufptr < bufend)
{
int r;
r = secure_write(MyProcPort, bufptr, bufend - bufptr);
if (r <= 0)
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
/*
* Ok if no data writable without blocking, and the socket is in
* non-blocking mode.
*/
if (errno == EAGAIN ||
errno == EWOULDBLOCK)
{
return 0;
}
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
* dump! This message must go *only* to the postmaster log.
*
* If a client disconnects while we're in the midst of output, we
* might write quite a bit of data before we get to a safe query
* abort point. So, suppress duplicate log messages.
*/
if (errno != last_reported_send_errno)
{
last_reported_send_errno = errno;
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not send data to client: %m")));
}
/*
* We drop the buffered data anyway so that processing can
* continue, even though we'll probably quit soon. We also set a
* flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
* the connection.
*/
PqSendStart = PqSendPointer = 0;
ClientConnectionLost = 1;
InterruptPending = 1;
return EOF;
}
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
PqSendStart += r;
}
PqSendStart = PqSendPointer = 0;
return 0;
}
| static int internal_putbytes | ( | const char * | s, | |
| size_t | len | |||
| ) | [static] |
Definition at line 1188 of file pqcomm.c.
References internal_flush(), pq_set_nonblocking(), PqSendBuffer, PqSendBufferSize, and PqSendPointer.
Referenced by pq_putbytes(), and pq_putmessage().
{
size_t amount;
while (len > 0)
{
/* If buffer is full, then flush it out */
if (PqSendPointer >= PqSendBufferSize)
{
pq_set_nonblocking(false);
if (internal_flush())
return EOF;
}
amount = PqSendBufferSize - PqSendPointer;
if (amount > len)
amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount);
PqSendPointer += amount;
s += amount;
len -= amount;
}
return 0;
}
| static void pq_close | ( | int | code, | |
| Datum | arg | |||
| ) | [static] |
Definition at line 188 of file pqcomm.c.
References free, Port::gss, MyProcPort, NULL, secure_close(), and Port::sock.
Referenced by pq_init().
{
if (MyProcPort != NULL)
{
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
#ifdef ENABLE_GSS
OM_uint32 min_s;
/* Shutdown GSSAPI layer */
if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
gss_release_cred(&min_s, &MyProcPort->gss->cred);
#endif /* ENABLE_GSS */
/* GSS and SSPI share the port->gss struct */
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
/* Cleanly shut down SSL layer */
secure_close(MyProcPort);
/*
* Formerly we did an explicit close() here, but it seems better to
* leave the socket open until the process dies. This allows clients
* to perform a "synchronous close" if they care --- wait till the
* transport layer reports connection closure, and you can be sure the
* backend has exited.
*
* We do set sock to PGINVALID_SOCKET to prevent any further I/O,
* though.
*/
MyProcPort->sock = PGINVALID_SOCKET;
}
}
| void pq_comm_reset | ( | void | ) |
Definition at line 172 of file pqcomm.c.
References pq_endcopyout(), and PqCommBusy.
Referenced by PostgresMain().
{
/* Do not throw away pending data, but do reset the busy flag */
PqCommBusy = false;
/* We can abort any old-style COPY OUT, too */
pq_endcopyout(true);
}
| static int pq_discardbytes | ( | size_t | len | ) | [static] |
Definition at line 1011 of file pqcomm.c.
References pq_recvbuf(), PqRecvLength, and PqRecvPointer.
Referenced by pq_getmessage().
{
size_t amount;
while (len > 0)
{
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
if (amount > len)
amount = len;
PqRecvPointer += amount;
len -= amount;
}
return 0;
}
| void pq_endcopyout | ( | bool | errorAbort | ) |
Definition at line 1455 of file pqcomm.c.
References DoingCopyOut, and pq_putbytes().
Referenced by DoCopyTo(), errfinish(), pq_comm_reset(), and SendCopyEnd().
{
if (!DoingCopyOut)
return;
if (errorAbort)
pq_putbytes("\n\n\\.\n", 5);
/* in non-error case, copy.c will have emitted the terminator line */
DoingCopyOut = false;
}
| int pq_flush | ( | void | ) |
Definition at line 1219 of file pqcomm.c.
References internal_flush(), pq_set_nonblocking(), and PqCommBusy.
Referenced by PostgresMain(), ProcessIncomingNotify(), ReadyForQuery(), ReceiveCopyBegin(), send_message_to_frontend(), sendAuthRequest(), StartReplication(), and WalSndLoop().
{
int res;
/* No-op if reentrant call */
if (PqCommBusy)
return 0;
PqCommBusy = true;
pq_set_nonblocking(false);
res = internal_flush();
PqCommBusy = false;
return res;
}
| int pq_flush_if_writable | ( | void | ) |
Definition at line 1314 of file pqcomm.c.
References internal_flush(), pq_set_nonblocking(), PqCommBusy, PqSendPointer, and PqSendStart.
Referenced by WalSndLoop().
{
int res;
/* Quick exit if nothing to do */
if (PqSendPointer == PqSendStart)
return 0;
/* No-op if reentrant call */
if (PqCommBusy)
return 0;
/* Temporarily put the socket into non-blocking mode */
pq_set_nonblocking(true);
PqCommBusy = true;
res = internal_flush();
PqCommBusy = false;
return res;
}
| int pq_getbyte | ( | void | ) |
Definition at line 891 of file pqcomm.c.
References pq_recvbuf(), PqRecvBuffer, PqRecvLength, and PqRecvPointer.
Referenced by CopyGetData(), recv_password_packet(), and SocketBackend().
{
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
}
| int pq_getbyte_if_available | ( | unsigned char * | c | ) |
Definition at line 927 of file pqcomm.c.
References COMMERROR, EAGAIN, EINTR, ereport, errcode_for_socket_access(), errmsg(), EWOULDBLOCK, MyProcPort, pq_set_nonblocking(), PqRecvBuffer, PqRecvLength, PqRecvPointer, and secure_read().
Referenced by ProcessRepliesIfAny().
{
int r;
if (PqRecvPointer < PqRecvLength)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
/* Put the socket into non-blocking mode */
pq_set_nonblocking(true);
r = secure_read(MyProcPort, c, 1);
if (r < 0)
{
/*
* Ok if no data available without blocking or interrupted (though
* EINTR really shouldn't happen with a non-blocking socket). Report
* other errors.
*/
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
r = 0;
else
{
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
* dump! This message must go *only* to the postmaster log.
*/
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not receive data from client: %m")));
r = EOF;
}
}
else if (r == 0)
{
/* EOF detected */
r = EOF;
}
return r;
}
| int pq_getbytes | ( | char * | s, | |
| size_t | len | |||
| ) |
Definition at line 979 of file pqcomm.c.
References pq_recvbuf(), PqRecvBuffer, PqRecvLength, and PqRecvPointer.
Referenced by CopyGetData(), GetOldFunctionMessage(), pq_getmessage(), and ProcessStartupPacket().
{
size_t amount;
while (len > 0)
{
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
if (amount > len)
amount = len;
memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
PqRecvPointer += amount;
s += amount;
len -= amount;
}
return 0;
}
| int pq_getkeepalivescount | ( | Port * | port | ) |
Definition at line 1696 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_count, elog, IS_AF_UNIX, Port::keepalives_count, Port::laddr, LOG, NULL, and Port::sock.
Referenced by pq_setkeepalivescount(), and show_tcp_keepalives_count().
{
#ifdef TCP_KEEPCNT
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return 0;
if (port->keepalives_count != 0)
return port->keepalives_count;
if (port->default_keepalives_count == 0)
{
ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
(char *) &port->default_keepalives_count,
&size) < 0)
{
elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
port->default_keepalives_count = -1; /* don't know */
}
}
return port->default_keepalives_count;
#else
return 0;
#endif
}
| int pq_getkeepalivesidle | ( | Port * | port | ) |
Definition at line 1516 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_idle, elog, IS_AF_UNIX, Port::keepalives_idle, Port::laddr, LOG, NULL, and Port::sock.
Referenced by pq_setkeepalivesidle(), and show_tcp_keepalives_idle().
{
#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return 0;
if (port->keepalives_idle != 0)
return port->keepalives_idle;
if (port->default_keepalives_idle == 0)
{
#ifndef WIN32
ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
#ifdef TCP_KEEPIDLE
if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
(char *) &port->default_keepalives_idle,
&size) < 0)
{
elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
port->default_keepalives_idle = -1; /* don't know */
}
#else
if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
(char *) &port->default_keepalives_idle,
&size) < 0)
{
elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
port->default_keepalives_idle = -1; /* don't know */
}
#endif /* TCP_KEEPIDLE */
#else /* WIN32 */
/* We can't get the defaults on Windows, so return "don't know" */
port->default_keepalives_idle = -1;
#endif /* WIN32 */
}
return port->default_keepalives_idle;
#else
return 0;
#endif
}
| int pq_getkeepalivesinterval | ( | Port * | port | ) |
Definition at line 1615 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_interval, elog, IS_AF_UNIX, Port::keepalives_interval, Port::laddr, LOG, NULL, and Port::sock.
Referenced by pq_setkeepalivesinterval(), and show_tcp_keepalives_interval().
{
#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return 0;
if (port->keepalives_interval != 0)
return port->keepalives_interval;
if (port->default_keepalives_interval == 0)
{
#ifndef WIN32
ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
(char *) &port->default_keepalives_interval,
&size) < 0)
{
elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
port->default_keepalives_interval = -1; /* don't know */
}
#else
/* We can't get the defaults on Windows, so return "don't know" */
port->default_keepalives_interval = -1;
#endif /* WIN32 */
}
return port->default_keepalives_interval;
#else
return 0;
#endif
}
| int pq_getmessage | ( | StringInfo | s, | |
| int | maxlen | |||
| ) |
Definition at line 1099 of file pqcomm.c.
References COMMERROR, StringInfoData::data, enlargeStringInfo(), ereport, errcode(), errmsg(), StringInfoData::len, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pq_discardbytes(), pq_getbytes(), and resetStringInfo().
Referenced by CopyGetData(), ProcessRepliesIfAny(), ProcessStandbyMessage(), recv_password_packet(), and SocketBackend().
{
int32 len;
resetStringInfo(s);
/* Read message length word */
if (pq_getbytes((char *) &len, 4) == EOF)
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected EOF within message length word")));
return EOF;
}
len = ntohl(len);
if (len < 4 ||
(maxlen > 0 && len > maxlen))
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid message length")));
return EOF;
}
len -= 4; /* discount length itself */
if (len > 0)
{
/*
* Allocate space for message. If we run out of room (ridiculously
* large message), we will elog(ERROR), but we want to discard the
* message body so as not to lose communication sync.
*/
PG_TRY();
{
enlargeStringInfo(s, len);
}
PG_CATCH();
{
if (pq_discardbytes(len) == EOF)
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("incomplete message from client")));
PG_RE_THROW();
}
PG_END_TRY();
/* And grab the message */
if (pq_getbytes(s->data, len) == EOF)
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("incomplete message from client")));
return EOF;
}
s->len = len;
/* Place a trailing null per StringInfo convention */
s->data[len] = '\0';
}
return 0;
}
| int pq_getstring | ( | StringInfo | s | ) |
Definition at line 1047 of file pqcomm.c.
References appendBinaryStringInfo(), i, pq_recvbuf(), PqRecvBuffer, PqRecvLength, PqRecvPointer, and resetStringInfo().
Referenced by GetOldFunctionMessage(), and SocketBackend().
{
int i;
resetStringInfo(s);
/* Read until we get the terminating '\0' */
for (;;)
{
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
for (i = PqRecvPointer; i < PqRecvLength; i++)
{
if (PqRecvBuffer[i] == '\0')
{
/* include the '\0' in the copy */
appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
i - PqRecvPointer + 1);
PqRecvPointer = i + 1; /* advance past \0 */
return 0;
}
}
/* If we're here we haven't got the \0 in the buffer yet. */
appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
PqRecvLength - PqRecvPointer);
PqRecvPointer = PqRecvLength;
}
}
| void pq_init | ( | void | ) |
Definition at line 153 of file pqcomm.c.
References DoingCopyOut, MemoryContextAlloc(), on_proc_exit(), pq_close(), PqCommBusy, PqRecvLength, PqRecvPointer, PqSendBuffer, PqSendBufferSize, PqSendPointer, PqSendStart, and TopMemoryContext.
Referenced by BackendInitialize().
{
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
DoingCopyOut = false;
on_proc_exit(pq_close, 0);
}
| bool pq_is_send_pending | ( | void | ) |
Definition at line 1340 of file pqcomm.c.
References PqSendPointer, and PqSendStart.
Referenced by WalSndLoop().
{
return (PqSendStart < PqSendPointer);
}
| int pq_peekbyte | ( | void | ) |
Definition at line 908 of file pqcomm.c.
References pq_recvbuf(), PqRecvBuffer, PqRecvLength, and PqRecvPointer.
Referenced by recv_password_packet().
{
while (PqRecvPointer >= PqRecvLength)
{
if (pq_recvbuf()) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
}
| int pq_putbytes | ( | const char * | s, | |
| size_t | len | |||
| ) |
Definition at line 1172 of file pqcomm.c.
References Assert, DoingCopyOut, internal_putbytes(), and PqCommBusy.
Referenced by CopySendEndOfRow(), and pq_endcopyout().
{
int res;
/* Should only be called by old-style COPY OUT */
Assert(DoingCopyOut);
/* No-op if reentrant call */
if (PqCommBusy)
return 0;
PqCommBusy = true;
res = internal_putbytes(s, len);
PqCommBusy = false;
return res;
}
| int pq_putmessage | ( | char | msgtype, | |
| const char * | s, | |||
| size_t | len | |||
| ) |
Definition at line 1379 of file pqcomm.c.
References DoingCopyOut, FrontendProtocol, internal_putbytes(), PG_PROTOCOL_MAJOR, and PqCommBusy.
Referenced by _tarWriteHeader(), CopySendEndOfRow(), EndCommand(), NullCommand(), perform_base_backup(), pq_endmessage(), pq_putemptymessage(), pq_putmessage_noblock(), pq_puttextmessage(), sendFile(), and sendFileWithContent().
{
if (DoingCopyOut || PqCommBusy)
return 0;
PqCommBusy = true;
if (msgtype)
if (internal_putbytes(&msgtype, 1))
goto fail;
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
uint32 n32;
n32 = htonl((uint32) (len + 4));
if (internal_putbytes((char *) &n32, 4))
goto fail;
}
if (internal_putbytes(s, len))
goto fail;
PqCommBusy = false;
return 0;
fail:
PqCommBusy = false;
return EOF;
}
| void pq_putmessage_noblock | ( | char | msgtype, | |
| const char * | s, | |||
| size_t | len | |||
| ) |
Definition at line 1412 of file pqcomm.c.
References Assert, pq_putmessage(), PqSendBuffer, PqSendBufferSize, PqSendPointer, and repalloc().
Referenced by ProcessRepliesIfAny(), WalSndKeepalive(), and XLogSend().
{
int res PG_USED_FOR_ASSERTS_ONLY;
int required;
/*
* Ensure we have enough space in the output buffer for the message header
* as well as the message itself.
*/
required = PqSendPointer + 1 + 4 + len;
if (required > PqSendBufferSize)
{
PqSendBuffer = repalloc(PqSendBuffer, required);
PqSendBufferSize = required;
}
res = pq_putmessage(msgtype, s, len);
Assert(res == 0); /* should not fail when the message fits in
* buffer */
}
| static int pq_recvbuf | ( | void | ) | [static] |
Definition at line 830 of file pqcomm.c.
References COMMERROR, EINTR, ereport, errcode_for_socket_access(), errmsg(), memmove, MyProcPort, PQ_RECV_BUFFER_SIZE, pq_set_nonblocking(), PqRecvBuffer, PqRecvLength, PqRecvPointer, and secure_read().
Referenced by pq_discardbytes(), pq_getbyte(), pq_getbytes(), pq_getstring(), and pq_peekbyte().
{
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
{
/* still some unread data, left-justify it in the buffer */
memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
PqRecvLength - PqRecvPointer);
PqRecvLength -= PqRecvPointer;
PqRecvPointer = 0;
}
else
PqRecvLength = PqRecvPointer = 0;
}
/* Ensure that we're in blocking mode */
pq_set_nonblocking(false);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
int r;
r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
if (errno == EINTR)
continue; /* Ok if interrupted */
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
* dump! This message must go *only* to the postmaster log.
*/
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not receive data from client: %m")));
return EOF;
}
if (r == 0)
{
/*
* EOF detected. We used to write a log message here, but it's
* better to expect the ultimate caller to do that.
*/
return EOF;
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
return 0;
}
}
| static void pq_set_nonblocking | ( | bool | nonblocking | ) | [static] |
Definition at line 793 of file pqcomm.c.
References COMMERROR, ereport, errmsg(), MyProcPort, Port::noblock, pg_set_block(), pg_set_noblock(), pgwin32_noblock, and Port::sock.
Referenced by internal_putbytes(), pq_flush(), pq_flush_if_writable(), pq_getbyte_if_available(), and pq_recvbuf().
{
if (MyProcPort->noblock == nonblocking)
return;
#ifdef WIN32
pgwin32_noblock = nonblocking ? 1 : 0;
#else
/*
* Use COMMERROR on failure, because ERROR would try to send the error to
* the client, which might require changing the mode again, leading to
* infinite recursion.
*/
if (nonblocking)
{
if (!pg_set_noblock(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to nonblocking mode: %m")));
}
else
{
if (!pg_set_block(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to blocking mode: %m")));
}
#endif
MyProcPort->noblock = nonblocking;
}
| int pq_setkeepalivescount | ( | int | count, | |
| Port * | port | |||
| ) |
Definition at line 1725 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_count, elog, IS_AF_UNIX, Port::keepalives_count, Port::laddr, LOG, NULL, pq_getkeepalivescount(), and Port::sock.
Referenced by assign_tcp_keepalives_count(), and StreamConnection().
{
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return STATUS_OK;
#ifdef TCP_KEEPCNT
if (count == port->keepalives_count)
return STATUS_OK;
if (port->default_keepalives_count <= 0)
{
if (pq_getkeepalivescount(port) < 0)
{
if (count == 0)
return STATUS_OK; /* default is set but unknown */
else
return STATUS_ERROR;
}
}
if (count == 0)
count = port->default_keepalives_count;
if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
(char *) &count, sizeof(count)) < 0)
{
elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
return STATUS_ERROR;
}
port->keepalives_count = count;
#else
if (count != 0)
{
elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
return STATUS_ERROR;
}
#endif
return STATUS_OK;
}
| int pq_setkeepalivesidle | ( | int | idle, | |
| Port * | port | |||
| ) |
Definition at line 1560 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_idle, elog, IS_AF_UNIX, Port::keepalives_idle, Port::keepalives_interval, Port::laddr, LOG, NULL, pq_getkeepalivesidle(), and Port::sock.
Referenced by assign_tcp_keepalives_idle(), and StreamConnection().
{
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return STATUS_OK;
#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
if (idle == port->keepalives_idle)
return STATUS_OK;
#ifndef WIN32
if (port->default_keepalives_idle <= 0)
{
if (pq_getkeepalivesidle(port) < 0)
{
if (idle == 0)
return STATUS_OK; /* default is set but unknown */
else
return STATUS_ERROR;
}
}
if (idle == 0)
idle = port->default_keepalives_idle;
#ifdef TCP_KEEPIDLE
if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
(char *) &idle, sizeof(idle)) < 0)
{
elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
return STATUS_ERROR;
}
#else
if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
(char *) &idle, sizeof(idle)) < 0)
{
elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
return STATUS_ERROR;
}
#endif
port->keepalives_idle = idle;
#else /* WIN32 */
return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
#endif
#else /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
if (idle != 0)
{
elog(LOG, "setting the keepalive idle time is not supported");
return STATUS_ERROR;
}
#endif
return STATUS_OK;
}
| int pq_setkeepalivesinterval | ( | int | interval, | |
| Port * | port | |||
| ) |
Definition at line 1649 of file pqcomm.c.
References SockAddr::addr, Port::default_keepalives_interval, elog, IS_AF_UNIX, Port::keepalives_idle, Port::keepalives_interval, Port::laddr, LOG, NULL, pq_getkeepalivesinterval(), and Port::sock.
Referenced by assign_tcp_keepalives_interval(), and StreamConnection().
{
if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
return STATUS_OK;
#if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
if (interval == port->keepalives_interval)
return STATUS_OK;
#ifndef WIN32
if (port->default_keepalives_interval <= 0)
{
if (pq_getkeepalivesinterval(port) < 0)
{
if (interval == 0)
return STATUS_OK; /* default is set but unknown */
else
return STATUS_ERROR;
}
}
if (interval == 0)
interval = port->default_keepalives_interval;
if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
(char *) &interval, sizeof(interval)) < 0)
{
elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
return STATUS_ERROR;
}
port->keepalives_interval = interval;
#else /* WIN32 */
return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
#endif
#else
if (interval != 0)
{
elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
return STATUS_ERROR;
}
#endif
return STATUS_OK;
}
| void pq_startcopyout | ( | void | ) |
Definition at line 1439 of file pqcomm.c.
References DoingCopyOut.
Referenced by SendCopyBegin().
{
DoingCopyOut = true;
}
| void StreamClose | ( | pgsocket | sock | ) |
Definition at line 735 of file pqcomm.c.
References closesocket.
Referenced by ClosePostmasterPorts(), ConnCreate(), and ServerLoop().
{
closesocket(sock);
}
Definition at line 629 of file pqcomm.c.
References accept, SockAddr::addr, elog, ereport, errcode_for_socket_access(), errmsg(), IS_AF_UNIX, Port::laddr, LOG, pg_usleep(), PQ_SEND_BUFFER_SIZE, pq_setkeepalivescount(), pq_setkeepalivesidle(), pq_setkeepalivesinterval(), Port::raddr, SockAddr::salen, Port::sock, tcp_keepalives_count, tcp_keepalives_idle, and tcp_keepalives_interval.
Referenced by ConnCreate().
{
/* accept connection and fill in the client (remote) address */
port->raddr.salen = sizeof(port->raddr.addr);
if ((port->sock = accept(server_fd,
(struct sockaddr *) & port->raddr.addr,
&port->raddr.salen)) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("could not accept new connection: %m")));
/*
* If accept() fails then postmaster.c will still see the server
* socket as read-ready, and will immediately try again. To avoid
* uselessly sucking lots of CPU, delay a bit before trying again.
* (The most likely reason for failure is being out of kernel file
* table slots; we can do little except hope some will get freed up.)
*/
pg_usleep(100000L); /* wait 0.1 sec */
return STATUS_ERROR;
}
#ifdef SCO_ACCEPT_BUG
/*
* UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
* shouldn't hurt to catch it for all versions of those platforms.
*/
if (port->raddr.addr.ss_family == 0)
port->raddr.addr.ss_family = AF_UNIX;
#endif
/* fill in the server (local) address */
port->laddr.salen = sizeof(port->laddr.addr);
if (getsockname(port->sock,
(struct sockaddr *) & port->laddr.addr,
&port->laddr.salen) < 0)
{
elog(LOG, "getsockname() failed: %m");
return STATUS_ERROR;
}
/* select NODELAY and KEEPALIVE options if it's a TCP connection */
if (!IS_AF_UNIX(port->laddr.addr.ss_family))
{
int on;
#ifdef TCP_NODELAY
on = 1;
if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
(char *) &on, sizeof(on)) < 0)
{
elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
return STATUS_ERROR;
}
#endif
on = 1;
if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
(char *) &on, sizeof(on)) < 0)
{
elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
return STATUS_ERROR;
}
#ifdef WIN32
/*
* This is a Win32 socket optimization. The ideal size is 32k.
* http://support.microsoft.com/kb/823764/EN-US/
*/
on = PQ_SEND_BUFFER_SIZE * 4;
if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &on,
sizeof(on)) < 0)
{
elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
return STATUS_ERROR;
}
#endif
/*
* Also apply the current keepalive parameters. If we fail to set a
* parameter, don't error out, because these aren't universally
* supported. (Note: you might think we need to reset the GUC
* variables to 0 in such a case, but it's not necessary because the
* show hooks for these variables report the truth anyway.)
*/
(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
(void) pq_setkeepalivescount(tcp_keepalives_count, port);
}
return STATUS_OK;
}
| int StreamServerPort | ( | int | family, | |
| char * | hostName, | |||
| unsigned short | portNumber, | |||
| char * | unixSocketDir, | |||
| pgsocket | ListenSocket[], | |||
| int | MaxListen | |||
| ) |
Definition at line 272 of file pqcomm.c.
References _, addrinfo::ai_family, addrinfo::ai_flags, addrinfo::ai_next, addrinfo::ai_socktype, closesocket, ereport, errcode_for_socket_access(), errhint(), errmsg(), gai_strerror, IS_AF_UNIX, LOG, MaxBackends, MemSet, pg_freeaddrinfo_all(), pg_getaddrinfo_all(), PG_SOMAXCONN, PGINVALID_SOCKET, snprintf(), socket, STATUS_OK, UNIXSOCK_PATH, and UNIXSOCK_PATH_BUFLEN.
Referenced by PostmasterMain().
{
pgsocket fd;
int err;
int maxconn;
int ret;
char portNumberStr[32];
const char *familyDesc;
char familyDescBuf[64];
char *service;
struct addrinfo *addrs = NULL,
*addr;
struct addrinfo hint;
int listen_index = 0;
int added = 0;
#ifdef HAVE_UNIX_SOCKETS
char unixSocketPath[MAXPGPATH];
#endif
#if !defined(WIN32) || defined(IPV6_V6ONLY)
int one = 1;
#endif
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_family = family;
hint.ai_flags = AI_PASSIVE;
hint.ai_socktype = SOCK_STREAM;
#ifdef HAVE_UNIX_SOCKETS
if (family == AF_UNIX)
{
/*
* Create unixSocketPath from portNumber and unixSocketDir and lock
* that file path
*/
UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
{
ereport(LOG,
(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
unixSocketPath,
(int) (UNIXSOCK_PATH_BUFLEN - 1))));
return STATUS_ERROR;
}
if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
return STATUS_ERROR;
service = unixSocketPath;
}
else
#endif /* HAVE_UNIX_SOCKETS */
{
snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
service = portNumberStr;
}
ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
if (ret || !addrs)
{
if (hostName)
ereport(LOG,
(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
hostName, service, gai_strerror(ret))));
else
ereport(LOG,
(errmsg("could not translate service \"%s\" to address: %s",
service, gai_strerror(ret))));
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
return STATUS_ERROR;
}
for (addr = addrs; addr; addr = addr->ai_next)
{
if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
{
/*
* Only set up a unix domain socket when they really asked for it.
* The service/port is different in that case.
*/
continue;
}
/* See if there is still room to add 1 more socket. */
for (; listen_index < MaxListen; listen_index++)
{
if (ListenSocket[listen_index] == PGINVALID_SOCKET)
break;
}
if (listen_index >= MaxListen)
{
ereport(LOG,
(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
MaxListen)));
break;
}
/* set up family name for possible error messages */
switch (addr->ai_family)
{
case AF_INET:
familyDesc = _("IPv4");
break;
#ifdef HAVE_IPV6
case AF_INET6:
familyDesc = _("IPv6");
break;
#endif
#ifdef HAVE_UNIX_SOCKETS
case AF_UNIX:
familyDesc = _("Unix");
break;
#endif
default:
snprintf(familyDescBuf, sizeof(familyDescBuf),
_("unrecognized address family %d"),
addr->ai_family);
familyDesc = familyDescBuf;
break;
}
if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
/* translator: %s is IPv4, IPv6, or Unix */
errmsg("could not create %s socket: %m",
familyDesc)));
continue;
}
#ifndef WIN32
/*
* Without the SO_REUSEADDR flag, a new postmaster can't be started
* right away after a stop or crash, giving "address already in use"
* error on TCP ports.
*
* On win32, however, this behavior only happens if the
* SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
* servers to listen on the same address, resulting in unpredictable
* behavior. With no flags at all, win32 behaves as Unix with
* SO_REUSEADDR.
*/
if (!IS_AF_UNIX(addr->ai_family))
{
if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
(char *) &one, sizeof(one))) == -1)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
closesocket(fd);
continue;
}
}
#endif
#ifdef IPV6_V6ONLY
if (addr->ai_family == AF_INET6)
{
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
(char *) &one, sizeof(one)) == -1)
{
ereport(LOG,
(errcode_for_socket_access(),
errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
closesocket(fd);
continue;
}
}
#endif
/*
* Note: This might fail on some OS's, like Linux older than
* 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
* ipv4 addresses to ipv6. It will show ::ffff:ipv4 for all ipv4
* connections.
*/
err = bind(fd, addr->ai_addr, addr->ai_addrlen);
if (err < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
/* translator: %s is IPv4, IPv6, or Unix */
errmsg("could not bind %s socket: %m",
familyDesc),
(IS_AF_UNIX(addr->ai_family)) ?
errhint("Is another postmaster already running on port %d?"
" If not, remove socket file \"%s\" and retry.",
(int) portNumber, service) :
errhint("Is another postmaster already running on port %d?"
" If not, wait a few seconds and retry.",
(int) portNumber)));
closesocket(fd);
continue;
}
#ifdef HAVE_UNIX_SOCKETS
if (addr->ai_family == AF_UNIX)
{
if (Setup_AF_UNIX(service) != STATUS_OK)
{
closesocket(fd);
break;
}
}
#endif
/*
* Select appropriate accept-queue length limit. PG_SOMAXCONN is only
* intended to provide a clamp on the request on platforms where an
* overly large request provokes a kernel error (are there any?).
*/
maxconn = MaxBackends * 2;
if (maxconn > PG_SOMAXCONN)
maxconn = PG_SOMAXCONN;
err = listen(fd, maxconn);
if (err < 0)
{
ereport(LOG,
(errcode_for_socket_access(),
/* translator: %s is IPv4, IPv6, or Unix */
errmsg("could not listen on %s socket: %m",
familyDesc)));
closesocket(fd);
continue;
}
ListenSocket[listen_index] = fd;
added++;
}
pg_freeaddrinfo_all(hint.ai_family, addrs);
if (!added)
return STATUS_ERROR;
return STATUS_OK;
}
| void TouchSocketFiles | ( | void | ) |
Definition at line 750 of file pqcomm.c.
Referenced by ServerLoop().
{
ListCell *l;
/* Loop through all created sockets... */
foreach(l, sock_paths)
{
char *sock_path = (char *) lfirst(l);
/*
* utime() is POSIX standard, utimes() is a common alternative. If we
* have neither, there's no way to affect the mod or access time of
* the socket :-(
*
* In either path, we ignore errors; there's no point in complaining.
*/
#ifdef HAVE_UTIME
utime(sock_path, NULL);
#else /* !HAVE_UTIME */
#ifdef HAVE_UTIMES
utimes(sock_path, NULL);
#endif /* HAVE_UTIMES */
#endif /* HAVE_UTIME */
}
}
bool DoingCopyOut [static] |
Definition at line 133 of file pqcomm.c.
Referenced by pq_endcopyout(), pq_init(), pq_putbytes(), pq_putmessage(), and pq_startcopyout().
bool PqCommBusy [static] |
Definition at line 132 of file pqcomm.c.
Referenced by pq_comm_reset(), pq_flush(), pq_flush_if_writable(), pq_init(), pq_putbytes(), and pq_putmessage().
char PqRecvBuffer[PQ_RECV_BUFFER_SIZE] [static] |
Definition at line 125 of file pqcomm.c.
Referenced by pq_getbyte(), pq_getbyte_if_available(), pq_getbytes(), pq_getstring(), pq_peekbyte(), and pq_recvbuf().
int PqRecvLength [static] |
Definition at line 127 of file pqcomm.c.
Referenced by pq_discardbytes(), pq_getbyte(), pq_getbyte_if_available(), pq_getbytes(), pq_getstring(), pq_init(), pq_peekbyte(), and pq_recvbuf().
int PqRecvPointer [static] |
Definition at line 126 of file pqcomm.c.
Referenced by pq_discardbytes(), pq_getbyte(), pq_getbyte_if_available(), pq_getbytes(), pq_getstring(), pq_init(), pq_peekbyte(), and pq_recvbuf().
char* PqSendBuffer [static] |
Definition at line 120 of file pqcomm.c.
Referenced by internal_flush(), internal_putbytes(), pq_init(), and pq_putmessage_noblock().
int PqSendBufferSize [static] |
Definition at line 121 of file pqcomm.c.
Referenced by internal_putbytes(), pq_init(), and pq_putmessage_noblock().
int PqSendPointer [static] |
Definition at line 122 of file pqcomm.c.
Referenced by internal_flush(), internal_putbytes(), pq_flush_if_writable(), pq_init(), pq_is_send_pending(), and pq_putmessage_noblock().
int PqSendStart [static] |
Definition at line 123 of file pqcomm.c.
Referenced by internal_flush(), pq_flush_if_writable(), pq_init(), and pq_is_send_pending().
List* sock_paths = NIL [static] |
| char* Unix_socket_group |
Definition at line 102 of file pqcomm.c.
Referenced by show_unix_socket_permissions().
1.7.1