47 #include <asm/ioctls.h>
54 #include <linux/slab.h>
64 #define NEEDED_RMEM (4*1024*1024)
65 #define CONN_HASH_SIZE 32
68 #define MAX_SEND_MSG_COUNT 25
76 static void cbuf_add(
struct cbuf *
cb,
int n)
81 static int cbuf_data(
struct cbuf *
cb)
86 static void cbuf_init(
struct cbuf *cb,
int size)
92 static void cbuf_eat(
struct cbuf *cb,
int n)
99 static bool cbuf_empty(
struct cbuf *cb)
109 #define CF_READ_PENDING 1
110 #define CF_WRITE_PENDING 2
111 #define CF_CONNECT_PENDING 3
112 #define CF_INIT_PENDING 4
113 #define CF_IS_OTHERCON 5
115 #define CF_APP_LIMITED 7
123 #define MAX_CONNECT_RETRIES 3
130 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
154 static int dlm_local_count;
155 static int dlm_allow_conn;
172 static inline int nodeid_hash(
int nodeid)
183 r = nodeid_hash(nodeid);
186 if (con->
nodeid == nodeid)
201 con = __find_con(nodeid);
205 con = kmem_cache_zalloc(con_cache, alloc);
209 r = nodeid_hash(nodeid);
210 hlist_add_head(&con->
list, &connection_hash[r]);
232 static void foreach_conn(
void (*conn_func)(
struct connection *
c))
250 con = __nodeid2con(nodeid, allocation);
257 static struct connection *assoc2con(
int assoc_id)
290 switch (x->ss_family) {
321 if (!dlm_local_count)
324 spin_lock(&dlm_node_addrs_spin);
325 na = find_node_addr(nodeid);
328 spin_unlock(&dlm_node_addrs_spin);
360 spin_lock(&dlm_node_addrs_spin);
365 if (!addr_compare(na->
addr[0], addr))
372 spin_unlock(&dlm_node_addrs_spin);
391 memcpy(new_addr, addr, len);
393 spin_lock(&dlm_node_addrs_spin);
394 na = find_node_addr(nodeid);
397 new_node->
addr[0] = new_addr;
399 list_add(&new_node->
list, &dlm_node_addrs);
400 spin_unlock(&dlm_node_addrs_spin);
405 spin_unlock(&dlm_node_addrs_spin);
412 spin_unlock(&dlm_node_addrs_spin);
418 static void lowcomms_data_ready(
struct sock *
sk,
int count_unused)
425 static void lowcomms_write_space(
struct sock *sk)
435 con->
sock->sk->sk_write_pending--;
443 static inline void lowcomms_connect_sock(
struct connection *con)
451 static void lowcomms_state_change(
struct sock *sk)
454 lowcomms_write_space(sk);
471 lowcomms_connect_sock(con);
481 con->
sock->sk->sk_data_ready = lowcomms_data_ready;
482 con->
sock->sk->sk_write_space = lowcomms_write_space;
483 con->
sock->sk->sk_state_change = lowcomms_state_change;
484 con->
sock->sk->sk_user_data =
con;
493 saddr->ss_family = dlm_local_addr[0]->ss_family;
494 if (saddr->ss_family ==
AF_INET) {
498 memset(&in4_addr->sin_zero, 0,
sizeof(in4_addr->sin_zero));
508 static void close_connection(
struct connection *con,
bool and_other)
518 close_connection(con->
othercon,
false);
539 con = nodeid2con(0,0);
542 outmessage.msg_name =
NULL;
543 outmessage.msg_namelen = 0;
544 outmessage.msg_control = outcmsg;
545 outmessage.msg_controllen =
sizeof(outcmsg);
546 outmessage.msg_flags =
MSG_EOR;
552 outmessage.msg_controllen = cmsg->
cmsg_len;
562 log_print(
"send EOF to node failed: %d", ret);
565 static void sctp_init_failed_foreach(
struct connection *con)
576 static void sctp_init_failed(
void)
580 foreach_conn(sctp_init_failed_foreach);
586 static void process_sctp_notification(
struct connection *con,
610 log_print(
"COMM_UP for invalid assoc ID %d",
625 log_print(
"getsockopt/sctp_primary_addr on "
626 "new assoc %d failed : %d",
636 make_sockaddr(&
prim.ssp_addr, 0, &addr_len);
637 if (addr_to_nodeid(&
prim.ssp_addr, &nodeid)) {
638 unsigned char *
b=(
unsigned char *)&
prim.ssp_addr;
639 log_print(
"reject connect from unknown addr");
642 sctp_send_shutdown(
prim.ssp_assoc_id);
646 new_con = nodeid2con(nodeid,
GFP_NOFS);
658 "connection %d to node %d: err=%d",
663 add_sock(new_con->
sock, new_con);
665 log_print(
"connecting to %d sctp association %d",
695 log_print(
"Can't start SCTP association - retrying");
701 log_print(
"unexpected SCTP assoc change id=%d state=%d",
709 static int receive_from_sock(
struct connection *con)
716 int call_again_soon = 0;
739 memset(&incmsg, 0,
sizeof(incmsg));
747 iov[0].iov_len = con->
cb.base - cbuf_data(&con->
cb);
756 if (cbuf_data(&con->
cb) >= con->
cb.base) {
758 iov[1].iov_len = con->
cb.base;
762 len = iov[0].iov_len + iov[1].iov_len;
774 process_sctp_notification(con, &msg,
783 cbuf_add(&con->
cb, ret);
786 con->
cb.base, con->
cb.len,
789 log_print(
"lowcomms: addr=%p, base=%u, len=%u, "
790 "iov_len=%u, iov_base[0]=%p, read=%d",
792 len, iov[0].iov_base, r);
796 cbuf_eat(&con->
cb, ret);
798 if (cbuf_empty(&con->
cb) && !call_again_soon) {
817 close_connection(con,
false);
828 static int tcp_accept_from_sock(
struct connection *con)
839 if (!dlm_allow_conn) {
845 memset(&peeraddr, 0,
sizeof(peeraddr));
865 memset(&peeraddr, 0,
sizeof(peeraddr));
866 if (newsock->
ops->getname(newsock, (
struct sockaddr *)&peeraddr,
873 make_sockaddr(&peeraddr, 0, &len);
874 if (addr_to_nodeid(&peeraddr, &nodeid)) {
875 unsigned char *b=(
unsigned char *)&peeraddr;
876 log_print(
"connect from non cluster node");
884 log_print(
"got connection from %d", nodeid);
891 newcon = nodeid2con(nodeid,
GFP_NOFS);
901 othercon = kmem_cache_zalloc(con_cache,
GFP_NOFS);
903 log_print(
"failed to allocate incoming socket");
915 if (!othercon->
sock) {
917 othercon->
sock = newsock;
919 add_sock(newsock, othercon);
923 printk(
"Extra connection from node %d attempted\n", nodeid);
930 newsock->
sk->sk_user_data = newcon;
932 add_sock(newsock, newcon);
954 log_print(
"error accepting connection from node: %d", result);
969 static void sctp_init_assoc(
struct connection *con)
993 base_con = nodeid2con(0, 0);
996 make_sockaddr(&rem_addr,
dlm_config.ci_tcp_port, &addrlen);
998 outmessage.msg_name = &rem_addr;
999 outmessage.msg_namelen = addrlen;
1000 outmessage.msg_control = outcmsg;
1001 outmessage.msg_controllen =
sizeof(outcmsg);
1002 outmessage.msg_flags =
MSG_EOR;
1019 iov[0].iov_len = len;
1028 outmessage.msg_controllen = cmsg->
cmsg_len;
1032 log_print(
"Send first packet to node %d failed: %d",
1044 if (e->
len == 0 && e->
users == 0) {
1053 static void tcp_connect_to_sock(
struct connection *con)
1062 log_print(
"attempt to connect sock 0 foiled");
1080 memset(&saddr, 0,
sizeof(saddr));
1081 result = nodeid_to_addr(con->
nodeid, &saddr,
NULL);
1087 sock->
sk->sk_user_data =
con;
1090 add_sock(sock, con);
1095 make_sockaddr(&
src_addr, 0, &addr_len);
1099 log_print(
"could not bind for connect: %d", result);
1103 make_sockaddr(&saddr,
dlm_config.ci_tcp_port, &addr_len);
1111 result = sock->
ops->connect(sock, (
struct sockaddr *)&saddr, addr_len,
1138 lowcomms_connect_sock(con);
1163 log_print(
"Can't create listening comms socket");
1172 (
char *)&one,
sizeof(one));
1175 log_print(
"Failed to set SO_REUSEADDR on socket: %d", result);
1181 make_sockaddr(saddr,
dlm_config.ci_tcp_port, &addr_len);
1182 result = sock->
ops->bind(sock, (
struct sockaddr *) saddr, addr_len);
1191 (
char *)&one,
sizeof(one));
1193 log_print(
"Set keepalive failed: %d", result);
1196 result = sock->
ops->listen(sock, 5);
1209 static void init_local(
void)
1214 dlm_local_count = 0;
1222 memcpy(addr, &sas,
sizeof(*addr));
1223 dlm_local_addr[dlm_local_count++] =
addr;
1229 static int add_sctp_bind_addr(
struct connection *sctp_con,
1231 int addr_len,
int num)
1242 (
char *)addr, addr_len);
1245 log_print(
"Can't bind to port %d addr number %d",
1252 static int sctp_listen_for_all(
void)
1257 int result = -
EINVAL, num = 1,
i, addr_len;
1264 log_print(
"Using SCTP for communications");
1269 log_print(
"Can't create comms socket, check SCTP is loaded");
1274 memset(&subscribe, 0,
sizeof(subscribe));
1275 subscribe.sctp_data_io_event = 1;
1276 subscribe.sctp_association_event = 1;
1277 subscribe.sctp_send_failure_event = 1;
1278 subscribe.sctp_shutdown_event = 1;
1279 subscribe.sctp_partial_delivery_event = 1;
1282 (
char *)&bufsize,
sizeof(bufsize));
1284 log_print(
"Error increasing buffer space on socket %d", result);
1287 (
char *)&subscribe,
sizeof(subscribe));
1289 log_print(
"Failed to set SCTP_EVENTS on socket: result=%d",
1291 goto create_delsock;
1295 sock->
sk->sk_user_data =
con;
1297 con->
sock->sk->sk_data_ready = lowcomms_data_ready;
1302 for (i = 0; i < dlm_local_count; i++) {
1303 memcpy(&localaddr, dlm_local_addr[i],
sizeof(localaddr));
1304 make_sockaddr(&localaddr,
dlm_config.ci_tcp_port, &addr_len);
1306 result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1308 goto create_delsock;
1312 result = sock->
ops->listen(sock, 5);
1314 log_print(
"Can't set socket listening");
1315 goto create_delsock;
1327 static int tcp_listen_for_all(
void)
1337 if (dlm_local_addr[1] !=
NULL) {
1338 log_print(
"TCP protocol can't handle multi-homed hosts, "
1343 log_print(
"Using TCP for communications");
1345 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1347 add_sock(sock, con);
1390 con = nodeid2con(nodeid, allocation);
1412 e = new_writequeue_entry(con, allocation);
1449 static void send_to_sock(
struct connection *con)
1477 if (ret == -
EAGAIN || ret == 0) {
1485 con->
sock->sk->sk_write_pending++;
1503 if (e->
len == 0 && e->
users == 0) {
1515 close_connection(con,
false);
1516 lowcomms_connect_sock(con);
1522 lowcomms_connect_sock(con);
1525 static void clean_one_writequeue(
struct connection *con)
1544 log_print(
"closing connection to node %d", nodeid);
1545 con = nodeid2con(nodeid, 0);
1551 log_print(
"canceled swork for node %d", nodeid);
1553 log_print(
"canceled rwork for node %d", nodeid);
1554 clean_one_writequeue(con);
1555 close_connection(con,
true);
1558 spin_lock(&dlm_node_addrs_spin);
1559 na = find_node_addr(nodeid);
1566 spin_unlock(&dlm_node_addrs_spin);
1584 static void process_send_sockets(
struct work_struct *work)
1598 static void clean_writequeues(
void)
1600 foreach_conn(clean_one_writequeue);
1603 static void work_stop(
void)
1609 static int work_start(
void)
1613 if (!recv_workqueue) {
1620 if (!send_workqueue) {
1633 con->
sock->sk->sk_user_data =
NULL;
1636 static void free_conn(
struct connection *con)
1638 close_connection(con,
true);
1641 hlist_del(&con->
list);
1658 clean_writequeues();
1660 foreach_conn(free_conn);
1676 if (!dlm_local_count) {
1678 log_print(
"no local IP address has been set");
1689 error = work_start();
1697 error = tcp_listen_for_all();
1699 error = sctp_listen_for_all();
1707 con = nodeid2con(0,0);
1709 close_connection(con,
false);
1722 spin_lock(&dlm_node_addrs_spin);
1729 spin_unlock(&dlm_node_addrs_spin);