/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

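/* Build a kvec map of the xdr_buf to be transmitted: vec->sge[0] is
 * reserved for the RPC/RDMA transport header, and the head, pagelist,
 * and tail of the xdr_buf follow in order. Each element is later
 * DMA-mapped for an RDMA Send or RDMA Write.
 */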
static int map_xdr(struct svcxprt_rdma *xprt,
		   struct xdr_buf *xdr,
		   struct svc_rdma_req_map *vec)
{
	int sge_no;
	u32 sge_bytes;
	u32 page_bytes;
	u32 page_off;
	int page_no;

	if (xdr->len !=
	    (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
		pr_err("svcrdma: map_xdr: XDR buffer length error\n");
		return -EIO;
	}

	/* Skip the first sge, this is for the RPCRDMA header */
	sge_no = 1;

	/* Head SGE */
	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
	sge_no++;

	/* pages SGE */
	page_no = 0;
	page_bytes = xdr->page_len;
	page_off = xdr->page_base;
	while (page_bytes) {
		vec->sge[sge_no].iov_base =
			page_address(xdr->pages[page_no]) + page_off;
		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
		page_bytes -= sge_bytes;
		vec->sge[sge_no].iov_len = sge_bytes;

		sge_no++;
		page_no++;
		page_off = 0; /* reset for next time through loop */
	}

	/* Tail SGE */
	if (xdr->tail[0].iov_len) {
		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
		sge_no++;
	}

	dprintk("svcrdma: map_xdr: sge_no %d page_no %d "
		"page_base %u page_len %u head_len %zu tail_len %zu\n",
		sge_no, page_no, xdr->page_base, xdr->page_len,
		xdr->head[0].iov_len, xdr->tail[0].iov_len);

	vec->count = sge_no;
	return 0;
}

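/* Translate byte offset 'xdr_off' of the xdr_buf into the page that
 * backs it (head, pagelist, or tail) and DMA-map at most one page,
 * starting at that offset, in the requested direction.
 */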
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
			      struct xdr_buf *xdr,
			      u32 xdr_off, size_t len, int dir)
{
	struct page *page;
	dma_addr_t dma_addr;

	if (xdr_off < xdr->head[0].iov_len) {
		/* This offset is in the head */
		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
		page = virt_to_page(xdr->head[0].iov_base);
	} else {
		xdr_off -= xdr->head[0].iov_len;
		if (xdr_off < xdr->page_len) {
			/* This offset is in the page list */
			xdr_off += xdr->page_base;
			page = xdr->pages[xdr_off >> PAGE_SHIFT];
			xdr_off &= ~PAGE_MASK;
		} else {
			/* This offset is in the tail */
			xdr_off -= xdr->page_len;
			xdr_off += (unsigned long)
				xdr->tail[0].iov_base & ~PAGE_MASK;
			page = virt_to_page(xdr->tail[0].iov_base);
		}
	}
	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
				   min_t(size_t, PAGE_SIZE, len), dir);
	return dma_addr;
}

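/* Build and post one RDMA Write work request that pushes up to
 * write_len bytes of the response, starting at byte offset xdr_off in
 * rq_res, to the client memory region described by (rmr, to). At most
 * sc_max_sge SGEs are used per WR, so callers loop until the whole
 * chunk has been written. Returns the number of bytes described by
 * the posted WR, or -EIO on error.
 */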
/* Assumptions:
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE bytes
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
		      u32 rmr, u64 to,
		      u32 xdr_off, int write_len,
		      struct svc_rdma_req_map *vec)
{
	struct ib_send_wr write_wr;
	struct ib_sge *sge;
	int xdr_sge_no;
	int sge_no;
	int sge_bytes;
	int sge_off;
	int bc;
	struct svc_rdma_op_ctxt *ctxt;

	if (vec->count > RPCSVC_MAXPAGES) {
		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
		return -EIO;
	}

	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
		rmr, (unsigned long long)to, xdr_off,
		write_len, vec->sge, vec->count);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_TO_DEVICE;
	sge = ctxt->sge;

	/* Find the SGE associated with xdr_off */
	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
	     xdr_sge_no++) {
		if (vec->sge[xdr_sge_no].iov_len > bc)
			break;
		bc -= vec->sge[xdr_sge_no].iov_len;
	}

	sge_off = bc;
	bc = write_len;
	sge_no = 0;

	/* Copy the remaining SGE */
	while (bc != 0) {
		sge_bytes = min_t(size_t,
				  bc, vec->sge[xdr_sge_no].iov_len - sge_off);
		sge[sge_no].length = sge_bytes;
		sge[sge_no].addr =
			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
				    sge_bytes, DMA_TO_DEVICE);
		xdr_off += sge_bytes;
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 sge[sge_no].addr))
			goto err;
		atomic_inc(&xprt->sc_dma_used);
		sge[sge_no].lkey = xprt->sc_dma_lkey;
		ctxt->count++;
		sge_off = 0;
		sge_no++;
		xdr_sge_no++;
		if (xdr_sge_no > vec->count) {
			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
			goto err;
		}
		bc -= sge_bytes;
		if (sge_no == xprt->sc_max_sge)
			break;
	}

	/* Prepare WRITE WR */
	memset(&write_wr, 0, sizeof write_wr);
	ctxt->wr_op = IB_WR_RDMA_WRITE;
	write_wr.wr_id = (unsigned long)ctxt;
	write_wr.sg_list = &sge[0];
	write_wr.num_sge = sge_no;
	write_wr.opcode = IB_WR_RDMA_WRITE;
	write_wr.send_flags = IB_SEND_SIGNALED;
	write_wr.wr.rdma.rkey = rmr;
	write_wr.wr.rdma.remote_addr = to;

	/* Post It */
	atomic_inc(&rdma_stat_write);
	if (svc_rdma_send(xprt, &write_wr))
		goto err;
	return write_len - bc;
 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	/* Fatal error, close transport */
	return -EIO;
}

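/* Walk the Write list from the client's RPC call, RDMA Write the
 * pagelist and tail of rq_res into the client-provided chunks, and
 * encode a matching Write list in the RPC reply header. Returns the
 * number of bytes moved via the Write chunks, zero if the call had no
 * Write list, or a negative errno.
 */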
static int send_write_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_msg *rdma_argp,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
	int write_len;
	u32 xdr_off;
	int chunk_off;
	int chunk_no;
	struct rpcrdma_write_array *arg_ary;
	struct rpcrdma_write_array *res_ary;
	int ret;

	arg_ary = svc_rdma_get_write_array(rdma_argp);
	if (!arg_ary)
		return 0;
	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[1];

	/* Write chunks start at the pagelist */
	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
	     xfer_len && chunk_no < ntohl(arg_ary->wc_nchunks);
	     chunk_no++) {
		struct rpcrdma_segment *arg_ch;
		u64 rs_offset;

		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, ntohl(arg_ch->rs_length));

		/* Prepare the response chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						arg_ch->rs_handle,
						arg_ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			ret = send_write(xprt, rqstp,
					 ntohl(arg_ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 write_len,
					 vec);
			if (ret <= 0) {
				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
					ret);
				return -EIO;
			}
			chunk_off += ret;
			xdr_off += ret;
			xfer_len -= ret;
			write_len -= ret;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
}

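/* Walk the Reply chunk from the client's RPC call, RDMA Write the
 * entire RPC reply (rq_res) into it, then encode the reply chunk in
 * the RPC reply header. Returns rq_res.len on success, zero if the
 * call provided no Reply chunk, or a negative errno.
 */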
static int send_reply_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_msg *rdma_argp,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.len;
	int write_len;
	u32 xdr_off;
	int chunk_no;
	int chunk_off;
	int nchunks;
	struct rpcrdma_segment *ch;
	struct rpcrdma_write_array *arg_ary;
	struct rpcrdma_write_array *res_ary;
	int ret;

	arg_ary = svc_rdma_get_reply_array(rdma_argp);
	if (!arg_ary)
		return 0;
	/* XXX: need to fix when reply lists occur with read-list and/or
	 * write-list */
	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[2];

	/* xdr offset starts at RPC message */
	nchunks = ntohl(arg_ary->wc_nchunks);
	for (xdr_off = 0, chunk_no = 0;
	     xfer_len && chunk_no < nchunks;
	     chunk_no++) {
		u64 rs_offset;
		ch = &arg_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, ntohl(ch->rs_length));

		/* Prepare the reply chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						ch->rs_handle, ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			ret = send_write(xprt, rqstp,
					 ntohl(ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 write_len,
					 vec);
			if (ret <= 0) {
				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
					ret);
				return -EIO;
			}
			chunk_off += ret;
			xdr_off += ret;
			xfer_len -= ret;
			write_len -= ret;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

	return rqstp->rq_res.len;
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. This function is called after data sent via
 * RDMA has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function prepares the RPCRDMA header in
 * sge[0], the 'type' parameter indicates the type to place in the
 * RPCRDMA header, and the 'byte_count' parameter indicates how much of
 * the XDR to include in this RDMA_SEND. NB: The offset of the payload
 * to send is zero in the XDR.
 */
static int send_reply(struct svcxprt_rdma *rdma,
		      struct svc_rqst *rqstp,
		      struct page *page,
		      struct rpcrdma_msg *rdma_resp,
		      struct svc_rdma_op_ctxt *ctxt,
		      struct svc_rdma_req_map *vec,
		      int byte_count)
{
	struct ib_send_wr send_wr;
	u32 xdr_off;
	int sge_no;
	int sge_bytes;
	int page_no;
	int pages;
	int ret;

	/* Post a recv buffer to handle another request. */
	ret = svc_rdma_post_recv(rdma);
	if (ret) {
		printk(KERN_INFO
		       "svcrdma: could not post a receive buffer, err=%d. "
		       "Closing transport %p.\n", ret, rdma);
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		svc_rdma_put_context(ctxt, 0);
		return -ENOTCONN;
	}

	/* Prepare the context */
	ctxt->pages[0] = page;
	ctxt->count = 1;

	/* Prepare the SGE for the RPCRDMA Header */
	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
	ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
			    ctxt->sge[0].length, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
		goto err;
	atomic_inc(&rdma->sc_dma_used);

	ctxt->direction = DMA_TO_DEVICE;

	/* Map the payload indicated by 'byte_count' */
	xdr_off = 0;
	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
		byte_count -= sge_bytes;
		ctxt->sge[sge_no].addr =
			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
				    sge_bytes, DMA_TO_DEVICE);
		xdr_off += sge_bytes;
		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
					 ctxt->sge[sge_no].addr))
			goto err;
		atomic_inc(&rdma->sc_dma_used);
		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
		ctxt->sge[sge_no].length = sge_bytes;
	}
	if (byte_count != 0) {
		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
		goto err;
	}

	/* Save all respages in the ctxt and remove them from the
	 * respages array. They are our pages until the I/O
	 * completes.
	 */
	pages = rqstp->rq_next_page - rqstp->rq_respages;
	for (page_no = 0; page_no < pages; page_no++) {
		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
		ctxt->count++;
		rqstp->rq_respages[page_no] = NULL;
		/*
		 * If there are more pages than SGE, terminate SGE
		 * list so that svc_rdma_unmap_dma doesn't attempt to
		 * unmap garbage.
		 */
		if (page_no+1 >= sge_no)
			ctxt->sge[page_no+1].length = 0;
	}
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* The loop above bumps sc_dma_used for each sge. The
	 * xdr_buf.tail gets a separate sge, but resides in the
	 * same page as xdr_buf.head. Don't count it twice.
	 */
	if (sge_no > ctxt->count)
		atomic_dec(&rdma->sc_dma_used);

	if (sge_no > rdma->sc_max_sge) {
		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
		goto err;
	}
	memset(&send_wr, 0, sizeof send_wr);
	ctxt->wr_op = IB_WR_SEND;
	send_wr.wr_id = (unsigned long)ctxt;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = sge_no;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret)
		goto err;

	return 0;

 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -EIO;
}

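/* Called via the transport's ->xpo_prep_reply_hdr method. Nothing is
 * needed here: unlike TCP, no record marker has to be reserved at the
 * front of the reply, and the RPC/RDMA transport header is built in
 * its own page by svc_rdma_sendto().
 */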
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

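/* The transport's ->xpo_sendto method for RPC/RDMA: map rq_res, push
 * any write-chunk and reply-chunk data to the client with RDMA Writes,
 * then post an RDMA Send carrying the transport header and whatever
 * part of the reply remains inline.
 */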
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct rpcrdma_msg *rdma_argp;
	struct rpcrdma_msg *rdma_resp;
	struct rpcrdma_write_array *reply_ary;
	enum rpcrdma_proc reply_type;
	int ret;
	int inline_bytes;
	struct page *res_page;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;

	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

	/* Get the RDMA request header. The receive logic always
	 * places this at the start of page 0.
	 */
	rdma_argp = page_address(rqstp->rq_pages[0]);

	/* Build a req vec for the XDR */
	ctxt = svc_rdma_get_context(rdma);
	ctxt->direction = DMA_TO_DEVICE;
	vec = svc_rdma_get_req_map();
	ret = map_xdr(rdma, &rqstp->rq_res, vec);
	if (ret)
		goto err0;
	inline_bytes = rqstp->rq_res.len;

	/* Create the RDMA response header */
	res_page = svc_rdma_get_page();
	rdma_resp = page_address(res_page);
	reply_ary = svc_rdma_get_reply_array(rdma_argp);
	if (reply_ary)
		reply_type = RDMA_NOMSG;
	else
		reply_type = RDMA_MSG;
	svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
					 rdma_resp, reply_type);

	/* Send any write-chunk data and build resp write-list */
	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
				rqstp, vec);
	if (ret < 0) {
		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
		       ret);
		goto err1;
	}
	inline_bytes -= ret;

	/* Send any reply-list data and update resp reply-list */
	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
				rqstp, vec);
	if (ret < 0) {
		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
		       ret);
		goto err1;
	}
	inline_bytes -= ret;

	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
			 inline_bytes);
	svc_rdma_put_req_map(vec);
	dprintk("svcrdma: send_reply returns %d\n", ret);
	return ret;

 err1:
	put_page(res_page);
 err0:
	svc_rdma_put_req_map(vec);
	svc_rdma_put_context(ctxt, 0);
	return ret;
}