root/net/rds/threads.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rds_connect_path_complete
  2. rds_connect_complete
  3. rds_queue_reconnect
  4. rds_connect_worker
  5. rds_send_worker
  6. rds_recv_worker
  7. rds_shutdown_worker
  8. rds_threads_exit
  9. rds_threads_init
  10. rds_addr_cmp

   1 /*
   2  * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
   3  *
   4  * This software is available to you under a choice of one of two
   5  * licenses.  You may choose to be licensed under the terms of the GNU
   6  * General Public License (GPL) Version 2, available from the file
   7  * COPYING in the main directory of this source tree, or the
   8  * OpenIB.org BSD license below:
   9  *
  10  *     Redistribution and use in source and binary forms, with or
  11  *     without modification, are permitted provided that the following
  12  *     conditions are met:
  13  *
  14  *      - Redistributions of source code must retain the above
  15  *        copyright notice, this list of conditions and the following
  16  *        disclaimer.
  17  *
  18  *      - Redistributions in binary form must reproduce the above
  19  *        copyright notice, this list of conditions and the following
  20  *        disclaimer in the documentation and/or other materials
  21  *        provided with the distribution.
  22  *
  23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30  * SOFTWARE.
  31  *
  32  */
  33 #include <linux/kernel.h>
  34 #include <linux/random.h>
  35 #include <linux/export.h>
  36 
  37 #include "rds.h"
  38 
  39 /*
  40  * All of connection management is simplified by serializing it through
  41  * work queues that execute in a connection managing thread.
  42  *
  43  * TCP wants to send acks through sendpage() in response to data_ready(),
  44  * but it needs a process context to do so.
  45  *
  46  * The receive paths need to allocate but can't drop packets (!) so we have
  47  * a thread around to block allocating if the receive fast path sees an
  48  * allocation failure.
  49  */
  50 
  51 /* Grand Unified Theory of connection life cycle:
  52  * At any point in time, the connection can be in one of these states:
  53  * DOWN, CONNECTING, UP, DISCONNECTING, ERROR
  54  *
  55  * The following transitions are possible:
  56  *  ANY           -> ERROR
  57  *  UP            -> DISCONNECTING
  58  *  ERROR         -> DISCONNECTING
  59  *  DISCONNECTING -> DOWN
  60  *  DOWN          -> CONNECTING
  61  *  CONNECTING    -> UP
  62  *
  63  * Transition to state DISCONNECTING/DOWN:
  64  *  -   Inside the shutdown worker; synchronizes with xmit path
  65  *      through RDS_IN_XMIT, and with connection management callbacks
  66  *      via c_cm_lock.
  67  *
  68  *      For receive callbacks, we rely on the underlying transport
  69  *      (TCP, IB/RDMA) to provide the necessary synchronisation.
  70  */
   71 struct workqueue_struct *rds_wq; /* workqueue the workers in this file are queued on; created in rds_threads_init(), destroyed in rds_threads_exit() */
   72 EXPORT_SYMBOL_GPL(rds_wq);
  73 
  74 void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
  75 {
  76         if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
  77                 printk(KERN_WARNING "%s: Cannot transition to state UP, "
  78                                 "current state is %d\n",
  79                                 __func__,
  80                                 atomic_read(&cp->cp_state));
  81                 rds_conn_path_drop(cp, false);
  82                 return;
  83         }
  84 
  85         rdsdebug("conn %p for %pI6c to %pI6c complete\n",
  86                  cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
  87 
  88         cp->cp_reconnect_jiffies = 0;
  89         set_bit(0, &cp->cp_conn->c_map_queued);
  90         rcu_read_lock();
  91         if (!rds_destroy_pending(cp->cp_conn)) {
  92                 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
  93                 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
  94         }
  95         rcu_read_unlock();
  96         cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
  97 }
  98 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
  99 
 100 void rds_connect_complete(struct rds_connection *conn)
 101 {
 102         rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
 103 }
 104 EXPORT_SYMBOL_GPL(rds_connect_complete);
 105 
 106 /*
 107  * This random exponential backoff is relied on to eventually resolve racing
 108  * connects.
 109  *
 110  * If connect attempts race then both parties drop both connections and come
 111  * here to wait for a random amount of time before trying again.  Eventually
 112  * the backoff range will be so much greater than the time it takes to
 113  * establish a connection that one of the pair will establish the connection
 114  * before the other's random delay fires.
 115  *
 116  * Connection attempts that arrive while a connection is already established
 117  * are also considered to be racing connects.  This lets a connection from
 118  * a rebooted machine replace an existing stale connection before the transport
 119  * notices that the connection has failed.
 120  *
 121  * We should *always* start with a random backoff; otherwise a broken connection
 122  * will always take several iterations to be re-established.
 123  */
 124 void rds_queue_reconnect(struct rds_conn_path *cp)
 125 {
 126         unsigned long rand;
 127         struct rds_connection *conn = cp->cp_conn;
 128 
 129         rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
 130                  conn, &conn->c_laddr, &conn->c_faddr,
 131                  cp->cp_reconnect_jiffies);
 132 
 133         /* let peer with smaller addr initiate reconnect, to avoid duels */
 134         if (conn->c_trans->t_type == RDS_TRANS_TCP &&
 135             rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
 136                 return;
 137 
 138         set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
 139         if (cp->cp_reconnect_jiffies == 0) {
 140                 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
 141                 rcu_read_lock();
 142                 if (!rds_destroy_pending(cp->cp_conn))
 143                         queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
 144                 rcu_read_unlock();
 145                 return;
 146         }
 147 
 148         get_random_bytes(&rand, sizeof(rand));
 149         rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
 150                  rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
 151                  conn, &conn->c_laddr, &conn->c_faddr);
 152         rcu_read_lock();
 153         if (!rds_destroy_pending(cp->cp_conn))
 154                 queue_delayed_work(rds_wq, &cp->cp_conn_w,
 155                                    rand % cp->cp_reconnect_jiffies);
 156         rcu_read_unlock();
 157 
 158         cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
 159                                         rds_sysctl_reconnect_max_jiffies);
 160 }
 161 
 162 void rds_connect_worker(struct work_struct *work)
 163 {
 164         struct rds_conn_path *cp = container_of(work,
 165                                                 struct rds_conn_path,
 166                                                 cp_conn_w.work);
 167         struct rds_connection *conn = cp->cp_conn;
 168         int ret;
 169 
 170         if (cp->cp_index > 0 &&
 171             rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
 172                 return;
 173         clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
 174         ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
 175         if (ret) {
 176                 ret = conn->c_trans->conn_path_connect(cp);
 177                 rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
 178                          conn, &conn->c_laddr, &conn->c_faddr, ret);
 179 
 180                 if (ret) {
 181                         if (rds_conn_path_transition(cp,
 182                                                      RDS_CONN_CONNECTING,
 183                                                      RDS_CONN_DOWN))
 184                                 rds_queue_reconnect(cp);
 185                         else
 186                                 rds_conn_path_error(cp, "connect failed\n");
 187                 }
 188         }
 189 }
 190 
 191 void rds_send_worker(struct work_struct *work)
 192 {
 193         struct rds_conn_path *cp = container_of(work,
 194                                                 struct rds_conn_path,
 195                                                 cp_send_w.work);
 196         int ret;
 197 
 198         if (rds_conn_path_state(cp) == RDS_CONN_UP) {
 199                 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
 200                 ret = rds_send_xmit(cp);
 201                 cond_resched();
 202                 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 203                 switch (ret) {
 204                 case -EAGAIN:
 205                         rds_stats_inc(s_send_immediate_retry);
 206                         queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
 207                         break;
 208                 case -ENOMEM:
 209                         rds_stats_inc(s_send_delayed_retry);
 210                         queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
 211                 default:
 212                         break;
 213                 }
 214         }
 215 }
 216 
 217 void rds_recv_worker(struct work_struct *work)
 218 {
 219         struct rds_conn_path *cp = container_of(work,
 220                                                 struct rds_conn_path,
 221                                                 cp_recv_w.work);
 222         int ret;
 223 
 224         if (rds_conn_path_state(cp) == RDS_CONN_UP) {
 225                 ret = cp->cp_conn->c_trans->recv_path(cp);
 226                 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 227                 switch (ret) {
 228                 case -EAGAIN:
 229                         rds_stats_inc(s_recv_immediate_retry);
 230                         queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
 231                         break;
 232                 case -ENOMEM:
 233                         rds_stats_inc(s_recv_delayed_retry);
 234                         queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
 235                 default:
 236                         break;
 237                 }
 238         }
 239 }
 240 
 241 void rds_shutdown_worker(struct work_struct *work)
 242 {
 243         struct rds_conn_path *cp = container_of(work,
 244                                                 struct rds_conn_path,
 245                                                 cp_down_w);
 246 
 247         rds_conn_shutdown(cp);
 248 }
 249 
 250 void rds_threads_exit(void)
 251 {
 252         destroy_workqueue(rds_wq);
 253 }
 254 
 255 int rds_threads_init(void)
 256 {
 257         rds_wq = create_singlethread_workqueue("krdsd");
 258         if (!rds_wq)
 259                 return -ENOMEM;
 260 
 261         return 0;
 262 }
 263 
 264 /* Compare two IPv6 addresses.  Return 0 if the two addresses are equal.
 265  * Return 1 if the first is greater.  Return -1 if the second is greater.
 266  */
 267 int rds_addr_cmp(const struct in6_addr *addr1,
 268                  const struct in6_addr *addr2)
 269 {
 270 #if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
 271         const __be64 *a1, *a2;
 272         u64 x, y;
 273 
 274         a1 = (__be64 *)addr1;
 275         a2 = (__be64 *)addr2;
 276 
 277         if (*a1 != *a2) {
 278                 if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
 279                         return -1;
 280                 else
 281                         return 1;
 282         } else {
 283                 x = be64_to_cpu(*++a1);
 284                 y = be64_to_cpu(*++a2);
 285                 if (x < y)
 286                         return -1;
 287                 else if (x > y)
 288                         return 1;
 289                 else
 290                         return 0;
 291         }
 292 #else
 293         u32 a, b;
 294         int i;
 295 
 296         for (i = 0; i < 4; i++) {
 297                 if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
 298                         a = ntohl(addr1->s6_addr32[i]);
 299                         b = ntohl(addr2->s6_addr32[i]);
 300                         if (a < b)
 301                                 return -1;
 302                         else if (a > b)
 303                                 return 1;
 304                 }
 305         }
 306         return 0;
 307 #endif
 308 }
 309 EXPORT_SYMBOL_GPL(rds_addr_cmp);

/* [<][>][^][v][top][bottom][index][help] */