/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and thus be
 * split into two parts, and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of the lock
 *
 * These flock locks never time out.
 */
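
/*
 * For illustration (a sketch of the client-side mapping, not code from
 * this file): an fcntl() byte-range request becomes an LDLM flock lock
 * roughly as
 *
 *   F_RDLCK  ->  LCK_PR   (shared read lock)
 *   F_WRLCK  ->  LCK_PW   (exclusive write lock)
 *   F_UNLCK  ->  LCK_NL   (release the given extent)
 *
 * with l_flock.start/end taken from the request's byte range and
 * l_flock.owner/pid identifying the locking process.
 */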

#define DEBUG_SUBSYSTEM S_LDLM

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <linux/list.h>
#include "ldlm_internal.h"

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop cursor. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
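
/*
 * A minimal usage sketch (hypothetical, not from this file): resume a
 * walk of a resource's granted list from a previously found position,
 * while it stays safe to unlink the entry at pos:
 *
 *        struct list_head *pos = ownlocks, *n;
 *
 *        list_for_remaining_safe(pos, n, &res->lr_granted) {
 *                lock = list_entry(pos, struct ldlm_lock, l_res_link);
 *                ...
 *        }
 */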

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}
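
/*
 * Flock owners match only when both the owner cookie and the export
 * (i.e. the client connection) match, so equal owner values arriving
 * from different clients are still distinct owners.
 */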

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
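
/*
 * This is the standard closed-interval intersection test: extents
 * [s1, e1] and [s2, e2] overlap iff s1 <= e2 && e1 >= s2. For example,
 * [0, 9] and [9, 20] overlap (they share byte 9), while [0, 9] and
 * [10, 20] do not.
 */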

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
{
        LDLM_DEBUG(lock, "%s(mode: %d, flags: 0x%llx)",
                   __func__, mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* When we get here we hold the resource lock via
                 * lock_res_and_lock(), so we must call the nolock
                 * version of ldlm_lock_decref_internal().
                 */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                                   int first_enq, enum ldlm_error *err,
                                   struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        enum ldlm_mode mode = req->l_req_mode;
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { };

        CDEBUG(D_DLMTRACE,
               "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
               *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        /* No blocking ASTs are sent to the clients for
         * POSIX file & record locks
         */
        req->l_blocking_ast = NULL;

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list.
                 */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;

                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request.
                 */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                          l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                return LDLM_ITER_STOP;
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                return LDLM_ITER_STOP;
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        return LDLM_ITER_STOP;
                }
                if (reprocess_failed)
                        return LDLM_ITER_CONTINUE;
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                return LDLM_ITER_STOP;
        }

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks.
         */
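        /*
         * A worked example (illustrative values): if this process holds a
         * PW lock on [0, 99] and requests LCK_NL (unlock) on [40, 59],
         * the loop below splits the granted lock into [0, 39] and
         * [60, 99]. If it instead requests a PW lock on [100, 199],
         * adjoining the existing [0, 99], the two are merged into a
         * single PW lock on [0, 199].
         */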
        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The
                         * extra conditions below are necessary to deal with
                         * arithmetic overflow and underflow.
                         */
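                        /*
                         * Concretely: if lock's end is OBD_OBJECT_EOF,
                         * "end + 1" would wrap around to 0, and if lock's
                         * start is 0, "start - 1" would underflow, so both
                         * cases are excluded explicitly.
                         */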
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1)) &&
                            (lock->l_policy_data.l_flock.end != OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1)) &&
                            (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* If this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply.
                 */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock.
                 */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (IS_ERR(new2)) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = PTR_ERR(new2);
                                return LDLM_ITER_STOP;
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (splitted == 0 && new2)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                /* The only possible client-side caller of the flock
                 * policy function is ldlm_flock_completion_ast(), which
                 * always carries the LDLM_FL_WAIT_NOREPROC flag.
                 */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
        }

        /* In the case where we're reprocessing the requested lock we can't
         * destroy it until after calling ldlm_add_ast_work_item() above so
         * that it can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent.
         */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        return LDLM_ITER_CONTINUE;
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        lock_res_and_lock(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]  the lock to be handled
 * \param flags [in]     LDLM flags
 * \param data [in]      ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0   success
 * \retval <0  failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock *getlk = lock->l_ast_data;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        struct ldlm_flock_wait_data fwd;
        struct l_wait_info lwi;
        enum ldlm_error err;
        int rc = 0;

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_FAIL_LOC;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
        }
        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (flags & LDLM_FL_FAILED)
                goto granted;

        if (!(flags & LDLM_FL_BLOCKED_MASK)) {
                if (!data)
                        /* the MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (obd)
                imp = obd->u.cli.cl_import;

        if (imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
        }
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= LDLM_FL_FAIL_LOC |
                                 LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
        }

        lock_res_and_lock(lock);

        /*
         * Protect against race where lock could have been just destroyed
         * due to overlap in ldlm_process_flock_lock().
         */
        if (ldlm_is_destroyed(lock)) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                /*
                 * An error is still to be returned, to propagate it up to
                 * ldlm_cli_enqueue_fini() caller.
                 */
                return -EIO;
        }

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        ldlm_resource_unlink_lock(lock);

        /*
         * Import invalidation. We need to actually release the lock
         * references being held, so that the lock can go away. There is
         * no point in holding the lock even if the app still believes it
         * has it, since the server already dropped it anyway. This applies
         * only to granted locks.
         */
        /* Do the same for DEADLOCK'ed locks. */
        if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
                int mode;

                if (flags & LDLM_FL_TEST_LOCK)
                        LASSERT(ldlm_is_test_lock(lock));

                if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
                        mode = getlk->fl_type;
                else
                        mode = lock->l_granted_mode;

                if (ldlm_is_flock_deadlock(lock)) {
                        LDLM_DEBUG(lock, "client-side enqueue deadlock received");
                        rc = -EDEADLK;
                }
                ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
                unlock_res_and_lock(lock);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);

                /*
                 * An error is still to be returned, to propagate it up to
                 * ldlm_cli_enqueue_fini() caller.
                 */
                return rc ? : -EIO;
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount.
                 */
                LASSERT(ldlm_is_test_lock(lock));
                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->fl_type = F_RDLCK;
                        break;
                case LCK_PW:
                        getlk->fl_type = F_WRLCK;
                        break;
                default:
                        getlk->fl_type = F_UNLCK;
                }
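                /*
                 * The pid is reported negated, presumably the convention
                 * for a lock held by a process on a remote node; the
                 * negation is not otherwise explained here.
                 */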
                getlk->fl_pid = -(pid_t)lock->l_policy_data.l_flock.pid;
                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process.
                 */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
                                     union ldlm_policy_data *lpolicy)
{
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
                                     union ldlm_wire_policy_data *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}