56 #include <linux/kernel.h>
58 #include <linux/slab.h>
61 #include <linux/net.h>
62 #include <linux/export.h>
70 #define MLOG_MASK_PREFIX ML_TCP
75 #define SC_NODEF_FMT "node %s (num %u) at %pI4:%u"
82 #define msglog(hdr, fmt, args...) do { \
83 typeof(hdr) __hdr = (hdr); \
84 mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d " \
85 "key %08x num %u] " fmt, \
86 be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len), \
87 be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status), \
88 be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key), \
89 be32_to_cpu(__hdr->msg_num) , ##args); \
92 #define sclog(sc, fmt, args...) do { \
93 typeof(sc) __sc = (sc); \
94 mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
95 "pg_off %zu] " fmt, __sc, \
96 atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \
97 __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
107 static struct socket *r2net_listen_sock;
121 #define R2NET_HB_PRI 0x1
124 static struct r2net_msg *r2net_keep_req, *r2net_keep_resp;
136 static void r2net_listen_data_ready(
struct sock *
sk,
int bytes);
138 static void r2net_idle_timer(
unsigned long data);
142 #ifdef CONFIG_DEBUG_FS
146 INIT_LIST_HEAD(&nst->st_net_debug_item);
149 nst->st_msg_key = msgkey;
211 # define r2net_init_nst(a, b, c, d, e)
212 # define r2net_set_nst_sock_time(a)
213 # define r2net_set_nst_send_time(a)
214 # define r2net_set_nst_status_time(a)
215 # define r2net_set_nst_sock_container(a, b)
216 # define r2net_set_nst_msg_id(a, b)
217 # define r2net_set_sock_timer(a)
218 # define r2net_set_data_ready_time(a)
219 # define r2net_set_advance_start_time(a)
220 # define r2net_set_advance_stop_time(a)
221 # define r2net_set_func_start_time(a)
222 # define r2net_set_func_stop_time(a)
225 #ifdef CONFIG_RAMSTER_FS_STATS
228 return ktime_sub(sc->sc_tv_func_stop, sc->sc_tv_func_start);
234 sc->sc_tv_status_total = ktime_add(sc->sc_tv_status_total,
236 nst->st_status_time));
237 sc->sc_tv_send_total = ktime_add(sc->sc_tv_send_total,
238 ktime_sub(nst->st_status_time,
240 sc->sc_tv_acquiry_total = ktime_add(sc->sc_tv_acquiry_total,
241 ktime_sub(nst->st_send_time,
248 sc->sc_tv_process_total = ktime_add(sc->sc_tv_process_total,
249 r2net_get_func_run_time(sc));
255 # define r2net_update_send_stats(a, b)
257 # define r2net_update_recv_stats(sc)
261 static inline int r2net_reconnect_delay(
void)
266 static inline int r2net_keepalive_delay(
void)
271 static inline int r2net_idle_timeout(
void)
280 trans = r2net_sys_err_translations[
err];
290 return &r2net_nodes[node_num];
296 return nn - r2net_nodes;
327 static void r2net_complete_nsw_locked(
struct r2net_node *nn,
343 static void r2net_complete_nsw(
struct r2net_node *nn,
358 r2net_complete_nsw_locked(nn, nsw, sys_status, status);
365 static void r2net_complete_nodes_nsw(
struct r2net_node *nn)
368 unsigned int num_kills = 0;
377 mlog(0,
"completed %d messages for node %u\n", num_kills,
378 r2net_num_from_nn(nn));
381 static int r2net_nsw_completed(
struct r2net_node *nn,
393 static void sc_kref_release(
struct kref *
kref)
399 sclog(sc,
"releasing\n");
410 r2net_debug_del_sc(sc);
417 kref_put(&sc->
sc_kref, sc_kref_release);
431 sc = kzalloc(
sizeof(*sc),
GFP_NOFS);
455 sclog(sc,
"alloced\n");
459 r2net_debug_add_sc(sc);
502 static void r2net_set_nn_state(
struct r2net_node *nn,
514 else if (!old_sc && sc)
523 if (was_valid && !valid && err == 0)
526 mlog(
ML_CONN,
"node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
543 if (was_valid && !valid) {
546 &old_sc->
sc_node->nd_ipv4_address,
548 r2net_complete_nodes_nsw(nn);
551 if (!was_valid && valid) {
555 "Connected to" :
"Accepted connection from",
565 if (!valid && r2net_wq) {
574 mlog(
ML_CONN,
"queueing conn attempt in %lu jiffies\n", delay);
591 if ((old_sc ==
NULL) && sc)
593 if (old_sc && (old_sc != sc)) {
600 static void r2net_data_ready(
struct sock *
sk,
int bytes)
607 sclog(sc,
"data_ready hit\n");
620 static void r2net_state_change(
struct sock *sk)
632 sclog(sc,
"state_change to %d\n", sk->sk_state);
636 switch (sk->sk_state) {
646 pr_info(
"ramster: Connection to "
665 static void r2net_register_callbacks(
struct sock *sk,
690 static int r2net_unregister_callbacks(
struct sock *sk,
713 static void r2net_ensure_shutdown(
struct r2net_node *nn,
719 r2net_set_nn_state(nn,
NULL, 0, err);
731 static void r2net_shutdown_sc(
struct work_struct *work)
738 sclog(sc,
"shutting down\n");
741 if (r2net_unregister_callbacks(sc->
sc_sock->sk, sc)) {
752 r2net_ensure_shutdown(nn, sc, 0);
770 r2net_handler_tree_lookup(
u32 msg_type,
u32 key,
struct rb_node ***ret_p,
781 cmp = r2net_handler_cmp(nmh, msg_type, key);
795 if (ret_parent !=
NULL)
796 *ret_parent = parent;
801 static void r2net_handler_kref_release(
struct kref *kref)
811 kref_put(&nmh->
nh_kref, r2net_handler_kref_release);
826 mlog(0,
"max_len for message handler out of range: %u\n",
833 mlog(0,
"no message type provided: %u, %p\n", msg_type, func);
839 mlog(0,
"no message handler provided: %u, %p\n",
863 if (r2net_handler_tree_lookup(msg_type, key, &p, &parent))
866 rb_link_node(&nmh->
nh_node, parent, p);
870 mlog(
ML_TCP,
"registered handler func %p type %u key %08x\n",
871 func, msg_type, key);
875 "couldn't find handler we *just* registered "
876 "for type %u key %08x\n", msg_type, key);
896 mlog(
ML_TCP,
"unregistering handler func %p type %u key %08x\n",
900 kref_put(&nmh->
nh_kref, r2net_handler_kref_release);
911 nmh = r2net_handler_tree_lookup(msg_type, key,
NULL,
NULL);
921 static int r2net_recv_tcp_msg(
struct socket *
sock,
void *
data,
size_t len)
931 .msg_iov = (
struct iovec *)&vec,
943 static int r2net_send_tcp_msg(
struct socket *sock,
struct kvec *vec,
944 size_t veclen,
size_t total)
950 .msg_iovlen = veclen,
963 mlog(
ML_ERROR,
"sendmsg returned %d instead of %zu\n", ret,
973 mlog(0,
"returning error: %d\n", ret);
978 void *kmalloced_virt,
995 " returned EAGAIN\n", size, sc->
sc_node->nd_name,
1003 " failed with %zd\n", size, sc->
sc_node->nd_name,
1006 r2net_ensure_shutdown(nn, sc, 0);
1023 static int r2net_tx_can_proceed(
struct r2net_node *nn,
1035 kref_get(&nn->
nn_sc->sc_kref);
1038 *sc_ret = nn->
nn_sc;
1066 size_t caller_veclen,
u8 target_node,
int *status)
1070 size_t veclen, caller_bytes = 0;
1084 if (r2net_wq ==
NULL) {
1085 mlog(0,
"attempt to tx without r2netd running\n");
1090 if (caller_veclen == 0) {
1091 mlog(0,
"bad kvec array length\n");
1096 caller_bytes = iov_length((
struct iovec *)caller_vec, caller_veclen);
1098 mlog(0,
"total payload len %zu too large\n", caller_bytes);
1108 r2net_debug_add_nst(&nst);
1118 veclen = caller_veclen + 1;
1121 mlog(0,
"failed to %zu element kvec!\n", veclen);
1128 mlog(0,
"failed to allocate a r2net_msg!\n");
1133 r2net_init_msg(msg, caller_bytes, msg_type, key);
1137 memcpy(&vec[1], caller_vec, caller_veclen *
sizeof(
struct kvec));
1139 ret = r2net_prep_nsw(nn, &nsw);
1151 ret = r2net_send_tcp_msg(sc->
sc_sock, vec, veclen,
1152 sizeof(
struct r2net_msg) + caller_bytes);
1154 msglog(msg,
"sending returned %d\n", ret);
1156 mlog(0,
"error returned from r2net_send_tcp_msg=%d\n", ret);
1174 mlog(0,
"woken, returning system status %d, user status %d\n",
1177 r2net_debug_del_nst(&nst);
1182 r2net_complete_nsw(nn, &nsw, 0, 0, 0);
1188 u8 target_node,
int *status)
1195 target_node, status);
1217 msglog(hdr,
"about to send status magic %d\n", err);
1219 return r2net_send_tcp_msg(sock, &vec, 1,
sizeof(
struct r2net_msg));
1228 void *data,
size_t data_len,
1248 msglog(hdr,
"about to send data magic %d\n", err);
1250 ret = r2net_send_tcp_msg(sc->
sc_sock, vec, 2,
1272 int ret = 0, handler_status;
1275 void *ret_data =
NULL;
1278 msglog(hdr,
"processing message\n");
1280 r2net_sc_postpone_idle(sc);
1291 r2net_sendpage(sc, r2net_keep_resp,
sizeof(*r2net_keep_resp));
1306 msglog(hdr,
"bad magic\n");
1317 mlog(
ML_TCP,
"couldn't find handler for type %u key %08x\n",
1329 pr_err(
"ramster_r2net, message length problem\n");
1354 ret = r2net_send_data_magic(sc, hdr,
1355 ret_data, handler_status,
1358 mlog(0,
"sending data reply %d, syserr %d returned %d\n",
1359 handler_status, syserr, ret);
1372 ret = r2net_send_status_magic(sc->
sc_sock, hdr, syserr,
1376 mlog(0,
"sending handler status %d, syserr %d returned %d\n",
1377 handler_status, syserr, ret);
1388 r2net_handler_put(nmh);
1399 "protocol version %llu but %llu is required. "
1400 "Disconnecting.\n", sc->
sc_node->nd_name,
1407 r2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1417 r2net_idle_timeout()) {
1419 "idle timeout of %u ms, but we use %u ms locally. "
1420 "Disconnecting.\n", sc->
sc_node->nd_name,
1424 r2net_idle_timeout());
1425 r2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1430 r2net_keepalive_delay()) {
1432 "delay of %u ms, but we use %u ms locally. "
1433 "Disconnecting.\n", sc->
sc_node->nd_name,
1437 r2net_keepalive_delay());
1438 r2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1445 "timeout of %u ms, but we use %u ms locally. "
1446 "Disconnecting.\n", sc->
sc_node->nd_name,
1451 r2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1460 if (nn->
nn_sc == sc) {
1461 r2net_sc_reset_idle_timer(sc);
1463 r2net_set_nn_state(nn, sc, 1, 0);
1485 sclog(sc,
"receiving\n");
1493 ret = r2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1499 r2net_check_handshake(sc);
1509 datalen =
sizeof(
struct r2net_msg) - sc->sc_page_off;
1510 ret = r2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1546 ret = r2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1558 ret = r2net_process_message(sc, hdr);
1565 sclog(sc,
"ret = %d\n", ret);
1573 static void r2net_rx_until_empty(
struct work_struct *work)
1580 ret = r2net_advance_rx(sc);
1583 if (ret <= 0 && ret != -
EAGAIN) {
1585 sclog(sc,
"saw error %d, closing\n", ret);
1587 r2net_ensure_shutdown(nn, sc, 0);
1592 static int r2net_set_nodelay(
struct socket *sock)
1611 (
char __user *)&val,
sizeof(val));
1617 static void r2net_initialize_handshake(
void)
1623 r2net_keepalive_delay());
1625 r2net_reconnect_delay());
1632 static void r2net_sc_connect_completed(
struct work_struct *work)
1638 mlog(
ML_MSG,
"sc sending handshake with ver %llu id %llx\n",
1642 r2net_initialize_handshake();
1643 r2net_sendpage(sc, r2net_hand,
sizeof(*r2net_hand));
1648 static void r2net_sc_send_keep_req(
struct work_struct *work)
1654 r2net_sendpage(sc, r2net_keep_req,
sizeof(*r2net_keep_req));
1661 static void r2net_idle_timer(
unsigned long data)
1665 #ifdef CONFIG_DEBUG_FS
1666 unsigned long msecs = ktime_to_ms(
ktime_get()) -
1667 ktime_to_ms(sc->sc_tv_timer);
1669 unsigned long msecs = r2net_idle_timeout();
1673 "idle for %lu.%lu secs, shutting it down.\n",
1676 msecs / 1000, msecs % 1000);
1700 r2net_sc_reset_idle_timer(sc);
1708 static void r2net_start_connect(
struct work_struct *work)
1715 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1731 if (mynode ==
NULL) {
1754 sc = sc_alloc(node);
1756 mlog(0,
"couldn't allocate sc\n");
1763 mlog(0,
"can't create socket: %d\n", ret);
1771 myaddr.
sin_addr.s_addr = mynode->nd_ipv4_address;
1774 ret = sock->
ops->bind(sock, (
struct sockaddr *)&myaddr,
1777 mlog(
ML_ERROR,
"bind failed with %d at address %pI4\n",
1778 ret, &mynode->nd_ipv4_address);
1782 ret = r2net_set_nodelay(sc->
sc_sock);
1784 mlog(
ML_ERROR,
"setting TCP_NODELAY failed with %d\n", ret);
1788 r2net_register_callbacks(sc->
sc_sock->sk, sc);
1792 r2net_set_nn_state(nn, sc, 0, 0);
1795 remoteaddr.sin_family =
AF_INET;
1809 " failed with errno %d\n", sc->
sc_node->nd_name,
1815 r2net_ensure_shutdown(nn, sc, 0);
1827 static void r2net_connect_expired(
struct work_struct *work)
1834 pr_notice(
"ramster: No connection established with "
1835 "node %u after %u.%u seconds, giving up.\n",
1836 r2net_num_from_nn(nn),
1837 r2net_idle_timeout() / 1000,
1838 r2net_idle_timeout() % 1000);
1845 static void r2net_still_up(
struct work_struct *work)
1869 static void r2net_hb_node_down_cb(
struct r2nm_node *node,
int node_num,
1881 static void r2net_hb_node_up_cb(
struct r2nm_node *node,
int node_num,
1900 r2net_set_nn_state(nn,
NULL, 0, 0);
1932 static int r2net_accept_one(
struct socket *sock)
1944 sock->
sk->sk_protocol, &new_sock);
1949 new_sock->
ops = sock->
ops;
1956 ret = r2net_set_nodelay(new_sock);
1958 mlog(
ML_ERROR,
"setting TCP_NODELAY failed with %d\n", ret);
1963 ret = new_sock->
ops->getname(new_sock, (
struct sockaddr *) &sin,
1970 pr_notice(
"ramster: Attempt to connect from unknown "
1971 "node at %pI4:%d\n", &sin.sin_addr.s_addr,
1972 ntohs(sin.sin_port));
1979 pr_notice(
"ramster: Unexpected connect attempt seen "
1980 "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
1984 node->
nd_num, &sin.sin_addr.s_addr,
ntohs(sin.sin_port));
1992 mlog(
ML_CONN,
"attempt to connect from node '%s' at "
1993 "%pI4:%d but it isn't heartbeating\n",
1994 node->
nd_name, &sin.sin_addr.s_addr,
1995 ntohs(sin.sin_port));
2009 pr_notice(
"ramster: Attempt to connect from node '%s' "
2010 "at %pI4:%d but it already has an open connection\n",
2011 node->
nd_name, &sin.sin_addr.s_addr,
2012 ntohs(sin.sin_port));
2016 sc = sc_alloc(node);
2027 r2net_set_nn_state(nn, sc, 0, 0);
2030 r2net_register_callbacks(sc->
sc_sock->sk, sc);
2033 r2net_initialize_handshake();
2034 r2net_sendpage(sc, r2net_hand,
sizeof(*r2net_hand));
2048 static void r2net_accept_many(
struct work_struct *work)
2050 struct socket *sock = r2net_listen_sock;
2051 while (r2net_accept_one(sock) == 0)
2055 static void r2net_listen_data_ready(
struct sock *sk,
int bytes)
2086 .sin_addr = { .s_addr = addr },
2092 pr_err(
"ramster: Error %d while creating socket\n", ret);
2099 sock->
sk->sk_user_data = sock->
sk->sk_data_ready;
2100 sock->
sk->sk_data_ready = r2net_listen_data_ready;
2103 r2net_listen_sock =
sock;
2104 INIT_WORK(&r2net_listen_work, r2net_accept_many);
2106 sock->
sk->sk_reuse = 1;
2107 ret = sock->
ops->bind(sock, (
struct sockaddr *)&sin,
sizeof(sin));
2109 pr_err(
"ramster: Error %d while binding socket at %pI4:%u\n",
2110 ret, &addr,
ntohs(port));
2114 ret = sock->
ops->listen(sock, 64);
2116 pr_err(
"ramster: Error %d while listening on %pI4:%u\n",
2117 ret, &addr,
ntohs(port));
2121 r2net_listen_sock =
NULL;
2144 if (r2net_wq ==
NULL) {
2163 struct socket *sock = r2net_listen_sock;
2171 sock->
sk->sk_data_ready = sock->
sk->sk_user_data;
2172 sock->
sk->sk_user_data =
NULL;
2175 for (i = 0; i <
ARRAY_SIZE(r2net_nodes); i++) {
2189 r2net_listen_sock =
NULL;
2196 pr_err(
"ramster: cluster not alive, node_up_manual ignored\n");
2199 r2net_hb_node_up_cb(&dummy, node_num,
NULL);
2209 if (r2net_debugfs_init())
2215 if (!r2net_hand || !r2net_keep_req || !r2net_keep_resp) {
2217 kfree(r2net_keep_req);
2218 kfree(r2net_keep_resp);
2228 for (i = 0; i <
ARRAY_SIZE(r2net_nodes); i++) {
2235 r2net_connect_expired);
2250 kfree(r2net_keep_req);
2251 kfree(r2net_keep_resp);
2252 r2net_debugfs_exit();