root/net/sunrpc/xprtrdma/verbs.c

DEFINITIONS

This source file includes the following definitions:
  1. rpcrdma_xprt_drain
  2. rpcrdma_qp_event_handler
  3. rpcrdma_wc_send
  4. rpcrdma_wc_receive
  5. rpcrdma_update_connect_private
  6. rpcrdma_cm_event_handler
  7. rpcrdma_create_id
  8. rpcrdma_ia_open
  9. rpcrdma_ia_remove
  10. rpcrdma_ia_close
  11. rpcrdma_ep_create
  12. rpcrdma_ep_destroy
  13. rpcrdma_ep_recreate_xprt
  14. rpcrdma_ep_reconnect
  15. rpcrdma_ep_connect
  16. rpcrdma_ep_disconnect
  17. rpcrdma_sendctxs_destroy
  18. rpcrdma_sendctx_create
  19. rpcrdma_sendctxs_create
  20. rpcrdma_sendctx_next
  21. rpcrdma_sendctx_get_locked
  22. rpcrdma_sendctx_put_locked
  23. rpcrdma_mrs_create
  24. rpcrdma_mr_refresh_worker
  25. rpcrdma_req_create
  26. rpcrdma_reqs_reset
  27. rpcrdma_rep_create
  28. rpcrdma_rep_destroy
  29. rpcrdma_rep_get_locked
  30. rpcrdma_rep_put
  31. rpcrdma_reps_unmap
  32. rpcrdma_reps_destroy
  33. rpcrdma_buffer_create
  34. rpcrdma_req_destroy
  35. rpcrdma_mrs_destroy
  36. rpcrdma_buffer_destroy
  37. rpcrdma_mr_get
  38. rpcrdma_mr_put
  39. rpcrdma_buffer_get
  40. rpcrdma_buffer_put
  41. rpcrdma_recv_buffer_put
  42. rpcrdma_regbuf_alloc
  43. rpcrdma_regbuf_realloc
  44. __rpcrdma_regbuf_dma_map
  45. rpcrdma_regbuf_dma_unmap
  46. rpcrdma_regbuf_free
  47. rpcrdma_ep_post
  48. rpcrdma_post_recvs

   1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2 /*
   3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5  *
   6  * This software is available to you under a choice of one of two
   7  * licenses.  You may choose to be licensed under the terms of the GNU
   8  * General Public License (GPL) Version 2, available from the file
   9  * COPYING in the main directory of this source tree, or the BSD-type
  10  * license below:
  11  *
  12  * Redistribution and use in source and binary forms, with or without
  13  * modification, are permitted provided that the following conditions
  14  * are met:
  15  *
  16  *      Redistributions of source code must retain the above copyright
  17  *      notice, this list of conditions and the following disclaimer.
  18  *
  19  *      Redistributions in binary form must reproduce the above
  20  *      copyright notice, this list of conditions and the following
  21  *      disclaimer in the documentation and/or other materials provided
  22  *      with the distribution.
  23  *
  24  *      Neither the name of the Network Appliance, Inc. nor the names of
  25  *      its contributors may be used to endorse or promote products
  26  *      derived from this software without specific prior written
  27  *      permission.
  28  *
  29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40  */
  41 
  42 /*
  43  * verbs.c
  44  *
  45  * Encapsulates the major functions managing:
  46  *  o adapters
  47  *  o endpoints
  48  *  o connections
  49  *  o buffer memory
  50  */
  51 
  52 #include <linux/interrupt.h>
  53 #include <linux/slab.h>
  54 #include <linux/sunrpc/addr.h>
  55 #include <linux/sunrpc/svc_rdma.h>
  56 #include <linux/log2.h>
  57 
  58 #include <asm-generic/barrier.h>
  59 #include <asm/bitops.h>
  60 
  61 #include <rdma/ib_cm.h>
  62 
  63 #include "xprt_rdma.h"
  64 #include <trace/events/rpcrdma.h>
  65 
  66 /*
  67  * Globals/Macros
  68  */
  69 
  70 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  71 # define RPCDBG_FACILITY        RPCDBG_TRANS
  72 #endif
  73 
  74 /*
  75  * internal functions
  76  */
  77 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
  78 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
  79 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
  80 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  81 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
  82 static struct rpcrdma_regbuf *
  83 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
  84                      gfp_t flags);
  85 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
  86 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
  87 
  88 /* Wait for outstanding transport work to finish. ib_drain_qp
  89  * handles the drains in the wrong order for us, so open code
  90  * them here.
  91  */
  92 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
  93 {
  94         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
  95 
  96         /* Flush Receives, then wait for deferred Reply work
  97          * to complete.
  98          */
  99         ib_drain_rq(ia->ri_id->qp);
 100 
 101         /* Deferred Reply processing might have scheduled
 102          * local invalidations.
 103          */
 104         ib_drain_sq(ia->ri_id->qp);
 105 }
 106 
 107 /**
 108  * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 109  * @event: details of the event
 110  * @context: ep that owns QP where event occurred
 111  *
  112  * Called from the RDMA provider (device driver), possibly in an interrupt
 113  * context.
 114  */
 115 static void
 116 rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 117 {
 118         struct rpcrdma_ep *ep = context;
 119         struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 120                                                    rx_ep);
 121 
 122         trace_xprtrdma_qp_event(r_xprt, event);
 123 }
 124 
 125 /**
 126  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 127  * @cq: completion queue (ignored)
 128  * @wc: completed WR
 129  *
 130  */
 131 static void
 132 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 133 {
 134         struct ib_cqe *cqe = wc->wr_cqe;
 135         struct rpcrdma_sendctx *sc =
 136                 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 137 
 138         /* WARNING: Only wr_cqe and status are reliable at this point */
 139         trace_xprtrdma_wc_send(sc, wc);
 140         rpcrdma_sendctx_put_locked(sc);
 141 }
 142 
 143 /**
 144  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 145  * @cq: completion queue (ignored)
 146  * @wc: completed WR
 147  *
 148  */
 149 static void
 150 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 151 {
 152         struct ib_cqe *cqe = wc->wr_cqe;
 153         struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 154                                                rr_cqe);
 155         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 156 
 157         /* WARNING: Only wr_cqe and status are reliable at this point */
 158         trace_xprtrdma_wc_receive(wc);
 159         --r_xprt->rx_ep.rep_receive_count;
 160         if (wc->status != IB_WC_SUCCESS)
 161                 goto out_flushed;
 162 
 163         /* status == SUCCESS means all fields in wc are trustworthy */
 164         rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 165         rep->rr_wc_flags = wc->wc_flags;
 166         rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 167 
 168         ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 169                                    rdmab_addr(rep->rr_rdmabuf),
 170                                    wc->byte_len, DMA_FROM_DEVICE);
 171 
 172         rpcrdma_reply_handler(rep);
 173         return;
 174 
 175 out_flushed:
 176         rpcrdma_recv_buffer_put(rep);
 177 }
 178 
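      /* Absorb the connection parameters carried in the peer's CM
       * private message, if one was provided. The inline send and
       * receive thresholds are only ever lowered to match the peer,
       * and the maximum RPC-over-RDMA header size is then recomputed.
       */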
 179 static void
 180 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 181                                struct rdma_conn_param *param)
 182 {
 183         const struct rpcrdma_connect_private *pmsg = param->private_data;
 184         unsigned int rsize, wsize;
 185 
 186         /* Default settings for RPC-over-RDMA Version One */
 187         r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
 188         rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 189         wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 190 
 191         if (pmsg &&
 192             pmsg->cp_magic == rpcrdma_cmp_magic &&
 193             pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 194                 r_xprt->rx_ia.ri_implicit_roundup = true;
 195                 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 196                 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 197         }
 198 
 199         if (rsize < r_xprt->rx_ep.rep_inline_recv)
 200                 r_xprt->rx_ep.rep_inline_recv = rsize;
 201         if (wsize < r_xprt->rx_ep.rep_inline_send)
 202                 r_xprt->rx_ep.rep_inline_send = wsize;
 203         dprintk("RPC:       %s: max send %u, max recv %u\n", __func__,
 204                 r_xprt->rx_ep.rep_inline_send,
 205                 r_xprt->rx_ep.rep_inline_recv);
 206         rpcrdma_set_max_header_sizes(r_xprt);
 207 }
 208 
 209 /**
 210  * rpcrdma_cm_event_handler - Handle RDMA CM events
 211  * @id: rdma_cm_id on which an event has occurred
 212  * @event: details of the event
 213  *
 214  * Called with @id's mutex held. Returns 1 if caller should
 215  * destroy @id, otherwise 0.
 216  */
 217 static int
 218 rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 219 {
 220         struct rpcrdma_xprt *r_xprt = id->context;
 221         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 222         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 223         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 224 
 225         might_sleep();
 226 
 227         trace_xprtrdma_cm_event(r_xprt, event);
 228         switch (event->event) {
 229         case RDMA_CM_EVENT_ADDR_RESOLVED:
 230         case RDMA_CM_EVENT_ROUTE_RESOLVED:
 231                 ia->ri_async_rc = 0;
 232                 complete(&ia->ri_done);
 233                 return 0;
 234         case RDMA_CM_EVENT_ADDR_ERROR:
 235                 ia->ri_async_rc = -EPROTO;
 236                 complete(&ia->ri_done);
 237                 return 0;
 238         case RDMA_CM_EVENT_ROUTE_ERROR:
 239                 ia->ri_async_rc = -ENETUNREACH;
 240                 complete(&ia->ri_done);
 241                 return 0;
 242         case RDMA_CM_EVENT_DEVICE_REMOVAL:
 243 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 244                 pr_info("rpcrdma: removing device %s for %s:%s\n",
 245                         ia->ri_id->device->name,
 246                         rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
 247 #endif
 248                 init_completion(&ia->ri_remove_done);
 249                 set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 250                 ep->rep_connected = -ENODEV;
 251                 xprt_force_disconnect(xprt);
 252                 wait_for_completion(&ia->ri_remove_done);
 253 
 254                 ia->ri_id = NULL;
 255                 /* Return 1 to ensure the core destroys the id. */
 256                 return 1;
 257         case RDMA_CM_EVENT_ESTABLISHED:
 258                 ++xprt->connect_cookie;
 259                 ep->rep_connected = 1;
 260                 rpcrdma_update_connect_private(r_xprt, &event->param.conn);
 261                 wake_up_all(&ep->rep_connect_wait);
 262                 break;
 263         case RDMA_CM_EVENT_CONNECT_ERROR:
 264                 ep->rep_connected = -ENOTCONN;
 265                 goto disconnected;
 266         case RDMA_CM_EVENT_UNREACHABLE:
 267                 ep->rep_connected = -ENETUNREACH;
 268                 goto disconnected;
 269         case RDMA_CM_EVENT_REJECTED:
 270                 dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
 271                         rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
 272                         rdma_reject_msg(id, event->status));
 273                 ep->rep_connected = -ECONNREFUSED;
 274                 if (event->status == IB_CM_REJ_STALE_CONN)
 275                         ep->rep_connected = -EAGAIN;
 276                 goto disconnected;
 277         case RDMA_CM_EVENT_DISCONNECTED:
 278                 ep->rep_connected = -ECONNABORTED;
 279 disconnected:
 280                 xprt_force_disconnect(xprt);
 281                 wake_up_all(&ep->rep_connect_wait);
 282                 break;
 283         default:
 284                 break;
 285         }
 286 
 287         dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
 288                 rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
 289                 ia->ri_id->device->name, rdma_event_msg(event->event));
 290         return 0;
 291 }
 292 
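      /* Allocate an rdma_cm_id for this connection and synchronously
       * resolve the server's address and route. Each resolution step
       * waits up to RDMA_RESOLVE_TIMEOUT for its CM upcall. Returns
       * the new id, or an ERR_PTR if resolution fails or times out.
       */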
 293 static struct rdma_cm_id *
 294 rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
 295 {
 296         unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 297         struct rdma_cm_id *id;
 298         int rc;
 299 
 300         trace_xprtrdma_conn_start(xprt);
 301 
 302         init_completion(&ia->ri_done);
 303 
 304         id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
 305                             xprt, RDMA_PS_TCP, IB_QPT_RC);
 306         if (IS_ERR(id))
 307                 return id;
 308 
 309         ia->ri_async_rc = -ETIMEDOUT;
 310         rc = rdma_resolve_addr(id, NULL,
 311                                (struct sockaddr *)&xprt->rx_xprt.addr,
 312                                RDMA_RESOLVE_TIMEOUT);
 313         if (rc)
 314                 goto out;
 315         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 316         if (rc < 0) {
 317                 trace_xprtrdma_conn_tout(xprt);
 318                 goto out;
 319         }
 320 
 321         rc = ia->ri_async_rc;
 322         if (rc)
 323                 goto out;
 324 
 325         ia->ri_async_rc = -ETIMEDOUT;
 326         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 327         if (rc)
 328                 goto out;
 329         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 330         if (rc < 0) {
 331                 trace_xprtrdma_conn_tout(xprt);
 332                 goto out;
 333         }
 334         rc = ia->ri_async_rc;
 335         if (rc)
 336                 goto out;
 337 
 338         return id;
 339 
 340 out:
 341         rdma_destroy_id(id);
 342         return ERR_PTR(rc);
 343 }
 344 
 345 /*
 346  * Exported functions.
 347  */
 348 
 349 /**
 350  * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 351  * @xprt: transport with IA to (re)initialize
 352  *
 353  * Returns 0 on success, negative errno if an appropriate
 354  * Interface Adapter could not be found and opened.
 355  */
 356 int
 357 rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
 358 {
 359         struct rpcrdma_ia *ia = &xprt->rx_ia;
 360         int rc;
 361 
 362         ia->ri_id = rpcrdma_create_id(xprt, ia);
 363         if (IS_ERR(ia->ri_id)) {
 364                 rc = PTR_ERR(ia->ri_id);
 365                 goto out_err;
 366         }
 367 
 368         ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
 369         if (IS_ERR(ia->ri_pd)) {
 370                 rc = PTR_ERR(ia->ri_pd);
 371                 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
 372                 goto out_err;
 373         }
 374 
 375         switch (xprt_rdma_memreg_strategy) {
 376         case RPCRDMA_FRWR:
 377                 if (frwr_is_supported(ia->ri_id->device))
 378                         break;
 379                 /*FALLTHROUGH*/
 380         default:
 381                 pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
 382                        ia->ri_id->device->name, xprt_rdma_memreg_strategy);
 383                 rc = -EINVAL;
 384                 goto out_err;
 385         }
 386 
 387         return 0;
 388 
 389 out_err:
 390         rpcrdma_ia_close(ia);
 391         return rc;
 392 }
 393 
 394 /**
 395  * rpcrdma_ia_remove - Handle device driver unload
 396  * @ia: interface adapter being removed
 397  *
 398  * Divest transport H/W resources associated with this adapter,
 399  * but allow it to be restored later.
 400  */
 401 void
 402 rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 403 {
 404         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 405                                                    rx_ia);
 406         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 407         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 408         struct rpcrdma_req *req;
 409 
 410         cancel_work_sync(&buf->rb_refresh_worker);
 411 
 412         /* This is similar to rpcrdma_ep_destroy, but:
 413          * - Don't cancel the connect worker.
 414          * - Don't call rpcrdma_ep_disconnect, which waits
 415          *   for another conn upcall, which will deadlock.
 416          * - rdma_disconnect is unneeded, the underlying
 417          *   connection is already gone.
 418          */
 419         if (ia->ri_id->qp) {
 420                 rpcrdma_xprt_drain(r_xprt);
 421                 rdma_destroy_qp(ia->ri_id);
 422                 ia->ri_id->qp = NULL;
 423         }
 424         ib_free_cq(ep->rep_attr.recv_cq);
 425         ep->rep_attr.recv_cq = NULL;
 426         ib_free_cq(ep->rep_attr.send_cq);
 427         ep->rep_attr.send_cq = NULL;
 428 
 429         /* The ULP is responsible for ensuring all DMA
 430          * mappings and MRs are gone.
 431          */
 432         rpcrdma_reps_unmap(r_xprt);
 433         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 434                 rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
 435                 rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
 436                 rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
 437         }
 438         rpcrdma_mrs_destroy(buf);
 439         ib_dealloc_pd(ia->ri_pd);
 440         ia->ri_pd = NULL;
 441 
 442         /* Allow waiters to continue */
 443         complete(&ia->ri_remove_done);
 444 
 445         trace_xprtrdma_remove(r_xprt);
 446 }
 447 
 448 /**
 449  * rpcrdma_ia_close - Clean up/close an IA.
 450  * @ia: interface adapter to close
 451  *
 452  */
 453 void
 454 rpcrdma_ia_close(struct rpcrdma_ia *ia)
 455 {
 456         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 457                 if (ia->ri_id->qp)
 458                         rdma_destroy_qp(ia->ri_id);
 459                 rdma_destroy_id(ia->ri_id);
 460         }
 461         ia->ri_id = NULL;
 462 
 463         /* If the pd is still busy, xprtrdma missed freeing a resource */
 464         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 465                 ib_dealloc_pd(ia->ri_pd);
 466         ia->ri_pd = NULL;
 467 }
 468 
 469 /**
 470  * rpcrdma_ep_create - Create unconnected endpoint
 471  * @r_xprt: transport to instantiate
 472  *
 473  * Returns zero on success, or a negative errno.
 474  */
 475 int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 476 {
 477         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 478         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 479         struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 480         struct ib_cq *sendcq, *recvcq;
 481         unsigned int max_sge;
 482         int rc;
 483 
 484         ep->rep_max_requests = xprt_rdma_slot_table_entries;
 485         ep->rep_inline_send = xprt_rdma_max_inline_write;
 486         ep->rep_inline_recv = xprt_rdma_max_inline_read;
 487 
 488         max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
 489                         RPCRDMA_MAX_SEND_SGES);
 490         if (max_sge < RPCRDMA_MIN_SEND_SGES) {
 491                 pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
 492                 return -ENOMEM;
 493         }
 494         ia->ri_max_send_sges = max_sge;
 495 
 496         rc = frwr_open(ia, ep);
 497         if (rc)
 498                 return rc;
 499 
 500         ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
 501         ep->rep_attr.qp_context = ep;
 502         ep->rep_attr.srq = NULL;
 503         ep->rep_attr.cap.max_send_sge = max_sge;
 504         ep->rep_attr.cap.max_recv_sge = 1;
 505         ep->rep_attr.cap.max_inline_data = 0;
 506         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 507         ep->rep_attr.qp_type = IB_QPT_RC;
 508         ep->rep_attr.port_num = ~0;
 509 
 510         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 511                 "iovs: send %d recv %d\n",
 512                 __func__,
 513                 ep->rep_attr.cap.max_send_wr,
 514                 ep->rep_attr.cap.max_recv_wr,
 515                 ep->rep_attr.cap.max_send_sge,
 516                 ep->rep_attr.cap.max_recv_sge);
 517 
 518         ep->rep_send_batch = ep->rep_max_requests >> 3;
 519         ep->rep_send_count = ep->rep_send_batch;
 520         init_waitqueue_head(&ep->rep_connect_wait);
 521         ep->rep_receive_count = 0;
 522 
 523         sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 524                                  ep->rep_attr.cap.max_send_wr + 1,
 525                                  IB_POLL_WORKQUEUE);
 526         if (IS_ERR(sendcq)) {
 527                 rc = PTR_ERR(sendcq);
 528                 goto out1;
 529         }
 530 
 531         recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 532                                  ep->rep_attr.cap.max_recv_wr + 1,
 533                                  IB_POLL_WORKQUEUE);
 534         if (IS_ERR(recvcq)) {
 535                 rc = PTR_ERR(recvcq);
 536                 goto out2;
 537         }
 538 
 539         ep->rep_attr.send_cq = sendcq;
 540         ep->rep_attr.recv_cq = recvcq;
 541 
 542         /* Initialize cma parameters */
 543         memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 544 
 545         /* Prepare RDMA-CM private message */
 546         pmsg->cp_magic = rpcrdma_cmp_magic;
 547         pmsg->cp_version = RPCRDMA_CMP_VERSION;
 548         pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
 549         pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
 550         pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
 551         ep->rep_remote_cma.private_data = pmsg;
 552         ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 553 
 554         /* Client offers RDMA Read but does not initiate */
 555         ep->rep_remote_cma.initiator_depth = 0;
 556         ep->rep_remote_cma.responder_resources =
 557                 min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
 558 
 559         /* Limit transport retries so client can detect server
 560          * GID changes quickly. RPC layer handles re-establishing
 561          * transport connection and retransmission.
 562          */
 563         ep->rep_remote_cma.retry_count = 6;
 564 
 565         /* RPC-over-RDMA handles its own flow control. In addition,
 566          * make all RNR NAKs visible so we know that RPC-over-RDMA
 567          * flow control is working correctly (no NAKs should be seen).
 568          */
 569         ep->rep_remote_cma.flow_control = 0;
 570         ep->rep_remote_cma.rnr_retry_count = 0;
 571 
 572         return 0;
 573 
 574 out2:
 575         ib_free_cq(sendcq);
 576 out1:
 577         return rc;
 578 }
 579 
 580 /**
 581  * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
 582  * @r_xprt: transport instance to shut down
 583  *
 584  */
 585 void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
 586 {
 587         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 588         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 589 
 590         if (ia->ri_id && ia->ri_id->qp) {
 591                 rpcrdma_ep_disconnect(ep, ia);
 592                 rdma_destroy_qp(ia->ri_id);
 593                 ia->ri_id->qp = NULL;
 594         }
 595 
 596         if (ep->rep_attr.recv_cq)
 597                 ib_free_cq(ep->rep_attr.recv_cq);
 598         if (ep->rep_attr.send_cq)
 599                 ib_free_cq(ep->rep_attr.send_cq);
 600 }
 601 
 602 /* Re-establish a connection after a device removal event.
 603  * Unlike a normal reconnection, a fresh PD and a new set
  604  * of MRs and buffers are needed.
 605  */
 606 static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
 607                                     struct ib_qp_init_attr *qp_init_attr)
 608 {
 609         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 610         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 611         int rc, err;
 612 
 613         trace_xprtrdma_reinsert(r_xprt);
 614 
 615         rc = -EHOSTUNREACH;
 616         if (rpcrdma_ia_open(r_xprt))
 617                 goto out1;
 618 
 619         rc = -ENOMEM;
 620         err = rpcrdma_ep_create(r_xprt);
 621         if (err) {
 622                 pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
 623                 goto out2;
 624         }
 625         memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));
 626 
 627         rc = -ENETUNREACH;
 628         err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
 629         if (err) {
 630                 pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
 631                 goto out3;
 632         }
 633 
 634         rpcrdma_mrs_create(r_xprt);
 635         return 0;
 636 
 637 out3:
 638         rpcrdma_ep_destroy(r_xprt);
 639 out2:
 640         rpcrdma_ia_close(ia);
 641 out1:
 642         return rc;
 643 }
 644 
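      /* Re-establish a connection on the same device: tear down the
       * old connection, then create a fresh cm_id and QP and swap
       * them in, reusing the transport's existing PD, MRs, and DMA
       * mappings.
       */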
 645 static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
 646                                 struct ib_qp_init_attr *qp_init_attr)
 647 {
 648         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 649         struct rdma_cm_id *id, *old;
 650         int err, rc;
 651 
 652         trace_xprtrdma_reconnect(r_xprt);
 653 
 654         rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 655 
 656         rc = -EHOSTUNREACH;
 657         id = rpcrdma_create_id(r_xprt, ia);
 658         if (IS_ERR(id))
 659                 goto out;
 660 
 661         /* As long as the new ID points to the same device as the
 662          * old ID, we can reuse the transport's existing PD and all
 663          * previously allocated MRs. Also, the same device means
 664          * the transport's previous DMA mappings are still valid.
 665          *
 666          * This is a sanity check only. There should be no way these
 667          * point to two different devices here.
 668          */
 669         old = id;
 670         rc = -ENETUNREACH;
 671         if (ia->ri_id->device != id->device) {
 672                 pr_err("rpcrdma: can't reconnect on different device!\n");
 673                 goto out_destroy;
 674         }
 675 
 676         err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
 677         if (err)
 678                 goto out_destroy;
 679 
 680         /* Atomically replace the transport's ID and QP. */
 681         rc = 0;
 682         old = ia->ri_id;
 683         ia->ri_id = id;
 684         rdma_destroy_qp(old);
 685 
 686 out_destroy:
 687         rdma_destroy_id(old);
 688 out:
 689         return rc;
 690 }
 691 
 692 /*
 693  * Connect unconnected endpoint.
 694  */
 695 int
 696 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 697 {
 698         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 699                                                    rx_ia);
 700         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 701         struct ib_qp_init_attr qp_init_attr;
 702         int rc;
 703 
 704 retry:
 705         memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
 706         switch (ep->rep_connected) {
 707         case 0:
 708                 dprintk("RPC:       %s: connecting...\n", __func__);
 709                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
 710                 if (rc) {
 711                         rc = -ENETUNREACH;
 712                         goto out_noupdate;
 713                 }
 714                 break;
 715         case -ENODEV:
 716                 rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
 717                 if (rc)
 718                         goto out_noupdate;
 719                 break;
 720         default:
 721                 rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
 722                 if (rc)
 723                         goto out;
 724         }
 725 
 726         ep->rep_connected = 0;
 727         xprt_clear_connected(xprt);
 728 
 729         rpcrdma_post_recvs(r_xprt, true);
 730 
 731         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 732         if (rc)
 733                 goto out;
 734 
 735         if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 736                 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 737         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 738         if (ep->rep_connected <= 0) {
 739                 if (ep->rep_connected == -EAGAIN)
 740                         goto retry;
 741                 rc = ep->rep_connected;
 742                 goto out;
 743         }
 744 
 745         dprintk("RPC:       %s: connected\n", __func__);
 746 
 747 out:
 748         if (rc)
 749                 ep->rep_connected = rc;
 750 
 751 out_noupdate:
 752         return rc;
 753 }
 754 
 755 /**
 756  * rpcrdma_ep_disconnect - Disconnect underlying transport
 757  * @ep: endpoint to disconnect
 758  * @ia: associated interface adapter
 759  *
 760  * This is separate from destroy to facilitate the ability
 761  * to reconnect without recreating the endpoint.
 762  *
 763  * This call is not reentrant, and must not be made in parallel
 764  * on the same endpoint.
 765  */
 766 void
 767 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 768 {
 769         struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 770                                                    rx_ep);
 771         int rc;
 772 
  773         /* Returns without waiting if the ID is not connected */
 774         rc = rdma_disconnect(ia->ri_id);
 775         if (!rc)
 776                 wait_event_interruptible(ep->rep_connect_wait,
 777                                                         ep->rep_connected != 1);
 778         else
 779                 ep->rep_connected = rc;
 780         trace_xprtrdma_disconnect(r_xprt, rc);
 781 
 782         rpcrdma_xprt_drain(r_xprt);
 783         rpcrdma_reqs_reset(r_xprt);
 784 }
 785 
 786 /* Fixed-size circular FIFO queue. This implementation is wait-free and
 787  * lock-free.
 788  *
 789  * Consumer is the code path that posts Sends. This path dequeues a
 790  * sendctx for use by a Send operation. Multiple consumer threads
 791  * are serialized by the RPC transport lock, which allows only one
 792  * ->send_request call at a time.
 793  *
 794  * Producer is the code path that handles Send completions. This path
 795  * enqueues a sendctx that has been completed. Multiple producer
 796  * threads are serialized by the ib_poll_cq() function.
 797  */
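       *
       * One slot is always held in reserve: the consumer treats the
       * queue as empty when advancing rb_sc_head would make it equal
       * to rb_sc_tail, so emptiness is detected without keeping a
       * separate element count.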
 798 
  799 /* rpcrdma_sendctxs_destroy() assumes the caller has already quiesced
 800  * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 801  * Send requests.
 802  */
 803 static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
 804 {
 805         unsigned long i;
 806 
 807         for (i = 0; i <= buf->rb_sc_last; i++)
 808                 kfree(buf->rb_sc_ctxs[i]);
 809         kfree(buf->rb_sc_ctxs);
 810 }
 811 
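      /* Allocate one send context, with an SGE array sized to the
       * transport's maximum number of send SGEs, and initialize the
       * Send WR and completion handler embedded in it.
       */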
 812 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
 813 {
 814         struct rpcrdma_sendctx *sc;
 815 
 816         sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
 817                      GFP_KERNEL);
 818         if (!sc)
 819                 return NULL;
 820 
 821         sc->sc_wr.wr_cqe = &sc->sc_cqe;
 822         sc->sc_wr.sg_list = sc->sc_sges;
 823         sc->sc_wr.opcode = IB_WR_SEND;
 824         sc->sc_cqe.done = rpcrdma_wc_send;
 825         return sc;
 826 }
 827 
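      /* Build the circular queue of send contexts: one slot for each
       * possible outstanding RPC plus room for backchannel requests.
       */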
 828 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 829 {
 830         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 831         struct rpcrdma_sendctx *sc;
 832         unsigned long i;
 833 
 834         /* Maximum number of concurrent outstanding Send WRs. Capping
 835          * the circular queue size stops Send Queue overflow by causing
 836          * the ->send_request call to fail temporarily before too many
 837          * Sends are posted.
 838          */
 839         i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 840         dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
 841         buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 842         if (!buf->rb_sc_ctxs)
 843                 return -ENOMEM;
 844 
 845         buf->rb_sc_last = i - 1;
 846         for (i = 0; i <= buf->rb_sc_last; i++) {
 847                 sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
 848                 if (!sc)
 849                         return -ENOMEM;
 850 
 851                 sc->sc_xprt = r_xprt;
 852                 buf->rb_sc_ctxs[i] = sc;
 853         }
 854 
 855         return 0;
 856 }
 857 
 858 /* The sendctx queue is not guaranteed to have a size that is a
  859  * power of two, so the helpers in circ_buf.h cannot be used.
 860  * The other option is to use modulus (%), which can be expensive.
 861  */
 862 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 863                                           unsigned long item)
 864 {
 865         return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 866 }
 867 
 868 /**
 869  * rpcrdma_sendctx_get_locked - Acquire a send context
 870  * @r_xprt: controlling transport instance
 871  *
 872  * Returns pointer to a free send completion context; or NULL if
 873  * the queue is empty.
 874  *
 875  * Usage: Called to acquire an SGE array before preparing a Send WR.
 876  *
 877  * The caller serializes calls to this function (per transport), and
 878  * provides an effective memory barrier that flushes the new value
 879  * of rb_sc_head.
 880  */
 881 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 882 {
 883         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 884         struct rpcrdma_sendctx *sc;
 885         unsigned long next_head;
 886 
 887         next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 888 
 889         if (next_head == READ_ONCE(buf->rb_sc_tail))
 890                 goto out_emptyq;
 891 
 892         /* ORDER: item must be accessed _before_ head is updated */
 893         sc = buf->rb_sc_ctxs[next_head];
 894 
 895         /* Releasing the lock in the caller acts as a memory
 896          * barrier that flushes rb_sc_head.
 897          */
 898         buf->rb_sc_head = next_head;
 899 
 900         return sc;
 901 
 902 out_emptyq:
 903         /* The queue is "empty" if there have not been enough Send
 904          * completions recently. This is a sign the Send Queue is
 905          * backing up. Cause the caller to pause and try again.
 906          */
 907         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 908         r_xprt->rx_stats.empty_sendctx_q++;
 909         return NULL;
 910 }
 911 
 912 /**
 913  * rpcrdma_sendctx_put_locked - Release a send context
 914  * @sc: send context to release
 915  *
  916  * Usage: Called from Send completion to return a sendctx
 917  * to the queue.
 918  *
 919  * The caller serializes calls to this function (per transport).
 920  */
 921 static void
 922 rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 923 {
 924         struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 925         unsigned long next_tail;
 926 
 927         /* Unmap SGEs of previously completed but unsignaled
 928          * Sends by walking up the queue until @sc is found.
 929          */
 930         next_tail = buf->rb_sc_tail;
 931         do {
 932                 next_tail = rpcrdma_sendctx_next(buf, next_tail);
 933 
 934                 /* ORDER: item must be accessed _before_ tail is updated */
 935                 rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 936 
 937         } while (buf->rb_sc_ctxs[next_tail] != sc);
 938 
 939         /* Paired with READ_ONCE */
 940         smp_store_release(&buf->rb_sc_tail, next_tail);
 941 
 942         xprt_write_space(&sc->sc_xprt->rx_xprt);
 943 }
 944 
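      /* Allocate a batch of up to ri_max_segs MRs, initialize each
       * one with frwr_init_mr, and make them available on the
       * transport's MR free list. Allocation stops early if either
       * step fails.
       */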
 945 static void
 946 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 947 {
 948         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 949         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 950         unsigned int count;
 951 
 952         for (count = 0; count < ia->ri_max_segs; count++) {
 953                 struct rpcrdma_mr *mr;
 954                 int rc;
 955 
 956                 mr = kzalloc(sizeof(*mr), GFP_NOFS);
 957                 if (!mr)
 958                         break;
 959 
 960                 rc = frwr_init_mr(ia, mr);
 961                 if (rc) {
 962                         kfree(mr);
 963                         break;
 964                 }
 965 
 966                 mr->mr_xprt = r_xprt;
 967 
 968                 spin_lock(&buf->rb_lock);
 969                 rpcrdma_mr_push(mr, &buf->rb_mrs);
 970                 list_add(&mr->mr_all, &buf->rb_all_mrs);
 971                 spin_unlock(&buf->rb_lock);
 972         }
 973 
 974         r_xprt->rx_stats.mrs_allocated += count;
 975         trace_xprtrdma_createmrs(r_xprt, count);
 976 }
 977 
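      /* Work handler: allocate another batch of MRs for this
       * transport, then notify the RPC layer that more send space
       * may be available.
       */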
 978 static void
 979 rpcrdma_mr_refresh_worker(struct work_struct *work)
 980 {
 981         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 982                                                   rb_refresh_worker);
 983         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 984                                                    rx_buf);
 985 
 986         rpcrdma_mrs_create(r_xprt);
 987         xprt_write_space(&r_xprt->rx_xprt);
 988 }
 989 
 990 /**
 991  * rpcrdma_req_create - Allocate an rpcrdma_req object
 992  * @r_xprt: controlling r_xprt
 993  * @size: initial size, in bytes, of send and receive buffers
 994  * @flags: GFP flags passed to memory allocators
 995  *
 996  * Returns an allocated and fully initialized rpcrdma_req or NULL.
 997  */
 998 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
 999                                        gfp_t flags)
1000 {
1001         struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1002         struct rpcrdma_regbuf *rb;
1003         struct rpcrdma_req *req;
1004         size_t maxhdrsize;
1005 
1006         req = kzalloc(sizeof(*req), flags);
1007         if (req == NULL)
1008                 goto out1;
1009 
1010         /* Compute maximum header buffer size in bytes */
1011         maxhdrsize = rpcrdma_fixed_maxsz + 3 +
1012                      r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
1013         maxhdrsize *= sizeof(__be32);
1014         rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
1015                                   DMA_TO_DEVICE, flags);
1016         if (!rb)
1017                 goto out2;
1018         req->rl_rdmabuf = rb;
1019         xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
1020 
1021         req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
1022         if (!req->rl_sendbuf)
1023                 goto out3;
1024 
1025         req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
1026         if (!req->rl_recvbuf)
1027                 goto out4;
1028 
1029         INIT_LIST_HEAD(&req->rl_free_mrs);
1030         INIT_LIST_HEAD(&req->rl_registered);
1031         spin_lock(&buffer->rb_lock);
1032         list_add(&req->rl_all, &buffer->rb_allreqs);
1033         spin_unlock(&buffer->rb_lock);
1034         return req;
1035 
1036 out4:
1037         kfree(req->rl_sendbuf);
1038 out3:
1039         kfree(req->rl_rdmabuf);
1040 out2:
1041         kfree(req);
1042 out1:
1043         return NULL;
1044 }
1045 
1046 /**
1047  * rpcrdma_reqs_reset - Reset all reqs owned by a transport
1048  * @r_xprt: controlling transport instance
1049  *
1050  * ASSUMPTION: the rb_allreqs list is stable for the duration,
 1051  * and thus can be walked without holding rb_lock; e.g., the
 1052  * caller holds the transport send lock to exclude
1053  * device removal or disconnection.
1054  */
1055 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
1056 {
1057         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1058         struct rpcrdma_req *req;
1059 
1060         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
1061                 /* Credits are valid only for one connection */
1062                 req->rl_slot.rq_cong = 0;
1063         }
1064 }
1065 
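      /* Allocate an rpcrdma_rep and a Receive buffer large enough to
       * hold an inline Reply, initialize its Receive WR, and link the
       * rep onto the transport's rb_all_reps list.
       */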
1066 static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
1067                                               bool temp)
1068 {
1069         struct rpcrdma_rep *rep;
1070 
1071         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1072         if (rep == NULL)
1073                 goto out;
1074 
1075         rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
1076                                                DMA_FROM_DEVICE, GFP_KERNEL);
1077         if (!rep->rr_rdmabuf)
1078                 goto out_free;
1079 
1080         xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
1081                      rdmab_length(rep->rr_rdmabuf));
1082         rep->rr_cqe.done = rpcrdma_wc_receive;
1083         rep->rr_rxprt = r_xprt;
1084         rep->rr_recv_wr.next = NULL;
1085         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1086         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1087         rep->rr_recv_wr.num_sge = 1;
1088         rep->rr_temp = temp;
1089         list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
1090         return rep;
1091 
1092 out_free:
1093         kfree(rep);
1094 out:
1095         return NULL;
1096 }
1097 
1098 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
1099 {
1100         list_del(&rep->rr_all);
1101         rpcrdma_regbuf_free(rep->rr_rdmabuf);
1102         kfree(rep);
1103 }
1104 
1105 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1106 {
1107         struct llist_node *node;
1108 
1109         /* Calls to llist_del_first are required to be serialized */
1110         node = llist_del_first(&buf->rb_free_reps);
1111         if (!node)
1112                 return NULL;
1113         return llist_entry(node, struct rpcrdma_rep, rr_node);
1114 }
1115 
1116 static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
1117                             struct rpcrdma_rep *rep)
1118 {
1119         llist_add(&rep->rr_node, &buf->rb_free_reps);
1120 }
1121 
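      /* DMA-unmap the Receive buffer of every rep on rb_all_reps.
       * Used while divesting the transport's hardware resources
       * during device removal.
       */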
1122 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1123 {
1124         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1125         struct rpcrdma_rep *rep;
1126 
1127         list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
1128                 rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1129 }
1130 
1131 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1132 {
1133         struct rpcrdma_rep *rep;
1134 
1135         while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
1136                 rpcrdma_rep_destroy(rep);
1137 }
1138 
1139 /**
1140  * rpcrdma_buffer_create - Create initial set of req/rep objects
1141  * @r_xprt: transport instance to (re)initialize
1142  *
1143  * Returns zero on success, otherwise a negative errno.
1144  */
1145 int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1146 {
1147         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1148         int i, rc;
1149 
1150         buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
1151         buf->rb_bc_srv_max_requests = 0;
1152         spin_lock_init(&buf->rb_lock);
1153         INIT_LIST_HEAD(&buf->rb_mrs);
1154         INIT_LIST_HEAD(&buf->rb_all_mrs);
1155         INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1156 
1157         rpcrdma_mrs_create(r_xprt);
1158 
1159         INIT_LIST_HEAD(&buf->rb_send_bufs);
1160         INIT_LIST_HEAD(&buf->rb_allreqs);
1161         INIT_LIST_HEAD(&buf->rb_all_reps);
1162 
1163         rc = -ENOMEM;
1164         for (i = 0; i < buf->rb_max_requests; i++) {
1165                 struct rpcrdma_req *req;
1166 
1167                 req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
1168                                          GFP_KERNEL);
1169                 if (!req)
1170                         goto out;
1171                 list_add(&req->rl_list, &buf->rb_send_bufs);
1172         }
1173 
1174         buf->rb_credits = 1;
1175         init_llist_head(&buf->rb_free_reps);
1176 
1177         rc = rpcrdma_sendctxs_create(r_xprt);
1178         if (rc)
1179                 goto out;
1180 
1181         return 0;
1182 out:
1183         rpcrdma_buffer_destroy(buf);
1184         return rc;
1185 }
1186 
1187 /**
1188  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1189  * @req: unused object to be destroyed
1190  *
1191  * This function assumes that the caller prevents concurrent device
1192  * unload and transport tear-down.
1193  */
1194 void rpcrdma_req_destroy(struct rpcrdma_req *req)
1195 {
1196         struct rpcrdma_mr *mr;
1197 
1198         list_del(&req->rl_all);
1199 
1200         while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1201                 struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1202 
1203                 spin_lock(&buf->rb_lock);
1204                 list_del(&mr->mr_all);
1205                 spin_unlock(&buf->rb_lock);
1206 
1207                 frwr_release_mr(mr);
1208         }
1209 
1210         rpcrdma_regbuf_free(req->rl_recvbuf);
1211         rpcrdma_regbuf_free(req->rl_sendbuf);
1212         rpcrdma_regbuf_free(req->rl_rdmabuf);
1213         kfree(req);
1214 }
1215 
1216 /**
1217  * rpcrdma_mrs_destroy - Release all of a transport's MRs
1218  * @buf: controlling buffer instance
1219  *
1220  * Relies on caller holding the transport send lock to protect
1221  * removing mr->mr_list from req->rl_free_mrs safely.
1222  */
1223 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1224 {
1225         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1226                                                    rx_buf);
1227         struct rpcrdma_mr *mr;
1228 
1229         spin_lock(&buf->rb_lock);
1230         while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1231                                               struct rpcrdma_mr,
1232                                               mr_all)) != NULL) {
1233                 list_del(&mr->mr_list);
1234                 list_del(&mr->mr_all);
1235                 spin_unlock(&buf->rb_lock);
1236 
1237                 frwr_release_mr(mr);
1238                 spin_lock(&buf->rb_lock);
1239         }
1240         spin_unlock(&buf->rb_lock);
1241         r_xprt->rx_stats.mrs_allocated = 0;
1242 }
1243 
1244 /**
1245  * rpcrdma_buffer_destroy - Release all hw resources
1246  * @buf: root control block for resources
1247  *
 1248  * ORDERING: relies on a prior rpcrdma_xprt_drain:
1249  * - No more Send or Receive completions can occur
1250  * - All MRs, reps, and reqs are returned to their free lists
1251  */
1252 void
1253 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1254 {
1255         cancel_work_sync(&buf->rb_refresh_worker);
1256 
1257         rpcrdma_sendctxs_destroy(buf);
1258         rpcrdma_reps_destroy(buf);
1259 
1260         while (!list_empty(&buf->rb_send_bufs)) {
1261                 struct rpcrdma_req *req;
1262 
1263                 req = list_first_entry(&buf->rb_send_bufs,
1264                                        struct rpcrdma_req, rl_list);
1265                 list_del(&req->rl_list);
1266                 rpcrdma_req_destroy(req);
1267         }
1268 
1269         rpcrdma_mrs_destroy(buf);
1270 }
1271 
1272 /**
1273  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1274  * @r_xprt: controlling transport
1275  *
1276  * Returns an initialized rpcrdma_mr or NULL if no free
1277  * rpcrdma_mr objects are available.
1278  */
1279 struct rpcrdma_mr *
1280 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1281 {
1282         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1283         struct rpcrdma_mr *mr;
1284 
1285         spin_lock(&buf->rb_lock);
1286         mr = rpcrdma_mr_pop(&buf->rb_mrs);
1287         spin_unlock(&buf->rb_lock);
1288         return mr;
1289 }
1290 
1291 /**
1292  * rpcrdma_mr_put - DMA unmap an MR and release it
1293  * @mr: MR to release
1294  *
1295  */
1296 void rpcrdma_mr_put(struct rpcrdma_mr *mr)
1297 {
1298         struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1299 
1300         if (mr->mr_dir != DMA_NONE) {
1301                 trace_xprtrdma_mr_unmap(mr);
1302                 ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
1303                                 mr->mr_sg, mr->mr_nents, mr->mr_dir);
1304                 mr->mr_dir = DMA_NONE;
1305         }
1306 
1307         rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
1308 }
1309 
1310 /**
1311  * rpcrdma_buffer_get - Get a request buffer
1312  * @buffers: Buffer pool from which to obtain a buffer
1313  *
1314  * Returns a fresh rpcrdma_req, or NULL if none are available.
1315  */
1316 struct rpcrdma_req *
1317 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1318 {
1319         struct rpcrdma_req *req;
1320 
1321         spin_lock(&buffers->rb_lock);
1322         req = list_first_entry_or_null(&buffers->rb_send_bufs,
1323                                        struct rpcrdma_req, rl_list);
1324         if (req)
1325                 list_del_init(&req->rl_list);
1326         spin_unlock(&buffers->rb_lock);
1327         return req;
1328 }
1329 
1330 /**
1331  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1332  * @buffers: buffer pool
1333  * @req: object to return
1334  *
1335  */
1336 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1337 {
1338         if (req->rl_reply)
1339                 rpcrdma_rep_put(buffers, req->rl_reply);
1340         req->rl_reply = NULL;
1341 
1342         spin_lock(&buffers->rb_lock);
1343         list_add(&req->rl_list, &buffers->rb_send_bufs);
1344         spin_unlock(&buffers->rb_lock);
1345 }
1346 
1347 /**
1348  * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
1349  * @rep: rep to release
1350  *
1351  * Used after error conditions.
1352  */
1353 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1354 {
1355         rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
1356 }
1357 
1358 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1359  *
1360  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1361  * receiving the payload of RDMA RECV operations. During Long Calls
1362  * or Replies they may be registered externally via frwr_map.
1363  */
1364 static struct rpcrdma_regbuf *
1365 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1366                      gfp_t flags)
1367 {
1368         struct rpcrdma_regbuf *rb;
1369 
1370         rb = kmalloc(sizeof(*rb), flags);
1371         if (!rb)
1372                 return NULL;
1373         rb->rg_data = kmalloc(size, flags);
1374         if (!rb->rg_data) {
1375                 kfree(rb);
1376                 return NULL;
1377         }
1378 
1379         rb->rg_device = NULL;
1380         rb->rg_direction = direction;
1381         rb->rg_iov.length = size;
1382         return rb;
1383 }
1384 
1385 /**
1386  * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1387  * @rb: regbuf to reallocate
1388  * @size: size of buffer to be allocated, in bytes
1389  * @flags: GFP flags
1390  *
1391  * Returns true if reallocation was successful. If false is
1392  * returned, @rb is left untouched.
1393  */
1394 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1395 {
1396         void *buf;
1397 
1398         buf = kmalloc(size, flags);
1399         if (!buf)
1400                 return false;
1401 
1402         rpcrdma_regbuf_dma_unmap(rb);
1403         kfree(rb->rg_data);
1404 
1405         rb->rg_data = buf;
1406         rb->rg_iov.length = size;
1407         return true;
1408 }
1409 
1410 /**
1411  * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1412  * @r_xprt: controlling transport instance
1413  * @rb: regbuf to be mapped
1414  *
1415  * Returns true if the buffer is now DMA mapped to @r_xprt's device
1416  */
1417 bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1418                               struct rpcrdma_regbuf *rb)
1419 {
1420         struct ib_device *device = r_xprt->rx_ia.ri_id->device;
1421 
1422         if (rb->rg_direction == DMA_NONE)
1423                 return false;
1424 
1425         rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1426                                             rdmab_length(rb), rb->rg_direction);
1427         if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1428                 trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1429                 return false;
1430         }
1431 
1432         rb->rg_device = device;
1433         rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
1434         return true;
1435 }
1436 
1437 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1438 {
1439         if (!rb)
1440                 return;
1441 
1442         if (!rpcrdma_regbuf_is_mapped(rb))
1443                 return;
1444 
1445         ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1446                             rb->rg_direction);
1447         rb->rg_device = NULL;
1448 }
1449 
1450 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1451 {
1452         rpcrdma_regbuf_dma_unmap(rb);
1453         if (rb)
1454                 kfree(rb->rg_data);
1455         kfree(rb);
1456 }
1457 
1458 /**
1459  * rpcrdma_ep_post - Post WRs to a transport's Send Queue
1460  * @ia: transport's device information
1461  * @ep: transport's RDMA endpoint information
1462  * @req: rpcrdma_req containing the Send WR to post
1463  *
 1464  * Returns 0 if the post was successful; otherwise -ENOTCONN
1465  * is returned.
1466  */
1467 int
1468 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1469                 struct rpcrdma_ep *ep,
1470                 struct rpcrdma_req *req)
1471 {
1472         struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1473         int rc;
1474 
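              /* Signal a Send completion only when the current send
               * batch is exhausted or the request is still referenced
               * elsewhere. Unsignaled Sends are unmapped later, when a
               * subsequent signaled Send completes (see
               * rpcrdma_sendctx_put_locked).
               */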
1475         if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
1476                 send_wr->send_flags |= IB_SEND_SIGNALED;
1477                 ep->rep_send_count = ep->rep_send_batch;
1478         } else {
1479                 send_wr->send_flags &= ~IB_SEND_SIGNALED;
1480                 --ep->rep_send_count;
1481         }
1482 
1483         rc = frwr_send(ia, req);
1484         trace_xprtrdma_post_send(req, rc);
1485         if (rc)
1486                 return -ENOTCONN;
1487         return 0;
1488 }
1489 
1490 /**
1491  * rpcrdma_post_recvs - Refill the Receive Queue
1492  * @r_xprt: controlling transport instance
1493  * @temp: mark Receive buffers to be deleted after use
1494  *
1495  */
1496 void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1497 {
1498         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1499         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1500         struct ib_recv_wr *i, *wr, *bad_wr;
1501         struct rpcrdma_rep *rep;
1502         int needed, count, rc;
1503 
1504         rc = 0;
1505         count = 0;
1506 
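              /* Keep enough Receives posted to cover the current
               * credit grant plus twice the backchannel maximum.
               * Non-temporary refills add an extra batch of
               * RPCRDMA_MAX_RECV_BATCH WRs so that posting happens
               * in batches rather than one WR at a time.
               */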
1507         needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1508         if (likely(ep->rep_receive_count > needed))
1509                 goto out;
1510         needed -= ep->rep_receive_count;
1511         if (!temp)
1512                 needed += RPCRDMA_MAX_RECV_BATCH;
1513 
1514         /* fast path: all needed reps can be found on the free list */
1515         wr = NULL;
1516         while (needed) {
1517                 rep = rpcrdma_rep_get_locked(buf);
1518                 if (rep && rep->rr_temp) {
1519                         rpcrdma_rep_destroy(rep);
1520                         continue;
1521                 }
1522                 if (!rep)
1523                         rep = rpcrdma_rep_create(r_xprt, temp);
1524                 if (!rep)
1525                         break;
1526 
1527                 rep->rr_recv_wr.next = wr;
1528                 wr = &rep->rr_recv_wr;
1529                 --needed;
1530         }
1531         if (!wr)
1532                 goto out;
1533 
1534         for (i = wr; i; i = i->next) {
1535                 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1536 
1537                 if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
1538                         goto release_wrs;
1539 
1540                 trace_xprtrdma_post_recv(rep);
1541                 ++count;
1542         }
1543 
1544         rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
1545                           (const struct ib_recv_wr **)&bad_wr);
1546 out:
1547         trace_xprtrdma_post_recvs(r_xprt, count, rc);
1548         if (rc) {
1549                 for (wr = bad_wr; wr;) {
1550                         struct rpcrdma_rep *rep;
1551 
1552                         rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1553                         wr = wr->next;
1554                         rpcrdma_recv_buffer_put(rep);
1555                         --count;
1556                 }
1557         }
1558         ep->rep_receive_count += count;
1559         return;
1560 
1561 release_wrs:
1562         for (i = wr; i;) {
1563                 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1564                 i = i->next;
1565                 rpcrdma_recv_buffer_put(rep);
1566         }
1567 }
