56 #include <linux/kernel.h>
58 #include <linux/slab.h>
61 #include <linux/net.h>
62 #include <linux/export.h>
65 #include <asm/uaccess.h>
70 #define MLOG_MASK_PREFIX ML_TCP
/*
 * Format fragment and matching argument list used to identify the node
 * behind a socket container in log messages: node name, node number and
 * IPv4 address:port.  The two macros are meant to be used as a pair.
 */
76 #define SC_NODEF_FMT "node %s (num %u) at %pI4:%u"
/*
 * The address is passed by pointer (as %pI4 expects) and the port is run
 * through ntohs() before printing.
 * NOTE(review): `sc` is expanded unparenthesized, so callers should pass a
 * plain identifier rather than an arbitrary expression.
 */
77 #define SC_NODEF_ARGS(sc) sc->sc_node->nd_name, sc->sc_node->nd_num, \
78 &sc->sc_node->nd_ipv4_address, \
79 ntohs(sc->sc_node->nd_ipv4_port)
86 #define msglog(hdr, fmt, args...) do { \
87 typeof(hdr) __hdr = (hdr); \
88 mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d " \
89 "key %08x num %u] " fmt, \
90 be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len), \
91 be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status), \
92 be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key), \
93 be32_to_cpu(__hdr->msg_num) , ##args); \
96 #define sclog(sc, fmt, args...) do { \
97 typeof(sc) __sc = (sc); \
98 mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
99 "pg_off %zu] " fmt, __sc, \
100 atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \
101 __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
/*
 * File-scope listening socket.  It starts out NULL; other code in this file
 * assigns a socket to it and later resets it to NULL.
 */
111 static struct socket *o2net_listen_sock =
NULL;
/* NOTE(review): no user of this constant is visible in this excerpt; presumably a heartbeat callback priority — confirm against the caller. */
125 #define O2NET_HB_PRI 0x1
/*
 * Pre-built "keep" request and response messages.  Elsewhere in this file
 * they are handed to o2net_sendpage() and released with kfree().
 * NOTE(review): presumably keepalive messages — confirm.
 */
128 static struct o2net_msg *o2net_keep_req, *o2net_keep_resp;
/*
 * Forward declarations: both functions are defined further down in this
 * file but are referenced before their definitions.
 */
140 static void o2net_listen_data_ready(
struct sock *
sk,
int bytes);
/* `data` is an opaque unsigned long; its meaning is set by whoever arms the timer (not visible here). */
142 static void o2net_idle_timer(
unsigned long data);
146 #ifdef CONFIG_DEBUG_FS
150 INIT_LIST_HEAD(&nst->st_net_debug_item);
153 nst->st_msg_key = msgkey;
/*
 * No-op stubs: each macro below expands to nothing, so call sites compile
 * away entirely and their arguments are never evaluated.
 * NOTE(review): presumably the fallback branch of the CONFIG_DEBUG_FS
 * conditional seen earlier — the matching #else is not visible in this
 * excerpt, confirm.
 */
