GNU Linux-libre 4.14.294-gnu1
[releases.git] / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_ECHO
34 #include <linux/libcfs/libcfs.h>
35
36 #include <obd.h>
37 #include <obd_support.h>
38 #include <obd_class.h>
39 #include <lustre_debug.h>
40 #include <lprocfs_status.h>
41 #include <cl_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_acl.h>
44 #include <uapi/linux/lustre/lustre_ioctl.h>
45 #include <lustre_net.h>
46
47 #include "echo_internal.h"
48
49 /** \defgroup echo_client Echo Client
50  * @{
51  */
52
53 struct echo_device {
54         struct cl_device        ed_cl;
55         struct echo_client_obd *ed_ec;
56
57         struct cl_site    ed_site_myself;
58         struct lu_site          *ed_site;
59         struct lu_device       *ed_next;
60 };
61
62 struct echo_object {
63         struct cl_object        eo_cl;
64         struct cl_object_header eo_hdr;
65
66         struct echo_device     *eo_dev;
67         struct list_head              eo_obj_chain;
68         struct lov_oinfo       *eo_oinfo;
69         atomic_t            eo_npages;
70         int                  eo_deleted;
71 };
72
73 struct echo_object_conf {
74         struct cl_object_conf  eoc_cl;
75         struct lov_oinfo      **eoc_oinfo;
76 };
77
78 struct echo_page {
79         struct cl_page_slice   ep_cl;
80         struct mutex            ep_lock;
81 };
82
83 struct echo_lock {
84         struct cl_lock_slice   el_cl;
85         struct list_head             el_chain;
86         struct echo_object    *el_object;
87         __u64             el_cookie;
88         atomic_t           el_refcount;
89 };
90
/*
 * Forward declarations: echo_device_alloc() and echo_device_free() call
 * these before their definitions appear.
 */
static int echo_client_setup(const struct lu_env *env, struct obd_device *obddev,
			     struct lustre_cfg *lcfg);
static int echo_client_cleanup(struct obd_device *obddev);
95
96 /** \defgroup echo_helpers Helper functions
97  * @{
98  */
99 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
100 {
101         return container_of0(dev, struct echo_device, ed_cl);
102 }
103
104 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
105 {
106         return &d->ed_cl;
107 }
108
109 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
110 {
111         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
112 }
113
114 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
115 {
116         return &eco->eo_cl;
117 }
118
119 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
120 {
121         return container_of(o, struct echo_object, eo_cl);
122 }
123
124 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
125 {
126         return container_of(s, struct echo_page, ep_cl);
127 }
128
129 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
130 {
131         return container_of(s, struct echo_lock, el_cl);
132 }
133
134 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
135 {
136         return ecl->el_cl.cls_lock;
137 }
138
139 static struct lu_context_key echo_thread_key;
140 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
141 {
142         struct echo_thread_info *info;
143
144         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
145         LASSERT(info);
146         return info;
147 }
148
149 static inline
150 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
151 {
152         return container_of(c, struct echo_object_conf, eoc_cl);
153 }
154
155 /** @} echo_helpers */
156 static int cl_echo_object_put(struct echo_object *eco);
157 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
158                               struct page **pages, int npages, int async);
159
160 struct echo_thread_info {
161         struct echo_object_conf eti_conf;
162         struct lustre_md        eti_md;
163
164         struct cl_2queue        eti_queue;
165         struct cl_io        eti_io;
166         struct cl_lock          eti_lock;
167         struct lu_fid      eti_fid;
168         struct lu_fid           eti_fid2;
169 };
170
/*
 * Session data is not used right now; the single member only gives the
 * structure a non-zero size for its slab cache.
 */
