#include <linux/module.h>
#include <linux/slab.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512
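/*
 * Note: these appear to be the preallocated "front" sizes — one for the
 * encoded op section of outgoing OSD requests, one for the reply message
 * kept on each request; get_reply() below warns when an incoming reply
 * front exceeds the preallocated OSD_OPREPLY_FRONT_LEN.
 */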
static int op_needs_trail(int op)

static int op_has_extent(int op)

	dout(" skipping last %llu, final file extent %llu~%llu\n",
	     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",

			  plen, &bno, req, op);
	dout("%s revoking pages %p from con %p\n", __func__,

	ceph_put_snap_context(req->r_snapc);

		if (needs_trail && op_needs_trail(ops[i].op))

	int num_op = get_num_ops(ops, &needs_trail);

		memset(req, 0, sizeof(*req));

		req = kzalloc(sizeof(*req), gfp_flags);

	init_completion(&req->r_completion);

		ceph_osdc_put_request(req);

			ceph_osdc_put_request(req);

		ceph_pagelist_init(req->r_trail);
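/*
 * The r_trail pagelist carries the variable-length payloads that some ops
 * append after their fixed-size encoding (xattr names/values, cls method
 * names and indata, as seen below); op_needs_trail() above decides whether
 * a request needs one allocated at all.
 */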
			ceph_osdc_put_request(req);

				   src->xattr.name_len);

				   src->xattr.value_len);

				   src->cls.method_len);

				   src->cls.indata_len);

				     &prot_ver, sizeof(prot_ver));

				     &timeout, sizeof(timeout));

		pr_err("unrecognized osd opcode %d\n", dst->op);

	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	ceph_encode_timespec(&head->mtime, mtime);

		osd_req_encode_op(req, op, src_op);

		data_len += req->r_trail->length;
	if (flags & CEPH_OSD_FLAG_WRITE) {
	} else if (data_len) {

	msg_size = p - msg->front.iov_base;

					       bool use_mempool, int num_reply,

	r = calc_layout(osdc, vino, layout, off, plen, req, ops);

	req->r_num_pages = calc_pages_for(page_align, *plen);

		if (new->r_tid < req->r_tid)
		else if (new->r_tid > req->r_tid)

	rb_link_node(&new->r_node, parent, p);
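/*
 * In-flight requests live in an rbtree on the osd_client, keyed by
 * transaction id (r_tid); __insert_request() and the lookup helpers below
 * use the usual rb_link_node() insertion/search pattern on that tree.
 */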
		if (tid < req->r_tid)
		else if (tid > req->r_tid)

		if (tid < req->r_tid) {
		} else if (tid > req->r_tid) {

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);

		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,

		__register_request(osdc, req);

		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,

	__kick_osd_requests(osdc, kickosd);

	kick_osd_requests(osdc, osd);

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	dout("get_osd %p FAIL\n", osd);

static void put_osd(struct ceph_osd *osd)

		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);

	dout("__remove_osd %p\n", osd);

	dout("%s %p\n", __func__, osdc);

		__remove_osd(osdc, osd);

	dout("__move_osd_to_lru %p\n", osd);

static void __remove_osd_from_lru(struct ceph_osd *osd)

	dout("__remove_osd_from_lru %p\n", osd);

	dout("__remove_old_osds %p\n", osdc);

		__remove_osd(osdc, osd);

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);

		__remove_osd(osdc, osd);

		       &osd->o_con.peer_addr,
		       sizeof(osd->o_con.peer_addr)) == 0 &&

		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");

	dout("__insert_osd %p osd%d\n", new, new->o_osd);

		if (new->o_osd < osd->o_osd)
		else if (new->o_osd > osd->o_osd)

	rb_link_node(&new->o_node, parent, p);
		else if (o > osd->o_osd)

			      osdc->client->options->osd_keepalive_timeout * HZ);

	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);

		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);

	__register_request(osdc, req);

		dout("__unregister_request %p tid %lld not registered\n",

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);

	ceph_osdc_put_request(req);

		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
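/*
 * The osd timeout work is only armed while at least one request is
 * registered: the first __register_request() schedules it ("first request,
 * scheduling timeout") and the last __unregister_request() cancels it
 * again ("no requests, canceling timeout").
 */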
	dout("__register_linger_request %p\n", req);

			      &req->r_osd->o_linger_requests);

	dout("__unregister_linger_request %p\n", req);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);

		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);

	dout("set_request_linger %p\n", req);

		ceph_osdc_get_request(req);

	dout("map_request %p tid %lld\n", req, req->r_tid);

	if ((!force_resend &&

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",

	__cancel_request(req);

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {

		req->r_osd = create_osd(osdc, o);

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

			      &osdc->osdmap->osd_addr[o]);

		__remove_osd_from_lru(req->r_osd);
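/*
 * __map_request() recomputes which OSD should service a request from the
 * current osdmap; if the target changed it cancels the old send, looks up
 * or creates the ceph_osd for the new target, and pulls that osd back off
 * the idle LRU.
 */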
	dout("send_request %p tid %llu to osd%d flags %d\n",

	reqhead = req->r_request->front.iov_base;

	dout("send_queued\n");

		__send_request(osdc, req);

	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;

	while (timeout && !list_empty(&osdc->req_lru)) {

		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",

			__kick_osd_requests(osdc, osd);

	INIT_LIST_HEAD(&slow_osds);

		dout(" tid %llu is slow, will send keepalive on osd%d\n",

	while (!list_empty(&slow_osds)) {

	__schedule_osd_timeout(osdc);

static void handle_osds_timeout(struct work_struct *work)

	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;
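/*
 * Both delayed works take their periods from the mount options: the
 * request timeout handler is rescheduled every osd_keepalive_timeout
 * seconds, while idle OSDs are scanned at a quarter of osd_idle_ttl
 * (the HZ >> 2 above).
 */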
	dout("osds timeout\n");

	remove_old_osds(osdc);

	if (msg->front.iov_len < sizeof(*rhead))

	if (msg->front.iov_len != sizeof(*rhead) + object_len +

	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	req = __lookup_request(osdc, tid);

		dout("handle_reply tid %llu dne\n", tid);

	ceph_osdc_get_request(req);

		dout(" dropping con_filling_msg ref %p\n", con);

	dout("handle_reply result %d bytes %d\n", req->r_result,

		dout("handle_reply tid %llu dup ack\n", tid);

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	    (flags & CEPH_OSD_FLAG_ONDISK) ||
		__unregister_request(osdc, req);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);
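/*
 * A reply carrying CEPH_OSD_FLAG_ONDISK is treated as the final ack: it is
 * what unregisters the request and wakes waiters via complete_request();
 * lingering requests are additionally put on the linger list so they can
 * be resent after a map change.
 */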
	ceph_osdc_put_request(req);

	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)sizeof(*rhead));

			ceph_osd_addr(osdc->osdmap,

			__reset_osd(osdc, osd);

static void kick_requests(struct ceph_osd_client *osdc, int force_resend)

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");

		err = __map_request(osdc, req, force_resend);

			dout("%p tid %llu maps to no osd\n", req, req->r_tid);

		} else if (err > 0) {
			dout("%p tid %llu requeued on osd%d\n", req,

			dout("%p tid %llu restart on osd%d\n",

			__register_linger_request(osdc, req);
			__unregister_request(osdc, req);

		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);

			dout("tid %llu maps to no valid osd\n", req->r_tid);

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,

		__unregister_linger_request(osdc, req);
		__register_request(osdc, req);

		dout("%d requests for down osds, need new map\n", needmap);
	u32 nr_maps, maplen;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_copy(&p, &fsid, sizeof(fsid));

	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {

		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);

			dout("applying incremental map %u len %d\n",

			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);

			if (newmap != osdc->osdmap) {

				kick_requests(osdc, 0);
				reset_changed_osds(osdc);

			dout("ignoring incremental map %u len %d\n",

	dout(" %d full maps\n", nr_maps);

		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);

			dout("skipping non-latest full map %u len %d\n",

		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,

			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);

			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);

			if (oldmap->epoch + 1 < newmap->epoch)

			kick_requests(osdc, skipped_map);
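/*
 * If the new full map is more than one epoch ahead of the one it replaces,
 * intermediate maps were skipped, so kick_requests() is asked to force a
 * resend of everything rather than only of requests whose mapping changed.
 */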
	pr_err("osdc handle_map corrupt msg\n");

static void __release_event(struct kref *kref)

	kref_get(&event->kref);

	kref_put(&event->kref, __release_event);

		if (new->cookie < event->cookie)
		else if (new->cookie > event->cookie)

	rb_link_node(&new->node, parent, p);

		if (cookie < event->cookie)
		else if (cookie > event->cookie)

	dout("__remove_event removed %p\n", event);

	dout("__remove_event didn't remove %p\n", event);

			   void (*event_cb)(u64, u64, u8, void *),
			   int one_shot, void *data,

	dout("create_event %p\n", event);
	event->cb = event_cb;

	kref_init(&event->kref);
	kref_get(&event->kref);

	__insert_event(osdc, event);
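/*
 * A freshly created event appears to carry two references: the kref_init()
 * reference for the caller plus the extra kref_get() for the osdc event
 * tree it is inserted into; __release_event() frees it once both have been
 * dropped via kref_put().
 */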
	dout("cancel_event %p\n", event);

	__remove_event(event);

static void do_event_work(struct work_struct *work)

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);

	dout("do_event_work completed %p\n", event);

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	event = __find_event(osdc, cookie);

		__remove_event(event);

	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);

			dout("ERROR: could not allocate event_work\n");

			dout("WARNING: failed to queue notify event work\n");

	pr_err("osdc handle_watch_notify corrupt msg\n");

	dout("wait_event %p\n", event);

	dout("wait_event %p returns %d\n", event, err);
	register_request(osdc, req);

	rc = __map_request(osdc, req, 0);

		dout("osdc_start_request failed map, "
		     " will retry %lld\n", req->r_tid);

			dout("send_request %p no up osds in pg\n", req);

			__send_request(osdc, req);

		__cancel_request(req);
		__unregister_request(osdc, req);

		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);

	u64 last_tid, next_tid = 0;

		req = __lookup_request_ge(osdc, next_tid);

		if (req->r_tid > last_tid)

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)

		ceph_osdc_get_request(req);

		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);

		ceph_osdc_put_request(req);

	dout("sync done (thru tid %llu)\n", last_tid);
	INIT_LIST_HEAD(&osdc->osd_lru);

	INIT_LIST_HEAD(&osdc->req_lru);

	osdc->req_mempool = mempool_create_kmalloc_pool(10,

	remove_all_osds(osdc);

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);

				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);

		return PTR_ERR(req);

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);

			 int flags, int do_sync, bool nofail)

				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,

				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
		return PTR_ERR(req);

	dout("writepages %llu~%llu (%d pages)\n", off, len,

	ceph_osdc_put_request(req);

	dout("writepages result %d\n", rc);

		handle_reply(osdc, msg, con);

		pr_err("received unknown message type %d %s\n", type,

	req = __lookup_request(osdc, tid);

		dout("get_reply unknown tid %llu from osd%d\n", tid,

		dout("%s revoking msg %p from old con %p\n", __func__,

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);

	m = ceph_msg_get(req->r_reply);
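/*
 * Replies are matched to requests by tid and received into the r_reply
 * message preallocated with OSD_OPREPLY_FRONT_LEN; the warning above fires
 * when an incoming reply front is larger than that preallocation.
 */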
		pr_warning("tid %lld reply has %d bytes %d pages, we"
			   " had only %d pages ready\n", tid, data_len,

		m->bio = req->r_bio;

	dout("get_reply tid %lld %p\n", tid, m);

		return get_reply(con, hdr, skip);

	pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,

			   int *proto, int force_new)

	if (ac->ops && ac->ops->destroy_authorizer)

		return ERR_PTR(ret);

static int verify_authorizer_reply(struct ceph_connection *con, int len)

	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);

	if (ac->ops && ac->ops->invalidate_authorizer)
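/*
 * The repeated "ac->ops && ac->ops-><hook>" checks guard against auth
 * clients whose ops table is not yet set up or does not implement these
 * authorizer hooks, so the connection callbacks degrade gracefully instead
 * of dereferencing a NULL pointer.
 */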