40 #include <linux/module.h>
42 #include <linux/types.h>
45 #include <linux/net.h>
59 # define RPCDBG_FACILITY RPCDBG_XPRT
/*
 * Forward declarations for static helpers defined later in this file,
 * followed by the congestion-window constants.
 *
 * NOTE(review): throughout this file the leading numbers and hard line
 * wraps look like extraction artifacts (original line numbers fused into
 * the text) and many intervening lines are missing — verify against the
 * pristine source before compiling.
 */
65 static void xprt_init(
struct rpc_xprt *xprt,
struct net *
net);
66 static void xprt_request_init(
struct rpc_task *,
struct rpc_xprt *);
/* Returns non-zero when the task obtained a congestion-window slot
 * (see the call sites in the cong-aware lock paths below). */
68 static int __xprt_get_cong(
struct rpc_xprt *,
struct rpc_task *);
69 static void xprt_destroy(
struct rpc_xprt *xprt);
/*
 * Congestion-window bookkeeping is kept in fixed point: one request
 * "costs" RPC_CWNDSCALE, so the window can be adjusted in fractional
 * steps.
 */
85 #define RPC_CWNDSHIFT (8U)
86 #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
/* Initial window: exactly one request in flight. */
87 #define RPC_INITCWND RPC_CWNDSCALE
/* The window is never allowed to exceed the slot-table size. */
88 #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)
/* True when outstanding congestion units have used up the window. */
90 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
/*
 * Transport class registry (fragments of xprt_register_transport,
 * xprt_unregister_transport and xprt_load_transport).  The global list
 * is protected by xprt_list_lock.  NOTE(review): function headers, the
 * list-iteration statements and the return paths are missing from this
 * view.
 */
