1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/framework.c
37  *
38  * Author: Isaac Huang <isaac@clusterfs.com>
39  * Author: Liang Zhen  <liangzhen@clusterfs.com>
40  */
41 
42 #define DEBUG_SUBSYSTEM S_LNET
43 
44 #include "selftest.h"
45 
/* Session id placed in replies when there is no current session to report. */
lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};

/* Copied into sn_timeout of every session created by sfw_init_session(). */
static int session_timeout = 100;
module_param(session_timeout, int, 0444);
MODULE_PARM_DESC(session_timeout, "test session timeout in seconds (100 by default, 0 == never)");

/* Assigned to crpc_timeout of each test RPC right before it is posted. */
static int rpc_timeout = 64;
module_param(rpc_timeout, int, 0644);
MODULE_PARM_DESC(rpc_timeout, "rpc timeout in seconds (64 by default, 0 == never)");
55 
/* Byte-swap, in place, the nid (64 bit) and pid (32 bit) of a process id. */
#define sfw_unpack_id(procid)			\
do {						\
	__swab64s(&(procid).nid);		\
	__swab32s(&(procid).pid);		\
} while (0)
61 
/* Byte-swap, in place, both 64-bit members of a session id. */
#define sfw_unpack_sid(sesid)			\
do {						\
	__swab64s(&(sesid).ses_nid);		\
	__swab64s(&(sesid).ses_stamp);		\
} while (0)
67 
/* Byte-swap, in place, every 32-bit framework counter. */
#define sfw_unpack_fw_counters(fwc)		\
do {						\
	__swab32s(&(fwc).running_ms);		\
	__swab32s(&(fwc).active_batches);	\
	__swab32s(&(fwc).zombie_sessions);	\
	__swab32s(&(fwc).brw_errors);		\
	__swab32s(&(fwc).ping_errors);		\
} while (0)
76 
/*
 * Byte-swap, in place, the RPC counters: the event counts are 32 bit,
 * the two bulk byte counts are 64 bit.
 */
#define sfw_unpack_rpc_counters(rpcc)		\
do {						\
	__swab32s(&(rpcc).errors);		\
	__swab32s(&(rpcc).rpcs_sent);		\
	__swab32s(&(rpcc).rpcs_rcvd);		\
	__swab32s(&(rpcc).rpcs_dropped);	\
	__swab32s(&(rpcc).rpcs_expired);	\
	__swab64s(&(rpcc).bulk_get);		\
	__swab64s(&(rpcc).bulk_put);		\
} while (0)
87 
/*
 * Byte-swap, in place, the LNet counters: message counts are 32 bit,
 * the *_length totals are 64 bit.
 */
#define sfw_unpack_lnet_counters(lnetc)		\
do {						\
	__swab32s(&(lnetc).errors);		\
	__swab32s(&(lnetc).msgs_max);		\
	__swab32s(&(lnetc).msgs_alloc);		\
	__swab32s(&(lnetc).send_count);		\
	__swab32s(&(lnetc).recv_count);		\
	__swab32s(&(lnetc).drop_count);		\
	__swab32s(&(lnetc).route_count);	\
	__swab64s(&(lnetc).send_length);	\
	__swab64s(&(lnetc).recv_length);	\
	__swab64s(&(lnetc).drop_length);	\
	__swab64s(&(lnetc).route_length);	\
} while (0)
102 
/* True while the test instance / batch has a non-zero active count. */
#define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
#define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
105 
/* Global framework state; sfw_data is the only instance of this struct. */
static struct smoketest_framework {
	struct list_head  fw_zombie_rpcs;     /* RPCs to be recycled */
	struct list_head  fw_zombie_sessions; /* stopping sessions */
	struct list_head  fw_tests;           /* registered test cases */
	atomic_t          fw_nzombies;        /* # zombie sessions */
	spinlock_t        fw_lock;            /* serialise */
	sfw_session_t     *fw_session;        /* _the_ session */
	int               fw_shuttingdown;    /* shutdown in progress */
	srpc_server_rpc_t *fw_active_srpc;    /* running RPC */
} sfw_data;
116 
/* forward declarations of functions defined later in this file */
118 int sfw_stop_batch(sfw_batch_t *tsb, int force);
119 void sfw_destroy_session(sfw_session_t *sn);
120 
121 static inline sfw_test_case_t *
sfw_find_test_case(int id)122 sfw_find_test_case(int id)
123 {
124 	sfw_test_case_t *tsc;
125 
126 	LASSERT(id <= SRPC_SERVICE_MAX_ID);
127 	LASSERT(id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
128 
129 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
130 		if (tsc->tsc_srv_service->sv_id == id)
131 			return tsc;
132 	}
133 
134 	return NULL;
135 }
136 
137 static int
sfw_register_test(srpc_service_t * service,sfw_test_client_ops_t * cliops)138 sfw_register_test(srpc_service_t *service, sfw_test_client_ops_t *cliops)
139 {
140 	sfw_test_case_t *tsc;
141 
142 	if (sfw_find_test_case(service->sv_id) != NULL) {
143 		CERROR("Failed to register test %s (%d)\n",
144 			service->sv_name, service->sv_id);
145 		return -EEXIST;
146 	}
147 
148 	LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
149 	if (tsc == NULL)
150 		return -ENOMEM;
151 
152 	tsc->tsc_cli_ops     = cliops;
153 	tsc->tsc_srv_service = service;
154 
155 	list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
156 	return 0;
157 }
158 
159 static void
sfw_add_session_timer(void)160 sfw_add_session_timer(void)
161 {
162 	sfw_session_t *sn = sfw_data.fw_session;
163 	stt_timer_t *timer = &sn->sn_timer;
164 
165 	LASSERT(!sfw_data.fw_shuttingdown);
166 
167 	if (sn == NULL || sn->sn_timeout == 0)
168 		return;
169 
170 	LASSERT(!sn->sn_timer_active);
171 
172 	sn->sn_timer_active = 1;
173 	timer->stt_expires = ktime_get_real_seconds() + sn->sn_timeout;
174 	stt_add_timer(timer);
175 	return;
176 }
177 
178 static int
sfw_del_session_timer(void)179 sfw_del_session_timer(void)
180 {
181 	sfw_session_t *sn = sfw_data.fw_session;
182 
183 	if (sn == NULL || !sn->sn_timer_active)
184 		return 0;
185 
186 	LASSERT(sn->sn_timeout != 0);
187 
188 	if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
189 		sn->sn_timer_active = 0;
190 		return 0;
191 	}
192 
193 	return EBUSY; /* racing with sfw_session_expired() */
194 }
195 
/*
 * Retire the current session, if there is one.
 *
 * The session is detached from sfw_data.fw_session and put on the zombie
 * list, every registered test service is aborted, and all active batches
 * are force-stopped.  If no batch was active the session is destroyed
 * immediately; otherwise it stays on the zombie list.
 *
 * Called with sfw_data.fw_lock held.  The lock is dropped and retaken
 * inside, and is held again on every return path.  The session timer
 * must already be inactive.
 */
