33 #include <linux/kernel.h>
38 #include <linux/list.h>
40 #include <linux/export.h>
/*
 * File-local tunable, initialised to 64. Its parameter description below
 * calls it the "batch factor when working the send queue".
 * NOTE(review): the module_param() registration that would normally pair
 * with MODULE_PARM_DESC is not visible in this extract -- confirm it exists.
 */
54 static int send_batch_count = 64;
56 MODULE_PARM_DESC(send_batch_count,
" batch factor when working the send queue");
/*
 * Forward declaration; the definition appears further down in this file,
 * so earlier code can call it.
 *
 * @messages: list head of the messages to process (the definition loops
 *            until this list is empty).
 * @status:   integer status value handed through to the definition.
 */
58 static void rds_send_remove_from_sock(
struct list_head *messages,
int status);
99 spin_unlock_irqrestore(&conn->
c_lock, flags);
117 if (waitqueue_active(&conn->
c_waitq))
153 if (!acquire_in_xmit(conn)) {
163 if (!rds_conn_up(conn)) {
164 release_in_xmit(conn);
169 if (conn->
c_trans->xmit_prepare)
170 conn->
c_trans->xmit_prepare(conn);
190 rm->
data.op_active = 1;
220 spin_unlock_irqrestore(&conn->
c_lock, flags);
232 if (rm->
rdma.op_active &&
237 spin_unlock_irqrestore(&conn->
c_lock, flags);
290 if (rm->
data.op_nents == 0) {
292 int all_ops_are_silent = 1;
294 ops_present = (rm->
atomic.op_active || rm->
rdma.op_active);
296 all_ops_are_silent = 0;
297 if (rm->
rdma.op_active && !rm->
rdma.op_silent)
298 all_ops_are_silent = 0;
300 if (ops_present && all_ops_are_silent
302 rm->
data.op_active = 0;
307 ret = conn->
c_trans->xmit(conn, rm,
315 tmp =
min_t(
int, ret,
360 if (conn->
c_trans->xmit_complete)
361 conn->
c_trans->xmit_complete(conn);
363 release_in_xmit(conn);
366 if (!list_empty(&to_be_dropped)) {
412 return is_acked(rm, ack);
425 struct rm_rdma_op *
ro;
433 ro->op_active && ro->op_notify && ro->op_notifier) {
434 notifier = ro->op_notifier;
436 sock_hold(rds_rs_to_sk(rs));
443 ro->op_notifier =
NULL;
446 spin_unlock_irqrestore(&rm->
m_rs_lock, flags);
450 sock_put(rds_rs_to_sk(rs));
461 struct rm_atomic_op *ao;
469 && ao->op_active && ao->op_notify && ao->op_notifier) {
470 notifier = ao->op_notifier;
472 sock_hold(rds_rs_to_sk(rs));
479 ao->op_notifier =
NULL;
482 spin_unlock_irqrestore(&rm->
m_rs_lock, flags);
486 sock_put(rds_rs_to_sk(rs));
499 struct rm_rdma_op *
ro;
500 struct rm_atomic_op *ao;
503 if (ro->op_active && ro->op_notify && ro->op_notifier) {
504 ro->op_notifier->n_status =
status;
506 ro->op_notifier =
NULL;
510 if (ao->op_active && ao->op_notify && ao->op_notifier) {
511 ao->op_notifier->n_status =
status;
513 ao->op_notifier =
NULL;
525 struct rm_rdma_op *
op)
533 if (&rm->
rdma == op) {
541 if (&rm->
rdma == op) {
549 spin_unlock_irqrestore(&conn->
c_lock, flags);
563 static void rds_send_remove_from_sock(
struct list_head *messages,
int status)
569 while (!list_empty(messages)) {
588 goto unlock_and_drop;
590 if (rs != rm->
m_rs) {
593 sock_put(rds_rs_to_sk(rs));
596 sock_hold(rds_rs_to_sk(rs));
601 struct rm_rdma_op *ro = &rm->
rdma;
605 rds_send_sndbuf_remove(rs, rm);
607 if (ro->op_active && ro->op_notifier &&
608 (ro->op_notify || (ro->op_recverr && status))) {
609 notifier = ro->op_notifier;
622 spin_unlock_irqrestore(&rm->
m_rs_lock, flags);
630 sock_put(rds_rs_to_sk(rs));
655 if (!rds_send_is_acked(rm, ack, is_acked))
663 if (!list_empty(&
list))
666 spin_unlock_irqrestore(&conn->
c_lock, flags);
689 rds_send_sndbuf_remove(rs, rm);
696 spin_unlock_irqrestore(&rs->
rs_lock, flags);
698 if (list_empty(&
list))
704 conn = rm->
m_inc.i_conn;
713 spin_unlock_irqrestore(&conn->
c_lock, flags);
717 spin_unlock_irqrestore(&conn->
c_lock, flags);
730 spin_unlock_irqrestore(&rm->
m_rs_lock, flags);
737 while (!list_empty(&
list)) {
753 __be16 dport,
int *queued)
794 rm->
m_inc.i_conn = conn;
801 spin_unlock(&conn->
c_lock);
803 rdsdebug(
"queued msg %p len %d, rs %p bytes %d seq %llu\n",
810 spin_unlock_irqrestore(&rs->
rs_lock, flags);
866 if (cmsg_groups == 3)
873 struct msghdr *msg,
int *allocated_mr)
923 struct sock *
sk = sock->
sk;
924 struct rds_sock *rs = rds_sk_to_rs(sk);
931 int queued = 0, allocated_mr = 0;
933 long timeo = sock_sndtimeo(sk, nonblock);
965 ret = rds_rm_size(msg, payload_len);
978 if (!rm->
data.op_sg) {
986 rm->
data.op_active = 1;
997 sock->
sk->sk_allocation);
1006 ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
1010 if (rm->
rdma.op_active && !conn->
c_trans->xmit_rdma) {
1036 if (payload_len > rds_sk_sndbuf(rs)) {
1046 rds_send_queue_rm(rs, conn, rm,
1051 rdsdebug(
"sendmsg woke queued %d timeo %ld\n", queued, timeo);
1092 unsigned long flags;
1102 rm->
data.op_active = 1;
1114 rm->
m_inc.i_conn = conn;
1119 spin_unlock_irqrestore(&conn->
c_lock, flags);