root/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

DEFINITIONS

This source file includes the following definitions:
  1. svc_rdma_next_recv_ctxt
  2. svc_rdma_recv_ctxt_alloc
  3. svc_rdma_recv_ctxt_destroy
  4. svc_rdma_recv_ctxts_destroy
  5. svc_rdma_recv_ctxt_get
  6. svc_rdma_recv_ctxt_put
  7. svc_rdma_release_rqst
  8. __svc_rdma_post_recv
  9. svc_rdma_post_recv
  10. svc_rdma_post_recvs
  11. svc_rdma_wc_receive
  12. svc_rdma_flush_recv_queues
  13. svc_rdma_build_arg_xdr
  14. xdr_check_read_list
  15. xdr_check_write_chunk
  16. xdr_check_write_list
  17. xdr_check_reply_chunk
  18. svc_rdma_get_inv_rkey
  19. svc_rdma_xdr_decode_req
  20. rdma_read_complete
  21. svc_rdma_send_error
  22. svc_rdma_is_backchannel_reply
  23. svc_rdma_recvfrom

   1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2 /*
   3  * Copyright (c) 2016-2018 Oracle. All rights reserved.
   4  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
   5  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
   6  *
   7  * This software is available to you under a choice of one of two
   8  * licenses.  You may choose to be licensed under the terms of the GNU
   9  * General Public License (GPL) Version 2, available from the file
  10  * COPYING in the main directory of this source tree, or the BSD-type
  11  * license below:
  12  *
  13  * Redistribution and use in source and binary forms, with or without
  14  * modification, are permitted provided that the following conditions
  15  * are met:
  16  *
  17  *      Redistributions of source code must retain the above copyright
  18  *      notice, this list of conditions and the following disclaimer.
  19  *
  20  *      Redistributions in binary form must reproduce the above
  21  *      copyright notice, this list of conditions and the following
  22  *      disclaimer in the documentation and/or other materials provided
  23  *      with the distribution.
  24  *
  25  *      Neither the name of the Network Appliance, Inc. nor the names of
  26  *      its contributors may be used to endorse or promote products
  27  *      derived from this software without specific prior written
  28  *      permission.
  29  *
  30  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  31  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  32  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  33  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  34  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  35  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  36  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  37  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  38  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  39  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  40  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  41  *
  42  * Author: Tom Tucker <tom@opengridcomputing.com>
  43  */
  44 
  45 /* Operation
  46  *
  47  * The main entry point is svc_rdma_recvfrom. This is called from
  48  * svc_recv when the transport indicates there is incoming data to
  49  * be read. "Data Ready" is signaled when an RDMA Receive completes,
  50  * or when a set of RDMA Reads complete.
  51  *
  52  * An svc_rqst is passed in. This structure contains an array of
  53  * free pages (rq_pages) that will contain the incoming RPC message.
  54  *
  55  * Short messages are moved directly into svc_rqst::rq_arg, and
  56  * the RPC Call is ready to be processed by the Upper Layer.
  57  * svc_rdma_recvfrom returns the length of the RPC Call message,
  58  * completing the reception of the RPC Call.
  59  *
  60  * However, when an incoming message has Read chunks,
  61  * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
  62  * data payload from the client. svc_rdma_recvfrom sets up the
  63  * RDMA Reads using pages in svc_rqst::rq_pages, which are
  64  * transferred to an svc_rdma_recv_ctxt for the duration of the
  65  * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
  66  * is not yet ready.
  67  *
  68  * When the Read chunk payloads have become available on the
  69  * server, "Data Ready" is raised again, and svc_recv calls
  70  * svc_rdma_recvfrom again. This second call may use a different
  71  * svc_rqst than the first one, so any information that needs
  72  * to be preserved across these two calls is kept in an
  73  * svc_rdma_recv_ctxt.
  74  *
  75  * The second call to svc_rdma_recvfrom performs final assembly
  76  * of the RPC Call message, using the RDMA Read sink pages kept in
  77  * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
  78  * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
  79  * the length of the completed RPC Call message.
  80  *
  81  * Page Management
  82  *
  83  * Pages under I/O must be transferred from the first svc_rqst to an
  84  * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
  85  *
  86  * The first svc_rqst supplies pages for RDMA Reads. These are moved
  87  * from rqstp::rq_pages into ctxt::rc_pages. The consumed elements of
  88  * the rq_pages array are set to NULL and refilled after the first
  89  * svc_rdma_recvfrom call returns.
  90  *
  91  * During the second svc_rdma_recvfrom call, RDMA Read sink pages
  92  * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst
  93  * (see rdma_read_complete() below).
  94  */
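     /* In short, a simplified sketch of how the svc_recv() caller
      * consumes these return values (an illustration only, not the
      * generic sunrpc code itself; svc_process() is the generic RPC
      * dispatcher):
      *
      *	len = svc_rdma_recvfrom(rqstp);
      *	if (len > 0)
      *		svc_process(rqstp);	(rq_arg holds the whole Call)
      *	else if (len == 0)
      *		(wait for the next "Data Ready" event, then call
      *		 svc_rdma_recvfrom() again, possibly with a
      *		 different svc_rqst)
      *	else
      *		(error returned to svc_recv(); no RPC is dispatched)
      */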
  95 
  96 #include <linux/spinlock.h>
  97 #include <asm/unaligned.h>
  98 #include <rdma/ib_verbs.h>
  99 #include <rdma/rdma_cm.h>
 100 
 101 #include <linux/sunrpc/xdr.h>
 102 #include <linux/sunrpc/debug.h>
 103 #include <linux/sunrpc/rpc_rdma.h>
 104 #include <linux/sunrpc/svc_rdma.h>
 105 
 106 #include "xprt_rdma.h"
 107 #include <trace/events/rpcrdma.h>
 108 
 109 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
 110 
 111 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);
 112 
 113 static inline struct svc_rdma_recv_ctxt *
 114 svc_rdma_next_recv_ctxt(struct list_head *list)
 115 {
 116         return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
 117                                         rc_list);
 118 }
 119 
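     /* Allocate a recv_ctxt along with its receive buffer, DMA-map the
      * buffer, and pre-build the Receive WR and SGE that will be used
      * to post it. Returns NULL if an allocation or the DMA mapping
      * fails.
      */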
 120 static struct svc_rdma_recv_ctxt *
 121 svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
 122 {
 123         struct svc_rdma_recv_ctxt *ctxt;
 124         dma_addr_t addr;
 125         void *buffer;
 126 
 127         ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
 128         if (!ctxt)
 129                 goto fail0;
 130         buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
 131         if (!buffer)
 132                 goto fail1;
 133         addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
 134                                  rdma->sc_max_req_size, DMA_FROM_DEVICE);
 135         if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
 136                 goto fail2;
 137 
 138         ctxt->rc_recv_wr.next = NULL;
 139         ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
 140         ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
 141         ctxt->rc_recv_wr.num_sge = 1;
 142         ctxt->rc_cqe.done = svc_rdma_wc_receive;
 143         ctxt->rc_recv_sge.addr = addr;
 144         ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
 145         ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
 146         ctxt->rc_recv_buf = buffer;
 147         ctxt->rc_temp = false;
 148         return ctxt;
 149 
 150 fail2:
 151         kfree(buffer);
 152 fail1:
 153         kfree(ctxt);
 154 fail0:
 155         return NULL;
 156 }
 157 
 158 static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
 159                                        struct svc_rdma_recv_ctxt *ctxt)
 160 {
 161         ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
 162                             ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
 163         kfree(ctxt->rc_recv_buf);
 164         kfree(ctxt);
 165 }
 166 
 167 /**
 168  * svc_rdma_recv_ctxts_destroy - Release all recv_ctxts for an xprt
 169  * @rdma: svcxprt_rdma being torn down
 170  *
 171  */
 172 void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 173 {
 174         struct svc_rdma_recv_ctxt *ctxt;
 175         struct llist_node *node;
 176 
 177         while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
 178                 ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
 179                 svc_rdma_recv_ctxt_destroy(rdma, ctxt);
 180         }
 181 }
 182 
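     /* Take a recv_ctxt from the transport's free list, or allocate a
      * fresh one if the list is empty. The returned ctxt has no pages
      * attached (rc_page_count is reset to zero). Returns NULL only if
      * allocation fails.
      */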
 183 static struct svc_rdma_recv_ctxt *
 184 svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 185 {
 186         struct svc_rdma_recv_ctxt *ctxt;
 187         struct llist_node *node;
 188 
 189         node = llist_del_first(&rdma->sc_recv_ctxts);
 190         if (!node)
 191                 goto out_empty;
 192         ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
 193 
 194 out:
 195         ctxt->rc_page_count = 0;
 196         return ctxt;
 197 
 198 out_empty:
 199         ctxt = svc_rdma_recv_ctxt_alloc(rdma);
 200         if (!ctxt)
 201                 return NULL;
 202         goto out;
 203 }
 204 
 205 /**
 206  * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
 207  * @rdma: controlling svcxprt_rdma
 208  * @ctxt: object to return to the free list
 209  *
 210  */
 211 void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 212                             struct svc_rdma_recv_ctxt *ctxt)
 213 {
 214         unsigned int i;
 215 
 216         for (i = 0; i < ctxt->rc_page_count; i++)
 217                 put_page(ctxt->rc_pages[i]);
 218 
 219         if (!ctxt->rc_temp)
 220                 llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 221         else
 222                 svc_rdma_recv_ctxt_destroy(rdma, ctxt);
 223 }
 224 
 225 /**
 226  * svc_rdma_release_rqst - Release transport-specific per-rqst resources
 227  * @rqstp: svc_rqst being released
 228  *
 229  * Ensure that the recv_ctxt is released whether or not a Reply
 230  * was sent. For example, the client could close the connection,
 231  * or svc_process could drop an RPC, before the Reply is sent.
 232  */
 233 void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 234 {
 235         struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt;
 236         struct svc_xprt *xprt = rqstp->rq_xprt;
 237         struct svcxprt_rdma *rdma =
 238                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
 239 
 240         rqstp->rq_xprt_ctxt = NULL;
 241         if (ctxt)
 242                 svc_rdma_recv_ctxt_put(rdma, ctxt);
 243 }
 244 
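     /* Post one prepared Receive WR. A reference on the svc_xprt is
      * taken for as long as the Receive is outstanding; if posting
      * fails, the ctxt is returned to the free list and the reference
      * is dropped.
      */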
 245 static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
 246                                 struct svc_rdma_recv_ctxt *ctxt)
 247 {
 248         int ret;
 249 
 250         svc_xprt_get(&rdma->sc_xprt);
 251         ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
 252         trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret);
 253         if (ret)
 254                 goto err_post;
 255         return 0;
 256 
 257 err_post:
 258         svc_rdma_recv_ctxt_put(rdma, ctxt);
 259         svc_xprt_put(&rdma->sc_xprt);
 260         return ret;
 261 }
 262 
 263 static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
 264 {
 265         struct svc_rdma_recv_ctxt *ctxt;
 266 
 267         ctxt = svc_rdma_recv_ctxt_get(rdma);
 268         if (!ctxt)
 269                 return -ENOMEM;
 270         return __svc_rdma_post_recv(rdma, ctxt);
 271 }
 272 
 273 /**
 274  * svc_rdma_post_recvs - Post initial set of Recv WRs
 275  * @rdma: fresh svcxprt_rdma
 276  *
 277  * Returns true if successful, otherwise false.
 278  */
 279 bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
 280 {
 281         struct svc_rdma_recv_ctxt *ctxt;
 282         unsigned int i;
 283         int ret;
 284 
 285         for (i = 0; i < rdma->sc_max_requests; i++) {
 286                 ctxt = svc_rdma_recv_ctxt_get(rdma);
 287                 if (!ctxt)
 288                         return false;
 289                 ctxt->rc_temp = true;
 290                 ret = __svc_rdma_post_recv(rdma, ctxt);
 291                 if (ret)
 292                         return false;
 293         }
 294         return true;
 295 }
 296 
 297 /**
 298  * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 299  * @cq: Completion Queue context
 300  * @wc: Work Completion object
 301  *
 302  * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
 303  * the Receive completion handler could be running.
 304  */
 305 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 306 {
 307         struct svcxprt_rdma *rdma = cq->cq_context;
 308         struct ib_cqe *cqe = wc->wr_cqe;
 309         struct svc_rdma_recv_ctxt *ctxt;
 310 
 311         trace_svcrdma_wc_receive(wc);
 312 
 313         /* WARNING: Only wc->wr_cqe and wc->status are reliable */
 314         ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
 315 
 316         if (wc->status != IB_WC_SUCCESS)
 317                 goto flushed;
 318 
 319         if (svc_rdma_post_recv(rdma))
 320                 goto post_err;
 321 
 322         /* All wc fields are now known to be valid */
 323         ctxt->rc_byte_len = wc->byte_len;
 324         ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
 325                                    ctxt->rc_recv_sge.addr,
 326                                    wc->byte_len, DMA_FROM_DEVICE);
 327 
 328         spin_lock(&rdma->sc_rq_dto_lock);
 329         list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
 330         /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
 331         set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 332         spin_unlock(&rdma->sc_rq_dto_lock);
 333         if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
 334                 svc_xprt_enqueue(&rdma->sc_xprt);
 335         goto out;
 336 
 337 flushed:
 338 post_err:
 339         svc_rdma_recv_ctxt_put(rdma, ctxt);
 340         set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 341         svc_xprt_enqueue(&rdma->sc_xprt);
 342 out:
 343         svc_xprt_put(&rdma->sc_xprt);
 344 }
 345 
 346 /**
 347  * svc_rdma_flush_recv_queues - Drain pending Receive work
 348  * @rdma: svcxprt_rdma being shut down
 349  *
 350  */
 351 void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
 352 {
 353         struct svc_rdma_recv_ctxt *ctxt;
 354 
 355         while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
 356                 list_del(&ctxt->rc_list);
 357                 svc_rdma_recv_ctxt_put(rdma, ctxt);
 358         }
 359         while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
 360                 list_del(&ctxt->rc_list);
 361                 svc_rdma_recv_ctxt_put(rdma, ctxt);
 362         }
 363 }
 364 
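     /* Map the just-received message into rqstp->rq_arg: the entire
      * receive buffer (transport header plus any inline RPC payload)
      * becomes head[0], while the page list and tail start out empty.
      */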
 365 static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
 366                                    struct svc_rdma_recv_ctxt *ctxt)
 367 {
 368         struct xdr_buf *arg = &rqstp->rq_arg;
 369 
 370         arg->head[0].iov_base = ctxt->rc_recv_buf;
 371         arg->head[0].iov_len = ctxt->rc_byte_len;
 372         arg->tail[0].iov_base = NULL;
 373         arg->tail[0].iov_len = 0;
 374         arg->page_len = 0;
 375         arg->page_base = 0;
 376         arg->buflen = ctxt->rc_byte_len;
 377         arg->len = ctxt->rc_byte_len;
 378 }
 379 
 380 /* This accommodates the largest possible Write chunk,
 381  * in one segment.
 382  */
 383 #define MAX_BYTES_WRITE_SEG     ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
 384 
 385 /* This accommodates the largest possible Position-Zero
 386  * Read chunk or Reply chunk, in one segment.
 387  */
 388 #define MAX_BYTES_SPECIAL_SEG   ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
 389 
 390 /* Sanity check the Read list.
 391  *
 392  * Implementation limits:
 393  * - This implementation supports only one Read chunk.
 394  *
 395  * Sanity checks:
 396  * - Read list does not overflow buffer.
 397  * - Segment size limited by largest NFS data payload.
 398  *
 399  * The segment count is limited to how many segments can
 400  * fit in the transport header without overflowing the
 401  * buffer. That's about 40 Read segments for a 1KB inline
 402  * threshold.
 403  *
 404  * Returns pointer to the following Write list.
 405  */
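     /* For reference, each Read list entry is laid out on the wire
      * (RPC-over-RDMA version 1, RFC 8166) as: a one-word discriminator
      * that is non-zero while entries remain, the chunk position, the
      * segment's handle (R_key), its length, and a two-word (64-bit)
      * offset. The pointer arithmetic below steps over exactly these
      * fields.
      */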
 406 static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
 407 {
 408         u32 position;
 409         bool first;
 410 
 411         first = true;
 412         while (*p++ != xdr_zero) {
 413                 if (first) {
 414                         position = be32_to_cpup(p++);
 415                         first = false;
 416                 } else if (be32_to_cpup(p++) != position) {
 417                         return NULL;
 418                 }
 419                 p++;    /* handle */
 420                 if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
 421                         return NULL;
 422                 p += 2; /* offset */
 423 
 424                 if (p > end)
 425                         return NULL;
 426         }
 427         return p;
 428 }
 429 
 430 /* The segment count is limited to how many segments can
 431  * fit in the transport header without overflowing the
 432  * buffer. That's about 60 Write segments for a 1KB inline
 433  * threshold.
 434  */
 435 static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
 436                                      u32 maxlen)
 437 {
 438         u32 i, segcount;
 439 
 440         segcount = be32_to_cpup(p++);
 441         for (i = 0; i < segcount; i++) {
 442                 p++;    /* handle */
 443                 if (be32_to_cpup(p++) > maxlen)
 444                         return NULL;
 445                 p += 2; /* offset */
 446 
 447                 if (p > end)
 448                         return NULL;
 449         }
 450 
 451         return p;
 452 }
 453 
 454 /* Sanity check the Write list.
 455  *
 456  * Implementation limits:
 457  * - This implementation supports only one Write chunk.
 458  *
 459  * Sanity checks:
 460  * - Write list does not overflow buffer.
 461  * - Segment size limited by largest NFS data payload.
 462  *
 463  * Returns pointer to the following Reply chunk.
 464  */
 465 static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
 466 {
 467         u32 chcount;
 468 
 469         chcount = 0;
 470         while (*p++ != xdr_zero) {
 471                 p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
 472                 if (!p)
 473                         return NULL;
 474                 if (chcount++ > 1)
 475                         return NULL;
 476         }
 477         return p;
 478 }
 479 
 480 /* Sanity check the Reply chunk.
 481  *
 482  * Sanity checks:
 483  * - Reply chunk does not overflow buffer.
 484  * - Segment size limited by largest NFS data payload.
 485  *
 486  * Returns pointer to the following RPC header.
 487  */
 488 static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
 489 {
 490         if (*p++ != xdr_zero) {
 491                 p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
 492                 if (!p)
 493                         return NULL;
 494         }
 495         return p;
 496 }
 497 
 498 /* RPC-over-RDMA Version One private extension: Remote Invalidation.
 499  * Responder's choice: requester signals it can handle Send With
 500  * Invalidate, and responder chooses one R_key to invalidate.
 501  *
 502  * If there is exactly one distinct R_key in the received transport
 503  * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
 504  *
 505  * Perform this operation while the received transport header is
 506  * still in the CPU cache.
 507  */
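     /* For example: if every segment in the Read list, Write list, and
      * Reply chunk carries the same R_key, rc_inv_rkey is set to that
      * value; the moment a second, distinct R_key is found, the
      * function returns early and rc_inv_rkey stays zero, disabling
      * Remote Invalidation for this Reply.
      */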
 508 static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
 509                                   struct svc_rdma_recv_ctxt *ctxt)
 510 {
 511         __be32 inv_rkey, *p;
 512         u32 i, segcount;
 513 
 514         ctxt->rc_inv_rkey = 0;
 515 
 516         if (!rdma->sc_snd_w_inv)
 517                 return;
 518 
 519         inv_rkey = xdr_zero;
 520         p = ctxt->rc_recv_buf;
 521         p += rpcrdma_fixed_maxsz;
 522 
 523         /* Read list */
 524         while (*p++ != xdr_zero) {
 525                 p++;    /* position */
 526                 if (inv_rkey == xdr_zero)
 527                         inv_rkey = *p;
 528                 else if (inv_rkey != *p)
 529                         return;
 530                 p += 4;
 531         }
 532 
 533         /* Write list */
 534         while (*p++ != xdr_zero) {
 535                 segcount = be32_to_cpup(p++);
 536                 for (i = 0; i < segcount; i++) {
 537                         if (inv_rkey == xdr_zero)
 538                                 inv_rkey = *p;
 539                         else if (inv_rkey != *p)
 540                                 return;
 541                         p += 4;
 542                 }
 543         }
 544 
 545         /* Reply chunk */
 546         if (*p++ != xdr_zero) {
 547                 segcount = be32_to_cpup(p++);
 548                 for (i = 0; i < segcount; i++) {
 549                         if (inv_rkey == xdr_zero)
 550                                 inv_rkey = *p;
 551                         else if (inv_rkey != *p)
 552                                 return;
 553                         p += 4;
 554                 }
 555         }
 556 
 557         ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
 558 }
 559 
 560 /* On entry, xdr->head[0].iov_base points to the first byte of the
 561  * RPC-over-RDMA header.
 562  *
 563  * On successful exit, head[0] points to the first byte past the
 564  * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 565  * The length of the RPC-over-RDMA header is returned.
 566  *
 567  * Assumptions:
 568  * - The transport header is entirely contained in the head iovec.
 569  */
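     /* For reference, the first four XDR words of every RPC-over-RDMA
      * transport header are the XID, the protocol version, the
      * requested credit count, and the procedure (rdma_msg, rdma_nomsg,
      * and so on); hence the version check at rdma_argp + 1 and the
      * procedure switch at rdma_argp + 3 below.
      */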
 570 static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
 571 {
 572         __be32 *p, *end, *rdma_argp;
 573         unsigned int hdr_len;
 574 
 575         /* Verify that there are enough bytes for header + something */
 576         if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
 577                 goto out_short;
 578 
 579         rdma_argp = rq_arg->head[0].iov_base;
 580         if (*(rdma_argp + 1) != rpcrdma_version)
 581                 goto out_version;
 582 
 583         switch (*(rdma_argp + 3)) {
 584         case rdma_msg:
 585                 break;
 586         case rdma_nomsg:
 587                 break;
 588 
 589         case rdma_done:
 590                 goto out_drop;
 591 
 592         case rdma_error:
 593                 goto out_drop;
 594 
 595         default:
 596                 goto out_proc;
 597         }
 598 
 599         end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
 600         p = xdr_check_read_list(rdma_argp + 4, end);
 601         if (!p)
 602                 goto out_inval;
 603         p = xdr_check_write_list(p, end);
 604         if (!p)
 605                 goto out_inval;
 606         p = xdr_check_reply_chunk(p, end);
 607         if (!p)
 608                 goto out_inval;
 609         if (p > end)
 610                 goto out_inval;
 611 
 612         rq_arg->head[0].iov_base = p;
 613         hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
 614         rq_arg->head[0].iov_len -= hdr_len;
 615         rq_arg->len -= hdr_len;
 616         trace_svcrdma_decode_rqst(rdma_argp, hdr_len);
 617         return hdr_len;
 618 
 619 out_short:
 620         trace_svcrdma_decode_short(rq_arg->len);
 621         return -EINVAL;
 622 
 623 out_version:
 624         trace_svcrdma_decode_badvers(rdma_argp);
 625         return -EPROTONOSUPPORT;
 626 
 627 out_drop:
 628         trace_svcrdma_decode_drop(rdma_argp);
 629         return 0;
 630 
 631 out_proc:
 632         trace_svcrdma_decode_badproc(rdma_argp);
 633         return -EINVAL;
 634 
 635 out_inval:
 636         trace_svcrdma_decode_parse(rdma_argp);
 637         return -EINVAL;
 638 }
 639 
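     /* Second-call path: Read sink pages saved in the recv_ctxt are
      * moved into this rqstp's rq_pages, and the rq_arg pieces saved in
      * rc_arg are copied back, so that assembly of the Call message can
      * finish on this (possibly different) svc_rqst.
      */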
 640 static void rdma_read_complete(struct svc_rqst *rqstp,
 641                                struct svc_rdma_recv_ctxt *head)
 642 {
 643         int page_no;
 644 
 645         /* Move Read chunk pages to rqstp so that they will be released
 646          * when svc_process is done with them.
 647          */
 648         for (page_no = 0; page_no < head->rc_page_count; page_no++) {
 649                 put_page(rqstp->rq_pages[page_no]);
 650                 rqstp->rq_pages[page_no] = head->rc_pages[page_no];
 651         }
 652         head->rc_page_count = 0;
 653 
 654         /* Point rq_arg.pages past header */
 655         rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count];
 656         rqstp->rq_arg.page_len = head->rc_arg.page_len;
 657 
 658         /* rq_respages starts after the last arg page */
 659         rqstp->rq_respages = &rqstp->rq_pages[page_no];
 660         rqstp->rq_next_page = rqstp->rq_respages + 1;
 661 
 662         /* Rebuild rq_arg head and tail. */
 663         rqstp->rq_arg.head[0] = head->rc_arg.head[0];
 664         rqstp->rq_arg.tail[0] = head->rc_arg.tail[0];
 665         rqstp->rq_arg.len = head->rc_arg.len;
 666         rqstp->rq_arg.buflen = head->rc_arg.buflen;
 667 }
 668 
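     /* Build and send an RDMA_ERROR message that echoes the failing
      * request's XID and version. ERR_VERS reports the supported
      * protocol version range; any other error is reported as
      * ERR_CHUNK.
      */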
 669 static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 670                                 __be32 *rdma_argp, int status)
 671 {
 672         struct svc_rdma_send_ctxt *ctxt;
 673         unsigned int length;
 674         __be32 *p;
 675         int ret;
 676 
 677         ctxt = svc_rdma_send_ctxt_get(xprt);
 678         if (!ctxt)
 679                 return;
 680 
 681         p = ctxt->sc_xprt_buf;
 682         *p++ = *rdma_argp;
 683         *p++ = *(rdma_argp + 1);
 684         *p++ = xprt->sc_fc_credits;
 685         *p++ = rdma_error;
 686         switch (status) {
 687         case -EPROTONOSUPPORT:
 688                 *p++ = err_vers;
 689                 *p++ = rpcrdma_version;
 690                 *p++ = rpcrdma_version;
 691                 trace_svcrdma_err_vers(*rdma_argp);
 692                 break;
 693         default:
 694                 *p++ = err_chunk;
 695                 trace_svcrdma_err_chunk(*rdma_argp);
 696         }
 697         length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
 698         svc_rdma_sync_reply_hdr(xprt, ctxt, length);
 699 
 700         ctxt->sc_send_wr.opcode = IB_WR_SEND;
 701         ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
 702         if (ret)
 703                 svc_rdma_send_ctxt_put(xprt, ctxt);
 704 }
 705 
 706 /* By convention, backchannel calls arrive via rdma_msg type
 707  * messages, and never populate the chunk lists. This makes
 708  * the RPC/RDMA header small and fixed in size, so it is
 709  * straightforward to check the RPC header's direction field.
 710  */
 711 static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
 712                                           __be32 *rdma_resp)
 713 {
 714         __be32 *p;
 715 
 716         if (!xprt->xpt_bc_xprt)
 717                 return false;
 718 
 719         p = rdma_resp + 3;
 720         if (*p++ != rdma_msg)
 721                 return false;
 722 
 723         if (*p++ != xdr_zero)
 724                 return false;
 725         if (*p++ != xdr_zero)
 726                 return false;
 727         if (*p++ != xdr_zero)
 728                 return false;
 729 
 730         /* XID sanity */
 731         if (*p++ != *rdma_resp)
 732                 return false;
 733         /* call direction */
 734         if (*p == cpu_to_be32(RPC_CALL))
 735                 return false;
 736 
 737         return true;
 738 }
 739 
 740 /**
 741  * svc_rdma_recvfrom - Receive an RPC call
 742  * @rqstp: request structure into which to receive an RPC Call
 743  *
 744  * Returns:
 745  *      The positive number of bytes in the RPC Call message,
 746  *      %0 if there were no Calls ready to return,
 747  *      %-EINVAL if the Read chunk data is too large,
 748  *      %-ENOMEM if rdma_rw context pool was exhausted,
 749  *      %-ENOTCONN if posting failed (connection is lost),
 750  *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 751  *
 752  * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
 753  * when there are no remaining ctxts to process.
 754  *
 755  * The next ctxt is removed from the "receive" lists.
 756  *
 757  * - If the ctxt completes a Read, then finish assembling the Call
 758  *   message and return the number of bytes in the message.
 759  *
 760  * - If the ctxt completes a Receive, then construct the Call
 761  *   message from the contents of the Receive buffer.
 762  *
 763  *   - If there are no Read chunks in this message, then finish
 764  *     assembling the Call message and return the number of bytes
 765  *     in the message.
 766  *
 767  *   - If there are Read chunks in this message, post Read WRs to
 768  *     pull that payload and return 0.
 769  */
 770 int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 771 {
 772         struct svc_xprt *xprt = rqstp->rq_xprt;
 773         struct svcxprt_rdma *rdma_xprt =
 774                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
 775         struct svc_rdma_recv_ctxt *ctxt;
 776         __be32 *p;
 777         int ret;
 778 
 779         rqstp->rq_xprt_ctxt = NULL;
 780 
 781         spin_lock(&rdma_xprt->sc_rq_dto_lock);
 782         ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
 783         if (ctxt) {
 784                 list_del(&ctxt->rc_list);
 785                 spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 786                 rdma_read_complete(rqstp, ctxt);
 787                 goto complete;
 788         }
 789         ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
 790         if (!ctxt) {
 791                 /* No new incoming requests, terminate the loop */
 792                 clear_bit(XPT_DATA, &xprt->xpt_flags);
 793                 spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 794                 return 0;
 795         }
 796         list_del(&ctxt->rc_list);
 797         spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 798 
 799         atomic_inc(&rdma_stat_recv);
 800 
 801         svc_rdma_build_arg_xdr(rqstp, ctxt);
 802 
 803         /* Prevent svc_xprt_release from releasing pages in rq_pages
 804          * if we return 0 or an error.
 805          */
 806         rqstp->rq_respages = rqstp->rq_pages;
 807         rqstp->rq_next_page = rqstp->rq_respages;
 808 
 809         p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
 810         ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
 811         if (ret < 0)
 812                 goto out_err;
 813         if (ret == 0)
 814                 goto out_drop;
 815         rqstp->rq_xprt_hlen = ret;
 816 
 817         if (svc_rdma_is_backchannel_reply(xprt, p)) {
 818                 ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
 819                                                &rqstp->rq_arg);
 820                 svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 821                 return ret;
 822         }
 823         svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
 824 
 825         p += rpcrdma_fixed_maxsz;
 826         if (*p != xdr_zero)
 827                 goto out_readchunk;
 828 
 829 complete:
 830         rqstp->rq_xprt_ctxt = ctxt;
 831         rqstp->rq_prot = IPPROTO_MAX;
 832         svc_xprt_copy_addrs(rqstp, xprt);
 833         return rqstp->rq_arg.len;
 834 
 835 out_readchunk:
 836         ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
 837         if (ret < 0)
 838                 goto out_postfail;
 839         return 0;
 840 
 841 out_err:
 842         svc_rdma_send_error(rdma_xprt, p, ret);
 843         svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 844         return 0;
 845 
 846 out_postfail:
 847         if (ret == -EINVAL)
 848                 svc_rdma_send_error(rdma_xprt, p, ret);
 849         svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 850         return ret;
 851 
 852 out_drop:
 853         svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 854         return 0;
 855 }