static void
sfw_deactivate_session(void)
	__must_hold(&sfw_data.fw_lock)
{
	sfw_session_t *sn = sfw_data.fw_session;
	int nactive = 0;
	sfw_batch_t *tsb;
	sfw_test_case_t *tsc;

	if (sn == NULL)
		return;

	LASSERT(!sn->sn_timer_active);

	/* from here on there is no current session; sn is a zombie */
	sfw_data.fw_session = NULL;
	atomic_inc(&sfw_data.fw_nzombies);
	list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);

	/* srpc_abort_service() is called without fw_lock held */
	spin_unlock(&sfw_data.fw_lock);

	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
		srpc_abort_service(tsc->tsc_srv_service);
	}

	spin_lock(&sfw_data.fw_lock);

	/* force-stop every batch that is still running and count them */
	list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
		if (sfw_batch_active(tsb)) {
			nactive++;
			sfw_stop_batch(tsb, 1);
		}
	}

	if (nactive != 0)
		return;   /* wait for active batches to stop */

	/* nothing was running: take sn off the zombie list and free it now */
	list_del_init(&sn->sn_list);
	spin_unlock(&sfw_data.fw_lock);

	sfw_destroy_session(sn);

	/* give the lock back to the caller */
	spin_lock(&sfw_data.fw_lock);
}
239 
240 static void
sfw_session_expired(void * data)241 sfw_session_expired(void *data)
242 {
243 	sfw_session_t *sn = data;
244 
245 	spin_lock(&sfw_data.fw_lock);
246 
247 	LASSERT(sn->sn_timer_active);
248 	LASSERT(sn == sfw_data.fw_session);
249 
250 	CWARN("Session expired! sid: %s-%llu, name: %s\n",
251 	       libcfs_nid2str(sn->sn_id.ses_nid),
252 	       sn->sn_id.ses_stamp, &sn->sn_name[0]);
253 
254 	sn->sn_timer_active = 0;
255 	sfw_deactivate_session();
256 
257 	spin_unlock(&sfw_data.fw_lock);
258 }
259 
260 static inline void
sfw_init_session(sfw_session_t * sn,lst_sid_t sid,unsigned features,const char * name)261 sfw_init_session(sfw_session_t *sn, lst_sid_t sid,
262 		 unsigned features, const char *name)
263 {
264 	stt_timer_t *timer = &sn->sn_timer;
265 
266 	memset(sn, 0, sizeof(sfw_session_t));
267 	INIT_LIST_HEAD(&sn->sn_list);
268 	INIT_LIST_HEAD(&sn->sn_batches);
269 	atomic_set(&sn->sn_refcount, 1);	/* +1 for caller */
270 	atomic_set(&sn->sn_brw_errors, 0);
271 	atomic_set(&sn->sn_ping_errors, 0);
272 	strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
273 
274 	sn->sn_timer_active = 0;
275 	sn->sn_id           = sid;
276 	sn->sn_features	    = features;
277 	sn->sn_timeout      = session_timeout;
278 	sn->sn_started      = cfs_time_current();
279 
280 	timer->stt_data = sn;
281 	timer->stt_func = sfw_session_expired;
282 	INIT_LIST_HEAD(&timer->stt_list);
283 }
284 
285 /* completion handler for incoming framework RPCs */
286 static void
sfw_server_rpc_done(struct srpc_server_rpc * rpc)287 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
288 {
289 	struct srpc_service *sv	= rpc->srpc_scd->scd_svc;
290 	int status = rpc->srpc_status;
291 
292 	CDEBUG(D_NET,
293 		"Incoming framework RPC done: service %s, peer %s, status %s:%d\n",
294 		sv->sv_name, libcfs_id2str(rpc->srpc_peer),
295 		swi_state2str(rpc->srpc_wi.swi_state),
296 		status);
297 
298 	if (rpc->srpc_bulk != NULL)
299 		sfw_free_pages(rpc);
300 	return;
301 }
302 
303 static void
sfw_client_rpc_fini(srpc_client_rpc_t * rpc)304 sfw_client_rpc_fini(srpc_client_rpc_t *rpc)
305 {
306 	LASSERT(rpc->crpc_bulk.bk_niov == 0);
307 	LASSERT(list_empty(&rpc->crpc_list));
308 	LASSERT(atomic_read(&rpc->crpc_refcount) == 0);
309 
310 	CDEBUG(D_NET,
311 		"Outgoing framework RPC done: service %d, peer %s, status %s:%d:%d\n",
312 		rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
313 		swi_state2str(rpc->crpc_wi.swi_state),
314 		rpc->crpc_aborted, rpc->crpc_status);
315 
316 	spin_lock(&sfw_data.fw_lock);
317 
318 	/* my callers must finish all RPCs before shutting me down */
319 	LASSERT(!sfw_data.fw_shuttingdown);
320 	list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
321 
322 	spin_unlock(&sfw_data.fw_lock);
323 }
324 
325 static sfw_batch_t *
sfw_find_batch(lst_bid_t bid)326 sfw_find_batch(lst_bid_t bid)
327 {
328 	sfw_session_t *sn = sfw_data.fw_session;
329 	sfw_batch_t *bat;
330 
331 	LASSERT(sn != NULL);
332 
333 	list_for_each_entry(bat, &sn->sn_batches, bat_list) {
334 		if (bat->bat_id.bat_id == bid.bat_id)
335 			return bat;
336 	}
337 
338 	return NULL;
339 }
340 
341 static sfw_batch_t *
sfw_bid2batch(lst_bid_t bid)342 sfw_bid2batch(lst_bid_t bid)
343 {
344 	sfw_session_t *sn = sfw_data.fw_session;
345 	sfw_batch_t *bat;
346 
347 	LASSERT(sn != NULL);
348 
349 	bat = sfw_find_batch(bid);
350 	if (bat != NULL)
351 		return bat;
352 
353 	LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
354 	if (bat == NULL)
355 		return NULL;
356 
357 	bat->bat_error    = 0;
358 	bat->bat_session  = sn;
359 	bat->bat_id       = bid;
360 	atomic_set(&bat->bat_nactive, 0);
361 	INIT_LIST_HEAD(&bat->bat_tests);
362 
363 	list_add_tail(&bat->bat_list, &sn->sn_batches);
364 	return bat;
365 }
366 
367 static int
sfw_get_stats(srpc_stat_reqst_t * request,srpc_stat_reply_t * reply)368 sfw_get_stats(srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
369 {
370 	sfw_session_t *sn = sfw_data.fw_session;
371 	sfw_counters_t *cnt = &reply->str_fw;
372 	sfw_batch_t *bat;
373 
374 	reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
375 
376 	if (request->str_sid.ses_nid == LNET_NID_ANY) {
377 		reply->str_status = EINVAL;
378 		return 0;
379 	}
380 
381 	if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
382 		reply->str_status = ESRCH;
383 		return 0;
384 	}
385 
386 	lnet_counters_get(&reply->str_lnet);
387 	srpc_get_counters(&reply->str_rpc);
388 
389 	/* send over the msecs since the session was started
390 	 - with 32 bits to send, this is ~49 days */
391 	cnt->running_ms	     = jiffies_to_msecs(jiffies - sn->sn_started);
392 	cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
393 	cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
394 	cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
395 
396 	cnt->active_batches = 0;
397 	list_for_each_entry(bat, &sn->sn_batches, bat_list) {
398 		if (atomic_read(&bat->bat_nactive) > 0)
399 			cnt->active_batches++;
400 	}
401 
402 	reply->str_status = 0;
403 	return 0;
404 }
405 
406 int
sfw_make_session(srpc_mksn_reqst_t * request,srpc_mksn_reply_t * reply)407 sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
408 {
409 	sfw_session_t *sn = sfw_data.fw_session;
410 	srpc_msg_t *msg = container_of(request, srpc_msg_t,
411 					  msg_body.mksn_reqst);
412 	int cplen = 0;
413 
414 	if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
415 		reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
416 		reply->mksn_status = EINVAL;
417 		return 0;
418 	}
419 
420 	if (sn != NULL) {
421 		reply->mksn_status  = 0;
422 		reply->mksn_sid     = sn->sn_id;
423 		reply->mksn_timeout = sn->sn_timeout;
424 
425 		if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
426 			atomic_inc(&sn->sn_refcount);
427 			return 0;
428 		}
429 
430 		if (!request->mksn_force) {
431 			reply->mksn_status = EBUSY;
432 			cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
433 					sizeof(reply->mksn_name));
434 			if (cplen >= sizeof(reply->mksn_name))
435 				return -E2BIG;
436 			return 0;
437 		}
438 	}
439 
440 	/* reject the request if it requires unknown features
441 	 * NB: old version will always accept all features because it's not
442 	 * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
443 	 * harmless because it will return zero feature to console, and it's
444 	 * console's responsibility to make sure all nodes in a session have
445 	 * same feature mask. */
446 	if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
447 		reply->mksn_status = EPROTO;
448 		return 0;
449 	}
450 
451 	/* brand new or create by force */
452 	LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
453 	if (sn == NULL) {
454 		CERROR("Dropping RPC (mksn) under memory pressure.\n");
455 		return -ENOMEM;
456 	}
457 
458 	sfw_init_session(sn, request->mksn_sid,
459 			 msg->msg_ses_feats, &request->mksn_name[0]);
460 
461 	spin_lock(&sfw_data.fw_lock);
462 
463 	sfw_deactivate_session();
464 	LASSERT(sfw_data.fw_session == NULL);
465 	sfw_data.fw_session = sn;
466 
467 	spin_unlock(&sfw_data.fw_lock);
468 
469 	reply->mksn_status  = 0;
470 	reply->mksn_sid     = sn->sn_id;
471 	reply->mksn_timeout = sn->sn_timeout;
472 	return 0;
473 }
474 
475 static int
sfw_remove_session(srpc_rmsn_reqst_t * request,srpc_rmsn_reply_t * reply)476 sfw_remove_session(srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
477 {
478 	sfw_session_t *sn = sfw_data.fw_session;
479 
480 	reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
481 
482 	if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
483 		reply->rmsn_status = EINVAL;
484 		return 0;
485 	}
486 
487 	if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
488 		reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
489 		return 0;
490 	}
491 
492 	if (!atomic_dec_and_test(&sn->sn_refcount)) {
493 		reply->rmsn_status = 0;
494 		return 0;
495 	}
496 
497 	spin_lock(&sfw_data.fw_lock);
498 	sfw_deactivate_session();
499 	spin_unlock(&sfw_data.fw_lock);
500 
501 	reply->rmsn_status = 0;
502 	reply->rmsn_sid    = LST_INVALID_SID;
503 	LASSERT(sfw_data.fw_session == NULL);
504 	return 0;
505 }
506 
507 static int
sfw_debug_session(srpc_debug_reqst_t * request,srpc_debug_reply_t * reply)508 sfw_debug_session(srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
509 {
510 	sfw_session_t *sn = sfw_data.fw_session;
511 
512 	if (sn == NULL) {
513 		reply->dbg_status = ESRCH;
514 		reply->dbg_sid    = LST_INVALID_SID;
515 		return 0;
516 	}
517 
518 	reply->dbg_status  = 0;
519 	reply->dbg_sid     = sn->sn_id;
520 	reply->dbg_timeout = sn->sn_timeout;
521 	if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
522 	    >= sizeof(reply->dbg_name))
523 		return -E2BIG;
524 
525 	return 0;
526 }
527 
528 static void
sfw_test_rpc_fini(srpc_client_rpc_t * rpc)529 sfw_test_rpc_fini(srpc_client_rpc_t *rpc)
530 {
531 	sfw_test_unit_t *tsu = rpc->crpc_priv;
532 	sfw_test_instance_t *tsi = tsu->tsu_instance;
533 
534 	/* Called with hold of tsi->tsi_lock */
535 	LASSERT(list_empty(&rpc->crpc_list));
536 	list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
537 }
538 
539 static inline int
sfw_test_buffers(sfw_test_instance_t * tsi)540 sfw_test_buffers(sfw_test_instance_t *tsi)
541 {
542 	struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
543 	struct srpc_service *svc = tsc->tsc_srv_service;
544 	int nbuf;
545 
546 	nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
547 	return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
548 }
549 
550 static int
sfw_load_test(struct sfw_test_instance * tsi)551 sfw_load_test(struct sfw_test_instance *tsi)
552 {
553 	struct sfw_test_case *tsc;
554 	struct srpc_service *svc;
555 	int nbuf;
556 	int rc;
557 
558 	LASSERT(tsi != NULL);
559 	tsc = sfw_find_test_case(tsi->tsi_service);
560 	nbuf = sfw_test_buffers(tsi);
561 	LASSERT(tsc != NULL);
562 	svc = tsc->tsc_srv_service;
563 
564 	if (tsi->tsi_is_client) {
565 		tsi->tsi_ops = tsc->tsc_cli_ops;
566 		return 0;
567 	}
568 
569 	rc = srpc_service_add_buffers(svc, nbuf);
570 	if (rc != 0) {
571 		CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
572 		      svc->sv_name, nbuf, rc);
573 		/* NB: this error handler is not strictly correct, because
574 		 * it may release more buffers than already allocated,
575 		 * but it doesn't matter because request portal should
576 		 * be lazy portal and will grow buffers if necessary. */
577 		srpc_service_remove_buffers(svc, nbuf);
578 		return -ENOMEM;
579 	}
580 
581 	CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
582 	       nbuf * (srpc_serv_is_framework(svc) ?
583 		       1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
584 	return 0;
585 }
586 
587 static void
sfw_unload_test(struct sfw_test_instance * tsi)588 sfw_unload_test(struct sfw_test_instance *tsi)
589 {
590 	struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
591 
592 	LASSERT(tsc != NULL);
593 
594 	if (tsi->tsi_is_client)
595 		return;
596 
597 	/* shrink buffers, because request portal is lazy portal
598 	 * which can grow buffers at runtime so we may leave
599 	 * some buffers behind, but never mind... */
600 	srpc_service_remove_buffers(tsc->tsc_srv_service,
601 				    sfw_test_buffers(tsi));
602 	return;
603 }
604 
605 static void
sfw_destroy_test_instance(sfw_test_instance_t * tsi)606 sfw_destroy_test_instance(sfw_test_instance_t *tsi)
607 {
608 	srpc_client_rpc_t *rpc;
609 	sfw_test_unit_t *tsu;
610 
611 	if (!tsi->tsi_is_client)
612 		goto clean;
613 
614 	tsi->tsi_ops->tso_fini(tsi);
615 
616 	LASSERT(!tsi->tsi_stopping);
617 	LASSERT(list_empty(&tsi->tsi_active_rpcs));
618 	LASSERT(!sfw_test_active(tsi));
619 
620 	while (!list_empty(&tsi->tsi_units)) {
621 		tsu = list_entry(tsi->tsi_units.next,
622 				     sfw_test_unit_t, tsu_list);
623 		list_del(&tsu->tsu_list);
624 		LIBCFS_FREE(tsu, sizeof(*tsu));
625 	}
626 
627 	while (!list_empty(&tsi->tsi_free_rpcs)) {
628 		rpc = list_entry(tsi->tsi_free_rpcs.next,
629 				     srpc_client_rpc_t, crpc_list);
630 		list_del(&rpc->crpc_list);
631 		LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
632 	}
633 
634 clean:
635 	sfw_unload_test(tsi);
636 	LIBCFS_FREE(tsi, sizeof(*tsi));
637 	return;
638 }
639 
640 static void
sfw_destroy_batch(sfw_batch_t * tsb)641 sfw_destroy_batch(sfw_batch_t *tsb)
642 {
643 	sfw_test_instance_t *tsi;
644 
645 	LASSERT(!sfw_batch_active(tsb));
646 	LASSERT(list_empty(&tsb->bat_list));
647 
648 	while (!list_empty(&tsb->bat_tests)) {
649 		tsi = list_entry(tsb->bat_tests.next,
650 				     sfw_test_instance_t, tsi_list);
651 		list_del_init(&tsi->tsi_list);
652 		sfw_destroy_test_instance(tsi);
653 	}
654 
655 	LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
656 	return;
657 }
658 
659 void
sfw_destroy_session(sfw_session_t * sn)660 sfw_destroy_session(sfw_session_t *sn)
661 {
662 	sfw_batch_t *batch;
663 
664 	LASSERT(list_empty(&sn->sn_list));
665 	LASSERT(sn != sfw_data.fw_session);
666 
667 	while (!list_empty(&sn->sn_batches)) {
668 		batch = list_entry(sn->sn_batches.next,
669 				       sfw_batch_t, bat_list);
670 		list_del_init(&batch->bat_list);
671 		sfw_destroy_batch(batch);
672 	}
673 
674 	LIBCFS_FREE(sn, sizeof(*sn));
675 	atomic_dec(&sfw_data.fw_nzombies);
676 	return;
677 }
678 
679 static void
sfw_unpack_addtest_req(srpc_msg_t * msg)680 sfw_unpack_addtest_req(srpc_msg_t *msg)
681 {
682 	srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
683 
684 	LASSERT(msg->msg_type == SRPC_MSG_TEST_REQST);
685 	LASSERT(req->tsr_is_client);
686 
687 	if (msg->msg_magic == SRPC_MSG_MAGIC)
688 		return; /* no flipping needed */
689 
690 	LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
691 
692 	if (req->tsr_service == SRPC_SERVICE_BRW) {
693 		if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) {
694 			test_bulk_req_t *bulk = &req->tsr_u.bulk_v0;
695 
696 			__swab32s(&bulk->blk_opc);
697 			__swab32s(&bulk->blk_npg);
698 			__swab32s(&bulk->blk_flags);
699 
700 		} else {
701 			test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1;
702 
703 			__swab16s(&bulk->blk_opc);
704 			__swab16s(&bulk->blk_flags);
705 			__swab32s(&bulk->blk_offset);
706 			__swab32s(&bulk->blk_len);
707 		}
708 
709 		return;
710 	}
711 
712 	if (req->tsr_service == SRPC_SERVICE_PING) {
713 		test_ping_req_t *ping = &req->tsr_u.ping;
714 
715 		__swab32s(&ping->png_size);
716 		__swab32s(&ping->png_flags);
717 		return;
718 	}
719 
720 	LBUG();
721 	return;
722 }
723 
/*
 * Create a test instance from an add-test request and append it to the
 * batch tsb.
 *
 * tsb - batch to add the instance to; must be inactive
 * rpc - incoming add-test RPC; for a client instance its bulk carries
 *       the packed destination ids
 *
 * A server instance is only loaded and queued.  A client instance also
 * gets tsr_concur test units per destination and is initialised via its
 * tso_init op.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error
 * returned by sfw_load_test() or tso_init.
 */
