This source file includes the following definitions.
- rds_connect_path_complete
- rds_connect_complete
- rds_queue_reconnect
- rds_connect_worker
- rds_send_worker
- rds_recv_worker
- rds_shutdown_worker
- rds_threads_exit
- rds_threads_init
- rds_addr_cmp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 #include <linux/kernel.h>
34 #include <linux/random.h>
35 #include <linux/export.h>
36
37 #include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
 * Workqueue on which this file queues the per-path connect, send and
 * receive work items.  It is created by rds_threads_init() and torn down
 * by rds_threads_exit(), and is exported (GPL-only) so that code outside
 * this file can reference it.
 */
71 struct workqueue_struct *rds_wq;
72 EXPORT_SYMBOL_GPL(rds_wq);
73
74 void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
75 {
76 if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
77 printk(KERN_WARNING "%s: Cannot transition to state UP, "
78 "current state is %d\n",
79 __func__,
80 atomic_read(&cp->cp_state));
81 rds_conn_path_drop(cp, false);
82 return;
83 }
84
85 rdsdebug("conn %p for %pI6c to %pI6c complete\n",
86 cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
87
88 cp->cp_reconnect_jiffies = 0;
89 set_bit(0, &cp->cp_conn->c_map_queued);
90 rcu_read_lock();
91 if (!rds_destroy_pending(cp->cp_conn)) {
92 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
93 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
94 }
95 rcu_read_unlock();
96 cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
97 }
98 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
99
100 void rds_connect_complete(struct rds_connection *conn)
101 {
102 rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
103 }
104 EXPORT_SYMBOL_GPL(rds_connect_complete);
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 void rds_queue_reconnect(struct rds_conn_path *cp)
125 {
126 unsigned long rand;
127 struct rds_connection *conn = cp->cp_conn;
128
129 rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
130 conn, &conn->c_laddr, &conn->c_faddr,
131 cp->cp_reconnect_jiffies);
132
133
134 if (conn->c_trans->t_type == RDS_TRANS_TCP &&
135 rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
136 return;
137
138 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
139 if (cp->cp_reconnect_jiffies == 0) {
140 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
141 rcu_read_lock();
142 if (!rds_destroy_pending(cp->cp_conn))
143 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
144 rcu_read_unlock();
145 return;
146 }
147
148 get_random_bytes(&rand, sizeof(rand));
149 rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
150 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
151 conn, &conn->c_laddr, &conn->c_faddr);
152 rcu_read_lock();
153 if (!rds_destroy_pending(cp->cp_conn))
154 queue_delayed_work(rds_wq, &cp->cp_conn_w,
155 rand % cp->cp_reconnect_jiffies);
156 rcu_read_unlock();
157
158 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
159 rds_sysctl_reconnect_max_jiffies);
160 }
161
162 void rds_connect_worker(struct work_struct *work)
163 {
164 struct rds_conn_path *cp = container_of(work,
165 struct rds_conn_path,
166 cp_conn_w.work);
167 struct rds_connection *conn = cp->cp_conn;
168 int ret;
169
170 if (cp->cp_index > 0 &&
171 rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
172 return;
173 clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
174 ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
175 if (ret) {
176 ret = conn->c_trans->conn_path_connect(cp);
177 rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
178 conn, &conn->c_laddr, &conn->c_faddr, ret);
179
180 if (ret) {
181 if (rds_conn_path_transition(cp,
182 RDS_CONN_CONNECTING,
183 RDS_CONN_DOWN))
184 rds_queue_reconnect(cp);
185 else
186 rds_conn_path_error(cp, "connect failed\n");
187 }
188 }
189 }
190
191 void rds_send_worker(struct work_struct *work)
192 {
193 struct rds_conn_path *cp = container_of(work,
194 struct rds_conn_path,
195 cp_send_w.work);
196 int ret;
197
198 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
199 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
200 ret = rds_send_xmit(cp);
201 cond_resched();
202 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
203 switch (ret) {
204 case -EAGAIN:
205 rds_stats_inc(s_send_immediate_retry);
206 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
207 break;
208 case -ENOMEM:
209 rds_stats_inc(s_send_delayed_retry);
210 queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
211 default:
212 break;
213 }
214 }
215 }
216
217 void rds_recv_worker(struct work_struct *work)
218 {
219 struct rds_conn_path *cp = container_of(work,
220 struct rds_conn_path,
221 cp_recv_w.work);
222 int ret;
223
224 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
225 ret = cp->cp_conn->c_trans->recv_path(cp);
226 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
227 switch (ret) {
228 case -EAGAIN:
229 rds_stats_inc(s_recv_immediate_retry);
230 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
231 break;
232 case -ENOMEM:
233 rds_stats_inc(s_recv_delayed_retry);
234 queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
235 default:
236 break;
237 }
238 }
239 }
240
241 void rds_shutdown_worker(struct work_struct *work)
242 {
243 struct rds_conn_path *cp = container_of(work,
244 struct rds_conn_path,
245 cp_down_w);
246
247 rds_conn_shutdown(cp);
248 }
249
250 void rds_threads_exit(void)
251 {
252 destroy_workqueue(rds_wq);
253 }
254
255 int rds_threads_init(void)
256 {
257 rds_wq = create_singlethread_workqueue("krdsd");
258 if (!rds_wq)
259 return -ENOMEM;
260
261 return 0;
262 }
263
264
265
266
267 int rds_addr_cmp(const struct in6_addr *addr1,
268 const struct in6_addr *addr2)
269 {
270 #if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
271 const __be64 *a1, *a2;
272 u64 x, y;
273
274 a1 = (__be64 *)addr1;
275 a2 = (__be64 *)addr2;
276
277 if (*a1 != *a2) {
278 if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
279 return -1;
280 else
281 return 1;
282 } else {
283 x = be64_to_cpu(*++a1);
284 y = be64_to_cpu(*++a2);
285 if (x < y)
286 return -1;
287 else if (x > y)
288 return 1;
289 else
290 return 0;
291 }
292 #else
293 u32 a, b;
294 int i;
295
296 for (i = 0; i < 4; i++) {
297 if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
298 a = ntohl(addr1->s6_addr32[i]);
299 b = ntohl(addr2->s6_addr32[i]);
300 if (a < b)
301 return -1;
302 else if (a > b)
303 return 1;
304 }
305 }
306 return 0;
307 #endif
308 }
309 EXPORT_SYMBOL_GPL(rds_addr_cmp);