1/*
2 *  linux/net/sunrpc/clnt.c
3 *
4 *  This file contains the high-level RPC interface.
5 *  It is modeled as a finite state machine to support both synchronous
6 *  and asynchronous requests.
7 *
8 *  -	RPC header generation and argument serialization.
9 *  -	Credential refresh.
10 *  -	TCP connect handling.
11 *  -	Retry of operation when it is suspected the operation failed because
12 *	of uid squashing on the server, or when the credentials were stale
13 *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
15 *
16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18 */
19
20
21#include <linux/module.h>
22#include <linux/types.h>
23#include <linux/kallsyms.h>
24#include <linux/mm.h>
25#include <linux/namei.h>
26#include <linux/mount.h>
27#include <linux/slab.h>
28#include <linux/rcupdate.h>
29#include <linux/utsname.h>
30#include <linux/workqueue.h>
31#include <linux/in.h>
32#include <linux/in6.h>
33#include <linux/un.h>
34
35#include <linux/sunrpc/clnt.h>
36#include <linux/sunrpc/addr.h>
37#include <linux/sunrpc/rpc_pipe_fs.h>
38#include <linux/sunrpc/metrics.h>
39#include <linux/sunrpc/bc_xprt.h>
40#include <trace/events/sunrpc.h>
41
42#include "sunrpc.h"
43#include "netns.h"
44
45#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
46# define RPCDBG_FACILITY	RPCDBG_CALL
47#endif
48
/*
 * Log the name of the calling function together with the pid and the
 * current status of RPC task @t.  The argument is parenthesized so that
 * any pointer-valued expression (e.g. "tasks + i") expands correctly.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
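/*
 * All RPC clients are linked into the per-network-namespace list
 * sn->all_clients (see rpc_register_client()).  destroy_wait is woken
 * by rpc_release_client() when a client's task list is empty.
 */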
56
57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60static void	call_start(struct rpc_task *task);
61static void	call_reserve(struct rpc_task *task);
62static void	call_reserveresult(struct rpc_task *task);
63static void	call_allocate(struct rpc_task *task);
64static void	call_decode(struct rpc_task *task);
65static void	call_bind(struct rpc_task *task);
66static void	call_bind_status(struct rpc_task *task);
67static void	call_transmit(struct rpc_task *task);
68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
69static void	call_bc_transmit(struct rpc_task *task);
70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
71static void	call_status(struct rpc_task *task);
72static void	call_transmit_status(struct rpc_task *task);
73static void	call_refresh(struct rpc_task *task);
74static void	call_refreshresult(struct rpc_task *task);
75static void	call_timeout(struct rpc_task *task);
76static void	call_connect(struct rpc_task *task);
77static void	call_connect_status(struct rpc_task *task);
78
79static __be32	*rpc_encode_header(struct rpc_task *task);
80static __be32	*rpc_verify_header(struct rpc_task *task);
81static int	rpc_ping(struct rpc_clnt *clnt);
82
83static void rpc_register_client(struct rpc_clnt *clnt)
84{
85	struct net *net = rpc_net_ns(clnt);
86	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88	spin_lock(&sn->rpc_client_lock);
89	list_add(&clnt->cl_clients, &sn->all_clients);
90	spin_unlock(&sn->rpc_client_lock);
91}
92
93static void rpc_unregister_client(struct rpc_clnt *clnt)
94{
95	struct net *net = rpc_net_ns(clnt);
96	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98	spin_lock(&sn->rpc_client_lock);
99	list_del(&clnt->cl_clients);
100	spin_unlock(&sn->rpc_client_lock);
101}
102
/*
 * Remove the rpc_pipefs directory of @clnt.  Unlike
 * rpc_clnt_remove_pipedir(), this helper does not itself look up the
 * pipefs superblock with rpc_get_sb_net().
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
107
108static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
109{
110	struct net *net = rpc_net_ns(clnt);
111	struct super_block *pipefs_sb;
112
113	pipefs_sb = rpc_get_sb_net(net);
114	if (pipefs_sb) {
115		__rpc_clnt_remove_pipedir(clnt);
116		rpc_put_sb_net(net);
117	}
118}
119
120static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
121				    struct rpc_clnt *clnt)
122{
123	static uint32_t clntid;
124	const char *dir_name = clnt->cl_program->pipe_dir_name;
125	char name[15];
126	struct dentry *dir, *dentry;
127
128	dir = rpc_d_lookup_sb(sb, dir_name);
129	if (dir == NULL) {
130		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
131		return dir;
132	}
133	for (;;) {
134		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
135		name[sizeof(name) - 1] = '\0';
136		dentry = rpc_create_client_dir(dir, name, clnt);
137		if (!IS_ERR(dentry))
138			break;
139		if (dentry == ERR_PTR(-EEXIST))
140			continue;
141		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
142				" %s/%s, error %ld\n",
143				dir_name, name, PTR_ERR(dentry));
144		break;
145	}
146	dput(dir);
147	return dentry;
148}
149
150static int
151rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
152{
153	struct dentry *dentry;
154
155	if (clnt->cl_program->pipe_dir_name != NULL) {
156		dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
157		if (IS_ERR(dentry))
158			return PTR_ERR(dentry);
159	}
160	return 0;
161}
162
163static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
164{
165	if (clnt->cl_program->pipe_dir_name == NULL)
166		return 1;
167
168	switch (event) {
169	case RPC_PIPEFS_MOUNT:
170		if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
171			return 1;
172		if (atomic_read(&clnt->cl_count) == 0)
173			return 1;
174		break;
175	case RPC_PIPEFS_UMOUNT:
176		if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
177			return 1;
178		break;
179	}
180	return 0;
181}
182
183static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
184				   struct super_block *sb)
185{
186	struct dentry *dentry;
187	int err = 0;
188
189	switch (event) {
190	case RPC_PIPEFS_MOUNT:
191		dentry = rpc_setup_pipedir_sb(sb, clnt);
192		if (!dentry)
193			return -ENOENT;
194		if (IS_ERR(dentry))
195			return PTR_ERR(dentry);
196		break;
197	case RPC_PIPEFS_UMOUNT:
198		__rpc_clnt_remove_pipedir(clnt);
199		break;
200	default:
201		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
202		return -ENOTSUPP;
203	}
204	return err;
205}
206
207static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
208				struct super_block *sb)
209{
210	int error = 0;
211
212	for (;; clnt = clnt->cl_parent) {
213		if (!rpc_clnt_skip_event(clnt, event))
214			error = __rpc_clnt_handle_event(clnt, event, sb);
215		if (error || clnt == clnt->cl_parent)
216			break;
217	}
218	return error;
219}
220
221static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
222{
223	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
224	struct rpc_clnt *clnt;
225
226	spin_lock(&sn->rpc_client_lock);
227	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
228		if (rpc_clnt_skip_event(clnt, event))
229			continue;
230		spin_unlock(&sn->rpc_client_lock);
231		return clnt;
232	}
233	spin_unlock(&sn->rpc_client_lock);
234	return NULL;
235}
236
237static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
238			    void *ptr)
239{
240	struct super_block *sb = ptr;
241	struct rpc_clnt *clnt;
242	int error = 0;
243
244	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
245		error = __rpc_pipefs_event(clnt, event, sb);
246		if (error)
247			break;
248	}
249	return error;
250}
251
252static struct notifier_block rpc_clients_block = {
253	.notifier_call	= rpc_pipefs_event,
254	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
255};
256
257int rpc_clients_notifier_register(void)
258{
259	return rpc_pipefs_notifier_register(&rpc_clients_block);
260}
261
262void rpc_clients_notifier_unregister(void)
263{
264	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
265}
266
267static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
268		struct rpc_xprt *xprt,
269		const struct rpc_timeout *timeout)
270{
271	struct rpc_xprt *old;
272
273	spin_lock(&clnt->cl_lock);
274	old = rcu_dereference_protected(clnt->cl_xprt,
275			lockdep_is_held(&clnt->cl_lock));
276
277	if (!xprt_bound(xprt))
278		clnt->cl_autobind = 1;
279
280	clnt->cl_timeout = timeout;
281	rcu_assign_pointer(clnt->cl_xprt, xprt);
282	spin_unlock(&clnt->cl_lock);
283
284	return old;
285}
286
287static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
288{
289	clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
290			nodename, sizeof(clnt->cl_nodename));
291}
292
293static int rpc_client_register(struct rpc_clnt *clnt,
294			       rpc_authflavor_t pseudoflavor,
295			       const char *client_name)
296{
297	struct rpc_auth_create_args auth_args = {
298		.pseudoflavor = pseudoflavor,
299		.target_name = client_name,
300	};
301	struct rpc_auth *auth;
302	struct net *net = rpc_net_ns(clnt);
303	struct super_block *pipefs_sb;
304	int err;
305
306	rpc_clnt_debugfs_register(clnt);
307
308	pipefs_sb = rpc_get_sb_net(net);
309	if (pipefs_sb) {
310		err = rpc_setup_pipedir(pipefs_sb, clnt);
311		if (err)
312			goto out;
313	}
314
315	rpc_register_client(clnt);
316	if (pipefs_sb)
317		rpc_put_sb_net(net);
318
319	auth = rpcauth_create(&auth_args, clnt);
320	if (IS_ERR(auth)) {
321		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
322				pseudoflavor);
323		err = PTR_ERR(auth);
324		goto err_auth;
325	}
326	return 0;
327err_auth:
328	pipefs_sb = rpc_get_sb_net(net);
329	rpc_unregister_client(clnt);
330	__rpc_clnt_remove_pipedir(clnt);
331out:
332	if (pipefs_sb)
333		rpc_put_sb_net(net);
334	rpc_clnt_debugfs_unregister(clnt);
335	return err;
336}
337
338static DEFINE_IDA(rpc_clids);
339
340static int rpc_alloc_clid(struct rpc_clnt *clnt)
341{
342	int clid;
343
344	clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
345	if (clid < 0)
346		return clid;
347	clnt->cl_clid = clid;
348	return 0;
349}
350
351static void rpc_free_clid(struct rpc_clnt *clnt)
352{
353	ida_simple_remove(&rpc_clids, clnt->cl_clid);
354}
355
356static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
357		struct rpc_xprt *xprt,
358		struct rpc_clnt *parent)
359{
360	const struct rpc_program *program = args->program;
361	const struct rpc_version *version;
362	struct rpc_clnt *clnt = NULL;
363	const struct rpc_timeout *timeout;
364	const char *nodename = args->nodename;
365	int err;
366
367	/* sanity check the name before trying to print it */
368	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
369			program->name, args->servername, xprt);
370
371	err = rpciod_up();
372	if (err)
373		goto out_no_rpciod;
374
375	err = -EINVAL;
376	if (args->version >= program->nrvers)
377		goto out_err;
378	version = program->version[args->version];
379	if (version == NULL)
380		goto out_err;
381
382	err = -ENOMEM;
383	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
384	if (!clnt)
385		goto out_err;
386	clnt->cl_parent = parent ? : clnt;
387
388	err = rpc_alloc_clid(clnt);
389	if (err)
390		goto out_no_clid;
391
392	clnt->cl_procinfo = version->procs;
393	clnt->cl_maxproc  = version->nrprocs;
394	clnt->cl_prog     = args->prognumber ? : program->number;
395	clnt->cl_vers     = version->number;
396	clnt->cl_stats    = program->stats;
397	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
398	rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
399	err = -ENOMEM;
400	if (clnt->cl_metrics == NULL)
401		goto out_no_stats;
402	clnt->cl_program  = program;
403	INIT_LIST_HEAD(&clnt->cl_tasks);
404	spin_lock_init(&clnt->cl_lock);
405
406	timeout = xprt->timeout;
407	if (args->timeout != NULL) {
408		memcpy(&clnt->cl_timeout_default, args->timeout,
409				sizeof(clnt->cl_timeout_default));
410		timeout = &clnt->cl_timeout_default;
411	}
412
413	rpc_clnt_set_transport(clnt, xprt, timeout);
414
415	clnt->cl_rtt = &clnt->cl_rtt_default;
416	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
417
418	atomic_set(&clnt->cl_count, 1);
419
420	if (nodename == NULL)
421		nodename = utsname()->nodename;
422	/* save the nodename */
423	rpc_clnt_set_nodename(clnt, nodename);
424
425	err = rpc_client_register(clnt, args->authflavor, args->client_name);
426	if (err)
427		goto out_no_path;
428	if (parent)
429		atomic_inc(&parent->cl_count);
430	return clnt;
431
432out_no_path:
433	rpc_free_iostats(clnt->cl_metrics);
434out_no_stats:
435	rpc_free_clid(clnt);
436out_no_clid:
437	kfree(clnt);
438out_err:
439	rpciod_down();
440out_no_rpciod:
441	xprt_put(xprt);
442	return ERR_PTR(err);
443}
444
445struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
446					struct rpc_xprt *xprt)
447{
448	struct rpc_clnt *clnt = NULL;
449
450	clnt = rpc_new_client(args, xprt, NULL);
451	if (IS_ERR(clnt))
452		return clnt;
453
454	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
455		int err = rpc_ping(clnt);
456		if (err != 0) {
457			rpc_shutdown_client(clnt);
458			return ERR_PTR(err);
459		}
460	}
461
462	clnt->cl_softrtry = 1;
463	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
464		clnt->cl_softrtry = 0;
465
466	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
467		clnt->cl_autobind = 1;
468	if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
469		clnt->cl_noretranstimeo = 1;
470	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
471		clnt->cl_discrtry = 1;
472	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
473		clnt->cl_chatty = 1;
474
475	return clnt;
476}
477EXPORT_SYMBOL_GPL(rpc_create_xprt);
478
479/**
480 * rpc_create - create an RPC client and transport with one call
481 * @args: rpc_clnt create argument structure
482 *
483 * Creates and initializes an RPC transport and an RPC client.
484 *
485 * It can ping the server in order to determine if it is up, and to see if
486 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
487 * this behavior so asynchronous tasks can also use rpc_create.
488 */
489struct rpc_clnt *rpc_create(struct rpc_create_args *args)
490{
491	struct rpc_xprt *xprt;
492	struct xprt_create xprtargs = {
493		.net = args->net,
494		.ident = args->protocol,
495		.srcaddr = args->saddress,
496		.dstaddr = args->address,
497		.addrlen = args->addrsize,
498		.servername = args->servername,
499		.bc_xprt = args->bc_xprt,
500	};
501	char servername[48];
502
503	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
504		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
505	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
506		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
507	/*
508	 * If the caller chooses not to specify a hostname, whip
509	 * up a string representation of the passed-in address.
510	 */
511	if (xprtargs.servername == NULL) {
512		struct sockaddr_un *sun =
513				(struct sockaddr_un *)args->address;
514		struct sockaddr_in *sin =
515				(struct sockaddr_in *)args->address;
516		struct sockaddr_in6 *sin6 =
517				(struct sockaddr_in6 *)args->address;
518
519		servername[0] = '\0';
520		switch (args->address->sa_family) {
521		case AF_LOCAL:
522			snprintf(servername, sizeof(servername), "%s",
523				 sun->sun_path);
524			break;
525		case AF_INET:
526			snprintf(servername, sizeof(servername), "%pI4",
527				 &sin->sin_addr.s_addr);
528			break;
529		case AF_INET6:
530			snprintf(servername, sizeof(servername), "%pI6",
531				 &sin6->sin6_addr);
532			break;
533		default:
534			/* caller wants default server name, but
535			 * address family isn't recognized. */
536			return ERR_PTR(-EINVAL);
537		}
538		xprtargs.servername = servername;
539	}
540
541	xprt = xprt_create_transport(&xprtargs);
542	if (IS_ERR(xprt))
543		return (struct rpc_clnt *)xprt;
544
545	/*
546	 * By default, kernel RPC client connects from a reserved port.
547	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
548	 * but it is always enabled for rpciod, which handles the connect
549	 * operation.
550	 */
551	xprt->resvport = 1;
552	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
553		xprt->resvport = 0;
554
555	return rpc_create_xprt(args, xprt);
556}
557EXPORT_SYMBOL_GPL(rpc_create);
558
559/*
560 * This function clones the RPC client structure. It allows us to share the
561 * same transport while varying parameters such as the authentication
562 * flavour.
563 */
564static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
565					   struct rpc_clnt *clnt)
566{
567	struct rpc_xprt *xprt;
568	struct rpc_clnt *new;
569	int err;
570
571	err = -ENOMEM;
572	rcu_read_lock();
573	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
574	rcu_read_unlock();
575	if (xprt == NULL)
576		goto out_err;
577	args->servername = xprt->servername;
578	args->nodename = clnt->cl_nodename;
579
580	new = rpc_new_client(args, xprt, clnt);
581	if (IS_ERR(new)) {
582		err = PTR_ERR(new);
583		goto out_err;
584	}
585
586	/* Turn off autobind on clones */
587	new->cl_autobind = 0;
588	new->cl_softrtry = clnt->cl_softrtry;
589	new->cl_noretranstimeo = clnt->cl_noretranstimeo;
590	new->cl_discrtry = clnt->cl_discrtry;
591	new->cl_chatty = clnt->cl_chatty;
592	return new;
593
594out_err:
595	dprintk("RPC:       %s: returned error %d\n", __func__, err);
596	return ERR_PTR(err);
597}
598
599/**
600 * rpc_clone_client - Clone an RPC client structure
601 *
602 * @clnt: RPC client whose parameters are copied
603 *
604 * Returns a fresh RPC client or an ERR_PTR.
605 */
606struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
607{
608	struct rpc_create_args args = {
609		.program	= clnt->cl_program,
610		.prognumber	= clnt->cl_prog,
611		.version	= clnt->cl_vers,
612		.authflavor	= clnt->cl_auth->au_flavor,
613	};
614	return __rpc_clone_client(&args, clnt);
615}
616EXPORT_SYMBOL_GPL(rpc_clone_client);
617
618/**
619 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
620 *
621 * @clnt: RPC client whose parameters are copied
622 * @flavor: security flavor for new client
623 *
624 * Returns a fresh RPC client or an ERR_PTR.
625 */
626struct rpc_clnt *
627rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
628{
629	struct rpc_create_args args = {
630		.program	= clnt->cl_program,
631		.prognumber	= clnt->cl_prog,
632		.version	= clnt->cl_vers,
633		.authflavor	= flavor,
634	};
635	return __rpc_clone_client(&args, clnt);
636}
637EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
638
639/**
640 * rpc_switch_client_transport: switch the RPC transport on the fly
641 * @clnt: pointer to a struct rpc_clnt
642 * @args: pointer to the new transport arguments
643 * @timeout: pointer to the new timeout parameters
644 *
645 * This function allows the caller to switch the RPC transport for the
646 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
647 * server, for instance.  It assumes that the caller has ensured that
648 * there are no active RPC tasks by using some form of locking.
649 *
650 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
651 * negative errno is returned, and "clnt" continues to use the old
652 * xprt.
653 */
654int rpc_switch_client_transport(struct rpc_clnt *clnt,
655		struct xprt_create *args,
656		const struct rpc_timeout *timeout)
657{
658	const struct rpc_timeout *old_timeo;
659	rpc_authflavor_t pseudoflavor;
660	struct rpc_xprt *xprt, *old;
661	struct rpc_clnt *parent;
662	int err;
663
664	xprt = xprt_create_transport(args);
665	if (IS_ERR(xprt)) {
666		dprintk("RPC:       failed to create new xprt for clnt %p\n",
667			clnt);
668		return PTR_ERR(xprt);
669	}
670
671	pseudoflavor = clnt->cl_auth->au_flavor;
672
673	old_timeo = clnt->cl_timeout;
674	old = rpc_clnt_set_transport(clnt, xprt, timeout);
675
676	rpc_unregister_client(clnt);
677	__rpc_clnt_remove_pipedir(clnt);
678	rpc_clnt_debugfs_unregister(clnt);
679
680	/*
681	 * A new transport was created.  "clnt" therefore
682	 * becomes the root of a new cl_parent tree.  clnt's
683	 * children, if it has any, still point to the old xprt.
684	 */
685	parent = clnt->cl_parent;
686	clnt->cl_parent = clnt;
687
688	/*
689	 * The old rpc_auth cache cannot be re-used.  GSS
690	 * contexts in particular are between a single
691	 * client and server.
692	 */
693	err = rpc_client_register(clnt, pseudoflavor, NULL);
694	if (err)
695		goto out_revert;
696
697	synchronize_rcu();
698	if (parent != clnt)
699		rpc_release_client(parent);
700	xprt_put(old);
701	dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
702	return 0;
703
704out_revert:
705	rpc_clnt_set_transport(clnt, old, old_timeo);
706	clnt->cl_parent = parent;
707	rpc_client_register(clnt, pseudoflavor, NULL);
708	xprt_put(xprt);
709	dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
710	return err;
711}
712EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
713
714/*
715 * Kill all tasks for the given client.
716 * XXX: kill their descendants as well?
717 */
718void rpc_killall_tasks(struct rpc_clnt *clnt)
719{
720	struct rpc_task	*rovr;
721
722
723	if (list_empty(&clnt->cl_tasks))
724		return;
725	dprintk("RPC:       killing all tasks for client %p\n", clnt);
726	/*
727	 * Spin lock all_tasks to prevent changes...
728	 */
729	spin_lock(&clnt->cl_lock);
730	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
731		if (!RPC_IS_ACTIVATED(rovr))
732			continue;
733		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
734			rovr->tk_flags |= RPC_TASK_KILLED;
735			rpc_exit(rovr, -EIO);
736			if (RPC_IS_QUEUED(rovr))
737				rpc_wake_up_queued_task(rovr->tk_waitqueue,
738							rovr);
739		}
740	}
741	spin_unlock(&clnt->cl_lock);
742}
743EXPORT_SYMBOL_GPL(rpc_killall_tasks);
744
745/*
746 * Properly shut down an RPC client, terminating all outstanding
747 * requests.
748 */
749void rpc_shutdown_client(struct rpc_clnt *clnt)
750{
751	might_sleep();
752
753	dprintk_rcu("RPC:       shutting down %s client for %s\n",
754			clnt->cl_program->name,
755			rcu_dereference(clnt->cl_xprt)->servername);
756
757	while (!list_empty(&clnt->cl_tasks)) {
758		rpc_killall_tasks(clnt);
759		wait_event_timeout(destroy_wait,
760			list_empty(&clnt->cl_tasks), 1*HZ);
761	}
762
763	rpc_release_client(clnt);
764}
765EXPORT_SYMBOL_GPL(rpc_shutdown_client);
766
767/*
768 * Free an RPC client
769 */
770static struct rpc_clnt *
771rpc_free_client(struct rpc_clnt *clnt)
772{
773	struct rpc_clnt *parent = NULL;
774
775	dprintk_rcu("RPC:       destroying %s client for %s\n",
776			clnt->cl_program->name,
777			rcu_dereference(clnt->cl_xprt)->servername);
778	if (clnt->cl_parent != clnt)
779		parent = clnt->cl_parent;
780	rpc_clnt_debugfs_unregister(clnt);
781	rpc_clnt_remove_pipedir(clnt);
782	rpc_unregister_client(clnt);
783	rpc_free_iostats(clnt->cl_metrics);
784	clnt->cl_metrics = NULL;
785	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
786	rpciod_down();
787	rpc_free_clid(clnt);
788	kfree(clnt);
789	return parent;
790}
791
792/*
793 * Free an RPC client
794 */
795static struct rpc_clnt *
796rpc_free_auth(struct rpc_clnt *clnt)
797{
798	if (clnt->cl_auth == NULL)
799		return rpc_free_client(clnt);
800
801	/*
802	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
803	 *       release remaining GSS contexts. This mechanism ensures
804	 *       that it can do so safely.
805	 */
806	atomic_inc(&clnt->cl_count);
807	rpcauth_release(clnt->cl_auth);
808	clnt->cl_auth = NULL;
809	if (atomic_dec_and_test(&clnt->cl_count))
810		return rpc_free_client(clnt);
811	return NULL;
812}
813
814/*
815 * Release reference to the RPC client
816 */
817void
818rpc_release_client(struct rpc_clnt *clnt)
819{
820	dprintk("RPC:       rpc_release_client(%p)\n", clnt);
821
822	do {
823		if (list_empty(&clnt->cl_tasks))
824			wake_up(&destroy_wait);
825		if (!atomic_dec_and_test(&clnt->cl_count))
826			break;
827		clnt = rpc_free_auth(clnt);
828	} while (clnt != NULL);
829}
830EXPORT_SYMBOL_GPL(rpc_release_client);
831
832/**
833 * rpc_bind_new_program - bind a new RPC program to an existing client
834 * @old: old rpc_client
835 * @program: rpc program to set
836 * @vers: rpc program version
837 *
838 * Clones the rpc client and sets up a new RPC program. This is mainly
839 * of use for enabling different RPC programs to share the same transport.
840 * The Sun NFSv2/v3 ACL protocol can do this.
841 */
842struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
843				      const struct rpc_program *program,
844				      u32 vers)
845{
846	struct rpc_create_args args = {
847		.program	= program,
848		.prognumber	= program->number,
849		.version	= vers,
850		.authflavor	= old->cl_auth->au_flavor,
851	};
852	struct rpc_clnt *clnt;
853	int err;
854
855	clnt = __rpc_clone_client(&args, old);
856	if (IS_ERR(clnt))
857		goto out;
858	err = rpc_ping(clnt);
859	if (err != 0) {
860		rpc_shutdown_client(clnt);
861		clnt = ERR_PTR(err);
862	}
863out:
864	return clnt;
865}
866EXPORT_SYMBOL_GPL(rpc_bind_new_program);
867
868void rpc_task_release_client(struct rpc_task *task)
869{
870	struct rpc_clnt *clnt = task->tk_client;
871
872	if (clnt != NULL) {
873		/* Remove from client task list */
874		spin_lock(&clnt->cl_lock);
875		list_del(&task->tk_task);
876		spin_unlock(&clnt->cl_lock);
877		task->tk_client = NULL;
878
879		rpc_release_client(clnt);
880	}
881}
882
883static
884void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
885{
886	if (clnt != NULL) {
887		rpc_task_release_client(task);
888		task->tk_client = clnt;
889		atomic_inc(&clnt->cl_count);
890		if (clnt->cl_softrtry)
891			task->tk_flags |= RPC_TASK_SOFT;
892		if (clnt->cl_noretranstimeo)
893			task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
894		if (atomic_read(&clnt->cl_swapper))
895			task->tk_flags |= RPC_TASK_SWAPPER;
896		/* Add to the client's list of all tasks */
897		spin_lock(&clnt->cl_lock);
898		list_add_tail(&task->tk_task, &clnt->cl_tasks);
899		spin_unlock(&clnt->cl_lock);
900	}
901}
902
/*
 * Move @task over to @clnt: the task's current client is released
 * first, then @clnt is attached.  With a NULL @clnt the task simply
 * ends up without a client.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
909
910
911static void
912rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
913{
914	if (msg != NULL) {
915		task->tk_msg.rpc_proc = msg->rpc_proc;
916		task->tk_msg.rpc_argp = msg->rpc_argp;
917		task->tk_msg.rpc_resp = msg->rpc_resp;
918		if (msg->rpc_cred != NULL)
919			task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
920	}
921}
922
923/*
924 * Default callback for async RPC calls
925 */
926static void
927rpc_default_callback(struct rpc_task *task, void *data)
928{
929}
930
931static const struct rpc_call_ops rpc_default_ops = {
932	.rpc_call_done = rpc_default_callback,
933};
934
935/**
936 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
937 * @task_setup_data: pointer to task initialisation data
938 */
939struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
940{
941	struct rpc_task *task;
942
943	task = rpc_new_task(task_setup_data);
944	if (IS_ERR(task))
945		goto out;
946
947	rpc_task_set_client(task, task_setup_data->rpc_client);
948	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
949
950	if (task->tk_action == NULL)
951		rpc_call_start(task);
952
953	atomic_inc(&task->tk_count);
954	rpc_execute(task);
955out:
956	return task;
957}
958EXPORT_SYMBOL_GPL(rpc_run_task);
959
960/**
961 * rpc_call_sync - Perform a synchronous RPC call
962 * @clnt: pointer to RPC client
963 * @msg: RPC call parameters
964 * @flags: RPC call flags
965 */
966int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
967{
968	struct rpc_task	*task;
969	struct rpc_task_setup task_setup_data = {
970		.rpc_client = clnt,
971		.rpc_message = msg,
972		.callback_ops = &rpc_default_ops,
973		.flags = flags,
974	};
975	int status;
976
977	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
978	if (flags & RPC_TASK_ASYNC) {
979		rpc_release_calldata(task_setup_data.callback_ops,
980			task_setup_data.callback_data);
981		return -EINVAL;
982	}
983
984	task = rpc_run_task(&task_setup_data);
985	if (IS_ERR(task))
986		return PTR_ERR(task);
987	status = task->tk_status;
988	rpc_put_task(task);
989	return status;
990}
991EXPORT_SYMBOL_GPL(rpc_call_sync);
992
993/**
994 * rpc_call_async - Perform an asynchronous RPC call
995 * @clnt: pointer to RPC client
996 * @msg: RPC call parameters
997 * @flags: RPC call flags
998 * @tk_ops: RPC call ops
999 * @data: user call data
1000 */
1001int
1002rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1003	       const struct rpc_call_ops *tk_ops, void *data)
1004{
1005	struct rpc_task	*task;
1006	struct rpc_task_setup task_setup_data = {
1007		.rpc_client = clnt,
1008		.rpc_message = msg,
1009		.callback_ops = tk_ops,
1010		.callback_data = data,
1011		.flags = flags|RPC_TASK_ASYNC,
1012	};
1013
1014	task = rpc_run_task(&task_setup_data);
1015	if (IS_ERR(task))
1016		return PTR_ERR(task);
1017	rpc_put_task(task);
1018	return 0;
1019}
1020EXPORT_SYMBOL_GPL(rpc_call_async);
1021
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * Returns the task, or the ERR_PTR from rpc_new_task(), in which case
 * @req has been freed with xprt_free_bc_request().
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task_setup setup = {
		.callback_ops = &rpc_default_ops,
		.flags = RPC_TASK_SOFTCONN,
	};
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Create the rpc_task that will send the data. */
	task = rpc_new_task(&setup);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		goto out;
	}
	task->tk_rqstp = req;

	/*
	 * Total up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	sndbuf->len = sndbuf->head[0].iov_len + sndbuf->page_len +
			sndbuf->tail[0].iov_len;

	task->tk_action = call_bc_transmit;
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

out:
	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1065
1066void
1067rpc_call_start(struct rpc_task *task)
1068{
1069	task->tk_action = call_start;
1070}
1071EXPORT_SYMBOL_GPL(rpc_call_start);
1072
1073/**
1074 * rpc_peeraddr - extract remote peer address from clnt's xprt
1075 * @clnt: RPC client structure
1076 * @buf: target buffer
1077 * @bufsize: length of target buffer
1078 *
1079 * Returns the number of bytes that are actually in the stored address.
1080 */
1081size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1082{
1083	size_t bytes;
1084	struct rpc_xprt *xprt;
1085
1086	rcu_read_lock();
1087	xprt = rcu_dereference(clnt->cl_xprt);
1088
1089	bytes = xprt->addrlen;
1090	if (bytes > bufsize)
1091		bytes = bufsize;
1092	memcpy(buf, &xprt->addr, bytes);
1093	rcu_read_unlock();
1094
1095	return bytes;
1096}
1097EXPORT_SYMBOL_GPL(rpc_peeraddr);
1098
1099/**
1100 * rpc_peeraddr2str - return remote peer address in printable format
1101 * @clnt: RPC client structure
1102 * @format: address format
1103 *
1104 * NB: the lifetime of the memory referenced by the returned pointer is
1105 * the same as the rpc_xprt itself.  As long as the caller uses this
1106 * pointer, it must hold the RCU read lock.
1107 */
1108const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1109			     enum rpc_display_format_t format)
1110{
1111	struct rpc_xprt *xprt;
1112
1113	xprt = rcu_dereference(clnt->cl_xprt);
1114
1115	if (xprt->address_strings[format] != NULL)
1116		return xprt->address_strings[format];
1117	else
1118		return "unprintable";
1119}
1120EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1121
1122static const struct sockaddr_in rpc_inaddr_loopback = {
1123	.sin_family		= AF_INET,
1124	.sin_addr.s_addr	= htonl(INADDR_ANY),
1125};
1126
1127static const struct sockaddr_in6 rpc_in6addr_loopback = {
1128	.sin6_family		= AF_INET6,
1129	.sin6_addr		= IN6ADDR_ANY_INIT,
1130};
1131
1132/*
1133 * Try a getsockname() on a connected datagram socket.  Using a
1134 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1135 * This conserves the ephemeral port number space.
1136 *
1137 * Returns zero and fills in "buf" if successful; otherwise, a
1138 * negative errno is returned.
1139 */
1140static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1141			struct sockaddr *buf, int buflen)
1142{
1143	struct socket *sock;
1144	int err;
1145
1146	err = __sock_create(net, sap->sa_family,
1147				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1148	if (err < 0) {
1149		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1150		goto out;
1151	}
1152
1153	switch (sap->sa_family) {
1154	case AF_INET:
1155		err = kernel_bind(sock,
1156				(struct sockaddr *)&rpc_inaddr_loopback,
1157				sizeof(rpc_inaddr_loopback));
1158		break;
1159	case AF_INET6:
1160		err = kernel_bind(sock,
1161				(struct sockaddr *)&rpc_in6addr_loopback,
1162				sizeof(rpc_in6addr_loopback));
1163		break;
1164	default:
1165		err = -EAFNOSUPPORT;
1166		goto out;
1167	}
1168	if (err < 0) {
1169		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1170		goto out_release;
1171	}
1172
1173	err = kernel_connect(sock, sap, salen, 0);
1174	if (err < 0) {
1175		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1176		goto out_release;
1177	}
1178
1179	err = kernel_getsockname(sock, buf, &buflen);
1180	if (err < 0) {
1181		dprintk("RPC:       getsockname failed (%d)\n", err);
1182		goto out_release;
1183	}
1184
1185	err = 0;
1186	if (buf->sa_family == AF_INET6) {
1187		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1188		sin6->sin6_scope_id = 0;
1189	}
1190	dprintk("RPC:       %s succeeded\n", __func__);
1191
1192out_release:
1193	sock_release(sock);
1194out:
1195	return err;
1196}
1197
1198/*
1199 * Scraping a connected socket failed, so we don't have a useable
1200 * local address.  Fallback: generate an address that will prevent
1201 * the server from calling us back.
1202 *
1203 * Returns zero and fills in "buf" if successful; otherwise, a
1204 * negative errno is returned.
1205 */
1206static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1207{
1208	switch (family) {
1209	case AF_INET:
1210		if (buflen < sizeof(rpc_inaddr_loopback))
1211			return -EINVAL;
1212		memcpy(buf, &rpc_inaddr_loopback,
1213				sizeof(rpc_inaddr_loopback));
1214		break;
1215	case AF_INET6:
1216		if (buflen < sizeof(rpc_in6addr_loopback))
1217			return -EINVAL;
1218		memcpy(buf, &rpc_in6addr_loopback,
1219				sizeof(rpc_in6addr_loopback));
1220	default:
1221		dprintk("RPC:       %s: address family not supported\n",
1222			__func__);
1223		return -EAFNOSUPPORT;
1224	}
1225	dprintk("RPC:       %s: succeeded\n", __func__);
1226	return 0;
1227}
1228
1229/**
1230 * rpc_localaddr - discover local endpoint address for an RPC client
1231 * @clnt: RPC client structure
1232 * @buf: target buffer
1233 * @buflen: size of target buffer, in bytes
1234 *
1235 * Returns zero and fills in "buf" and "buflen" if successful;
1236 * otherwise, a negative errno is returned.
1237 *
1238 * This works even if the underlying transport is not currently connected,
1239 * or if the upper layer never previously provided a source address.
1240 *
1241 * The result of this function call is transient: multiple calls in
1242 * succession may give different results, depending on how local
1243 * networking configuration changes over time.
1244 */
1245int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1246{
1247	struct sockaddr_storage address;
1248	struct sockaddr *sap = (struct sockaddr *)&address;
1249	struct rpc_xprt *xprt;
1250	struct net *net;
1251	size_t salen;
1252	int err;
1253
1254	rcu_read_lock();
1255	xprt = rcu_dereference(clnt->cl_xprt);
1256	salen = xprt->addrlen;
1257	memcpy(sap, &xprt->addr, salen);
1258	net = get_net(xprt->xprt_net);
1259	rcu_read_unlock();
1260
1261	rpc_set_port(sap, 0);
1262	err = rpc_sockname(net, sap, salen, buf, buflen);
1263	put_net(net);
1264	if (err != 0)
1265		/* Couldn't discover local address, return ANYADDR */
1266		return rpc_anyaddr(sap->sa_family, buf, buflen);
1267	return 0;
1268}
1269EXPORT_SYMBOL_GPL(rpc_localaddr);
1270
1271void
1272rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1273{
1274	struct rpc_xprt *xprt;
1275
1276	rcu_read_lock();
1277	xprt = rcu_dereference(clnt->cl_xprt);
1278	if (xprt->ops->set_buffer_size)
1279		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1280	rcu_read_unlock();
1281}
1282EXPORT_SYMBOL_GPL(rpc_setbufsize);
1283
1284/**
1285 * rpc_protocol - Get transport protocol number for an RPC client
1286 * @clnt: RPC client to query
1287 *
1288 */
1289int rpc_protocol(struct rpc_clnt *clnt)
1290{
1291	int protocol;
1292
1293	rcu_read_lock();
1294	protocol = rcu_dereference(clnt->cl_xprt)->prot;
1295	rcu_read_unlock();
1296	return protocol;
1297}
1298EXPORT_SYMBOL_GPL(rpc_protocol);
1299
1300/**
1301 * rpc_net_ns - Get the network namespace for this RPC client
1302 * @clnt: RPC client to query
1303 *
1304 */
1305struct net *rpc_net_ns(struct rpc_clnt *clnt)
1306{
1307	struct net *ret;
1308
1309	rcu_read_lock();
1310	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1311	rcu_read_unlock();
1312	return ret;
1313}
1314EXPORT_SYMBOL_GPL(rpc_net_ns);
1315
1316/**
1317 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1318 * @clnt: RPC client to query
1319 *
1320 * For stream transports, this is one RPC record fragment (see RFC
1321 * 1831), as we don't support multi-record requests yet.  For datagram
1322 * transports, this is the size of an IP packet minus the IP, UDP, and
1323 * RPC header sizes.
1324 */
1325size_t rpc_max_payload(struct rpc_clnt *clnt)
1326{
1327	size_t ret;
1328
1329	rcu_read_lock();
1330	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1331	rcu_read_unlock();
1332	return ret;
1333}
1334EXPORT_SYMBOL_GPL(rpc_max_payload);
1335
1336/**
1337 * rpc_get_timeout - Get timeout for transport in units of HZ
1338 * @clnt: RPC client to query
1339 */
1340unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1341{
1342	unsigned long ret;
1343
1344	rcu_read_lock();
1345	ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1346	rcu_read_unlock();
1347	return ret;
1348}
1349EXPORT_SYMBOL_GPL(rpc_get_timeout);
1350
1351/**
1352 * rpc_force_rebind - force transport to check that remote port is unchanged
1353 * @clnt: client to rebind
1354 *
1355 */
1356void rpc_force_rebind(struct rpc_clnt *clnt)
1357{
1358	if (clnt->cl_autobind) {
1359		rcu_read_lock();
1360		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1361		rcu_read_unlock();
1362	}
1363}
1364EXPORT_SYMBOL_GPL(rpc_force_rebind);
1365
1366/*
1367 * Restart an (async) RPC call from the call_prepare state.
1368 * Usually called from within the exit handler.
1369 */
1370int
1371rpc_restart_call_prepare(struct rpc_task *task)
1372{
1373	if (RPC_ASSASSINATED(task))
1374		return 0;
1375	task->tk_action = call_start;
1376	task->tk_status = 0;
1377	if (task->tk_ops->rpc_call_prepare != NULL)
1378		task->tk_action = rpc_prepare_task;
1379	return 1;
1380}
1381EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1382
1383/*
1384 * Restart an (async) RPC call. Usually called from within the
1385 * exit handler.
1386 */
1387int
1388rpc_restart_call(struct rpc_task *task)
1389{
1390	if (RPC_ASSASSINATED(task))
1391		return 0;
1392	task->tk_action = call_start;
1393	task->tk_status = 0;
1394	return 1;
1395}
1396EXPORT_SYMBOL_GPL(rpc_restart_call);
1397
1398#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1399const char
1400*rpc_proc_name(const struct rpc_task *task)
1401{
1402	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1403
1404	if (proc) {
1405		if (proc->p_name)
1406			return proc->p_name;
1407		else
1408			return "NULL";
1409	} else
1410		return "no proc";
1411}
1412#endif
1413
1414/*
1415 * 0.  Initial state
1416 *
1417 *     Other FSM states can be visited zero or more times, but
1418 *     this state is visited exactly once for each RPC.
1419 */
1420static void
1421call_start(struct rpc_task *task)
1422{
1423	struct rpc_clnt	*clnt = task->tk_client;
1424
1425	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1426			clnt->cl_program->name, clnt->cl_vers,
1427			rpc_proc_name(task),
1428			(RPC_IS_ASYNC(task) ? "async" : "sync"));
1429
1430	/* Increment call count */
1431	task->tk_msg.rpc_proc->p_count++;
1432	clnt->cl_stats->rpccnt++;
1433	task->tk_action = call_reserve;
1434}
1435
1436/*
1437 * 1.	Reserve an RPC call slot
1438 */
1439static void
1440call_reserve(struct rpc_task *task)
1441{
1442	dprint_status(task);
1443
1444	task->tk_status  = 0;
1445	task->tk_action  = call_reserveresult;
1446	xprt_reserve(task);
1447}
1448
1449static void call_retry_reserve(struct rpc_task *task);
1450
1451/*
1452 * 1b.	Grok the result of xprt_reserve()
1453 */
1454static void
1455call_reserveresult(struct rpc_task *task)
1456{
1457	int status = task->tk_status;
1458
1459	dprint_status(task);
1460
1461	/*
1462	 * After a call to xprt_reserve(), we must have either
1463	 * a request slot or else an error status.
1464	 */
1465	task->tk_status = 0;
1466	if (status >= 0) {
1467		if (task->tk_rqstp) {
1468			task->tk_action = call_refresh;
1469			return;
1470		}
1471
1472		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1473				__func__, status);
1474		rpc_exit(task, -EIO);
1475		return;
1476	}
1477
1478	/*
1479	 * Even though there was an error, we may have acquired
1480	 * a request slot somehow.  Make sure not to leak it.
1481	 */
1482	if (task->tk_rqstp) {
1483		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1484				__func__, status);
1485		xprt_release(task);
1486	}
1487
1488	switch (status) {
1489	case -ENOMEM:
1490		rpc_delay(task, HZ >> 2);
1491	case -EAGAIN:	/* woken up; retry */
1492		task->tk_action = call_retry_reserve;
1493		return;
1494	case -EIO:	/* probably a shutdown */
1495		break;
1496	default:
1497		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1498				__func__, status);
1499		break;
1500	}
1501	rpc_exit(task, status);
1502}
1503
1504/*
1505 * 1c.	Retry reserving an RPC call slot
1506 */
1507static void
1508call_retry_reserve(struct rpc_task *task)
1509{
1510	dprint_status(task);
1511
1512	task->tk_status  = 0;
1513	task->tk_action  = call_reserveresult;
1514	xprt_retry_reserve(task);
1515}
1516
1517/*
1518 * 2.	Bind and/or refresh the credentials
1519 */
1520static void
1521call_refresh(struct rpc_task *task)
1522{
1523	dprint_status(task);
1524
1525	task->tk_action = call_refreshresult;
1526	task->tk_status = 0;
1527	task->tk_client->cl_stats->rpcauthrefresh++;
1528	rpcauth_refreshcred(task);
1529}
1530
1531/*
1532 * 2a.	Process the results of a credential refresh
1533 */
1534static void
1535call_refreshresult(struct rpc_task *task)
1536{
1537	int status = task->tk_status;
1538
1539	dprint_status(task);
1540
1541	task->tk_status = 0;
1542	task->tk_action = call_refresh;
1543	switch (status) {
1544	case 0:
1545		if (rpcauth_uptodatecred(task)) {
1546			task->tk_action = call_allocate;
1547			return;
1548		}
1549		/* Use rate-limiting and a max number of retries if refresh
1550		 * had status 0 but failed to update the cred.
1551		 */
1552	case -ETIMEDOUT:
1553		rpc_delay(task, 3*HZ);
1554	case -EAGAIN:
1555		status = -EACCES;
1556	case -EKEYEXPIRED:
1557		if (!task->tk_cred_retry)
1558			break;
1559		task->tk_cred_retry--;
1560		dprintk("RPC: %5u %s: retry refresh creds\n",
1561				task->tk_pid, __func__);
1562		return;
1563	}
1564	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1565				task->tk_pid, __func__, status);
1566	rpc_exit(task, status);
1567}
1568
1569/*
1570 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1571 *	(Note: buffer memory is freed in xprt_release).
1572 */
1573static void
1574call_allocate(struct rpc_task *task)
1575{
1576	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1577	struct rpc_rqst *req = task->tk_rqstp;
1578	struct rpc_xprt *xprt = req->rq_xprt;
1579	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1580
1581	dprint_status(task);
1582
1583	task->tk_status = 0;
1584	task->tk_action = call_bind;
1585
1586	if (req->rq_buffer)
1587		return;
1588
1589	if (proc->p_proc != 0) {
1590		BUG_ON(proc->p_arglen == 0);
1591		if (proc->p_decode != NULL)
1592			BUG_ON(proc->p_replen == 0);
1593	}
1594
1595	/*
1596	 * Calculate the size (in quads) of the RPC call
1597	 * and reply headers, and convert both values
1598	 * to byte sizes.
1599	 */
1600	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1601	req->rq_callsize <<= 2;
1602	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1603	req->rq_rcvsize <<= 2;
1604
1605	req->rq_buffer = xprt->ops->buf_alloc(task,
1606					req->rq_callsize + req->rq_rcvsize);
1607	if (req->rq_buffer != NULL)
1608		return;
1609	xprt_inject_disconnect(xprt);
1610
1611	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1612
1613	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1614		task->tk_action = call_allocate;
1615		rpc_delay(task, HZ>>4);
1616		return;
1617	}
1618
1619	rpc_exit(task, -ERESTARTSYS);
1620}
1621
1622static inline int
1623rpc_task_need_encode(struct rpc_task *task)
1624{
1625	return task->tk_rqstp->rq_snd_buf.len == 0;
1626}
1627
1628static inline void
1629rpc_task_force_reencode(struct rpc_task *task)
1630{
1631	task->tk_rqstp->rq_snd_buf.len = 0;
1632	task->tk_rqstp->rq_bytes_sent = 0;
1633}
1634
1635static inline void
1636rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1637{
1638	buf->head[0].iov_base = start;
1639	buf->head[0].iov_len = len;
1640	buf->tail[0].iov_len = 0;
1641	buf->page_len = 0;
1642	buf->flags = 0;
1643	buf->len = 0;
1644	buf->buflen = len;
1645}
1646
1647/*
1648 * 3.	Encode arguments of an RPC call
1649 */
1650static void
1651rpc_xdr_encode(struct rpc_task *task)
1652{
1653	struct rpc_rqst	*req = task->tk_rqstp;
1654	kxdreproc_t	encode;
1655	__be32		*p;
1656
1657	dprint_status(task);
1658
1659	rpc_xdr_buf_init(&req->rq_snd_buf,
1660			 req->rq_buffer,
1661			 req->rq_callsize);
1662	rpc_xdr_buf_init(&req->rq_rcv_buf,
1663			 (char *)req->rq_buffer + req->rq_callsize,
1664			 req->rq_rcvsize);
1665
1666	p = rpc_encode_header(task);
1667	if (p == NULL) {
1668		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1669		rpc_exit(task, -EIO);
1670		return;
1671	}
1672
1673	encode = task->tk_msg.rpc_proc->p_encode;
1674	if (encode == NULL)
1675		return;
1676
1677	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1678			task->tk_msg.rpc_argp);
1679}
1680
1681/*
1682 * 4.	Get the server port number if not yet set
1683 */
1684static void
1685call_bind(struct rpc_task *task)
1686{
1687	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1688
1689	dprint_status(task);
1690
1691	task->tk_action = call_connect;
1692	if (!xprt_bound(xprt)) {
1693		task->tk_action = call_bind_status;
1694		task->tk_timeout = xprt->bind_timeout;
1695		xprt->ops->rpcbind(task);
1696	}
1697}
1698
/*
 * 4a.	Sort out bind result
 *
 *	Success moves on to call_connect.  Retryable failures go
 *	through call_timeout.  Everything else terminates the task,
 *	with -EIO unless a more specific status was selected.
 */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EIO;
	bool retry = false;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		retry = true;
		break;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		/* Out of rebind retries: fail with the default status */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		retry = true;
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		retry = true;
		break;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		retry = true;
		break;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		/* Soft-connect tasks report the error instead of retrying */
		if (RPC_IS_SOFTCONN(task)) {
			status = task->tk_status;
			break;
		}
		rpc_delay(task, 5*HZ);
		retry = true;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		break;
	}

	if (!retry) {
		rpc_exit(task, status);
		return;
	}

	/* Let call_timeout decide how to continue */
	task->tk_status = 0;
	task->tk_action = call_timeout;
}
1775
1776/*
1777 * 4b.	Connect to the RPC server
1778 */
1779static void
1780call_connect(struct rpc_task *task)
1781{
1782	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1783
1784	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1785			task->tk_pid, xprt,
1786			(xprt_connected(xprt) ? "is" : "is not"));
1787
1788	task->tk_action = call_transmit;
1789	if (!xprt_connected(xprt)) {
1790		task->tk_action = call_connect_status;
1791		if (task->tk_status < 0)
1792			return;
1793		if (task->tk_flags & RPC_TASK_NOCONNECT) {
1794			rpc_exit(task, -ENOTCONN);
1795			return;
1796		}
1797		xprt_connect(task);
1798	}
1799}
1800
/*
 * 4c.	Sort out connect result
 *
 *	Success counts a reconnect and moves on to call_transmit.
 *	Connection errors on tasks that are not soft-connect are
 *	retried through call_timeout after a delay; -EAGAIN and
 *	-ETIMEDOUT go to call_timeout directly.  Everything else
 *	terminates the task with the connect status.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	trace_rpc_connect_status(task, status);
	task->tk_status = 0;
	switch (status) {
	case 0:
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EADDRINUSE:
	case -ENOBUFS:
	case -EPIPE:
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EAGAIN:
		/* Check for timeouts before looping back to call_bind */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	default:
		break;
	}
	rpc_exit(task, status);
}
1839
1840/*
1841 * 5.	Transmit the RPC request, and wait for reply
1842 */
1843static void
1844call_transmit(struct rpc_task *task)
1845{
1846	int is_retrans = RPC_WAS_SENT(task);
1847
1848	dprint_status(task);
1849
1850	task->tk_action = call_status;
1851	if (task->tk_status < 0)
1852		return;
1853	if (!xprt_prepare_transmit(task))
1854		return;
1855	task->tk_action = call_transmit_status;
1856	/* Encode here so that rpcsec_gss can use correct sequence number. */
1857	if (rpc_task_need_encode(task)) {
1858		rpc_xdr_encode(task);
1859		/* Did the encode result in an error condition? */
1860		if (task->tk_status != 0) {
1861			/* Was the error nonfatal? */
1862			if (task->tk_status == -EAGAIN)
1863				rpc_delay(task, HZ >> 4);
1864			else
1865				rpc_exit(task, task->tk_status);
1866			return;
1867		}
1868	}
1869	xprt_transmit(task);
1870	if (task->tk_status < 0)
1871		return;
1872	if (is_retrans)
1873		task->tk_client->cl_stats->rpcretrans++;
1874	/*
1875	 * On success, ensure that we call xprt_end_transmit() before sleeping
1876	 * in order to allow access to the socket to other RPC requests.
1877	 */
1878	call_transmit_status(task);
1879	if (rpc_reply_expected(task))
1880		return;
1881	task->tk_action = rpc_exit_task;
1882	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1883}
1884
/*
 * 5a.	Handle cleanup after a transmission
 *
 *	Always schedules call_status next.  Depending on tk_status the
 *	transmit is ended via xprt_end_transmit() and/or a re-encode of
 *	the request is forced.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
	case -ENOBUFS:
		/* Nothing to clean up here */
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
	}
}
1936
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 *
 * When the transmit cannot be prepared, or the transmit returns
 * -EAGAIN, tk_status is cleared and tk_action is left unchanged.
 * Every other outcome routes the task to rpc_exit_task.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!xprt_prepare_transmit(task)) {
		task->tk_status = 0;
		return;
	}

	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		task->tk_action = rpc_exit_task;
		return;
	}

	/* The connection changed under us: start sending from the top */
	if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
		req->rq_bytes_sent = 0;

	xprt_transmit(task);

	if (task->tk_status == -EAGAIN) {
		/* Remember which connection the partial send belongs to */
		req->rq_connect_cookie = req->rq_xprt->connect_cookie;
		task->tk_status = 0;
		return;
	}

	xprt_end_transmit(task);
	dprint_status(task);
	switch (task->tk_status) {
	case 0:
		/* Success */
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		break;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
	task->tk_action = rpc_exit_task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2010
/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status moves on to call_decode.  Errors are
 *	sorted into: retry via call_timeout, rebind via call_bind,
 *	retransmit via call_transmit, or terminate the task.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* Reply bytes are present and no send is in progress: use the count */
	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_reply_bytes_recvd;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch (status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			break;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
		    && clnt->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
					req->rq_connect_cookie);
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
		rpc_force_rebind(clnt);
		/* fall through */
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		return;
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		/* fall through */
	case -EAGAIN:
		task->tk_action = call_transmit;
		return;
	case -EIO:
		/* shutdown or soft timeout */
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -status);
		break;
	}

	/* Every path that reaches here terminates the task */
	rpc_exit(task, status);
}
2081
2082/*
2083 * 6a.	Handle RPC timeout
2084 * 	We do not release the request slot, so we keep using the
2085 *	same XID for all retransmits.
2086 */
2087static void
2088call_timeout(struct rpc_task *task)
2089{
2090	struct rpc_clnt	*clnt = task->tk_client;
2091
2092	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2093		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2094		goto retry;
2095	}
2096
2097	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2098	task->tk_timeouts++;
2099
2100	if (RPC_IS_SOFTCONN(task)) {
2101		rpc_exit(task, -ETIMEDOUT);
2102		return;
2103	}
2104	if (RPC_IS_SOFT(task)) {
2105		if (clnt->cl_chatty) {
2106			rcu_read_lock();
2107			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2108				clnt->cl_program->name,
2109				rcu_dereference(clnt->cl_xprt)->servername);
2110			rcu_read_unlock();
2111		}
2112		if (task->tk_flags & RPC_TASK_TIMEOUT)
2113			rpc_exit(task, -ETIMEDOUT);
2114		else
2115			rpc_exit(task, -EIO);
2116		return;
2117	}
2118
2119	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2120		task->tk_flags |= RPC_CALL_MAJORSEEN;
2121		if (clnt->cl_chatty) {
2122			rcu_read_lock();
2123			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2124			clnt->cl_program->name,
2125			rcu_dereference(clnt->cl_xprt)->servername);
2126			rcu_read_unlock();
2127		}
2128	}
2129	rpc_force_rebind(clnt);
2130	/*
2131	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
2132	 * event? RFC2203 requires the server to drop all such requests.
2133	 */
2134	rpcauth_invalcred(task);
2135
2136retry:
2137	task->tk_action = call_bind;
2138	task->tk_status = 0;
2139}
2140
2141/*
2142 * 7.	Decode the RPC reply
2143 */
2144static void
2145call_decode(struct rpc_task *task)
2146{
2147	struct rpc_clnt	*clnt = task->tk_client;
2148	struct rpc_rqst	*req = task->tk_rqstp;
2149	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
2150	__be32		*p;
2151
2152	dprint_status(task);
2153
2154	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2155		if (clnt->cl_chatty) {
2156			rcu_read_lock();
2157			printk(KERN_NOTICE "%s: server %s OK\n",
2158				clnt->cl_program->name,
2159				rcu_dereference(clnt->cl_xprt)->servername);
2160			rcu_read_unlock();
2161		}
2162		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2163	}
2164
2165	/*
2166	 * Ensure that we see all writes made by xprt_complete_rqst()
2167	 * before it changed req->rq_reply_bytes_recvd.
2168	 */
2169	smp_rmb();
2170	req->rq_rcv_buf.len = req->rq_private_buf.len;
2171
2172	/* Check that the softirq receive buffer is valid */
2173	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2174				sizeof(req->rq_rcv_buf)) != 0);
2175
2176	if (req->rq_rcv_buf.len < 12) {
2177		if (!RPC_IS_SOFT(task)) {
2178			task->tk_action = call_bind;
2179			goto out_retry;
2180		}
2181		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2182				clnt->cl_program->name, task->tk_status);
2183		task->tk_action = call_timeout;
2184		goto out_retry;
2185	}
2186
2187	p = rpc_verify_header(task);
2188	if (IS_ERR(p)) {
2189		if (p == ERR_PTR(-EAGAIN))
2190			goto out_retry;
2191		return;
2192	}
2193
2194	task->tk_action = rpc_exit_task;
2195
2196	if (decode) {
2197		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2198						      task->tk_msg.rpc_resp);
2199	}
2200	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2201			task->tk_status);
2202	return;
2203out_retry:
2204	task->tk_status = 0;
2205	/* Note: rpc_verify_header() may have freed the RPC slot */
2206	if (task->tk_rqstp == req) {
2207		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2208		if (task->tk_client->cl_discrtry)
2209			xprt_conditional_disconnect(req->rq_xprt,
2210					req->rq_connect_cookie);
2211	}
2212}
2213
2214static __be32 *
2215rpc_encode_header(struct rpc_task *task)
2216{
2217	struct rpc_clnt *clnt = task->tk_client;
2218	struct rpc_rqst	*req = task->tk_rqstp;
2219	__be32		*p = req->rq_svec[0].iov_base;
2220
2221	/* FIXME: check buffer size? */
2222
2223	p = xprt_skip_transport_header(req->rq_xprt, p);
2224	*p++ = req->rq_xid;		/* XID */
2225	*p++ = htonl(RPC_CALL);		/* CALL */
2226	*p++ = htonl(RPC_VERSION);	/* RPC version */
2227	*p++ = htonl(clnt->cl_prog);	/* program number */
2228	*p++ = htonl(clnt->cl_vers);	/* program version */
2229	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
2230	p = rpcauth_marshcred(task, p);
2231	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2232	return p;
2233}
2234
2235static __be32 *
2236rpc_verify_header(struct rpc_task *task)
2237{
2238	struct rpc_clnt *clnt = task->tk_client;
2239	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2240	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2241	__be32	*p = iov->iov_base;
2242	u32 n;
2243	int error = -EACCES;
2244
2245	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2246		/* RFC-1014 says that the representation of XDR data must be a
2247		 * multiple of four bytes
2248		 * - if it isn't pointer subtraction in the NFS client may give
2249		 *   undefined results
2250		 */
2251		dprintk("RPC: %5u %s: XDR representation not a multiple of"
2252		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2253		       task->tk_rqstp->rq_rcv_buf.len);
2254		error = -EIO;
2255		goto out_err;
2256	}
2257	if ((len -= 3) < 0)
2258		goto out_overflow;
2259
2260	p += 1; /* skip XID */
2261	if ((n = ntohl(*p++)) != RPC_REPLY) {
2262		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2263			task->tk_pid, __func__, n);
2264		error = -EIO;
2265		goto out_garbage;
2266	}
2267
2268	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2269		if (--len < 0)
2270			goto out_overflow;
2271		switch ((n = ntohl(*p++))) {
2272		case RPC_AUTH_ERROR:
2273			break;
2274		case RPC_MISMATCH:
2275			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2276				task->tk_pid, __func__);
2277			error = -EPROTONOSUPPORT;
2278			goto out_err;
2279		default:
2280			dprintk("RPC: %5u %s: RPC call rejected, "
2281				"unknown error: %x\n",
2282				task->tk_pid, __func__, n);
2283			error = -EIO;
2284			goto out_err;
2285		}
2286		if (--len < 0)
2287			goto out_overflow;
2288		switch ((n = ntohl(*p++))) {
2289		case RPC_AUTH_REJECTEDCRED:
2290		case RPC_AUTH_REJECTEDVERF:
2291		case RPCSEC_GSS_CREDPROBLEM:
2292		case RPCSEC_GSS_CTXPROBLEM:
2293			if (!task->tk_cred_retry)
2294				break;
2295			task->tk_cred_retry--;
2296			dprintk("RPC: %5u %s: retry stale creds\n",
2297					task->tk_pid, __func__);
2298			rpcauth_invalcred(task);
2299			/* Ensure we obtain a new XID! */
2300			xprt_release(task);
2301			task->tk_action = call_reserve;
2302			goto out_retry;
2303		case RPC_AUTH_BADCRED:
2304		case RPC_AUTH_BADVERF:
2305			/* possibly garbled cred/verf? */
2306			if (!task->tk_garb_retry)
2307				break;
2308			task->tk_garb_retry--;
2309			dprintk("RPC: %5u %s: retry garbled creds\n",
2310					task->tk_pid, __func__);
2311			task->tk_action = call_bind;
2312			goto out_retry;
2313		case RPC_AUTH_TOOWEAK:
2314			rcu_read_lock();
2315			printk(KERN_NOTICE "RPC: server %s requires stronger "
2316			       "authentication.\n",
2317			       rcu_dereference(clnt->cl_xprt)->servername);
2318			rcu_read_unlock();
2319			break;
2320		default:
2321			dprintk("RPC: %5u %s: unknown auth error: %x\n",
2322					task->tk_pid, __func__, n);
2323			error = -EIO;
2324		}
2325		dprintk("RPC: %5u %s: call rejected %d\n",
2326				task->tk_pid, __func__, n);
2327		goto out_err;
2328	}
2329	p = rpcauth_checkverf(task, p);
2330	if (IS_ERR(p)) {
2331		error = PTR_ERR(p);
2332		dprintk("RPC: %5u %s: auth check failed with %d\n",
2333				task->tk_pid, __func__, error);
2334		goto out_garbage;		/* bad verifier, retry */
2335	}
2336	len = p - (__be32 *)iov->iov_base - 1;
2337	if (len < 0)
2338		goto out_overflow;
2339	switch ((n = ntohl(*p++))) {
2340	case RPC_SUCCESS:
2341		return p;
2342	case RPC_PROG_UNAVAIL:
2343		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2344				"by server %s\n", task->tk_pid, __func__,
2345				(unsigned int)clnt->cl_prog,
2346				rcu_dereference(clnt->cl_xprt)->servername);
2347		error = -EPFNOSUPPORT;
2348		goto out_err;
2349	case RPC_PROG_MISMATCH:
2350		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2351				"by server %s\n", task->tk_pid, __func__,
2352				(unsigned int)clnt->cl_prog,
2353				(unsigned int)clnt->cl_vers,
2354				rcu_dereference(clnt->cl_xprt)->servername);
2355		error = -EPROTONOSUPPORT;
2356		goto out_err;
2357	case RPC_PROC_UNAVAIL:
2358		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2359				"version %u on server %s\n",
2360				task->tk_pid, __func__,
2361				rpc_proc_name(task),
2362				clnt->cl_prog, clnt->cl_vers,
2363				rcu_dereference(clnt->cl_xprt)->servername);
2364		error = -EOPNOTSUPP;
2365		goto out_err;
2366	case RPC_GARBAGE_ARGS:
2367		dprintk("RPC: %5u %s: server saw garbage\n",
2368				task->tk_pid, __func__);
2369		break;			/* retry */
2370	default:
2371		dprintk("RPC: %5u %s: server accept status: %x\n",
2372				task->tk_pid, __func__, n);
2373		/* Also retry */
2374	}
2375
2376out_garbage:
2377	clnt->cl_stats->rpcgarbage++;
2378	if (task->tk_garb_retry) {
2379		task->tk_garb_retry--;
2380		dprintk("RPC: %5u %s: retrying\n",
2381				task->tk_pid, __func__);
2382		task->tk_action = call_bind;
2383out_retry:
2384		return ERR_PTR(-EAGAIN);
2385	}
2386out_err:
2387	rpc_exit(task, error);
2388	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2389			__func__, error);
2390	return ERR_PTR(error);
2391out_overflow:
2392	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2393			__func__);
2394	goto out_garbage;
2395}
2396
/*
 * Argument encoder used by rpcproc_null.
 *
 * The body is empty: nothing is written to @xdr, and neither @rqstp nor
 * @obj is touched.
 */
static void
rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	/* no arguments to serialize */
}
2400
/*
 * Result decoder used by rpcproc_null.
 *
 * Nothing is read from @xdr and nothing is stored through @obj.
 * Always returns 0.
 */
static int
rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	/* no results to deserialize */
	return 0;
}
2405
2406static struct rpc_procinfo rpcproc_null = {
2407	.p_encode = rpcproc_encode_null,
2408	.p_decode = rpcproc_decode_null,
2409};
2410
2411static int rpc_ping(struct rpc_clnt *clnt)
2412{
2413	struct rpc_message msg = {
2414		.rpc_proc = &rpcproc_null,
2415	};
2416	int err;
2417	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2418	err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2419	put_rpccred(msg.rpc_cred);
2420	return err;
2421}
2422
2423struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2424{
2425	struct rpc_message msg = {
2426		.rpc_proc = &rpcproc_null,
2427		.rpc_cred = cred,
2428	};
2429	struct rpc_task_setup task_setup_data = {
2430		.rpc_client = clnt,
2431		.rpc_message = &msg,
2432		.callback_ops = &rpc_default_ops,
2433		.flags = flags,
2434	};
2435	return rpc_run_task(&task_setup_data);
2436}
2437EXPORT_SYMBOL_GPL(rpc_call_null);
2438
2439#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2440static void rpc_show_header(void)
2441{
2442	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2443		"-timeout ---ops--\n");
2444}
2445
2446static void rpc_show_task(const struct rpc_clnt *clnt,
2447			  const struct rpc_task *task)
2448{
2449	const char *rpc_waitq = "none";
2450
2451	if (RPC_IS_QUEUED(task))
2452		rpc_waitq = rpc_qname(task->tk_waitqueue);
2453
2454	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2455		task->tk_pid, task->tk_flags, task->tk_status,
2456		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2457		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2458		task->tk_action, rpc_waitq);
2459}
2460
2461void rpc_show_tasks(struct net *net)
2462{
2463	struct rpc_clnt *clnt;
2464	struct rpc_task *task;
2465	int header = 0;
2466	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2467
2468	spin_lock(&sn->rpc_client_lock);
2469	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2470		spin_lock(&clnt->cl_lock);
2471		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2472			if (!header) {
2473				rpc_show_header();
2474				header++;
2475			}
2476			rpc_show_task(clnt, task);
2477		}
2478		spin_unlock(&clnt->cl_lock);
2479	}
2480	spin_unlock(&sn->rpc_client_lock);
2481}
2482#endif
2483
2484#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
2485int
2486rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2487{
2488	int ret = 0;
2489	struct rpc_xprt	*xprt;
2490
2491	if (atomic_inc_return(&clnt->cl_swapper) == 1) {
2492retry:
2493		rcu_read_lock();
2494		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
2495		rcu_read_unlock();
2496		if (!xprt) {
2497			/*
2498			 * If we didn't get a reference, then we likely are
2499			 * racing with a migration event. Wait for a grace
2500			 * period and try again.
2501			 */
2502			synchronize_rcu();
2503			goto retry;
2504		}
2505
2506		ret = xprt_enable_swap(xprt);
2507		xprt_put(xprt);
2508	}
2509	return ret;
2510}
2511EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2512
2513void
2514rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2515{
2516	struct rpc_xprt	*xprt;
2517
2518	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
2519retry:
2520		rcu_read_lock();
2521		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
2522		rcu_read_unlock();
2523		if (!xprt) {
2524			/*
2525			 * If we didn't get a reference, then we likely are
2526			 * racing with a migration event. Wait for a grace
2527			 * period and try again.
2528			 */
2529			synchronize_rcu();
2530			goto retry;
2531		}
2532
2533		xprt_disable_swap(xprt);
2534		xprt_put(xprt);
2535	}
2536}
2537EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2538#endif /* CONFIG_SUNRPC_SWAP */
2539