static int
sfw_add_test_instance(sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
{
	srpc_msg_t *msg = &rpc->srpc_reqstbuf->buf_msg;
	srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
	srpc_bulk_t *bk = rpc->srpc_bulk;
	int ndest = req->tsr_ndest;
	sfw_test_unit_t *tsu;
	sfw_test_instance_t *tsi;
	int i;
	int rc;

	LIBCFS_ALLOC(tsi, sizeof(*tsi));
	if (tsi == NULL) {
		CERROR("Can't allocate test instance for batch: %llu\n",
			tsb->bat_id.bat_id);
		return -ENOMEM;
	}

	spin_lock_init(&tsi->tsi_lock);
	atomic_set(&tsi->tsi_nactive, 0);
	INIT_LIST_HEAD(&tsi->tsi_units);
	INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
	INIT_LIST_HEAD(&tsi->tsi_active_rpcs);

	/* copy the test parameters out of the request */
	tsi->tsi_stopping      = 0;
	tsi->tsi_batch         = tsb;
	tsi->tsi_loop          = req->tsr_loop;
	tsi->tsi_concur        = req->tsr_concur;
	tsi->tsi_service       = req->tsr_service;
	tsi->tsi_is_client     = !!(req->tsr_is_client);
	tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);

	rc = sfw_load_test(tsi);
	if (rc != 0) {
		/* nothing else is attached to tsi yet, so a plain free is enough */
		LIBCFS_FREE(tsi, sizeof(*tsi));
		return rc;
	}

	LASSERT(!sfw_batch_active(tsb));

	if (!tsi->tsi_is_client) {
		/* it's test server, just add it to tsb */
		list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
		return 0;
	}

	/* the bulk must be large enough to hold ndest packed ids */
	LASSERT(bk != NULL);
	LASSERT(bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
	LASSERT((unsigned int)bk->bk_len >=
		sizeof(lnet_process_id_packed_t) * ndest);

	sfw_unpack_addtest_req(msg);
	memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));

	for (i = 0; i < ndest; i++) {
		lnet_process_id_packed_t *dests;
		lnet_process_id_packed_t id;
		int j;

		/* ids are packed SFW_ID_PER_PAGE to a bulk page */
		dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
		LASSERT(dests != NULL);  /* my pages are within KVM always */
		id = dests[i % SFW_ID_PER_PAGE];
		/* swab the local copy when the message magic is not native */
		if (msg->msg_magic != SRPC_MSG_MAGIC)
			sfw_unpack_id(id);

		/* one test unit per concurrent stream to this destination */
		for (j = 0; j < tsi->tsi_concur; j++) {
			LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
			if (tsu == NULL) {
				rc = -ENOMEM;
				CERROR("Can't allocate tsu for %d\n",
					tsi->tsi_service);
				goto error;
			}

			tsu->tsu_dest.nid = id.nid;
			tsu->tsu_dest.pid = id.pid;
			tsu->tsu_instance = tsi;
			tsu->tsu_private  = NULL;
			list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
		}
	}

	rc = tsi->tsi_ops->tso_init(tsi);
	if (rc == 0) {
		list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
		return 0;
	}

