1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 #define DEBUG_SUBSYSTEM S_MDC
38 #include "../include/lustre_net.h"
39 #include "../include/lustre/lustre_idl.h"
40 #include "mdc_internal.h"
41 
__mdc_pack_body(struct mdt_body * b,__u32 suppgid)42 static void __mdc_pack_body(struct mdt_body *b, __u32 suppgid)
43 {
44 	LASSERT(b != NULL);
45 
46 	b->suppgid = suppgid;
47 	b->uid = from_kuid(&init_user_ns, current_uid());
48 	b->gid = from_kgid(&init_user_ns, current_gid());
49 	b->fsuid = from_kuid(&init_user_ns, current_fsuid());
50 	b->fsgid = from_kgid(&init_user_ns, current_fsgid());
51 	b->capability = cfs_curproc_cap_pack();
52 }
53 
mdc_is_subdir_pack(struct ptlrpc_request * req,const struct lu_fid * pfid,const struct lu_fid * cfid,int flags)54 void mdc_is_subdir_pack(struct ptlrpc_request *req, const struct lu_fid *pfid,
55 			const struct lu_fid *cfid, int flags)
56 {
57 	struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
58 						    &RMF_MDT_BODY);
59 
60 	if (pfid) {
61 		b->fid1 = *pfid;
62 		b->valid = OBD_MD_FLID;
63 	}
64 	if (cfid)
65 		b->fid2 = *cfid;
66 	b->flags = flags;
67 }
68 
mdc_swap_layouts_pack(struct ptlrpc_request * req,struct md_op_data * op_data)69 void mdc_swap_layouts_pack(struct ptlrpc_request *req,
70 			   struct md_op_data *op_data)
71 {
72 	struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
73 						    &RMF_MDT_BODY);
74 
75 	__mdc_pack_body(b, op_data->op_suppgids[0]);
76 	b->fid1 = op_data->op_fid1;
77 	b->fid2 = op_data->op_fid2;
78 	b->valid |= OBD_MD_FLID;
79 }
80 
mdc_pack_body(struct ptlrpc_request * req,const struct lu_fid * fid,__u64 valid,int ea_size,__u32 suppgid,int flags)81 void mdc_pack_body(struct ptlrpc_request *req, const struct lu_fid *fid,
82 		   __u64 valid, int ea_size, __u32 suppgid, int flags)
83 {
84 	struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
85 						    &RMF_MDT_BODY);
86 	LASSERT(b != NULL);
87 	b->valid = valid;
88 	b->eadatasize = ea_size;
89 	b->flags = flags;
90 	__mdc_pack_body(b, suppgid);
91 	if (fid) {
92 		b->fid1 = *fid;
93 		b->valid |= OBD_MD_FLID;
94 	}
95 }
96 
mdc_readdir_pack(struct ptlrpc_request * req,__u64 pgoff,__u32 size,const struct lu_fid * fid)97 void mdc_readdir_pack(struct ptlrpc_request *req, __u64 pgoff,
98 		      __u32 size, const struct lu_fid *fid)
99 {
100 	struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
101 						    &RMF_MDT_BODY);
102 	b->fid1 = *fid;
103 	b->valid |= OBD_MD_FLID;
104 	b->size = pgoff;		       /* !! */
105 	b->nlink = size;			/* !! */
106 	__mdc_pack_body(b, -1);
107 	b->mode = LUDA_FID | LUDA_TYPE;
108 }
109 
110 /* packing of MDS records */
mdc_create_pack(struct ptlrpc_request * req,struct md_op_data * op_data,const void * data,int datalen,__u32 mode,__u32 uid,__u32 gid,cfs_cap_t cap_effective,__u64 rdev)111 void mdc_create_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
112 		     const void *data, int datalen, __u32 mode,
113 		     __u32 uid, __u32 gid, cfs_cap_t cap_effective, __u64 rdev)
114 {
115 	struct mdt_rec_create	*rec;
116 	char			*tmp;
117 	__u64			 flags;
118 
119 	CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
120 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
121 
122 	rec->cr_opcode   = REINT_CREATE;
123 	rec->cr_fsuid    = uid;
124 	rec->cr_fsgid    = gid;
125 	rec->cr_cap      = cap_effective;
126 	rec->cr_fid1     = op_data->op_fid1;
127 	rec->cr_fid2     = op_data->op_fid2;
128 	rec->cr_mode     = mode;
129 	rec->cr_rdev     = rdev;
130 	rec->cr_time     = op_data->op_mod_time;
131 	rec->cr_suppgid1 = op_data->op_suppgids[0];
132 	rec->cr_suppgid2 = op_data->op_suppgids[1];
133 	flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
134 	if (op_data->op_bias & MDS_CREATE_VOLATILE)
135 		flags |= MDS_OPEN_VOLATILE;
136 	set_mrc_cr_flags(rec, flags);
137 	rec->cr_bias     = op_data->op_bias;
138 	rec->cr_umask    = current_umask();
139 
140 	tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
141 	LOGL0(op_data->op_name, op_data->op_namelen, tmp);
142 
143 	if (data) {
144 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
145 		memcpy(tmp, data, datalen);
146 	}
147 }
148 
mds_pack_open_flags(__u64 flags,__u32 mode)149 static __u64 mds_pack_open_flags(__u64 flags, __u32 mode)
150 {
151 	__u64 cr_flags = (flags & (FMODE_READ | FMODE_WRITE |
152 				   MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS |
153 				   MDS_OPEN_OWNEROVERRIDE | MDS_OPEN_LOCK |
154 				   MDS_OPEN_BY_FID | MDS_OPEN_LEASE |
155 				   MDS_OPEN_RELEASE));
156 	if (flags & O_CREAT)
157 		cr_flags |= MDS_OPEN_CREAT;
158 	if (flags & O_EXCL)
159 		cr_flags |= MDS_OPEN_EXCL;
160 	if (flags & O_TRUNC)
161 		cr_flags |= MDS_OPEN_TRUNC;
162 	if (flags & O_APPEND)
163 		cr_flags |= MDS_OPEN_APPEND;
164 	if (flags & O_SYNC)
165 		cr_flags |= MDS_OPEN_SYNC;
166 	if (flags & O_DIRECTORY)
167 		cr_flags |= MDS_OPEN_DIRECTORY;
168 	if (flags & __FMODE_EXEC)
169 		cr_flags |= MDS_FMODE_EXEC;
170 	if (cl_is_lov_delay_create(flags))
171 		cr_flags |= MDS_OPEN_DELAY_CREATE;
172 
173 	if (flags & O_NONBLOCK)
174 		cr_flags |= MDS_OPEN_NORESTORE;
175 
176 	return cr_flags;
177 }
178 
179 /* packing of MDS records */
mdc_open_pack(struct ptlrpc_request * req,struct md_op_data * op_data,__u32 mode,__u64 rdev,__u64 flags,const void * lmm,int lmmlen)180 void mdc_open_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
181 		   __u32 mode, __u64 rdev, __u64 flags, const void *lmm,
182 		   int lmmlen)
183 {
184 	struct mdt_rec_create *rec;
185 	char *tmp;
186 	__u64 cr_flags;
187 
188 	CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_create));
189 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
190 
191 	/* XXX do something about time, uid, gid */
192 	rec->cr_opcode   = REINT_OPEN;
193 	rec->cr_fsuid    = from_kuid(&init_user_ns, current_fsuid());
194 	rec->cr_fsgid    = from_kgid(&init_user_ns, current_fsgid());
195 	rec->cr_cap      = cfs_curproc_cap_pack();
196 	rec->cr_fid1 = op_data->op_fid1;
197 	rec->cr_fid2 = op_data->op_fid2;
198 
199 	rec->cr_mode     = mode;
200 	cr_flags = mds_pack_open_flags(flags, mode);
201 	rec->cr_rdev     = rdev;
202 	rec->cr_time     = op_data->op_mod_time;
203 	rec->cr_suppgid1 = op_data->op_suppgids[0];
204 	rec->cr_suppgid2 = op_data->op_suppgids[1];
205 	rec->cr_bias     = op_data->op_bias;
206 	rec->cr_umask    = current_umask();
207 	rec->cr_old_handle = op_data->op_handle;
208 
209 	if (op_data->op_name) {
210 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
211 		LOGL0(op_data->op_name, op_data->op_namelen, tmp);
212 		if (op_data->op_bias & MDS_CREATE_VOLATILE)
213 			cr_flags |= MDS_OPEN_VOLATILE;
214 	}
215 
216 	if (lmm) {
217 		cr_flags |= MDS_OPEN_HAS_EA;
218 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
219 		memcpy(tmp, lmm, lmmlen);
220 	}
221 	set_mrc_cr_flags(rec, cr_flags);
222 }
223 
attr_pack(unsigned int ia_valid)224 static inline __u64 attr_pack(unsigned int ia_valid)
225 {
226 	__u64 sa_valid = 0;
227 
228 	if (ia_valid & ATTR_MODE)
229 		sa_valid |= MDS_ATTR_MODE;
230 	if (ia_valid & ATTR_UID)
231 		sa_valid |= MDS_ATTR_UID;
232 	if (ia_valid & ATTR_GID)
233 		sa_valid |= MDS_ATTR_GID;
234 	if (ia_valid & ATTR_SIZE)
235 		sa_valid |= MDS_ATTR_SIZE;
236 	if (ia_valid & ATTR_ATIME)
237 		sa_valid |= MDS_ATTR_ATIME;
238 	if (ia_valid & ATTR_MTIME)
239 		sa_valid |= MDS_ATTR_MTIME;
240 	if (ia_valid & ATTR_CTIME)
241 		sa_valid |= MDS_ATTR_CTIME;
242 	if (ia_valid & ATTR_ATIME_SET)
243 		sa_valid |= MDS_ATTR_ATIME_SET;
244 	if (ia_valid & ATTR_MTIME_SET)
245 		sa_valid |= MDS_ATTR_MTIME_SET;
246 	if (ia_valid & ATTR_FORCE)
247 		sa_valid |= MDS_ATTR_FORCE;
248 	if (ia_valid & ATTR_ATTR_FLAG)
249 		sa_valid |= MDS_ATTR_ATTR_FLAG;
250 	if (ia_valid & ATTR_KILL_SUID)
251 		sa_valid |=  MDS_ATTR_KILL_SUID;
252 	if (ia_valid & ATTR_KILL_SGID)
253 		sa_valid |= MDS_ATTR_KILL_SGID;
254 	if (ia_valid & ATTR_CTIME_SET)
255 		sa_valid |= MDS_ATTR_CTIME_SET;
256 	if (ia_valid & ATTR_OPEN)
257 		sa_valid |= MDS_ATTR_FROM_OPEN;
258 	if (ia_valid & ATTR_BLOCKS)
259 		sa_valid |= MDS_ATTR_BLOCKS;
260 	if (ia_valid & MDS_OPEN_OWNEROVERRIDE)
261 		/* NFSD hack (see bug 5781) */
262 		sa_valid |= MDS_OPEN_OWNEROVERRIDE;
263 	return sa_valid;
264 }
265 
mdc_setattr_pack_rec(struct mdt_rec_setattr * rec,struct md_op_data * op_data)266 static void mdc_setattr_pack_rec(struct mdt_rec_setattr *rec,
267 				 struct md_op_data *op_data)
268 {
269 	rec->sa_opcode  = REINT_SETATTR;
270 	rec->sa_fsuid   = from_kuid(&init_user_ns, current_fsuid());
271 	rec->sa_fsgid   = from_kgid(&init_user_ns, current_fsgid());
272 	rec->sa_cap     = cfs_curproc_cap_pack();
273 	rec->sa_suppgid = -1;
274 
275 	rec->sa_fid    = op_data->op_fid1;
276 	rec->sa_valid  = attr_pack(op_data->op_attr.ia_valid);
277 	rec->sa_mode   = op_data->op_attr.ia_mode;
278 	rec->sa_uid    = from_kuid(&init_user_ns, op_data->op_attr.ia_uid);
279 	rec->sa_gid    = from_kgid(&init_user_ns, op_data->op_attr.ia_gid);
280 	rec->sa_size   = op_data->op_attr.ia_size;
281 	rec->sa_blocks = op_data->op_attr_blocks;
282 	rec->sa_atime  = LTIME_S(op_data->op_attr.ia_atime);
283 	rec->sa_mtime  = LTIME_S(op_data->op_attr.ia_mtime);
284 	rec->sa_ctime  = LTIME_S(op_data->op_attr.ia_ctime);
285 	rec->sa_attr_flags =
286 			((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags;
287 	if ((op_data->op_attr.ia_valid & ATTR_GID) &&
288 	    in_group_p(op_data->op_attr.ia_gid))
289 		rec->sa_suppgid =
290 			from_kgid(&init_user_ns, op_data->op_attr.ia_gid);
291 	else
292 		rec->sa_suppgid = op_data->op_suppgids[0];
293 
294 	rec->sa_bias = op_data->op_bias;
295 }
296 
mdc_ioepoch_pack(struct mdt_ioepoch * epoch,struct md_op_data * op_data)297 static void mdc_ioepoch_pack(struct mdt_ioepoch *epoch,
298 			     struct md_op_data *op_data)
299 {
300 	memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle));
301 	epoch->ioepoch = op_data->op_ioepoch;
302 	epoch->flags = op_data->op_flags & MF_SOM_LOCAL_FLAGS;
303 }
304 
mdc_setattr_pack(struct ptlrpc_request * req,struct md_op_data * op_data,void * ea,int ealen,void * ea2,int ea2len)305 void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
306 		      void *ea, int ealen, void *ea2, int ea2len)
307 {
308 	struct mdt_rec_setattr *rec;
309 	struct mdt_ioepoch *epoch;
310 	struct lov_user_md *lum = NULL;
311 
312 	CLASSERT(sizeof(struct mdt_rec_reint) ==
313 					sizeof(struct mdt_rec_setattr));
314 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
315 	mdc_setattr_pack_rec(rec, op_data);
316 
317 	if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) {
318 		epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
319 		mdc_ioepoch_pack(epoch, op_data);
320 	}
321 
322 	if (ealen == 0)
323 		return;
324 
325 	lum = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
326 	if (ea == NULL) { /* Remove LOV EA */
327 		lum->lmm_magic = LOV_USER_MAGIC_V1;
328 		lum->lmm_stripe_size = 0;
329 		lum->lmm_stripe_count = 0;
330 		lum->lmm_stripe_offset = (typeof(lum->lmm_stripe_offset))(-1);
331 	} else {
332 		memcpy(lum, ea, ealen);
333 	}
334 
335 	if (ea2len == 0)
336 		return;
337 
338 	memcpy(req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES), ea2,
339 	       ea2len);
340 }
341 
mdc_unlink_pack(struct ptlrpc_request * req,struct md_op_data * op_data)342 void mdc_unlink_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
343 {
344 	struct mdt_rec_unlink *rec;
345 	char *tmp;
346 
347 	CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_unlink));
348 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
349 	LASSERT(rec != NULL);
350 
351 	rec->ul_opcode   = op_data->op_cli_flags & CLI_RM_ENTRY ?
352 					REINT_RMENTRY : REINT_UNLINK;
353 	rec->ul_fsuid    = op_data->op_fsuid;
354 	rec->ul_fsgid    = op_data->op_fsgid;
355 	rec->ul_cap      = op_data->op_cap;
356 	rec->ul_mode     = op_data->op_mode;
357 	rec->ul_suppgid1 = op_data->op_suppgids[0];
358 	rec->ul_suppgid2 = -1;
359 	rec->ul_fid1     = op_data->op_fid1;
360 	rec->ul_fid2     = op_data->op_fid2;
361 	rec->ul_time     = op_data->op_mod_time;
362 	rec->ul_bias     = op_data->op_bias;
363 
364 	tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
365 	LASSERT(tmp != NULL);
366 	LOGL0(op_data->op_name, op_data->op_namelen, tmp);
367 }
368 
mdc_link_pack(struct ptlrpc_request * req,struct md_op_data * op_data)369 void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
370 {
371 	struct mdt_rec_link *rec;
372 	char *tmp;
373 
374 	CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_link));
375 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
376 	LASSERT(rec != NULL);
377 
378 	rec->lk_opcode   = REINT_LINK;
379 	rec->lk_fsuid    = op_data->op_fsuid; /* current->fsuid; */
380 	rec->lk_fsgid    = op_data->op_fsgid; /* current->fsgid; */
381 	rec->lk_cap      = op_data->op_cap;   /* current->cap_effective; */
382 	rec->lk_suppgid1 = op_data->op_suppgids[0];
383 	rec->lk_suppgid2 = op_data->op_suppgids[1];
384 	rec->lk_fid1     = op_data->op_fid1;
385 	rec->lk_fid2     = op_data->op_fid2;
386 	rec->lk_time     = op_data->op_mod_time;
387 	rec->lk_bias     = op_data->op_bias;
388 
389 	tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
390 	LOGL0(op_data->op_name, op_data->op_namelen, tmp);
391 }
392 
mdc_rename_pack(struct ptlrpc_request * req,struct md_op_data * op_data,const char * old,int oldlen,const char * new,int newlen)393 void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
394 		     const char *old, int oldlen, const char *new, int newlen)
395 {
396 	struct mdt_rec_rename *rec;
397 	char *tmp;
398 
399 	CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename));
400 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
401 
402 	/* XXX do something about time, uid, gid */
403 	rec->rn_opcode   = REINT_RENAME;
404 	rec->rn_fsuid    = op_data->op_fsuid;
405 	rec->rn_fsgid    = op_data->op_fsgid;
406 	rec->rn_cap      = op_data->op_cap;
407 	rec->rn_suppgid1 = op_data->op_suppgids[0];
408 	rec->rn_suppgid2 = op_data->op_suppgids[1];
409 	rec->rn_fid1     = op_data->op_fid1;
410 	rec->rn_fid2     = op_data->op_fid2;
411 	rec->rn_time     = op_data->op_mod_time;
412 	rec->rn_mode     = op_data->op_mode;
413 	rec->rn_bias     = op_data->op_bias;
414 
415 	tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
416 	LOGL0(old, oldlen, tmp);
417 
418 	if (new) {
419 		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SYMTGT);
420 		LOGL0(new, newlen, tmp);
421 	}
422 }
423 
mdc_getattr_pack(struct ptlrpc_request * req,__u64 valid,int flags,struct md_op_data * op_data,int ea_size)424 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
425 		      struct md_op_data *op_data, int ea_size)
426 {
427 	struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
428 						    &RMF_MDT_BODY);
429 
430 	b->valid = valid;
431 	if (op_data->op_bias & MDS_CHECK_SPLIT)
432 		b->valid |= OBD_MD_FLCKSPLIT;
433 	if (op_data->op_bias & MDS_CROSS_REF)
434 		b->valid |= OBD_MD_FLCROSSREF;
435 	b->eadatasize = ea_size;
436 	b->flags = flags;
437 	__mdc_pack_body(b, op_data->op_suppgids[0]);
438 
439 	b->fid1 = op_data->op_fid1;
440 	b->fid2 = op_data->op_fid2;
441 	b->valid |= OBD_MD_FLID;
442 
443 	if (op_data->op_name) {
444 		char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
445 
446 		LOGL0(op_data->op_name, op_data->op_namelen, tmp);
447 
448 	}
449 }
450 
mdc_hsm_release_pack(struct ptlrpc_request * req,struct md_op_data * op_data)451 static void mdc_hsm_release_pack(struct ptlrpc_request *req,
452 				 struct md_op_data *op_data)
453 {
454 	if (op_data->op_bias & MDS_HSM_RELEASE) {
455 		struct close_data *data;
456 		struct ldlm_lock *lock;
457 
458 		data = req_capsule_client_get(&req->rq_pill, &RMF_CLOSE_DATA);
459 		LASSERT(data != NULL);
460 
461 		lock = ldlm_handle2lock(&op_data->op_lease_handle);
462 		if (lock != NULL) {
463 			data->cd_handle = lock->l_remote_handle;
464 			ldlm_lock_put(lock);
465 		}
466 		ldlm_cli_cancel(&op_data->op_lease_handle, LCF_LOCAL);
467 
468 		data->cd_data_version = op_data->op_data_version;
469 		data->cd_fid = op_data->op_fid2;
470 	}
471 }
472 
mdc_close_pack(struct ptlrpc_request * req,struct md_op_data * op_data)473 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
474 {
475 	struct mdt_ioepoch *epoch;
476 	struct mdt_rec_setattr *rec;
477 
478 	epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
479 	rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
480 
481 	mdc_setattr_pack_rec(rec, op_data);
482 	mdc_ioepoch_pack(epoch, op_data);
483 	mdc_hsm_release_pack(req, op_data);
484 }
485 
mdc_req_avail(struct client_obd * cli,struct mdc_cache_waiter * mcw)486 static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
487 {
488 	int rc;
489 
490 	client_obd_list_lock(&cli->cl_loi_list_lock);
491 	rc = list_empty(&mcw->mcw_entry);
492 	client_obd_list_unlock(&cli->cl_loi_list_lock);
493 	return rc;
494 };
495 
496 /* We record requests in flight in cli->cl_r_in_flight here.
497  * There is only one write rpc possible in mdc anyway. If this to change
498  * in the future - the code may need to be revisited. */
mdc_enter_request(struct client_obd * cli)499 int mdc_enter_request(struct client_obd *cli)
500 {
501 	int rc = 0;
502 	struct mdc_cache_waiter mcw;
503 	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
504 
505 	client_obd_list_lock(&cli->cl_loi_list_lock);
506 	if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
507 		list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
508 		init_waitqueue_head(&mcw.mcw_waitq);
509 		client_obd_list_unlock(&cli->cl_loi_list_lock);
510 		rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw),
511 				  &lwi);
512 		if (rc) {
513 			client_obd_list_lock(&cli->cl_loi_list_lock);
514 			if (list_empty(&mcw.mcw_entry))
515 				cli->cl_r_in_flight--;
516 			list_del_init(&mcw.mcw_entry);
517 			client_obd_list_unlock(&cli->cl_loi_list_lock);
518 		}
519 	} else {
520 		cli->cl_r_in_flight++;
521 		client_obd_list_unlock(&cli->cl_loi_list_lock);
522 	}
523 	return rc;
524 }
525 
mdc_exit_request(struct client_obd * cli)526 void mdc_exit_request(struct client_obd *cli)
527 {
528 	struct list_head *l, *tmp;
529 	struct mdc_cache_waiter *mcw;
530 
531 	client_obd_list_lock(&cli->cl_loi_list_lock);
532 	cli->cl_r_in_flight--;
533 	list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
534 		if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
535 			/* No free request slots anymore */
536 			break;
537 		}
538 
539 		mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
540 		list_del_init(&mcw->mcw_entry);
541 		cli->cl_r_in_flight++;
542 		wake_up(&mcw->mcw_waitq);
543 	}
544 	/* Empty waiting list? Decrease reqs in-flight number */
545 
546 	client_obd_list_unlock(&cli->cl_loi_list_lock);
547 }
548