root/net/sunrpc/xprtrdma/rpc_rdma.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpcrdma_max_call_header_size
  2. rpcrdma_max_reply_header_size
  3. rpcrdma_set_max_header_sizes
  4. rpcrdma_args_inline
  5. rpcrdma_results_inline
  6. rpcrdma_nonpayload_inline
  7. rpcrdma_convert_kvec
  8. rpcrdma_convert_iovs
  9. encode_item_present
  10. encode_item_not_present
  11. xdr_encode_rdma_segment
  12. encode_rdma_segment
  13. encode_read_segment
  14. rpcrdma_mr_prepare
  15. rpcrdma_encode_read_list
  16. rpcrdma_encode_write_list
  17. rpcrdma_encode_reply_chunk
  18. rpcrdma_sendctx_done
  19. rpcrdma_sendctx_unmap
  20. rpcrdma_prepare_hdr_sge
  21. rpcrdma_prepare_msg_sges
  22. rpcrdma_prepare_send_sges
  23. rpcrdma_marshal_req
  24. rpcrdma_inline_fixup
  25. rpcrdma_is_bcall
  26. decode_rdma_segment
  27. decode_write_chunk
  28. decode_read_list
  29. decode_write_list
  30. decode_reply_chunk
  31. rpcrdma_decode_msg
  32. rpcrdma_decode_nomsg
  33. rpcrdma_decode_error
  34. rpcrdma_complete_rqst
  35. rpcrdma_reply_done
  36. rpcrdma_reply_handler

   1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2 /*
   3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5  *
   6  * This software is available to you under a choice of one of two
   7  * licenses.  You may choose to be licensed under the terms of the GNU
   8  * General Public License (GPL) Version 2, available from the file
   9  * COPYING in the main directory of this source tree, or the BSD-type
  10  * license below:
  11  *
  12  * Redistribution and use in source and binary forms, with or without
  13  * modification, are permitted provided that the following conditions
  14  * are met:
  15  *
  16  *      Redistributions of source code must retain the above copyright
  17  *      notice, this list of conditions and the following disclaimer.
  18  *
  19  *      Redistributions in binary form must reproduce the above
  20  *      copyright notice, this list of conditions and the following
  21  *      disclaimer in the documentation and/or other materials provided
  22  *      with the distribution.
  23  *
  24  *      Neither the name of the Network Appliance, Inc. nor the names of
  25  *      its contributors may be used to endorse or promote products
  26  *      derived from this software without specific prior written
  27  *      permission.
  28  *
  29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40  */
  41 
  42 /*
  43  * rpc_rdma.c
  44  *
  45  * This file contains the guts of the RPC RDMA protocol, and
  46  * does marshaling/unmarshaling, etc. It is also where interfacing
  47  * to the Linux RPC framework lives.
  48  */
  49 
  50 #include <linux/highmem.h>
  51 
  52 #include <linux/sunrpc/svc_rdma.h>
  53 
  54 #include "xprt_rdma.h"
  55 #include <trace/events/rpcrdma.h>
  56 
  57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  58 # define RPCDBG_FACILITY        RPCDBG_TRANS
  59 #endif
  60 
  61 /* Returns size of largest RPC-over-RDMA header in a Call message
  62  *
  63  * The largest Call header contains a full-size Read list and a
  64  * minimal Reply chunk.
  65  */
  66 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
  67 {
  68         unsigned int size;
  69 
  70         /* Fixed header fields and list discriminators */
  71         size = RPCRDMA_HDRLEN_MIN;
  72 
  73         /* Maximum Read list size */
  74         size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
  75 
  76         /* Minimal Read chunk size */
  77         size += sizeof(__be32); /* segment count */
  78         size += rpcrdma_segment_maxsz * sizeof(__be32);
  79         size += sizeof(__be32); /* list discriminator */
  80 
  81         dprintk("RPC:       %s: max call header size = %u\n",
  82                 __func__, size);
  83         return size;
  84 }
  85 
  86 /* Returns size of largest RPC-over-RDMA header in a Reply message
  87  *
  88  * There is only one Write list or one Reply chunk per Reply
  89  * message.  The larger list is the Write list.
  90  */
  91 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
  92 {
  93         unsigned int size;
  94 
  95         /* Fixed header fields and list discriminators */
  96         size = RPCRDMA_HDRLEN_MIN;
  97 
  98         /* Maximum Write list size */
  99         size = sizeof(__be32);          /* segment count */
 100         size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 101         size += sizeof(__be32); /* list discriminator */
 102 
 103         dprintk("RPC:       %s: max reply header size = %u\n",
 104                 __func__, size);
 105         return size;
 106 }
 107 
 108 /**
 109  * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 110  * @r_xprt: transport instance to initialize
 111  *
 112  * The max_inline fields contain the maximum size of an RPC message
 113  * so the marshaling code doesn't have to repeat this calculation
 114  * for every RPC.
 115  */
 116 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 117 {
 118         unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
 119         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 120 
 121         ep->rep_max_inline_send =
 122                 ep->rep_inline_send - rpcrdma_max_call_header_size(maxsegs);
 123         ep->rep_max_inline_recv =
 124                 ep->rep_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 125 }
 126 
 127 /* The client can send a request inline as long as the RPCRDMA header
 128  * plus the RPC call fit under the transport's inline limit. If the
 129  * combined call message size exceeds that limit, the client must use
 130  * a Read chunk for this operation.
 131  *
 132  * A Read chunk is also required if sending the RPC call inline would
 133  * exceed this device's max_sge limit.
 134  */
 135 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 136                                 struct rpc_rqst *rqst)
 137 {
 138         struct xdr_buf *xdr = &rqst->rq_snd_buf;
 139         unsigned int count, remaining, offset;
 140 
 141         if (xdr->len > r_xprt->rx_ep.rep_max_inline_send)
 142                 return false;
 143 
 144         if (xdr->page_len) {
 145                 remaining = xdr->page_len;
 146                 offset = offset_in_page(xdr->page_base);
 147                 count = RPCRDMA_MIN_SEND_SGES;
 148                 while (remaining) {
 149                         remaining -= min_t(unsigned int,
 150                                            PAGE_SIZE - offset, remaining);
 151                         offset = 0;
 152                         if (++count > r_xprt->rx_ia.ri_max_send_sges)
 153                                 return false;
 154                 }
 155         }
 156 
 157         return true;
 158 }
 159 
 160 /* The client can't know how large the actual reply will be. Thus it
 161  * plans for the largest possible reply for that particular ULP
 162  * operation. If the maximum combined reply message size exceeds that
 163  * limit, the client must provide a write list or a reply chunk for
 164  * this request.
 165  */
 166 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 167                                    struct rpc_rqst *rqst)
 168 {
 169         return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep.rep_max_inline_recv;
 170 }
 171 
 172 /* The client is required to provide a Reply chunk if the maximum
 173  * size of the non-payload part of the RPC Reply is larger than
 174  * the inline threshold.
 175  */
 176 static bool
 177 rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
 178                           const struct rpc_rqst *rqst)
 179 {
 180         const struct xdr_buf *buf = &rqst->rq_rcv_buf;
 181 
 182         return (buf->head[0].iov_len + buf->tail[0].iov_len) <
 183                 r_xprt->rx_ep.rep_max_inline_recv;
 184 }
 185 
 186 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
 187  * a byte range. Other modes coalesce these SGEs into a single MR
 188  * when they can.
 189  *
 190  * Returns pointer to next available SGE, and bumps the total number
 191  * of SGEs consumed.
 192  */
 193 static struct rpcrdma_mr_seg *
 194 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 195                      unsigned int *n)
 196 {
 197         u32 remaining, page_offset;
 198         char *base;
 199 
 200         base = vec->iov_base;
 201         page_offset = offset_in_page(base);
 202         remaining = vec->iov_len;
 203         while (remaining) {
 204                 seg->mr_page = NULL;
 205                 seg->mr_offset = base;
 206                 seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
 207                 remaining -= seg->mr_len;
 208                 base += seg->mr_len;
 209                 ++seg;
 210                 ++(*n);
 211                 page_offset = 0;
 212         }
 213         return seg;
 214 }
 215 
 216 /* Convert @xdrbuf into SGEs no larger than a page each. As they
 217  * are registered, these SGEs are then coalesced into RDMA segments
 218  * when the selected memreg mode supports it.
 219  *
 220  * Returns positive number of SGEs consumed, or a negative errno.
 221  */
 222 
 223 static int
 224 rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 225                      unsigned int pos, enum rpcrdma_chunktype type,
 226                      struct rpcrdma_mr_seg *seg)
 227 {
 228         unsigned long page_base;
 229         unsigned int len, n;
 230         struct page **ppages;
 231 
 232         n = 0;
 233         if (pos == 0)
 234                 seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 235 
 236         len = xdrbuf->page_len;
 237         ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 238         page_base = offset_in_page(xdrbuf->page_base);
 239         while (len) {
 240                 /* ACL likes to be lazy in allocating pages - ACLs
 241                  * are small by default but can get huge.
 242                  */
 243                 if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
 244                         if (!*ppages)
 245                                 *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
 246                         if (!*ppages)
 247                                 return -ENOBUFS;
 248                 }
 249                 seg->mr_page = *ppages;
 250                 seg->mr_offset = (char *)page_base;
 251                 seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
 252                 len -= seg->mr_len;
 253                 ++ppages;
 254                 ++seg;
 255                 ++n;
 256                 page_base = 0;
 257         }
 258 
 259         /* When encoding a Read chunk, the tail iovec contains an
 260          * XDR pad and may be omitted.
 261          */
 262         if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
 263                 goto out;
 264 
 265         /* When encoding a Write chunk, some servers need to see an
 266          * extra segment for non-XDR-aligned Write chunks. The upper
 267          * layer provides space in the tail iovec that may be used
 268          * for this purpose.
 269          */
 270         if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
 271                 goto out;
 272 
 273         if (xdrbuf->tail[0].iov_len)
 274                 seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 275 
 276 out:
 277         if (unlikely(n > RPCRDMA_MAX_SEGS))
 278                 return -EIO;
 279         return n;
 280 }
 281 
 282 static inline int
 283 encode_item_present(struct xdr_stream *xdr)
 284 {
 285         __be32 *p;
 286 
 287         p = xdr_reserve_space(xdr, sizeof(*p));
 288         if (unlikely(!p))
 289                 return -EMSGSIZE;
 290 
 291         *p = xdr_one;
 292         return 0;
 293 }
 294 
 295 static inline int
 296 encode_item_not_present(struct xdr_stream *xdr)
 297 {
 298         __be32 *p;
 299 
 300         p = xdr_reserve_space(xdr, sizeof(*p));
 301         if (unlikely(!p))
 302                 return -EMSGSIZE;
 303 
 304         *p = xdr_zero;
 305         return 0;
 306 }
 307 
 308 static void
 309 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
 310 {
 311         *iptr++ = cpu_to_be32(mr->mr_handle);
 312         *iptr++ = cpu_to_be32(mr->mr_length);
 313         xdr_encode_hyper(iptr, mr->mr_offset);
 314 }
 315 
 316 static int
 317 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
 318 {
 319         __be32 *p;
 320 
 321         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 322         if (unlikely(!p))
 323                 return -EMSGSIZE;
 324 
 325         xdr_encode_rdma_segment(p, mr);
 326         return 0;
 327 }
 328 
 329 static int
 330 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
 331                     u32 position)
 332 {
 333         __be32 *p;
 334 
 335         p = xdr_reserve_space(xdr, 6 * sizeof(*p));
 336         if (unlikely(!p))
 337                 return -EMSGSIZE;
 338 
 339         *p++ = xdr_one;                 /* Item present */
 340         *p++ = cpu_to_be32(position);
 341         xdr_encode_rdma_segment(p, mr);
 342         return 0;
 343 }
 344 
 345 static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
 346                                                  struct rpcrdma_req *req,
 347                                                  struct rpcrdma_mr_seg *seg,
 348                                                  int nsegs, bool writing,
 349                                                  struct rpcrdma_mr **mr)
 350 {
 351         *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
 352         if (!*mr) {
 353                 *mr = rpcrdma_mr_get(r_xprt);
 354                 if (!*mr)
 355                         goto out_getmr_err;
 356                 trace_xprtrdma_mr_get(req);
 357                 (*mr)->mr_req = req;
 358         }
 359 
 360         rpcrdma_mr_push(*mr, &req->rl_registered);
 361         return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
 362 
 363 out_getmr_err:
 364         trace_xprtrdma_nomrs(req);
 365         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 366         if (r_xprt->rx_ep.rep_connected != -ENODEV)
 367                 schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
 368         return ERR_PTR(-EAGAIN);
 369 }
 370 
 371 /* Register and XDR encode the Read list. Supports encoding a list of read
 372  * segments that belong to a single read chunk.
 373  *
 374  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 375  *
 376  *  Read chunklist (a linked list):
 377  *   N elements, position P (same P for all chunks of same arg!):
 378  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 379  *
 380  * Returns zero on success, or a negative errno if a failure occurred.
 381  * @xdr is advanced to the next position in the stream.
 382  *
 383  * Only a single @pos value is currently supported.
 384  */
 385 static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 386                                     struct rpcrdma_req *req,
 387                                     struct rpc_rqst *rqst,
 388                                     enum rpcrdma_chunktype rtype)
 389 {
 390         struct xdr_stream *xdr = &req->rl_stream;
 391         struct rpcrdma_mr_seg *seg;
 392         struct rpcrdma_mr *mr;
 393         unsigned int pos;
 394         int nsegs;
 395 
 396         if (rtype == rpcrdma_noch)
 397                 goto done;
 398 
 399         pos = rqst->rq_snd_buf.head[0].iov_len;
 400         if (rtype == rpcrdma_areadch)
 401                 pos = 0;
 402         seg = req->rl_segments;
 403         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
 404                                      rtype, seg);
 405         if (nsegs < 0)
 406                 return nsegs;
 407 
 408         do {
 409                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
 410                 if (IS_ERR(seg))
 411                         return PTR_ERR(seg);
 412 
 413                 if (encode_read_segment(xdr, mr, pos) < 0)
 414                         return -EMSGSIZE;
 415 
 416                 trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 417                 r_xprt->rx_stats.read_chunk_count++;
 418                 nsegs -= mr->mr_nents;
 419         } while (nsegs);
 420 
 421 done:
 422         return encode_item_not_present(xdr);
 423 }
 424 
 425 /* Register and XDR encode the Write list. Supports encoding a list
 426  * containing one array of plain segments that belong to a single
 427  * write chunk.
 428  *
 429  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 430  *
 431  *  Write chunklist (a list of (one) counted array):
 432  *   N elements:
 433  *    1 - N - HLOO - HLOO - ... - HLOO - 0
 434  *
 435  * Returns zero on success, or a negative errno if a failure occurred.
 436  * @xdr is advanced to the next position in the stream.
 437  *
 438  * Only a single Write chunk is currently supported.
 439  */
 440 static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
 441                                      struct rpcrdma_req *req,
 442                                      struct rpc_rqst *rqst,
 443                                      enum rpcrdma_chunktype wtype)
 444 {
 445         struct xdr_stream *xdr = &req->rl_stream;
 446         struct rpcrdma_mr_seg *seg;
 447         struct rpcrdma_mr *mr;
 448         int nsegs, nchunks;
 449         __be32 *segcount;
 450 
 451         if (wtype != rpcrdma_writech)
 452                 goto done;
 453 
 454         seg = req->rl_segments;
 455         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 456                                      rqst->rq_rcv_buf.head[0].iov_len,
 457                                      wtype, seg);
 458         if (nsegs < 0)
 459                 return nsegs;
 460 
 461         if (encode_item_present(xdr) < 0)
 462                 return -EMSGSIZE;
 463         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 464         if (unlikely(!segcount))
 465                 return -EMSGSIZE;
 466         /* Actual value encoded below */
 467 
 468         nchunks = 0;
 469         do {
 470                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 471                 if (IS_ERR(seg))
 472                         return PTR_ERR(seg);
 473 
 474                 if (encode_rdma_segment(xdr, mr) < 0)
 475                         return -EMSGSIZE;
 476 
 477                 trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 478                 r_xprt->rx_stats.write_chunk_count++;
 479                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 480                 nchunks++;
 481                 nsegs -= mr->mr_nents;
 482         } while (nsegs);
 483 
 484         /* Update count of segments in this Write chunk */
 485         *segcount = cpu_to_be32(nchunks);
 486 
 487 done:
 488         return encode_item_not_present(xdr);
 489 }
 490 
 491 /* Register and XDR encode the Reply chunk. Supports encoding an array
 492  * of plain segments that belong to a single write (reply) chunk.
 493  *
 494  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 495  *
 496  *  Reply chunk (a counted array):
 497  *   N elements:
 498  *    1 - N - HLOO - HLOO - ... - HLOO
 499  *
 500  * Returns zero on success, or a negative errno if a failure occurred.
 501  * @xdr is advanced to the next position in the stream.
 502  */
 503 static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 504                                       struct rpcrdma_req *req,
 505                                       struct rpc_rqst *rqst,
 506                                       enum rpcrdma_chunktype wtype)
 507 {
 508         struct xdr_stream *xdr = &req->rl_stream;
 509         struct rpcrdma_mr_seg *seg;
 510         struct rpcrdma_mr *mr;
 511         int nsegs, nchunks;
 512         __be32 *segcount;
 513 
 514         if (wtype != rpcrdma_replych)
 515                 return encode_item_not_present(xdr);
 516 
 517         seg = req->rl_segments;
 518         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 519         if (nsegs < 0)
 520                 return nsegs;
 521 
 522         if (encode_item_present(xdr) < 0)
 523                 return -EMSGSIZE;
 524         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 525         if (unlikely(!segcount))
 526                 return -EMSGSIZE;
 527         /* Actual value encoded below */
 528 
 529         nchunks = 0;
 530         do {
 531                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 532                 if (IS_ERR(seg))
 533                         return PTR_ERR(seg);
 534 
 535                 if (encode_rdma_segment(xdr, mr) < 0)
 536                         return -EMSGSIZE;
 537 
 538                 trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 539                 r_xprt->rx_stats.reply_chunk_count++;
 540                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 541                 nchunks++;
 542                 nsegs -= mr->mr_nents;
 543         } while (nsegs);
 544 
 545         /* Update count of segments in the Reply chunk */
 546         *segcount = cpu_to_be32(nchunks);
 547 
 548         return 0;
 549 }
 550 
 551 static void rpcrdma_sendctx_done(struct kref *kref)
 552 {
 553         struct rpcrdma_req *req =
 554                 container_of(kref, struct rpcrdma_req, rl_kref);
 555         struct rpcrdma_rep *rep = req->rl_reply;
 556 
 557         rpcrdma_complete_rqst(rep);
 558         rep->rr_rxprt->rx_stats.reply_waits_for_send++;
 559 }
 560 
 561 /**
 562  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 563  * @sc: sendctx containing SGEs to unmap
 564  *
 565  */
 566 void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 567 {
 568         struct ib_sge *sge;
 569 
 570         if (!sc->sc_unmap_count)
 571                 return;
 572 
 573         /* The first two SGEs contain the transport header and
 574          * the inline buffer. These are always left mapped so
 575          * they can be cheaply re-used.
 576          */
 577         for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
 578              ++sge, --sc->sc_unmap_count)
 579                 ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
 580                                   DMA_TO_DEVICE);
 581 
 582         kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 583 }
 584 
 585 /* Prepare an SGE for the RPC-over-RDMA transport header.
 586  */
 587 static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
 588                                     struct rpcrdma_req *req, u32 len)
 589 {
 590         struct rpcrdma_sendctx *sc = req->rl_sendctx;
 591         struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
 592         struct ib_sge *sge = sc->sc_sges;
 593 
 594         if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 595                 goto out_regbuf;
 596         sge->addr = rdmab_addr(rb);
 597         sge->length = len;
 598         sge->lkey = rdmab_lkey(rb);
 599 
 600         ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
 601                                       DMA_TO_DEVICE);
 602         sc->sc_wr.num_sge++;
 603         return true;
 604 
 605 out_regbuf:
 606         pr_err("rpcrdma: failed to DMA map a Send buffer\n");
 607         return false;
 608 }
 609 
 610 /* Prepare the Send SGEs. The head and tail iovec, and each entry
 611  * in the page list, gets its own SGE.
 612  */
 613 static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 614                                      struct rpcrdma_req *req,
 615                                      struct xdr_buf *xdr,
 616                                      enum rpcrdma_chunktype rtype)
 617 {
 618         struct rpcrdma_sendctx *sc = req->rl_sendctx;
 619         unsigned int sge_no, page_base, len, remaining;
 620         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 621         struct ib_sge *sge = sc->sc_sges;
 622         struct page *page, **ppages;
 623 
 624         /* The head iovec is straightforward, as it is already
 625          * DMA-mapped. Sync the content that has changed.
 626          */
 627         if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 628                 goto out_regbuf;
 629         sc->sc_device = rdmab_device(rb);
 630         sge_no = 1;
 631         sge[sge_no].addr = rdmab_addr(rb);
 632         sge[sge_no].length = xdr->head[0].iov_len;
 633         sge[sge_no].lkey = rdmab_lkey(rb);
 634         ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
 635                                       sge[sge_no].length, DMA_TO_DEVICE);
 636 
 637         /* If there is a Read chunk, the page list is being handled
 638          * via explicit RDMA, and thus is skipped here. However, the
 639          * tail iovec may include an XDR pad for the page list, as
 640          * well as additional content, and may not reside in the
 641          * same page as the head iovec.
 642          */
 643         if (rtype == rpcrdma_readch) {
 644                 len = xdr->tail[0].iov_len;
 645 
 646                 /* Do not include the tail if it is only an XDR pad */
 647                 if (len < 4)
 648                         goto out;
 649 
 650                 page = virt_to_page(xdr->tail[0].iov_base);
 651                 page_base = offset_in_page(xdr->tail[0].iov_base);
 652 
 653                 /* If the content in the page list is an odd length,
 654                  * xdr_write_pages() has added a pad at the beginning
 655                  * of the tail iovec. Force the tail's non-pad content
 656                  * to land at the next XDR position in the Send message.
 657                  */
 658                 page_base += len & 3;
 659                 len -= len & 3;
 660                 goto map_tail;
 661         }
 662 
 663         /* If there is a page list present, temporarily DMA map
 664          * and prepare an SGE for each page to be sent.
 665          */
 666         if (xdr->page_len) {
 667                 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
 668                 page_base = offset_in_page(xdr->page_base);
 669                 remaining = xdr->page_len;
 670                 while (remaining) {
 671                         sge_no++;
 672                         if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
 673                                 goto out_mapping_overflow;
 674 
 675                         len = min_t(u32, PAGE_SIZE - page_base, remaining);
 676                         sge[sge_no].addr =
 677                                 ib_dma_map_page(rdmab_device(rb), *ppages,
 678                                                 page_base, len, DMA_TO_DEVICE);
 679                         if (ib_dma_mapping_error(rdmab_device(rb),
 680                                                  sge[sge_no].addr))
 681                                 goto out_mapping_err;
 682                         sge[sge_no].length = len;
 683                         sge[sge_no].lkey = rdmab_lkey(rb);
 684 
 685                         sc->sc_unmap_count++;
 686                         ppages++;
 687                         remaining -= len;
 688                         page_base = 0;
 689                 }
 690         }
 691 
 692         /* The tail iovec is not always constructed in the same
 693          * page where the head iovec resides (see, for example,
 694          * gss_wrap_req_priv). To neatly accommodate that case,
 695          * DMA map it separately.
 696          */
 697         if (xdr->tail[0].iov_len) {
 698                 page = virt_to_page(xdr->tail[0].iov_base);
 699                 page_base = offset_in_page(xdr->tail[0].iov_base);
 700                 len = xdr->tail[0].iov_len;
 701 
 702 map_tail:
 703                 sge_no++;
 704                 sge[sge_no].addr =
 705                         ib_dma_map_page(rdmab_device(rb), page, page_base, len,
 706                                         DMA_TO_DEVICE);
 707                 if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
 708                         goto out_mapping_err;
 709                 sge[sge_no].length = len;
 710                 sge[sge_no].lkey = rdmab_lkey(rb);
 711                 sc->sc_unmap_count++;
 712         }
 713 
 714 out:
 715         sc->sc_wr.num_sge += sge_no;
 716         if (sc->sc_unmap_count)
 717                 kref_get(&req->rl_kref);
 718         return true;
 719 
 720 out_regbuf:
 721         pr_err("rpcrdma: failed to DMA map a Send buffer\n");
 722         return false;
 723 
 724 out_mapping_overflow:
 725         rpcrdma_sendctx_unmap(sc);
 726         pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 727         return false;
 728 
 729 out_mapping_err:
 730         rpcrdma_sendctx_unmap(sc);
 731         trace_xprtrdma_dma_maperr(sge[sge_no].addr);
 732         return false;
 733 }
 734 
 735 /**
 736  * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 737  * @r_xprt: controlling transport
 738  * @req: context of RPC Call being marshalled
 739  * @hdrlen: size of transport header, in bytes
 740  * @xdr: xdr_buf containing RPC Call
 741  * @rtype: chunk type being encoded
 742  *
 743  * Returns 0 on success; otherwise a negative errno is returned.
 744  */
 745 int
 746 rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 747                           struct rpcrdma_req *req, u32 hdrlen,
 748                           struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 749 {
 750         int ret;
 751 
 752         ret = -EAGAIN;
 753         req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 754         if (!req->rl_sendctx)
 755                 goto err;
 756         req->rl_sendctx->sc_wr.num_sge = 0;
 757         req->rl_sendctx->sc_unmap_count = 0;
 758         req->rl_sendctx->sc_req = req;
 759         kref_init(&req->rl_kref);
 760 
 761         ret = -EIO;
 762         if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
 763                 goto err;
 764         if (rtype != rpcrdma_areadch)
 765                 if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
 766                         goto err;
 767         return 0;
 768 
 769 err:
 770         trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
 771         return ret;
 772 }
 773 
 774 /**
 775  * rpcrdma_marshal_req - Marshal and send one RPC request
 776  * @r_xprt: controlling transport
 777  * @rqst: RPC request to be marshaled
 778  *
 779  * For the RPC in "rqst", this function:
 780  *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 781  *  - Registers Read, Write, and Reply chunks
 782  *  - Constructs the transport header
 783  *  - Posts a Send WR to send the transport header and request
 784  *
 785  * Returns:
 786  *      %0 if the RPC was sent successfully,
 787  *      %-ENOTCONN if the connection was lost,
 788  *      %-EAGAIN if the caller should call again with the same arguments,
 789  *      %-ENOBUFS if the caller should call again after a delay,
 790  *      %-EMSGSIZE if the transport header is too small,
 791  *      %-EIO if a permanent problem occurred while marshaling.
 792  */
 793 int
 794 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 795 {
 796         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 797         struct xdr_stream *xdr = &req->rl_stream;
 798         enum rpcrdma_chunktype rtype, wtype;
 799         bool ddp_allowed;
 800         __be32 *p;
 801         int ret;
 802 
 803         rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 804         xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
 805                         rqst);
 806 
 807         /* Fixed header fields */
 808         ret = -EMSGSIZE;
 809         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
 810         if (!p)
 811                 goto out_err;
 812         *p++ = rqst->rq_xid;
 813         *p++ = rpcrdma_version;
 814         *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 815 
 816         /* When the ULP employs a GSS flavor that guarantees integrity
 817          * or privacy, direct data placement of individual data items
 818          * is not allowed.
 819          */
 820         ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
 821                                                 RPCAUTH_AUTH_DATATOUCH);
 822 
 823         /*
 824          * Chunks needed for results?
 825          *
 826          * o If the expected result is under the inline threshold, all ops
 827          *   return as inline.
 828          * o Large read ops return data as write chunk(s), header as
 829          *   inline.
 830          * o Large non-read ops return as a single reply chunk.
 831          */
 832         if (rpcrdma_results_inline(r_xprt, rqst))
 833                 wtype = rpcrdma_noch;
 834         else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
 835                  rpcrdma_nonpayload_inline(r_xprt, rqst))
 836                 wtype = rpcrdma_writech;
 837         else
 838                 wtype = rpcrdma_replych;
 839 
 840         /*
 841          * Chunks needed for arguments?
 842          *
 843          * o If the total request is under the inline threshold, all ops
 844          *   are sent as inline.
 845          * o Large write ops transmit data as read chunk(s), header as
 846          *   inline.
 847          * o Large non-write ops are sent with the entire message as a
 848          *   single read chunk (protocol 0-position special case).
 849          *
 850          * This assumes that the upper layer does not present a request
 851          * that both has a data payload, and whose non-data arguments
 852          * by themselves are larger than the inline threshold.
 853          */
 854         if (rpcrdma_args_inline(r_xprt, rqst)) {
 855                 *p++ = rdma_msg;
 856                 rtype = rpcrdma_noch;
 857         } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 858                 *p++ = rdma_msg;
 859                 rtype = rpcrdma_readch;
 860         } else {
 861                 r_xprt->rx_stats.nomsg_call_count++;
 862                 *p++ = rdma_nomsg;
 863                 rtype = rpcrdma_areadch;
 864         }
 865 
 866         /* If this is a retransmit, discard previously registered
 867          * chunks. Very likely the connection has been replaced,
 868          * so these registrations are invalid and unusable.
 869          */
 870         frwr_recycle(req);
 871 
 872         /* This implementation supports the following combinations
 873          * of chunk lists in one RPC-over-RDMA Call message:
 874          *
 875          *   - Read list
 876          *   - Write list
 877          *   - Reply chunk
 878          *   - Read list + Reply chunk
 879          *
 880          * It might not yet support the following combinations:
 881          *
 882          *   - Read list + Write list
 883          *
 884          * It does not support the following combinations:
 885          *
 886          *   - Write list + Reply chunk
 887          *   - Read list + Write list + Reply chunk
 888          *
 889          * This implementation supports only a single chunk in each
 890          * Read or Write list. Thus for example the client cannot
 891          * send a Call message with a Position Zero Read chunk and a
 892          * regular Read chunk at the same time.
 893          */
 894         ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
 895         if (ret)
 896                 goto out_err;
 897         ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
 898         if (ret)
 899                 goto out_err;
 900         ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 901         if (ret)
 902                 goto out_err;
 903 
 904         ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
 905                                         &rqst->rq_snd_buf, rtype);
 906         if (ret)
 907                 goto out_err;
 908 
 909         trace_xprtrdma_marshal(req, rtype, wtype);
 910         return 0;
 911 
 912 out_err:
 913         trace_xprtrdma_marshal_failed(rqst, ret);
 914         r_xprt->rx_stats.failed_marshal_count++;
 915         frwr_reset(req);
 916         return ret;
 917 }
 918 
 919 /**
 920  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 921  * @rqst: controlling RPC request
 922  * @srcp: points to RPC message payload in receive buffer
 923  * @copy_len: remaining length of receive buffer content
 924  * @pad: Write chunk pad bytes needed (zero for pure inline)
 925  *
 926  * The upper layer has set the maximum number of bytes it can
 927  * receive in each component of rq_rcv_buf. These values are set in
 928  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 929  *
 930  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 931  * many cases this function simply updates iov_base pointers in
 932  * rq_rcv_buf to point directly to the received reply data, to
 933  * avoid copying reply data.
 934  *
 935  * Returns the count of bytes which had to be memcopied.
 936  */
 937 static unsigned long
 938 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 939 {
 940         unsigned long fixup_copy_count;
 941         int i, npages, curlen;
 942         char *destp;
 943         struct page **ppages;
 944         int page_base;
 945 
 946         /* The head iovec is redirected to the RPC reply message
 947          * in the receive buffer, to avoid a memcopy.
 948          */
 949         rqst->rq_rcv_buf.head[0].iov_base = srcp;
 950         rqst->rq_private_buf.head[0].iov_base = srcp;
 951 
 952         /* The contents of the receive buffer that follow
 953          * head.iov_len bytes are copied into the page list.
 954          */
 955         curlen = rqst->rq_rcv_buf.head[0].iov_len;
 956         if (curlen > copy_len)
 957                 curlen = copy_len;
 958         trace_xprtrdma_fixup(rqst, copy_len, curlen);
 959         srcp += curlen;
 960         copy_len -= curlen;
 961 
 962         ppages = rqst->rq_rcv_buf.pages +
 963                 (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
 964         page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
 965         fixup_copy_count = 0;
 966         if (copy_len && rqst->rq_rcv_buf.page_len) {
 967                 int pagelist_len;
 968 
 969                 pagelist_len = rqst->rq_rcv_buf.page_len;
 970                 if (pagelist_len > copy_len)
 971                         pagelist_len = copy_len;
 972                 npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
 973                 for (i = 0; i < npages; i++) {
 974                         curlen = PAGE_SIZE - page_base;
 975                         if (curlen > pagelist_len)
 976                                 curlen = pagelist_len;
 977 
 978                         trace_xprtrdma_fixup_pg(rqst, i, srcp,
 979                                                 copy_len, curlen);
 980                         destp = kmap_atomic(ppages[i]);
 981                         memcpy(destp + page_base, srcp, curlen);
 982                         flush_dcache_page(ppages[i]);
 983                         kunmap_atomic(destp);
 984                         srcp += curlen;
 985                         copy_len -= curlen;
 986                         fixup_copy_count += curlen;
 987                         pagelist_len -= curlen;
 988                         if (!pagelist_len)
 989                                 break;
 990                         page_base = 0;
 991                 }
 992 
 993                 /* Implicit padding for the last segment in a Write
 994                  * chunk is inserted inline at the front of the tail
 995                  * iovec. The upper layer ignores the content of
 996                  * the pad. Simply ensure inline content in the tail
 997                  * that follows the Write chunk is properly aligned.
 998                  */
 999                 if (pad)
1000                         srcp -= pad;
1001         }
1002 
1003         /* The tail iovec is redirected to the remaining data
1004          * in the receive buffer, to avoid a memcopy.
1005          */
1006         if (copy_len || pad) {
1007                 rqst->rq_rcv_buf.tail[0].iov_base = srcp;
1008                 rqst->rq_private_buf.tail[0].iov_base = srcp;
1009         }
1010 
1011         return fixup_copy_count;
1012 }
1013 
1014 /* By convention, backchannel calls arrive via rdma_msg type
1015  * messages, and never populate the chunk lists. This makes
1016  * the RPC/RDMA header small and fixed in size, so it is
1017  * straightforward to check the RPC header's direction field.
1018  */
1019 static bool
1020 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1021 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1022 {
1023         struct xdr_stream *xdr = &rep->rr_stream;
1024         __be32 *p;
1025 
1026         if (rep->rr_proc != rdma_msg)
1027                 return false;
1028 
1029         /* Peek at stream contents without advancing. */
1030         p = xdr_inline_decode(xdr, 0);
1031 
1032         /* Chunk lists */
1033         if (*p++ != xdr_zero)
1034                 return false;
1035         if (*p++ != xdr_zero)
1036                 return false;
1037         if (*p++ != xdr_zero)
1038                 return false;
1039 
1040         /* RPC header */
1041         if (*p++ != rep->rr_xid)
1042                 return false;
1043         if (*p != cpu_to_be32(RPC_CALL))
1044                 return false;
1045 
1046         /* Now that we are sure this is a backchannel call,
1047          * advance to the RPC header.
1048          */
1049         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1050         if (unlikely(!p))
1051                 goto out_short;
1052 
1053         rpcrdma_bc_receive_call(r_xprt, rep);
1054         return true;
1055 
1056 out_short:
1057         pr_warn("RPC/RDMA short backward direction call\n");
1058         return true;
1059 }
1060 #else   /* CONFIG_SUNRPC_BACKCHANNEL */
1061 {
1062         return false;
1063 }
1064 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
1065 
1066 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1067 {
1068         u32 handle;
1069         u64 offset;
1070         __be32 *p;
1071 
1072         p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1073         if (unlikely(!p))
1074                 return -EIO;
1075 
1076         handle = be32_to_cpup(p++);
1077         *length = be32_to_cpup(p++);
1078         xdr_decode_hyper(p, &offset);
1079 
1080         trace_xprtrdma_decode_seg(handle, *length, offset);
1081         return 0;
1082 }
1083 
1084 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1085 {
1086         u32 segcount, seglength;
1087         __be32 *p;
1088 
1089         p = xdr_inline_decode(xdr, sizeof(*p));
1090         if (unlikely(!p))
1091                 return -EIO;
1092 
1093         *length = 0;
1094         segcount = be32_to_cpup(p);
1095         while (segcount--) {
1096                 if (decode_rdma_segment(xdr, &seglength))
1097                         return -EIO;
1098                 *length += seglength;
1099         }
1100 
1101         return 0;
1102 }
1103 
1104 /* In RPC-over-RDMA Version One replies, a Read list is never
1105  * expected. This decoder is a stub that returns an error if
1106  * a Read list is present.
1107  */
1108 static int decode_read_list(struct xdr_stream *xdr)
1109 {
1110         __be32 *p;
1111 
1112         p = xdr_inline_decode(xdr, sizeof(*p));
1113         if (unlikely(!p))
1114                 return -EIO;
1115         if (unlikely(*p != xdr_zero))
1116                 return -EIO;
1117         return 0;
1118 }
1119 
1120 /* Supports only one Write chunk in the Write list
1121  */
1122 static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1123 {
1124         u32 chunklen;
1125         bool first;
1126         __be32 *p;
1127 
1128         *length = 0;
1129         first = true;
1130         do {
1131                 p = xdr_inline_decode(xdr, sizeof(*p));
1132                 if (unlikely(!p))
1133                         return -EIO;
1134                 if (*p == xdr_zero)
1135                         break;
1136                 if (!first)
1137                         return -EIO;
1138 
1139                 if (decode_write_chunk(xdr, &chunklen))
1140                         return -EIO;
1141                 *length += chunklen;
1142                 first = false;
1143         } while (true);
1144         return 0;
1145 }
1146 
1147 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1148 {
1149         __be32 *p;
1150 
1151         p = xdr_inline_decode(xdr, sizeof(*p));
1152         if (unlikely(!p))
1153                 return -EIO;
1154 
1155         *length = 0;
1156         if (*p != xdr_zero)
1157                 if (decode_write_chunk(xdr, length))
1158                         return -EIO;
1159         return 0;
1160 }
1161 
1162 static int
1163 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1164                    struct rpc_rqst *rqst)
1165 {
1166         struct xdr_stream *xdr = &rep->rr_stream;
1167         u32 writelist, replychunk, rpclen;
1168         char *base;
1169 
1170         /* Decode the chunk lists */
1171         if (decode_read_list(xdr))
1172                 return -EIO;
1173         if (decode_write_list(xdr, &writelist))
1174                 return -EIO;
1175         if (decode_reply_chunk(xdr, &replychunk))
1176                 return -EIO;
1177 
1178         /* RDMA_MSG sanity checks */
1179         if (unlikely(replychunk))
1180                 return -EIO;
1181 
1182         /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1183         base = (char *)xdr_inline_decode(xdr, 0);
1184         rpclen = xdr_stream_remaining(xdr);
1185         r_xprt->rx_stats.fixup_copy_count +=
1186                 rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1187 
1188         r_xprt->rx_stats.total_rdma_reply += writelist;
1189         return rpclen + xdr_align_size(writelist);
1190 }
1191 
1192 static noinline int
1193 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1194 {
1195         struct xdr_stream *xdr = &rep->rr_stream;
1196         u32 writelist, replychunk;
1197 
1198         /* Decode the chunk lists */
1199         if (decode_read_list(xdr))
1200                 return -EIO;
1201         if (decode_write_list(xdr, &writelist))
1202                 return -EIO;
1203         if (decode_reply_chunk(xdr, &replychunk))
1204                 return -EIO;
1205 
1206         /* RDMA_NOMSG sanity checks */
1207         if (unlikely(writelist))
1208                 return -EIO;
1209         if (unlikely(!replychunk))
1210                 return -EIO;
1211 
1212         /* Reply chunk buffer already is the reply vector */
1213         r_xprt->rx_stats.total_rdma_reply += replychunk;
1214         return replychunk;
1215 }
1216 
1217 static noinline int
1218 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1219                      struct rpc_rqst *rqst)
1220 {
1221         struct xdr_stream *xdr = &rep->rr_stream;
1222         __be32 *p;
1223 
1224         p = xdr_inline_decode(xdr, sizeof(*p));
1225         if (unlikely(!p))
1226                 return -EIO;
1227 
1228         switch (*p) {
1229         case err_vers:
1230                 p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1231                 if (!p)
1232                         break;
1233                 dprintk("RPC:       %s: server reports "
1234                         "version error (%u-%u), xid %08x\n", __func__,
1235                         be32_to_cpup(p), be32_to_cpu(*(p + 1)),
1236                         be32_to_cpu(rep->rr_xid));
1237                 break;
1238         case err_chunk:
1239                 dprintk("RPC:       %s: server reports "
1240                         "header decoding error, xid %08x\n", __func__,
1241                         be32_to_cpu(rep->rr_xid));
1242                 break;
1243         default:
1244                 dprintk("RPC:       %s: server reports "
1245                         "unrecognized error %d, xid %08x\n", __func__,
1246                         be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
1247         }
1248 
1249         r_xprt->rx_stats.bad_reply_count++;
1250         return -EREMOTEIO;
1251 }
1252 
1253 /* Perform XID lookup, reconstruction of the RPC reply, and
1254  * RPC completion while holding the transport lock to ensure
1255  * the rep, rqst, and rq_task pointers remain stable.
1256  */
1257 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1258 {
1259         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1260         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1261         struct rpc_rqst *rqst = rep->rr_rqst;
1262         int status;
1263 
1264         switch (rep->rr_proc) {
1265         case rdma_msg:
1266                 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1267                 break;
1268         case rdma_nomsg:
1269                 status = rpcrdma_decode_nomsg(r_xprt, rep);
1270                 break;
1271         case rdma_error:
1272                 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1273                 break;
1274         default:
1275                 status = -EIO;
1276         }
1277         if (status < 0)
1278                 goto out_badheader;
1279 
1280 out:
1281         spin_lock(&xprt->queue_lock);
1282         xprt_complete_rqst(rqst->rq_task, status);
1283         xprt_unpin_rqst(rqst);
1284         spin_unlock(&xprt->queue_lock);
1285         return;
1286 
1287 /* If the incoming reply terminated a pending RPC, the next
1288  * RPC call will post a replacement receive buffer as it is
1289  * being marshaled.
1290  */
1291 out_badheader:
1292         trace_xprtrdma_reply_hdr(rep);
1293         r_xprt->rx_stats.bad_reply_count++;
1294         goto out;
1295 }
1296 
1297 static void rpcrdma_reply_done(struct kref *kref)
1298 {
1299         struct rpcrdma_req *req =
1300                 container_of(kref, struct rpcrdma_req, rl_kref);
1301 
1302         rpcrdma_complete_rqst(req->rl_reply);
1303 }
1304 
1305 /**
1306  * rpcrdma_reply_handler - Process received RPC/RDMA messages
1307  * @rep: Incoming rpcrdma_rep object to process
1308  *
1309  * Errors must result in the RPC task either being awakened, or
1310  * allowed to timeout, to discover the errors at that time.
1311  */
1312 void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1313 {
1314         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1315         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1316         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1317         struct rpcrdma_req *req;
1318         struct rpc_rqst *rqst;
1319         u32 credits;
1320         __be32 *p;
1321 
1322         /* Any data means we had a useful conversation, so
1323          * then we don't need to delay the next reconnect.
1324          */
1325         if (xprt->reestablish_timeout)
1326                 xprt->reestablish_timeout = 0;
1327 
1328         /* Fixed transport header fields */
1329         xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1330                         rep->rr_hdrbuf.head[0].iov_base, NULL);
1331         p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1332         if (unlikely(!p))
1333                 goto out_shortreply;
1334         rep->rr_xid = *p++;
1335         rep->rr_vers = *p++;
1336         credits = be32_to_cpu(*p++);
1337         rep->rr_proc = *p++;
1338 
1339         if (rep->rr_vers != rpcrdma_version)
1340                 goto out_badversion;
1341 
1342         if (rpcrdma_is_bcall(r_xprt, rep))
1343                 return;
1344 
1345         /* Match incoming rpcrdma_rep to an rpcrdma_req to
1346          * get context for handling any incoming chunks.
1347          */
1348         spin_lock(&xprt->queue_lock);
1349         rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1350         if (!rqst)
1351                 goto out_norqst;
1352         xprt_pin_rqst(rqst);
1353         spin_unlock(&xprt->queue_lock);
1354 
1355         if (credits == 0)
1356                 credits = 1;    /* don't deadlock */
1357         else if (credits > buf->rb_max_requests)
1358                 credits = buf->rb_max_requests;
1359         if (buf->rb_credits != credits) {
1360                 spin_lock(&xprt->transport_lock);
1361                 buf->rb_credits = credits;
1362                 xprt->cwnd = credits << RPC_CWNDSHIFT;
1363                 spin_unlock(&xprt->transport_lock);
1364         }
1365         rpcrdma_post_recvs(r_xprt, false);
1366 
1367         req = rpcr_to_rdmar(rqst);
1368         if (req->rl_reply) {
1369                 trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
1370                 rpcrdma_recv_buffer_put(req->rl_reply);
1371         }
1372         req->rl_reply = rep;
1373         rep->rr_rqst = rqst;
1374 
1375         trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1376 
1377         if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1378                 frwr_reminv(rep, &req->rl_registered);
1379         if (!list_empty(&req->rl_registered))
1380                 frwr_unmap_async(r_xprt, req);
1381                 /* LocalInv completion will complete the RPC */
1382         else
1383                 kref_put(&req->rl_kref, rpcrdma_reply_done);
1384         return;
1385 
1386 out_badversion:
1387         trace_xprtrdma_reply_vers(rep);
1388         goto out;
1389 
1390 out_norqst:
1391         spin_unlock(&xprt->queue_lock);
1392         trace_xprtrdma_reply_rqst(rep);
1393         goto out;
1394 
1395 out_shortreply:
1396         trace_xprtrdma_reply_short(rep);
1397 
1398 out:
1399         rpcrdma_recv_buffer_put(rep);
1400 }

/* [<][>][^][v][top][bottom][index][help] */