root/net/sunrpc/xprtrdma/frwr_ops.c

DEFINITIONS

This source file includes the following definitions:
  1. frwr_is_supported
  2. frwr_release_mr
  3. frwr_mr_recycle
  4. frwr_mr_recycle_worker
  5. frwr_recycle
  6. frwr_reset
  7. frwr_init_mr
  8. frwr_open
  9. frwr_maxpages
  10. frwr_map
  11. frwr_wc_fastreg
  12. frwr_send
  13. frwr_reminv
  14. __frwr_release_mr
  15. frwr_wc_localinv
  16. frwr_wc_localinv_wake
  17. frwr_unmap_sync
  18. frwr_wc_localinv_done
  19. frwr_unmap_async

   1 // SPDX-License-Identifier: GPL-2.0
   2 /*
   3  * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
   4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5  */
   6 
   7 /* Lightweight memory registration using Fast Registration Work
   8  * Requests (FRWR).
   9  *
  10  * FRWR features ordered asynchronous registration and invalidation
  11  * of arbitrarily-sized memory regions. This is the fastest and safest
  12  * but most complex memory registration mode.
  13  */
  14 
  15 /* Normal operation
  16  *
  17  * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
  18  * Work Request (frwr_map). When the RDMA operation is finished, this
  19  * Memory Region is invalidated using a LOCAL_INV Work Request
  20  * (frwr_unmap_async and frwr_unmap_sync).
  21  *
  22  * Typically FAST_REG Work Requests are not signaled, and neither are
  23  * RDMA Send Work Requests (with the exception of signaling occasionally
  24  * to prevent provider work queue overflows). This greatly reduces HCA
  25  * interrupt workload.
  26  */
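
     /* A minimal sketch of that flow, for illustration only; the marshaling
      * and reply-handling call sites are assumed, and error handling is
      * elided:
      *
      *      seg = frwr_map(r_xprt, seg, nsegs, writing, xid, mr);
      *      ... add @mr to req->rl_registered ...
      *      frwr_send(ia, req);            // FastReg WRs chained to the Send WR
      *      ... RPC Reply arrives ...
      *      frwr_unmap_async(r_xprt, req); // LOCAL_INV WRs invalidate the MRs
      */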
  27 
  28 /* Transport recovery
  29  *
  30  * frwr_map and frwr_unmap_* cannot run at the same time the transport
  31  * connect worker is running. The connect worker holds the transport
  32  * send lock, just as ->send_request does. This prevents frwr_map and
  33  * the connect worker from running concurrently. When a connection is
  34  * closed, the Receive completion queue is drained before allowing the
  35  * connect worker to get control. This prevents frwr_unmap and the
  36  * connect worker from running concurrently.
  37  *
  38  * When the underlying transport disconnects, MRs that are in flight
  39  * are flushed and are likely unusable. Thus all flushed MRs are
  40  * destroyed. New MRs are created on demand.
  41  */
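
     /* A hedged sketch of that recovery path; the connect-worker call site
      * shown here is an assumption:
      *
      *      // after the connection has been re-established:
      *      frwr_recycle(req);      // destroy MRs that may have been in flight
      *      // replacement MRs are created on demand via frwr_init_mr()
      */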
  42 
  43 #include <linux/sunrpc/rpc_rdma.h>
  44 #include <linux/sunrpc/svc_rdma.h>
  45 
  46 #include "xprt_rdma.h"
  47 #include <trace/events/rpcrdma.h>
  48 
  49 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  50 # define RPCDBG_FACILITY        RPCDBG_TRANS
  51 #endif
  52 
  53 /**
  54  * frwr_is_supported - Check if device supports FRWR
  55  * @device: interface adapter to check
  56  *
  57  * Returns true if device supports FRWR, otherwise false
  58  */
  59 bool frwr_is_supported(struct ib_device *device)
  60 {
  61         struct ib_device_attr *attrs = &device->attrs;
  62 
  63         if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
  64                 goto out_not_supported;
  65         if (attrs->max_fast_reg_page_list_len == 0)
  66                 goto out_not_supported;
  67         return true;
  68 
  69 out_not_supported:
  70         pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
  71                 device->name);
  72         return false;
  73 }
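
     /* Typical use, as a hedged sketch (the transport-setup call site is
      * assumed, not defined in this file):
      *
      *      if (!frwr_is_supported(ia->ri_id->device))
      *              return -EINVAL;
      */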
  74 
  75 /**
  76  * frwr_release_mr - Destroy one MR
  77  * @mr: MR allocated by frwr_init_mr
  78  *
  79  */
  80 void frwr_release_mr(struct rpcrdma_mr *mr)
  81 {
  82         int rc;
  83 
  84         rc = ib_dereg_mr(mr->frwr.fr_mr);
  85         if (rc)
  86                 trace_xprtrdma_frwr_dereg(mr, rc);
  87         kfree(mr->mr_sg);
  88         kfree(mr);
  89 }
  90 
  91 static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
  92 {
  93         trace_xprtrdma_mr_recycle(mr);
  94 
  95         if (mr->mr_dir != DMA_NONE) {
  96                 trace_xprtrdma_mr_unmap(mr);
  97                 ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
  98                                 mr->mr_sg, mr->mr_nents, mr->mr_dir);
  99                 mr->mr_dir = DMA_NONE;
 100         }
 101 
 102         spin_lock(&r_xprt->rx_buf.rb_lock);
 103         list_del(&mr->mr_all);
 104         r_xprt->rx_stats.mrs_recycled++;
 105         spin_unlock(&r_xprt->rx_buf.rb_lock);
 106 
 107         frwr_release_mr(mr);
 108 }
 109 
 110 /* MRs are dynamically allocated, so simply clean up and release the MR.
 111  * A replacement MR will subsequently be allocated on demand.
 112  */
 113 static void
 114 frwr_mr_recycle_worker(struct work_struct *work)
 115 {
 116         struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
 117                                              mr_recycle);
 118 
 119         frwr_mr_recycle(mr->mr_xprt, mr);
 120 }
 121 
 122 /* frwr_recycle - Discard MRs
 123  * @req: request to reset
 124  *
 125  * Used after a reconnect. These MRs could still be in flight; we
 126  * can't tell, so the safe thing to do is to release them.
 127  */
 128 void frwr_recycle(struct rpcrdma_req *req)
 129 {
 130         struct rpcrdma_mr *mr;
 131 
 132         while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
 133                 frwr_mr_recycle(mr->mr_xprt, mr);
 134 }
 135 
 136 /* frwr_reset - Place MRs back on the free list
 137  * @req: request to reset
 138  *
 139  * Used after a failed marshal. For FRWR, this means the MRs
 140  * don't have to be fully released and recreated.
 141  *
 142  * NB: This is safe only as long as none of @req's MRs are
 143  * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 144  * Work Request.
 145  */
 146 void frwr_reset(struct rpcrdma_req *req)
 147 {
 148         struct rpcrdma_mr *mr;
 149 
 150         while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
 151                 rpcrdma_mr_put(mr);
 152 }
 153 
 154 /**
 155  * frwr_init_mr - Initialize one MR
 156  * @ia: interface adapter
 157  * @mr: generic MR to prepare for FRWR
 158  *
 159  * Returns zero if successful. Otherwise a negative errno
 160  * is returned.
 161  */
 162 int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
 163 {
 164         unsigned int depth = ia->ri_max_frwr_depth;
 165         struct scatterlist *sg;
 166         struct ib_mr *frmr;
 167         int rc;
 168 
 169         /* NB: ib_alloc_mr and device drivers typically allocate
 170          *     memory with GFP_KERNEL.
 171          */
 172         frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
 173         if (IS_ERR(frmr))
 174                 goto out_mr_err;
 175 
 176         sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
 177         if (!sg)
 178                 goto out_list_err;
 179 
 180         mr->frwr.fr_mr = frmr;
 181         mr->mr_dir = DMA_NONE;
 182         INIT_LIST_HEAD(&mr->mr_list);
 183         INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
 184         init_completion(&mr->frwr.fr_linv_done);
 185 
 186         sg_init_table(sg, depth);
 187         mr->mr_sg = sg;
 188         return 0;
 189 
 190 out_mr_err:
 191         rc = PTR_ERR(frmr);
 192         trace_xprtrdma_frwr_alloc(mr, rc);
 193         return rc;
 194 
 195 out_list_err:
 196         ib_dereg_mr(frmr);
 197         return -ENOMEM;
 198 }
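
     /* Illustrative only: a hedged sketch of the allocation path. The caller
      * shown here is an assumption; the transport's buffer code also links
      * new MRs onto its free and all-MRs lists under rb_lock, and the
      * allocation flags below are illustrative.
      */
     static __maybe_unused struct rpcrdma_mr *
     frwr_mr_alloc_sketch(struct rpcrdma_xprt *r_xprt)
     {
             struct rpcrdma_mr *mr;

             mr = kzalloc(sizeof(*mr), GFP_NOFS);
             if (!mr)
                     return NULL;

             if (frwr_init_mr(&r_xprt->rx_ia, mr)) {
                     kfree(mr);
                     return NULL;
             }
             mr->mr_xprt = r_xprt;
             return mr;
     }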
 199 
 200 /**
 201  * frwr_open - Prepare an endpoint for use with FRWR
 202  * @ia: interface adapter this endpoint will use
 203  * @ep: endpoint to prepare
 204  *
 205  * On success, sets:
 206  *      ep->rep_attr.cap.max_send_wr
 207  *      ep->rep_attr.cap.max_recv_wr
 208  *      ep->rep_max_requests
 209  *      ia->ri_max_segs
 210  *
 211  * And these FRWR-related fields:
 212  *      ia->ri_max_frwr_depth
 213  *      ia->ri_mrtype
 214  *
 215  * On failure, a negative errno is returned.
 216  */
 217 int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
 218 {
 219         struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
 220         int max_qp_wr, depth, delta;
 221 
 222         ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
 223         if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 224                 ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
 225 
 226         /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
 227          * capability, but perform optimally when the MRs are not larger
 228          * than a page.
 229          */
 230         if (attrs->max_sge_rd > 1)
 231                 ia->ri_max_frwr_depth = attrs->max_sge_rd;
 232         else
 233                 ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
 234         if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
 235                 ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
 236         dprintk("RPC:       %s: max FR page list depth = %u\n",
 237                 __func__, ia->ri_max_frwr_depth);
 238 
 239         /* Add room for frwr register and invalidate WRs.
 240          * 1. FRWR reg WR for head
 241          * 2. FRWR invalidate WR for head
 242          * 3. N FRWR reg WRs for pagelist
 243          * 4. N FRWR invalidate WRs for pagelist
 244          * 5. FRWR reg WR for tail
 245          * 6. FRWR invalidate WR for tail
 246          * 7. The RDMA_SEND WR
 247          */
 248         depth = 7;
 249 
 250         /* Calculate N if the device max FRWR depth is smaller than
 251          * RPCRDMA_MAX_DATA_SEGS.
 252          */
 253         if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
 254                 delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
 255                 do {
 256                         depth += 2; /* FRWR reg + invalidate */
 257                         delta -= ia->ri_max_frwr_depth;
 258                 } while (delta > 0);
 259         }
 260 
 261         max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
 262         max_qp_wr -= RPCRDMA_BACKWARD_WRS;
 263         max_qp_wr -= 1;
 264         if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
 265                 return -ENOMEM;
 266         if (ep->rep_max_requests > max_qp_wr)
 267                 ep->rep_max_requests = max_qp_wr;
 268         ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
 269         if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
 270                 ep->rep_max_requests = max_qp_wr / depth;
 271                 if (!ep->rep_max_requests)
 272                         return -EINVAL;
 273                 ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
 274         }
 275         ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 276         ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
 277         ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
 278         ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 279         ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
 280 
 281         ia->ri_max_segs =
 282                 DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
 283         /* Reply chunks require segments for head and tail buffers */
 284         ia->ri_max_segs += 2;
 285         if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
 286                 ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
 287         return 0;
 288 }
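
     /* Worked example of the accounting above, using hypothetical numbers
      * (not taken from any particular device): if RPCRDMA_MAX_DATA_SEGS is 64
      * and ri_max_frwr_depth is 30, delta starts at 34 and the loop runs
      * twice, so depth = 7 + 2 * 2 = 11. With rep_max_requests = 128, the
      * send queue needs 128 * 11 WRs plus the backchannel WRs and one slot
      * for ib_drain_sq; if that exceeds the adjusted max_qp_wr, then
      * rep_max_requests is scaled back to max_qp_wr / depth.
      */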
 289 
 290 /**
 291  * frwr_maxpages - Compute size of largest payload
 292  * @r_xprt: transport
 293  *
 294  * Returns maximum size of an RPC message, in pages.
 295  *
 296  * FRWR mode conveys a list of pages per chunk segment. The
 297  * maximum length of that list is the FRWR page list depth.
 298  */
 299 size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
 300 {
 301         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 302 
 303         return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
 304                      (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
 305 }
 306 
 307 /**
 308  * frwr_map - Register a memory region
 309  * @r_xprt: controlling transport
 310  * @seg: memory region co-ordinates
 311  * @nsegs: number of segments remaining
 312  * @writing: true when RDMA Write will be used
 313  * @xid: XID of RPC using the registered memory
 314  * @mr: MR to fill in
 315  *
 316  * Prepare a REG_MR Work Request to register a memory region
 317  * for remote access via RDMA READ or RDMA WRITE.
 318  *
 319  * Returns the next segment or a negative errno pointer.
 320  * On success, @mr is filled in.
 321  */
 322 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 323                                 struct rpcrdma_mr_seg *seg,
 324                                 int nsegs, bool writing, __be32 xid,
 325                                 struct rpcrdma_mr *mr)
 326 {
 327         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 328         struct ib_reg_wr *reg_wr;
 329         int i, n, dma_nents;
 330         struct ib_mr *ibmr;
 331         u8 key;
 332 
 333         if (nsegs > ia->ri_max_frwr_depth)
 334                 nsegs = ia->ri_max_frwr_depth;
 335         for (i = 0; i < nsegs;) {
 336                 if (seg->mr_page)
 337                         sg_set_page(&mr->mr_sg[i],
 338                                     seg->mr_page,
 339                                     seg->mr_len,
 340                                     offset_in_page(seg->mr_offset));
 341                 else
 342                         sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
 343                                    seg->mr_len);
 344 
 345                 ++seg;
 346                 ++i;
 347                 if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
 348                         continue;
 349                 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
 350                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 351                         break;
 352         }
 353         mr->mr_dir = rpcrdma_data_dir(writing);
 354         mr->mr_nents = i;
 355 
 356         dma_nents = ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, mr->mr_nents,
 357                                   mr->mr_dir);
 358         if (!dma_nents)
 359                 goto out_dmamap_err;
 360 
 361         ibmr = mr->frwr.fr_mr;
 362         n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE);
 363         if (n != dma_nents)
 364                 goto out_mapmr_err;
 365 
 366         ibmr->iova &= 0x00000000ffffffff;
 367         ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
 368         key = (u8)(ibmr->rkey & 0x000000FF);
 369         ib_update_fast_reg_key(ibmr, ++key);
 370 
 371         reg_wr = &mr->frwr.fr_regwr;
 372         reg_wr->mr = ibmr;
 373         reg_wr->key = ibmr->rkey;
 374         reg_wr->access = writing ?
 375                          IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 376                          IB_ACCESS_REMOTE_READ;
 377 
 378         mr->mr_handle = ibmr->rkey;
 379         mr->mr_length = ibmr->length;
 380         mr->mr_offset = ibmr->iova;
 381         trace_xprtrdma_mr_map(mr);
 382 
 383         return seg;
 384 
 385 out_dmamap_err:
 386         mr->mr_dir = DMA_NONE;
 387         trace_xprtrdma_frwr_sgerr(mr, i);
 388         return ERR_PTR(-EIO);
 389 
 390 out_mapmr_err:
 391         trace_xprtrdma_frwr_maperr(mr, n);
 392         return ERR_PTR(-EIO);
 393 }
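
     /* Illustrative only: a hedged sketch of how a marshaling routine might
      * drive frwr_map() to cover a chunk larger than a single MR.
      * rpcrdma_mr_get() is assumed to hand back a free MR, and the -EAGAIN
      * fallback is an assumption; real callers also encode each registered
      * segment into the RPC/RDMA header and clean up on failure.
      */
     static __maybe_unused struct rpcrdma_mr_seg *
     frwr_map_chunk_sketch(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                           struct rpcrdma_mr_seg *seg, int nsegs, bool writing,
                           __be32 xid)
     {
             struct rpcrdma_mr *mr;

             do {
                     mr = rpcrdma_mr_get(r_xprt);
                     if (!mr)
                             return ERR_PTR(-EAGAIN);
                     seg = frwr_map(r_xprt, seg, nsegs, writing, xid, mr);
                     if (IS_ERR(seg))
                             return seg;
                     /* remember the MR so frwr_send() can chain its FastReg WR
                      * and frwr_unmap_*() can invalidate it later
                      */
                     list_add_tail(&mr->mr_list, &req->rl_registered);
                     nsegs -= mr->mr_nents;
             } while (nsegs > 0);

             return seg;
     }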
 394 
 395 /**
 396  * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 397  * @cq: completion queue (ignored)
 398  * @wc: completed WR
 399  *
 400  */
 401 static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 402 {
 403         struct ib_cqe *cqe = wc->wr_cqe;
 404         struct rpcrdma_frwr *frwr =
 405                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
 406 
 407         /* WARNING: Only wr_cqe and status are reliable at this point */
 408         trace_xprtrdma_wc_fastreg(wc, frwr);
 409         /* The MR will get recycled when the associated req is retransmitted */
 410 }
 411 
 412 /**
 413  * frwr_send - post Send WR containing the RPC Call message
 414  * @ia: interface adapter
 415  * @req: Prepared RPC Call
 416  *
 417  * For FRWR, chain any FastReg WRs to the Send WR. Only a
 418  * single ib_post_send call is needed to register memory
 419  * and then post the Send WR.
 420  *
 421  * Returns the result of ib_post_send.
 422  */
 423 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 424 {
 425         struct ib_send_wr *post_wr;
 426         struct rpcrdma_mr *mr;
 427 
 428         post_wr = &req->rl_sendctx->sc_wr;
 429         list_for_each_entry(mr, &req->rl_registered, mr_list) {
 430                 struct rpcrdma_frwr *frwr;
 431 
 432                 frwr = &mr->frwr;
 433 
 434                 frwr->fr_cqe.done = frwr_wc_fastreg;
 435                 frwr->fr_regwr.wr.next = post_wr;
 436                 frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
 437                 frwr->fr_regwr.wr.num_sge = 0;
 438                 frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
 439                 frwr->fr_regwr.wr.send_flags = 0;
 440 
 441                 post_wr = &frwr->fr_regwr.wr;
 442         }
 443 
 444         /* If ib_post_send fails, the next ->send_request for
 445          * @req will queue these MRs for recovery.
 446          */
 447         return ib_post_send(ia->ri_id->qp, post_wr, NULL);
 448 }
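
     /* For a request with two MRs on rl_registered, the chain posted above
      * is (illustration only):
      *
      *      REG_MR(mr2) -> REG_MR(mr1) -> SEND
      *
      * Everything goes to the provider in one ib_post_send() call, and no
      * FastReg WR is signaled, so only flushed FastReg completions reach
      * frwr_wc_fastreg().
      */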
 449 
 450 /**
 451  * frwr_reminv - handle a remotely invalidated MR on the @mrs list
 452  * @rep: Received reply
 453  * @mrs: list of MRs to check
 454  *
 455  */
 456 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 457 {
 458         struct rpcrdma_mr *mr;
 459 
 460         list_for_each_entry(mr, mrs, mr_list)
 461                 if (mr->mr_handle == rep->rr_inv_rkey) {
 462                         list_del_init(&mr->mr_list);
 463                         trace_xprtrdma_mr_remoteinv(mr);
 464                         rpcrdma_mr_put(mr);
 465                         break;  /* only one invalidated MR per RPC */
 466                 }
 467 }
 468 
 469 static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
 470 {
 471         if (wc->status != IB_WC_SUCCESS)
 472                 rpcrdma_mr_recycle(mr);
 473         else
 474                 rpcrdma_mr_put(mr);
 475 }
 476 
 477 /**
 478  * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 479  * @cq: completion queue (ignored)
 480  * @wc: completed WR
 481  *
 482  */
 483 static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 484 {
 485         struct ib_cqe *cqe = wc->wr_cqe;
 486         struct rpcrdma_frwr *frwr =
 487                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
 488         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
 489 
 490         /* WARNING: Only wr_cqe and status are reliable at this point */
 491         trace_xprtrdma_wc_li(wc, frwr);
 492         __frwr_release_mr(wc, mr);
 493 }
 494 
 495 /**
 496  * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 497  * @cq: completion queue (ignored)
 498  * @wc: completed WR
 499  *
 500  * Awaken anyone waiting for an MR to finish being fenced.
 501  */
 502 static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 503 {
 504         struct ib_cqe *cqe = wc->wr_cqe;
 505         struct rpcrdma_frwr *frwr =
 506                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
 507         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
 508 
 509         /* WARNING: Only wr_cqe and status are reliable at this point */
 510         trace_xprtrdma_wc_li_wake(wc, frwr);
 511         __frwr_release_mr(wc, mr);
 512         complete(&frwr->fr_linv_done);
 513 }
 514 
 515 /**
 516  * frwr_unmap_sync - invalidate memory regions that were registered for @req
 517  * @r_xprt: controlling transport instance
 518  * @req: rpcrdma_req with a non-empty list of MRs to process
 519  *
 520  * Sleeps until it is safe for the host CPU to access the previously mapped
 521  * memory regions. This guarantees that registered MRs are properly fenced
 522  * from the server before the RPC consumer accesses the data in them. It
 523  * also ensures proper Send flow control: waking the next RPC waits until
 524  * this RPC has relinquished all its Send Queue entries.
 525  */
 526 void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 527 {
 528         struct ib_send_wr *first, **prev, *last;
 529         const struct ib_send_wr *bad_wr;
 530         struct rpcrdma_frwr *frwr;
 531         struct rpcrdma_mr *mr;
 532         int rc;
 533 
 534         /* ORDER: Invalidate all of the MRs first
 535          *
 536          * Chain the LOCAL_INV Work Requests and post them with
 537          * a single ib_post_send() call.
 538          */
 539         frwr = NULL;
 540         prev = &first;
 541         while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
 542 
 543                 trace_xprtrdma_mr_localinv(mr);
 544                 r_xprt->rx_stats.local_inv_needed++;
 545 
 546                 frwr = &mr->frwr;
 547                 frwr->fr_cqe.done = frwr_wc_localinv;
 548                 last = &frwr->fr_invwr;
 549                 last->next = NULL;
 550                 last->wr_cqe = &frwr->fr_cqe;
 551                 last->sg_list = NULL;
 552                 last->num_sge = 0;
 553                 last->opcode = IB_WR_LOCAL_INV;
 554                 last->send_flags = IB_SEND_SIGNALED;
 555                 last->ex.invalidate_rkey = mr->mr_handle;
 556 
 557                 *prev = last;
 558                 prev = &last->next;
 559         }
 560 
 561         /* Strong send queue ordering guarantees that when the
 562          * last WR in the chain completes, all WRs in the chain
 563          * are complete.
 564          */
 565         frwr->fr_cqe.done = frwr_wc_localinv_wake;
 566         reinit_completion(&frwr->fr_linv_done);
 567 
 568         /* Transport disconnect drains the receive CQ before it
 569          * replaces the QP. The RPC reply handler won't call us
 570          * unless ri_id->qp is a valid pointer.
 571          */
 572         bad_wr = NULL;
 573         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
 574 
 575         /* The final LOCAL_INV WR in the chain is supposed to
 576          * do the wake. If it was never posted, the wake will
 577          * not happen, so don't wait in that case.
 578          */
 579         if (bad_wr != first)
 580                 wait_for_completion(&frwr->fr_linv_done);
 581         if (!rc)
 582                 return;
 583 
 584         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
 585          */
 586         trace_xprtrdma_post_linv(req, rc);
 587         while (bad_wr) {
 588                 frwr = container_of(bad_wr, struct rpcrdma_frwr,
 589                                     fr_invwr);
 590                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
 591                 bad_wr = bad_wr->next;
 592 
 593                 list_del_init(&mr->mr_list);
 594                 rpcrdma_mr_recycle(mr);
 595         }
 596 }
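
     /* Hedged usage note: frwr_unmap_sync() is the sleeping variant, for
      * callers that can block until every LOCAL_INV WR has completed.
      * frwr_unmap_async() below posts the same LOCAL_INV chain but finishes
      * the RPC from frwr_wc_localinv_done() instead of waking a sleeper, so
      * it can be driven from a context that must not block.
      */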
 597 
 598 /**
 599  * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 600  * @cq: completion queue (ignored)
 601  * @wc: completed WR
 602  *
 603  */
 604 static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
 605 {
 606         struct ib_cqe *cqe = wc->wr_cqe;
 607         struct rpcrdma_frwr *frwr =
 608                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
 609         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
 610         struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
 611 
 612         /* WARNING: Only wr_cqe and status are reliable at this point */
 613         trace_xprtrdma_wc_li_done(wc, frwr);
 614         __frwr_release_mr(wc, mr);
 615 
 616         /* Ensure @rep is generated before __frwr_release_mr */
 617         smp_rmb();
 618         rpcrdma_complete_rqst(rep);
 619 }
 620 
 621 /**
 622  * frwr_unmap_async - invalidate memory regions that were registered for @req
 623  * @r_xprt: controlling transport instance
 624  * @req: rpcrdma_req with a non-empty list of MRs to process
 625  *
 626  * This guarantees that registered MRs are properly fenced from the
 627  * server before the RPC consumer accesses the data in them. It also
 628  * ensures proper Send flow control: waking the next RPC waits until
 629  * this RPC has relinquished all its Send Queue entries.
 630  */
 631 void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 632 {
 633         struct ib_send_wr *first, *last, **prev;
 634         const struct ib_send_wr *bad_wr;
 635         struct rpcrdma_frwr *frwr;
 636         struct rpcrdma_mr *mr;
 637         int rc;
 638 
 639         /* Chain the LOCAL_INV Work Requests and post them with
 640          * a single ib_post_send() call.
 641          */
 642         frwr = NULL;
 643         prev = &first;
 644         while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
 645 
 646                 trace_xprtrdma_mr_localinv(mr);
 647                 r_xprt->rx_stats.local_inv_needed++;
 648 
 649                 frwr = &mr->frwr;
 650                 frwr->fr_cqe.done = frwr_wc_localinv;
 651                 last = &frwr->fr_invwr;
 652                 last->next = NULL;
 653                 last->wr_cqe = &frwr->fr_cqe;
 654                 last->sg_list = NULL;
 655                 last->num_sge = 0;
 656                 last->opcode = IB_WR_LOCAL_INV;
 657                 last->send_flags = IB_SEND_SIGNALED;
 658                 last->ex.invalidate_rkey = mr->mr_handle;
 659 
 660                 *prev = last;
 661                 prev = &last->next;
 662         }
 663 
 664         /* Strong send queue ordering guarantees that when the
 665          * last WR in the chain completes, all WRs in the chain
 666          * are complete. The last completion will wake up the
 667          * RPC waiter.
 668          */
 669         frwr->fr_cqe.done = frwr_wc_localinv_done;
 670 
 671         /* Transport disconnect drains the receive CQ before it
 672          * replaces the QP. The RPC reply handler won't call us
 673          * unless ri_id->qp is a valid pointer.
 674          */
 675         bad_wr = NULL;
 676         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
 677         if (!rc)
 678                 return;
 679 
 680         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
 681          */
 682         trace_xprtrdma_post_linv(req, rc);
 683         while (bad_wr) {
 684                 frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
 685                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
 686                 bad_wr = bad_wr->next;
 687 
 688                 rpcrdma_mr_recycle(mr);
 689         }
 690 
 691         /* The final LOCAL_INV WR in the chain is supposed to
 692          * do the wake. If it was never posted, the wake will
 693          * not happen, so wake here in that case.
 694          */
 695         rpcrdma_complete_rqst(req->rl_reply);
 696 }
