1/*
2 * Device operations for the pnfs nfs4 file layout driver.
3 *
4 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
5 *
6 * Tao Peng <bergwolf@primarydata.com>
7 */
8
9#include <linux/nfs_fs.h>
10#include <linux/vmalloc.h>
11#include <linux/module.h>
12#include <linux/sunrpc/addr.h>
13
14#include "../internal.h"
15#include "../nfs4session.h"
16#include "flexfilelayout.h"
17
18#define NFSDBG_FACILITY		NFSDBG_PNFS_LD
19
20static unsigned int dataserver_timeo = NFS4_DEF_DS_TIMEO;
21static unsigned int dataserver_retrans = NFS4_DEF_DS_RETRANS;
22
23void nfs4_ff_layout_put_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
24{
25	if (mirror_ds)
26		nfs4_put_deviceid_node(&mirror_ds->id_node);
27}
28
29void nfs4_ff_layout_free_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
30{
31	nfs4_print_deviceid(&mirror_ds->id_node.deviceid);
32	nfs4_pnfs_ds_put(mirror_ds->ds);
33	kfree_rcu(mirror_ds, id_node.rcu);
34}
35
36/* Decode opaque device data and construct new_ds using it */
37struct nfs4_ff_layout_ds *
38nfs4_ff_alloc_deviceid_node(struct nfs_server *server, struct pnfs_device *pdev,
39			    gfp_t gfp_flags)
40{
41	struct xdr_stream stream;
42	struct xdr_buf buf;
43	struct page *scratch;
44	struct list_head dsaddrs;
45	struct nfs4_pnfs_ds_addr *da;
46	struct nfs4_ff_layout_ds *new_ds = NULL;
47	struct nfs4_ff_ds_version *ds_versions = NULL;
48	u32 mp_count;
49	u32 version_count;
50	__be32 *p;
51	int i, ret = -ENOMEM;
52
53	/* set up xdr stream */
54	scratch = alloc_page(gfp_flags);
55	if (!scratch)
56		goto out_err;
57
58	new_ds = kzalloc(sizeof(struct nfs4_ff_layout_ds), gfp_flags);
59	if (!new_ds)
60		goto out_scratch;
61
62	nfs4_init_deviceid_node(&new_ds->id_node,
63				server,
64				&pdev->dev_id);
65	INIT_LIST_HEAD(&dsaddrs);
66
67	xdr_init_decode_pages(&stream, &buf, pdev->pages, pdev->pglen);
68	xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);
69
70	/* multipath count */
71	p = xdr_inline_decode(&stream, 4);
72	if (unlikely(!p))
73		goto out_err_drain_dsaddrs;
74	mp_count = be32_to_cpup(p);
75	dprintk("%s: multipath ds count %d\n", __func__, mp_count);
76
77	for (i = 0; i < mp_count; i++) {
78		/* multipath ds */
79		da = nfs4_decode_mp_ds_addr(server->nfs_client->cl_net,
80					    &stream, gfp_flags);
81		if (da)
82			list_add_tail(&da->da_node, &dsaddrs);
83	}
84	if (list_empty(&dsaddrs)) {
85		dprintk("%s: no suitable DS addresses found\n",
86			__func__);
87		ret = -ENOMEDIUM;
88		goto out_err_drain_dsaddrs;
89	}
90
91	/* version count */
92	p = xdr_inline_decode(&stream, 4);
93	if (unlikely(!p))
94		goto out_err_drain_dsaddrs;
95	version_count = be32_to_cpup(p);
96	dprintk("%s: version count %d\n", __func__, version_count);
97
98	ds_versions = kzalloc(version_count * sizeof(struct nfs4_ff_ds_version),
99			      gfp_flags);
100	if (!ds_versions)
101		goto out_scratch;
102
103	for (i = 0; i < version_count; i++) {
104		/* 20 = version(4) + minor_version(4) + rsize(4) + wsize(4) +
105		 * tightly_coupled(4) */
106		p = xdr_inline_decode(&stream, 20);
107		if (unlikely(!p))
108			goto out_err_drain_dsaddrs;
109		ds_versions[i].version = be32_to_cpup(p++);
110		ds_versions[i].minor_version = be32_to_cpup(p++);
111		ds_versions[i].rsize = nfs_block_size(be32_to_cpup(p++), NULL);
112		ds_versions[i].wsize = nfs_block_size(be32_to_cpup(p++), NULL);
113		ds_versions[i].tightly_coupled = be32_to_cpup(p);
114
115		if (ds_versions[i].rsize > NFS_MAX_FILE_IO_SIZE)
116			ds_versions[i].rsize = NFS_MAX_FILE_IO_SIZE;
117		if (ds_versions[i].wsize > NFS_MAX_FILE_IO_SIZE)
118			ds_versions[i].wsize = NFS_MAX_FILE_IO_SIZE;
119
120		if (ds_versions[i].version != 3 || ds_versions[i].minor_version != 0) {
121			dprintk("%s: [%d] unsupported ds version %d-%d\n", __func__,
122				i, ds_versions[i].version,
123				ds_versions[i].minor_version);
124			ret = -EPROTONOSUPPORT;
125			goto out_err_drain_dsaddrs;
126		}
127
128		dprintk("%s: [%d] vers %u minor_ver %u rsize %u wsize %u coupled %d\n",
129			__func__, i, ds_versions[i].version,
130			ds_versions[i].minor_version,
131			ds_versions[i].rsize,
132			ds_versions[i].wsize,
133			ds_versions[i].tightly_coupled);
134	}
135
136	new_ds->ds_versions = ds_versions;
137	new_ds->ds_versions_cnt = version_count;
138
139	new_ds->ds = nfs4_pnfs_ds_add(&dsaddrs, gfp_flags);
140	if (!new_ds->ds)
141		goto out_err_drain_dsaddrs;
142
143	/* If DS was already in cache, free ds addrs */
144	while (!list_empty(&dsaddrs)) {
145		da = list_first_entry(&dsaddrs,
146				      struct nfs4_pnfs_ds_addr,
147				      da_node);
148		list_del_init(&da->da_node);
149		kfree(da->da_remotestr);
150		kfree(da);
151	}
152
153	__free_page(scratch);
154	return new_ds;
155
156out_err_drain_dsaddrs:
157	while (!list_empty(&dsaddrs)) {
158		da = list_first_entry(&dsaddrs, struct nfs4_pnfs_ds_addr,
159				      da_node);
160		list_del_init(&da->da_node);
161		kfree(da->da_remotestr);
162		kfree(da);
163	}
164
165	kfree(ds_versions);
166out_scratch:
167	__free_page(scratch);
168out_err:
169	kfree(new_ds);
170
171	dprintk("%s ERROR: returning %d\n", __func__, ret);
172	return NULL;
173}
174
175static void ff_layout_mark_devid_invalid(struct pnfs_layout_segment *lseg,
176		struct nfs4_deviceid_node *devid)
177{
178	nfs4_mark_deviceid_unavailable(devid);
179	if (!ff_layout_has_available_ds(lseg))
180		pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
181				lseg);
182}
183
184static bool ff_layout_mirror_valid(struct pnfs_layout_segment *lseg,
185		struct nfs4_ff_layout_mirror *mirror)
186{
187	if (mirror == NULL || mirror->mirror_ds == NULL) {
188		pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
189					lseg);
190		return false;
191	}
192	if (mirror->mirror_ds->ds == NULL) {
193		struct nfs4_deviceid_node *devid;
194		devid = &mirror->mirror_ds->id_node;
195		ff_layout_mark_devid_invalid(lseg, devid);
196		return false;
197	}
198	return true;
199}
200
201static u64
202end_offset(u64 start, u64 len)
203{
204	u64 end;
205
206	end = start + len;
207	return end >= start ? end : NFS4_MAX_UINT64;
208}
209
210static void extend_ds_error(struct nfs4_ff_layout_ds_err *err,
211			    u64 offset, u64 length)
212{
213	u64 end;
214
215	end = max_t(u64, end_offset(err->offset, err->length),
216		    end_offset(offset, length));
217	err->offset = min_t(u64, err->offset, offset);
218	err->length = end - err->offset;
219}
220
221static bool ds_error_can_merge(struct nfs4_ff_layout_ds_err *err,  u64 offset,
222			       u64 length, int status, enum nfs_opnum4 opnum,
223			       nfs4_stateid *stateid,
224			       struct nfs4_deviceid *deviceid)
225{
226	return err->status == status && err->opnum == opnum &&
227	       nfs4_stateid_match(&err->stateid, stateid) &&
228	       !memcmp(&err->deviceid, deviceid, sizeof(*deviceid)) &&
229	       end_offset(err->offset, err->length) >= offset &&
230	       err->offset <= end_offset(offset, length);
231}
232
233static bool merge_ds_error(struct nfs4_ff_layout_ds_err *old,
234			   struct nfs4_ff_layout_ds_err *new)
235{
236	if (!ds_error_can_merge(old, new->offset, new->length, new->status,
237				new->opnum, &new->stateid, &new->deviceid))
238		return false;
239
240	extend_ds_error(old, new->offset, new->length);
241	return true;
242}
243
244static bool
245ff_layout_add_ds_error_locked(struct nfs4_flexfile_layout *flo,
246			      struct nfs4_ff_layout_ds_err *dserr)
247{
248	struct nfs4_ff_layout_ds_err *err;
249
250	list_for_each_entry(err, &flo->error_list, list) {
251		if (merge_ds_error(err, dserr)) {
252			return true;
253		}
254	}
255
256	list_add(&dserr->list, &flo->error_list);
257	return false;
258}
259
260static bool
261ff_layout_update_ds_error(struct nfs4_flexfile_layout *flo, u64 offset,
262			  u64 length, int status, enum nfs_opnum4 opnum,
263			  nfs4_stateid *stateid, struct nfs4_deviceid *deviceid)
264{
265	bool found = false;
266	struct nfs4_ff_layout_ds_err *err;
267
268	list_for_each_entry(err, &flo->error_list, list) {
269		if (ds_error_can_merge(err, offset, length, status, opnum,
270				       stateid, deviceid)) {
271			found = true;
272			extend_ds_error(err, offset, length);
273			break;
274		}
275	}
276
277	return found;
278}
279
280int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
281			     struct nfs4_ff_layout_mirror *mirror, u64 offset,
282			     u64 length, int status, enum nfs_opnum4 opnum,
283			     gfp_t gfp_flags)
284{
285	struct nfs4_ff_layout_ds_err *dserr;
286	bool needfree;
287
288	if (status == 0)
289		return 0;
290
291	if (mirror->mirror_ds == NULL)
292		return -EINVAL;
293
294	spin_lock(&flo->generic_hdr.plh_inode->i_lock);
295	if (ff_layout_update_ds_error(flo, offset, length, status, opnum,
296				      &mirror->stateid,
297				      &mirror->mirror_ds->id_node.deviceid)) {
298		spin_unlock(&flo->generic_hdr.plh_inode->i_lock);
299		return 0;
300	}
301	spin_unlock(&flo->generic_hdr.plh_inode->i_lock);
302	dserr = kmalloc(sizeof(*dserr), gfp_flags);
303	if (!dserr)
304		return -ENOMEM;
305
306	INIT_LIST_HEAD(&dserr->list);
307	dserr->offset = offset;
308	dserr->length = length;
309	dserr->status = status;
310	dserr->opnum = opnum;
311	nfs4_stateid_copy(&dserr->stateid, &mirror->stateid);
312	memcpy(&dserr->deviceid, &mirror->mirror_ds->id_node.deviceid,
313	       NFS4_DEVICEID4_SIZE);
314
315	spin_lock(&flo->generic_hdr.plh_inode->i_lock);
316	needfree = ff_layout_add_ds_error_locked(flo, dserr);
317	spin_unlock(&flo->generic_hdr.plh_inode->i_lock);
318	if (needfree)
319		kfree(dserr);
320
321	return 0;
322}
323
324/* currently we only support AUTH_NONE and AUTH_SYS */
325static rpc_authflavor_t
326nfs4_ff_layout_choose_authflavor(struct nfs4_ff_layout_mirror *mirror)
327{
328	if (mirror->uid == (u32)-1)
329		return RPC_AUTH_NULL;
330	return RPC_AUTH_UNIX;
331}
332
333/* fetch cred for NFSv3 DS */
334static int ff_layout_update_mirror_cred(struct nfs4_ff_layout_mirror *mirror,
335				      struct nfs4_pnfs_ds *ds)
336{
337	if (ds->ds_clp && !mirror->cred &&
338	    mirror->mirror_ds->ds_versions[0].version == 3) {
339		struct rpc_auth *auth = ds->ds_clp->cl_rpcclient->cl_auth;
340		struct rpc_cred *cred;
341		struct auth_cred acred = {
342			.uid = make_kuid(&init_user_ns, mirror->uid),
343			.gid = make_kgid(&init_user_ns, mirror->gid),
344		};
345
346		/* AUTH_NULL ignores acred */
347		cred = auth->au_ops->lookup_cred(auth, &acred, 0);
348		if (IS_ERR(cred)) {
349			dprintk("%s: lookup_cred failed with %ld\n",
350				__func__, PTR_ERR(cred));
351			return PTR_ERR(cred);
352		} else {
353			if (cmpxchg(&mirror->cred, NULL, cred))
354				put_rpccred(cred);
355		}
356	}
357	return 0;
358}
359
360struct nfs_fh *
361nfs4_ff_layout_select_ds_fh(struct pnfs_layout_segment *lseg, u32 mirror_idx)
362{
363	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, mirror_idx);
364	struct nfs_fh *fh = NULL;
365
366	if (!ff_layout_mirror_valid(lseg, mirror)) {
367		pr_err_ratelimited("NFS: %s: No data server for mirror offset index %d\n",
368			__func__, mirror_idx);
369		goto out;
370	}
371
372	/* FIXME: For now assume there is only 1 version available for the DS */
373	fh = &mirror->fh_versions[0];
374out:
375	return fh;
376}
377
378/* Upon return, either ds is connected, or ds is NULL */
379struct nfs4_pnfs_ds *
380nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
381			  bool fail_return)
382{
383	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
384	struct nfs4_pnfs_ds *ds = NULL;
385	struct nfs4_deviceid_node *devid;
386	struct inode *ino = lseg->pls_layout->plh_inode;
387	struct nfs_server *s = NFS_SERVER(ino);
388	unsigned int max_payload;
389	rpc_authflavor_t flavor;
390
391	if (!ff_layout_mirror_valid(lseg, mirror)) {
392		pr_err_ratelimited("NFS: %s: No data server for offset index %d\n",
393			__func__, ds_idx);
394		goto out;
395	}
396
397	devid = &mirror->mirror_ds->id_node;
398	if (ff_layout_test_devid_unavailable(devid))
399		goto out;
400
401	ds = mirror->mirror_ds->ds;
402	/* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
403	smp_rmb();
404	if (ds->ds_clp)
405		goto out_update_creds;
406
407	flavor = nfs4_ff_layout_choose_authflavor(mirror);
408
409	/* FIXME: For now we assume the server sent only one version of NFS
410	 * to use for the DS.
411	 */
412	nfs4_pnfs_ds_connect(s, ds, devid, dataserver_timeo,
413			     dataserver_retrans,
414			     mirror->mirror_ds->ds_versions[0].version,
415			     mirror->mirror_ds->ds_versions[0].minor_version,
416			     flavor);
417
418	/* connect success, check rsize/wsize limit */
419	if (ds->ds_clp) {
420		max_payload =
421			nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
422				       NULL);
423		if (mirror->mirror_ds->ds_versions[0].rsize > max_payload)
424			mirror->mirror_ds->ds_versions[0].rsize = max_payload;
425		if (mirror->mirror_ds->ds_versions[0].wsize > max_payload)
426			mirror->mirror_ds->ds_versions[0].wsize = max_payload;
427	} else {
428		ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
429					 mirror, lseg->pls_range.offset,
430					 lseg->pls_range.length, NFS4ERR_NXIO,
431					 OP_ILLEGAL, GFP_NOIO);
432		if (fail_return) {
433			pnfs_error_mark_layout_for_return(ino, lseg);
434			if (ff_layout_has_available_ds(lseg))
435				pnfs_set_retry_layoutget(lseg->pls_layout);
436			else
437				pnfs_clear_retry_layoutget(lseg->pls_layout);
438
439		} else {
440			if (ff_layout_has_available_ds(lseg))
441				set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
442					&lseg->pls_layout->plh_flags);
443			else {
444				pnfs_error_mark_layout_for_return(ino, lseg);
445				pnfs_clear_retry_layoutget(lseg->pls_layout);
446			}
447		}
448	}
449out_update_creds:
450	if (ff_layout_update_mirror_cred(mirror, ds))
451		ds = NULL;
452out:
453	return ds;
454}
455
456struct rpc_cred *
457ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx,
458		      struct rpc_cred *mdscred)
459{
460	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
461	struct rpc_cred *cred = ERR_PTR(-EINVAL);
462
463	if (!nfs4_ff_layout_prepare_ds(lseg, ds_idx, true))
464		goto out;
465
466	if (mirror && mirror->cred)
467		cred = mirror->cred;
468	else
469		cred = mdscred;
470out:
471	return cred;
472}
473
474/**
475* Find or create a DS rpc client with th MDS server rpc client auth flavor
476* in the nfs_client cl_ds_clients list.
477*/
478struct rpc_clnt *
479nfs4_ff_find_or_create_ds_client(struct pnfs_layout_segment *lseg, u32 ds_idx,
480				 struct nfs_client *ds_clp, struct inode *inode)
481{
482	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
483
484	switch (mirror->mirror_ds->ds_versions[0].version) {
485	case 3:
486		/* For NFSv3 DS, flavor is set when creating DS connections */
487		return ds_clp->cl_rpcclient;
488	case 4:
489		return nfs4_find_or_create_ds_client(ds_clp, inode);
490	default:
491		BUG();
492	}
493}
494
495static bool is_range_intersecting(u64 offset1, u64 length1,
496				  u64 offset2, u64 length2)
497{
498	u64 end1 = end_offset(offset1, length1);
499	u64 end2 = end_offset(offset2, length2);
500
501	return (end1 == NFS4_MAX_UINT64 || end1 > offset2) &&
502	       (end2 == NFS4_MAX_UINT64 || end2 > offset1);
503}
504
505/* called with inode i_lock held */
506int ff_layout_encode_ds_ioerr(struct nfs4_flexfile_layout *flo,
507			      struct xdr_stream *xdr, int *count,
508			      const struct pnfs_layout_range *range)
509{
510	struct nfs4_ff_layout_ds_err *err, *n;
511	__be32 *p;
512
513	list_for_each_entry_safe(err, n, &flo->error_list, list) {
514		if (!is_range_intersecting(err->offset, err->length,
515					   range->offset, range->length))
516			continue;
517		/* offset(8) + length(8) + stateid(NFS4_STATEID_SIZE)
518		 * + array length + deviceid(NFS4_DEVICEID4_SIZE)
519		 * + status(4) + opnum(4)
520		 */
521		p = xdr_reserve_space(xdr,
522				28 + NFS4_STATEID_SIZE + NFS4_DEVICEID4_SIZE);
523		if (unlikely(!p))
524			return -ENOBUFS;
525		p = xdr_encode_hyper(p, err->offset);
526		p = xdr_encode_hyper(p, err->length);
527		p = xdr_encode_opaque_fixed(p, &err->stateid,
528					    NFS4_STATEID_SIZE);
529		/* Encode 1 error */
530		*p++ = cpu_to_be32(1);
531		p = xdr_encode_opaque_fixed(p, &err->deviceid,
532					    NFS4_DEVICEID4_SIZE);
533		*p++ = cpu_to_be32(err->status);
534		*p++ = cpu_to_be32(err->opnum);
535		*count += 1;
536		list_del(&err->list);
537		dprintk("%s: offset %llu length %llu status %d op %d count %d\n",
538			__func__, err->offset, err->length, err->status,
539			err->opnum, *count);
540		kfree(err);
541	}
542
543	return 0;
544}
545
546static bool ff_read_layout_has_available_ds(struct pnfs_layout_segment *lseg)
547{
548	struct nfs4_ff_layout_mirror *mirror;
549	struct nfs4_deviceid_node *devid;
550	u32 idx;
551
552	for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
553		mirror = FF_LAYOUT_COMP(lseg, idx);
554		if (mirror && mirror->mirror_ds) {
555			devid = &mirror->mirror_ds->id_node;
556			if (!ff_layout_test_devid_unavailable(devid))
557				return true;
558		}
559	}
560
561	return false;
562}
563
564static bool ff_rw_layout_has_available_ds(struct pnfs_layout_segment *lseg)
565{
566	struct nfs4_ff_layout_mirror *mirror;
567	struct nfs4_deviceid_node *devid;
568	u32 idx;
569
570	for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
571		mirror = FF_LAYOUT_COMP(lseg, idx);
572		if (!mirror || !mirror->mirror_ds)
573			return false;
574		devid = &mirror->mirror_ds->id_node;
575		if (ff_layout_test_devid_unavailable(devid))
576			return false;
577	}
578
579	return FF_LAYOUT_MIRROR_COUNT(lseg) != 0;
580}
581
582bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg)
583{
584	if (lseg->pls_range.iomode == IOMODE_READ)
585		return  ff_read_layout_has_available_ds(lseg);
586	/* Note: RW layout needs all mirrors available */
587	return ff_rw_layout_has_available_ds(lseg);
588}
589
590module_param(dataserver_retrans, uint, 0644);
591MODULE_PARM_DESC(dataserver_retrans, "The  number of times the NFSv4.1 client "
592			"retries a request before it attempts further "
593			" recovery  action.");
594module_param(dataserver_timeo, uint, 0644);
595MODULE_PARM_DESC(dataserver_timeo, "The time (in tenths of a second) the "
596			"NFSv4.1  client  waits for a response from a "
597			" data server before it retries an NFS request.");
598