struct echo_session_info {
	unsigned long dummy;
};
175
176 static struct kmem_cache *echo_lock_kmem;
177 static struct kmem_cache *echo_object_kmem;
178 static struct kmem_cache *echo_thread_kmem;
179 static struct kmem_cache *echo_session_kmem;
180
181 static struct lu_kmem_descr echo_caches[] = {
182         {
183                 .ckd_cache = &echo_lock_kmem,
184                 .ckd_name  = "echo_lock_kmem",
185                 .ckd_size  = sizeof(struct echo_lock)
186         },
187         {
188                 .ckd_cache = &echo_object_kmem,
189                 .ckd_name  = "echo_object_kmem",
190                 .ckd_size  = sizeof(struct echo_object)
191         },
192         {
193                 .ckd_cache = &echo_thread_kmem,
194                 .ckd_name  = "echo_thread_kmem",
195                 .ckd_size  = sizeof(struct echo_thread_info)
196         },
197         {
198                 .ckd_cache = &echo_session_kmem,
199                 .ckd_name  = "echo_session_kmem",
200                 .ckd_size  = sizeof(struct echo_session_info)
201         },
202         {
203                 .ckd_cache = NULL
204         }
205 };
206
207 /** \defgroup echo_page Page operations
208  *
209  * Echo page operations.
210  *
211  * @{
212  */
213 static int echo_page_own(const struct lu_env *env,
214                          const struct cl_page_slice *slice,
215                          struct cl_io *io, int nonblock)
216 {
217         struct echo_page *ep = cl2echo_page(slice);
218
219         if (!nonblock)
220                 mutex_lock(&ep->ep_lock);
221         else if (!mutex_trylock(&ep->ep_lock))
222                 return -EAGAIN;
223         return 0;
224 }
225
226 static void echo_page_disown(const struct lu_env *env,
227                              const struct cl_page_slice *slice,
228                              struct cl_io *io)
229 {
230         struct echo_page *ep = cl2echo_page(slice);
231
232         LASSERT(mutex_is_locked(&ep->ep_lock));
233         mutex_unlock(&ep->ep_lock);
234 }
235
236 static void echo_page_discard(const struct lu_env *env,
237                               const struct cl_page_slice *slice,
238                               struct cl_io *unused)
239 {
240         cl_page_delete(env, slice->cpl_page);
241 }
242
243 static int echo_page_is_vmlocked(const struct lu_env *env,
244                                  const struct cl_page_slice *slice)
245 {
246         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
247                 return -EBUSY;
248         return -ENODATA;
249 }
250
251 static void echo_page_completion(const struct lu_env *env,
252                                  const struct cl_page_slice *slice,
253                                  int ioret)
254 {
255         LASSERT(slice->cpl_page->cp_sync_io);
256 }
257
258 static void echo_page_fini(const struct lu_env *env,
259                            struct cl_page_slice *slice)
260 {
261         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
262
263         atomic_dec(&eco->eo_npages);
264         put_page(slice->cpl_page->cp_vmpage);
265 }
266
/** Transfer preparation hook; the echo layer has nothing to prepare. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	/* Always succeeds. */
	return 0;
}
273
274 static int echo_page_print(const struct lu_env *env,
275                            const struct cl_page_slice *slice,
276                            void *cookie, lu_printer_t printer)
277 {
278         struct echo_page *ep = cl2echo_page(slice);
279
280         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME "-page@%p %d vm@%p\n",
281                    ep, mutex_is_locked(&ep->ep_lock),
282                    slice->cpl_page->cp_vmpage);
283         return 0;
284 }
285
286 static const struct cl_page_operations echo_page_ops = {
287         .cpo_own           = echo_page_own,
288         .cpo_disown     = echo_page_disown,
289         .cpo_discard       = echo_page_discard,
290         .cpo_fini         = echo_page_fini,
291         .cpo_print       = echo_page_print,
292         .cpo_is_vmlocked   = echo_page_is_vmlocked,
293         .io = {
294                 [CRT_READ] = {
295                         .cpo_prep       = echo_page_prep,
296                         .cpo_completion  = echo_page_completion,
297                 },
298                 [CRT_WRITE] = {
299                         .cpo_prep       = echo_page_prep,
300                         .cpo_completion  = echo_page_completion,
301                 }
302         }
303 };
304
305 /** @} echo_page */
306
307 /** \defgroup echo_lock Locking
308  *
309  * echo lock operations
310  *
311  * @{
312  */
313 static void echo_lock_fini(const struct lu_env *env,
314                            struct cl_lock_slice *slice)
315 {
316         struct echo_lock *ecl = cl2echo_lock(slice);
317
318         LASSERT(list_empty(&ecl->el_chain));
319         kmem_cache_free(echo_lock_kmem, ecl);
320 }
321
322 static const struct cl_lock_operations echo_lock_ops = {
323         .clo_fini      = echo_lock_fini,
324 };
325
326 /** @} echo_lock */
327
328 /** \defgroup echo_cl_ops cl_object operations
329  *
330  * operations for cl_object
331  *
332  * @{
333  */
334 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
335                           struct cl_page *page, pgoff_t index)
336 {
337         struct echo_page *ep = cl_object_page_slice(obj, page);
338         struct echo_object *eco = cl2echo_obj(obj);
339
340         get_page(page->cp_vmpage);
341         mutex_init(&ep->ep_lock);
342         cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
343         atomic_inc(&eco->eo_npages);
344         return 0;
345 }
346
/** IO initialization hook; the echo layer keeps no per-IO state. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	/* Always succeeds. */
	return 0;
}
352
353 static int echo_lock_init(const struct lu_env *env,
354                           struct cl_object *obj, struct cl_lock *lock,
355                           const struct cl_io *unused)
356 {
357         struct echo_lock *el;
358
359         el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
360         if (el) {
361                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
362                 el->el_object = cl2echo_obj(obj);
363                 INIT_LIST_HEAD(&el->el_chain);
364                 atomic_set(&el->el_refcount, 0);
365         }
366         return !el ? -ENOMEM : 0;
367 }
368
/** Configuration hook; the echo layer accepts any conf and ignores it. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	/* Always succeeds. */
	return 0;
}
374
375 static const struct cl_object_operations echo_cl_obj_ops = {
376         .coo_page_init = echo_page_init,
377         .coo_lock_init = echo_lock_init,
378         .coo_io_init   = echo_io_init,
379         .coo_conf_set  = echo_conf_set
380 };
381
382 /** @} echo_cl_ops */
383
384 /** \defgroup echo_lu_ops lu_object operations
385  *
386  * operations for echo lu object.
387  *
388  * @{
389  */
390 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
391                             const struct lu_object_conf *conf)
392 {
393         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
394         struct echo_client_obd *ec     = ed->ed_ec;
395         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
396         const struct cl_object_conf *cconf;
397         struct echo_object_conf *econf;
398
399         if (ed->ed_next) {
400                 struct lu_object  *below;
401                 struct lu_device  *under;
402
403                 under = ed->ed_next;
404                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
405                                                         under);
406                 if (!below)
407                         return -ENOMEM;
408                 lu_object_add(obj, below);
409         }
410
411         cconf = lu2cl_conf(conf);
412         econf = cl2echo_conf(cconf);
413
414         LASSERT(econf->eoc_oinfo);
415         /*
416          * Transfer the oinfo pointer to eco that it won't be
417          * freed.
418          */
419         eco->eo_oinfo = *econf->eoc_oinfo;
420         *econf->eoc_oinfo = NULL;
421
422         eco->eo_dev = ed;
423         atomic_set(&eco->eo_npages, 0);
424         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
425
426         spin_lock(&ec->ec_lock);
427         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
428         spin_unlock(&ec->ec_lock);
429
430         return 0;
431 }
432
433 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
434 {
435         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
436         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
437
438         LASSERT(atomic_read(&eco->eo_npages) == 0);
439
440         spin_lock(&ec->ec_lock);
441         list_del_init(&eco->eo_obj_chain);
442         spin_unlock(&ec->ec_lock);
443
444         lu_object_fini(obj);
445         lu_object_header_fini(obj->lo_header);
446
447         kfree(eco->eo_oinfo);
448         kmem_cache_free(echo_object_kmem, eco);
449 }
450
451 static int echo_object_print(const struct lu_env *env, void *cookie,
452                              lu_printer_t p, const struct lu_object *o)
453 {
454         struct echo_object *obj = cl2echo_obj(lu2cl(o));
455
456         return (*p)(env, cookie, "echoclient-object@%p", obj);
457 }
458
459 static const struct lu_object_operations echo_lu_obj_ops = {
460         .loo_object_init      = echo_object_init,
461         .loo_object_delete    = NULL,
462         .loo_object_release   = NULL,
463         .loo_object_free      = echo_object_free,
464         .loo_object_print     = echo_object_print,
465         .loo_object_invariant = NULL
466 };
467
468 /** @} echo_lu_ops */
469
470 /** \defgroup echo_lu_dev_ops  lu_device operations
471  *
472  * Operations for echo lu device.
473  *
474  * @{
475  */
476 static struct lu_object *echo_object_alloc(const struct lu_env *env,
477                                            const struct lu_object_header *hdr,
478                                            struct lu_device *dev)
479 {
480         struct echo_object *eco;
481         struct lu_object *obj = NULL;
482
483         /* we're the top dev. */
484         LASSERT(!hdr);
485         eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
486         if (eco) {
487                 struct cl_object_header *hdr = &eco->eo_hdr;
488
489                 obj = &echo_obj2cl(eco)->co_lu;
490                 cl_object_header_init(hdr);
491                 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
492
493                 lu_object_init(obj, &hdr->coh_lu, dev);
494                 lu_object_add_top(&hdr->coh_lu, obj);
495
496                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
497                 obj->lo_ops       = &echo_lu_obj_ops;
498         }
499         return obj;
500 }
501
502 static const struct lu_device_operations echo_device_lu_ops = {
503         .ldo_object_alloc   = echo_object_alloc,
504 };
505
506 /** @} echo_lu_dev_ops */
507
508 /** \defgroup echo_init Setup and teardown
509  *
510  * Init and fini functions for echo client.
511  *
512  * @{
513  */
514 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
515 {
516         struct cl_site *site = &ed->ed_site_myself;
517         int rc;
518
519         /* initialize site */
520         rc = cl_site_init(site, &ed->ed_cl);
521         if (rc) {
522                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
523                 return rc;
524         }
525
526         rc = lu_site_init_finish(&site->cs_lu);
527         if (rc) {
528                 cl_site_fini(site);
529                 return rc;
530         }
531
532         ed->ed_site = &site->cs_lu;
533         return 0;
534 }
535
536 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
537 {
538         if (ed->ed_site) {
539                 lu_site_fini(ed->ed_site);
540                 ed->ed_site = NULL;
541         }
542 }
543
544 static void *echo_thread_key_init(const struct lu_context *ctx,
545                                   struct lu_context_key *key)
546 {
547         struct echo_thread_info *info;
548
549         info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
550         if (!info)
551                 info = ERR_PTR(-ENOMEM);
552         return info;
553 }
554
555 static void echo_thread_key_fini(const struct lu_context *ctx,
556                                  struct lu_context_key *key, void *data)
557 {
558         struct echo_thread_info *info = data;
559
560         kmem_cache_free(echo_thread_kmem, info);
561 }
562
563 static struct lu_context_key echo_thread_key = {
564         .lct_tags = LCT_CL_THREAD,
565         .lct_init = echo_thread_key_init,
566         .lct_fini = echo_thread_key_fini,
567 };
568
569 static void *echo_session_key_init(const struct lu_context *ctx,
570                                    struct lu_context_key *key)
571 {
572         struct echo_session_info *session;
573
574         session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
575         if (!session)
576                 session = ERR_PTR(-ENOMEM);
577         return session;
578 }
579
580 static void echo_session_key_fini(const struct lu_context *ctx,
581                                   struct lu_context_key *key, void *data)
582 {
583         struct echo_session_info *session = data;
584
585         kmem_cache_free(echo_session_kmem, session);
586 }
587
588 static struct lu_context_key echo_session_key = {
589         .lct_tags = LCT_SESSION,
590         .lct_init = echo_session_key_init,
591         .lct_fini = echo_session_key_fini,
592 };
593
594 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
595
596 static struct lu_device *echo_device_alloc(const struct lu_env *env,
597                                            struct lu_device_type *t,
598                                            struct lustre_cfg *cfg)
599 {
600         struct lu_device   *next;
601         struct echo_device *ed;
602         struct cl_device   *cd;
603         struct obd_device  *obd = NULL; /* to keep compiler happy */
604         struct obd_device  *tgt;
605         const char *tgt_type_name;
606         int rc, err;
607
608         ed = kzalloc(sizeof(*ed), GFP_NOFS);
609         if (!ed) {
610                 rc = -ENOMEM;
611                 goto out;
612         }
613
614         cd = &ed->ed_cl;
615         rc = cl_device_init(cd, t);
616         if (rc)
617                 goto out_free;
618
619         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
620
621         obd = class_name2obd(lustre_cfg_string(cfg, 0));
622         LASSERT(obd);
623         LASSERT(env);
624
625         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
626         if (!tgt) {
627                 CERROR("Can not find tgt device %s\n",
628                        lustre_cfg_string(cfg, 1));
629                 rc = -ENODEV;
630                 goto out_device_fini;
631         }
632
633         next = tgt->obd_lu_dev;
634         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
635                 CERROR("echo MDT client must be run on server\n");
636                 rc = -EOPNOTSUPP;
637                 goto out_device_fini;
638         }
639
640         rc = echo_site_init(env, ed);
641         if (rc)
642                 goto out_device_fini;
643
644         rc = echo_client_setup(env, obd, cfg);
645         if (rc)
646                 goto out_site_fini;
647
648         ed->ed_ec = &obd->u.echo_client;
649
650         /* if echo client is to be stacked upon ost device, the next is
651          * NULL since ost is not a clio device so far
652          */
653         if (next && !lu_device_is_cl(next))
654                 next = NULL;
655
656         tgt_type_name = tgt->obd_type->typ_name;
657         if (next) {
658                 if (next->ld_site) {
659                         rc = -EBUSY;
660                         goto out_cleanup;
661                 }
662
663                 next->ld_site = ed->ed_site;
664                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
665                                                 next->ld_type->ldt_name,
666                                                               NULL);
667                 if (rc)
668                         goto out_cleanup;
669
670         } else {
671                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
672         }
673
674         ed->ed_next = next;
675         return &cd->cd_lu_dev;
676
677 out_cleanup:
678         err = echo_client_cleanup(obd);
679         if (err)
680                 CERROR("Cleanup obd device %s error(%d)\n",
681                        obd->obd_name, err);
682 out_site_fini:
683         echo_site_fini(env, ed);
684 out_device_fini:
685         cl_device_fini(&ed->ed_cl);
686 out_free:
687         kfree(ed);
688 out:
689         return ERR_PTR(rc);
690 }
691
/**
 * Device init method. It is not expected to be called for the echo device:
 * reaching it triggers LBUG().
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	LBUG();

	return 0;
}
698
699 static struct lu_device *echo_device_fini(const struct lu_env *env,
700                                           struct lu_device *d)
701 {
702         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
703         struct lu_device *next = ed->ed_next;
704
705         while (next)
706                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
707         return NULL;
708 }
709
/**
 * Release the cl_lock behind echo lock \a ecl. \a still_used is accepted
 * but not consulted.
 */