error:
	LASSERT(rc != 0);
	/* frees the units allocated so far, unloads the test and frees tsi */
	sfw_destroy_test_instance(tsi);
	return rc;
}
818 
/*
 * Account for one finished test unit.
 *
 * When the last unit of its test instance finishes, the instance's
 * stopping flag is cleared and the batch's active count is dropped.
 * If that was the batch's last active test, and the session is no
 * longer the current one, and no other batch of the session is active,
 * the (zombie) session is destroyed.
 */
static void
sfw_test_unit_done(sfw_test_unit_t *tsu)
{
	sfw_test_instance_t *tsi = tsu->tsu_instance;
	sfw_batch_t *tsb = tsi->tsi_batch;
	sfw_session_t *sn = tsb->bat_session;

	LASSERT(sfw_test_active(tsi));

	/* other units of this instance are still running */
	if (!atomic_dec_and_test(&tsi->tsi_nactive))
		return;

	/* the test instance is done */
	spin_lock(&tsi->tsi_lock);

	tsi->tsi_stopping = 0;

	spin_unlock(&tsi->tsi_lock);

	spin_lock(&sfw_data.fw_lock);

	if (!atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */
	    sn == sfw_data.fw_session) {		  /* sn also active */
		spin_unlock(&sfw_data.fw_lock);
		return;
	}

	LASSERT(!list_empty(&sn->sn_list)); /* I'm a zombie! */

	/* tsb is reused as the cursor over all batches of the session */
	list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
		if (sfw_batch_active(tsb)) {
			spin_unlock(&sfw_data.fw_lock);
			return;
		}
	}

	/* no batch left running: unlink the zombie and free it unlocked */
	list_del_init(&sn->sn_list);
	spin_unlock(&sfw_data.fw_lock);

	sfw_destroy_session(sn);
	return;
}
861 
862 static void
sfw_test_rpc_done(srpc_client_rpc_t * rpc)863 sfw_test_rpc_done(srpc_client_rpc_t *rpc)
864 {
865 	sfw_test_unit_t *tsu = rpc->crpc_priv;
866 	sfw_test_instance_t *tsi = tsu->tsu_instance;
867 	int done = 0;
868 
869 	tsi->tsi_ops->tso_done_rpc(tsu, rpc);
870 
871 	spin_lock(&tsi->tsi_lock);
872 
873 	LASSERT(sfw_test_active(tsi));
874 	LASSERT(!list_empty(&rpc->crpc_list));
875 
876 	list_del_init(&rpc->crpc_list);
877 
878 	/* batch is stopping or loop is done or get error */
879 	if (tsi->tsi_stopping ||
880 	    tsu->tsu_loop == 0 ||
881 	    (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
882 		done = 1;
883 
884 	/* dec ref for poster */
885 	srpc_client_rpc_decref(rpc);
886 
887 	spin_unlock(&tsi->tsi_lock);
888 
889 	if (!done) {
890 		swi_schedule_workitem(&tsu->tsu_worker);
891 		return;
892 	}
893 
894 	sfw_test_unit_done(tsu);
895 	return;
896 }
897 
898 int
sfw_create_test_rpc(sfw_test_unit_t * tsu,lnet_process_id_t peer,unsigned features,int nblk,int blklen,srpc_client_rpc_t ** rpcpp)899 sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer,
900 		    unsigned features, int nblk, int blklen,
901 		    srpc_client_rpc_t **rpcpp)
902 {
903 	srpc_client_rpc_t *rpc = NULL;
904 	sfw_test_instance_t *tsi = tsu->tsu_instance;
905 
906 	spin_lock(&tsi->tsi_lock);
907 
908 	LASSERT(sfw_test_active(tsi));
909 
910 	if (!list_empty(&tsi->tsi_free_rpcs)) {
911 		/* pick request from buffer */
912 		rpc = list_entry(tsi->tsi_free_rpcs.next,
913 				     srpc_client_rpc_t, crpc_list);
914 		LASSERT(nblk == rpc->crpc_bulk.bk_niov);
915 		list_del_init(&rpc->crpc_list);
916 	}
917 
918 	spin_unlock(&tsi->tsi_lock);
919 
920 	if (rpc == NULL) {
921 		rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
922 					     blklen, sfw_test_rpc_done,
923 					     sfw_test_rpc_fini, tsu);
924 	} else {
925 		srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
926 				     blklen, sfw_test_rpc_done,
927 				     sfw_test_rpc_fini, tsu);
928 	}
929 
930 	if (rpc == NULL) {
931 		CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
932 		return -ENOMEM;
933 	}
934 
935 	rpc->crpc_reqstmsg.msg_ses_feats = features;
936 	*rpcpp = rpc;
937 
938 	return 0;
939 }
940 
/*
 * Work-item handler for one test unit: prepare one RPC and post it.
 *
 * Returns 0 after an RPC has been posted.  Returns 1 when the unit is
 * finished, either because the RPC could not be prepared or because
 * the instance is stopping; the work item is exited and
 * sfw_test_unit_done() is called in that case.
 */
