1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/o2iblnd/o2iblnd_cb.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40 
41 #include "o2iblnd.h"
42 
43 static void kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx);
44 
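/*
 * Clean up a completed tx: unmap its DMA/FMR resources, drop its
 * connection ref (if any), return it to its tx pool, and only then
 * finalise the (up to two) LNet messages attached to it, so LNet sees
 * completion after the descriptors are really free.
 */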
45 static void
46 kiblnd_tx_done(lnet_ni_t *ni, kib_tx_t *tx)
47 {
48 	lnet_msg_t *lntmsg[2];
49 	kib_net_t *net = ni->ni_data;
50 	int rc;
51 	int i;
52 
53 	LASSERT(net != NULL);
54 	LASSERT(!in_interrupt());
55 	LASSERT(!tx->tx_queued);	       /* mustn't be queued for sending */
56 	LASSERT(tx->tx_sending == 0);	  /* mustn't be awaiting sent callback */
57 	LASSERT(!tx->tx_waiting);	      /* mustn't be awaiting peer response */
58 	LASSERT(tx->tx_pool != NULL);
59 
60 	kiblnd_unmap_tx(ni, tx);
61 
62 	/* tx may have up to 2 lnet msgs to finalise */
63 	lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
64 	lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
65 	rc = tx->tx_status;
66 
67 	if (tx->tx_conn != NULL) {
68 		LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);
69 
70 		kiblnd_conn_decref(tx->tx_conn);
71 		tx->tx_conn = NULL;
72 	}
73 
74 	tx->tx_nwrq = 0;
75 	tx->tx_status = 0;
76 
77 	kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
78 
79 	/* delay finalize until my descs have been freed */
80 	for (i = 0; i < 2; i++) {
81 		if (lntmsg[i] == NULL)
82 			continue;
83 
84 		lnet_finalize(ni, lntmsg[i], rc);
85 	}
86 }
87 
88 void
89 kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
90 {
91 	kib_tx_t *tx;
92 
93 	while (!list_empty(txlist)) {
94 		tx = list_entry(txlist->next, kib_tx_t, tx_list);
95 
96 		list_del(&tx->tx_list);
97 		/* complete now */
98 		tx->tx_waiting = 0;
99 		tx->tx_status = status;
100 		kiblnd_tx_done(ni, tx);
101 	}
102 }
103 
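/*
 * Grab an idle tx descriptor from the tx poolset of the CPT that owns
 * the target NID; returns NULL if the pool has nothing available.  The
 * LASSERTs below check that the descriptor really is idle.
 */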
104 static kib_tx_t *
105 kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
106 {
107 	kib_net_t *net = (kib_net_t *)ni->ni_data;
108 	struct list_head *node;
109 	kib_tx_t *tx;
110 	kib_tx_poolset_t *tps;
111 
112 	tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
113 	node = kiblnd_pool_alloc_node(&tps->tps_poolset);
114 	if (node == NULL)
115 		return NULL;
116 	tx = container_of(node, kib_tx_t, tx_list);
117 
118 	LASSERT(tx->tx_nwrq == 0);
119 	LASSERT(!tx->tx_queued);
120 	LASSERT(tx->tx_sending == 0);
121 	LASSERT(!tx->tx_waiting);
122 	LASSERT(tx->tx_status == 0);
123 	LASSERT(tx->tx_conn == NULL);
124 	LASSERT(tx->tx_lntmsg[0] == NULL);
125 	LASSERT(tx->tx_lntmsg[1] == NULL);
126 	LASSERT(tx->tx_nfrags == 0);
127 
128 	return tx;
129 }
130 
131 static void
132 kiblnd_drop_rx(kib_rx_t *rx)
133 {
134 	kib_conn_t *conn = rx->rx_conn;
135 	struct kib_sched_info *sched = conn->ibc_sched;
136 	unsigned long flags;
137 
138 	spin_lock_irqsave(&sched->ibs_lock, flags);
139 	LASSERT(conn->ibc_nrx > 0);
140 	conn->ibc_nrx--;
141 	spin_unlock_irqrestore(&sched->ibs_lock, flags);
142 
143 	kiblnd_conn_decref(conn);
144 }
145 
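/*
 * Post a receive buffer on the connection's QP.  'credit' says whether
 * the repost should return a credit to the peer, consume a reserved
 * credit, or carry no credit at all; on success the credit is accounted
 * and kiblnd_check_sends() is kicked so it can be used.
 */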
146 int
147 kiblnd_post_rx(kib_rx_t *rx, int credit)
148 {
149 	kib_conn_t *conn = rx->rx_conn;
150 	kib_net_t *net = conn->ibc_peer->ibp_ni->ni_data;
151 	struct ib_recv_wr *bad_wrq = NULL;
152 	struct ib_mr *mr;
153 	int rc;
154 
155 	LASSERT(net != NULL);
156 	LASSERT(!in_interrupt());
157 	LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
158 		credit == IBLND_POSTRX_PEER_CREDIT ||
159 		credit == IBLND_POSTRX_RSRVD_CREDIT);
160 
161 	mr = kiblnd_find_dma_mr(conn->ibc_hdev, rx->rx_msgaddr, IBLND_MSG_SIZE);
162 	LASSERT(mr != NULL);
163 
164 	rx->rx_sge.lkey   = mr->lkey;
165 	rx->rx_sge.addr   = rx->rx_msgaddr;
166 	rx->rx_sge.length = IBLND_MSG_SIZE;
167 
168 	rx->rx_wrq.next    = NULL;
169 	rx->rx_wrq.sg_list = &rx->rx_sge;
170 	rx->rx_wrq.num_sge = 1;
171 	rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
172 
173 	LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
174 	LASSERT(rx->rx_nob >= 0);	      /* not posted */
175 
176 	if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
177 		kiblnd_drop_rx(rx);	     /* No more posts for this rx */
178 		return 0;
179 	}
180 
181 	rx->rx_nob = -1;			/* flag posted */
182 
183 	/* NB: need an extra reference after ib_post_recv because we don't
184 	 * own this rx (and rx::rx_conn) anymore, LU-5678.
185 	 */
186 	kiblnd_conn_addref(conn);
187 	rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
188 	if (unlikely(rc != 0)) {
189 		CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
190 		       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
191 		rx->rx_nob = 0;
192 	}
193 
194 	if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
195 		goto out;
196 
197 	if (unlikely(rc != 0)) {
198 		kiblnd_close_conn(conn, rc);
199 		kiblnd_drop_rx(rx);	     /* No more posts for this rx */
200 		goto out;
201 	}
202 
203 	if (credit == IBLND_POSTRX_NO_CREDIT)
204 		goto out;
205 
206 	spin_lock(&conn->ibc_lock);
207 	if (credit == IBLND_POSTRX_PEER_CREDIT)
208 		conn->ibc_outstanding_credits++;
209 	else
210 		conn->ibc_reserved_credits++;
211 	spin_unlock(&conn->ibc_lock);
212 
213 	kiblnd_check_sends(conn);
214 out:
215 	kiblnd_conn_decref(conn);
216 	return rc;
217 }
218 
219 static kib_tx_t *
220 kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
221 {
222 	struct list_head *tmp;
223 
224 	list_for_each(tmp, &conn->ibc_active_txs) {
225 		kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
226 
227 		LASSERT(!tx->tx_queued);
228 		LASSERT(tx->tx_sending != 0 || tx->tx_waiting);
229 
230 		if (tx->tx_cookie != cookie)
231 			continue;
232 
233 		if (tx->tx_waiting &&
234 		    tx->tx_msg->ibm_type == txtype)
235 			return tx;
236 
237 		CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
238 		      tx->tx_waiting ? "" : "NOT ",
239 		      tx->tx_msg->ibm_type, txtype);
240 	}
241 	return NULL;
242 }
243 
244 static void
245 kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
246 {
247 	kib_tx_t *tx;
248 	lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
249 	int idle;
250 
251 	spin_lock(&conn->ibc_lock);
252 
253 	tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
254 	if (tx == NULL) {
255 		spin_unlock(&conn->ibc_lock);
256 
257 		CWARN("Unmatched completion type %x cookie %#llx from %s\n",
258 		      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
259 		kiblnd_close_conn(conn, -EPROTO);
260 		return;
261 	}
262 
263 	if (tx->tx_status == 0) {	       /* success so far */
264 		if (status < 0) /* failed? */
265 			tx->tx_status = status;
266 		else if (txtype == IBLND_MSG_GET_REQ)
267 			lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
268 	}
269 
270 	tx->tx_waiting = 0;
271 
272 	idle = !tx->tx_queued && (tx->tx_sending == 0);
273 	if (idle)
274 		list_del(&tx->tx_list);
275 
276 	spin_unlock(&conn->ibc_lock);
277 
278 	if (idle)
279 		kiblnd_tx_done(ni, tx);
280 }
281 
282 static void
283 kiblnd_send_completion(kib_conn_t *conn, int type, int status, __u64 cookie)
284 {
285 	lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
286 	kib_tx_t *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
287 
288 	if (tx == NULL) {
289 		CERROR("Can't get tx for completion %x for %s\n",
290 		       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
291 		return;
292 	}
293 
294 	tx->tx_msg->ibm_u.completion.ibcm_status = status;
295 	tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
296 	kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));
297 
298 	kiblnd_queue_tx(tx, conn);
299 }
300 
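/*
 * Dispatch a received message: bank any send credits it carries, then
 * act on the message type (immediate data, PUT/GET handshakes or their
 * completions).  Protocol errors close the connection; finally the rx
 * buffer is reposted unless a handler has taken ownership of it.
 */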
301 static void
302 kiblnd_handle_rx(kib_rx_t *rx)
303 {
304 	kib_msg_t *msg = rx->rx_msg;
305 	kib_conn_t *conn = rx->rx_conn;
306 	lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
307 	int credits = msg->ibm_credits;
308 	kib_tx_t *tx;
309 	int rc = 0;
310 	int rc2;
311 	int post_credit;
312 
313 	LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
314 
315 	CDEBUG(D_NET, "Received %x[%d] from %s\n",
316 	       msg->ibm_type, credits,
317 	       libcfs_nid2str(conn->ibc_peer->ibp_nid));
318 
319 	if (credits != 0) {
320 		/* Have I received credits that will let me send? */
321 		spin_lock(&conn->ibc_lock);
322 
323 		if (conn->ibc_credits + credits >
324 		    IBLND_MSG_QUEUE_SIZE(conn->ibc_version)) {
325 			rc2 = conn->ibc_credits;
326 			spin_unlock(&conn->ibc_lock);
327 
328 			CERROR("Bad credits from %s: %d + %d > %d\n",
329 			       libcfs_nid2str(conn->ibc_peer->ibp_nid),
330 			       rc2, credits,
331 			       IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
332 
333 			kiblnd_close_conn(conn, -EPROTO);
334 			kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
335 			return;
336 		}
337 
338 		conn->ibc_credits += credits;
339 
340 		/* This ensures the credit taken by NOOP can be returned */
341 		if (msg->ibm_type == IBLND_MSG_NOOP &&
342 		    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
343 			conn->ibc_outstanding_credits++;
344 
345 		spin_unlock(&conn->ibc_lock);
346 		kiblnd_check_sends(conn);
347 	}
348 
349 	switch (msg->ibm_type) {
350 	default:
351 		CERROR("Bad IBLND message type %x from %s\n",
352 		       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
353 		post_credit = IBLND_POSTRX_NO_CREDIT;
354 		rc = -EPROTO;
355 		break;
356 
357 	case IBLND_MSG_NOOP:
358 		if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
359 			post_credit = IBLND_POSTRX_NO_CREDIT;
360 			break;
361 		}
362 
363 		if (credits != 0) /* credit already posted */
364 			post_credit = IBLND_POSTRX_NO_CREDIT;
365 		else	      /* a keepalive NOOP */
366 			post_credit = IBLND_POSTRX_PEER_CREDIT;
367 		break;
368 
369 	case IBLND_MSG_IMMEDIATE:
370 		post_credit = IBLND_POSTRX_DONT_POST;
371 		rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
372 				msg->ibm_srcnid, rx, 0);
373 		if (rc < 0)		     /* repost on error */
374 			post_credit = IBLND_POSTRX_PEER_CREDIT;
375 		break;
376 
377 	case IBLND_MSG_PUT_REQ:
378 		post_credit = IBLND_POSTRX_DONT_POST;
379 		rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
380 				msg->ibm_srcnid, rx, 1);
381 		if (rc < 0)		     /* repost on error */
382 			post_credit = IBLND_POSTRX_PEER_CREDIT;
383 		break;
384 
385 	case IBLND_MSG_PUT_NAK:
386 		CWARN("PUT_NACK from %s\n",
387 		      libcfs_nid2str(conn->ibc_peer->ibp_nid));
388 		post_credit = IBLND_POSTRX_RSRVD_CREDIT;
389 		kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
390 					 msg->ibm_u.completion.ibcm_status,
391 					 msg->ibm_u.completion.ibcm_cookie);
392 		break;
393 
394 	case IBLND_MSG_PUT_ACK:
395 		post_credit = IBLND_POSTRX_RSRVD_CREDIT;
396 
397 		spin_lock(&conn->ibc_lock);
398 		tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
399 					msg->ibm_u.putack.ibpam_src_cookie);
400 		if (tx != NULL)
401 			list_del(&tx->tx_list);
402 		spin_unlock(&conn->ibc_lock);
403 
404 		if (tx == NULL) {
405 			CERROR("Unmatched PUT_ACK from %s\n",
406 			       libcfs_nid2str(conn->ibc_peer->ibp_nid));
407 			rc = -EPROTO;
408 			break;
409 		}
410 
411 		LASSERT(tx->tx_waiting);
412 		/* CAVEAT EMPTOR: I could be racing with tx_complete, but...
413 		 * (a) I can overwrite tx_msg since my peer has received it!
414 		 * (b) tx_waiting set tells tx_complete() it's not done. */
415 
416 		tx->tx_nwrq = 0;		/* overwrite PUT_REQ */
417 
418 		rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
419 				       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
420 				       &msg->ibm_u.putack.ibpam_rd,
421 				       msg->ibm_u.putack.ibpam_dst_cookie);
422 		if (rc2 < 0)
423 			CERROR("Can't setup rdma for PUT to %s: %d\n",
424 			       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
425 
426 		spin_lock(&conn->ibc_lock);
427 		tx->tx_waiting = 0;	/* clear waiting and queue atomically */
428 		kiblnd_queue_tx_locked(tx, conn);
429 		spin_unlock(&conn->ibc_lock);
430 		break;
431 
432 	case IBLND_MSG_PUT_DONE:
433 		post_credit = IBLND_POSTRX_PEER_CREDIT;
434 		kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
435 					 msg->ibm_u.completion.ibcm_status,
436 					 msg->ibm_u.completion.ibcm_cookie);
437 		break;
438 
439 	case IBLND_MSG_GET_REQ:
440 		post_credit = IBLND_POSTRX_DONT_POST;
441 		rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
442 				msg->ibm_srcnid, rx, 1);
443 		if (rc < 0)		     /* repost on error */
444 			post_credit = IBLND_POSTRX_PEER_CREDIT;
445 		break;
446 
447 	case IBLND_MSG_GET_DONE:
448 		post_credit = IBLND_POSTRX_RSRVD_CREDIT;
449 		kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
450 					 msg->ibm_u.completion.ibcm_status,
451 					 msg->ibm_u.completion.ibcm_cookie);
452 		break;
453 	}
454 
455 	if (rc < 0)			     /* protocol error */
456 		kiblnd_close_conn(conn, rc);
457 
458 	if (post_credit != IBLND_POSTRX_DONT_POST)
459 		kiblnd_post_rx(rx, post_credit);
460 }
461 
462 static void
463 kiblnd_rx_complete(kib_rx_t *rx, int status, int nob)
464 {
465 	kib_msg_t *msg = rx->rx_msg;
466 	kib_conn_t *conn = rx->rx_conn;
467 	lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
468 	kib_net_t *net = ni->ni_data;
469 	int rc;
470 	int err = -EIO;
471 
472 	LASSERT(net != NULL);
473 	LASSERT(rx->rx_nob < 0);	       /* was posted */
474 	rx->rx_nob = 0;			 /* isn't now */
475 
476 	if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
477 		goto ignore;
478 
479 	if (status != IB_WC_SUCCESS) {
480 		CNETERR("Rx from %s failed: %d\n",
481 			libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
482 		goto failed;
483 	}
484 
485 	LASSERT(nob >= 0);
486 	rx->rx_nob = nob;
487 
488 	rc = kiblnd_unpack_msg(msg, rx->rx_nob);
489 	if (rc != 0) {
490 		CERROR("Error %d unpacking rx from %s\n",
491 			rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
492 		goto failed;
493 	}
494 
495 	if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
496 	    msg->ibm_dstnid != ni->ni_nid ||
497 	    msg->ibm_srcstamp != conn->ibc_incarnation ||
498 	    msg->ibm_dststamp != net->ibn_incarnation) {
499 		CERROR("Stale rx from %s\n",
500 			libcfs_nid2str(conn->ibc_peer->ibp_nid));
501 		err = -ESTALE;
502 		goto failed;
503 	}
504 
505 	/* set time last known alive */
506 	kiblnd_peer_alive(conn->ibc_peer);
507 
508 	/* racing with connection establishment/teardown! */
509 
510 	if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
511 		rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
512 		unsigned long flags;
513 
514 		write_lock_irqsave(g_lock, flags);
515 		/* must check holding global lock to eliminate race */
516 		if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
517 			list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
518 			write_unlock_irqrestore(g_lock, flags);
519 			return;
520 		}
521 		write_unlock_irqrestore(g_lock, flags);
522 	}
523 	kiblnd_handle_rx(rx);
524 	return;
525 
526  failed:
527 	CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
528 	kiblnd_close_conn(conn, err);
529  ignore:
530 	kiblnd_drop_rx(rx);		     /* Don't re-post rx. */
531 }
532 
533 static struct page *
534 kiblnd_kvaddr_to_page(unsigned long vaddr)
535 {
536 	struct page *page;
537 
538 	if (is_vmalloc_addr((void *)vaddr)) {
539 		page = vmalloc_to_page((void *)vaddr);
540 		LASSERT(page != NULL);
541 		return page;
542 	}
543 #ifdef CONFIG_HIGHMEM
544 	if (vaddr >= PKMAP_BASE &&
545 	    vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
546 		/* No highmem kernel addresses here: highmem pages are only used for bulk (kiov) I/O */
547 		CERROR("find page for address in highmem\n");
548 		LBUG();
549 	}
550 #endif
551 	page = virt_to_page(vaddr);
552 	LASSERT(page != NULL);
553 	return page;
554 }
555 
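/*
 * Map the RDMA fragments of 'rd' through the FMR pool of the tx's CPT.
 * On success the fragment list is collapsed into a single mapped region
 * and its key (rkey for a peer-visible descriptor, lkey for the local
 * one) is written back into rd->rd_key.
 */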
556 static int
557 kiblnd_fmr_map_tx(kib_net_t *net, kib_tx_t *tx, kib_rdma_desc_t *rd, int nob)
558 {
559 	kib_hca_dev_t *hdev;
560 	__u64 *pages = tx->tx_pages;
561 	kib_fmr_poolset_t *fps;
562 	int npages;
563 	int size;
564 	int cpt;
565 	int rc;
566 	int i;
567 
568 	LASSERT(tx->tx_pool != NULL);
569 	LASSERT(tx->tx_pool->tpo_pool.po_owner != NULL);
570 
571 	hdev = tx->tx_pool->tpo_hdev;
572 
573 	for (i = 0, npages = 0; i < rd->rd_nfrags; i++) {
574 		for (size = 0; size <  rd->rd_frags[i].rf_nob;
575 			       size += hdev->ibh_page_size) {
576 			pages[npages++] = (rd->rd_frags[i].rf_addr &
577 					    hdev->ibh_page_mask) + size;
578 		}
579 	}
580 
581 	cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
582 
583 	fps = net->ibn_fmr_ps[cpt];
584 	rc = kiblnd_fmr_pool_map(fps, pages, npages, 0, &tx->fmr);
585 	if (rc != 0) {
586 		CERROR("Can't map %d pages: %d\n", npages, rc);
587 		return rc;
588 	}
589 
590 	/* If rd is not tx_rd, it's going to get sent to a peer, who will need
591 	 * the rkey */
592 	rd->rd_key = (rd != tx->tx_rd) ? tx->fmr.fmr_pfmr->fmr->rkey :
593 					 tx->fmr.fmr_pfmr->fmr->lkey;
594 	rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
595 	rd->rd_frags[0].rf_nob = nob;
596 	rd->rd_nfrags = 1;
597 
598 	return 0;
599 }
600 
601 static void kiblnd_unmap_tx(lnet_ni_t *ni, kib_tx_t *tx)
602 {
603 	kib_net_t *net = ni->ni_data;
604 
605 	LASSERT(net != NULL);
606 
607 	if (net->ibn_fmr_ps && tx->fmr.fmr_pfmr) {
608 		kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);
609 		tx->fmr.fmr_pfmr = NULL;
610 	}
611 
612 	if (tx->tx_nfrags != 0) {
613 		kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
614 				    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
615 		tx->tx_nfrags = 0;
616 	}
617 }
618 
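/*
 * DMA-map the tx fragments and fill in the rdma descriptor.  A single
 * pre-registered DMA MR is preferred; otherwise the fragments are
 * mapped via the FMR pool.  Fails if neither mapping method is
 * available.
 */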
619 static int kiblnd_map_tx(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
620 			 int nfrags)
621 {
622 	kib_hca_dev_t *hdev = tx->tx_pool->tpo_hdev;
623 	kib_net_t *net = ni->ni_data;
624 	struct ib_mr *mr    = NULL;
625 	__u32 nob;
626 	int i;
627 
628 	/* If rd is not tx_rd, it's going to get sent to a peer and I'm the
629 	 * RDMA sink */
630 	tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
631 	tx->tx_nfrags = nfrags;
632 
633 	rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
634 					  tx->tx_nfrags, tx->tx_dmadir);
635 
636 	for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
637 		rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
638 			hdev->ibh_ibdev, &tx->tx_frags[i]);
639 		rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
640 			hdev->ibh_ibdev, &tx->tx_frags[i]);
641 		nob += rd->rd_frags[i].rf_nob;
642 	}
643 
644 	/* looking for pre-mapping MR */
645 	mr = kiblnd_find_rd_dma_mr(hdev, rd);
646 	if (mr != NULL) {
647 		/* found pre-mapping MR */
648 		rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
649 		return 0;
650 	}
651 
652 	if (net->ibn_fmr_ps != NULL)
653 		return kiblnd_fmr_map_tx(net, tx, rd, nob);
654 
655 	return -EINVAL;
656 }
657 
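/*
 * Build the tx scatterlist from a kernel iovec: skip 'offset' bytes,
 * translate each virtual-address fragment to its backing page, then
 * hand the result to kiblnd_map_tx() for DMA/FMR mapping.
 */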
658 static int
659 kiblnd_setup_rd_iov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
660 		    unsigned int niov, struct kvec *iov, int offset, int nob)
661 {
662 	kib_net_t *net = ni->ni_data;
663 	struct page *page;
664 	struct scatterlist *sg;
665 	unsigned long vaddr;
666 	int fragnob;
667 	int page_offset;
668 
669 	LASSERT(nob > 0);
670 	LASSERT(niov > 0);
671 	LASSERT(net != NULL);
672 
673 	while (offset >= iov->iov_len) {
674 		offset -= iov->iov_len;
675 		niov--;
676 		iov++;
677 		LASSERT(niov > 0);
678 	}
679 
680 	sg = tx->tx_frags;
681 	do {
682 		LASSERT(niov > 0);
683 
684 		vaddr = ((unsigned long)iov->iov_base) + offset;
685 		page_offset = vaddr & (PAGE_SIZE - 1);
686 		page = kiblnd_kvaddr_to_page(vaddr);
687 		if (page == NULL) {
688 			CERROR("Can't find page\n");
689 			return -EFAULT;
690 		}
691 
692 		fragnob = min((int)(iov->iov_len - offset), nob);
693 		fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
694 
695 		sg_set_page(sg, page, fragnob, page_offset);
696 		sg++;
697 
698 		if (offset + fragnob < iov->iov_len) {
699 			offset += fragnob;
700 		} else {
701 			offset = 0;
702 			iov++;
703 			niov--;
704 		}
705 		nob -= fragnob;
706 	} while (nob > 0);
707 
708 	return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
709 }
710 
711 static int
712 kiblnd_setup_rd_kiov(lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
713 		      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
714 {
715 	kib_net_t *net = ni->ni_data;
716 	struct scatterlist *sg;
717 	int fragnob;
718 
719 	CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
720 
721 	LASSERT(nob > 0);
722 	LASSERT(nkiov > 0);
723 	LASSERT(net != NULL);
724 
725 	while (offset >= kiov->kiov_len) {
726 		offset -= kiov->kiov_len;
727 		nkiov--;
728 		kiov++;
729 		LASSERT(nkiov > 0);
730 	}
731 
732 	sg = tx->tx_frags;
733 	do {
734 		LASSERT(nkiov > 0);
735 
736 		fragnob = min((int)(kiov->kiov_len - offset), nob);
737 
738 		sg_set_page(sg, kiov->kiov_page, fragnob,
739 			    kiov->kiov_offset + offset);
740 		sg++;
741 
742 		offset = 0;
743 		kiov++;
744 		nkiov--;
745 		nob -= fragnob;
746 	} while (nob > 0);
747 
748 	return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
749 }
750 
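/*
 * Try to post one queued tx while holding ibc_lock.  Returns -EAGAIN
 * (leaving the tx queued) when the send work queue is full or credits
 * are unavailable; otherwise the tx moves to ibc_active_txs and is
 * posted, and a failed post closes the connection.  ibc_lock is
 * dropped and retaken around the completion/close paths.
 */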
751 static int
752 kiblnd_post_tx_locked(kib_conn_t *conn, kib_tx_t *tx, int credit)
753 	__releases(conn->ibc_lock)
754 	__acquires(conn->ibc_lock)
755 {
756 	kib_msg_t *msg = tx->tx_msg;
757 	kib_peer_t *peer = conn->ibc_peer;
758 	int ver = conn->ibc_version;
759 	int rc;
760 	int done;
761 	struct ib_send_wr *bad_wrq;
762 
763 	LASSERT(tx->tx_queued);
764 	/* We rely on this for QP sizing */
765 	LASSERT(tx->tx_nwrq > 0);
766 	LASSERT(tx->tx_nwrq <= 1 + IBLND_RDMA_FRAGS(ver));
767 
768 	LASSERT(credit == 0 || credit == 1);
769 	LASSERT(conn->ibc_outstanding_credits >= 0);
770 	LASSERT(conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE(ver));
771 	LASSERT(conn->ibc_credits >= 0);
772 	LASSERT(conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE(ver));
773 
774 	if (conn->ibc_nsends_posted == IBLND_CONCURRENT_SENDS(ver)) {
775 		/* tx completions outstanding... */
776 		CDEBUG(D_NET, "%s: posted enough\n",
777 		       libcfs_nid2str(peer->ibp_nid));
778 		return -EAGAIN;
779 	}
780 
781 	if (credit != 0 && conn->ibc_credits == 0) {   /* no credits */
782 		CDEBUG(D_NET, "%s: no credits\n",
783 		       libcfs_nid2str(peer->ibp_nid));
784 		return -EAGAIN;
785 	}
786 
787 	if (credit != 0 && !IBLND_OOB_CAPABLE(ver) &&
788 	    conn->ibc_credits == 1 &&   /* last credit reserved */
789 	    msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
790 		CDEBUG(D_NET, "%s: not using last credit\n",
791 		       libcfs_nid2str(peer->ibp_nid));
792 		return -EAGAIN;
793 	}
794 
795 	/* NB don't drop ibc_lock before bumping tx_sending */
796 	list_del(&tx->tx_list);
797 	tx->tx_queued = 0;
798 
799 	if (msg->ibm_type == IBLND_MSG_NOOP &&
800 	    (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
801 	     (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
802 	      conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
803 		/* OK to drop when posted enough NOOPs, since
804 		 * kiblnd_check_sends will queue NOOP again when
805 		 * posted NOOPs complete */
806 		spin_unlock(&conn->ibc_lock);
807 		kiblnd_tx_done(peer->ibp_ni, tx);
808 		spin_lock(&conn->ibc_lock);
809 		CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
810 		       libcfs_nid2str(peer->ibp_nid),
811 		       conn->ibc_noops_posted);
812 		return 0;
813 	}
814 
815 	kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
816 			peer->ibp_nid, conn->ibc_incarnation);
817 
818 	conn->ibc_credits -= credit;
819 	conn->ibc_outstanding_credits = 0;
820 	conn->ibc_nsends_posted++;
821 	if (msg->ibm_type == IBLND_MSG_NOOP)
822 		conn->ibc_noops_posted++;
823 
824 	/* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
825 	 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
826 	 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
827 	 * and then re-queued here.  It's (just) possible that
828 	 * tx_sending is non-zero if we've not done the tx_complete()
829 	 * from the first send; hence the ++ rather than = below. */
830 	tx->tx_sending++;
831 	list_add(&tx->tx_list, &conn->ibc_active_txs);
832 
833 	/* I'm still holding ibc_lock! */
834 	if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
835 		rc = -ECONNABORTED;
836 	} else if (tx->tx_pool->tpo_pool.po_failed ||
837 		 conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
838 		/* close_conn will launch failover */
839 		rc = -ENETDOWN;
840 	} else {
841 		rc = ib_post_send(conn->ibc_cmid->qp, &tx->tx_wrq->wr, &bad_wrq);
842 	}
843 
844 	conn->ibc_last_send = jiffies;
845 
846 	if (rc == 0)
847 		return 0;
848 
849 	/* NB credits are transferred in the actual
850 	 * message, which can only be the last work item */
851 	conn->ibc_credits += credit;
852 	conn->ibc_outstanding_credits += msg->ibm_credits;
853 	conn->ibc_nsends_posted--;
854 	if (msg->ibm_type == IBLND_MSG_NOOP)
855 		conn->ibc_noops_posted--;
856 
857 	tx->tx_status = rc;
858 	tx->tx_waiting = 0;
859 	tx->tx_sending--;
860 
861 	done = (tx->tx_sending == 0);
862 	if (done)
863 		list_del(&tx->tx_list);
864 
865 	spin_unlock(&conn->ibc_lock);
866 
867 	if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
868 		CERROR("Error %d posting transmit to %s\n",
869 		       rc, libcfs_nid2str(peer->ibp_nid));
870 	else
871 		CDEBUG(D_NET, "Error %d posting transmit to %s\n",
872 		       rc, libcfs_nid2str(peer->ibp_nid));
873 
874 	kiblnd_close_conn(conn, rc);
875 
876 	if (done)
877 		kiblnd_tx_done(peer->ibp_ni, tx);
878 
879 	spin_lock(&conn->ibc_lock);
880 
881 	return -EIO;
882 }
883 
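/*
 * Push out as much queued traffic as credits and the send queue allow:
 * promote reserved-credit txs, queue a NOOP if one is needed to return
 * credits, then post txs until kiblnd_post_tx_locked() says stop.
 */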
884 void
885 kiblnd_check_sends(kib_conn_t *conn)
886 {
887 	int ver = conn->ibc_version;
888 	lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
889 	kib_tx_t *tx;
890 
891 	/* Don't send anything until after the connection is established */
892 	if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
893 		CDEBUG(D_NET, "%s too soon\n",
894 		       libcfs_nid2str(conn->ibc_peer->ibp_nid));
895 		return;
896 	}
897 
898 	spin_lock(&conn->ibc_lock);
899 
900 	LASSERT(conn->ibc_nsends_posted <= IBLND_CONCURRENT_SENDS(ver));
901 	LASSERT(!IBLND_OOB_CAPABLE(ver) ||
902 		 conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
903 	LASSERT(conn->ibc_reserved_credits >= 0);
904 
905 	while (conn->ibc_reserved_credits > 0 &&
906 	       !list_empty(&conn->ibc_tx_queue_rsrvd)) {
907 		tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
908 				    kib_tx_t, tx_list);
909 		list_del(&tx->tx_list);
910 		list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
911 		conn->ibc_reserved_credits--;
912 	}
913 
914 	if (kiblnd_need_noop(conn)) {
915 		spin_unlock(&conn->ibc_lock);
916 
917 		tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
918 		if (tx != NULL)
919 			kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
920 
921 		spin_lock(&conn->ibc_lock);
922 		if (tx != NULL)
923 			kiblnd_queue_tx_locked(tx, conn);
924 	}
925 
926 	kiblnd_conn_addref(conn); /* 1 ref for me.... (see b21911) */
927 
928 	for (;;) {
929 		int credit;
930 
931 		if (!list_empty(&conn->ibc_tx_queue_nocred)) {
932 			credit = 0;
933 			tx = list_entry(conn->ibc_tx_queue_nocred.next,
934 					    kib_tx_t, tx_list);
935 		} else if (!list_empty(&conn->ibc_tx_noops)) {
936 			LASSERT(!IBLND_OOB_CAPABLE(ver));
937 			credit = 1;
938 			tx = list_entry(conn->ibc_tx_noops.next,
939 					kib_tx_t, tx_list);
940 		} else if (!list_empty(&conn->ibc_tx_queue)) {
941 			credit = 1;
942 			tx = list_entry(conn->ibc_tx_queue.next,
943 					    kib_tx_t, tx_list);
944 		} else
945 			break;
946 
947 		if (kiblnd_post_tx_locked(conn, tx, credit) != 0)
948 			break;
949 	}
950 
951 	spin_unlock(&conn->ibc_lock);
952 
953 	kiblnd_conn_decref(conn); /* ...until here */
954 }
955 
956 static void
957 kiblnd_tx_complete(kib_tx_t *tx, int status)
958 {
959 	int failed = (status != IB_WC_SUCCESS);
960 	kib_conn_t *conn = tx->tx_conn;
961 	int idle;
962 
963 	LASSERT(tx->tx_sending > 0);
964 
965 	if (failed) {
966 		if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
967 			CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
968 				libcfs_nid2str(conn->ibc_peer->ibp_nid),
969 				tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
970 				status);
971 
972 		kiblnd_close_conn(conn, -EIO);
973 	} else {
974 		kiblnd_peer_alive(conn->ibc_peer);
975 	}
976 
977 	spin_lock(&conn->ibc_lock);
978 
979 	/* I could be racing with rdma completion.  Whoever makes 'tx' idle
980 	 * gets to free it, which also drops its ref on 'conn'. */
981 
982 	tx->tx_sending--;
983 	conn->ibc_nsends_posted--;
984 	if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
985 		conn->ibc_noops_posted--;
986 
987 	if (failed) {
988 		tx->tx_waiting = 0;	     /* don't wait for peer */
989 		tx->tx_status = -EIO;
990 	}
991 
992 	idle = (tx->tx_sending == 0) &&	 /* This is the final callback */
993 	       !tx->tx_waiting &&	       /* Not waiting for peer */
994 	       !tx->tx_queued;		  /* Not re-queued (PUT_DONE) */
995 	if (idle)
996 		list_del(&tx->tx_list);
997 
998 	kiblnd_conn_addref(conn);	       /* 1 ref for me.... */
999 
1000 	spin_unlock(&conn->ibc_lock);
1001 
1002 	if (idle)
1003 		kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1004 
1005 	kiblnd_check_sends(conn);
1006 
1007 	kiblnd_conn_decref(conn);	       /* ...until here */
1008 }
1009 
1010 void
1011 kiblnd_init_tx_msg(lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
1012 {
1013 	kib_hca_dev_t *hdev = tx->tx_pool->tpo_hdev;
1014 	struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
1015 	struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1016 	int nob = offsetof(kib_msg_t, ibm_u) + body_nob;
1017 	struct ib_mr *mr;
1018 
1019 	LASSERT(tx->tx_nwrq >= 0);
1020 	LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1021 	LASSERT(nob <= IBLND_MSG_SIZE);
1022 
1023 	kiblnd_init_msg(tx->tx_msg, type, body_nob);
1024 
1025 	mr = kiblnd_find_dma_mr(hdev, tx->tx_msgaddr, nob);
1026 	LASSERT(mr != NULL);
1027 
1028 	sge->lkey   = mr->lkey;
1029 	sge->addr   = tx->tx_msgaddr;
1030 	sge->length = nob;
1031 
1032 	memset(wrq, 0, sizeof(*wrq));
1033 
1034 	wrq->wr.next       = NULL;
1035 	wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1036 	wrq->wr.sg_list    = sge;
1037 	wrq->wr.num_sge    = 1;
1038 	wrq->wr.opcode     = IB_WR_SEND;
1039 	wrq->wr.send_flags = IB_SEND_SIGNALED;
1040 
1041 	tx->tx_nwrq++;
1042 }
1043 
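/*
 * Set up the chain of RDMA_WRITE work requests that moves 'resid'
 * bytes from the local source descriptor to the peer's descriptor
 * 'dstrd', then append the GET_DONE/PUT_DONE completion message that
 * reports the outcome (a negative rc means no data is transferred).
 */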
1044 int
1045 kiblnd_init_rdma(kib_conn_t *conn, kib_tx_t *tx, int type,
1046 		  int resid, kib_rdma_desc_t *dstrd, __u64 dstcookie)
1047 {
1048 	kib_msg_t *ibmsg = tx->tx_msg;
1049 	kib_rdma_desc_t *srcrd = tx->tx_rd;
1050 	struct ib_sge *sge = &tx->tx_sge[0];
1051 	struct ib_rdma_wr *wrq = &tx->tx_wrq[0], *next;
1052 	int rc  = resid;
1053 	int srcidx;
1054 	int dstidx;
1055 	int wrknob;
1056 
1057 	LASSERT(!in_interrupt());
1058 	LASSERT(tx->tx_nwrq == 0);
1059 	LASSERT(type == IBLND_MSG_GET_DONE ||
1060 		 type == IBLND_MSG_PUT_DONE);
1061 
1062 	srcidx = dstidx = 0;
1063 
1064 	while (resid > 0) {
1065 		if (srcidx >= srcrd->rd_nfrags) {
1066 			CERROR("Src buffer exhausted: %d frags\n", srcidx);
1067 			rc = -EPROTO;
1068 			break;
1069 		}
1070 
1071 		if (dstidx == dstrd->rd_nfrags) {
1072 			CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1073 			rc = -EPROTO;
1074 			break;
1075 		}
1076 
1077 		if (tx->tx_nwrq == IBLND_RDMA_FRAGS(conn->ibc_version)) {
1078 			CERROR("RDMA too fragmented for %s (%d): %d/%d src %d/%d dst frags\n",
1079 			       libcfs_nid2str(conn->ibc_peer->ibp_nid),
1080 			       IBLND_RDMA_FRAGS(conn->ibc_version),
1081 			       srcidx, srcrd->rd_nfrags,
1082 			       dstidx, dstrd->rd_nfrags);
1083 			rc = -EMSGSIZE;
1084 			break;
1085 		}
1086 
1087 		wrknob = min(min(kiblnd_rd_frag_size(srcrd, srcidx),
1088 				 kiblnd_rd_frag_size(dstrd, dstidx)),
1089 			     (__u32) resid);
1090 
1091 		sge = &tx->tx_sge[tx->tx_nwrq];
1092 		sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1093 		sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1094 		sge->length = wrknob;
1095 
1096 		wrq = &tx->tx_wrq[tx->tx_nwrq];
1097 		next = wrq + 1;
1098 
1099 		wrq->wr.next       = &next->wr;
1100 		wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1101 		wrq->wr.sg_list    = sge;
1102 		wrq->wr.num_sge    = 1;
1103 		wrq->wr.opcode     = IB_WR_RDMA_WRITE;
1104 		wrq->wr.send_flags = 0;
1105 
1106 		wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1107 		wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1108 
1109 		srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1110 		dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1111 
1112 		resid -= wrknob;
1113 
1114 		tx->tx_nwrq++;
1115 		wrq++;
1116 		sge++;
1117 	}
1118 
1119 	if (rc < 0)			     /* no RDMA if completing with failure */
1120 		tx->tx_nwrq = 0;
1121 
1122 	ibmsg->ibm_u.completion.ibcm_status = rc;
1123 	ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1124 	kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1125 			   type, sizeof(kib_completion_msg_t));
1126 
1127 	return rc;
1128 }
1129 
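/*
 * Queue a fully set-up tx on the connection (caller holds ibc_lock),
 * stamping its deadline and choosing the queue that matches its
 * message type and credit requirements.
 */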
1130 void
1131 kiblnd_queue_tx_locked(kib_tx_t *tx, kib_conn_t *conn)
1132 {
1133 	struct list_head *q;
1134 
1135 	LASSERT(tx->tx_nwrq > 0);	      /* work items set up */
1136 	LASSERT(!tx->tx_queued);	       /* not queued for sending already */
1137 	LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1138 
1139 	tx->tx_queued = 1;
1140 	tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);
1141 
1142 	if (tx->tx_conn == NULL) {
1143 		kiblnd_conn_addref(conn);
1144 		tx->tx_conn = conn;
1145 		LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1146 	} else {
1147 		/* PUT_DONE first attached to conn as a PUT_REQ */
1148 		LASSERT(tx->tx_conn == conn);
1149 		LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1150 	}
1151 
1152 	switch (tx->tx_msg->ibm_type) {
1153 	default:
1154 		LBUG();
1155 
1156 	case IBLND_MSG_PUT_REQ:
1157 	case IBLND_MSG_GET_REQ:
1158 		q = &conn->ibc_tx_queue_rsrvd;
1159 		break;
1160 
1161 	case IBLND_MSG_PUT_NAK:
1162 	case IBLND_MSG_PUT_ACK:
1163 	case IBLND_MSG_PUT_DONE:
1164 	case IBLND_MSG_GET_DONE:
1165 		q = &conn->ibc_tx_queue_nocred;
1166 		break;
1167 
1168 	case IBLND_MSG_NOOP:
1169 		if (IBLND_OOB_CAPABLE(conn->ibc_version))
1170 			q = &conn->ibc_tx_queue_nocred;
1171 		else
1172 			q = &conn->ibc_tx_noops;
1173 		break;
1174 
1175 	case IBLND_MSG_IMMEDIATE:
1176 		q = &conn->ibc_tx_queue;
1177 		break;
1178 	}
1179 
1180 	list_add_tail(&tx->tx_list, q);
1181 }
1182 
1183 void
1184 kiblnd_queue_tx(kib_tx_t *tx, kib_conn_t *conn)
1185 {
1186 	spin_lock(&conn->ibc_lock);
1187 	kiblnd_queue_tx_locked(tx, conn);
1188 	spin_unlock(&conn->ibc_lock);
1189 
1190 	kiblnd_check_sends(conn);
1191 }
1192 
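/*
 * Resolve the peer's address while binding the cm_id to a free
 * privileged local port (below PROT_SOCK), retrying across ports that
 * are already in use.
 */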
1193 static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1194 			       struct sockaddr_in *srcaddr,
1195 			       struct sockaddr_in *dstaddr,
1196 			       int timeout_ms)
1197 {
1198 	unsigned short port;
1199 	int rc;
1200 
1201 	/* allow the port to be reused */
1202 	rc = rdma_set_reuseaddr(cmid, 1);
1203 	if (rc != 0) {
1204 		CERROR("Unable to set reuse on cmid: %d\n", rc);
1205 		return rc;
1206 	}
1207 
1208 	/* look for a free privileged port */
1209 	for (port = PROT_SOCK-1; port > 0; port--) {
1210 		srcaddr->sin_port = htons(port);
1211 		rc = rdma_resolve_addr(cmid,
1212 				       (struct sockaddr *)srcaddr,
1213 				       (struct sockaddr *)dstaddr,
1214 				       timeout_ms);
1215 		if (rc == 0) {
1216 			CDEBUG(D_NET, "bound to port %hu\n", port);
1217 			return 0;
1218 		} else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1219 			CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1220 			       port, rc);
1221 		} else {
1222 			return rc;
1223 		}
1224 	}
1225 
1226 	CERROR("Failed to bind to a free privileged port\n");
1227 	return rc;
1228 }
1229 
1230 static void
1231 kiblnd_connect_peer(kib_peer_t *peer)
1232 {
1233 	struct rdma_cm_id *cmid;
1234 	kib_dev_t *dev;
1235 	kib_net_t *net = peer->ibp_ni->ni_data;
1236 	struct sockaddr_in srcaddr;
1237 	struct sockaddr_in dstaddr;
1238 	int rc;
1239 
1240 	LASSERT(net != NULL);
1241 	LASSERT(peer->ibp_connecting > 0);
1242 
1243 	cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1244 				     IB_QPT_RC);
1245 
1246 	if (IS_ERR(cmid)) {
1247 		CERROR("Can't create CMID for %s: %ld\n",
1248 		       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1249 		rc = PTR_ERR(cmid);
1250 		goto failed;
1251 	}
1252 
1253 	dev = net->ibn_dev;
1254 	memset(&srcaddr, 0, sizeof(srcaddr));
1255 	srcaddr.sin_family = AF_INET;
1256 	srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1257 
1258 	memset(&dstaddr, 0, sizeof(dstaddr));
1259 	dstaddr.sin_family = AF_INET;
1260 	dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1261 	dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1262 
1263 	kiblnd_peer_addref(peer);	       /* cmid's ref */
1264 
1265 	if (*kiblnd_tunables.kib_use_priv_port) {
1266 		rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1267 					 *kiblnd_tunables.kib_timeout * 1000);
1268 	} else {
1269 		rc = rdma_resolve_addr(cmid,
1270 				       (struct sockaddr *)&srcaddr,
1271 				       (struct sockaddr *)&dstaddr,
1272 				       *kiblnd_tunables.kib_timeout * 1000);
1273 	}
1274 	if (rc != 0) {
1275 		/* Can't initiate address resolution:  */
1276 		CERROR("Can't resolve addr for %s: %d\n",
1277 		       libcfs_nid2str(peer->ibp_nid), rc);
1278 		goto failed2;
1279 	}
1280 
1281 	LASSERT(cmid->device != NULL);
1282 	CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
1283 	       libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1284 	       &dev->ibd_ifip, cmid->device->name);
1285 
1286 	return;
1287 
1288  failed2:
1289 	kiblnd_peer_decref(peer);	       /* cmid's ref */
1290 	rdma_destroy_id(cmid);
1291  failed:
1292 	kiblnd_peer_connect_failed(peer, 1, rc);
1293 }
1294 
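/*
 * Route a tx (which may be NULL for a pure connection attempt) to the
 * peer 'nid': queue it on an existing connection if there is one,
 * queue it behind an in-progress connection attempt otherwise, or
 * create the peer and initiate a new connection.
 */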
1295 void
1296 kiblnd_launch_tx(lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
1297 {
1298 	kib_peer_t *peer;
1299 	kib_peer_t *peer2;
1300 	kib_conn_t *conn;
1301 	rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1302 	unsigned long flags;
1303 	int rc;
1304 
1305 	/* If I get here, I've committed to send, so I complete the tx with
1306 	 * failure on any problems */
1307 
1308 	LASSERT(tx == NULL || tx->tx_conn == NULL); /* only set when assigned a conn */
1309 	LASSERT(tx == NULL || tx->tx_nwrq > 0);     /* work items have been set up */
1310 
1311 	/* First time, just use a read lock since I expect to find my peer
1312 	 * connected */
1313 	read_lock_irqsave(g_lock, flags);
1314 
1315 	peer = kiblnd_find_peer_locked(nid);
1316 	if (peer != NULL && !list_empty(&peer->ibp_conns)) {
1317 		/* Found a peer with an established connection */
1318 		conn = kiblnd_get_conn_locked(peer);
1319 		kiblnd_conn_addref(conn); /* 1 ref for me... */
1320 
1321 		read_unlock_irqrestore(g_lock, flags);
1322 
1323 		if (tx != NULL)
1324 			kiblnd_queue_tx(tx, conn);
1325 		kiblnd_conn_decref(conn); /* ...to here */
1326 		return;
1327 	}
1328 
1329 	read_unlock(g_lock);
1330 	/* Re-try with a write lock */
1331 	write_lock(g_lock);
1332 
1333 	peer = kiblnd_find_peer_locked(nid);
1334 	if (peer != NULL) {
1335 		if (list_empty(&peer->ibp_conns)) {
1336 			/* found a peer, but it's still connecting... */
1337 			LASSERT(peer->ibp_connecting != 0 ||
1338 				 peer->ibp_accepting != 0);
1339 			if (tx != NULL)
1340 				list_add_tail(&tx->tx_list,
1341 						  &peer->ibp_tx_queue);
1342 			write_unlock_irqrestore(g_lock, flags);
1343 		} else {
1344 			conn = kiblnd_get_conn_locked(peer);
1345 			kiblnd_conn_addref(conn); /* 1 ref for me... */
1346 
1347 			write_unlock_irqrestore(g_lock, flags);
1348 
1349 			if (tx != NULL)
1350 				kiblnd_queue_tx(tx, conn);
1351 			kiblnd_conn_decref(conn); /* ...to here */
1352 		}
1353 		return;
1354 	}
1355 
1356 	write_unlock_irqrestore(g_lock, flags);
1357 
1358 	/* Allocate a peer ready to add to the peer table and retry */
1359 	rc = kiblnd_create_peer(ni, &peer, nid);
1360 	if (rc != 0) {
1361 		CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1362 		if (tx != NULL) {
1363 			tx->tx_status = -EHOSTUNREACH;
1364 			tx->tx_waiting = 0;
1365 			kiblnd_tx_done(ni, tx);
1366 		}
1367 		return;
1368 	}
1369 
1370 	write_lock_irqsave(g_lock, flags);
1371 
1372 	peer2 = kiblnd_find_peer_locked(nid);
1373 	if (peer2 != NULL) {
1374 		if (list_empty(&peer2->ibp_conns)) {
1375 			/* found a peer, but it's still connecting... */
1376 			LASSERT(peer2->ibp_connecting != 0 ||
1377 				 peer2->ibp_accepting != 0);
1378 			if (tx != NULL)
1379 				list_add_tail(&tx->tx_list,
1380 						  &peer2->ibp_tx_queue);
1381 			write_unlock_irqrestore(g_lock, flags);
1382 		} else {
1383 			conn = kiblnd_get_conn_locked(peer2);
1384 			kiblnd_conn_addref(conn); /* 1 ref for me... */
1385 
1386 			write_unlock_irqrestore(g_lock, flags);
1387 
1388 			if (tx != NULL)
1389 				kiblnd_queue_tx(tx, conn);
1390 			kiblnd_conn_decref(conn); /* ...to here */
1391 		}
1392 
1393 		kiblnd_peer_decref(peer);
1394 		return;
1395 	}
1396 
1397 	/* Brand new peer */
1398 	LASSERT(peer->ibp_connecting == 0);
1399 	peer->ibp_connecting = 1;
1400 
1401 	/* always called with a ref on ni, which prevents ni being shutdown */
1402 	LASSERT(((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);
1403 
1404 	if (tx != NULL)
1405 		list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1406 
1407 	kiblnd_peer_addref(peer);
1408 	list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1409 
1410 	write_unlock_irqrestore(g_lock, flags);
1411 
1412 	kiblnd_connect_peer(peer);
1413 	kiblnd_peer_decref(peer);
1414 }
1415 
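/*
 * LND send entry point.  Small payloads go as a single IMMEDIATE
 * message; larger GET/PUT/REPLY traffic sets up an RDMA descriptor
 * (GET_REQ or PUT_REQ) and waits for the matching completion from the
 * peer before the LNet message is finalised.
 */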
1416 int
1417 kiblnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1418 {
1419 	lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1420 	int type = lntmsg->msg_type;
1421 	lnet_process_id_t target = lntmsg->msg_target;
1422 	int target_is_router = lntmsg->msg_target_is_router;
1423 	int routing = lntmsg->msg_routing;
1424 	unsigned int payload_niov = lntmsg->msg_niov;
1425 	struct kvec *payload_iov = lntmsg->msg_iov;
1426 	lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1427 	unsigned int payload_offset = lntmsg->msg_offset;
1428 	unsigned int payload_nob = lntmsg->msg_len;
1429 	kib_msg_t *ibmsg;
1430 	kib_rdma_desc_t  *rd;
1431 	kib_tx_t *tx;
1432 	int nob;
1433 	int rc;
1434 
1435 	/* NB 'private' is different depending on what we're sending.... */
1436 
1437 	CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1438 	       payload_nob, payload_niov, libcfs_id2str(target));
1439 
1440 	LASSERT(payload_nob == 0 || payload_niov > 0);
1441 	LASSERT(payload_niov <= LNET_MAX_IOV);
1442 
1443 	/* Thread context */
1444 	LASSERT(!in_interrupt());
1445 	/* payload is either all vaddrs or all pages */
1446 	LASSERT(!(payload_kiov != NULL && payload_iov != NULL));
1447 
1448 	switch (type) {
1449 	default:
1450 		LBUG();
1451 		return -EIO;
1452 
1453 	case LNET_MSG_ACK:
1454 		LASSERT(payload_nob == 0);
1455 		break;
1456 
1457 	case LNET_MSG_GET:
1458 		if (routing || target_is_router)
1459 			break;		  /* send IMMEDIATE */
1460 
1461 		/* is the REPLY message too small for RDMA? */
1462 		nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1463 		if (nob <= IBLND_MSG_SIZE)
1464 			break;		  /* send IMMEDIATE */
1465 
1466 		tx = kiblnd_get_idle_tx(ni, target.nid);
1467 		if (tx == NULL) {
1468 			CERROR("Can't allocate txd for GET to %s\n",
1469 			       libcfs_nid2str(target.nid));
1470 			return -ENOMEM;
1471 		}
1472 
1473 		ibmsg = tx->tx_msg;
1474 		rd = &ibmsg->ibm_u.get.ibgm_rd;
1475 		if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
1476 			rc = kiblnd_setup_rd_iov(ni, tx, rd,
1477 						 lntmsg->msg_md->md_niov,
1478 						 lntmsg->msg_md->md_iov.iov,
1479 						 0, lntmsg->msg_md->md_length);
1480 		else
1481 			rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1482 						  lntmsg->msg_md->md_niov,
1483 						  lntmsg->msg_md->md_iov.kiov,
1484 						  0, lntmsg->msg_md->md_length);
1485 		if (rc != 0) {
1486 			CERROR("Can't setup GET sink for %s: %d\n",
1487 			       libcfs_nid2str(target.nid), rc);
1488 			kiblnd_tx_done(ni, tx);
1489 			return -EIO;
1490 		}
1491 
1492 		nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[rd->rd_nfrags]);
1493 		ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1494 		ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1495 
1496 		kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1497 
1498 		tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1499 		if (tx->tx_lntmsg[1] == NULL) {
1500 			CERROR("Can't create reply for GET -> %s\n",
1501 			       libcfs_nid2str(target.nid));
1502 			kiblnd_tx_done(ni, tx);
1503 			return -EIO;
1504 		}
1505 
1506 		tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1507 		tx->tx_waiting = 1;	     /* waiting for GET_DONE */
1508 		kiblnd_launch_tx(ni, tx, target.nid);
1509 		return 0;
1510 
1511 	case LNET_MSG_REPLY:
1512 	case LNET_MSG_PUT:
1513 		/* Is the payload small enough not to need RDMA? */
1514 		nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1515 		if (nob <= IBLND_MSG_SIZE)
1516 			break;		  /* send IMMEDIATE */
1517 
1518 		tx = kiblnd_get_idle_tx(ni, target.nid);
1519 		if (tx == NULL) {
1520 			CERROR("Can't allocate %s txd for %s\n",
1521 			       type == LNET_MSG_PUT ? "PUT" : "REPLY",
1522 			       libcfs_nid2str(target.nid));
1523 			return -ENOMEM;
1524 		}
1525 
1526 		if (payload_kiov == NULL)
1527 			rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1528 						 payload_niov, payload_iov,
1529 						 payload_offset, payload_nob);
1530 		else
1531 			rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1532 						  payload_niov, payload_kiov,
1533 						  payload_offset, payload_nob);
1534 		if (rc != 0) {
1535 			CERROR("Can't setup PUT src for %s: %d\n",
1536 			       libcfs_nid2str(target.nid), rc);
1537 			kiblnd_tx_done(ni, tx);
1538 			return -EIO;
1539 		}
1540 
1541 		ibmsg = tx->tx_msg;
1542 		ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1543 		ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1544 		kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
1545 
1546 		tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1547 		tx->tx_waiting = 1;	     /* waiting for PUT_{ACK,NAK} */
1548 		kiblnd_launch_tx(ni, tx, target.nid);
1549 		return 0;
1550 	}
1551 
1552 	/* send IMMEDIATE */
1553 
1554 	LASSERT(offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
1555 		 <= IBLND_MSG_SIZE);
1556 
1557 	tx = kiblnd_get_idle_tx(ni, target.nid);
1558 	if (tx == NULL) {
1559 		CERROR("Can't send %d to %s: tx descs exhausted\n",
1560 			type, libcfs_nid2str(target.nid));
1561 		return -ENOMEM;
1562 	}
1563 
1564 	ibmsg = tx->tx_msg;
1565 	ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1566 
1567 	if (payload_kiov != NULL)
1568 		lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg,
1569 				    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1570 				    payload_niov, payload_kiov,
1571 				    payload_offset, payload_nob);
1572 	else
1573 		lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg,
1574 				   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1575 				   payload_niov, payload_iov,
1576 				   payload_offset, payload_nob);
1577 
1578 	nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
1579 	kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1580 
1581 	tx->tx_lntmsg[0] = lntmsg;	      /* finalise lntmsg on completion */
1582 	kiblnd_launch_tx(ni, tx, target.nid);
1583 	return 0;
1584 }
1585 
1586 static void
1587 kiblnd_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
1588 {
1589 	lnet_process_id_t target = lntmsg->msg_target;
1590 	unsigned int niov = lntmsg->msg_niov;
1591 	struct kvec *iov = lntmsg->msg_iov;
1592 	lnet_kiov_t *kiov = lntmsg->msg_kiov;
1593 	unsigned int offset = lntmsg->msg_offset;
1594 	unsigned int nob = lntmsg->msg_len;
1595 	kib_tx_t *tx;
1596 	int rc;
1597 
1598 	tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1599 	if (tx == NULL) {
1600 		CERROR("Can't get tx for REPLY to %s\n",
1601 		       libcfs_nid2str(target.nid));
1602 		goto failed_0;
1603 	}
1604 
1605 	if (nob == 0)
1606 		rc = 0;
1607 	else if (kiov == NULL)
1608 		rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1609 					 niov, iov, offset, nob);
1610 	else
1611 		rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1612 					  niov, kiov, offset, nob);
1613 
1614 	if (rc != 0) {
1615 		CERROR("Can't setup GET src for %s: %d\n",
1616 		       libcfs_nid2str(target.nid), rc);
1617 		goto failed_1;
1618 	}
1619 
1620 	rc = kiblnd_init_rdma(rx->rx_conn, tx,
1621 			      IBLND_MSG_GET_DONE, nob,
1622 			      &rx->rx_msg->ibm_u.get.ibgm_rd,
1623 			      rx->rx_msg->ibm_u.get.ibgm_cookie);
1624 	if (rc < 0) {
1625 		CERROR("Can't setup rdma for GET from %s: %d\n",
1626 		       libcfs_nid2str(target.nid), rc);
1627 		goto failed_1;
1628 	}
1629 
1630 	if (nob == 0) {
1631 		/* No RDMA: local completion may happen now! */
1632 		lnet_finalize(ni, lntmsg, 0);
1633 	} else {
1634 		/* RDMA: lnet_finalize(lntmsg) when it
1635 		 * completes */
1636 		tx->tx_lntmsg[0] = lntmsg;
1637 	}
1638 
1639 	kiblnd_queue_tx(tx, rx->rx_conn);
1640 	return;
1641 
1642  failed_1:
1643 	kiblnd_tx_done(ni, tx);
1644  failed_0:
1645 	lnet_finalize(ni, lntmsg, -EIO);
1646 }
1647 
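/*
 * LND receive entry point, called by lnet_parse() with the rx that
 * carried the header.  IMMEDIATE payloads are copied straight out of
 * the message; PUT_REQs get a PUT_ACK (or PUT_NAK) describing the sink
 * buffers; GET_REQs reply with an RDMA of the matching payload.
 */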
1648 int
1649 kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1650 	     unsigned int niov, struct kvec *iov, lnet_kiov_t *kiov,
1651 	     unsigned int offset, unsigned int mlen, unsigned int rlen)
1652 {
1653 	kib_rx_t *rx = private;
1654 	kib_msg_t *rxmsg = rx->rx_msg;
1655 	kib_conn_t *conn = rx->rx_conn;
1656 	kib_tx_t *tx;
1657 	int nob;
1658 	int post_credit = IBLND_POSTRX_PEER_CREDIT;
1659 	int rc = 0;
1660 
1661 	LASSERT(mlen <= rlen);
1662 	LASSERT(!in_interrupt());
1663 	/* Either all pages or all vaddrs */
1664 	LASSERT(!(kiov != NULL && iov != NULL));
1665 
1666 	switch (rxmsg->ibm_type) {
1667 	default:
1668 		LBUG();
1669 
1670 	case IBLND_MSG_IMMEDIATE:
1671 		nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1672 		if (nob > rx->rx_nob) {
1673 			CERROR("Immediate message from %s too big: %d(%d)\n",
1674 				libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1675 				nob, rx->rx_nob);
1676 			rc = -EPROTO;
1677 			break;
1678 		}
1679 
1680 		if (kiov != NULL)
1681 			lnet_copy_flat2kiov(niov, kiov, offset,
1682 					    IBLND_MSG_SIZE, rxmsg,
1683 					    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1684 					    mlen);
1685 		else
1686 			lnet_copy_flat2iov(niov, iov, offset,
1687 					   IBLND_MSG_SIZE, rxmsg,
1688 					   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
1689 					   mlen);
1690 		lnet_finalize(ni, lntmsg, 0);
1691 		break;
1692 
1693 	case IBLND_MSG_PUT_REQ: {
1694 		kib_msg_t	*txmsg;
1695 		kib_rdma_desc_t *rd;
1696 
1697 		if (mlen == 0) {
1698 			lnet_finalize(ni, lntmsg, 0);
1699 			kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1700 					       rxmsg->ibm_u.putreq.ibprm_cookie);
1701 			break;
1702 		}
1703 
1704 		tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1705 		if (tx == NULL) {
1706 			CERROR("Can't allocate tx for %s\n",
1707 			       libcfs_nid2str(conn->ibc_peer->ibp_nid));
1708 			/* Not replying will break the connection */
1709 			rc = -ENOMEM;
1710 			break;
1711 		}
1712 
1713 		txmsg = tx->tx_msg;
1714 		rd = &txmsg->ibm_u.putack.ibpam_rd;
1715 		if (kiov == NULL)
1716 			rc = kiblnd_setup_rd_iov(ni, tx, rd,
1717 						 niov, iov, offset, mlen);
1718 		else
1719 			rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1720 						  niov, kiov, offset, mlen);
1721 		if (rc != 0) {
1722 			CERROR("Can't setup PUT sink for %s: %d\n",
1723 			       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1724 			kiblnd_tx_done(ni, tx);
1725 			/* tell peer it's over */
1726 			kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1727 					       rxmsg->ibm_u.putreq.ibprm_cookie);
1728 			break;
1729 		}
1730 
1731 		nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[rd->rd_nfrags]);
1732 		txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1733 		txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1734 
1735 		kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1736 
1737 		tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1738 		tx->tx_waiting = 1;	     /* waiting for PUT_DONE */
1739 		kiblnd_queue_tx(tx, conn);
1740 
1741 		/* reposted buffer reserved for PUT_DONE */
1742 		post_credit = IBLND_POSTRX_NO_CREDIT;
1743 		break;
1744 		}
1745 
1746 	case IBLND_MSG_GET_REQ:
1747 		if (lntmsg != NULL) {
1748 			/* Optimized GET; RDMA lntmsg's payload */
1749 			kiblnd_reply(ni, rx, lntmsg);
1750 		} else {
1751 			/* GET didn't match anything */
1752 			kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1753 					       -ENODATA,
1754 					       rxmsg->ibm_u.get.ibgm_cookie);
1755 		}
1756 		break;
1757 	}
1758 
1759 	kiblnd_post_rx(rx, post_credit);
1760 	return rc;
1761 }
1762 
1763 int
1764 kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1765 {
1766 	struct task_struct *task = kthread_run(fn, arg, "%s", name);
1767 
1768 	if (IS_ERR(task))
1769 		return PTR_ERR(task);
1770 
1771 	atomic_inc(&kiblnd_data.kib_nthreads);
1772 	return 0;
1773 }
1774 
1775 static void
1776 kiblnd_thread_fini(void)
1777 {
1778 	atomic_dec(&kiblnd_data.kib_nthreads);
1779 }
1780 
1781 void
1782 kiblnd_peer_alive(kib_peer_t *peer)
1783 {
1784 	/* This is racy, but everyone's only writing cfs_time_current() */
1785 	peer->ibp_last_alive = cfs_time_current();
1786 	mb();
1787 }
1788 
1789 static void
1790 kiblnd_peer_notify(kib_peer_t *peer)
1791 {
1792 	int error = 0;
1793 	unsigned long last_alive = 0;
1794 	unsigned long flags;
1795 
1796 	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1797 
1798 	if (list_empty(&peer->ibp_conns) &&
1799 	    peer->ibp_accepting == 0 &&
1800 	    peer->ibp_connecting == 0 &&
1801 	    peer->ibp_error != 0) {
1802 		error = peer->ibp_error;
1803 		peer->ibp_error = 0;
1804 
1805 		last_alive = peer->ibp_last_alive;
1806 	}
1807 
1808 	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1809 
1810 	if (error != 0)
1811 		lnet_notify(peer->ibp_ni,
1812 			    peer->ibp_nid, 0, last_alive);
1813 }
1814 
1815 void
1816 kiblnd_close_conn_locked(kib_conn_t *conn, int error)
1817 {
1818 	/* This just does the immediate housekeeping.  'error' is zero for a
1819 	 * normal shutdown which can happen only after the connection has been
1820 	 * established.  If the connection is established, schedule the
1821 	 * connection to be finished off by the connd.  Otherwise the connd is
1822 	 * already dealing with it (either to set it up or tear it down).
1823 	 * Caller holds kib_global_lock exclusively in irq context */
1824 	kib_peer_t *peer = conn->ibc_peer;
1825 	kib_dev_t *dev;
1826 	unsigned long flags;
1827 
1828 	LASSERT(error != 0 || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1829 
1830 	if (error != 0 && conn->ibc_comms_error == 0)
1831 		conn->ibc_comms_error = error;
1832 
1833 	if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1834 		return; /* already being handled  */
1835 
1836 	if (error == 0 &&
1837 	    list_empty(&conn->ibc_tx_noops) &&
1838 	    list_empty(&conn->ibc_tx_queue) &&
1839 	    list_empty(&conn->ibc_tx_queue_rsrvd) &&
1840 	    list_empty(&conn->ibc_tx_queue_nocred) &&
1841 	    list_empty(&conn->ibc_active_txs)) {
1842 		CDEBUG(D_NET, "closing conn to %s\n",
1843 		       libcfs_nid2str(peer->ibp_nid));
1844 	} else {
1845 		CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1846 		       libcfs_nid2str(peer->ibp_nid), error,
1847 		       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1848 		       list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1849 		       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1850 		       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1851 		       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1852 	}
1853 
1854 	dev = ((kib_net_t *)peer->ibp_ni->ni_data)->ibn_dev;
1855 	list_del(&conn->ibc_list);
1856 	/* connd (see below) takes over ibc_list's ref */
1857 
1858 	if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1859 	    kiblnd_peer_active(peer)) {	 /* still in peer table */
1860 		kiblnd_unlink_peer_locked(peer);
1861 
1862 		/* set/clear error on last conn */
1863 		peer->ibp_error = conn->ibc_comms_error;
1864 	}
1865 
1866 	kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1867 
1868 	if (error != 0 &&
1869 	    kiblnd_dev_can_failover(dev)) {
1870 		list_add_tail(&dev->ibd_fail_list,
1871 			      &kiblnd_data.kib_failed_devs);
1872 		wake_up(&kiblnd_data.kib_failover_waitq);
1873 	}
1874 
1875 	spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1876 
1877 	list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1878 	wake_up(&kiblnd_data.kib_connd_waitq);
1879 
1880 	spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1881 }
1882 
1883 void
1884 kiblnd_close_conn(kib_conn_t *conn, int error)
1885 {
1886 	unsigned long flags;
1887 
1888 	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1889 
1890 	kiblnd_close_conn_locked(conn, error);
1891 
1892 	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1893 }
1894 
1895 static void
1896 kiblnd_handle_early_rxs(kib_conn_t *conn)
1897 {
1898 	unsigned long flags;
1899 	kib_rx_t *rx;
1900 	kib_rx_t *tmp;
1901 
1902 	LASSERT(!in_interrupt());
1903 	LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1904 
1905 	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1906 	list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1907 		list_del(&rx->rx_list);
1908 		write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1909 
1910 		kiblnd_handle_rx(rx);
1911 
1912 		write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1913 	}
1914 	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1915 }
1916 
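/* Fail every tx on 'txs' with -ECONNABORTED.  Txs that are not mid-send are
 * moved to a local zombie list and completed here; the rest are finalised
 * later, when their send completions arrive. */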
1917 static void
1918 kiblnd_abort_txs(kib_conn_t *conn, struct list_head *txs)
1919 {
1920 	LIST_HEAD(zombies);
1921 	struct list_head *tmp;
1922 	struct list_head *nxt;
1923 	kib_tx_t *tx;
1924 
1925 	spin_lock(&conn->ibc_lock);
1926 
1927 	list_for_each_safe(tmp, nxt, txs) {
1928 		tx = list_entry(tmp, kib_tx_t, tx_list);
1929 
1930 		if (txs == &conn->ibc_active_txs) {
1931 			LASSERT(!tx->tx_queued);
1932 			LASSERT(tx->tx_waiting ||
1933 				 tx->tx_sending != 0);
1934 		} else {
1935 			LASSERT(tx->tx_queued);
1936 		}
1937 
1938 		tx->tx_status = -ECONNABORTED;
1939 		tx->tx_waiting = 0;
1940 
1941 		if (tx->tx_sending == 0) {
1942 			tx->tx_queued = 0;
1943 			list_del(&tx->tx_list);
1944 			list_add(&tx->tx_list, &zombies);
1945 		}
1946 	}
1947 
1948 	spin_unlock(&conn->ibc_lock);
1949 
1950 	kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
1951 }
1952 
1953 static void
1954 kiblnd_finalise_conn(kib_conn_t *conn)
1955 {
1956 	LASSERT(!in_interrupt());
1957 	LASSERT(conn->ibc_state > IBLND_CONN_INIT);
1958 
1959 	kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
1960 
1961 	/* abort_receives moves QP state to IB_QPS_ERR.  This is only required
1962 	 * for connections that didn't get as far as being connected, because
1963 	 * rdma_disconnect() does this for free. */
1964 	kiblnd_abort_receives(conn);
1965 
1966 	/* Complete all tx descs not waiting for sends to complete.
1967 	 * NB we should be safe from RDMA now that the QP has changed state */
1968 
1969 	kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
1970 	kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
1971 	kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
1972 	kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
1973 	kiblnd_abort_txs(conn, &conn->ibc_active_txs);
1974 
1975 	kiblnd_handle_early_rxs(conn);
1976 }
1977 
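/* A connection attempt to 'peer' has failed.  If it was the last attempt in
 * flight and the peer has no established conns, unlink it from the peer
 * table and complete its blocked txs with -EHOSTUNREACH. */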
1978 void
1979 kiblnd_peer_connect_failed(kib_peer_t *peer, int active, int error)
1980 {
1981 	LIST_HEAD(zombies);
1982 	unsigned long flags;
1983 
1984 	LASSERT(error != 0);
1985 	LASSERT(!in_interrupt());
1986 
1987 	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1988 
1989 	if (active) {
1990 		LASSERT(peer->ibp_connecting > 0);
1991 		peer->ibp_connecting--;
1992 	} else {
1993 		LASSERT(peer->ibp_accepting > 0);
1994 		peer->ibp_accepting--;
1995 	}
1996 
1997 	if (peer->ibp_connecting != 0 ||
1998 	    peer->ibp_accepting != 0) {
1999 		/* another connection attempt under way... */
2000 		write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2001 					    flags);
2002 		return;
2003 	}
2004 
2005 	if (list_empty(&peer->ibp_conns)) {
2006 		/* Take peer's blocked transmits to complete with error */
2007 		list_add(&zombies, &peer->ibp_tx_queue);
2008 		list_del_init(&peer->ibp_tx_queue);
2009 
2010 		if (kiblnd_peer_active(peer))
2011 			kiblnd_unlink_peer_locked(peer);
2012 
2013 		peer->ibp_error = error;
2014 	} else {
2015 		/* Can't have blocked transmits if there are connections */
2016 		LASSERT(list_empty(&peer->ibp_tx_queue));
2017 	}
2018 
2019 	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2020 
2021 	kiblnd_peer_notify(peer);
2022 
2023 	if (list_empty(&zombies))
2024 		return;
2025 
2026 	CNETERR("Deleting messages for %s: connection failed\n",
2027 		libcfs_nid2str(peer->ibp_nid));
2028 
2029 	kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2030 }
2031 
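/* The connection handshake has completed.  On success move the conn to
 * ESTABLISHED, retire stale conns from a previous peer incarnation and queue
 * the peer's blocked txs on it; on failure tear the conn down and fail the
 * connection attempt. */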
2032 void
2033 kiblnd_connreq_done(kib_conn_t *conn, int status)
2034 {
2035 	kib_peer_t *peer = conn->ibc_peer;
2036 	kib_tx_t *tx;
2037 	kib_tx_t *tmp;
2038 	struct list_head txs;
2039 	unsigned long flags;
2040 	int active;
2041 
2042 	active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2043 
2044 	CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2045 	       libcfs_nid2str(peer->ibp_nid), active,
2046 	       conn->ibc_version, status);
2047 
2048 	LASSERT(!in_interrupt());
2049 	LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2050 		  peer->ibp_connecting > 0) ||
2051 		 (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2052 		  peer->ibp_accepting > 0));
2053 
2054 	LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2055 	conn->ibc_connvars = NULL;
2056 
2057 	if (status != 0) {
2058 		/* failed to establish connection */
2059 		kiblnd_peer_connect_failed(peer, active, status);
2060 		kiblnd_finalise_conn(conn);
2061 		return;
2062 	}
2063 
2064 	/* connection established */
2065 	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2066 
2067 	conn->ibc_last_send = jiffies;
2068 	kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2069 	kiblnd_peer_alive(peer);
2070 
2071 	/* Add conn to peer's list and nuke any dangling conns from a different
2072 	 * peer instance... */
2073 	kiblnd_conn_addref(conn);	       /* +1 ref for ibc_list */
2074 	list_add(&conn->ibc_list, &peer->ibp_conns);
2075 	if (active)
2076 		peer->ibp_connecting--;
2077 	else
2078 		peer->ibp_accepting--;
2079 
2080 	if (peer->ibp_version == 0) {
2081 		peer->ibp_version     = conn->ibc_version;
2082 		peer->ibp_incarnation = conn->ibc_incarnation;
2083 	}
2084 
2085 	if (peer->ibp_version     != conn->ibc_version ||
2086 	    peer->ibp_incarnation != conn->ibc_incarnation) {
2087 		kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2088 						conn->ibc_incarnation);
2089 		peer->ibp_version     = conn->ibc_version;
2090 		peer->ibp_incarnation = conn->ibc_incarnation;
2091 	}
2092 
2093 	/* grab pending txs while I have the lock */
2094 	list_add(&txs, &peer->ibp_tx_queue);
2095 	list_del_init(&peer->ibp_tx_queue);
2096 
2097 	if (!kiblnd_peer_active(peer) ||	/* peer has been deleted */
2098 	    conn->ibc_comms_error != 0) {       /* error has happened already */
2099 		lnet_ni_t *ni = peer->ibp_ni;
2100 
2101 		/* start to shut down connection */
2102 		kiblnd_close_conn_locked(conn, -ECONNABORTED);
2103 		write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2104 
2105 		kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2106 
2107 		return;
2108 	}
2109 
2110 	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2111 
2112 	/* Schedule blocked txs */
2113 	spin_lock(&conn->ibc_lock);
2114 	list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2115 		list_del(&tx->tx_list);
2116 
2117 		kiblnd_queue_tx_locked(tx, conn);
2118 	}
2119 	spin_unlock(&conn->ibc_lock);
2120 
2121 	kiblnd_check_sends(conn);
2122 
2123 	/* schedule blocked rxs */
2124 	kiblnd_handle_early_rxs(conn);
2125 }
2126 
2127 static void
2128 kiblnd_reject(struct rdma_cm_id *cmid, kib_rej_t *rej)
2129 {
2130 	int rc;
2131 
2132 	rc = rdma_reject(cmid, rej, sizeof(*rej));
2133 
2134 	if (rc != 0)
2135 		CWARN("Error %d sending reject\n", rc);
2136 }
2137 
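/* Handle a CONNREQ arriving on the listener cmid: validate magic, version
 * and connection parameters, find or create the peer (resolving stale conns
 * and connection races), then rdma_accept() with a CONNACK or reject with a
 * kib_rej_t describing why. */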
2138 static int
2139 kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2140 {
2141 	rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2142 	kib_msg_t *reqmsg = priv;
2143 	kib_msg_t *ackmsg;
2144 	kib_dev_t *ibdev;
2145 	kib_peer_t *peer;
2146 	kib_peer_t *peer2;
2147 	kib_conn_t *conn;
2148 	lnet_ni_t *ni  = NULL;
2149 	kib_net_t *net = NULL;
2150 	lnet_nid_t nid;
2151 	struct rdma_conn_param cp;
2152 	kib_rej_t rej;
2153 	int version = IBLND_MSG_VERSION;
2154 	unsigned long flags;
2155 	int rc;
2156 	struct sockaddr_in *peer_addr;
2157 
2158 	LASSERT(!in_interrupt());
2159 
2160 	/* cmid inherits 'context' from the corresponding listener id */
2161 	ibdev = (kib_dev_t *)cmid->context;
2162 	LASSERT(ibdev != NULL);
2163 
2164 	memset(&rej, 0, sizeof(rej));
2165 	rej.ibr_magic = IBLND_MSG_MAGIC;
2166 	rej.ibr_why = IBLND_REJECT_FATAL;
2167 	rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2168 
2169 	peer_addr = (struct sockaddr_in *)&(cmid->route.addr.dst_addr);
2170 	if (*kiblnd_tunables.kib_require_priv_port &&
2171 	    ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2172 		__u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2173 
2174 		CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2175 		       &ip, ntohs(peer_addr->sin_port));
2176 		goto failed;
2177 	}
2178 
2179 	if (priv_nob < offsetof(kib_msg_t, ibm_type)) {
2180 		CERROR("Short connection request\n");
2181 		goto failed;
2182 	}
2183 
2184 	/* Future protocol version compatibility support!  If the
2185 	 * o2iblnd-specific protocol changes, or when LNET unifies
2186 	 * protocols over all LNDs, the initial connection will
2187 	 * negotiate a protocol version.  I trap this here to avoid
2188 	 * console errors; the reject tells the peer which protocol I
2189 	 * speak. */
2190 	if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2191 	    reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2192 		goto failed;
2193 	if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2194 	    reqmsg->ibm_version != IBLND_MSG_VERSION &&
2195 	    reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2196 		goto failed;
2197 	if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2198 	    reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2199 	    reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2200 		goto failed;
2201 
2202 	rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2203 	if (rc != 0) {
2204 		CERROR("Can't parse connection request: %d\n", rc);
2205 		goto failed;
2206 	}
2207 
2208 	nid = reqmsg->ibm_srcnid;
2209 	ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2210 
2211 	if (ni != NULL) {
2212 		net = (kib_net_t *)ni->ni_data;
2213 		rej.ibr_incarnation = net->ibn_incarnation;
2214 	}
2215 
2216 	if (ni == NULL ||			 /* no matching net */
2217 	    ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2218 	    net->ibn_dev != ibdev) {	      /* wrong device */
2219 		CERROR("Can't accept %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2220 		       libcfs_nid2str(nid),
2221 		       ni == NULL ? "NA" : libcfs_nid2str(ni->ni_nid),
2222 		       ibdev->ibd_ifname, ibdev->ibd_nnets,
2223 		       &ibdev->ibd_ifip,
2224 		       libcfs_nid2str(reqmsg->ibm_dstnid));
2225 
2226 		goto failed;
2227 	}
2228 
2229 	/* check time stamp as soon as possible */
2230 	if (reqmsg->ibm_dststamp != 0 &&
2231 	    reqmsg->ibm_dststamp != net->ibn_incarnation) {
2232 		CWARN("Stale connection request\n");
2233 		rej.ibr_why = IBLND_REJECT_CONN_STALE;
2234 		goto failed;
2235 	}
2236 
2237 	/* I can accept peer's version */
2238 	version = reqmsg->ibm_version;
2239 
2240 	if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2241 		CERROR("Unexpected connreq msg type: %x from %s\n",
2242 		       reqmsg->ibm_type, libcfs_nid2str(nid));
2243 		goto failed;
2244 	}
2245 
2246 	if (reqmsg->ibm_u.connparams.ibcp_queue_depth !=
2247 	    IBLND_MSG_QUEUE_SIZE(version)) {
2248 		CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
2249 		       libcfs_nid2str(nid), reqmsg->ibm_u.connparams.ibcp_queue_depth,
2250 		       IBLND_MSG_QUEUE_SIZE(version));
2251 
2252 		if (version == IBLND_MSG_VERSION)
2253 			rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2254 
2255 		goto failed;
2256 	}
2257 
2258 	if (reqmsg->ibm_u.connparams.ibcp_max_frags !=
2259 	    IBLND_RDMA_FRAGS(version)) {
2260 		CERROR("Can't accept %s(version %x): incompatible max_frags %d (%d wanted)\n",
2261 		       libcfs_nid2str(nid), version,
2262 		       reqmsg->ibm_u.connparams.ibcp_max_frags,
2263 		       IBLND_RDMA_FRAGS(version));
2264 
2265 		if (version == IBLND_MSG_VERSION)
2266 			rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2267 
2268 		goto failed;
2269 
2270 	}
2271 
2272 	if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2273 		CERROR("Can't accept %s: message size %d too big (%d max)\n",
2274 		       libcfs_nid2str(nid),
2275 		       reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2276 		       IBLND_MSG_SIZE);
2277 		goto failed;
2278 	}
2279 
2280 	/* assume 'nid' is a new peer; create  */
2281 	rc = kiblnd_create_peer(ni, &peer, nid);
2282 	if (rc != 0) {
2283 		CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2284 		rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2285 		goto failed;
2286 	}
2287 
2288 	write_lock_irqsave(g_lock, flags);
2289 
2290 	peer2 = kiblnd_find_peer_locked(nid);
2291 	if (peer2 != NULL) {
2292 		if (peer2->ibp_version == 0) {
2293 			peer2->ibp_version     = version;
2294 			peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2295 		}
2296 
2297 		/* not the guy I've talked with */
2298 		if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2299 		    peer2->ibp_version     != version) {
2300 			kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2301 			write_unlock_irqrestore(g_lock, flags);
2302 
2303 			CWARN("Conn stale %s [old ver: %x, new ver: %x]\n",
2304 			      libcfs_nid2str(nid), peer2->ibp_version, version);
2305 
2306 			kiblnd_peer_decref(peer);
2307 			rej.ibr_why = IBLND_REJECT_CONN_STALE;
2308 			goto failed;
2309 		}
2310 
2311 		/* tie-break connection race in favour of the higher NID */
2312 		if (peer2->ibp_connecting != 0 &&
2313 		    nid < ni->ni_nid) {
2314 			write_unlock_irqrestore(g_lock, flags);
2315 
2316 			CWARN("Conn race %s\n", libcfs_nid2str(peer2->ibp_nid));
2317 
2318 			kiblnd_peer_decref(peer);
2319 			rej.ibr_why = IBLND_REJECT_CONN_RACE;
2320 			goto failed;
2321 		}
2322 
2323 		peer2->ibp_accepting++;
2324 		kiblnd_peer_addref(peer2);
2325 
2326 		write_unlock_irqrestore(g_lock, flags);
2327 		kiblnd_peer_decref(peer);
2328 		peer = peer2;
2329 	} else {
2330 		/* Brand new peer */
2331 		LASSERT(peer->ibp_accepting == 0);
2332 		LASSERT(peer->ibp_version == 0 &&
2333 			 peer->ibp_incarnation == 0);
2334 
2335 		peer->ibp_accepting   = 1;
2336 		peer->ibp_version     = version;
2337 		peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2338 
2339 		/* I have a ref on ni that prevents it being shutdown */
2340 		LASSERT(net->ibn_shutdown == 0);
2341 
2342 		kiblnd_peer_addref(peer);
2343 		list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2344 
2345 		write_unlock_irqrestore(g_lock, flags);
2346 	}
2347 
2348 	conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT, version);
2349 	if (conn == NULL) {
2350 		kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2351 		kiblnd_peer_decref(peer);
2352 		rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2353 		goto failed;
2354 	}
2355 
2356 	/* conn now "owns" cmid, so I return success from here on to ensure the
2357 	 * CM callback doesn't destroy cmid. */
2358 
2359 	conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2360 	conn->ibc_credits          = IBLND_MSG_QUEUE_SIZE(version);
2361 	conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(version);
2362 	LASSERT(conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(version)
2363 		 <= IBLND_RX_MSGS(version));
2364 
2365 	ackmsg = &conn->ibc_connvars->cv_msg;
2366 	memset(ackmsg, 0, sizeof(*ackmsg));
2367 
2368 	kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2369 			sizeof(ackmsg->ibm_u.connparams));
2370 	ackmsg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2371 	ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2372 	ackmsg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2373 
2374 	kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2375 
2376 	memset(&cp, 0, sizeof(cp));
2377 	cp.private_data	= ackmsg;
2378 	cp.private_data_len = ackmsg->ibm_nob;
2379 	cp.responder_resources = 0;	     /* No atomic ops or RDMA reads */
2380 	cp.initiator_depth = 0;
2381 	cp.flow_control	= 1;
2382 	cp.retry_count = *kiblnd_tunables.kib_retry_count;
2383 	cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2384 
2385 	CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2386 
2387 	rc = rdma_accept(cmid, &cp);
2388 	if (rc != 0) {
2389 		CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2390 		rej.ibr_version = version;
2391 		rej.ibr_why     = IBLND_REJECT_FATAL;
2392 
2393 		kiblnd_reject(cmid, &rej);
2394 		kiblnd_connreq_done(conn, rc);
2395 		kiblnd_conn_decref(conn);
2396 	}
2397 
2398 	lnet_ni_decref(ni);
2399 	return 0;
2400 
2401  failed:
2402 	if (ni != NULL)
2403 		lnet_ni_decref(ni);
2404 
2405 	rej.ibr_version             = version;
2406 	rej.ibr_cp.ibcp_queue_depth = IBLND_MSG_QUEUE_SIZE(version);
2407 	rej.ibr_cp.ibcp_max_frags   = IBLND_RDMA_FRAGS(version);
2408 	kiblnd_reject(cmid, &rej);
2409 
2410 	return -ECONNREFUSED;
2411 }
2412 
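/* The peer rejected an active connect for a recoverable reason (stale conn,
 * connection race, version mismatch).  If no other attempt is in flight and
 * a reconnect is still needed, adopt the peer's version/incarnation and try
 * again. */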
2413 static void
2414 kiblnd_reconnect(kib_conn_t *conn, int version,
2415 		  __u64 incarnation, int why, kib_connparams_t *cp)
2416 {
2417 	kib_peer_t *peer = conn->ibc_peer;
2418 	char *reason;
2419 	int retry = 0;
2420 	unsigned long flags;
2421 
2422 	LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2423 	LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2424 
2425 	write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2426 
2427 	/* retry connection if it's still needed and no other connection
2428 	 * attempts (active or passive) are in progress
2429 	 * NB: reconnect is still needed even when ibp_tx_queue is
2430 	 * empty if ibp_version != version because reconnect may be
2431 	 * initiated by kiblnd_query() */
2432 	if ((!list_empty(&peer->ibp_tx_queue) ||
2433 	     peer->ibp_version != version) &&
2434 	    peer->ibp_connecting == 1 &&
2435 	    peer->ibp_accepting == 0) {
2436 		retry = 1;
2437 		peer->ibp_connecting++;
2438 
2439 		peer->ibp_version     = version;
2440 		peer->ibp_incarnation = incarnation;
2441 	}
2442 
2443 	write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2444 
2445 	if (!retry)
2446 		return;
2447 
2448 	switch (why) {
2449 	default:
2450 		reason = "Unknown";
2451 		break;
2452 
2453 	case IBLND_REJECT_CONN_STALE:
2454 		reason = "stale";
2455 		break;
2456 
2457 	case IBLND_REJECT_CONN_RACE:
2458 		reason = "conn race";
2459 		break;
2460 
2461 	case IBLND_REJECT_CONN_UNCOMPAT:
2462 		reason = "version negotiation";
2463 		break;
2464 	}
2465 
2466 	CNETERR("%s: retrying (%s), %x, %x, queue_dep: %d, max_frag: %d, msg_size: %d\n",
2467 		libcfs_nid2str(peer->ibp_nid),
2468 		reason, IBLND_MSG_VERSION, version,
2469 		cp != NULL ? cp->ibcp_queue_depth  : IBLND_MSG_QUEUE_SIZE(version),
2470 		cp != NULL ? cp->ibcp_max_frags    : IBLND_RDMA_FRAGS(version),
2471 		cp != NULL ? cp->ibcp_max_msg_size : IBLND_MSG_SIZE);
2472 
2473 	kiblnd_connect_peer(peer);
2474 }
2475 
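/* An active connect was rejected.  Decode any consumer-defined kib_rej_t in
 * the private data and either retry via kiblnd_reconnect() or give up and
 * complete the attempt with -ECONNREFUSED. */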
2476 static void
2477 kiblnd_rejected(kib_conn_t *conn, int reason, void *priv, int priv_nob)
2478 {
2479 	kib_peer_t *peer = conn->ibc_peer;
2480 
2481 	LASSERT(!in_interrupt());
2482 	LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2483 
2484 	switch (reason) {
2485 	case IB_CM_REJ_STALE_CONN:
2486 		kiblnd_reconnect(conn, IBLND_MSG_VERSION, 0,
2487 				 IBLND_REJECT_CONN_STALE, NULL);
2488 		break;
2489 
2490 	case IB_CM_REJ_INVALID_SERVICE_ID:
2491 		CNETERR("%s rejected: no listener at %d\n",
2492 			libcfs_nid2str(peer->ibp_nid),
2493 			*kiblnd_tunables.kib_service);
2494 		break;
2495 
2496 	case IB_CM_REJ_CONSUMER_DEFINED:
2497 		if (priv_nob >= offsetof(kib_rej_t, ibr_padding)) {
2498 			kib_rej_t *rej = priv;
2499 			kib_connparams_t *cp = NULL;
2500 			int flip = 0;
2501 			__u64 incarnation = -1;
2502 
2503 			/* NB. default incarnation is -1 because:
2504 			 * a) V1 will ignore dst incarnation in connreq.
2505 			 * b) V2 will provide incarnation while rejecting me,
2506 			 *    so -1 will be overwritten.
2507 			 *
2508 			 * If I connect to a V1 peer with V2 protocol, it
2509 			 * rejects me and then upgrades to V2.  I know nothing
2510 			 * about the upgrade and retry with V1; the upgraded
2511 			 * V2 peer can then tell I'm talking the old protocol
2512 			 * and rejects me again (incarnation is -1).
2513 			 */
2514 
2515 			if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2516 			    rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2517 				__swab32s(&rej->ibr_magic);
2518 				__swab16s(&rej->ibr_version);
2519 				flip = 1;
2520 			}
2521 
2522 			if (priv_nob >= sizeof(kib_rej_t) &&
2523 			    rej->ibr_version > IBLND_MSG_VERSION_1) {
2524 				/* priv_nob is always 148 in current versions
2525 				 * of OFED, so we still need to check version
2526 				 * (see IB_CM_REJ_PRIVATE_DATA_SIZE). */
2527 				cp = &rej->ibr_cp;
2528 
2529 				if (flip) {
2530 					__swab64s(&rej->ibr_incarnation);
2531 					__swab16s(&cp->ibcp_queue_depth);
2532 					__swab16s(&cp->ibcp_max_frags);
2533 					__swab32s(&cp->ibcp_max_msg_size);
2534 				}
2535 
2536 				incarnation = rej->ibr_incarnation;
2537 			}
2538 
2539 			if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2540 			    rej->ibr_magic != LNET_PROTO_MAGIC) {
2541 				CERROR("%s rejected: consumer defined fatal error\n",
2542 				       libcfs_nid2str(peer->ibp_nid));
2543 				break;
2544 			}
2545 
2546 			if (rej->ibr_version != IBLND_MSG_VERSION &&
2547 			    rej->ibr_version != IBLND_MSG_VERSION_1) {
2548 				CERROR("%s rejected: o2iblnd version %x error\n",
2549 				       libcfs_nid2str(peer->ibp_nid),
2550 				       rej->ibr_version);
2551 				break;
2552 			}
2553 
2554 			if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2555 			    rej->ibr_version == IBLND_MSG_VERSION_1) {
2556 				CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2557 				       libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2558 
2559 				if (conn->ibc_version != IBLND_MSG_VERSION_1)
2560 					rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2561 			}
2562 
2563 			switch (rej->ibr_why) {
2564 			case IBLND_REJECT_CONN_RACE:
2565 			case IBLND_REJECT_CONN_STALE:
2566 			case IBLND_REJECT_CONN_UNCOMPAT:
2567 				kiblnd_reconnect(conn, rej->ibr_version,
2568 						 incarnation, rej->ibr_why, cp);
2569 				break;
2570 
2571 			case IBLND_REJECT_MSG_QUEUE_SIZE:
2572 				CERROR("%s rejected: incompatible message queue depth %d, %d\n",
2573 				       libcfs_nid2str(peer->ibp_nid),
2574 				       cp != NULL ? cp->ibcp_queue_depth :
2575 				       IBLND_MSG_QUEUE_SIZE(rej->ibr_version),
2576 				       IBLND_MSG_QUEUE_SIZE(conn->ibc_version));
2577 				break;
2578 
2579 			case IBLND_REJECT_RDMA_FRAGS:
2580 				CERROR("%s rejected: incompatible # of RDMA fragments %d, %d\n",
2581 				       libcfs_nid2str(peer->ibp_nid),
2582 				       cp != NULL ? cp->ibcp_max_frags :
2583 				       IBLND_RDMA_FRAGS(rej->ibr_version),
2584 				       IBLND_RDMA_FRAGS(conn->ibc_version));
2585 				break;
2586 
2587 			case IBLND_REJECT_NO_RESOURCES:
2588 				CERROR("%s rejected: o2iblnd no resources\n",
2589 				       libcfs_nid2str(peer->ibp_nid));
2590 				break;
2591 
2592 			case IBLND_REJECT_FATAL:
2593 				CERROR("%s rejected: o2iblnd fatal error\n",
2594 				       libcfs_nid2str(peer->ibp_nid));
2595 				break;
2596 
2597 			default:
2598 				CERROR("%s rejected: o2iblnd reason %d\n",
2599 				       libcfs_nid2str(peer->ibp_nid),
2600 				       rej->ibr_why);
2601 				break;
2602 			}
2603 			break;
2604 		}
2605 		/* fall through */
2606 	default:
2607 		CNETERR("%s rejected: reason %d, size %d\n",
2608 			libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2609 		break;
2610 	}
2611 
2612 	kiblnd_connreq_done(conn, -ECONNREFUSED);
2613 }
2614 
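/* The active connect reached ESTABLISHED; validate the peer's CONNACK
 * (version, queue depth, max frags, message size, incarnation) before the
 * connection is declared usable.  Any mismatch sets ibc_comms_error so the
 * conn is torn down again. */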
2615 static void
2616 kiblnd_check_connreply(kib_conn_t *conn, void *priv, int priv_nob)
2617 {
2618 	kib_peer_t *peer = conn->ibc_peer;
2619 	lnet_ni_t *ni = peer->ibp_ni;
2620 	kib_net_t *net = ni->ni_data;
2621 	kib_msg_t *msg = priv;
2622 	int ver = conn->ibc_version;
2623 	int rc = kiblnd_unpack_msg(msg, priv_nob);
2624 	unsigned long flags;
2625 
2626 	LASSERT(net != NULL);
2627 
2628 	if (rc != 0) {
2629 		CERROR("Can't unpack connack from %s: %d\n",
2630 		       libcfs_nid2str(peer->ibp_nid), rc);
2631 		goto failed;
2632 	}
2633 
2634 	if (msg->ibm_type != IBLND_MSG_CONNACK) {
2635 		CERROR("Unexpected message %d from %s\n",
2636 		       msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2637 		rc = -EPROTO;
2638 		goto failed;
2639 	}
2640 
2641 	if (ver != msg->ibm_version) {
2642 		CERROR("%s replied with version %x, which differs from requested version %x\n",
2643 		       libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2644 		rc = -EPROTO;
2645 		goto failed;
2646 	}
2647 
2648 	if (msg->ibm_u.connparams.ibcp_queue_depth !=
2649 	    IBLND_MSG_QUEUE_SIZE(ver)) {
2650 		CERROR("%s has incompatible queue depth %d(%d wanted)\n",
2651 		       libcfs_nid2str(peer->ibp_nid),
2652 		       msg->ibm_u.connparams.ibcp_queue_depth,
2653 		       IBLND_MSG_QUEUE_SIZE(ver));
2654 		rc = -EPROTO;
2655 		goto failed;
2656 	}
2657 
2658 	if (msg->ibm_u.connparams.ibcp_max_frags !=
2659 	    IBLND_RDMA_FRAGS(ver)) {
2660 		CERROR("%s has incompatible max_frags %d (%d wanted)\n",
2661 		       libcfs_nid2str(peer->ibp_nid),
2662 		       msg->ibm_u.connparams.ibcp_max_frags,
2663 		       IBLND_RDMA_FRAGS(ver));
2664 		rc = -EPROTO;
2665 		goto failed;
2666 	}
2667 
2668 	if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2669 		CERROR("%s max message size %d too big (%d max)\n",
2670 		       libcfs_nid2str(peer->ibp_nid),
2671 		       msg->ibm_u.connparams.ibcp_max_msg_size,
2672 		       IBLND_MSG_SIZE);
2673 		rc = -EPROTO;
2674 		goto failed;
2675 	}
2676 
2677 	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2678 	if (msg->ibm_dstnid == ni->ni_nid &&
2679 	    msg->ibm_dststamp == net->ibn_incarnation)
2680 		rc = 0;
2681 	else
2682 		rc = -ESTALE;
2683 	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2684 
2685 	if (rc != 0) {
2686 		CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2687 		       libcfs_nid2str(peer->ibp_nid), rc,
2688 		       msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2689 		goto failed;
2690 	}
2691 
2692 	conn->ibc_incarnation = msg->ibm_srcstamp;
2693 	conn->ibc_credits =
2694 	conn->ibc_reserved_credits = IBLND_MSG_QUEUE_SIZE(ver);
2695 	LASSERT(conn->ibc_credits + conn->ibc_reserved_credits + IBLND_OOB_MSGS(ver)
2696 		 <= IBLND_RX_MSGS(ver));
2697 
2698 	kiblnd_connreq_done(conn, 0);
2699 	return;
2700 
2701  failed:
2702 	/* NB My QP has already established itself, so I handle anything going
2703 	 * wrong here by setting ibc_comms_error.
2704 	 * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2705 	 * immediately tears it down. */
2706 
2707 	LASSERT(rc != 0);
2708 	conn->ibc_comms_error = rc;
2709 	kiblnd_connreq_done(conn, 0);
2710 }
2711 
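/* Route resolution succeeded: create the conn, pack a CONNREQ carrying our
 * connection parameters and call rdma_connect() on the cmid. */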
2712 static int
2713 kiblnd_active_connect(struct rdma_cm_id *cmid)
2714 {
2715 	kib_peer_t *peer = (kib_peer_t *)cmid->context;
2716 	kib_conn_t *conn;
2717 	kib_msg_t *msg;
2718 	struct rdma_conn_param cp;
2719 	int version;
2720 	__u64 incarnation;
2721 	unsigned long flags;
2722 	int rc;
2723 
2724 	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2725 
2726 	incarnation = peer->ibp_incarnation;
2727 	version = (peer->ibp_version == 0) ? IBLND_MSG_VERSION :
2728 					     peer->ibp_version;
2729 
2730 	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2731 
2732 	conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT, version);
2733 	if (conn == NULL) {
2734 		kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2735 		kiblnd_peer_decref(peer); /* lose cmid's ref */
2736 		return -ENOMEM;
2737 	}
2738 
2739 	/* conn "owns" cmid now, so I return success from here on to ensure the
2740 	 * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2741 	 * on peer */
2742 
2743 	msg = &conn->ibc_connvars->cv_msg;
2744 
2745 	memset(msg, 0, sizeof(*msg));
2746 	kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2747 	msg->ibm_u.connparams.ibcp_queue_depth  = IBLND_MSG_QUEUE_SIZE(version);
2748 	msg->ibm_u.connparams.ibcp_max_frags    = IBLND_RDMA_FRAGS(version);
2749 	msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2750 
2751 	kiblnd_pack_msg(peer->ibp_ni, msg, version,
2752 			0, peer->ibp_nid, incarnation);
2753 
2754 	memset(&cp, 0, sizeof(cp));
2755 	cp.private_data	= msg;
2756 	cp.private_data_len    = msg->ibm_nob;
2757 	cp.responder_resources = 0;	     /* No atomic ops or RDMA reads */
2758 	cp.initiator_depth     = 0;
2759 	cp.flow_control        = 1;
2760 	cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2761 	cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2762 
2763 	LASSERT(cmid->context == (void *)conn);
2764 	LASSERT(conn->ibc_cmid == cmid);
2765 
2766 	rc = rdma_connect(cmid, &cp);
2767 	if (rc != 0) {
2768 		CERROR("Can't connect to %s: %d\n",
2769 		       libcfs_nid2str(peer->ibp_nid), rc);
2770 		kiblnd_connreq_done(conn, rc);
2771 		kiblnd_conn_decref(conn);
2772 	}
2773 
2774 	return 0;
2775 }
2776 
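/* RDMA CM event handler shared by the listener and by active/passive
 * connection cmids; returning non-zero tells the CM to destroy the cmid. */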
2777 int
2778 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2779 {
2780 	kib_peer_t *peer;
2781 	kib_conn_t *conn;
2782 	int rc;
2783 
2784 	switch (event->event) {
2785 	default:
2786 		CERROR("Unexpected event: %d, status: %d\n",
2787 		       event->event, event->status);
2788 		LBUG();
2789 
2790 	case RDMA_CM_EVENT_CONNECT_REQUEST:
2791 		/* destroy cmid on failure */
2792 		rc = kiblnd_passive_connect(cmid,
2793 					    (void *)KIBLND_CONN_PARAM(event),
2794 					    KIBLND_CONN_PARAM_LEN(event));
2795 		CDEBUG(D_NET, "connreq: %d\n", rc);
2796 		return rc;
2797 
2798 	case RDMA_CM_EVENT_ADDR_ERROR:
2799 		peer = (kib_peer_t *)cmid->context;
2800 		CNETERR("%s: ADDR ERROR %d\n",
2801 		       libcfs_nid2str(peer->ibp_nid), event->status);
2802 		kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2803 		kiblnd_peer_decref(peer);
2804 		return -EHOSTUNREACH;      /* rc != 0 destroys cmid */
2805 
2806 	case RDMA_CM_EVENT_ADDR_RESOLVED:
2807 		peer = (kib_peer_t *)cmid->context;
2808 
2809 		CDEBUG(D_NET, "%s Addr resolved: %d\n",
2810 		       libcfs_nid2str(peer->ibp_nid), event->status);
2811 
2812 		if (event->status != 0) {
2813 			CNETERR("Can't resolve address for %s: %d\n",
2814 				libcfs_nid2str(peer->ibp_nid), event->status);
2815 			rc = event->status;
2816 		} else {
2817 			rc = rdma_resolve_route(
2818 				cmid, *kiblnd_tunables.kib_timeout * 1000);
2819 			if (rc == 0)
2820 				return 0;
2821 			/* Can't initiate route resolution */
2822 			CERROR("Can't resolve route for %s: %d\n",
2823 			       libcfs_nid2str(peer->ibp_nid), rc);
2824 		}
2825 		kiblnd_peer_connect_failed(peer, 1, rc);
2826 		kiblnd_peer_decref(peer);
2827 		return rc;		      /* rc != 0 destroys cmid */
2828 
2829 	case RDMA_CM_EVENT_ROUTE_ERROR:
2830 		peer = (kib_peer_t *)cmid->context;
2831 		CNETERR("%s: ROUTE ERROR %d\n",
2832 			libcfs_nid2str(peer->ibp_nid), event->status);
2833 		kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2834 		kiblnd_peer_decref(peer);
2835 		return -EHOSTUNREACH;	   /* rc != 0 destroys cmid */
2836 
2837 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
2838 		peer = (kib_peer_t *)cmid->context;
2839 		CDEBUG(D_NET, "%s Route resolved: %d\n",
2840 		       libcfs_nid2str(peer->ibp_nid), event->status);
2841 
2842 		if (event->status == 0)
2843 			return kiblnd_active_connect(cmid);
2844 
2845 		CNETERR("Can't resolve route for %s: %d\n",
2846 		       libcfs_nid2str(peer->ibp_nid), event->status);
2847 		kiblnd_peer_connect_failed(peer, 1, event->status);
2848 		kiblnd_peer_decref(peer);
2849 		return event->status;	   /* rc != 0 destroys cmid */
2850 
2851 	case RDMA_CM_EVENT_UNREACHABLE:
2852 		conn = (kib_conn_t *)cmid->context;
2853 		LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2854 			conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2855 		CNETERR("%s: UNREACHABLE %d\n",
2856 		       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2857 		kiblnd_connreq_done(conn, -ENETDOWN);
2858 		kiblnd_conn_decref(conn);
2859 		return 0;
2860 
2861 	case RDMA_CM_EVENT_CONNECT_ERROR:
2862 		conn = (kib_conn_t *)cmid->context;
2863 		LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
2864 			conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
2865 		CNETERR("%s: CONNECT ERROR %d\n",
2866 			libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
2867 		kiblnd_connreq_done(conn, -ENOTCONN);
2868 		kiblnd_conn_decref(conn);
2869 		return 0;
2870 
2871 	case RDMA_CM_EVENT_REJECTED:
2872 		conn = (kib_conn_t *)cmid->context;
2873 		switch (conn->ibc_state) {
2874 		default:
2875 			LBUG();
2876 
2877 		case IBLND_CONN_PASSIVE_WAIT:
2878 			CERROR("%s: REJECTED %d\n",
2879 				libcfs_nid2str(conn->ibc_peer->ibp_nid),
2880 				event->status);
2881 			kiblnd_connreq_done(conn, -ECONNRESET);
2882 			break;
2883 
2884 		case IBLND_CONN_ACTIVE_CONNECT:
2885 			kiblnd_rejected(conn, event->status,
2886 					(void *)KIBLND_CONN_PARAM(event),
2887 					KIBLND_CONN_PARAM_LEN(event));
2888 			break;
2889 		}
2890 		kiblnd_conn_decref(conn);
2891 		return 0;
2892 
2893 	case RDMA_CM_EVENT_ESTABLISHED:
2894 		conn = (kib_conn_t *)cmid->context;
2895 		switch (conn->ibc_state) {
2896 		default:
2897 			LBUG();
2898 
2899 		case IBLND_CONN_PASSIVE_WAIT:
2900 			CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
2901 			       libcfs_nid2str(conn->ibc_peer->ibp_nid));
2902 			kiblnd_connreq_done(conn, 0);
2903 			break;
2904 
2905 		case IBLND_CONN_ACTIVE_CONNECT:
2906 			CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
2907 			       libcfs_nid2str(conn->ibc_peer->ibp_nid));
2908 			kiblnd_check_connreply(conn,
2909 					       (void *)KIBLND_CONN_PARAM(event),
2910 					       KIBLND_CONN_PARAM_LEN(event));
2911 			break;
2912 		}
2913 		/* net keeps its ref on conn! */
2914 		return 0;
2915 
2916 	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2917 		CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
2918 		return 0;
2919 	case RDMA_CM_EVENT_DISCONNECTED:
2920 		conn = (kib_conn_t *)cmid->context;
2921 		if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
2922 			CERROR("%s DISCONNECTED\n",
2923 			       libcfs_nid2str(conn->ibc_peer->ibp_nid));
2924 			kiblnd_connreq_done(conn, -ECONNRESET);
2925 		} else {
2926 			kiblnd_close_conn(conn, 0);
2927 		}
2928 		kiblnd_conn_decref(conn);
2929 		cmid->context = NULL;
2930 		return 0;
2931 
2932 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
2933 		LCONSOLE_ERROR_MSG(0x131,
2934 				   "Received notification of device removal\n"
2935 				   "Please shutdown LNET to allow this to proceed\n");
2936 		/* Can't remove network from underneath LNET for now, so I have
2937 		 * to ignore this */
2938 		return 0;
2939 
2940 	case RDMA_CM_EVENT_ADDR_CHANGE:
2941 		LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
2942 		return 0;
2943 	}
2944 }
2945 
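/* Return non-zero if any tx on 'txs' has passed its deadline (called with
 * ibc_lock held). */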
2946 static int
2947 kiblnd_check_txs_locked(kib_conn_t *conn, struct list_head *txs)
2948 {
2949 	kib_tx_t *tx;
2950 	struct list_head *ttmp;
2951 
2952 	list_for_each(ttmp, txs) {
2953 		tx = list_entry(ttmp, kib_tx_t, tx_list);
2954 
2955 		if (txs != &conn->ibc_active_txs) {
2956 			LASSERT(tx->tx_queued);
2957 		} else {
2958 			LASSERT(!tx->tx_queued);
2959 			LASSERT(tx->tx_waiting || tx->tx_sending != 0);
2960 		}
2961 
2962 		if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
2963 			CERROR("Timed out tx: %s, %lu seconds\n",
2964 			       kiblnd_queue2str(conn, txs),
2965 			       cfs_duration_sec(jiffies - tx->tx_deadline));
2966 			return 1;
2967 		}
2968 	}
2969 
2970 	return 0;
2971 }
2972 
2973 static int
2974 kiblnd_conn_timed_out_locked(kib_conn_t *conn)
2975 {
2976 	return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
2977 		kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
2978 		kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
2979 		kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
2980 		kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
2981 }
2982 
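/* Scan peer hash bucket 'idx' for established conns that have timed out
 * (close them) or that need a NOOP sent (poke kiblnd_check_sends()). */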
2983 static void
2984 kiblnd_check_conns(int idx)
2985 {
2986 	LIST_HEAD(closes);
2987 	LIST_HEAD(checksends);
2988 	struct list_head *peers = &kiblnd_data.kib_peers[idx];
2989 	struct list_head *ptmp;
2990 	kib_peer_t *peer;
2991 	kib_conn_t *conn;
2992 	kib_conn_t *tmp;
2993 	struct list_head *ctmp;
2994 	unsigned long flags;
2995 
2996 	/* NB. We expect to have a look at all the peers and not find any
2997 	 * RDMAs to time out, so we just use a shared lock while we
2998 	 * take a look... */
2999 	read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3000 
3001 	list_for_each(ptmp, peers) {
3002 		peer = list_entry(ptmp, kib_peer_t, ibp_list);
3003 
3004 		list_for_each(ctmp, &peer->ibp_conns) {
3005 			int timedout;
3006 			int sendnoop;
3007 
3008 			conn = list_entry(ctmp, kib_conn_t, ibc_list);
3009 
3010 			LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3011 
3012 			spin_lock(&conn->ibc_lock);
3013 
3014 			sendnoop = kiblnd_need_noop(conn);
3015 			timedout = kiblnd_conn_timed_out_locked(conn);
3016 			if (!sendnoop && !timedout) {
3017 				spin_unlock(&conn->ibc_lock);
3018 				continue;
3019 			}
3020 
3021 			if (timedout) {
3022 				CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3023 				       libcfs_nid2str(peer->ibp_nid),
3024 				       cfs_duration_sec(cfs_time_current() -
3025 							peer->ibp_last_alive),
3026 				       conn->ibc_credits,
3027 				       conn->ibc_outstanding_credits,
3028 				       conn->ibc_reserved_credits);
3029 				list_add(&conn->ibc_connd_list, &closes);
3030 			} else {
3031 				list_add(&conn->ibc_connd_list,
3032 					     &checksends);
3033 			}
3034 			/* +ref for 'closes' or 'checksends' */
3035 			kiblnd_conn_addref(conn);
3036 
3037 			spin_unlock(&conn->ibc_lock);
3038 		}
3039 	}
3040 
3041 	read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3042 
3043 	/* Handle timeout by closing the whole
3044 	 * connection. We can only be sure RDMA activity
3045 	 * has ceased once the QP has been modified. */
3046 	list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3047 		list_del(&conn->ibc_connd_list);
3048 		kiblnd_close_conn(conn, -ETIMEDOUT);
3049 		kiblnd_conn_decref(conn);
3050 	}
3051 
3052 	/* In case we have enough credits to return via a
3053 	 * NOOP, but there were no non-blocking tx descs
3054 	 * free to do it last time... */
3055 	while (!list_empty(&checksends)) {
3056 		conn = list_entry(checksends.next,
3057 				      kib_conn_t, ibc_connd_list);
3058 		list_del(&conn->ibc_connd_list);
3059 		kiblnd_check_sends(conn);
3060 		kiblnd_conn_decref(conn);
3061 	}
3062 }
3063 
3064 static void
3065 kiblnd_disconnect_conn(kib_conn_t *conn)
3066 {
3067 	LASSERT(!in_interrupt());
3068 	LASSERT(current == kiblnd_data.kib_connd);
3069 	LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3070 
3071 	rdma_disconnect(conn->ibc_cmid);
3072 	kiblnd_finalise_conn(conn);
3073 
3074 	kiblnd_peer_notify(conn->ibc_peer);
3075 }
3076 
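/* Connection daemon: destroys zombie conns, runs the disconnect/finalise
 * path for conns queued by kiblnd_close_conn_locked() and periodically
 * sweeps a slice of the peer table for RDMA timeouts. */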
3077 int
3078 kiblnd_connd(void *arg)
3079 {
3080 	wait_queue_t wait;
3081 	unsigned long flags;
3082 	kib_conn_t *conn;
3083 	int timeout;
3084 	int i;
3085 	int dropped_lock;
3086 	int peer_index = 0;
3087 	unsigned long deadline = jiffies;
3088 
3089 	cfs_block_allsigs();
3090 
3091 	init_waitqueue_entry(&wait, current);
3092 	kiblnd_data.kib_connd = current;
3093 
3094 	spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3095 
3096 	while (!kiblnd_data.kib_shutdown) {
3097 
3098 		dropped_lock = 0;
3099 
3100 		if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3101 			conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3102 					      kib_conn_t, ibc_list);
3103 			list_del(&conn->ibc_list);
3104 
3105 			spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3106 					       flags);
3107 			dropped_lock = 1;
3108 
3109 			kiblnd_destroy_conn(conn);
3110 
3111 			spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3112 		}
3113 
3114 		if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3115 			conn = list_entry(kiblnd_data.kib_connd_conns.next,
3116 					      kib_conn_t, ibc_list);
3117 			list_del(&conn->ibc_list);
3118 
3119 			spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock,
3120 					       flags);
3121 			dropped_lock = 1;
3122 
3123 			kiblnd_disconnect_conn(conn);
3124 			kiblnd_conn_decref(conn);
3125 
3126 			spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3127 		}
3128 
3129 		/* careful with the jiffy wrap... */
3130 		timeout = (int)(deadline - jiffies);
3131 		if (timeout <= 0) {
3132 			const int n = 4;
3133 			const int p = 1;
3134 			int chunk = kiblnd_data.kib_peer_hash_size;
3135 
3136 			spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3137 			dropped_lock = 1;
3138 
3139 			/* Time to check for RDMA timeouts on a few more
3140 			 * peers: I do checks every 'p' seconds on a
3141 			 * proportion of the peer table and I need to check
3142 			 * every connection 'n' times within a timeout
3143 			 * interval, to ensure I detect a timeout on any
3144 			 * connection within (n+1)/n times the timeout
3145 			 * interval. */
3146 
3147 			if (*kiblnd_tunables.kib_timeout > n * p)
3148 				chunk = (chunk * n * p) /
3149 					*kiblnd_tunables.kib_timeout;
3150 			if (chunk == 0)
3151 				chunk = 1;
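			/* For example (hypothetical values): with a peer hash
			 * of 101 buckets and kib_timeout = 50s, chunk =
			 * 101 * 4 * 1 / 50 = 8 buckets scanned per 1s pass. */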
3152 
3153 			for (i = 0; i < chunk; i++) {
3154 				kiblnd_check_conns(peer_index);
3155 				peer_index = (peer_index + 1) %
3156 					     kiblnd_data.kib_peer_hash_size;
3157 			}
3158 
3159 			deadline += p * HZ;
3160 			spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3161 		}
3162 
3163 		if (dropped_lock)
3164 			continue;
3165 
3166 		/* Nothing to do for 'timeout'  */
3167 		set_current_state(TASK_INTERRUPTIBLE);
3168 		add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3169 		spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3170 
3171 		schedule_timeout(timeout);
3172 
3173 		remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3174 		spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
3175 	}
3176 
3177 	spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
3178 
3179 	kiblnd_thread_fini();
3180 	return 0;
3181 }
3182 
3183 void
3184 kiblnd_qp_event(struct ib_event *event, void *arg)
3185 {
3186 	kib_conn_t *conn = arg;
3187 
3188 	switch (event->event) {
3189 	case IB_EVENT_COMM_EST:
3190 		CDEBUG(D_NET, "%s established\n",
3191 		       libcfs_nid2str(conn->ibc_peer->ibp_nid));
3192 		return;
3193 
3194 	default:
3195 		CERROR("%s: Async QP event type %d\n",
3196 		       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3197 		return;
3198 	}
3199 }
3200 
3201 static void
3202 kiblnd_complete(struct ib_wc *wc)
3203 {
3204 	switch (kiblnd_wreqid2type(wc->wr_id)) {
3205 	default:
3206 		LBUG();
3207 
3208 	case IBLND_WID_RDMA:
3209 		/* We only get RDMA completion notification if it fails.  All
3210 		 * subsequent work items, including the final SEND will fail
3211 		 * too.  However we can't print out any more info about the
3212 		 * failing RDMA because 'tx' might be back on the idle list or
3213 		 * even reused already if we didn't manage to post all our work
3214 		 * items */
3215 		CNETERR("RDMA (tx: %p) failed: %d\n",
3216 			kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3217 		return;
3218 
3219 	case IBLND_WID_TX:
3220 		kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3221 		return;
3222 
3223 	case IBLND_WID_RX:
3224 		kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3225 				   wc->byte_len);
3226 		return;
3227 	}
3228 }
3229 
3230 void
3231 kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3232 {
3233 	/* NB I'm not allowed to schedule this conn once its refcount has
3234 	 * reached 0.  Since fundamentally I'm racing with scheduler threads
3235 	 * consuming my CQ I could be called after all completions have
3236 	 * occurred.  But in this case, ibc_nrx == 0 && ibc_nsends_posted == 0
3237 	 * and this CQ is about to be destroyed so I NOOP. */
3238 	kib_conn_t *conn = arg;
3239 	struct kib_sched_info *sched = conn->ibc_sched;
3240 	unsigned long flags;
3241 
3242 	LASSERT(cq == conn->ibc_cq);
3243 
3244 	spin_lock_irqsave(&sched->ibs_lock, flags);
3245 
3246 	conn->ibc_ready = 1;
3247 
3248 	if (!conn->ibc_scheduled &&
3249 	    (conn->ibc_nrx > 0 ||
3250 	     conn->ibc_nsends_posted > 0)) {
3251 		kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3252 		conn->ibc_scheduled = 1;
3253 		list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3254 
3255 		if (waitqueue_active(&sched->ibs_waitq))
3256 			wake_up(&sched->ibs_waitq);
3257 	}
3258 
3259 	spin_unlock_irqrestore(&sched->ibs_lock, flags);
3260 }
3261 
3262 void
3263 kiblnd_cq_event(struct ib_event *event, void *arg)
3264 {
3265 	kib_conn_t *conn = arg;
3266 
3267 	CERROR("%s: async CQ event type %d\n",
3268 	       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3269 }
3270 
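/* Per-CPT scheduler thread: polls the CQ of each conn queued on ibs_conns,
 * re-arming completion notification and re-queueing the conn while more
 * work may remain. */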
3271 int
3272 kiblnd_scheduler(void *arg)
3273 {
3274 	long id = (long)arg;
3275 	struct kib_sched_info *sched;
3276 	kib_conn_t *conn;
3277 	wait_queue_t wait;
3278 	unsigned long flags;
3279 	struct ib_wc wc;
3280 	int did_something;
3281 	int busy_loops = 0;
3282 	int rc;
3283 
3284 	cfs_block_allsigs();
3285 
3286 	init_waitqueue_entry(&wait, current);
3287 
3288 	sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3289 
3290 	rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3291 	if (rc != 0) {
3292 		CWARN("Failed to bind on CPT %d; please verify that all CPUs are healthy and reload modules if necessary, otherwise your system might be at risk of low performance\n",
3293 		      sched->ibs_cpt);
3294 	}
3295 
3296 	spin_lock_irqsave(&sched->ibs_lock, flags);
3297 
3298 	while (!kiblnd_data.kib_shutdown) {
3299 		if (busy_loops++ >= IBLND_RESCHED) {
3300 			spin_unlock_irqrestore(&sched->ibs_lock, flags);
3301 
3302 			cond_resched();
3303 			busy_loops = 0;
3304 
3305 			spin_lock_irqsave(&sched->ibs_lock, flags);
3306 		}
3307 
3308 		did_something = 0;
3309 
3310 		if (!list_empty(&sched->ibs_conns)) {
3311 			conn = list_entry(sched->ibs_conns.next,
3312 					      kib_conn_t, ibc_sched_list);
3313 			/* take over kib_sched_conns' ref on conn... */
3314 			LASSERT(conn->ibc_scheduled);
3315 			list_del(&conn->ibc_sched_list);
3316 			conn->ibc_ready = 0;
3317 
3318 			spin_unlock_irqrestore(&sched->ibs_lock, flags);
3319 
3320 			rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3321 			if (rc == 0) {
3322 				rc = ib_req_notify_cq(conn->ibc_cq,
3323 						      IB_CQ_NEXT_COMP);
3324 				if (rc < 0) {
3325 					CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3326 					      libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3327 					kiblnd_close_conn(conn, -EIO);
3328 					kiblnd_conn_decref(conn);
3329 					spin_lock_irqsave(&sched->ibs_lock,
3330 							      flags);
3331 					continue;
3332 				}
3333 
3334 				rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3335 			}
3336 
3337 			if (rc < 0) {
3338 				CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3339 				      libcfs_nid2str(conn->ibc_peer->ibp_nid),
3340 				      rc);
3341 				kiblnd_close_conn(conn, -EIO);
3342 				kiblnd_conn_decref(conn);
3343 				spin_lock_irqsave(&sched->ibs_lock, flags);
3344 				continue;
3345 			}
3346 
3347 			spin_lock_irqsave(&sched->ibs_lock, flags);
3348 
3349 			if (rc != 0 || conn->ibc_ready) {
3350 				/* There may be another completion waiting; get
3351 				 * another scheduler to check while I handle
3352 				 * this one... */
3353 				/* +1 ref for sched_conns */
3354 				kiblnd_conn_addref(conn);
3355 				list_add_tail(&conn->ibc_sched_list,
3356 						  &sched->ibs_conns);
3357 				if (waitqueue_active(&sched->ibs_waitq))
3358 					wake_up(&sched->ibs_waitq);
3359 			} else {
3360 				conn->ibc_scheduled = 0;
3361 			}
3362 
3363 			if (rc != 0) {
3364 				spin_unlock_irqrestore(&sched->ibs_lock, flags);
3365 				kiblnd_complete(&wc);
3366 
3367 				spin_lock_irqsave(&sched->ibs_lock, flags);
3368 			}
3369 
3370 			kiblnd_conn_decref(conn); /* ...drop my ref from above */
3371 			did_something = 1;
3372 		}
3373 
3374 		if (did_something)
3375 			continue;
3376 
3377 		set_current_state(TASK_INTERRUPTIBLE);
3378 		add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3379 		spin_unlock_irqrestore(&sched->ibs_lock, flags);
3380 
3381 		schedule();
3382 		busy_loops = 0;
3383 
3384 		remove_wait_queue(&sched->ibs_waitq, &wait);
3385 		spin_lock_irqsave(&sched->ibs_lock, flags);
3386 	}
3387 
3388 	spin_unlock_irqrestore(&sched->ibs_lock, flags);
3389 
3390 	kiblnd_thread_fini();
3391 	return 0;
3392 }
3393 
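/* Watches kib_failed_devs and retries HCA failover for each device once its
 * ibd_next_failover time arrives; also periodically rechecks all devices in
 * case a bonding failover happened with no traffic around to notice it. */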
3394 int
3395 kiblnd_failover_thread(void *arg)
3396 {
3397 	rwlock_t *glock = &kiblnd_data.kib_global_lock;
3398 	kib_dev_t *dev;
3399 	wait_queue_t wait;
3400 	unsigned long flags;
3401 	int rc;
3402 
3403 	LASSERT(*kiblnd_tunables.kib_dev_failover != 0);
3404 
3405 	cfs_block_allsigs();
3406 
3407 	init_waitqueue_entry(&wait, current);
3408 	write_lock_irqsave(glock, flags);
3409 
3410 	while (!kiblnd_data.kib_shutdown) {
3411 		int do_failover = 0;
3412 		int long_sleep;
3413 
3414 		list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3415 				    ibd_fail_list) {
3416 			if (time_before(cfs_time_current(),
3417 					dev->ibd_next_failover))
3418 				continue;
3419 			do_failover = 1;
3420 			break;
3421 		}
3422 
3423 		if (do_failover) {
3424 			list_del_init(&dev->ibd_fail_list);
3425 			dev->ibd_failover = 1;
3426 			write_unlock_irqrestore(glock, flags);
3427 
3428 			rc = kiblnd_dev_failover(dev);
3429 
3430 			write_lock_irqsave(glock, flags);
3431 
3432 			LASSERT(dev->ibd_failover);
3433 			dev->ibd_failover = 0;
3434 			if (rc >= 0) { /* Device is OK or failover succeed */
3435 				dev->ibd_next_failover = cfs_time_shift(3);
3436 				continue;
3437 			}
3438 
3439 			/* failed to failover, retry later */
3440 			dev->ibd_next_failover =
3441 				cfs_time_shift(min(dev->ibd_failed_failover, 10));
3442 			if (kiblnd_dev_can_failover(dev)) {
3443 				list_add_tail(&dev->ibd_fail_list,
3444 					      &kiblnd_data.kib_failed_devs);
3445 			}
3446 
3447 			continue;
3448 		}
3449 
3450 		/* long sleep if no more pending failover */
3451 		long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3452 
3453 		set_current_state(TASK_INTERRUPTIBLE);
3454 		add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3455 		write_unlock_irqrestore(glock, flags);
3456 
3457 		rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3458 						   cfs_time_seconds(1));
3459 		remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3460 		write_lock_irqsave(glock, flags);
3461 
3462 		if (!long_sleep || rc != 0)
3463 			continue;
3464 
3465 		/* Woke from the long sleep: routinely check all active
3466 		 * devices.  We need this because if a device has no active
3467 		 * connection and no local SEND, we could listen on the wrong
3468 		 * HCA forever after a bonding failover. */
3469 		list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3470 			if (kiblnd_dev_can_failover(dev)) {
3471 				list_add_tail(&dev->ibd_fail_list,
3472 					      &kiblnd_data.kib_failed_devs);
3473 			}
3474 		}
3475 	}
3476 
3477 	write_unlock_irqrestore(glock, flags);
3478 
3479 	kiblnd_thread_fini();
3480 	return 0;
3481 }
3482