static void echo_lock_release(const struct lu_env *env,
			      struct echo_lock *ecl,
			      int still_used)
{
	cl_lock_release(env, echo_lock2cl(ecl));
}
718
719 static struct lu_device *echo_device_free(const struct lu_env *env,
720                                           struct lu_device *d)
721 {
722         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
723         struct echo_client_obd *ec   = ed->ed_ec;
724         struct echo_object     *eco;
725         struct lu_device       *next = ed->ed_next;
726
727         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
728                ed, next);
729
730         lu_site_purge(env, ed->ed_site, -1);
731
732         /* check if there are objects still alive.
733          * It shouldn't have any object because lu_site_purge would cleanup
734          * all of cached objects. Anyway, probably the echo device is being
735          * parallelly accessed.
736          */
737         spin_lock(&ec->ec_lock);
738         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
739                 eco->eo_deleted = 1;
740         spin_unlock(&ec->ec_lock);
741
742         /* purge again */
743         lu_site_purge(env, ed->ed_site, -1);
744
745         CDEBUG(D_INFO,
746                "Waiting for the reference of echo object to be dropped\n");
747
748         /* Wait for the last reference to be dropped. */
749         spin_lock(&ec->ec_lock);
750         while (!list_empty(&ec->ec_objects)) {
751                 spin_unlock(&ec->ec_lock);
752                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
753                 set_current_state(TASK_UNINTERRUPTIBLE);
754                 schedule_timeout(cfs_time_seconds(1));
755                 lu_site_purge(env, ed->ed_site, -1);
756                 spin_lock(&ec->ec_lock);
757         }
758         spin_unlock(&ec->ec_lock);
759
760         LASSERT(list_empty(&ec->ec_locks));
761
762         CDEBUG(D_INFO, "No object exists, exiting...\n");
763
764         echo_client_cleanup(d->ld_obd);
765
766         while (next)
767                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
768
769         LASSERT(ed->ed_site == d->ld_site);
770         echo_site_fini(env, ed);
771         cl_device_fini(&ed->ed_cl);
772         kfree(ed);
773
774         cl_env_cache_purge(~0);
775
776         return NULL;
777 }
778
779 static const struct lu_device_type_operations echo_device_type_ops = {
780         .ldto_init = echo_type_init,
781         .ldto_fini = echo_type_fini,
782
783         .ldto_start = echo_type_start,
784         .ldto_stop  = echo_type_stop,
785
786         .ldto_device_alloc = echo_device_alloc,
787         .ldto_device_free  = echo_device_free,
788         .ldto_device_init  = echo_device_init,
789         .ldto_device_fini  = echo_device_fini
790 };
791
792 static struct lu_device_type echo_device_type = {
793         .ldt_tags     = LU_DEVICE_CL,
794         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
795         .ldt_ops      = &echo_device_type_ops,
796         .ldt_ctx_tags = LCT_CL_THREAD,
797 };
798
799 /** @} echo_init */
800
801 /** \defgroup echo_exports Exported operations
802  *
803  * exporting functions to echo client
804  *
805  * @{
806  */
807
808 /* Interfaces to echo client obd device */
809 static struct echo_object *
810 cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
811 {
812         struct lu_env *env;
813         struct echo_thread_info *info;
814         struct echo_object_conf *conf;
815         struct lov_oinfo *oinfo = NULL;
816         struct echo_object *eco;
817         struct cl_object   *obj;
818         struct lu_fid *fid;
819         u16 refcheck;
820         int rc;
821
822         LASSERTF(ostid_id(oi), DOSTID "\n", POSTID(oi));
823         LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID "\n", POSTID(oi));
824
825         /* Never return an object if the obd is to be freed. */
826         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
827                 return ERR_PTR(-ENODEV);
828
829         env = cl_env_get(&refcheck);
830         if (IS_ERR(env))
831                 return (void *)env;
832
833         info = echo_env_info(env);
834         conf = &info->eti_conf;
835         if (d->ed_next) {
836                 oinfo = kzalloc(sizeof(*oinfo), GFP_NOFS);
837                 if (!oinfo) {
838                         eco = ERR_PTR(-ENOMEM);
839                         goto out;
840                 }
841
842                 oinfo->loi_oi = *oi;
843                 conf->eoc_cl.u.coc_oinfo = oinfo;
844         }
845
846         /*
847          * If echo_object_init() is successful then ownership of oinfo
848          * is transferred to the object.
849          */
850         conf->eoc_oinfo = &oinfo;
851
852         fid  = &info->eti_fid;
853         rc = ostid_to_fid(fid, (struct ost_id *)oi, 0);
854         if (rc != 0) {
855                 eco = ERR_PTR(rc);
856                 goto out;
857         }
858
859         /* In the function below, .hs_keycmp resolves to
860          * lu_obj_hop_keycmp()
861          */
862         /* coverity[overrun-buffer-val] */
863         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
864         if (IS_ERR(obj)) {
865                 eco = (void *)obj;
866                 goto out;
867         }
868
869         eco = cl2echo_obj(obj);
870         if (eco->eo_deleted) {
871                 cl_object_put(env, obj);
872                 eco = ERR_PTR(-EAGAIN);
873         }
874
875 out:
876         kfree(oinfo);
877         cl_env_put(env, &refcheck);
878         return eco;
879 }
880
881 static int cl_echo_object_put(struct echo_object *eco)
882 {
883         struct lu_env *env;
884         struct cl_object *obj = echo_obj2cl(eco);
885         u16 refcheck;
886
887         env = cl_env_get(&refcheck);
888         if (IS_ERR(env))
889                 return PTR_ERR(env);
890
891         /* an external function to kill an object? */
892         if (eco->eo_deleted) {
893                 struct lu_object_header *loh = obj->co_lu.lo_header;
894
895                 LASSERT(&eco->eo_hdr == luh2coh(loh));
896                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
897         }
898
899         cl_object_put(env, obj);
900         cl_env_put(env, &refcheck);
901         return 0;
902 }
903
904 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
905                             u64 start, u64 end, int mode,
906                             __u64 *cookie, __u32 enqflags)
907 {
908         struct cl_io *io;
909         struct cl_lock *lck;
910         struct cl_object *obj;
911         struct cl_lock_descr *descr;
912         struct echo_thread_info *info;
913         int rc = -ENOMEM;
914
915         info = echo_env_info(env);
916         io = &info->eti_io;
917         lck = &info->eti_lock;
918         obj = echo_obj2cl(eco);
919
920         memset(lck, 0, sizeof(*lck));
921         descr = &lck->cll_descr;
922         descr->cld_obj   = obj;
923         descr->cld_start = cl_index(obj, start);
924         descr->cld_end   = cl_index(obj, end);
925         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
926         descr->cld_enq_flags = enqflags;
927         io->ci_obj = obj;
928
929         rc = cl_lock_request(env, io, lck);
930         if (rc == 0) {
931                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
932                 struct echo_lock *el;
933
934                 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
935                 spin_lock(&ec->ec_lock);
936                 if (list_empty(&el->el_chain)) {
937                         list_add(&el->el_chain, &ec->ec_locks);
938                         el->el_cookie = ++ec->ec_unique;
939                 }
940                 atomic_inc(&el->el_refcount);
941                 *cookie = el->el_cookie;
942                 spin_unlock(&ec->ec_lock);
943         }
944         return rc;
945 }
946
947 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
948                            __u64 cookie)
949 {
950         struct echo_client_obd *ec = ed->ed_ec;
951         struct echo_lock       *ecl = NULL;
952         struct list_head             *el;
953         int found = 0, still_used = 0;
954
955         spin_lock(&ec->ec_lock);
956         list_for_each(el, &ec->ec_locks) {
957                 ecl = list_entry(el, struct echo_lock, el_chain);
958                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
959                 found = (ecl->el_cookie == cookie);
960                 if (found) {
961                         if (atomic_dec_and_test(&ecl->el_refcount))
962                                 list_del_init(&ecl->el_chain);
963                         else
964                                 still_used = 1;
965                         break;
966                 }
967         }
968         spin_unlock(&ec->ec_lock);
969
970         if (!found)
971                 return -ENOENT;
972
973         echo_lock_release(env, ecl, still_used);
974         return 0;
975 }
976
977 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
978                                  struct cl_page *page)
979 {
980         struct echo_thread_info *info;
981         struct cl_2queue *queue;
982
983         info = echo_env_info(env);
984         LASSERT(io == &info->eti_io);
985
986         queue = &info->eti_queue;
987         cl_page_list_add(&queue->c2_qout, page);
988 }
989
990 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
991                               struct page **pages, int npages, int async)
992 {
993         struct lu_env      *env;
994         struct echo_thread_info *info;
995         struct cl_object        *obj = echo_obj2cl(eco);
996         struct echo_device      *ed  = eco->eo_dev;
997         struct cl_2queue        *queue;
998         struct cl_io        *io;
999         struct cl_page    *clp;
1000         struct lustre_handle    lh = { 0 };
1001         size_t page_size = cl_page_size(obj);
1002         u16 refcheck;
1003         int rc;
1004         int i;
1005
1006         LASSERT((offset & ~PAGE_MASK) == 0);
1007         LASSERT(ed->ed_next);
1008         env = cl_env_get(&refcheck);
1009         if (IS_ERR(env))
1010                 return PTR_ERR(env);
1011
1012         info    = echo_env_info(env);
1013         io      = &info->eti_io;
1014         queue   = &info->eti_queue;
1015
1016         cl_2queue_init(queue);
1017
1018         io->ci_ignore_layout = 1;
1019         rc = cl_io_init(env, io, CIT_MISC, obj);
1020         if (rc < 0)
1021                 goto out;
1022         LASSERT(rc == 0);
1023
1024         rc = cl_echo_enqueue0(env, eco, offset,
1025                               offset + npages * PAGE_SIZE - 1,
1026                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1027                               CEF_NEVER);
1028         if (rc < 0)
1029                 goto error_lock;
1030
1031         for (i = 0; i < npages; i++) {
1032                 LASSERT(pages[i]);
1033                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1034                                    pages[i], CPT_TRANSIENT);
1035                 if (IS_ERR(clp)) {
1036                         rc = PTR_ERR(clp);
1037                         break;
1038                 }
1039                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1040
1041                 rc = cl_page_own(env, io, clp);
1042                 if (rc) {
1043                         LASSERT(clp->cp_state == CPS_FREEING);
1044                         cl_page_put(env, clp);
1045                         break;
1046                 }
1047                 /*
1048                  * Add a page to the incoming page list of 2-queue.
1049                  */
1050                 cl_page_list_add(&queue->c2_qin, clp);
1051
1052                 /* drop the reference count for cl_page_find, so that the page
1053                  * will be freed in cl_2queue_fini.
1054                  */
1055                 cl_page_put(env, clp);
1056                 cl_page_clip(env, clp, 0, page_size);
1057
1058                 offset += page_size;
1059         }
1060
1061         if (rc == 0) {
1062                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1063
1064                 async = async && (typ == CRT_WRITE);
1065                 if (async)
1066                         rc = cl_io_commit_async(env, io, &queue->c2_qin,
1067                                                 0, PAGE_SIZE,
1068                                                 echo_commit_callback);
1069                 else
1070                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1071                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1072                        async ? "async" : "sync", rc);
1073         }
1074
1075         cl_echo_cancel0(env, ed, lh.cookie);
1076 error_lock:
1077         cl_2queue_discard(env, io, queue);
1078         cl_2queue_disown(env, io, queue);
1079         cl_2queue_fini(env, queue);
1080         cl_io_fini(env, io);
1081 out:
1082         cl_env_put(env, &refcheck);
1083         return rc;
1084 }
1085
1086 /** @} echo_exports */
1087
1088 static u64 last_object_id;
1089
1090 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1091                               struct obdo *oa)
1092 {
1093         struct echo_object     *eco;
1094         struct echo_client_obd *ec = ed->ed_ec;
1095         int                  rc;
1096         int                  created = 0;
1097
1098         if (!(oa->o_valid & OBD_MD_FLID) ||
1099             !(oa->o_valid & OBD_MD_FLGROUP) ||
1100             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1101                 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1102                 return -EINVAL;
1103         }
1104
1105         if (!ostid_id(&oa->o_oi)) {
1106                 rc = ostid_set_id(&oa->o_oi, ++last_object_id);
1107                 if (rc)
1108                         goto failed;
1109         }
1110
1111         rc = obd_create(env, ec->ec_exp, oa);
1112         if (rc != 0) {
1113                 CERROR("Cannot create objects: rc = %d\n", rc);
1114                 goto failed;
1115         }
1116         created = 1;
1117
1118         oa->o_valid |= OBD_MD_FLID;
1119
1120         eco = cl_echo_object_find(ed, &oa->o_oi);
1121         if (IS_ERR(eco)) {
1122                 rc = PTR_ERR(eco);
1123                 goto failed;
1124         }
1125         cl_echo_object_put(eco);
1126
1127         CDEBUG(D_INFO, "oa oid " DOSTID "\n", POSTID(&oa->o_oi));
1128
1129  failed:
1130         if (created && rc)
1131                 obd_destroy(env, ec->ec_exp, oa);
1132         if (rc)
1133                 CERROR("create object failed with: rc = %d\n", rc);
1134         return rc;
1135 }
1136
1137 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1138                            struct obdo *oa)
1139 {
1140         struct echo_object     *eco;
1141         int                  rc;
1142
1143         if (!(oa->o_valid & OBD_MD_FLID) || !(oa->o_valid & OBD_MD_FLGROUP) ||
1144             !ostid_id(&oa->o_oi)) {
1145                 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1146                 return -EINVAL;
1147         }
1148
1149         rc = 0;
1150         eco = cl_echo_object_find(ed, &oa->o_oi);
1151         if (!IS_ERR(eco))
1152                 *ecop = eco;
1153         else
1154                 rc = PTR_ERR(eco);
1155         return rc;
1156 }
1157
1158 static void echo_put_object(struct echo_object *eco)
1159 {
1160         int rc;
1161
1162         rc = cl_echo_object_put(eco);
1163         if (rc)
1164                 CERROR("%s: echo client drop an object failed: rc = %d\n",
1165                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
1166 }
1167
1168 static void
1169 echo_client_page_debug_setup(struct page *page, int rw, u64 id,
1170                              u64 offset, u64 count)
1171 {
1172         char    *addr;
1173         u64      stripe_off;
1174         u64      stripe_id;
1175         int      delta;
1176
1177         /* no partial pages on the client */
1178         LASSERT(count == PAGE_SIZE);
1179
1180         addr = kmap(page);
1181
1182         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1183                 if (rw == OBD_BRW_WRITE) {
1184                         stripe_off = offset + delta;
1185                         stripe_id = id;
1186                 } else {
1187                         stripe_off = 0xdeadbeef00c0ffeeULL;
1188                         stripe_id = 0xdeadbeef00c0ffeeULL;
1189                 }
1190                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1191                                   stripe_off, stripe_id);
1192         }
1193
1194         kunmap(page);
1195 }
1196
1197 static int echo_client_page_debug_check(struct page *page, u64 id,
1198                                         u64 offset, u64 count)
1199 {
1200         u64     stripe_off;
1201         u64     stripe_id;
1202         char   *addr;
1203         int     delta;
1204         int     rc;
1205         int     rc2;
1206
1207         /* no partial pages on the client */
1208         LASSERT(count == PAGE_SIZE);
1209
1210         addr = kmap(page);
1211
1212         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1213                 stripe_off = offset + delta;
1214                 stripe_id = id;
1215
1216                 rc2 = block_debug_check("test_brw",
1217                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1218                                         stripe_off, stripe_id);
1219                 if (rc2 != 0) {
1220                         CERROR("Error in echo object %#llx\n", id);
1221                         rc = rc2;
1222                 }
1223         }
1224
1225         kunmap(page);
1226         return rc;
1227 }
1228
1229 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1230                             struct echo_object *eco, u64 offset,
1231                             u64 count, int async)
1232 {
1233         u32            npages;
1234         struct brw_page *pga;
1235         struct brw_page *pgp;
1236         struct page         **pages;
1237         u64              off;
1238         int                  i;
1239         int                  rc;
1240         int                  verify;
1241         gfp_t                gfp_mask;
1242         int                  brw_flags = 0;
1243
1244         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1245                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1246                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1247
1248         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1249
1250         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1251
1252         if (count <= 0 ||
1253             (count & (~PAGE_MASK)) != 0)
1254                 return -EINVAL;
1255
1256         /* XXX think again with misaligned I/O */
1257         npages = count >> PAGE_SHIFT;
1258
1259         if (rw == OBD_BRW_WRITE)
1260                 brw_flags = OBD_BRW_ASYNC;
1261
1262         pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1263         if (!pga)
1264                 return -ENOMEM;
1265
1266         pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1267         if (!pages) {
1268                 kfree(pga);
1269                 return -ENOMEM;
1270         }
1271
1272         for (i = 0, pgp = pga, off = offset;
1273              i < npages;
1274              i++, pgp++, off += PAGE_SIZE) {
1275                 LASSERT(!pgp->pg);      /* for cleanup */
1276
1277                 rc = -ENOMEM;
1278                 pgp->pg = alloc_page(gfp_mask);
1279                 if (!pgp->pg)
1280                         goto out;
1281
1282                 pages[i] = pgp->pg;
1283                 pgp->count = PAGE_SIZE;
1284                 pgp->off = off;
1285                 pgp->flag = brw_flags;
1286
1287                 if (verify)
1288                         echo_client_page_debug_setup(pgp->pg, rw,
1289                                                      ostid_id(&oa->o_oi), off,
1290                                                      pgp->count);
1291         }
1292
1293         /* brw mode can only be used at client */
1294         LASSERT(ed->ed_next);
1295         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1296
1297  out:
1298         if (rc != 0 || rw != OBD_BRW_READ)
1299                 verify = 0;
1300
1301         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1302                 if (!pgp->pg)
1303                         continue;
1304
1305                 if (verify) {
1306                         int vrc;
1307
1308                         vrc = echo_client_page_debug_check(pgp->pg,
1309                                                            ostid_id(&oa->o_oi),
1310                                                            pgp->off, pgp->count);
1311                         if (vrc != 0 && rc == 0)
1312                                 rc = vrc;
1313                 }
1314                 __free_page(pgp->pg);
1315         }
1316         kfree(pga);
1317         kfree(pages);
1318         return rc;
1319 }
1320
1321 static int echo_client_prep_commit(const struct lu_env *env,
1322                                    struct obd_export *exp, int rw,
1323                                    struct obdo *oa, struct echo_object *eco,
1324                                    u64 offset, u64 count,
1325                                    u64 batch, int async)
1326 {
1327         struct obd_ioobj ioo;
1328         struct niobuf_local *lnb;
1329         struct niobuf_remote rnb;
1330         u64 off;
1331         u64 npages, tot_pages;
1332         int i, ret = 0, brw_flags = 0;
1333
1334         if (count <= 0 || (count & (~PAGE_MASK)) != 0)
1335                 return -EINVAL;
1336
1337         npages = batch >> PAGE_SHIFT;
1338         tot_pages = count >> PAGE_SHIFT;
1339
1340         lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1341         if (!lnb) {
1342                 ret = -ENOMEM;
1343                 goto out;
1344         }
1345
1346         if (rw == OBD_BRW_WRITE && async)
1347                 brw_flags |= OBD_BRW_ASYNC;
1348
1349         obdo_to_ioobj(oa, &ioo);
1350
1351         off = offset;
1352
1353         for (; tot_pages > 0; tot_pages -= npages) {
1354                 int lpages;
1355
1356                 if (tot_pages < npages)
1357                         npages = tot_pages;
1358
1359                 rnb.rnb_offset = off;
1360                 rnb.rnb_len = npages * PAGE_SIZE;
1361                 rnb.rnb_flags = brw_flags;
1362                 ioo.ioo_bufcnt = 1;
1363                 off += npages * PAGE_SIZE;
1364
1365                 lpages = npages;
1366                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
1367                 if (ret != 0)
1368                         goto out;
1369
1370                 for (i = 0; i < lpages; i++) {
1371                         struct page *page = lnb[i].lnb_page;
1372
1373                         /* read past eof? */
1374                         if (!page  && lnb[i].lnb_rc == 0)
1375                                 continue;
1376
1377                         if (async)
1378                                 lnb[i].lnb_flags |= OBD_BRW_ASYNC;
1379
1380                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1381                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1382                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1383                                 continue;
1384
1385                         if (rw == OBD_BRW_WRITE)
1386                                 echo_client_page_debug_setup(page, rw,
1387                                                              ostid_id(&oa->o_oi),
1388                                                              lnb[i].lnb_file_offset,
1389                                                              lnb[i].lnb_len);
1390                         else
1391                                 echo_client_page_debug_check(page,
1392                                                              ostid_id(&oa->o_oi),
1393                                                              lnb[i].lnb_file_offset,
1394                                                              lnb[i].lnb_len);
1395                 }
1396
1397                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
1398                                    ret);
1399                 if (ret != 0)
1400                         goto out;
1401
1402                 /* Reuse env context. */
1403                 lu_context_exit((struct lu_context *)&env->le_ctx);
1404                 lu_context_enter((struct lu_context *)&env->le_ctx);
1405         }
1406
1407 out:
1408         kfree(lnb);
1409         return ret;
1410 }
1411
1412 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1413                                  struct obd_export *exp,
1414                                  struct obd_ioctl_data *data)
1415 {
1416         struct obd_device *obd = class_exp2obd(exp);
1417         struct echo_device *ed = obd2echo_dev(obd);
1418         struct echo_client_obd *ec = ed->ed_ec;
1419         struct obdo *oa = &data->ioc_obdo1;
1420         struct echo_object *eco;
1421         int rc;
1422         int async = 1;
1423         long test_mode;
1424
1425         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1426
1427         rc = echo_get_object(&eco, ed, oa);
1428         if (rc)
1429                 return rc;
1430
1431         oa->o_valid &= ~OBD_MD_FLHANDLE;
1432
1433         /* OFD/obdfilter works only via prep/commit */
1434         test_mode = (long)data->ioc_pbuf1;
1435         if (test_mode == 1)
1436                 async = 0;
1437
1438         if (!ed->ed_next && test_mode != 3) {
1439                 test_mode = 3;
1440                 data->ioc_plen1 = data->ioc_count;
1441         }
1442
1443         /* Truncate batch size to maximum */
1444         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1445                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1446
1447         switch (test_mode) {
1448         case 1:
1449                 /* fall through */
1450         case 2:
1451                 rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
1452                                       data->ioc_count, async);
1453                 break;
1454         case 3:
1455                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
1456                                              data->ioc_offset, data->ioc_count,
1457                                              data->ioc_plen1, async);
1458                 break;
1459         default:
1460                 rc = -EINVAL;
1461         }
1462         echo_put_object(eco);
1463         return rc;
1464 }
1465
1466 static int
1467 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1468                       void *karg, void __user *uarg)
1469 {
1470         struct obd_device      *obd = exp->exp_obd;
1471         struct echo_device     *ed = obd2echo_dev(obd);
1472         struct echo_client_obd *ec = ed->ed_ec;
1473         struct echo_object     *eco;
1474         struct obd_ioctl_data  *data = karg;
1475         struct lu_env     *env;
1476         struct obdo         *oa;
1477         struct lu_fid      fid;
1478         int                  rw = OBD_BRW_READ;
1479         int                  rc = 0;
1480
1481         oa = &data->ioc_obdo1;
1482         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1483                 oa->o_valid |= OBD_MD_FLGROUP;
1484                 ostid_set_seq_echo(&oa->o_oi);
1485         }
1486
1487         /* This FID is unpacked just for validation at this point */
1488         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1489         if (rc < 0)
1490                 return rc;
1491
1492         env = kzalloc(sizeof(*env), GFP_NOFS);
1493         if (!env)
1494                 return -ENOMEM;
1495
1496         rc = lu_env_init(env, LCT_DT_THREAD);
1497         if (rc) {
1498                 rc = -ENOMEM;
1499                 goto out;
1500         }
1501
1502         switch (cmd) {
1503         case OBD_IOC_CREATE:                /* may create echo object */
1504                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1505                         rc = -EPERM;
1506                         goto out;
1507                 }
1508
1509                 rc = echo_create_object(env, ed, oa);
1510                 goto out;
1511
1512         case OBD_IOC_DESTROY:
1513                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1514                         rc = -EPERM;
1515                         goto out;
1516                 }
1517
1518                 rc = echo_get_object(&eco, ed, oa);
1519                 if (rc == 0) {
1520                         rc = obd_destroy(env, ec->ec_exp, oa);
1521                         if (rc == 0)
1522                                 eco->eo_deleted = 1;
1523                         echo_put_object(eco);
1524                 }
1525                 goto out;
1526
1527         case OBD_IOC_GETATTR:
1528                 rc = echo_get_object(&eco, ed, oa);
1529                 if (rc == 0) {
1530                         rc = obd_getattr(env, ec->ec_exp, oa);
1531                         echo_put_object(eco);
1532                 }
1533                 goto out;
1534
1535         case OBD_IOC_SETATTR:
1536                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1537                         rc = -EPERM;
1538                         goto out;
1539                 }
1540
1541                 rc = echo_get_object(&eco, ed, oa);
1542                 if (rc == 0) {
1543                         rc = obd_setattr(env, ec->ec_exp, oa);
1544                         echo_put_object(eco);
1545                 }
1546                 goto out;
1547
1548         case OBD_IOC_BRW_WRITE:
1549                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1550                         rc = -EPERM;
1551                         goto out;
1552                 }
1553
1554                 rw = OBD_BRW_WRITE;
1555                 /* fall through */
1556         case OBD_IOC_BRW_READ:
1557                 rc = echo_client_brw_ioctl(env, rw, exp, data);
1558                 goto out;
1559
1560         default:
1561                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1562                 rc = -ENOTTY;
1563                 goto out;
1564         }
1565
1566 out:
1567         lu_env_fini(env);
1568         kfree(env);
1569
1570         return rc;
1571 }
1572
1573 static int echo_client_setup(const struct lu_env *env,
1574                              struct obd_device *obddev, struct lustre_cfg *lcfg)
1575 {
1576         struct echo_client_obd *ec = &obddev->u.echo_client;
1577         struct obd_device *tgt;
1578         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1579         struct obd_connect_data *ocd = NULL;
1580         int rc;
1581
1582         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1583                 CERROR("requires a TARGET OBD name\n");
1584                 return -EINVAL;
1585         }
1586
1587         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1588         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1589                 CERROR("device not attached or not set up (%s)\n",
1590                        lustre_cfg_string(lcfg, 1));
1591                 return -EINVAL;
1592         }
1593
1594         spin_lock_init(&ec->ec_lock);
1595         INIT_LIST_HEAD(&ec->ec_objects);
1596         INIT_LIST_HEAD(&ec->ec_locks);
1597         ec->ec_unique = 0;
1598
1599         ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
1600         if (!ocd)
1601                 return -ENOMEM;
1602
1603         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1604                                  OBD_CONNECT_BRW_SIZE |
1605                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1606                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1607                                  OBD_CONNECT_FID;
1608         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1609         ocd->ocd_version = LUSTRE_VERSION_CODE;
1610         ocd->ocd_group = FID_SEQ_ECHO;
1611
1612         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1613
1614         kfree(ocd);
1615
1616         if (rc != 0) {
1617                 CERROR("fail to connect to device %s\n",
1618                        lustre_cfg_string(lcfg, 1));
1619                 return rc;
1620         }
1621
1622         return rc;
1623 }
1624
1625 static int echo_client_cleanup(struct obd_device *obddev)
1626 {
1627         struct echo_client_obd *ec = &obddev->u.echo_client;
1628         int rc;
1629
1630         if (!list_empty(&obddev->obd_exports)) {
1631                 CERROR("still has clients!\n");
1632                 return -EBUSY;
1633         }
1634
1635         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1636         rc = obd_disconnect(ec->ec_exp);
1637         if (rc != 0)
1638                 CERROR("fail to disconnect device: %d\n", rc);
1639
1640         return rc;
1641 }
1642
1643 static int echo_client_connect(const struct lu_env *env,
1644                                struct obd_export **exp,
1645                                struct obd_device *src, struct obd_uuid *cluuid,
1646                                struct obd_connect_data *data, void *localdata)
1647 {
1648         int             rc;
1649         struct lustre_handle conn = { 0 };
1650
1651         rc = class_connect(&conn, src, cluuid);
1652         if (rc == 0)
1653                 *exp = class_conn2export(&conn);
1654
1655         return rc;
1656 }
1657
1658 static int echo_client_disconnect(struct obd_export *exp)
1659 {
1660         int                  rc;
1661
1662         if (!exp) {
1663                 rc = -EINVAL;
1664                 goto out;
1665         }
1666
1667         rc = class_disconnect(exp);
1668         goto out;
1669  out:
1670         return rc;
1671 }
1672
/*
 * Operations table for the echo client obd type; handed to
 * class_register_type() in echo_client_init().
 */
