/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#include "socklnd.h"

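/* Fill in conn's peer address and port (ksnc_ipaddr/ksnc_port) and its
 * local address (ksnc_myipaddr) from the socket.  Returns 0 on success or
 * the error from libcfs_sock_getaddr(). */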
int
ksocknal_lib_get_conn_addrs(ksock_conn_t *conn)
{
	int rc = libcfs_sock_getaddr(conn->ksnc_sock, 1,
				     &conn->ksnc_ipaddr,
				     &conn->ksnc_port);

	/* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
	LASSERT(!conn->ksnc_closing);

	if (rc != 0) {
		CERROR("Error %d getting sock peer IP\n", rc);
		return rc;
	}

	rc = libcfs_sock_getaddr(conn->ksnc_sock, 0,
				 &conn->ksnc_myipaddr, NULL);
	if (rc != 0) {
		CERROR("Error %d getting sock local IP\n", rc);
		return rc;
	}

	return 0;
}

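/* Return non-zero if conn's socket can do zero-copy sends: only protocol
 * V2.x connections whose route supports scatter/gather and hardware
 * checksumming qualify. */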
int
ksocknal_lib_zc_capable(ksock_conn_t *conn)
{
	int  caps = conn->ksnc_sock->sk->sk_route_caps;

	if (conn->ksnc_proto == &ksocknal_protocol_v1x)
		return 0;

	/* ZC if the socket supports scatter/gather and doesn't need software
	 * checksums */
	return ((caps & NETIF_F_SG) != 0 && (caps & NETIF_F_ALL_CSUM) != 0);
}

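/* Send tx's iovec fragments on conn's socket with kernel_sendmsg(),
 * checksumming the message first if this is the first send attempt on a
 * V2.x connection with checksums enabled.  Returns the sendmsg() result. */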
int
ksocknal_lib_send_iov(ksock_conn_t *conn, ksock_tx_t *tx)
{
	struct socket *sock = conn->ksnc_sock;
	int	    nob;
	int	    rc;

	if (*ksocknal_tunables.ksnd_enable_csum	&& /* checksum enabled */
	    conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x connection  */
	    tx->tx_nob == tx->tx_resid		 && /* first sending    */
	    tx->tx_msg.ksm_csum == 0)		     /* not checksummed  */
		ksocknal_lib_csum_tx(tx);

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */

	{
#if SOCKNAL_SINGLE_FRAG_TX
		struct kvec    scratch;
		struct kvec   *scratchiov = &scratch;
		unsigned int    niov = 1;
#else
		struct kvec   *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
		unsigned int    niov = tx->tx_niov;
#endif
		struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
		int  i;

		for (nob = i = 0; i < niov; i++) {
			scratchiov[i] = tx->tx_iov[i];
			nob += scratchiov[i].iov_len;
		}

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    nob < tx->tx_resid)
			msg.msg_flags |= MSG_MORE;

		rc = kernel_sendmsg(sock, &msg, scratchiov, niov, nob);
	}
	return rc;
}

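/* Send tx's page (kiov) fragments.  If a zero-copy cookie is set, hand the
 * first fragment straight to the protocol's sendpage(); otherwise kmap()
 * each page into a scratch iovec and send with kernel_sendmsg(). */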
int
ksocknal_lib_send_kiov(ksock_conn_t *conn, ksock_tx_t *tx)
{
	struct socket *sock = conn->ksnc_sock;
	lnet_kiov_t   *kiov = tx->tx_kiov;
	int	    rc;
	int	    nob;

	/* Not NOOP message */
	LASSERT(tx->tx_lnetmsg != NULL);

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
		/* Zero copy is enabled */
		struct sock   *sk = sock->sk;
		struct page   *page = kiov->kiov_page;
		int	    offset = kiov->kiov_offset;
		int	    fragsize = kiov->kiov_len;
		int	    msgflg = MSG_DONTWAIT;

		CDEBUG(D_NET, "page %p + offset %x for %d\n",
			       page, offset, kiov->kiov_len);

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    fragsize < tx->tx_resid)
			msgflg |= MSG_MORE;

		if (sk->sk_prot->sendpage != NULL) {
			rc = sk->sk_prot->sendpage(sk, page,
						   offset, fragsize, msgflg);
		} else {
			rc = cfs_tcp_sendpage(sk, page, offset, fragsize,
					      msgflg);
		}
	} else {
#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
		struct kvec  scratch;
		struct kvec *scratchiov = &scratch;
		unsigned int  niov = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
		struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
		unsigned int  niov = tx->tx_nkiov;
#endif
		struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
		int	   i;

		for (nob = i = 0; i < niov; i++) {
			scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
						 kiov[i].kiov_offset;
			nob += scratchiov[i].iov_len = kiov[i].kiov_len;
		}

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    nob < tx->tx_resid)
			msg.msg_flags |= MSG_MORE;

		rc = kernel_sendmsg(sock, &msg, (struct kvec *)scratchiov, niov, nob);

		for (i = 0; i < niov; i++)
			kunmap(kiov[i].kiov_page);
	}
	return rc;
}

void
ksocknal_lib_eager_ack(ksock_conn_t *conn)
{
	int	    opt = 1;
	struct socket *sock = conn->ksnc_sock;

	/* Remind the socket to ACK eagerly.  If I don't, the socket might
	 * think I'm about to send something it could piggy-back the ACK
	 * on, introducing delay in completing zero-copy sends in my
	 * peer. */

	kernel_setsockopt(sock, SOL_TCP, TCP_QUICKACK,
			       (char *)&opt, sizeof(opt));
}

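/* Receive into conn's iovec fragments via kernel_recvmsg().  On V2.x
 * connections carrying a non-zero message checksum, fold the received
 * bytes into the running receive checksum.  Returns the recvmsg() result. */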
int
ksocknal_lib_recv_iov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX
	struct kvec  scratch;
	struct kvec *scratchiov = &scratch;
	unsigned int  niov = 1;
#else
	struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
	unsigned int  niov = conn->ksnc_rx_niov;
#endif
	struct kvec *iov = conn->ksnc_rx_iov;
	struct msghdr msg = {
		.msg_flags      = 0
	};
	int	  nob;
	int	  i;
	int	  rc;
	int	  fragnob;
	int	  sum;
	__u32	saved_csum;

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	LASSERT(niov > 0);

	for (nob = i = 0; i < niov; i++) {
		scratchiov[i] = iov[i];
		nob += scratchiov[i].iov_len;
	}
	LASSERT(nob <= conn->ksnc_rx_nob_wanted);

	rc = kernel_recvmsg(conn->ksnc_sock, &msg,
		scratchiov, niov, nob, MSG_DONTWAIT);

	saved_csum = 0;
	if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
		saved_csum = conn->ksnc_msg.ksm_csum;
		conn->ksnc_msg.ksm_csum = 0;
	}

	if (saved_csum != 0) {
		/* accumulate checksum */
		for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
			LASSERT(i < niov);

			fragnob = iov[i].iov_len;
			if (fragnob > sum)
				fragnob = sum;

			conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
							   iov[i].iov_base, fragnob);
		}
		conn->ksnc_msg.ksm_csum = saved_csum;
	}

	return rc;
}

static void
ksocknal_lib_kiov_vunmap(void *addr)
{
	if (addr == NULL)
		return;

	vunmap(addr);
}

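/* If zero-copy receive is enabled and the kiov fragments span their pages
 * without gaps, vmap() the pages into a single contiguous buffer described
 * by *iov.  Returns the mapped address (released later with
 * ksocknal_lib_kiov_vunmap()) or NULL if vmapping isn't worthwhile. */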
static void *
ksocknal_lib_kiov_vmap(lnet_kiov_t *kiov, int niov,
		       struct kvec *iov, struct page **pages)
{
	void	     *addr;
	int	       nob;
	int	       i;

	if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
		return NULL;

	LASSERT(niov <= LNET_MAX_IOV);

	if (niov < 2 ||
	    niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
		return NULL;

	for (nob = i = 0; i < niov; i++) {
		if ((kiov[i].kiov_offset != 0 && i > 0) ||
		    (kiov[i].kiov_offset + kiov[i].kiov_len != PAGE_CACHE_SIZE && i < niov - 1))
			return NULL;

		pages[i] = kiov[i].kiov_page;
		nob += kiov[i].kiov_len;
	}

	addr = vmap(pages, niov, VM_MAP, PAGE_KERNEL);
	if (addr == NULL)
		return NULL;

	iov->iov_base = addr + kiov[0].kiov_offset;
	iov->iov_len = nob;

	return addr;
}

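/* Receive into conn's page (kiov) fragments: vmap() them into one iovec
 * when possible, otherwise kmap() each page, then kernel_recvmsg().  If
 * the incoming message carries a checksum, fold the received bytes into
 * the running receive checksum. */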
int
ksocknal_lib_recv_kiov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
	struct kvec   scratch;
	struct kvec  *scratchiov = &scratch;
	struct page  **pages      = NULL;
	unsigned int   niov       = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
	struct kvec  *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
	struct page  **pages      = conn->ksnc_scheduler->kss_rx_scratch_pgs;
	unsigned int   niov       = conn->ksnc_rx_nkiov;
#endif
	lnet_kiov_t   *kiov = conn->ksnc_rx_kiov;
	struct msghdr msg = {
		.msg_flags      = 0
	};
	int	  nob;
	int	  i;
	int	  rc;
	void	*base;
	void	*addr;
	int	  sum;
	int	  fragnob;
	int	  n;

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages);
	if (addr != NULL) {
		nob = scratchiov[0].iov_len;
		n = 1;

	} else {
		for (nob = i = 0; i < niov; i++) {
			nob += scratchiov[i].iov_len = kiov[i].kiov_len;
			scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
						 kiov[i].kiov_offset;
		}
		n = niov;
	}

	LASSERT(nob <= conn->ksnc_rx_nob_wanted);

	rc = kernel_recvmsg(conn->ksnc_sock, &msg,
			(struct kvec *)scratchiov, n, nob, MSG_DONTWAIT);

	if (conn->ksnc_msg.ksm_csum != 0) {
		for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
			LASSERT(i < niov);

			/* Dang! have to kmap again because I have nowhere to stash the
			 * mapped address.  But by doing it while the page is still
			 * mapped, the kernel just bumps the map count and returns me
			 * the address it stashed. */
			base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
			fragnob = kiov[i].kiov_len;
			if (fragnob > sum)
				fragnob = sum;

			conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
							   base, fragnob);

			kunmap(kiov[i].kiov_page);
		}
	}

	if (addr != NULL) {
		ksocknal_lib_kiov_vunmap(addr);
	} else {
		for (i = 0; i < niov; i++)
			kunmap(kiov[i].kiov_page);
	}

	return rc;
}

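/* Compute the checksum of tx's payload (the message header in iov[0] plus
 * either the page fragments or the remaining iovs) and store it in
 * ksm_csum.  Only used on protocol V2.x connections. */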
void
ksocknal_lib_csum_tx(ksock_tx_t *tx)
{
	int	  i;
	__u32	csum;
	void	*base;

	LASSERT(tx->tx_iov[0].iov_base == &tx->tx_msg);
	LASSERT(tx->tx_conn != NULL);
	LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);

	tx->tx_msg.ksm_csum = 0;

	csum = ksocknal_csum(~0, tx->tx_iov[0].iov_base,
			     tx->tx_iov[0].iov_len);

	if (tx->tx_kiov != NULL) {
		for (i = 0; i < tx->tx_nkiov; i++) {
			base = kmap(tx->tx_kiov[i].kiov_page) +
			       tx->tx_kiov[i].kiov_offset;

			csum = ksocknal_csum(csum, base, tx->tx_kiov[i].kiov_len);

			kunmap(tx->tx_kiov[i].kiov_page);
		}
	} else {
		for (i = 1; i < tx->tx_niov; i++)
			csum = ksocknal_csum(csum, tx->tx_iov[i].iov_base,
					     tx->tx_iov[i].iov_len);
	}

	if (*ksocknal_tunables.ksnd_inject_csum_error) {
		csum++;
		*ksocknal_tunables.ksnd_inject_csum_error = 0;
	}

	tx->tx_msg.ksm_csum = csum;
}

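/* Report conn's socket send/receive buffer sizes and whether Nagle is
 * enabled.  Returns -ESHUTDOWN if the connection is closing; the output
 * parameters are zeroed on any failure. */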
int
ksocknal_lib_get_conn_tunables(ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
{
	struct socket *sock = conn->ksnc_sock;
	int	    len;
	int	    rc;

	rc = ksocknal_connsock_addref(conn);
	if (rc != 0) {
		LASSERT(conn->ksnc_closing);
		*txmem = *rxmem = *nagle = 0;
		return -ESHUTDOWN;
	}

	rc = libcfs_sock_getbuf(sock, txmem, rxmem);
	if (rc == 0) {
		len = sizeof(*nagle);
		rc = kernel_getsockopt(sock, SOL_TCP, TCP_NODELAY,
					   (char *)nagle, &len);
	}

	ksocknal_connsock_decref(conn);

	if (rc == 0)
		*nagle = !*nagle;
	else
		*txmem = *rxmem = *nagle = 0;

	return rc;
}

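/* Apply the socklnd tunables to a freshly connected or accepted socket:
 * disable lingering on close, optionally disable Nagle, set the buffer
 * sizes, and configure TCP keepalives. */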
int
ksocknal_lib_setup_sock(struct socket *sock)
{
	int	     rc;
	int	     option;
	int	     keep_idle;
	int	     keep_intvl;
	int	     keep_count;
	int	     do_keepalive;
	struct linger   linger;

	sock->sk->sk_allocation = GFP_NOFS;

	/* Ensure this socket aborts active sends immediately when we close
	 * it. */

	linger.l_onoff = 0;
	linger.l_linger = 0;

	rc = kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER,
			      (char *)&linger, sizeof(linger));
	if (rc != 0) {
		CERROR("Can't set SO_LINGER: %d\n", rc);
		return rc;
	}

	option = -1;
	rc = kernel_setsockopt(sock, SOL_TCP, TCP_LINGER2,
				    (char *)&option, sizeof(option));
	if (rc != 0) {
		CERROR("Can't set TCP_LINGER2: %d\n", rc);
		return rc;
	}

	if (!*ksocknal_tunables.ksnd_nagle) {
		option = 1;

		rc = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
					    (char *)&option, sizeof(option));
		if (rc != 0) {
			CERROR("Can't disable nagle: %d\n", rc);
			return rc;
		}
	}

	rc = libcfs_sock_setbuf(sock,
				*ksocknal_tunables.ksnd_tx_buffer_size,
				*ksocknal_tunables.ksnd_rx_buffer_size);
	if (rc != 0) {
		CERROR("Can't set buffer tx %d, rx %d buffers: %d\n",
			*ksocknal_tunables.ksnd_tx_buffer_size,
			*ksocknal_tunables.ksnd_rx_buffer_size, rc);
		return rc;
	}

/* TCP_BACKOFF_* sockopt tunables unsupported in stock kernels */

	/* snapshot tunables */
	keep_idle  = *ksocknal_tunables.ksnd_keepalive_idle;
	keep_count = *ksocknal_tunables.ksnd_keepalive_count;
	keep_intvl = *ksocknal_tunables.ksnd_keepalive_intvl;

	do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);

	option = (do_keepalive ? 1 : 0);
	rc = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
			      (char *)&option, sizeof(option));
	if (rc != 0) {
		CERROR("Can't set SO_KEEPALIVE: %d\n", rc);
		return rc;
	}

	if (!do_keepalive)
		return 0;

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
				    (char *)&keep_idle, sizeof(keep_idle));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPIDLE: %d\n", rc);
		return rc;
	}

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
				    (char *)&keep_intvl, sizeof(keep_intvl));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPINTVL: %d\n", rc);
		return rc;
	}

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
				    (char *)&keep_count, sizeof(keep_count));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPCNT: %d\n", rc);
		return rc;
	}

	return 0;
}

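/* Flush any data buffered in conn's socket by momentarily forcing
 * TCP_NODELAY, then restore the socket's previous nonagle setting. */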
void
ksocknal_lib_push_conn(ksock_conn_t *conn)
{
	struct sock    *sk;
	struct tcp_sock *tp;
	int	     nonagle;
	int	     val = 1;
	int	     rc;

	rc = ksocknal_connsock_addref(conn);
	if (rc != 0)			    /* being shut down */
		return;

	sk = conn->ksnc_sock->sk;
	tp = tcp_sk(sk);

	lock_sock(sk);
	nonagle = tp->nonagle;
	tp->nonagle = 1;
	release_sock(sk);

	rc = kernel_setsockopt(conn->ksnc_sock, SOL_TCP, TCP_NODELAY,
				      (char *)&val, sizeof(val));
	LASSERT(rc == 0);

	lock_sock(sk);
	tp->nonagle = nonagle;
	release_sock(sk);

	ksocknal_connsock_decref(conn);
}

extern void ksocknal_read_callback(ksock_conn_t *conn);
extern void ksocknal_write_callback(ksock_conn_t *conn);
/*
 * socket callbacks in Linux
 */
static void
ksocknal_data_ready(struct sock *sk)
{
	ksock_conn_t  *conn;

	/* interleave correctly with closing sockets... */
	LASSERT(!in_irq());
	read_lock(&ksocknal_data.ksnd_global_lock);

	conn = sk->sk_user_data;
	if (conn == NULL) {	     /* raced with ksocknal_terminate_conn */
		LASSERT(sk->sk_data_ready != &ksocknal_data_ready);
		sk->sk_data_ready(sk);
	} else
		ksocknal_read_callback(conn);

	read_unlock(&ksocknal_data.ksnd_global_lock);
}

static void
ksocknal_write_space(struct sock *sk)
{
	ksock_conn_t  *conn;
	int	    wspace;
	int	    min_wpace;

	/* interleave correctly with closing sockets... */
	LASSERT(!in_irq());
	read_lock(&ksocknal_data.ksnd_global_lock);

	conn = sk->sk_user_data;
	wspace = SOCKNAL_WSPACE(sk);
	min_wpace = SOCKNAL_MIN_WSPACE(sk);

	CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
	       sk, wspace, min_wpace, conn,
	       (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
				      " ready" : " blocked"),
	       (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
				      " scheduled" : " idle"),
	       (conn == NULL) ? "" : (list_empty(&conn->ksnc_tx_queue) ?
				      " empty" : " queued"));

	if (conn == NULL) {	     /* raced with ksocknal_terminate_conn */
		LASSERT(sk->sk_write_space != &ksocknal_write_space);
		sk->sk_write_space(sk);

		read_unlock(&ksocknal_data.ksnd_global_lock);
		return;
	}

	if (wspace >= min_wpace) {	      /* got enough space */
		ksocknal_write_callback(conn);

		/* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
		 * ENOMEM check in ksocknal_transmit is race-free (think about
		 * it). */

		clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
	}

	read_unlock(&ksocknal_data.ksnd_global_lock);
}

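/* Remember the socket's original data_ready/write_space callbacks so that
 * ksocknal_lib_reset_callback() can restore them later. */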
void
ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
{
	conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
	conn->ksnc_saved_write_space = sock->sk->sk_write_space;
}

void
ksocknal_lib_set_callback(struct socket *sock, ksock_conn_t *conn)
{
	sock->sk->sk_user_data = conn;
	sock->sk->sk_data_ready = ksocknal_data_ready;
	sock->sk->sk_write_space = ksocknal_write_space;
	return;
}

void
ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
{
	/* Remove conn's network callbacks.
	 * NB I _have_ to restore the callback, rather than storing a noop,
	 * since the socket could survive past this module being unloaded!! */
	sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
	sock->sk->sk_write_space = conn->ksnc_saved_write_space;

	/* A callback could be in progress already; they hold a read lock
	 * on ksnd_global_lock (to serialise with me) and NOOP if
	 * sk_user_data is NULL. */
	sock->sk->sk_user_data = NULL;

	return;
}

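/* Decide whether a failed send was due to memory pressure rather than a
 * full socket; if so, return -ENOMEM so the caller retries after a timeout
 * instead of waiting for the write_space callback. */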
int
ksocknal_lib_memory_pressure(ksock_conn_t *conn)
{
	int	    rc = 0;
	ksock_sched_t *sched;

	sched = conn->ksnc_scheduler;
	spin_lock_bh(&sched->kss_lock);

	if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
	    !conn->ksnc_tx_ready) {
		/* SOCK_NOSPACE is set when the socket fills
		 * and cleared in the write_space callback
		 * (which also sets ksnc_tx_ready).  If
		 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
		 * zero, I didn't fill the socket and
		 * write_space won't reschedule me, so I
		 * return -ENOMEM to get my caller to retry
		 * after a timeout */
		rc = -ENOMEM;
	}

	spin_unlock_bh(&sched->kss_lock);

	return rc;
}