root/net/sunrpc/xprtrdma/backchannel.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. xprt_rdma_bc_setup
  2. xprt_rdma_bc_maxpayload
  3. xprt_rdma_bc_max_slots
  4. rpcrdma_bc_marshal_reply
  5. xprt_rdma_bc_send_reply
  6. xprt_rdma_bc_destroy
  7. xprt_rdma_bc_free_rqst
  8. rpcrdma_bc_rqst_get
  9. rpcrdma_bc_receive_call

   1 // SPDX-License-Identifier: GPL-2.0
   2 /*
   3  * Copyright (c) 2015 Oracle.  All rights reserved.
   4  *
   5  * Support for backward direction RPCs on RPC/RDMA.
   6  */
   7 
   8 #include <linux/sunrpc/xprt.h>
   9 #include <linux/sunrpc/svc.h>
  10 #include <linux/sunrpc/svc_xprt.h>
  11 #include <linux/sunrpc/svc_rdma.h>
  12 
  13 #include "xprt_rdma.h"
  14 #include <trace/events/rpcrdma.h>
  15 
  16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  17 # define RPCDBG_FACILITY        RPCDBG_TRANS
  18 #endif
  19 
  20 #undef RPCRDMA_BACKCHANNEL_DEBUG
  21 
  22 /**
  23  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  24  * @xprt: transport associated with these backchannel resources
  25  * @reqs: number of concurrent incoming requests to expect
  26  *
  27  * Returns 0 on success; otherwise a negative errno
  28  */
  29 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
  30 {
  31         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
  32 
  33         r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
  34         trace_xprtrdma_cb_setup(r_xprt, reqs);
  35         return 0;
  36 }
  37 
  38 /**
  39  * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
  40  * @xprt: transport
  41  *
  42  * Returns maximum size, in bytes, of a backchannel message
  43  */
  44 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
  45 {
  46         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
  47         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
  48         size_t maxmsg;
  49 
  50         maxmsg = min_t(unsigned int, ep->rep_inline_send, ep->rep_inline_recv);
  51         maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
  52         return maxmsg - RPCRDMA_HDRLEN_MIN;
  53 }
  54 
  55 unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
  56 {
  57         return RPCRDMA_BACKWARD_WRS >> 1;
  58 }
  59 
  60 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
  61 {
  62         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
  63         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
  64         __be32 *p;
  65 
  66         rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
  67         xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
  68                         rdmab_data(req->rl_rdmabuf), rqst);
  69 
  70         p = xdr_reserve_space(&req->rl_stream, 28);
  71         if (unlikely(!p))
  72                 return -EIO;
  73         *p++ = rqst->rq_xid;
  74         *p++ = rpcrdma_version;
  75         *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
  76         *p++ = rdma_msg;
  77         *p++ = xdr_zero;
  78         *p++ = xdr_zero;
  79         *p = xdr_zero;
  80 
  81         if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
  82                                       &rqst->rq_snd_buf, rpcrdma_noch))
  83                 return -EIO;
  84 
  85         trace_xprtrdma_cb_reply(rqst);
  86         return 0;
  87 }
  88 
  89 /**
  90  * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
  91  * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
  92  *
  93  * Caller holds the transport's write lock.
  94  *
  95  * Returns:
  96  *      %0 if the RPC message has been sent
  97  *      %-ENOTCONN if the caller should reconnect and call again
  98  *      %-EIO if a permanent error occurred and the request was not
  99  *              sent. Do not try to send this message again.
 100  */
 101 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 102 {
 103         struct rpc_xprt *xprt = rqst->rq_xprt;
 104         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 105         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 106         int rc;
 107 
 108         if (!xprt_connected(xprt))
 109                 return -ENOTCONN;
 110 
 111         if (!xprt_request_get_cong(xprt, rqst))
 112                 return -EBADSLT;
 113 
 114         rc = rpcrdma_bc_marshal_reply(rqst);
 115         if (rc < 0)
 116                 goto failed_marshal;
 117 
 118         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 119                 goto drop_connection;
 120         return 0;
 121 
 122 failed_marshal:
 123         if (rc != -ENOTCONN)
 124                 return rc;
 125 drop_connection:
 126         xprt_rdma_close(xprt);
 127         return -ENOTCONN;
 128 }
 129 
 130 /**
 131  * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 132  * @xprt: transport associated with these backchannel resources
 133  * @reqs: number of incoming requests to destroy; ignored
 134  */
 135 void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
 136 {
 137         struct rpc_rqst *rqst, *tmp;
 138 
 139         spin_lock(&xprt->bc_pa_lock);
 140         list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
 141                 list_del(&rqst->rq_bc_pa_list);
 142                 spin_unlock(&xprt->bc_pa_lock);
 143 
 144                 rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
 145 
 146                 spin_lock(&xprt->bc_pa_lock);
 147         }
 148         spin_unlock(&xprt->bc_pa_lock);
 149 }
 150 
 151 /**
 152  * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 153  * @rqst: request to release
 154  */
 155 void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
 156 {
 157         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 158         struct rpc_xprt *xprt = rqst->rq_xprt;
 159 
 160         rpcrdma_recv_buffer_put(req->rl_reply);
 161         req->rl_reply = NULL;
 162 
 163         spin_lock(&xprt->bc_pa_lock);
 164         list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
 165         spin_unlock(&xprt->bc_pa_lock);
 166         xprt_put(xprt);
 167 }
 168 
 169 static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
 170 {
 171         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 172         struct rpcrdma_req *req;
 173         struct rpc_rqst *rqst;
 174         size_t size;
 175 
 176         spin_lock(&xprt->bc_pa_lock);
 177         rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
 178                                         rq_bc_pa_list);
 179         if (!rqst)
 180                 goto create_req;
 181         list_del(&rqst->rq_bc_pa_list);
 182         spin_unlock(&xprt->bc_pa_lock);
 183         return rqst;
 184 
 185 create_req:
 186         spin_unlock(&xprt->bc_pa_lock);
 187 
 188         /* Set a limit to prevent a remote from overrunning our resources.
 189          */
 190         if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
 191                 return NULL;
 192 
 193         size = min_t(size_t, r_xprt->rx_ep.rep_inline_recv, PAGE_SIZE);
 194         req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
 195         if (!req)
 196                 return NULL;
 197 
 198         xprt->bc_alloc_count++;
 199         rqst = &req->rl_slot;
 200         rqst->rq_xprt = xprt;
 201         __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
 202         xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
 203         return rqst;
 204 }
 205 
 206 /**
 207  * rpcrdma_bc_receive_call - Handle a backward direction call
 208  * @r_xprt: transport receiving the call
 209  * @rep: receive buffer containing the call
 210  *
 211  * Operational assumptions:
 212  *    o Backchannel credits are ignored, just as the NFS server
 213  *      forechannel currently does
 214  *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 215  *      No replay detection is done at the transport level
 216  */
 217 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 218                              struct rpcrdma_rep *rep)
 219 {
 220         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 221         struct svc_serv *bc_serv;
 222         struct rpcrdma_req *req;
 223         struct rpc_rqst *rqst;
 224         struct xdr_buf *buf;
 225         size_t size;
 226         __be32 *p;
 227 
 228         p = xdr_inline_decode(&rep->rr_stream, 0);
 229         size = xdr_stream_remaining(&rep->rr_stream);
 230 
 231 #ifdef RPCRDMA_BACKCHANNEL_DEBUG
 232         pr_info("RPC:       %s: callback XID %08x, length=%u\n",
 233                 __func__, be32_to_cpup(p), size);
 234         pr_info("RPC:       %s: %*ph\n", __func__, size, p);
 235 #endif
 236 
 237         rqst = rpcrdma_bc_rqst_get(r_xprt);
 238         if (!rqst)
 239                 goto out_overflow;
 240 
 241         rqst->rq_reply_bytes_recvd = 0;
 242         rqst->rq_xid = *p;
 243 
 244         rqst->rq_private_buf.len = size;
 245 
 246         buf = &rqst->rq_rcv_buf;
 247         memset(buf, 0, sizeof(*buf));
 248         buf->head[0].iov_base = p;
 249         buf->head[0].iov_len = size;
 250         buf->len = size;
 251 
 252         /* The receive buffer has to be hooked to the rpcrdma_req
 253          * so that it is not released while the req is pointing
 254          * to its buffer, and so that it can be reposted after
 255          * the Upper Layer is done decoding it.
 256          */
 257         req = rpcr_to_rdmar(rqst);
 258         req->rl_reply = rep;
 259         trace_xprtrdma_cb_call(rqst);
 260 
 261         /* Queue rqst for ULP's callback service */
 262         bc_serv = xprt->bc_serv;
 263         xprt_get(xprt);
 264         spin_lock(&bc_serv->sv_cb_lock);
 265         list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
 266         spin_unlock(&bc_serv->sv_cb_lock);
 267 
 268         wake_up(&bc_serv->sv_cb_waitq);
 269 
 270         r_xprt->rx_stats.bcall_count++;
 271         return;
 272 
 273 out_overflow:
 274         pr_warn("RPC/RDMA backchannel overflow\n");
 275         xprt_force_disconnect(xprt);
 276         /* This receive buffer gets reposted automatically
 277          * when the connection is re-established.
 278          */
 279         return;
 280 }

/* [<][>][^][v][top][bottom][index][help] */