static int
sfw_run_test(swi_workitem_t *wi)
{
	sfw_test_unit_t *tsu = wi->swi_workitem.wi_data;
	sfw_test_instance_t *tsi = tsu->tsu_instance;
	srpc_client_rpc_t *rpc = NULL;

	LASSERT(wi == &tsu->tsu_worker);

	if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
		LASSERT(rpc == NULL);
		goto test_done;
	}

	LASSERT(rpc != NULL);

	spin_lock(&tsi->tsi_lock);

	if (tsi->tsi_stopping) {
		/* not posting after all: park the prepared RPC for reuse */
		list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
		spin_unlock(&tsi->tsi_lock);
		goto test_done;
	}

	/* only a positive loop count is decremented */
	if (tsu->tsu_loop > 0)
		tsu->tsu_loop--;

	list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
	spin_unlock(&tsi->tsi_lock);

	rpc->crpc_timeout = rpc_timeout;

	spin_lock(&rpc->crpc_lock);
	srpc_post_rpc(rpc);
	spin_unlock(&rpc->crpc_lock);
	return 0;

test_done:
	/*
	 * No one can schedule me now since:
	 * - previous RPC, if any, has done and
	 * - no new RPC is initiated.
	 * - my batch is still active; no one can run it again now.
	 * Cancel pending schedules and prevent future schedule attempts:
	 */
	swi_exit_workitem(wi);
	sfw_test_unit_done(tsu);
	return 1;
}
990 
991 static int
sfw_run_batch(sfw_batch_t * tsb)992 sfw_run_batch(sfw_batch_t *tsb)
993 {
994 	swi_workitem_t *wi;
995 	sfw_test_unit_t *tsu;
996 	sfw_test_instance_t *tsi;
997 
998 	if (sfw_batch_active(tsb)) {
999 		CDEBUG(D_NET, "Batch already active: %llu (%d)\n",
1000 		       tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
1001 		return 0;
1002 	}
1003 
1004 	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1005 		if (!tsi->tsi_is_client) /* skip server instances */
1006 			continue;
1007 
1008 		LASSERT(!tsi->tsi_stopping);
1009 		LASSERT(!sfw_test_active(tsi));
1010 
1011 		atomic_inc(&tsb->bat_nactive);
1012 
1013 		list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
1014 			atomic_inc(&tsi->tsi_nactive);
1015 			tsu->tsu_loop = tsi->tsi_loop;
1016 			wi = &tsu->tsu_worker;
1017 			swi_init_workitem(wi, tsu, sfw_run_test,
1018 					  lst_sched_test[\
1019 					  lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1020 			swi_schedule_workitem(wi);
1021 		}
1022 	}
1023 
1024 	return 0;
1025 }
1026 
/*
 * Ask every active client test instance of a batch to stop.
 *
 * tsb   - batch to stop; nothing is done if it is not active
 * force - when non-zero, RPCs already in flight are also aborted with
 *         -EINTR; otherwise the instances are only marked as stopping
 *
 * Always returns 0.
 */
int
sfw_stop_batch(sfw_batch_t *tsb, int force)
{
	sfw_test_instance_t *tsi;
	srpc_client_rpc_t *rpc;

	if (!sfw_batch_active(tsb)) {
		CDEBUG(D_NET, "Batch %llu inactive\n", tsb->bat_id.bat_id);
		return 0;
	}

	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
		spin_lock(&tsi->tsi_lock);

		/* skip servers, idle instances and ones already stopping */
		if (!tsi->tsi_is_client ||
		    !sfw_test_active(tsi) || tsi->tsi_stopping) {
			spin_unlock(&tsi->tsi_lock);
			continue;
		}

		tsi->tsi_stopping = 1;

		if (!force) {
			spin_unlock(&tsi->tsi_lock);
			continue;
		}

		/* abort launched rpcs in the test */
		list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
			/* crpc_lock is taken inside tsi_lock here */
			spin_lock(&rpc->crpc_lock);

			srpc_abort_rpc(rpc, -EINTR);

			spin_unlock(&rpc->crpc_lock);
		}

		spin_unlock(&tsi->tsi_lock);
	}

	return 0;
}
1068 
1069 static int
sfw_query_batch(sfw_batch_t * tsb,int testidx,srpc_batch_reply_t * reply)1070 sfw_query_batch(sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
1071 {
1072 	sfw_test_instance_t *tsi;
1073 
1074 	if (testidx < 0)
1075 		return -EINVAL;
1076 
1077 	if (testidx == 0) {
1078 		reply->bar_active = atomic_read(&tsb->bat_nactive);
1079 		return 0;
1080 	}
1081 
1082 	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1083 		if (testidx-- > 1)
1084 			continue;
1085 
1086 		reply->bar_active = atomic_read(&tsi->tsi_nactive);
1087 		return 0;
1088 	}
1089 
1090 	return -ENOENT;
1091 }
1092 
1093 void
sfw_free_pages(srpc_server_rpc_t * rpc)1094 sfw_free_pages(srpc_server_rpc_t *rpc)
1095 {
1096 	srpc_free_bulk(rpc->srpc_bulk);
1097 	rpc->srpc_bulk = NULL;
1098 }
1099 
1100 int
sfw_alloc_pages(struct srpc_server_rpc * rpc,int cpt,int npages,int len,int sink)1101 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1102 		int sink)
1103 {
1104 	LASSERT(rpc->srpc_bulk == NULL);
1105 	LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1106 
1107 	rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1108 	if (rpc->srpc_bulk == NULL)
1109 		return -ENOMEM;
1110 
1111 	return 0;
1112 }
1113 
1114 static int
sfw_add_test(srpc_server_rpc_t * rpc)1115 sfw_add_test(srpc_server_rpc_t *rpc)
1116 {
1117 	sfw_session_t *sn = sfw_data.fw_session;
1118 	srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1119 	srpc_test_reqst_t *request;
1120 	int rc;
1121 	sfw_batch_t *bat;
1122 
1123 	request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1124 	reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1125 
1126 	if (request->tsr_loop == 0 ||
1127 	    request->tsr_concur == 0 ||
1128 	    request->tsr_sid.ses_nid == LNET_NID_ANY ||
1129 	    request->tsr_ndest > SFW_MAX_NDESTS ||
1130 	    (request->tsr_is_client && request->tsr_ndest == 0) ||
1131 	    request->tsr_concur > SFW_MAX_CONCUR ||
1132 	    request->tsr_service > SRPC_SERVICE_MAX_ID ||
1133 	    request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1134 		reply->tsr_status = EINVAL;
1135 		return 0;
1136 	}
1137 
1138 	if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1139 	    sfw_find_test_case(request->tsr_service) == NULL) {
1140 		reply->tsr_status = ENOENT;
1141 		return 0;
1142 	}
1143 
1144 	bat = sfw_bid2batch(request->tsr_bid);
1145 	if (bat == NULL) {
1146 		CERROR("Dropping RPC (%s) from %s under memory pressure.\n",
1147 			rpc->srpc_scd->scd_svc->sv_name,
1148 			libcfs_id2str(rpc->srpc_peer));
1149 		return -ENOMEM;
1150 	}
1151 
1152 	if (sfw_batch_active(bat)) {
1153 		reply->tsr_status = EBUSY;
1154 		return 0;
1155 	}
1156 
1157 	if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1158 		/* rpc will be resumed later in sfw_bulk_ready */
1159 		int npg = sfw_id_pages(request->tsr_ndest);
1160 		int len;
1161 
1162 		if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) {
1163 			len = npg * PAGE_CACHE_SIZE;
1164 
1165 		} else  {
1166 			len = sizeof(lnet_process_id_packed_t) *
1167 			      request->tsr_ndest;
1168 		}
1169 
1170 		return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1171 	}
1172 
1173 	rc = sfw_add_test_instance(bat, rpc);
1174 	CDEBUG(rc == 0 ? D_NET : D_WARNING,
1175 		"%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1176 		rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1177 		request->tsr_is_client ? "client" : "server",
1178 		request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1179 
1180 	reply->tsr_status = (rc < 0) ? -rc : rc;
1181 	return 0;
1182 }
1183 
1184 static int
sfw_control_batch(srpc_batch_reqst_t * request,srpc_batch_reply_t * reply)1185 sfw_control_batch(srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1186 {
1187 	sfw_session_t *sn = sfw_data.fw_session;
1188 	int rc = 0;
1189 	sfw_batch_t *bat;
1190 
1191 	reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1192 
1193 	if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1194 		reply->bar_status = ESRCH;
1195 		return 0;
1196 	}
1197 
1198 	bat = sfw_find_batch(request->bar_bid);
1199 	if (bat == NULL) {
1200 		reply->bar_status = ENOENT;
1201 		return 0;
1202 	}
1203 
1204 	switch (request->bar_opc) {
1205 	case SRPC_BATCH_OPC_RUN:
1206 		rc = sfw_run_batch(bat);
1207 		break;
1208 
1209 	case SRPC_BATCH_OPC_STOP:
1210 		rc = sfw_stop_batch(bat, request->bar_arg);
1211 		break;
1212 
1213 	case SRPC_BATCH_OPC_QUERY:
1214 		rc = sfw_query_batch(bat, request->bar_testidx, reply);
1215 		break;
1216 
1217 	default:
1218 		return -EINVAL; /* drop it */
1219 	}
1220 
1221 	reply->bar_status = (rc < 0) ? -rc : rc;
1222 	return 0;
1223 }
1224 
1225 static int
sfw_handle_server_rpc(struct srpc_server_rpc * rpc)1226 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1227 {
1228 	struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1229 	srpc_msg_t *reply = &rpc->srpc_replymsg;
1230 	srpc_msg_t *request = &rpc->srpc_reqstbuf->buf_msg;
1231 	unsigned features = LST_FEATS_MASK;
1232 	int rc = 0;
1233 
1234 	LASSERT(sfw_data.fw_active_srpc == NULL);
1235 	LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1236 
1237 	spin_lock(&sfw_data.fw_lock);
1238 
1239 	if (sfw_data.fw_shuttingdown) {
1240 		spin_unlock(&sfw_data.fw_lock);
1241 		return -ESHUTDOWN;
1242 	}
1243 
1244 	/* Remove timer to avoid racing with it or expiring active session */
1245 	if (sfw_del_session_timer() != 0) {
1246 		CERROR("Dropping RPC (%s) from %s: racing with expiry timer.",
1247 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1248 		spin_unlock(&sfw_data.fw_lock);
1249 		return -EAGAIN;
1250 	}
1251 
1252 	sfw_data.fw_active_srpc = rpc;
1253 	spin_unlock(&sfw_data.fw_lock);
1254 
1255 	sfw_unpack_message(request);
1256 	LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1257 
1258 	/* rpc module should have checked this */
1259 	LASSERT(request->msg_version == SRPC_MSG_VERSION);
1260 
1261 	if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1262 	    sv->sv_id != SRPC_SERVICE_DEBUG) {
1263 		sfw_session_t *sn = sfw_data.fw_session;
1264 
1265 		if (sn != NULL &&
1266 		    sn->sn_features != request->msg_ses_feats) {
1267 			CNETERR("Features of framework RPC don't match features of current session: %x/%x\n",
1268 				request->msg_ses_feats, sn->sn_features);
1269 			reply->msg_body.reply.status = EPROTO;
1270 			reply->msg_body.reply.sid    = sn->sn_id;
1271 			goto out;
1272 		}
1273 
1274 	} else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
1275 		/* NB: at this point, old version will ignore features and
1276 		 * create new session anyway, so console should be able
1277 		 * to handle this */
1278 		reply->msg_body.reply.status = EPROTO;
1279 		goto out;
1280 	}
1281 
1282 	switch (sv->sv_id) {
1283 	default:
1284 		LBUG();
1285 	case SRPC_SERVICE_TEST:
1286 		rc = sfw_add_test(rpc);
1287 		break;
1288 
1289 	case SRPC_SERVICE_BATCH:
1290 		rc = sfw_control_batch(&request->msg_body.bat_reqst,
1291 				       &reply->msg_body.bat_reply);
1292 		break;
1293 
1294 	case SRPC_SERVICE_QUERY_STAT:
1295 		rc = sfw_get_stats(&request->msg_body.stat_reqst,
1296 				   &reply->msg_body.stat_reply);
1297 		break;
1298 
1299 	case SRPC_SERVICE_DEBUG:
1300 		rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1301 				       &reply->msg_body.dbg_reply);
1302 		break;
1303 
1304 	case SRPC_SERVICE_MAKE_SESSION:
1305 		rc = sfw_make_session(&request->msg_body.mksn_reqst,
1306 				      &reply->msg_body.mksn_reply);
1307 		break;
1308 
1309 	case SRPC_SERVICE_REMOVE_SESSION:
1310 		rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1311 					&reply->msg_body.rmsn_reply);
1312 		break;
1313 	}
1314 
1315 	if (sfw_data.fw_session != NULL)
1316 		features = sfw_data.fw_session->sn_features;
1317  out:
1318 	reply->msg_ses_feats = features;
1319 	rpc->srpc_done = sfw_server_rpc_done;
1320 	spin_lock(&sfw_data.fw_lock);
1321 
1322 	if (!sfw_data.fw_shuttingdown)
1323 		sfw_add_session_timer();
1324 
1325 	sfw_data.fw_active_srpc = NULL;
1326 	spin_unlock(&sfw_data.fw_lock);
1327 	return rc;
1328 }
1329 
1330 static int
sfw_bulk_ready(struct srpc_server_rpc * rpc,int status)1331 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1332 {
1333 	struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1334 	int rc;
1335 
1336 	LASSERT(rpc->srpc_bulk != NULL);
1337 	LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1338 	LASSERT(sfw_data.fw_active_srpc == NULL);
1339 	LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1340 
1341 	spin_lock(&sfw_data.fw_lock);
1342 
1343 	if (status != 0) {
1344 		CERROR("Bulk transfer failed for RPC: service %s, peer %s, status %d\n",
1345 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1346 		spin_unlock(&sfw_data.fw_lock);
1347 		return -EIO;
1348 	}
1349 
1350 	if (sfw_data.fw_shuttingdown) {
1351 		spin_unlock(&sfw_data.fw_lock);
1352 		return -ESHUTDOWN;
1353 	}
1354 
1355 	if (sfw_del_session_timer() != 0) {
1356 		CERROR("Dropping RPC (%s) from %s: racing with expiry timer",
1357 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1358 		spin_unlock(&sfw_data.fw_lock);
1359 		return -EAGAIN;
1360 	}
1361 
1362 	sfw_data.fw_active_srpc = rpc;
1363 	spin_unlock(&sfw_data.fw_lock);
1364 
1365 	rc = sfw_add_test(rpc);
1366 
1367 	spin_lock(&sfw_data.fw_lock);
1368 
1369 	if (!sfw_data.fw_shuttingdown)
1370 		sfw_add_session_timer();
1371 
1372 	sfw_data.fw_active_srpc = NULL;
1373 	spin_unlock(&sfw_data.fw_lock);
1374 	return rc;
1375 }
1376 
1377 srpc_client_rpc_t *
sfw_create_rpc(lnet_process_id_t peer,int service,unsigned features,int nbulkiov,int bulklen,void (* done)(srpc_client_rpc_t *),void * priv)1378 sfw_create_rpc(lnet_process_id_t peer, int service,
1379 	       unsigned features, int nbulkiov, int bulklen,
1380 	       void (*done)(srpc_client_rpc_t *), void *priv)
1381 {
1382 	srpc_client_rpc_t *rpc = NULL;
1383 
1384 	spin_lock(&sfw_data.fw_lock);
1385 
1386 	LASSERT(!sfw_data.fw_shuttingdown);
1387 	LASSERT(service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1388 
1389 	if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1390 		rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1391 				     srpc_client_rpc_t, crpc_list);
1392 		list_del(&rpc->crpc_list);
1393 
1394 		srpc_init_client_rpc(rpc, peer, service, 0, 0,
1395 				     done, sfw_client_rpc_fini, priv);
1396 	}
1397 
1398 	spin_unlock(&sfw_data.fw_lock);
1399 
1400 	if (rpc == NULL) {
1401 		rpc = srpc_create_client_rpc(peer, service,
1402 					     nbulkiov, bulklen, done,
1403 					     nbulkiov != 0 ?  NULL :
1404 					     sfw_client_rpc_fini,
1405 					     priv);
1406 	}
1407 
1408 	if (rpc != NULL) /* "session" is concept in framework */
1409 		rpc->crpc_reqstmsg.msg_ses_feats = features;
1410 
1411 	return rpc;
1412 }
1413 
1414 void
sfw_unpack_message(srpc_msg_t * msg)1415 sfw_unpack_message(srpc_msg_t *msg)
1416 {
1417 	if (msg->msg_magic == SRPC_MSG_MAGIC)
1418 		return; /* no flipping needed */
1419 
1420 	/* srpc module should guarantee I wouldn't get crap */
1421 	LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1422 
1423 	if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1424 		srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1425 
1426 		__swab32s(&req->str_type);
1427 		__swab64s(&req->str_rpyid);
1428 		sfw_unpack_sid(req->str_sid);
1429 		return;
1430 	}
1431 
1432 	if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1433 		srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1434 
1435 		__swab32s(&rep->str_status);
1436 		sfw_unpack_sid(rep->str_sid);
1437 		sfw_unpack_fw_counters(rep->str_fw);
1438 		sfw_unpack_rpc_counters(rep->str_rpc);
1439 		sfw_unpack_lnet_counters(rep->str_lnet);
1440 		return;
1441 	}
1442 
1443 	if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1444 		srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1445 
1446 		__swab64s(&req->mksn_rpyid);
1447 		__swab32s(&req->mksn_force);
1448 		sfw_unpack_sid(req->mksn_sid);
1449 		return;
1450 	}
1451 
1452 	if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1453 		srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1454 
1455 		__swab32s(&rep->mksn_status);
1456 		__swab32s(&rep->mksn_timeout);
1457 		sfw_unpack_sid(rep->mksn_sid);
1458 		return;
1459 	}
1460 
1461 	if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1462 		srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1463 
1464 		__swab64s(&req->rmsn_rpyid);
1465 		sfw_unpack_sid(req->rmsn_sid);
1466 		return;
1467 	}
1468 
1469 	if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1470 		srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1471 
1472 		__swab32s(&rep->rmsn_status);
1473 		sfw_unpack_sid(rep->rmsn_sid);
1474 		return;
1475 	}
1476 
1477 	if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1478 		srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1479 
1480 		__swab64s(&req->dbg_rpyid);
1481 		__swab32s(&req->dbg_flags);
1482 		sfw_unpack_sid(req->dbg_sid);
1483 		return;
1484 	}
1485 
1486 	if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1487 		srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1488 
1489 		__swab32s(&rep->dbg_nbatch);
1490 		__swab32s(&rep->dbg_timeout);
1491 		sfw_unpack_sid(rep->dbg_sid);
1492 		return;
1493 	}
1494 
1495 	if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1496 		srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1497 
1498 		__swab32s(&req->bar_opc);
1499 		__swab64s(&req->bar_rpyid);
1500 		__swab32s(&req->bar_testidx);
1501 		__swab32s(&req->bar_arg);
1502 		sfw_unpack_sid(req->bar_sid);
1503 		__swab64s(&req->bar_bid.bat_id);
1504 		return;
1505 	}
1506 
1507 	if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1508 		srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1509 
1510 		__swab32s(&rep->bar_status);
1511 		sfw_unpack_sid(rep->bar_sid);
1512 		return;
1513 	}
1514 
1515 	if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1516 		srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1517 
1518 		__swab64s(&req->tsr_rpyid);
1519 		__swab64s(&req->tsr_bulkid);
1520 		__swab32s(&req->tsr_loop);
1521 		__swab32s(&req->tsr_ndest);
1522 		__swab32s(&req->tsr_concur);
1523 		__swab32s(&req->tsr_service);
1524 		sfw_unpack_sid(req->tsr_sid);
1525 		__swab64s(&req->tsr_bid.bat_id);
1526 		return;
1527 	}
1528 
1529 	if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1530 		srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1531 
1532 		__swab32s(&rep->tsr_status);
1533 		sfw_unpack_sid(rep->tsr_sid);
1534 		return;
1535 	}
1536 
1537 	if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1538 		srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1539 
1540 		__swab64s(&req->join_rpyid);
1541 		sfw_unpack_sid(req->join_sid);
1542 		return;
1543 	}
1544 
1545 	if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1546 		srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1547 
1548 		__swab32s(&rep->join_status);
1549 		__swab32s(&rep->join_timeout);
1550 		sfw_unpack_sid(rep->join_sid);
1551 		return;
1552 	}
1553 
1554 	LBUG();
1555 	return;
1556 }
1557 
1558 void
sfw_abort_rpc(srpc_client_rpc_t * rpc)1559 sfw_abort_rpc(srpc_client_rpc_t *rpc)
1560 {
1561 	LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1562 	LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1563 
1564 	spin_lock(&rpc->crpc_lock);
1565 	srpc_abort_rpc(rpc, -EINTR);
1566 	spin_unlock(&rpc->crpc_lock);
1567 	return;
1568 }
1569 
1570 void
sfw_post_rpc(srpc_client_rpc_t * rpc)1571 sfw_post_rpc(srpc_client_rpc_t *rpc)
1572 {
1573 	spin_lock(&rpc->crpc_lock);
1574 
1575 	LASSERT(!rpc->crpc_closed);
1576 	LASSERT(!rpc->crpc_aborted);
1577 	LASSERT(list_empty(&rpc->crpc_list));
1578 	LASSERT(!sfw_data.fw_shuttingdown);
1579 
1580 	rpc->crpc_timeout = rpc_timeout;
1581 	srpc_post_rpc(rpc);
1582 
1583 	spin_unlock(&rpc->crpc_lock);
1584 	return;
1585 }
1586 
1587 static srpc_service_t sfw_services[] = {
1588 	{
1589 		/* sv_id */    SRPC_SERVICE_DEBUG,
1590 		/* sv_name */  "debug",
1591 		0
1592 	},
1593 	{
1594 		/* sv_id */    SRPC_SERVICE_QUERY_STAT,
1595 		/* sv_name */  "query stats",
1596 		0
1597 	},
1598 	{
1599 		/* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1600 		/* sv_name */  "make session",
1601 		0
1602 	},
1603 	{
1604 		/* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1605 		/* sv_name */  "remove session",
1606 		0
1607 	},
1608 	{
1609 		/* sv_id */    SRPC_SERVICE_BATCH,
1610 		/* sv_name */  "batch service",
1611 		0
1612 	},
1613 	{
1614 		/* sv_id */    SRPC_SERVICE_TEST,
1615 		/* sv_name */  "test service",
1616 		0
1617 	},
1618 	{
1619 		/* sv_id */    0,
1620 		/* sv_name */  NULL,
1621 		0
1622 	}
1623 };
1624 
1625 extern sfw_test_client_ops_t ping_test_client;
1626 extern srpc_service_t	ping_test_service;
1627 extern void ping_init_test_client(void);
1628 extern void ping_init_test_service(void);
1629 
1630 extern sfw_test_client_ops_t brw_test_client;
1631 extern srpc_service_t	brw_test_service;
1632 extern void brw_init_test_client(void);
1633 extern void brw_init_test_service(void);
1634 
1635 int
sfw_startup(void)1636 sfw_startup(void)
1637 {
1638 	int i;
1639 	int rc;
1640 	int error;
1641 	srpc_service_t *sv;
1642 	sfw_test_case_t *tsc;
1643 
1644 	if (session_timeout < 0) {
1645 		CERROR("Session timeout must be non-negative: %d\n",
1646 			session_timeout);
1647 		return -EINVAL;
1648 	}
1649 
1650 	if (rpc_timeout < 0) {
1651 		CERROR("RPC timeout must be non-negative: %d\n",
1652 			rpc_timeout);
1653 		return -EINVAL;
1654 	}
1655 
1656 	if (session_timeout == 0)
1657 		CWARN("Zero session_timeout specified - test sessions never expire.\n");
1658 
1659 	if (rpc_timeout == 0)
1660 		CWARN("Zero rpc_timeout specified - test RPC never expire.\n");
1661 
1662 	memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1663 
1664 	sfw_data.fw_session     = NULL;
1665 	sfw_data.fw_active_srpc = NULL;
1666 	spin_lock_init(&sfw_data.fw_lock);
1667 	atomic_set(&sfw_data.fw_nzombies, 0);
1668 	INIT_LIST_HEAD(&sfw_data.fw_tests);
1669 	INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1670 	INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1671 
1672 	brw_init_test_client();
1673 	brw_init_test_service();
1674 	rc = sfw_register_test(&brw_test_service, &brw_test_client);
1675 	LASSERT(rc == 0);
1676 
1677 	ping_init_test_client();
1678 	ping_init_test_service();
1679 	rc = sfw_register_test(&ping_test_service, &ping_test_client);
1680 	LASSERT(rc == 0);
1681 
1682 	error = 0;
1683 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1684 		sv = tsc->tsc_srv_service;
1685 
1686 		rc = srpc_add_service(sv);
1687 		LASSERT(rc != -EBUSY);
1688 		if (rc != 0) {
1689 			CWARN("Failed to add %s service: %d\n",
1690 			       sv->sv_name, rc);
1691 			error = rc;
1692 		}
1693 	}
1694 
1695 	for (i = 0; ; i++) {
1696 		sv = &sfw_services[i];
1697 		if (sv->sv_name == NULL)
1698 			break;
1699 
1700 		sv->sv_bulk_ready = NULL;
1701 		sv->sv_handler    = sfw_handle_server_rpc;
1702 		sv->sv_wi_total   = SFW_FRWK_WI_MAX;
1703 		if (sv->sv_id == SRPC_SERVICE_TEST)
1704 			sv->sv_bulk_ready = sfw_bulk_ready;
1705 
1706 		rc = srpc_add_service(sv);
1707 		LASSERT(rc != -EBUSY);
1708 		if (rc != 0) {
1709 			CWARN("Failed to add %s service: %d\n",
1710 			       sv->sv_name, rc);
1711 			error = rc;
1712 		}
1713 
1714 		/* about to sfw_shutdown, no need to add buffer */
1715 		if (error)
1716 			continue;
1717 
1718 		rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1719 		if (rc != 0) {
1720 			CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
1721 			      sv->sv_name, sv->sv_wi_total, rc);
1722 			error = -ENOMEM;
1723 		}
1724 	}
1725 
1726 	if (error != 0)
1727 		sfw_shutdown();
1728 	return error;
1729 }
1730 
1731 void
sfw_shutdown(void)1732 sfw_shutdown(void)
1733 {
1734 	srpc_service_t *sv;
1735 	sfw_test_case_t	*tsc;
1736 	int i;
1737 
1738 	spin_lock(&sfw_data.fw_lock);
1739 
1740 	sfw_data.fw_shuttingdown = 1;
1741 	lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1742 		       "waiting for active RPC to finish.\n");
1743 
1744 	if (sfw_del_session_timer() != 0)
1745 		lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1746 			       "waiting for session timer to explode.\n");
1747 
1748 	sfw_deactivate_session();
1749 	lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1750 		       sfw_data.fw_lock,
1751 		       "waiting for %d zombie sessions to die.\n",
1752 		       atomic_read(&sfw_data.fw_nzombies));
1753 
1754 	spin_unlock(&sfw_data.fw_lock);
1755 
1756 	for (i = 0; ; i++) {
1757 		sv = &sfw_services[i];
1758 		if (sv->sv_name == NULL)
1759 			break;
1760 
1761 		srpc_shutdown_service(sv);
1762 		srpc_remove_service(sv);
1763 	}
1764 
1765 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1766 		sv = tsc->tsc_srv_service;
1767 		srpc_shutdown_service(sv);
1768 		srpc_remove_service(sv);
1769 	}
1770 
1771 	while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1772 		srpc_client_rpc_t *rpc;
1773 
1774 		rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1775 				     srpc_client_rpc_t, crpc_list);
1776 		list_del(&rpc->crpc_list);
1777 
1778 		LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1779 	}
1780 
1781 	for (i = 0; ; i++) {
1782 		sv = &sfw_services[i];
1783 		if (sv->sv_name == NULL)
1784 			break;
1785 
1786 		srpc_wait_service_shutdown(sv);
1787 	}
1788 
1789 	while (!list_empty(&sfw_data.fw_tests)) {
1790 		tsc = list_entry(sfw_data.fw_tests.next,
1791 				     sfw_test_case_t, tsc_list);
1792 
1793 		srpc_wait_service_shutdown(tsc->tsc_srv_service);
1794 
1795 		list_del(&tsc->tsc_list);
1796 		LIBCFS_FREE(tsc, sizeof(*tsc));
1797 	}
1798 
1799 	return;
1800 }
1801