root/net/sunrpc/xprtrdma/transport.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. xprt_rdma_format_addresses4
  2. xprt_rdma_format_addresses6
  3. xprt_rdma_format_addresses
  4. xprt_rdma_free_addresses
  5. xprt_rdma_connect_worker
  6. xprt_rdma_inject_disconnect
  7. xprt_rdma_destroy
  8. xprt_setup_rdma
  9. xprt_rdma_close
  10. xprt_rdma_set_port
  11. xprt_rdma_timer
  12. xprt_rdma_set_connect_timeout
  13. xprt_rdma_connect
  14. xprt_rdma_alloc_slot
  15. xprt_rdma_free_slot
  16. rpcrdma_check_regbuf
  17. xprt_rdma_allocate
  18. xprt_rdma_free
  19. xprt_rdma_send_request
  20. xprt_rdma_print_stats
  21. xprt_rdma_enable_swap
  22. xprt_rdma_disable_swap
  23. xprt_rdma_cleanup
  24. xprt_rdma_init

   1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2 /*
   3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5  *
   6  * This software is available to you under a choice of one of two
   7  * licenses.  You may choose to be licensed under the terms of the GNU
   8  * General Public License (GPL) Version 2, available from the file
   9  * COPYING in the main directory of this source tree, or the BSD-type
  10  * license below:
  11  *
  12  * Redistribution and use in source and binary forms, with or without
  13  * modification, are permitted provided that the following conditions
  14  * are met:
  15  *
  16  *      Redistributions of source code must retain the above copyright
  17  *      notice, this list of conditions and the following disclaimer.
  18  *
  19  *      Redistributions in binary form must reproduce the above
  20  *      copyright notice, this list of conditions and the following
  21  *      disclaimer in the documentation and/or other materials provided
  22  *      with the distribution.
  23  *
  24  *      Neither the name of the Network Appliance, Inc. nor the names of
  25  *      its contributors may be used to endorse or promote products
  26  *      derived from this software without specific prior written
  27  *      permission.
  28  *
  29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40  */
  41 
  42 /*
  43  * transport.c
  44  *
  45  * This file contains the top-level implementation of an RPC RDMA
  46  * transport.
  47  *
  48  * Naming convention: functions beginning with xprt_ are part of the
  49  * transport switch. All others are RPC RDMA internal.
  50  */
  51 
  52 #include <linux/module.h>
  53 #include <linux/slab.h>
  54 #include <linux/seq_file.h>
  55 #include <linux/smp.h>
  56 
  57 #include <linux/sunrpc/addr.h>
  58 #include <linux/sunrpc/svc_rdma.h>
  59 
  60 #include "xprt_rdma.h"
  61 #include <trace/events/rpcrdma.h>
  62 
  63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64 # define RPCDBG_FACILITY        RPCDBG_TRANS
  65 #endif
  66 
  67 /*
  68  * tunables
  69  */
  70 
  71 unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
  72 unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
  73 unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
  74 unsigned int xprt_rdma_memreg_strategy          = RPCRDMA_FRWR;
  75 int xprt_rdma_pad_optimize;
  76 
  77 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  78 
  79 static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
  80 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
  81 static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
  82 static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
  83 static unsigned int max_padding = PAGE_SIZE;
  84 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
  85 static unsigned int max_memreg = RPCRDMA_LAST - 1;
  86 static unsigned int dummy;
  87 
  88 static struct ctl_table_header *sunrpc_table_header;
  89 
  90 static struct ctl_table xr_tunables_table[] = {
  91         {
  92                 .procname       = "rdma_slot_table_entries",
  93                 .data           = &xprt_rdma_slot_table_entries,
  94                 .maxlen         = sizeof(unsigned int),
  95                 .mode           = 0644,
  96                 .proc_handler   = proc_dointvec_minmax,
  97                 .extra1         = &min_slot_table_size,
  98                 .extra2         = &max_slot_table_size
  99         },
 100         {
 101                 .procname       = "rdma_max_inline_read",
 102                 .data           = &xprt_rdma_max_inline_read,
 103                 .maxlen         = sizeof(unsigned int),
 104                 .mode           = 0644,
 105                 .proc_handler   = proc_dointvec_minmax,
 106                 .extra1         = &min_inline_size,
 107                 .extra2         = &max_inline_size,
 108         },
 109         {
 110                 .procname       = "rdma_max_inline_write",
 111                 .data           = &xprt_rdma_max_inline_write,
 112                 .maxlen         = sizeof(unsigned int),
 113                 .mode           = 0644,
 114                 .proc_handler   = proc_dointvec_minmax,
 115                 .extra1         = &min_inline_size,
 116                 .extra2         = &max_inline_size,
 117         },
 118         {
 119                 .procname       = "rdma_inline_write_padding",
 120                 .data           = &dummy,
 121                 .maxlen         = sizeof(unsigned int),
 122                 .mode           = 0644,
 123                 .proc_handler   = proc_dointvec_minmax,
 124                 .extra1         = SYSCTL_ZERO,
 125                 .extra2         = &max_padding,
 126         },
 127         {
 128                 .procname       = "rdma_memreg_strategy",
 129                 .data           = &xprt_rdma_memreg_strategy,
 130                 .maxlen         = sizeof(unsigned int),
 131                 .mode           = 0644,
 132                 .proc_handler   = proc_dointvec_minmax,
 133                 .extra1         = &min_memreg,
 134                 .extra2         = &max_memreg,
 135         },
 136         {
 137                 .procname       = "rdma_pad_optimize",
 138                 .data           = &xprt_rdma_pad_optimize,
 139                 .maxlen         = sizeof(unsigned int),
 140                 .mode           = 0644,
 141                 .proc_handler   = proc_dointvec,
 142         },
 143         { },
 144 };
 145 
 146 static struct ctl_table sunrpc_table[] = {
 147         {
 148                 .procname       = "sunrpc",
 149                 .mode           = 0555,
 150                 .child          = xr_tunables_table
 151         },
 152         { },
 153 };
 154 
 155 #endif
 156 
 157 static const struct rpc_xprt_ops xprt_rdma_procs;
 158 
 159 static void
 160 xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
 161 {
 162         struct sockaddr_in *sin = (struct sockaddr_in *)sap;
 163         char buf[20];
 164 
 165         snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
 166         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 167 
 168         xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
 169 }
 170 
 171 static void
 172 xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
 173 {
 174         struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
 175         char buf[40];
 176 
 177         snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
 178         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
 179 
 180         xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
 181 }
 182 
 183 void
 184 xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
 185 {
 186         char buf[128];
 187 
 188         switch (sap->sa_family) {
 189         case AF_INET:
 190                 xprt_rdma_format_addresses4(xprt, sap);
 191                 break;
 192         case AF_INET6:
 193                 xprt_rdma_format_addresses6(xprt, sap);
 194                 break;
 195         default:
 196                 pr_err("rpcrdma: Unrecognized address family\n");
 197                 return;
 198         }
 199 
 200         (void)rpc_ntop(sap, buf, sizeof(buf));
 201         xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
 202 
 203         snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
 204         xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 205 
 206         snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
 207         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 208 
 209         xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
 210 }
 211 
 212 void
 213 xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 214 {
 215         unsigned int i;
 216 
 217         for (i = 0; i < RPC_DISPLAY_MAX; i++)
 218                 switch (i) {
 219                 case RPC_DISPLAY_PROTO:
 220                 case RPC_DISPLAY_NETID:
 221                         continue;
 222                 default:
 223                         kfree(xprt->address_strings[i]);
 224                 }
 225 }
 226 
 227 /**
 228  * xprt_rdma_connect_worker - establish connection in the background
 229  * @work: worker thread context
 230  *
 231  * Requester holds the xprt's send lock to prevent activity on this
 232  * transport while a fresh connection is being established. RPC tasks
 233  * sleep on the xprt's pending queue waiting for connect to complete.
 234  */
 235 static void
 236 xprt_rdma_connect_worker(struct work_struct *work)
 237 {
 238         struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
 239                                                    rx_connect_worker.work);
 240         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 241         int rc;
 242 
 243         rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 244         xprt_clear_connecting(xprt);
 245         if (r_xprt->rx_ep.rep_connected > 0) {
 246                 if (!xprt_test_and_set_connected(xprt)) {
 247                         xprt->stat.connect_count++;
 248                         xprt->stat.connect_time += (long)jiffies -
 249                                                    xprt->stat.connect_start;
 250                         xprt_wake_pending_tasks(xprt, -EAGAIN);
 251                 }
 252         } else {
 253                 if (xprt_test_and_clear_connected(xprt))
 254                         xprt_wake_pending_tasks(xprt, rc);
 255         }
 256 }
 257 
 258 /**
 259  * xprt_rdma_inject_disconnect - inject a connection fault
 260  * @xprt: transport context
 261  *
 262  * If @xprt is connected, disconnect it to simulate spurious connection
 263  * loss.
 264  */
 265 static void
 266 xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
 267 {
 268         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 269 
 270         trace_xprtrdma_op_inject_dsc(r_xprt);
 271         rdma_disconnect(r_xprt->rx_ia.ri_id);
 272 }
 273 
 274 /**
 275  * xprt_rdma_destroy - Full tear down of transport
 276  * @xprt: doomed transport context
 277  *
 278  * Caller guarantees there will be no more calls to us with
 279  * this @xprt.
 280  */
 281 static void
 282 xprt_rdma_destroy(struct rpc_xprt *xprt)
 283 {
 284         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 285 
 286         trace_xprtrdma_op_destroy(r_xprt);
 287 
 288         cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 289 
 290         rpcrdma_ep_destroy(r_xprt);
 291         rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 292         rpcrdma_ia_close(&r_xprt->rx_ia);
 293 
 294         xprt_rdma_free_addresses(xprt);
 295         xprt_free(xprt);
 296 
 297         module_put(THIS_MODULE);
 298 }
 299 
 300 /* 60 second timeout, no retries */
 301 static const struct rpc_timeout xprt_rdma_default_timeout = {
 302         .to_initval = 60 * HZ,
 303         .to_maxval = 60 * HZ,
 304 };
 305 
 306 /**
 307  * xprt_setup_rdma - Set up transport to use RDMA
 308  *
 309  * @args: rpc transport arguments
 310  */
 311 static struct rpc_xprt *
 312 xprt_setup_rdma(struct xprt_create *args)
 313 {
 314         struct rpc_xprt *xprt;
 315         struct rpcrdma_xprt *new_xprt;
 316         struct sockaddr *sap;
 317         int rc;
 318 
 319         if (args->addrlen > sizeof(xprt->addr))
 320                 return ERR_PTR(-EBADF);
 321 
 322         xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
 323         if (!xprt)
 324                 return ERR_PTR(-ENOMEM);
 325 
 326         xprt->timeout = &xprt_rdma_default_timeout;
 327         xprt->connect_timeout = xprt->timeout->to_initval;
 328         xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
 329         xprt->bind_timeout = RPCRDMA_BIND_TO;
 330         xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 331         xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 332 
 333         xprt->resvport = 0;             /* privileged port not needed */
 334         xprt->ops = &xprt_rdma_procs;
 335 
 336         /*
 337          * Set up RDMA-specific connect data.
 338          */
 339         sap = args->dstaddr;
 340 
 341         /* Ensure xprt->addr holds valid server TCP (not RDMA)
 342          * address, for any side protocols which peek at it */
 343         xprt->prot = IPPROTO_TCP;
 344         xprt->addrlen = args->addrlen;
 345         memcpy(&xprt->addr, sap, xprt->addrlen);
 346 
 347         if (rpc_get_port(sap))
 348                 xprt_set_bound(xprt);
 349         xprt_rdma_format_addresses(xprt, sap);
 350 
 351         new_xprt = rpcx_to_rdmax(xprt);
 352         rc = rpcrdma_ia_open(new_xprt);
 353         if (rc)
 354                 goto out1;
 355 
 356         rc = rpcrdma_ep_create(new_xprt);
 357         if (rc)
 358                 goto out2;
 359 
 360         rc = rpcrdma_buffer_create(new_xprt);
 361         if (rc)
 362                 goto out3;
 363 
 364         INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 365                           xprt_rdma_connect_worker);
 366 
 367         xprt->max_payload = frwr_maxpages(new_xprt);
 368         if (xprt->max_payload == 0)
 369                 goto out4;
 370         xprt->max_payload <<= PAGE_SHIFT;
 371         dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
 372                 __func__, xprt->max_payload);
 373 
 374         if (!try_module_get(THIS_MODULE))
 375                 goto out4;
 376 
 377         dprintk("RPC:       %s: %s:%s\n", __func__,
 378                 xprt->address_strings[RPC_DISPLAY_ADDR],
 379                 xprt->address_strings[RPC_DISPLAY_PORT]);
 380         trace_xprtrdma_create(new_xprt);
 381         return xprt;
 382 
 383 out4:
 384         rpcrdma_buffer_destroy(&new_xprt->rx_buf);
 385         rc = -ENODEV;
 386 out3:
 387         rpcrdma_ep_destroy(new_xprt);
 388 out2:
 389         rpcrdma_ia_close(&new_xprt->rx_ia);
 390 out1:
 391         trace_xprtrdma_op_destroy(new_xprt);
 392         xprt_rdma_free_addresses(xprt);
 393         xprt_free(xprt);
 394         return ERR_PTR(rc);
 395 }
 396 
 397 /**
 398  * xprt_rdma_close - close a transport connection
 399  * @xprt: transport context
 400  *
 401  * Called during autoclose or device removal.
 402  *
 403  * Caller holds @xprt's send lock to prevent activity on this
 404  * transport while the connection is torn down.
 405  */
 406 void xprt_rdma_close(struct rpc_xprt *xprt)
 407 {
 408         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 409         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 410         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 411 
 412         might_sleep();
 413 
 414         trace_xprtrdma_op_close(r_xprt);
 415 
 416         /* Prevent marshaling and sending of new requests */
 417         xprt_clear_connected(xprt);
 418 
 419         if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 420                 rpcrdma_ia_remove(ia);
 421                 goto out;
 422         }
 423 
 424         if (ep->rep_connected == -ENODEV)
 425                 return;
 426         rpcrdma_ep_disconnect(ep, ia);
 427 
 428         /* Prepare @xprt for the next connection by reinitializing
 429          * its credit grant to one (see RFC 8166, Section 3.3.3).
 430          */
 431         spin_lock(&xprt->transport_lock);
 432         r_xprt->rx_buf.rb_credits = 1;
 433         xprt->cong = 0;
 434         xprt->cwnd = RPC_CWNDSHIFT;
 435         spin_unlock(&xprt->transport_lock);
 436 
 437 out:
 438         xprt->reestablish_timeout = 0;
 439         ++xprt->connect_cookie;
 440         xprt_disconnect_done(xprt);
 441 }
 442 
 443 /**
 444  * xprt_rdma_set_port - update server port with rpcbind result
 445  * @xprt: controlling RPC transport
 446  * @port: new port value
 447  *
 448  * Transport connect status is unchanged.
 449  */
 450 static void
 451 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
 452 {
 453         struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
 454         char buf[8];
 455 
 456         dprintk("RPC:       %s: setting port for xprt %p (%s:%s) to %u\n",
 457                 __func__, xprt,
 458                 xprt->address_strings[RPC_DISPLAY_ADDR],
 459                 xprt->address_strings[RPC_DISPLAY_PORT],
 460                 port);
 461 
 462         rpc_set_port(sap, port);
 463 
 464         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
 465         snprintf(buf, sizeof(buf), "%u", port);
 466         xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
 467 
 468         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
 469         snprintf(buf, sizeof(buf), "%4hx", port);
 470         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
 471 }
 472 
 473 /**
 474  * xprt_rdma_timer - invoked when an RPC times out
 475  * @xprt: controlling RPC transport
 476  * @task: RPC task that timed out
 477  *
 478  * Invoked when the transport is still connected, but an RPC
 479  * retransmit timeout occurs.
 480  *
 481  * Since RDMA connections don't have a keep-alive, forcibly
 482  * disconnect and retry to connect. This drives full
 483  * detection of the network path, and retransmissions of
 484  * all pending RPCs.
 485  */
 486 static void
 487 xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 488 {
 489         xprt_force_disconnect(xprt);
 490 }
 491 
 492 /**
 493  * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
 494  * @xprt: controlling transport instance
 495  * @connect_timeout: reconnect timeout after client disconnects
 496  * @reconnect_timeout: reconnect timeout after server disconnects
 497  *
 498  */
 499 static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
 500                                           unsigned long connect_timeout,
 501                                           unsigned long reconnect_timeout)
 502 {
 503         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 504 
 505         trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
 506 
 507         spin_lock(&xprt->transport_lock);
 508 
 509         if (connect_timeout < xprt->connect_timeout) {
 510                 struct rpc_timeout to;
 511                 unsigned long initval;
 512 
 513                 to = *xprt->timeout;
 514                 initval = connect_timeout;
 515                 if (initval < RPCRDMA_INIT_REEST_TO << 1)
 516                         initval = RPCRDMA_INIT_REEST_TO << 1;
 517                 to.to_initval = initval;
 518                 to.to_maxval = initval;
 519                 r_xprt->rx_timeout = to;
 520                 xprt->timeout = &r_xprt->rx_timeout;
 521                 xprt->connect_timeout = connect_timeout;
 522         }
 523 
 524         if (reconnect_timeout < xprt->max_reconnect_timeout)
 525                 xprt->max_reconnect_timeout = reconnect_timeout;
 526 
 527         spin_unlock(&xprt->transport_lock);
 528 }
 529 
 530 /**
 531  * xprt_rdma_connect - schedule an attempt to reconnect
 532  * @xprt: transport state
 533  * @task: RPC scheduler context (unused)
 534  *
 535  */
 536 static void
 537 xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 538 {
 539         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 540         unsigned long delay;
 541 
 542         trace_xprtrdma_op_connect(r_xprt);
 543 
 544         delay = 0;
 545         if (r_xprt->rx_ep.rep_connected != 0) {
 546                 delay = xprt_reconnect_delay(xprt);
 547                 xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
 548         }
 549         queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
 550                            delay);
 551 }
 552 
 553 /**
 554  * xprt_rdma_alloc_slot - allocate an rpc_rqst
 555  * @xprt: controlling RPC transport
 556  * @task: RPC task requesting a fresh rpc_rqst
 557  *
 558  * tk_status values:
 559  *      %0 if task->tk_rqstp points to a fresh rpc_rqst
 560  *      %-EAGAIN if no rpc_rqst is available; queued on backlog
 561  */
 562 static void
 563 xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
 564 {
 565         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 566         struct rpcrdma_req *req;
 567 
 568         req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 569         if (!req)
 570                 goto out_sleep;
 571         task->tk_rqstp = &req->rl_slot;
 572         task->tk_status = 0;
 573         return;
 574 
 575 out_sleep:
 576         set_bit(XPRT_CONGESTED, &xprt->state);
 577         rpc_sleep_on(&xprt->backlog, task, NULL);
 578         task->tk_status = -EAGAIN;
 579 }
 580 
 581 /**
 582  * xprt_rdma_free_slot - release an rpc_rqst
 583  * @xprt: controlling RPC transport
 584  * @rqst: rpc_rqst to release
 585  *
 586  */
 587 static void
 588 xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 589 {
 590         struct rpcrdma_xprt *r_xprt =
 591                 container_of(xprt, struct rpcrdma_xprt, rx_xprt);
 592 
 593         memset(rqst, 0, sizeof(*rqst));
 594         rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
 595         if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
 596                 clear_bit(XPRT_CONGESTED, &xprt->state);
 597 }
 598 
 599 static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
 600                                  struct rpcrdma_regbuf *rb, size_t size,
 601                                  gfp_t flags)
 602 {
 603         if (unlikely(rdmab_length(rb) < size)) {
 604                 if (!rpcrdma_regbuf_realloc(rb, size, flags))
 605                         return false;
 606                 r_xprt->rx_stats.hardway_register_count += size;
 607         }
 608         return true;
 609 }
 610 
 611 /**
 612  * xprt_rdma_allocate - allocate transport resources for an RPC
 613  * @task: RPC task
 614  *
 615  * Return values:
 616  *        0:    Success; rq_buffer points to RPC buffer to use
 617  *   ENOMEM:    Out of memory, call again later
 618  *      EIO:    A permanent error occurred, do not retry
 619  */
 620 static int
 621 xprt_rdma_allocate(struct rpc_task *task)
 622 {
 623         struct rpc_rqst *rqst = task->tk_rqstp;
 624         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 625         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 626         gfp_t flags;
 627 
 628         flags = RPCRDMA_DEF_GFP;
 629         if (RPC_IS_SWAPPER(task))
 630                 flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 631 
 632         if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
 633                                   flags))
 634                 goto out_fail;
 635         if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
 636                                   flags))
 637                 goto out_fail;
 638 
 639         rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
 640         rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
 641         trace_xprtrdma_op_allocate(task, req);
 642         return 0;
 643 
 644 out_fail:
 645         trace_xprtrdma_op_allocate(task, NULL);
 646         return -ENOMEM;
 647 }
 648 
 649 /**
 650  * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 651  * @task: RPC task
 652  *
 653  * Caller guarantees rqst->rq_buffer is non-NULL.
 654  */
 655 static void
 656 xprt_rdma_free(struct rpc_task *task)
 657 {
 658         struct rpc_rqst *rqst = task->tk_rqstp;
 659         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 660         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 661 
 662         trace_xprtrdma_op_free(task, req);
 663 
 664         if (!list_empty(&req->rl_registered))
 665                 frwr_unmap_sync(r_xprt, req);
 666 
 667         /* XXX: If the RPC is completing because of a signal and
 668          * not because a reply was received, we ought to ensure
 669          * that the Send completion has fired, so that memory
 670          * involved with the Send is not still visible to the NIC.
 671          */
 672 }
 673 
 674 /**
 675  * xprt_rdma_send_request - marshal and send an RPC request
 676  * @rqst: RPC message in rq_snd_buf
 677  *
 678  * Caller holds the transport's write lock.
 679  *
 680  * Returns:
 681  *      %0 if the RPC message has been sent
 682  *      %-ENOTCONN if the caller should reconnect and call again
 683  *      %-EAGAIN if the caller should call again
 684  *      %-ENOBUFS if the caller should call again after a delay
 685  *      %-EMSGSIZE if encoding ran out of buffer space. The request
 686  *              was not sent. Do not try to send this message again.
 687  *      %-EIO if an I/O error occurred. The request was not sent.
 688  *              Do not try to send this message again.
 689  */
 690 static int
 691 xprt_rdma_send_request(struct rpc_rqst *rqst)
 692 {
 693         struct rpc_xprt *xprt = rqst->rq_xprt;
 694         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 695         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 696         int rc = 0;
 697 
 698 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 699         if (unlikely(!rqst->rq_buffer))
 700                 return xprt_rdma_bc_send_reply(rqst);
 701 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
 702 
 703         if (!xprt_connected(xprt))
 704                 return -ENOTCONN;
 705 
 706         if (!xprt_request_get_cong(xprt, rqst))
 707                 return -EBADSLT;
 708 
 709         rc = rpcrdma_marshal_req(r_xprt, rqst);
 710         if (rc < 0)
 711                 goto failed_marshal;
 712 
 713         /* Must suppress retransmit to maintain credits */
 714         if (rqst->rq_connect_cookie == xprt->connect_cookie)
 715                 goto drop_connection;
 716         rqst->rq_xtime = ktime_get();
 717 
 718         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 719                 goto drop_connection;
 720 
 721         rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
 722 
 723         /* An RPC with no reply will throw off credit accounting,
 724          * so drop the connection to reset the credit grant.
 725          */
 726         if (!rpc_reply_expected(rqst->rq_task))
 727                 goto drop_connection;
 728         return 0;
 729 
 730 failed_marshal:
 731         if (rc != -ENOTCONN)
 732                 return rc;
 733 drop_connection:
 734         xprt_rdma_close(xprt);
 735         return -ENOTCONN;
 736 }
 737 
 738 void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 739 {
 740         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 741         long idle_time = 0;
 742 
 743         if (xprt_connected(xprt))
 744                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
 745 
 746         seq_puts(seq, "\txprt:\trdma ");
 747         seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
 748                    0,   /* need a local port? */
 749                    xprt->stat.bind_count,
 750                    xprt->stat.connect_count,
 751                    xprt->stat.connect_time / HZ,
 752                    idle_time,
 753                    xprt->stat.sends,
 754                    xprt->stat.recvs,
 755                    xprt->stat.bad_xids,
 756                    xprt->stat.req_u,
 757                    xprt->stat.bklog_u);
 758         seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 759                    r_xprt->rx_stats.read_chunk_count,
 760                    r_xprt->rx_stats.write_chunk_count,
 761                    r_xprt->rx_stats.reply_chunk_count,
 762                    r_xprt->rx_stats.total_rdma_request,
 763                    r_xprt->rx_stats.total_rdma_reply,
 764                    r_xprt->rx_stats.pullup_copy_count,
 765                    r_xprt->rx_stats.fixup_copy_count,
 766                    r_xprt->rx_stats.hardway_register_count,
 767                    r_xprt->rx_stats.failed_marshal_count,
 768                    r_xprt->rx_stats.bad_reply_count,
 769                    r_xprt->rx_stats.nomsg_call_count);
 770         seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 771                    r_xprt->rx_stats.mrs_recycled,
 772                    r_xprt->rx_stats.mrs_orphaned,
 773                    r_xprt->rx_stats.mrs_allocated,
 774                    r_xprt->rx_stats.local_inv_needed,
 775                    r_xprt->rx_stats.empty_sendctx_q,
 776                    r_xprt->rx_stats.reply_waits_for_send);
 777 }
 778 
 779 static int
 780 xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 781 {
 782         return 0;
 783 }
 784 
 785 static void
 786 xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 787 {
 788 }
 789 
 790 /*
 791  * Plumbing for rpc transport switch and kernel module
 792  */
 793 
 794 static const struct rpc_xprt_ops xprt_rdma_procs = {
 795         .reserve_xprt           = xprt_reserve_xprt_cong,
 796         .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
 797         .alloc_slot             = xprt_rdma_alloc_slot,
 798         .free_slot              = xprt_rdma_free_slot,
 799         .release_request        = xprt_release_rqst_cong,       /* ditto */
 800         .wait_for_reply_request = xprt_wait_for_reply_request_def, /* ditto */
 801         .timer                  = xprt_rdma_timer,
 802         .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
 803         .set_port               = xprt_rdma_set_port,
 804         .connect                = xprt_rdma_connect,
 805         .buf_alloc              = xprt_rdma_allocate,
 806         .buf_free               = xprt_rdma_free,
 807         .send_request           = xprt_rdma_send_request,
 808         .close                  = xprt_rdma_close,
 809         .destroy                = xprt_rdma_destroy,
 810         .set_connect_timeout    = xprt_rdma_set_connect_timeout,
 811         .print_stats            = xprt_rdma_print_stats,
 812         .enable_swap            = xprt_rdma_enable_swap,
 813         .disable_swap           = xprt_rdma_disable_swap,
 814         .inject_disconnect      = xprt_rdma_inject_disconnect,
 815 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 816         .bc_setup               = xprt_rdma_bc_setup,
 817         .bc_maxpayload          = xprt_rdma_bc_maxpayload,
 818         .bc_num_slots           = xprt_rdma_bc_max_slots,
 819         .bc_free_rqst           = xprt_rdma_bc_free_rqst,
 820         .bc_destroy             = xprt_rdma_bc_destroy,
 821 #endif
 822 };
 823 
 824 static struct xprt_class xprt_rdma = {
 825         .list                   = LIST_HEAD_INIT(xprt_rdma.list),
 826         .name                   = "rdma",
 827         .owner                  = THIS_MODULE,
 828         .ident                  = XPRT_TRANSPORT_RDMA,
 829         .setup                  = xprt_setup_rdma,
 830 };
 831 
 832 void xprt_rdma_cleanup(void)
 833 {
 834 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 835         if (sunrpc_table_header) {
 836                 unregister_sysctl_table(sunrpc_table_header);
 837                 sunrpc_table_header = NULL;
 838         }
 839 #endif
 840 
 841         xprt_unregister_transport(&xprt_rdma);
 842         xprt_unregister_transport(&xprt_rdma_bc);
 843 }
 844 
 845 int xprt_rdma_init(void)
 846 {
 847         int rc;
 848 
 849         rc = xprt_register_transport(&xprt_rdma);
 850         if (rc)
 851                 return rc;
 852 
 853         rc = xprt_register_transport(&xprt_rdma_bc);
 854         if (rc) {
 855                 xprt_unregister_transport(&xprt_rdma);
 856                 return rc;
 857         }
 858 
 859 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 860         if (!sunrpc_table_header)
 861                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
 862 #endif
 863         return 0;
 864 }

/* [<][>][^][v][top][bottom][index][help] */