106 struct xprt_class *
t;
110 spin_lock(&xprt_list_lock);
/* Refuse to register a class whose ident is already taken. */
113 if (t->ident == transport->ident)
123 spin_unlock(&xprt_list_lock);
138 struct xprt_class *
t;
142 spin_lock(&xprt_list_lock);
/* Unregister: drop the matching entry from the global list. */
144 if (t == transport) {
146 "RPC: Unregistered %s transport module.\n",
148 list_del_init(&transport->list);
155 spin_unlock(&xprt_list_lock);
170 struct xprt_class *
t;
174 spin_lock(&xprt_list_lock);
/* Load-by-name: succeed immediately if the class is already present. */
176 if (
strcmp(t->name, transport_name) == 0) {
177 spin_unlock(&xprt_list_lock);
181 spin_unlock(&xprt_list_lock);
/* Not registered yet: ask modprobe for the "xprt<name>" module. */
182 result = request_module(
"xprt%s", transport_name);
/*
 * Transmit-lock reservation (fragments of xprt_reserve_xprt,
 * xprt_clear_locked and xprt_reserve_xprt_cong).  xprt->snd_task marks
 * the task that currently owns the right to transmit.  NOTE(review):
 * several statements between the visible lines are missing (queueing of
 * the blocked task, return values, braces).
 */
/* Already the owner?  Nothing more to do. */
203 if (task == xprt->snd_task)
207 xprt->snd_task =
task;
/* Fresh owner with a request attached: restart its transmit count. */
209 req->rq_bytes_sent = 0;
216 dprintk(
"RPC: %5u failed to lock transport %p\n",
222 else if (!req->rq_ntrans)
/* Give up ownership of the transmit lock. */
231 static void xprt_clear_locked(
struct rpc_xprt *xprt)
233 xprt->snd_task =
NULL;
/* Presumably a pending close is handled via the cleanup work instead
 * of a plain wakeup — the branch bodies are not visible here. */
234 if (!
test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
/* Congestion-aware variant: ownership additionally requires a window
 * slot from __xprt_get_cong(). */
256 if (task == xprt->snd_task)
261 xprt->snd_task =
task;
264 if (__xprt_get_cong(xprt, task)) {
265 xprt->snd_task =
task;
266 req->rq_bytes_sent = 0;
/* No window slot available: give the lock back before sleeping. */
270 xprt_clear_locked(xprt);
272 dprintk(
"RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
277 else if (!req->rq_ntrans)
/*
 * Lock/unlock helpers taken under the transport_lock spinlock, plus the
 * wake-queue callbacks that hand the transmit lock to the next queued
 * task.  NOTE(review): loop bodies, return statements and braces are
 * missing between the visible lines.
 */
/* Take transport_lock and delegate to the class reserve_xprt op. */
286 static inline int xprt_lock_write(
struct rpc_xprt *xprt,
struct rpc_task *
task)
290 spin_lock_bh(&xprt->transport_lock);
291 retval = xprt->ops->reserve_xprt(xprt, task);
292 spin_unlock_bh(&xprt->transport_lock);
/* Wake-queue callback: unconditionally grant the lock to @task. */
296 static bool __xprt_lock_write_func(
struct rpc_task *task,
void *
data)
298 struct rpc_xprt *xprt =
data;
299 struct rpc_rqst *
req;
302 xprt->snd_task =
task;
304 req->rq_bytes_sent = 0;
/* Hand the lock to the next queued sender, or drop it entirely. */
310 static void __xprt_lock_write_next(
struct rpc_xprt *xprt)
317 xprt_clear_locked(xprt);
/* Congestion-aware wake-queue callback: grant only if a window slot is
 * available. */
320 static bool __xprt_lock_write_cong_func(
struct rpc_task *task,
void *data)
322 struct rpc_xprt *xprt =
data;
323 struct rpc_rqst *
req;
327 xprt->snd_task =
task;
330 if (__xprt_get_cong(xprt, task)) {
331 xprt->snd_task =
task;
332 req->rq_bytes_sent = 0;
339 static void __xprt_lock_write_next_cong(
struct rpc_xprt *xprt)
348 xprt_clear_locked(xprt);
/* xprt_release_xprt: only the current owner may release; then the next
 * queued sender is woken. */
360 if (xprt->snd_task == task) {
361 xprt_clear_locked(xprt);
362 __xprt_lock_write_next(xprt);
/* Congestion-aware release (xprt_release_xprt_cong). */
377 if (xprt->snd_task == task) {
378 xprt_clear_locked(xprt);
379 __xprt_lock_write_next_cong(xprt);
/* Release the transmit lock with transport_lock held around the op. */
384 static inline void xprt_release_write(
struct rpc_xprt *xprt,
struct rpc_task *task)
386 spin_lock_bh(&xprt->transport_lock);
387 xprt->ops->release_xprt(xprt, task);
388 spin_unlock_bh(&xprt->transport_lock);
/*
 * Congestion-window accounting (fragments of __xprt_get_cong,
 * __xprt_put_cong, xprt_release_rqst_cong and xprt_adjust_cwnd).
 * NOTE(review): the actual window arithmetic between the visible lines
 * is missing from this view.
 */
396 __xprt_get_cong(
struct rpc_xprt *xprt,
struct rpc_task *task)
398 struct rpc_rqst *req = task->
tk_rqstp;
402 dprintk(
"RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
403 task->tk_pid, xprt->cong, xprt->cwnd);
/* Return a window slot and let the next congestion-blocked sender in. */
416 __xprt_put_cong(
struct rpc_xprt *xprt,
struct rpc_rqst *req)
422 __xprt_lock_write_next_cong(xprt);
/* xprt_release_rqst_cong: drop the request's window slot. */
433 __xprt_put_cong(task->tk_xprt, task->
tk_rqstp);
/* xprt_adjust_cwnd: grow the window on success while congested
 * (presumably shrink it on timeout — that branch is not visible). */
446 struct rpc_rqst *req = task->
tk_rqstp;
447 struct rpc_xprt *xprt = task->tk_xprt;
448 unsigned long cwnd = xprt->cwnd;
450 if (result >= 0 && cwnd <= xprt->cong) {
456 __xprt_lock_write_next_cong(xprt);
462 dprintk(
"RPC: cong %ld, cwnd was %ld, now %ld\n",
463 xprt->cong, xprt->cwnd, cwnd);
465 __xprt_put_cong(xprt, req);
/*
 * Write-space handling (fragments of xprt_wait_for_buffer_space and
 * xprt_write_space): when the transport's output buffer drains, the
 * task currently holding the transmit lock is woken.
 */
491 struct rpc_rqst *req = task->
tk_rqstp;
492 struct rpc_xprt *xprt = req->rq_xprt;
507 spin_lock_bh(&xprt->transport_lock);
/* Only wake if somebody actually owns the transmit lock. */
508 if (xprt->snd_task) {
509 dprintk(
"RPC: write space: waking waiting task on "
513 spin_unlock_bh(&xprt->transport_lock);
/*
 * Retransmit timeout computation (fragments of
 * xprt_set_retrans_timeout_rtt, xprt_reset_majortimeo and
 * xprt_adjust_timeout).  Timeouts either back off exponentially
 * (to_exponential) or grow linearly by to_increment, clamped to
 * to_maxval.  NOTE(review): trailing statements of each function are
 * missing from this view.
 */
542 struct rpc_rqst *req = task->
tk_rqstp;
543 unsigned long max_timeout = clnt->
cl_timeout->to_maxval;
/* RTT-estimated timeout, scaled up by the retry count. */
546 task->
tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
/* Compute the "major" timeout after which a hard retry/error fires. */
552 static void xprt_reset_majortimeo(
struct rpc_rqst *req)
554 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
556 req->rq_majortimeo = req->rq_timeout;
557 if (to->to_exponential)
558 req->rq_majortimeo <<= to->to_retries;
560 req->rq_majortimeo += to->to_increment * to->to_retries;
/* Clamp; the == 0 test also catches a shift that overflowed to zero. */
561 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
562 req->rq_majortimeo = to->to_maxval;
/* xprt_adjust_timeout: advance rq_timeout after a minor timeout. */
573 struct rpc_xprt *xprt = req->rq_xprt;
574 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
578 if (to->to_exponential)
579 req->rq_timeout <<= 1;
581 req->rq_timeout += to->to_increment;
582 if (to->to_maxval && req->rq_timeout >= to->to_maxval)
583 req->rq_timeout = to->to_maxval;
/* Major timeout hit: restart from to_initval and re-seed the RTT
 * estimator under the transport lock. */
586 req->rq_timeout = to->to_initval;
588 xprt_reset_majortimeo(req);
590 spin_lock_bh(&xprt->transport_lock);
591 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
592 spin_unlock_bh(&xprt->transport_lock);
/* Defensive: never leave a zero timeout behind. */
596 if (req->rq_timeout == 0) {
598 req->rq_timeout = 5 *
HZ;
/*
 * Transport close/disconnect paths (fragments of xprt_autoclose,
 * xprt_disconnect_done, xprt_force_disconnect,
 * xprt_conditional_disconnect and xprt_init_autodisconnect).
 */
/* Workqueue handler: perform the deferred close. */
605 struct rpc_xprt *xprt =
608 xprt->ops->close(xprt);
609 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
610 xprt_release_write(xprt,
NULL);
/* Called by the transport when the connection is fully down. */
620 dprintk(
"RPC: disconnected transport %p\n", xprt);
621 spin_lock_bh(&xprt->transport_lock);
622 xprt_clear_connected(xprt);
624 spin_unlock_bh(&xprt->transport_lock);
/* Request an asynchronous close via the cleanup work. */
636 spin_lock_bh(&xprt->transport_lock);
637 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
642 spin_unlock_bh(&xprt->transport_lock);
/* Disconnect only if the caller's connect_cookie still matches, i.e.
 * nobody reconnected in the meantime. */
659 spin_lock_bh(&xprt->transport_lock);
660 if (cookie != xprt->connect_cookie)
662 if (
test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
664 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
670 spin_unlock_bh(&xprt->transport_lock);
/* Idle timer: close the transport if no requests are outstanding. */
674 xprt_init_autodisconnect(
unsigned long data)
676 struct rpc_xprt *xprt = (
struct rpc_xprt *)data;
678 spin_lock(&xprt->transport_lock);
/* Requests still awaiting replies: stay connected. */
679 if (!list_empty(&xprt->recv))
683 spin_unlock(&xprt->transport_lock);
684 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
688 spin_unlock(&xprt->transport_lock);
/*
 * Connection establishment (fragments of xprt_connect and its
 * completion callback xprt_connect_status).
 */
698 struct rpc_xprt *xprt = task->tk_xprt;
700 dprintk(
"RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
701 xprt, (xprt_connected(xprt) ?
"is" :
"is not"));
/* Must be bound to a remote port before connecting. */
703 if (!xprt_bound(xprt)) {
707 if (!xprt_lock_write(xprt, task))
/* A pending close is processed by closing first. */
711 xprt->ops->close(xprt);
713 if (xprt_connected(xprt))
714 xprt_release_write(xprt, task);
/* Sleep until the transport reports the connect result. */
718 rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
720 if (
test_bit(XPRT_CLOSING, &xprt->state))
/* Only one connect attempt at a time. */
722 if (xprt_test_and_set_connecting(xprt))
724 xprt->stat.connect_start =
jiffies;
725 xprt->ops->connect(task);
/* Completion: account stats on success, decide whether to retry. */
729 static void xprt_connect_status(
struct rpc_task *task)
731 struct rpc_xprt *xprt = task->tk_xprt;
734 xprt->stat.connect_count++;
735 xprt->stat.connect_time += (
long)jiffies - xprt->stat.connect_start;
736 dprintk(
"RPC: %5u xprt_connect_status: connection established\n",
743 dprintk(
"RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
746 dprintk(
"RPC: %5u xprt_connect_status: connect attempt timed "
747 "out\n", task->tk_pid);
750 dprintk(
"RPC: %5u xprt_connect_status: error %d connecting to "
751 "server %s\n", task->tk_pid, -task->
tk_status,
/* Any terminal status drops the transmit lock. */
753 xprt_release_write(xprt, task);
/*
 * Reply matching (fragments of xprt_lookup_rqst and xprt_update_rtt).
 */
/* Scan the receive list for the request whose XID matches the reply;
 * a miss is counted in stat.bad_xids. */
766 struct rpc_rqst *
entry;
769 if (entry->rq_xid == xid)
772 dprintk(
"RPC: xprt_lookup_rqst did not find xid %08x\n",
774 xprt->stat.bad_xids++;
/* Feed the measured RTT into the estimator.  Replies to retransmitted
 * requests are ambiguous, so only first transmissions (rq_ntrans == 1)
 * update the estimate. */
779 static void xprt_update_rtt(
struct rpc_task *task)
781 struct rpc_rqst *req = task->
tk_rqstp;
783 unsigned int timer = task->
tk_msg.rpc_proc->p_timer;
787 if (req->rq_ntrans == 1)
789 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
/*
 * Reply completion and per-request timeout handling (fragments of
 * xprt_complete_rqst, xprt_timer and xprt_has_timer).
 */
802 struct rpc_rqst *req = task->
tk_rqstp;
803 struct rpc_xprt *xprt = req->rq_xprt;
805 dprintk(
"RPC: %5u xid %08x complete (%d bytes received)\n",
806 task->tk_pid,
ntohl(req->rq_xid), copied);
/* Measure round-trip time before unlinking the request. */
809 req->rq_rtt = ktime_sub(
ktime_get(), req->rq_xtime);
810 if (xprt->ops->timer !=
NULL)
811 xprt_update_rtt(task);
/* Off the receive list; record how much reply data arrived. */
813 list_del_init(&req->rq_list);
814 req->rq_private_buf.len = copied;
818 req->rq_reply_bytes_recvd = copied;
/* Per-request timer callback: let the transport react, unless a reply
 * raced in before the lock was taken. */
823 static void xprt_timer(
struct rpc_task *task)
825 struct rpc_rqst *req = task->
tk_rqstp;
826 struct rpc_xprt *xprt = req->rq_xprt;
830 dprintk(
"RPC: %5u xprt_timer\n", task->tk_pid);
832 spin_lock_bh(&xprt->transport_lock);
833 if (!req->rq_reply_bytes_recvd) {
834 if (xprt->ops->timer)
835 xprt->ops->timer(task);
838 spin_unlock_bh(&xprt->transport_lock);
/* A zero idle_timeout means the transport never auto-disconnects. */
841 static inline int xprt_has_timer(
struct rpc_xprt *xprt)
843 return xprt->idle_timeout != 0;
/*
 * The transmit path (fragments of xprt_prepare_transmit,
 * xprt_end_transmit and xprt_transmit).
 */
853 struct rpc_rqst *req = task->
tk_rqstp;
854 struct rpc_xprt *xprt = req->rq_xprt;
857 dprintk(
"RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
859 spin_lock_bh(&xprt->transport_lock);
/* Reply already arrived and nothing sent yet: no need to transmit. */
860 if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
861 err = req->rq_reply_bytes_recvd;
864 if (!xprt->ops->reserve_xprt(xprt, task))
867 spin_unlock_bh(&xprt->transport_lock);
/* xprt_end_transmit: simply drop the transmit lock. */
873 xprt_release_write(task->
tk_rqstp->rq_xprt, task);
/* xprt_transmit: push the request onto the wire and queue it for the
 * matching reply. */
884 struct rpc_rqst *req = task->
tk_rqstp;
885 struct rpc_xprt *xprt = req->rq_xprt;
888 dprintk(
"RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
890 if (!req->rq_reply_bytes_recvd) {
/* First transmission of a reply-expecting request: add it to the
 * receive list and snapshot the receive buffer so a racing reply
 * handler sees consistent state. */
891 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
895 spin_lock_bh(&xprt->transport_lock);
897 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
898 sizeof(req->rq_private_buf));
901 spin_unlock_bh(&xprt->transport_lock);
902 xprt_reset_majortimeo(req);
906 }
else if (!req->rq_bytes_sent)
909 req->rq_connect_cookie = xprt->connect_cookie;
911 status = xprt->ops->send_request(task);
917 dprintk(
"RPC: %5u xmit complete\n", task->tk_pid);
919 spin_lock_bh(&xprt->transport_lock);
921 xprt->ops->set_retrans_timeout(task);
/* Update transmission statistics. */
924 if (numreqs > xprt->stat.max_slots)
925 xprt->stat.max_slots = numreqs;
927 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
928 xprt->stat.bklog_u += xprt->backlog.qlen;
929 xprt->stat.sending_u += xprt->sending.qlen;
930 xprt->stat.pending_u += xprt->pending.qlen;
/* Wait for the reply (or for reconnect if the link dropped). */
933 if (!xprt_connected(xprt))
935 else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
942 spin_unlock_bh(&xprt->transport_lock);
/*
 * Request-slot pool management (fragments of xprt_dynamic_alloc_slot,
 * xprt_dynamic_free_slot, xprt_alloc_slot, xprt_lock_and_alloc_slot,
 * xprt_free_slot and xprt_free_all_slots).  The pool can grow between
 * min_reqs and max_reqs; num_reqs tracks its current size.
 */
/* Grow the pool by one slot if the cap allows; -EAGAIN otherwise. */
945 static struct rpc_rqst *xprt_dynamic_alloc_slot(
struct rpc_xprt *xprt,
gfp_t gfp_flags)
947 struct rpc_rqst *req = ERR_PTR(-
EAGAIN);
949 if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
951 req = kzalloc(
sizeof(
struct rpc_rqst), gfp_flags);
/* Shrink the pool, but never below the preallocated minimum. */
960 static bool xprt_dynamic_free_slot(
struct rpc_xprt *xprt,
struct rpc_rqst *req)
962 if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
/* xprt_alloc_slot: hand out a free slot, growing the pool or queueing
 * the task on the backlog when none is available. */
971 struct rpc_rqst *
req;
973 spin_lock(&xprt->reserve_lock);
974 if (!list_empty(&xprt->free)) {
975 req =
list_entry(xprt->free.next,
struct rpc_rqst, rq_list);
982 switch (PTR_ERR(req)) {
984 dprintk(
"RPC: dynamic allocation of request slot "
985 "failed! Retrying\n");
990 dprintk(
"RPC: waiting for request slot\n");
994 spin_unlock(&xprt->reserve_lock);
999 xprt_request_init(task, xprt);
1000 spin_unlock(&xprt->reserve_lock);
/* Variant that takes the transmit lock around slot allocation. */
1011 if (xprt_lock_write(xprt, task)) {
1013 xprt_release_write(xprt, task);
/* Return a slot: either free it (dynamic) or scrub it and put it back
 * on the free list for reuse. */
1018 static void xprt_free_slot(
struct rpc_xprt *xprt,
struct rpc_rqst *req)
1020 spin_lock(&xprt->reserve_lock);
1021 if (!xprt_dynamic_free_slot(xprt, req)) {
1022 memset(req, 0,
sizeof(*req));
1023 list_add(&req->rq_list, &xprt->free);
1026 spin_unlock(&xprt->reserve_lock);
/* Teardown: drain and free every slot on the free list. */
1029 static void xprt_free_all_slots(
struct rpc_xprt *xprt)
1031 struct rpc_rqst *
req;
1032 while (!list_empty(&xprt->free)) {
/*
 * Transport allocation and slot reservation (fragments of xprt_alloc,
 * xprt_free, xprt_reserve and the XID helpers).
 */
1040 unsigned int num_prealloc,
1041 unsigned int max_alloc)
1043 struct rpc_xprt *xprt;
1044 struct rpc_rqst *
req;
1051 xprt_init(xprt, net);
/* Preallocate the guaranteed-minimum request slots. */
1053 for (i = 0; i < num_prealloc; i++) {
1054 req = kzalloc(
sizeof(
struct rpc_rqst),
GFP_KERNEL);
1057 list_add(&req->rq_list, &xprt->free);
/* i < num_prealloc here means an allocation failed part way. */
1059 if (i < num_prealloc)
1061 if (max_alloc > num_prealloc)
1062 xprt->max_reqs = max_alloc;
1064 xprt->max_reqs = num_prealloc;
1065 xprt->min_reqs = num_prealloc;
/* xprt_free: release the netns reference and the slot pool. */
1079 put_net(xprt->xprt_net);
1080 xprt_free_all_slots(xprt);
/* xprt_reserve: obtain a slot via the class alloc_slot op. */
1094 struct rpc_xprt *xprt = task->tk_xprt;
1102 xprt->ops->alloc_slot(xprt, task);
/* XID generation helpers (bodies not visible in this view). */
1105 static inline __be32 xprt_alloc_xid(
struct rpc_xprt *xprt)
1110 static inline void xprt_init_xid(
struct rpc_xprt *xprt)
/*
 * Initialize a freshly-allocated rpc_rqst slot for @task: wire up the
 * back pointers, pick an XID and compute the initial major timeout.
 */
1115 static void xprt_request_init(
struct rpc_task *task,
struct rpc_xprt *xprt)
1117 struct rpc_rqst *req = task->
tk_rqstp;
1119 INIT_LIST_HEAD(&req->rq_list);
/* Start from the client's configured initial timeout. */
1120 req->rq_timeout = task->
tk_client->cl_timeout->to_initval;
1121 req->rq_task =
task;
1122 req->rq_xprt = xprt;
/* Buffers and the snd-buf release hook are set up later by callers. */
1123 req->rq_buffer =
NULL;
1124 req->rq_xid = xprt_alloc_xid(xprt);
1125 req->rq_release_snd_buf =
NULL;
1126 xprt_reset_majortimeo(req);
1127 dprintk(
"RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
1128 req,
ntohl(req->rq_xid));
/*
 * Request teardown and common transport initialization (fragments of
 * xprt_release and xprt_init).
 */
1138 struct rpc_xprt *xprt;
1139 struct rpc_rqst *
req;
1144 xprt = req->rq_xprt;
1149 spin_lock_bh(&xprt->transport_lock);
/* Drop the transmit lock and any transport-private request state. */
1150 xprt->ops->release_xprt(xprt, task);
1151 if (xprt->ops->release_request)
1152 xprt->ops->release_request(task);
1153 if (!list_empty(&req->rq_list))
/* Last outstanding request: arm the idle-disconnect timer. */
1156 if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
1158 xprt->last_used + xprt->idle_timeout);
1159 spin_unlock_bh(&xprt->transport_lock);
/* Free per-request resources before returning the slot. */
1161 xprt->ops->buf_free(req->rq_buffer);
1162 if (req->rq_cred !=
NULL)
1165 if (req->rq_release_snd_buf)
1166 req->rq_release_snd_buf(req);
1168 dprintk(
"RPC: %5u release request %p\n", task->tk_pid, req);
/* Backchannel-preallocated slots are managed elsewhere. */
1169 if (
likely(!bc_prealloc(req)))
1170 xprt_free_slot(xprt, req);
/* xprt_init: initialization shared by all transport classes. */
1175 static void xprt_init(
struct rpc_xprt *xprt,
struct net *
net)
1182 INIT_LIST_HEAD(&xprt->free);
1183 INIT_LIST_HEAD(&xprt->recv);
1184 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1186 INIT_LIST_HEAD(&xprt->bc_pa_list);
1191 xprt->bind_index = 0;
1198 xprt_init_xid(xprt);
/* Hold a reference on the owning network namespace. */
1200 xprt->xprt_net = get_net(net);
/*
 * Build a transport from the registered class matching args->ident,
 * plus final destruction (fragments of xprt_create_transport and
 * xprt_destroy).
 */
1210 struct rpc_xprt *xprt;
1211 struct xprt_class *
t;
1213 spin_lock(&xprt_list_lock);
1215 if (t->ident == args->ident) {
1216 spin_unlock(&xprt_list_lock);
1220 spin_unlock(&xprt_list_lock);
/* No class claimed this ident: the transport type is unsupported. */
1221 printk(
KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
1222 return ERR_PTR(-
EIO);
/* Delegate the real construction to the class setup hook. */
1225 xprt = t->setup(args);
1227 dprintk(
"RPC: xprt_create_transport: failed, %ld\n",
1231 INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
/* Only arm the idle timer for transports that auto-disconnect. */
1232 if (xprt_has_timer(xprt))
1233 setup_timer(&xprt->timer, xprt_init_autodisconnect,
1234 (
unsigned long)xprt);
/* Validate and copy the server name (error paths not visible). */
1238 if (
strlen(args->servername) > RPC_MAXNETNAMELEN) {
1243 if (xprt->servername ==
NULL) {
1248 dprintk(
"RPC: created transport %p with %u slots\n", xprt,
/* xprt_destroy: final teardown once the last reference is gone. */
1259 static void xprt_destroy(
struct rpc_xprt *xprt)
1261 dprintk(
"RPC: destroying transport %p\n", xprt);
1269 kfree(xprt->servername);
1273 xprt->ops->destroy(xprt);