215 # define o2net_init_nst(a, b, c, d, e)
216 # define o2net_set_nst_sock_time(a)
217 # define o2net_set_nst_send_time(a)
218 # define o2net_set_nst_status_time(a)
219 # define o2net_set_nst_sock_container(a, b)
220 # define o2net_set_nst_msg_id(a, b)
221 # define o2net_set_sock_timer(a)
222 # define o2net_set_data_ready_time(a)
223 # define o2net_set_advance_start_time(a)
224 # define o2net_set_advance_stop_time(a)
225 # define o2net_set_func_start_time(a)
226 # define o2net_set_func_stop_time(a)
229 #ifdef CONFIG_OCFS2_FS_STATS
232 return ktime_sub(sc->sc_tv_func_stop, sc->sc_tv_func_start);
238 sc->sc_tv_status_total = ktime_add(sc->sc_tv_status_total,
240 nst->st_status_time));
241 sc->sc_tv_send_total = ktime_add(sc->sc_tv_send_total,
242 ktime_sub(nst->st_status_time,
244 sc->sc_tv_acquiry_total = ktime_add(sc->sc_tv_acquiry_total,
245 ktime_sub(nst->st_send_time,
252 sc->sc_tv_process_total = ktime_add(sc->sc_tv_process_total,
253 o2net_get_func_run_time(sc));
/*
 * No-op stubs for the send/receive statistics hooks: both expand to nothing.
 * NOTE(review): presumably the fallback branch of the CONFIG_OCFS2_FS_STATS
 * conditional — the matching #else is not visible in this excerpt, confirm.
 */
259 # define o2net_update_send_stats(a, b)
261 # define o2net_update_recv_stats(sc)
265 static inline int o2net_reconnect_delay(
void)
270 static inline int o2net_keepalive_delay(
void)
275 static inline int o2net_idle_timeout(
void)
284 trans = o2net_sys_err_translations[
err];
291 static struct o2net_node * o2net_nn_from_num(
u8 node_num)
294 return &o2net_nodes[node_num];
300 return nn - o2net_nodes;
331 static void o2net_complete_nsw_locked(
struct o2net_node *nn,
347 static void o2net_complete_nsw(
struct o2net_node *nn,
362 o2net_complete_nsw_locked(nn, nsw, sys_status, status);
369 static void o2net_complete_nodes_nsw(
struct o2net_node *nn)
372 unsigned int num_kills = 0;
381 mlog(0,
"completed %d messages for node %u\n", num_kills,
382 o2net_num_from_nn(nn));
385 static int o2net_nsw_completed(
struct o2net_node *nn,
397 static void sc_kref_release(
struct kref *
kref)
403 sclog(sc,
"releasing\n");
414 o2net_debug_del_sc(sc);
421 kref_put(&sc->
sc_kref, sc_kref_release);
435 sc = kzalloc(
sizeof(*sc),
GFP_NOFS);
459 sclog(sc,
"alloced\n");
463 o2net_debug_add_sc(sc);
506 static void o2net_set_nn_state(
struct o2net_node *nn,
518 else if (!old_sc && sc)
527 if (was_valid && !valid && err == 0)
530 mlog(
ML_CONN,
"node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
548 if (was_valid && !valid) {
551 o2net_complete_nodes_nsw(nn);
554 if (!was_valid && valid) {
559 "Connected to" :
"Accepted connection from",
567 if (!valid && o2net_wq) {
576 mlog(
ML_CONN,
"queueing conn attempt in %lu jiffies\n", delay);
593 if ((old_sc ==
NULL) && sc)
595 if (old_sc && (old_sc != sc)) {
602 static void o2net_data_ready(
struct sock *
sk,
int bytes)
609 sclog(sc,
"data_ready hit\n");
622 static void o2net_state_change(
struct sock *sk)
634 sclog(sc,
"state_change to %d\n", sk->sk_state);
638 switch(sk->sk_state) {
648 " shutdown, state %d\n",
663 static void o2net_register_callbacks(
struct sock *sk,
688 static int o2net_unregister_callbacks(
struct sock *sk,
711 static void o2net_ensure_shutdown(
struct o2net_node *nn,
717 o2net_set_nn_state(nn,
NULL, 0, err);
729 static void o2net_shutdown_sc(
struct work_struct *work)
736 sclog(sc,
"shutting down\n");
739 if (o2net_unregister_callbacks(sc->
sc_sock->sk, sc)) {
750 o2net_ensure_shutdown(nn, sc, 0);
768 o2net_handler_tree_lookup(
u32 msg_type,
u32 key,
struct rb_node ***ret_p,
779 cmp = o2net_handler_cmp(nmh, msg_type, key);
793 if (ret_parent !=
NULL)
794 *ret_parent = parent;
799 static void o2net_handler_kref_release(
struct kref *kref)
809 kref_put(&nmh->
nh_kref, o2net_handler_kref_release);
824 mlog(0,
"max_len for message handler out of range: %u\n",
831 mlog(0,
"no message type provided: %u, %p\n", msg_type, func);
837 mlog(0,
"no message handler provided: %u, %p\n",
861 if (o2net_handler_tree_lookup(msg_type, key, &p, &parent))
864 rb_link_node(&nmh->
nh_node, parent, p);
868 mlog(
ML_TCP,
"registered handler func %p type %u key %08x\n",
869 func, msg_type, key);
873 "couldn't find handler we *just* registerd "
874 "for type %u key %08x\n", msg_type, key);
894 mlog(
ML_TCP,
"unregistering handler func %p type %u key %08x\n",
898 kref_put(&nmh->
nh_kref, o2net_handler_kref_release);
909 nmh = o2net_handler_tree_lookup(msg_type, key,
NULL,
NULL);
919 static int o2net_recv_tcp_msg(
struct socket *
sock,
void *
data,
size_t len)
929 .msg_iov = (
struct iovec *)&vec,
941 static int o2net_send_tcp_msg(
struct socket *sock,
struct kvec *vec,
942 size_t veclen,
size_t total)
948 .msg_iovlen = veclen,
961 mlog(
ML_ERROR,
"sendmsg returned %d instead of %zu\n", ret,
971 mlog(0,
"returning error: %d\n", ret);
976 void *kmalloced_virt,
992 mlog(0,
"sendpage of size %zu to " SC_NODEF_FMT
999 o2net_ensure_shutdown(nn, sc, 0);
1015 static int o2net_tx_can_proceed(
struct o2net_node *nn,
1027 kref_get(&nn->
nn_sc->sc_kref);
1030 *sc_ret = nn->
nn_sc;
1048 o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret);
1058 size_t caller_veclen,
u8 target_node,
int *status)
1062 size_t veclen, caller_bytes = 0;
1065 struct o2net_node *nn = o2net_nn_from_num(target_node);
1073 if (o2net_wq ==
NULL) {
1074 mlog(0,
"attempt to tx without o2netd running\n");
1079 if (caller_veclen == 0) {
1080 mlog(0,
"bad kvec array length\n");
1085 caller_bytes = iov_length((
struct iovec *)caller_vec, caller_veclen);
1087 mlog(0,
"total payload len %zu too large\n", caller_bytes);
1097 o2net_debug_add_nst(&nst);
1107 veclen = caller_veclen + 1;
1110 mlog(0,
"failed to %zu element kvec!\n", veclen);
1117 mlog(0,
"failed to allocate a o2net_msg!\n");
1122 o2net_init_msg(msg, caller_bytes, msg_type, key);
1126 memcpy(&vec[1], caller_vec, caller_veclen *
sizeof(
struct kvec));
1128 ret = o2net_prep_nsw(nn, &nsw);
1140 ret = o2net_send_tcp_msg(sc->
sc_sock, vec, veclen,
1141 sizeof(
struct o2net_msg) + caller_bytes);
1143 msglog(msg,
"sending returned %d\n", ret);
1145 mlog(0,
"error returned from o2net_send_tcp_msg=%d\n", ret);
1162 mlog(0,
"woken, returning system status %d, user status %d\n",
1165 o2net_debug_del_nst(&nst);
1172 o2net_complete_nsw(nn, &nsw, 0, 0, 0);
1178 u8 target_node,
int *status)
1185 target_node, status);
1206 msglog(hdr,
"about to send status magic %d\n", err);
1208 return o2net_send_tcp_msg(sock, &vec, 1,
sizeof(
struct o2net_msg));
1217 int ret = 0, handler_status;
1220 void *ret_data =
NULL;
1222 msglog(hdr,
"processing message\n");
1224 o2net_sc_postpone_idle(sc);
1229 o2net_complete_nsw(nn,
NULL,
1235 o2net_sendpage(sc, o2net_keep_resp,
1236 sizeof(*o2net_keep_resp));
1243 msglog(hdr,
"bad magic\n");
1254 mlog(
ML_TCP,
"couldn't find handler for type %u key %08x\n",
1281 ret = o2net_send_status_magic(sc->
sc_sock, hdr, syserr,
1285 mlog(0,
"sending handler status %d, syserr %d returned %d\n",
1286 handler_status, syserr, ret);
1297 o2net_handler_put(nmh);
1308 "protocol version %llu but %llu is required. "
1314 o2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1324 o2net_idle_timeout()) {
1326 "idle timeout of %u ms, but we use %u ms locally. "
1329 o2net_idle_timeout());
1330 o2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1335 o2net_keepalive_delay()) {
1337 "delay of %u ms, but we use %u ms locally. "
1340 o2net_keepalive_delay());
1341 o2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1348 "timeout of %u ms, but we use %u ms locally. "
1352 o2net_ensure_shutdown(nn, sc, -
ENOTCONN);
1361 if (nn->
nn_sc == sc) {
1362 o2net_sc_reset_idle_timer(sc);
1364 o2net_set_nn_state(nn, sc, 1, 0);
1386 sclog(sc,
"receiving\n");
1393 ret = o2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1399 o2net_check_handshake(sc);
1409 datalen =
sizeof(
struct o2net_msg) - sc->sc_page_off;
1410 ret = o2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1443 ret = o2net_recv_tcp_msg(sc->
sc_sock, data, datalen);
1454 ret = o2net_process_message(sc, hdr);
1461 sclog(sc,
"ret = %d\n", ret);
1469 static void o2net_rx_until_empty(
struct work_struct *work)
1476 ret = o2net_advance_rx(sc);
1479 if (ret <= 0 && ret != -
EAGAIN) {
1481 sclog(sc,
"saw error %d, closing\n", ret);
1483 o2net_ensure_shutdown(nn, sc, 0);
1489 static int o2net_set_nodelay(
struct socket *sock)
1508 (
char __user *)&val,
sizeof(val));
1514 static void o2net_initialize_handshake(
void)
1520 o2net_keepalive_delay());
1522 o2net_reconnect_delay());
1529 static void o2net_sc_connect_completed(
struct work_struct *work)
1535 mlog(
ML_MSG,
"sc sending handshake with ver %llu id %llx\n",
1539 o2net_initialize_handshake();
1540 o2net_sendpage(sc, o2net_hand,
sizeof(*o2net_hand));
1545 static void o2net_sc_send_keep_req(
struct work_struct *work)
1551 o2net_sendpage(sc, o2net_keep_req,
sizeof(*o2net_keep_req));
1558 static void o2net_idle_timer(
unsigned long data)
1562 #ifdef CONFIG_DEBUG_FS
1563 unsigned long msecs = ktime_to_ms(
ktime_get()) -
1564 ktime_to_ms(sc->sc_tv_timer);
1566 unsigned long msecs = o2net_idle_timeout();
1570 "idle for %lu.%lu secs, shutting it down.\n",
SC_NODEF_ARGS(sc),
1571 msecs / 1000, msecs % 1000);
1596 o2net_sc_reset_idle_timer(sc);
1604 static void o2net_start_connect(
struct work_struct *work)
1611 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1627 if (mynode ==
NULL) {
1650 sc = sc_alloc(node);
1652 mlog(0,
"couldn't allocate sc\n");
1659 mlog(0,
"can't create socket: %d\n", ret);
1667 myaddr.
sin_addr.s_addr = mynode->nd_ipv4_address;
1670 ret = sock->
ops->bind(sock, (
struct sockaddr *)&myaddr,
1673 mlog(
ML_ERROR,
"bind failed with %d at address %pI4\n",
1674 ret, &mynode->nd_ipv4_address);
1678 ret = o2net_set_nodelay(sc->
sc_sock);
1680 mlog(
ML_ERROR,
"setting TCP_NODELAY failed with %d\n", ret);
1684 o2net_register_callbacks(sc->
sc_sock->sk, sc);
1688 o2net_set_nn_state(nn, sc, 0, 0);
1691 remoteaddr.sin_family =
AF_INET;
1709 o2net_ensure_shutdown(nn, sc, 0);
1721 static void o2net_connect_expired(
struct work_struct *work)
1729 "node %u after %u.%u seconds, giving up.\n",
1730 o2net_num_from_nn(nn),
1731 o2net_idle_timeout() / 1000,
1732 o2net_idle_timeout() % 1000);
1739 static void o2net_still_up(
struct work_struct *work)
1767 static void o2net_hb_node_down_cb(
struct o2nm_node *node,
int node_num,
1781 static void o2net_hb_node_up_cb(
struct o2nm_node *node,
int node_num,
1784 struct o2net_node *nn = o2net_nn_from_num(node_num);
1802 o2net_set_nn_state(nn,
NULL, 0, 0);
1834 static int o2net_accept_one(
struct socket *sock)
1846 sock->
sk->sk_protocol, &new_sock);
1851 new_sock->
ops = sock->
ops;
1858 ret = o2net_set_nodelay(new_sock);
1860 mlog(
ML_ERROR,
"setting TCP_NODELAY failed with %d\n", ret);
1865 ret = new_sock->
ops->getname(new_sock, (
struct sockaddr *) &sin,
1873 "node at %pI4:%d\n", &sin.sin_addr.s_addr,
1874 ntohs(sin.sin_port));
1882 "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
1886 node->
nd_num, &sin.sin_addr.s_addr,
ntohs(sin.sin_port));
1894 mlog(
ML_CONN,
"attempt to connect from node '%s' at "
1895 "%pI4:%d but it isn't heartbeating\n",
1896 node->
nd_name, &sin.sin_addr.s_addr,
1897 ntohs(sin.sin_port));
1902 nn = o2net_nn_from_num(node->
nd_num);
1912 "at %pI4:%d but it already has an open connection\n",
1913 node->
nd_name, &sin.sin_addr.s_addr,
1914 ntohs(sin.sin_port));
1918 sc = sc_alloc(node);
1929 o2net_set_nn_state(nn, sc, 0, 0);
1932 o2net_register_callbacks(sc->
sc_sock->sk, sc);
1935 o2net_initialize_handshake();
1936 o2net_sendpage(sc, o2net_hand,
sizeof(*o2net_hand));
1950 static void o2net_accept_many(
struct work_struct *work)
1952 struct socket *sock = o2net_listen_sock;
1953 while (o2net_accept_one(sock) == 0)
1957 static void o2net_listen_data_ready(
struct sock *sk,
int bytes)
1988 .sin_addr = { .s_addr = addr },
2001 sock->
sk->sk_user_data = sock->
sk->sk_data_ready;
2002 sock->
sk->sk_data_ready = o2net_listen_data_ready;
2005 o2net_listen_sock =
sock;
2006 INIT_WORK(&o2net_listen_work, o2net_accept_many);
2009 ret = sock->
ops->bind(sock, (
struct sockaddr *)&sin,
sizeof(sin));
2012 "%pI4:%u\n", ret, &addr,
ntohs(port));
2016 ret = sock->
ops->listen(sock, 64);
2019 ret, &addr,
ntohs(port));
2023 o2net_listen_sock =
NULL;
2046 if (o2net_wq ==
NULL) {
2066 struct socket *sock = o2net_listen_sock;
2074 sock->
sk->sk_data_ready = sock->
sk->sk_user_data;
2075 sock->
sk->sk_user_data =
NULL;
2078 for (i = 0; i <
ARRAY_SIZE(o2net_nodes); i++) {
2092 o2net_listen_sock =
NULL;
2105 if (o2net_debugfs_init())
2111 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
2113 kfree(o2net_keep_req);
2114 kfree(o2net_keep_resp);
2124 for (i = 0; i <
ARRAY_SIZE(o2net_nodes); i++) {
2125 struct o2net_node *nn = o2net_nn_from_num(i);
2131 o2net_connect_expired);
2147 kfree(o2net_keep_req);
2148 kfree(o2net_keep_resp);
2149 o2net_debugfs_exit();