4 #include <linux/ctype.h>
9 #include <linux/slab.h>
10 #include <linux/socket.h>
11 #include <linux/string.h>
21 #include <linux/export.h>
74 #define CON_SOCK_STATE_NEW 0
75 #define CON_SOCK_STATE_CLOSED 1
76 #define CON_SOCK_STATE_CONNECTING 2
77 #define CON_SOCK_STATE_CONNECTED 3
78 #define CON_SOCK_STATE_CLOSING 4
83 #define CON_STATE_CLOSED 1
84 #define CON_STATE_PREOPEN 2
85 #define CON_STATE_CONNECTING 3
86 #define CON_STATE_NEGOTIATING 4
87 #define CON_STATE_OPEN 5
88 #define CON_STATE_STANDBY 6
93 #define CON_FLAG_LOSSYTX 0
95 #define CON_FLAG_KEEPALIVE_PENDING 1
96 #define CON_FLAG_WRITE_PENDING 2
97 #define CON_FLAG_SOCK_CLOSED 3
98 #define CON_FLAG_BACKOFF 4
105 #ifdef CONFIG_LOCKDEP
113 #define SKIP_BUF_SIZE 1024
123 #define ADDR_STR_COUNT_LOG 5
124 #define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
125 #define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
126 #define MAX_ADDR_STR_LEN 64
131 static struct page *zero_page;
143 switch (ss->ss_family) {
197 pr_err(
"msgr_init failed to create workqueue\n");
226 printk(
"%s: unexpected old state %d\n", __func__, old_state);
227 dout(
"%s con %p sock %d -> %d\n", __func__, con, old_state,
237 printk(
"%s: unexpected old state %d\n", __func__, old_state);
238 dout(
"%s con %p sock %d -> %d\n", __func__, con, old_state,
248 printk(
"%s: unexpected old state %d\n", __func__, old_state);
249 dout(
"%s con %p sock %d -> %d\n", __func__, con, old_state,
261 printk(
"%s: unexpected old state %d\n", __func__, old_state);
262 dout(
"%s con %p sock %d -> %d\n", __func__, con, old_state,
275 printk(
"%s: unexpected old state %d\n", __func__, old_state);
276 dout(
"%s con %p sock %d -> %d\n", __func__, con, old_state,
285 static void ceph_sock_data_ready(
struct sock *
sk,
int count_unused)
293 dout(
"%s on %p state = %lu, queueing work\n", __func__,
300 static void ceph_sock_write_space(
struct sock *sk)
312 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
313 dout(
"%s %p queueing write work\n", __func__, con);
318 dout(
"%s %p nothing to write\n", __func__, con);
323 static void ceph_sock_state_change(
struct sock *sk)
327 dout(
"%s %p state = %lu sk_state = %u\n", __func__,
328 con, con->
state, sk->sk_state);
330 switch (sk->sk_state) {
332 dout(
"%s TCP_CLOSE\n", __func__);
334 dout(
"%s TCP_CLOSE_WAIT\n", __func__);
335 con_sock_state_closing(con);
340 dout(
"%s TCP_ESTABLISHED\n", __func__);
341 con_sock_state_connected(con);
352 static void set_sock_callbacks(
struct socket *
sock,
355 struct sock *sk = sock->
sk;
383 #ifdef CONFIG_LOCKDEP
387 set_sock_callbacks(sock, con);
391 con_sock_state_connecting(con);
392 ret = sock->
ops->connect(sock, (
struct sockaddr *)paddr,
sizeof(*paddr),
395 dout(
"connect %s EINPROGRESS sk_state = %u\n",
398 }
else if (ret < 0) {
399 pr_err(
"connect %s error %d\n",
410 static int ceph_tcp_recvmsg(
struct socket *sock,
void *
buf,
size_t len)
426 static int ceph_tcp_sendmsg(
struct socket *sock,
struct kvec *iov,
427 size_t kvlen,
size_t len,
int more)
443 static int ceph_tcp_sendpage(
struct socket *sock,
struct page *
page,
464 dout(
"con_close_socket on %p sock %p\n", con, con->
sock);
479 con_sock_state_closed(con);
487 static void ceph_msg_remove(
struct ceph_msg *msg)
491 msg->
con->ops->put(msg->
con);
498 while (!list_empty(head)) {
501 ceph_msg_remove(msg);
510 ceph_msg_remove_list(&con->
out_sent);
515 ceph_msg_put(con->
in_msg);
536 dout(
"con_close %p peer %s\n", con,
546 reset_connection(con);
549 con_close_socket(con);
592 dout(
"con_init %p\n", con);
593 memset(con, 0,
sizeof(*con));
598 con_sock_state_init(con);
634 size_t size,
void *
data)
648 static void init_bio_iter(
struct bio *
bio,
struct bio **iter,
int *
seg)
659 static void iter_bio_next(
struct bio **
bio_iter,
int *seg)
661 if (*bio_iter ==
NULL)
664 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
667 if (*seg == (*bio_iter)->bi_vcnt)
668 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
705 dout(
"prepare_write_message_footer %p\n", con);
723 con_out_kvec_reset(con);
731 con_out_kvec_add(con,
sizeof (tag_ack), &tag_ack);
759 dout(
"prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
767 con_out_kvec_add(con,
sizeof (tag_msg), &tag_msg);
768 con_out_kvec_add(con,
sizeof (m->
hdr), &m->
hdr);
769 con_out_kvec_add(con, m->
front.iov_len, m->
front.iov_base);
772 con_out_kvec_add(con, m->
middle->vec.iov_len,
778 con->
out_msg->footer.flags = 0;
787 con->
out_msg->footer.middle_crc = 0;
788 dout(
"%s front_crc %u middle_crc %u\n", __func__,
793 con->
out_msg->footer.data_crc = 0;
795 prepare_write_message_data(con);
798 prepare_write_message_footer(con);
808 dout(
"prepare_write_ack %p %llu -> %llu\n", con,
812 con_out_kvec_reset(con);
814 con_out_kvec_add(con,
sizeof (tag_ack), &tag_ack);
829 dout(
"prepare_write_keepalive %p\n", con);
830 con_out_kvec_reset(con);
831 con_out_kvec_add(con,
sizeof (tag_keepalive), &tag_keepalive);
844 if (!con->
ops->get_authorizer) {
852 auth = con->
ops->get_authorizer(con, auth_proto, con->
auth_retry);
871 con_out_kvec_add(con,
sizeof (con->
msgr->my_enc_addr),
872 &con->
msgr->my_enc_addr);
899 dout(
"prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
910 auth = get_connect_authorizer(con, &auth_proto);
912 return PTR_ERR(auth);
968 dout(
"write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
973 static void out_msg_pos_next(
struct ceph_connection *con,
struct page *page,
974 size_t len,
size_t sent,
bool in_trail)
991 list_move_tail(&page->
lru,
994 list_move_tail(&page->
lru,
1014 bool do_datacrc = !con->
msgr->nocrc;
1016 int total_max_write;
1017 bool in_trail =
false;
1018 const size_t trail_len = (msg->
trail ? msg->
trail->length : 0);
1019 const size_t trail_off = data_len - trail_len;
1021 dout(
"write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
1034 struct page *page =
NULL;
1038 in_trail = in_trail || con->
out_msg_pos.data_pos >= trail_off;
1040 total_max_write = trail_off - con->
out_msg_pos.data_pos;
1043 total_max_write = data_len - con->
out_msg_pos.data_pos;
1047 }
else if (msg->
pages) {
1053 }
else if (msg->
bio) {
1058 bio_offset = bv->bv_offset;
1059 max_write = bv->bv_len;
1067 if (do_datacrc && !con->
out_msg_pos.did_page_crc) {
1074 base = kaddr + con->
out_msg_pos.page_pos + bio_offset;
1075 crc =
crc32c(crc, base, len);
1080 ret = ceph_tcp_sendpage(con->
sock, page,
1086 out_msg_pos_next(con, page, len, (
size_t) ret, in_trail);
1089 dout(
"write_partial_msg_pages %p msg %p done\n", con, msg);
1094 con_out_kvec_reset(con);
1095 prepare_write_message_footer(con);
1111 ret = ceph_tcp_sendpage(con->
sock, zero_page, 0, size, 1);
1126 dout(
"prepare_read_banner %p\n", con);
1132 dout(
"prepare_read_connect %p\n", con);
1138 dout(
"prepare_read_ack %p\n", con);
1144 dout(
"prepare_read_tag %p\n", con);
1154 dout(
"prepare_read_message %p\n", con);
1163 int end,
int size,
void *
object)
1168 int ret = ceph_tcp_recvmsg(con->
sock,
object + have, left);
1191 ret = read_partial(con, end, size, con->
in_banner);
1221 ret = read_partial(con, end, size, &con->
in_reply);
1231 dout(
"read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
1246 pr_err(
"connect to %s got bad banner\n",
1248 con->
error_msg =
"protocol error, bad banner";
1256 switch (ss->ss_family) {
1258 return ((
struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
1261 ((
struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
1262 ((
struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
1263 ((
struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
1264 ((
struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
1271 switch (ss->ss_family) {
1282 switch (ss->ss_family) {
1296 char delim,
const char **ipend)
1301 memset(ss, 0,
sizeof(*ss));
1319 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1320 static int ceph_dns_resolve_name(
const char *
name,
size_t namelen,
1323 const char *
end, *delim_p;
1331 delim_p =
memchr(name, delim, namelen);
1332 colon_p =
memchr(name,
':', namelen);
1334 if (delim_p && colon_p)
1335 end = delim_p < colon_p ? delim_p : colon_p;
1336 else if (!delim_p && colon_p)
1350 ret = ceph_pton(ip_addr, ip_len, ss, -1,
NULL);
1358 pr_info(
"resolve '%.*s' (ret=%d): %s\n", (
int)(end - name), name,
1364 static inline int ceph_dns_resolve_name(
const char *name,
size_t namelen,
1375 static int ceph_parse_server_name(
const char *name,
size_t namelen,
1380 ret = ceph_pton(name, namelen, ss, delim, ipend);
1382 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1398 dout(
"parse_ips on '%.*s'\n", (
int)(end-c), c);
1410 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1419 dout(
"missing matching ']'\n");
1426 if (p < end && *p ==
':') {
1429 while (p < end && *p >=
'0' && *p <=
'9') {
1430 port = (port * 10) + (*p -
'0');
1433 if (port > 65535 || port == 0)
1439 addr_set_port(ss, port);
1458 pr_err(
"parse_ips bad ip '%.*s'\n", (
int)(end - c), c);
1465 dout(
"process_banner on %p\n", con);
1467 if (verify_hello(con) < 0)
1482 pr_warning(
"wrong peer, want %s/%d, got %s/%d\n",
1487 con->
error_msg =
"wrong peer at address";
1494 if (addr_is_blank(&con->
msgr->inst.addr.in_addr)) {
1495 int port = addr_port(&con->
msgr->inst.addr.in_addr);
1500 addr_set_port(&con->
msgr->inst.addr.in_addr, port);
1501 encode_my_addr(con->
msgr);
1502 dout(
"process_banner learned my addr is %s\n",
1511 reset_connection(con);
1518 u64 sup_feat = con->
msgr->supported_features;
1519 u64 req_feat = con->
msgr->required_features;
1523 dout(
"process_connect on %p tag %d\n", con, (
int)con->
in_tag);
1527 pr_err(
"%s%lld %s feature set mismatch,"
1528 " my %llx < server's %llx, missing %llx\n",
1531 sup_feat, server_feat, server_feat & ~sup_feat);
1532 con->
error_msg =
"missing required protocol features";
1537 pr_err(
"%s%lld %s protocol version mismatch,"
1538 " my %d != server's %d\n",
1543 con->
error_msg =
"protocol version mismatch";
1549 dout(
"process_connect %p got BADAUTHORIZER attempt %d\n", con,
1552 con->
error_msg =
"connect authorization failure";
1556 con_out_kvec_reset(con);
1557 ret = prepare_write_connect(con);
1560 prepare_read_connect(con);
1571 dout(
"process_connect got RESET peer seq %u\n",
1573 pr_err(
"%s%lld %s connection reset\n",
1576 reset_connection(con);
1577 con_out_kvec_reset(con);
1578 ret = prepare_write_connect(con);
1581 prepare_read_connect(con);
1586 if (con->
ops->peer_reset)
1587 con->
ops->peer_reset(con);
1598 dout(
"process_connect got RETRY_SESSION my seq %u, peer %u\n",
1602 con_out_kvec_reset(con);
1603 ret = prepare_write_connect(con);
1606 prepare_read_connect(con);
1614 dout(
"process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1617 get_global_seq(con->
msgr,
1619 con_out_kvec_reset(con);
1620 ret = prepare_write_connect(con);
1623 prepare_read_connect(con);
1627 if (req_feat & ~server_feat) {
1628 pr_err(
"%s%lld %s protocol feature mismatch,"
1629 " my required %llx > server's %llx, need %llx\n",
1632 req_feat, server_feat, req_feat & ~server_feat);
1633 con->
error_msg =
"missing required protocol features";
1644 dout(
"process_connect got READY gseq %d cseq %d (%d)\n",
1656 prepare_read_tag(con);
1666 pr_err(
"process_connect got WAIT as client\n");
1667 con->
error_msg =
"protocol error, got WAIT as client";
1671 pr_err(
"connect protocol error, will retry\n");
1672 con->
error_msg =
"protocol error, garbage tag during connect";
1687 return read_partial(con, end, size, &con->
in_temp_ack);
1700 while (!list_empty(&con->
out_sent)) {
1706 dout(
"got ack for seq %llu type %d at %p\n", seq,
1711 prepare_read_tag(con);
1719 unsigned int sec_len,
u32 *crc)
1725 while (section->
iov_len < sec_len) {
1727 left = sec_len - section->
iov_len;
1728 ret = ceph_tcp_recvmsg(con->
sock, (
char *)section->
iov_base +
1734 if (section->
iov_len == sec_len)
1743 struct page **
pages,
1744 unsigned int data_len,
bool do_datacrc)
1757 if (ret > 0 && do_datacrc)
1776 struct bio **bio_iter,
int *
bio_seg,
1777 unsigned int data_len,
bool do_datacrc)
1779 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1784 (
int)(bv->bv_len - con->
in_msg_pos.page_pos));
1786 p =
kmap(bv->bv_page) + bv->bv_offset;
1790 if (ret > 0 && do_datacrc)
1799 if (con->
in_msg_pos.page_pos == bv->bv_len) {
1801 iter_bio_next(bio_iter, bio_seg);
1818 bool do_datacrc = !con->
msgr->nocrc;
1822 dout(
"read_partial_message con %p msg %p\n", con, m);
1825 size =
sizeof (con->
in_hdr);
1827 ret = read_partial(con, end, size, &con->
in_hdr);
1833 pr_err(
"read_partial_message bad hdr "
1834 " crc %u != expected %u\n",
1852 pr_info(
"skipping %s%lld %s seq %lld expected %lld\n",
1856 con->
in_base_pos = -front_len - middle_len - data_len -
1861 pr_err(
"read_partial_message bad seq %lld expected %lld\n",
1863 con->
error_msg =
"bad message sequence # for incoming message";
1871 dout(
"got hdr type %d front %d data %d\n", con->
in_hdr.type,
1873 ret = ceph_con_in_msg_alloc(con, &skip);
1878 dout(
"alloc_msg said skip message\n");
1880 con->
in_base_pos = -front_len - middle_len - data_len -
1890 m->
front.iov_len = 0;
1892 m->
middle->vec.iov_len = 0;
1908 ret = read_partial_message_section(con, &m->
front, front_len,
1915 ret = read_partial_message_section(con, &m->
middle->vec,
1923 while (con->
in_msg_pos.data_pos < data_len) {
1925 ret = read_partial_message_pages(con, m->
pages,
1926 data_len, do_datacrc);
1930 }
else if (m->
bio) {
1932 ret = read_partial_message_bio(con,
1934 data_len, do_datacrc);
1944 size =
sizeof (m->
footer);
1946 ret = read_partial(con, end, size, &m->
footer);
1950 dout(
"read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1951 m, front_len, m->
footer.front_crc, middle_len,
1956 pr_err(
"read_partial_message %p front crc %u != exp. %u\n",
1961 pr_err(
"read_partial_message %p middle crc %u != exp %u\n",
1968 pr_err(
"read_partial_message %p data crc %u != exp. %u\n", m,
1998 dout(
"===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
2006 con->
ops->dispatch(con, msg);
2020 dout(
"try_write start %p state %lu\n", con, con->
state);
2030 con_out_kvec_reset(con);
2031 prepare_write_banner(con);
2032 prepare_read_banner(con);
2036 dout(
"try_write initiating connect on %p new state %lu\n",
2038 ret = ceph_tcp_connect(con);
2048 ret = write_partial_skip(con);
2053 ret = write_partial_kvec(con);
2066 ret = write_partial_msg_pages(con);
2072 dout(
"try_write write_partial_msg_pages err %d\n",
2082 prepare_write_message(con);
2086 prepare_write_ack(con);
2091 prepare_write_keepalive(con);
2098 dout(
"try_write nothing else to write.\n");
2101 dout(
"try_write done on %p ret %d\n", con, ret);
2115 dout(
"try_read start on %p state %lu\n", con, con->
state);
2123 dout(
"try_read tag %d in_base_pos %d\n", (
int)con->
in_tag,
2127 dout(
"try_read connecting\n");
2128 ret = read_partial_banner(con);
2131 ret = process_banner(con);
2143 ret = prepare_write_connect(con);
2146 prepare_read_connect(con);
2153 dout(
"try_read negotiating\n");
2154 ret = read_partial_connect(con);
2157 ret = process_connect(con);
2175 ret = ceph_tcp_recvmsg(con->
sock, buf, skip);
2186 ret = ceph_tcp_recvmsg(con->
sock, &con->
in_tag, 1);
2189 dout(
"try_read got tag %d\n", (
int)con->
in_tag);
2192 prepare_read_message(con);
2195 prepare_read_ack(con);
2198 con_close_socket(con);
2206 ret = read_partial_message(con);
2223 prepare_read_tag(con);
2227 ret = read_partial_ack(con);
2235 dout(
"try_read done on %p ret %d\n", con, ret);
2239 pr_err(
"try_read bad con->in_tag = %d\n", (
int)con->
in_tag);
2240 con->
error_msg =
"protocol error, garbage tag";
2252 if (!con->
ops->get(con)) {
2253 dout(
"queue_con %p ref count 0\n", con);
2258 dout(
"queue_con %p - already queued\n", con);
2261 dout(
"queue_con %p\n", con);
2277 switch (con->
state) {
2288 dout(
"unrecognized con state %d\n", (
int)con->
state);
2289 con->
error_msg =
"unrecognized con state";
2296 dout(
"con_work %p backing off\n", con);
2299 dout(
"con_work %p backoff %lu\n", con, con->
delay);
2303 dout(
"con_work %p FAILED to back off %lu\n", con,
2311 dout(
"con_work %p STANDBY\n", con);
2315 dout(
"con_work %p CLOSED\n", con);
2320 dout(
"con_work OPENING\n");
2324 ret = try_read(con);
2328 con->
error_msg =
"socket error on read";
2332 ret = try_write(con);
2336 con->
error_msg =
"socket error on write";
2361 dout(
"fault %p state %lu to peer %s\n",
2368 con_close_socket(con);
2371 dout(
"fault on LOSSYTX channel, marking CLOSED\n");
2379 ceph_msg_put(con->
in_msg);
2391 dout(
"fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2397 if (con->
delay == 0)
2404 dout(
"fault queued %p delay %lu\n", con, con->
delay);
2407 dout(
"fault failed to queue %p delay %lu, backoff\n",
2428 dout(
"calling invalidate_authorizer()\n");
2429 con->
ops->invalidate_authorizer(con);
2432 if (con->
ops->fault)
2433 con->
ops->fault(con);
2443 u32 supported_features,
2444 u32 required_features,
2453 msgr->
inst.addr = *myaddr;
2456 msgr->
inst.addr.type = 0;
2458 encode_my_addr(msgr);
2459 msgr->
nocrc = nocrc;
2463 dout(
"%s %p\n", __func__, msgr);
2471 dout(
"clear_standby %p and ++connect_seq\n", con);
2485 msg->
hdr.src = con->
msgr->inst.name;
2492 dout(
"con_send %p closed, dropping %p\n", con, msg);
2499 msg->
con = con->
ops->get(con);
2504 dout(
"----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
2533 dout(
"%s %p msg %p - was on queue\n", __func__, con, msg);
2536 msg->
con->ops->put(msg->
con);
2543 dout(
"%s %p msg %p - was sending\n", __func__, con, msg);
2565 dout(
"%s msg %p null con\n", __func__, msg);
2572 if (con->
in_msg == msg) {
2578 dout(
"%s %p msg %p revoked\n", __func__, con, msg);
2584 sizeof(struct ceph_msg_footer);
2585 ceph_msg_put(con->
in_msg);
2590 dout(
"%s %p in_msg %p msg %p no-op\n",
2591 __func__, con, con->
in_msg, msg);
2601 dout(
"con_keepalive %p\n", con);
2621 m =
kmalloc(
sizeof(*m), flags);
2624 kref_init(&m->
kref);
2634 m->
hdr.middle_len = 0;
2635 m->
hdr.data_len = 0;
2636 m->
hdr.data_off = 0;
2637 m->
hdr.reserved = 0;
2639 m->
footer.middle_crc = 0;
2671 dout(
"ceph_msg_new can't allocate %d bytes\n",
2680 dout(
"ceph_msg_new %p front %d\n", m, front_len);
2687 pr_err(
"msg_new can't create type %d front %d\n", type,
2691 dout(
"msg_new can't create type %d front %d\n", type,
2710 dout(
"alloc_middle %p type %d %s middle_len %d\n", msg, type,
2736 static int ceph_con_in_msg_alloc(
struct ceph_connection *con,
int *skip)
2746 if (con->
ops->alloc_msg) {
2750 msg = con->
ops->alloc_msg(con, hdr, skip);
2768 "error allocating memory for incoming message";
2775 pr_err(
"unable to allocate msg type %d len %d\n",
2785 if (middle_len && !con->
in_msg->middle) {
2786 ret = ceph_alloc_middle(con, con->
in_msg);
2788 ceph_msg_put(con->
in_msg);
2802 dout(
"msg_kfree %p\n", m);
2817 dout(
"ceph_msg_put last one on %p\n", m);
2822 ceph_buffer_put(m->
middle);
2845 pr_debug(
"msg_dump %p (front_max %d nr_pages %d)\n", msg,
2849 &msg->
hdr,
sizeof(msg->
hdr),
true);
2852 msg->
front.iov_base, msg->
front.iov_len,
true);
2856 msg->
middle->vec.iov_base,
2857 msg->
middle->vec.iov_len,
true);