static struct obd_ops echo_client_obd_ops = {
        .owner          = THIS_MODULE,
        .iocontrol      = echo_client_iocontrol,
        .connect        = echo_client_connect,
        .disconnect     = echo_client_disconnect
};
1679
1680 static int echo_client_init(void)
1681 {
1682         int rc;
1683
1684         rc = lu_kmem_init(echo_caches);
1685         if (rc == 0) {
1686                 rc = class_register_type(&echo_client_obd_ops, NULL,
1687                                          LUSTRE_ECHO_CLIENT_NAME,
1688                                          &echo_device_type);
1689                 if (rc)
1690                         lu_kmem_fini(echo_caches);
1691         }
1692         return rc;
1693 }
1694
/*
 * Undo echo_client_init(): unregister the echo client obd type, then
 * finalise the echo caches.
 */
static void echo_client_exit(void)
{
        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
        lu_kmem_fini(echo_caches);
}
1700
1701 static int __init obdecho_init(void)
1702 {
1703         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1704
1705         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
1706
1707         return echo_client_init();
1708 }
1709
/* Module unload hook: tear down the echo client via echo_client_exit(). */
static void /*__exit*/ obdecho_exit(void)
{
        echo_client_exit();
}
1714
/* Module metadata, followed by the load/unload entry points. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Echo Client test driver");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(obdecho_init);
module_exit(obdecho_exit);
1722
1723 /** @} echo_client */