4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2014, Intel Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
27 * Lustre is a trademark of Seagate, Inc.
29 * lnet/lnet/net_fault.c
31 * Lustre network fault simulation
33 * Author: liang.zhen@intel.com
36 #define DEBUG_SUBSYSTEM S_LNET
38 #include "../../include/linux/lnet/lib-lnet.h"
39 #include "../../include/linux/lnet/lnetctl.h"
41 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
42 LNET_GET_BIT | LNET_REPLY_BIT)
/*
 * A single message-drop rule.  A rule is either rate-based (dr_drop_at)
 * or time-based (dr_drop_time/dr_time_base) -- the two modes are mutually
 * exclusive per rule.
 */
44 struct lnet_drop_rule {
45 /** link chain on the_lnet.ln_drop_rules */
46 struct list_head dr_link;
47 /** attributes of this rule */
48 struct lnet_fault_attr dr_attr;
49 /** lock to protect \a dr_drop_at and \a dr_stat */
52 * the message sequence to drop, which means message is dropped when
53 * dr_stat.drs_count == dr_drop_at
55 unsigned long dr_drop_at;
57 * seconds to drop the next message, it's exclusive with dr_drop_at
59 unsigned long dr_drop_time;
60 /** baseline to calculate dr_drop_time */
61 unsigned long dr_time_base;
62 /** statistic of dropped messages */
63 struct lnet_fault_stat dr_stat;
/*
 * Check whether the NID pattern \a nid from a fault rule matches the
 * message NID \a msg_nid.  LNET_NID_ANY matches everything; a NID whose
 * address part equals that of LNET_NID_ANY acts as a per-network
 * wildcard (matches any address on the same network).
 */
67 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 if (nid == msg_nid || nid == LNET_NID_ANY)
72 if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
75 /* 255.255.255.255@net is wildcard for all addresses in a network */
76 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
/*
 * Decide whether a message (src NID, dst NID, message type, portal)
 * matches fault-rule attributes \a attr.  All configured filters
 * (source, destination, message-type mask, portal mask) must match.
 */
80 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
81 lnet_nid_t dst, unsigned int type, unsigned int portal)
83 if (!lnet_fault_nid_match(attr->fa_src, src) ||
84 !lnet_fault_nid_match(attr->fa_dst, dst))
87 if (!(attr->fa_msg_mask & (1 << type)))
91 * NB: ACK and REPLY have no portal, but they should have been
92 * rejected by message mask
94 if (attr->fa_ptl_mask && /* has portal filter */
95 !(attr->fa_ptl_mask & (1ULL << portal)))
/*
 * Normalize and validate fault-rule attributes: an empty message mask
 * means "all types"; if a portal filter is present the message mask is
 * restricted to PUT and GET (the only messages that carry a portal),
 * and the combination must still leave at least one valid type bit.
 */
102 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
104 if (!attr->fa_msg_mask)
105 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
107 if (!attr->fa_ptl_mask) /* no portal filter */
110 /* NB: only PUT and GET can be filtered if portal filter has been set */
111 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
112 if (!attr->fa_msg_mask) {
113 CDEBUG(D_NET, "can't find valid message type bits %x\n",
/*
 * Bump the per-message-type counter in \a stat for message type \a type.
 * Caller holds the rule lock.
 */
121 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
123 /* NB: fs_counter is NOT updated by this function */
141 * LNet message drop simulation
145 * Add a new drop rule to LNet
146 * There is no check for duplicated drop rule, all rules will be checked for
150 lnet_drop_rule_add(struct lnet_fault_attr *attr)
152 struct lnet_drop_rule *rule;
/*
 * NOTE(review): bitwise '&' only rejects rate/interval pairs whose bits
 * overlap (e.g. da_rate=2, da_interval=1 passes this check even though
 * both are set); logical '&&' looks intended -- confirm against upstream.
 */
154 if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
155 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
156 attr->u.drop.da_rate, attr->u.drop.da_interval);
160 if (lnet_fault_attr_validate(attr))
167 spin_lock_init(&rule->dr_lock);
169 rule->dr_attr = *attr;
170 if (attr->u.drop.da_interval) {
/* time-based: randomize the first drop slot inside the first interval */
171 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
172 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
173 attr->u.drop.da_interval);
175 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
/* publish the rule; ln_drop_rules is protected by the exclusive net lock */
178 lnet_net_lock(LNET_LOCK_EX);
179 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
180 lnet_net_unlock(LNET_LOCK_EX);
/*
 * NOTE(review): fa_src is printed twice below; the second
 * libcfs_nid2str() argument should presumably be attr->fa_dst -- verify.
 */
182 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
183 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
184 attr->u.drop.da_rate, attr->u.drop.da_interval);
189 * Remove matched drop rules from lnet, all rules that can match \a src and
190 * \a dst will be removed.
191 * If \a src is zero, then all rules that have \a dst as destination will be removed
192 * If \a dst is zero, then all rules that have \a src as source will be removed
193 * If both of them are zero, all rules will be removed
196 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
198 struct lnet_drop_rule *rule;
199 struct lnet_drop_rule *tmp;
200 struct list_head zombies;
203 INIT_LIST_HEAD(&zombies);
/* unlink matched rules under the exclusive lock, free them after unlock */
205 lnet_net_lock(LNET_LOCK_EX);
206 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
207 if (rule->dr_attr.fa_src != src && src)
210 if (rule->dr_attr.fa_dst != dst && dst)
213 list_move(&rule->dr_link, &zombies);
215 lnet_net_unlock(LNET_LOCK_EX);
217 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
218 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
219 libcfs_nid2str(rule->dr_attr.fa_src),
220 libcfs_nid2str(rule->dr_attr.fa_dst),
221 rule->dr_attr.u.drop.da_rate,
222 rule->dr_attr.u.drop.da_interval);
224 list_del(&rule->dr_link);
233 * List drop rule at position of \a pos
/*
 * Copy attributes and statistics of the pos'th drop rule into \a attr
 * and \a stat.  Walks ln_drop_rules under the current-CPT net lock;
 * the per-rule spinlock makes the attr/stat snapshot consistent.
 */
236 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
237 struct lnet_fault_stat *stat)
239 struct lnet_drop_rule *rule;
244 cpt = lnet_net_lock_current();
245 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
249 spin_lock(&rule->dr_lock);
250 *attr = rule->dr_attr;
251 *stat = rule->dr_stat;
252 spin_unlock(&rule->dr_lock);
257 lnet_net_unlock(cpt);
262 * reset counters for all drop rules
265 lnet_drop_rule_reset(void)
267 struct lnet_drop_rule *rule;
270 cpt = lnet_net_lock_current();
272 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
273 struct lnet_fault_attr *attr = &rule->dr_attr;
275 spin_lock(&rule->dr_lock);
/* zero the statistics and re-randomize the next drop point */
277 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
278 if (attr->u.drop.da_rate) {
279 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
281 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
282 attr->u.drop.da_interval);
283 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
285 spin_unlock(&rule->dr_lock);
288 lnet_net_unlock(cpt);
292 * check source/destination NID, portal, message type and drop rate,
293 * decide whether should drop this message or not
296 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
297 lnet_nid_t dst, unsigned int type, unsigned int portal)
299 struct lnet_fault_attr *attr = &rule->dr_attr;
302 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
305 /* match this rule, check drop rate now */
306 spin_lock(&rule->dr_lock);
307 if (rule->dr_drop_time) { /* time based drop */
308 unsigned long now = cfs_time_current();
310 rule->dr_stat.fs_count++;
/* drop while we are inside the randomly chosen drop window */
311 drop = cfs_time_aftereq(now, rule->dr_drop_time);
313 if (cfs_time_after(now, rule->dr_time_base))
314 rule->dr_time_base = now;
/* pick a random drop point within the next interval */
316 rule->dr_drop_time = rule->dr_time_base +
317 cfs_time_seconds(cfs_rand() %
318 attr->u.drop.da_interval);
319 rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
321 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
322 libcfs_nid2str(attr->fa_src),
323 libcfs_nid2str(attr->fa_dst),
327 } else { /* rate based drop */
328 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
/* at each da_rate boundary, re-randomize the next drop sequence */
330 if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
331 rule->dr_drop_at = rule->dr_stat.fs_count +
332 cfs_rand() % attr->u.drop.da_rate;
333 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
334 libcfs_nid2str(attr->fa_src),
335 libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
339 if (drop) { /* drop this message, update counters */
340 lnet_fault_stat_inc(&rule->dr_stat, type);
341 rule->dr_stat.u.drop.ds_dropped++;
344 spin_unlock(&rule->dr_lock);
349 * Check if message from \a src to \a dst can match any existed drop rule
352 lnet_drop_rule_match(lnet_hdr_t *hdr)
354 struct lnet_drop_rule *rule;
355 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
356 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
357 unsigned int typ = le32_to_cpu(hdr->type);
/* -1 == "no portal"; only PUT/GET carry a portal index in the header */
358 unsigned int ptl = -1;
363 * NB: if Portal is specified, then only PUT and GET will be
364 * filtered by drop rule
366 if (typ == LNET_MSG_PUT)
367 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
368 else if (typ == LNET_MSG_GET)
369 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
371 cpt = lnet_net_lock_current();
372 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
373 drop = drop_rule_match(rule, src, dst, typ, ptl);
378 lnet_net_unlock(cpt);
383 * LNet Delay Simulation
385 /** timestamp (second) to send delayed message */
/* reuse the (otherwise unused here) event hdr_data field as send time */
386 #define msg_delay_send msg_ev.hdr_data
/*
 * A single message-delay rule: matched messages are queued on
 * dl_msg_list and released by a timer/daemon after la_latency seconds.
 * Like drop rules, a delay rule is either rate-based or time-based.
 */
388 struct lnet_delay_rule {
389 /** link chain on the_lnet.ln_delay_rules */
390 struct list_head dl_link;
391 /** link chain on delay_dd.dd_sched_rules */
392 struct list_head dl_sched_link;
393 /** attributes of this rule */
394 struct lnet_fault_attr dl_attr;
395 /** lock to protect \a below members */
397 /** refcount of delay rule */
398 atomic_t dl_refcount;
400 * the message sequence to delay, which means message is delayed when
401 * dl_stat.fs_count == dl_delay_at
403 unsigned long dl_delay_at;
405 * seconds to delay the next message, it's exclusive with dl_delay_at
407 unsigned long dl_delay_time;
408 /** baseline to calculate dl_delay_time */
409 unsigned long dl_time_base;
410 /** jiffies to send the next delayed message */
411 unsigned long dl_msg_send;
412 /** delayed message list */
413 struct list_head dl_msg_list;
414 /** statistic of delayed messages */
415 struct lnet_fault_stat dl_stat;
416 /** timer to wakeup delay_daemon */
417 struct timer_list dl_timer;
/*
 * Shared state for the single "lnet_dd" daemon thread that flushes
 * expired delayed messages.  One instance (delay_dd) for the module.
 */
420 struct delay_daemon_data {
421 /** serialise rule add/remove */
422 struct mutex dd_mutex;
423 /** protect rules on \a dd_sched_rules */
425 /** scheduled delay rules (by timer) */
426 struct list_head dd_sched_rules;
427 /** daemon thread sleeps at here */
428 wait_queue_head_t dd_waitq;
429 /** controller (lctl command) wait at here */
430 wait_queue_head_t dd_ctl_waitq;
431 /** daemon is running */
432 unsigned int dd_running;
433 /** daemon stopped */
434 unsigned int dd_stopped;
437 static struct delay_daemon_data delay_dd;
/*
 * Round a jiffies timeout up to the next whole second, expressed in
 * jiffies, so the delay timer always fires at/after the deadline.
 */
440 round_timeout(unsigned long timeout)
442 return cfs_time_seconds((unsigned int)
443 cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
/*
 * Drop a reference on \a rule; the last reference frees it.  A rule
 * must already be unlinked from every list before it can be freed.
 */
447 delay_rule_decref(struct lnet_delay_rule *rule)
449 if (atomic_dec_and_test(&rule->dl_refcount)) {
450 LASSERT(list_empty(&rule->dl_sched_link));
451 LASSERT(list_empty(&rule->dl_msg_list));
452 LASSERT(list_empty(&rule->dl_link));
459 * check source/destination NID, portal, message type and delay rate,
460 * decide whether should delay this message or not
463 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
464 lnet_nid_t dst, unsigned int type, unsigned int portal,
465 struct lnet_msg *msg)
467 struct lnet_fault_attr *attr = &rule->dl_attr;
470 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
473 /* match this rule, check delay rate now */
474 spin_lock(&rule->dl_lock);
475 if (rule->dl_delay_time) { /* time based delay */
476 unsigned long now = cfs_time_current();
478 rule->dl_stat.fs_count++;
/* delay while inside the randomly chosen delay window */
479 delay = cfs_time_aftereq(now, rule->dl_delay_time);
481 if (cfs_time_after(now, rule->dl_time_base))
482 rule->dl_time_base = now;
/* pick a random delay point within the next interval */
484 rule->dl_delay_time = rule->dl_time_base +
485 cfs_time_seconds(cfs_rand() %
486 attr->u.delay.la_interval);
487 rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
489 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
490 libcfs_nid2str(attr->fa_src),
491 libcfs_nid2str(attr->fa_dst),
492 rule->dl_delay_time);
495 } else { /* rate based delay */
496 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
497 /* generate the next random rate sequence */
498 if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
499 rule->dl_delay_at = rule->dl_stat.fs_count +
500 cfs_rand() % attr->u.delay.la_rate;
501 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
502 libcfs_nid2str(attr->fa_src),
503 libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
508 spin_unlock(&rule->dl_lock);
512 /* delay this message, update counters */
513 lnet_fault_stat_inc(&rule->dl_stat, type);
514 rule->dl_stat.u.delay.ls_delayed++;
/* queue the message and (re)arm the timer if it isn't pending (-1) */
516 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
517 msg->msg_delay_send = round_timeout(
518 cfs_time_shift(attr->u.delay.la_latency));
519 if (rule->dl_msg_send == -1) {
520 rule->dl_msg_send = msg->msg_delay_send;
521 mod_timer(&rule->dl_timer, rule->dl_msg_send);
524 spin_unlock(&rule->dl_lock);
529 * check if \a msg can match any Delay Rule, receiving of this message
530 * will be delayed if there is a match.
533 lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
535 struct lnet_delay_rule *rule;
536 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
537 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
538 unsigned int typ = le32_to_cpu(hdr->type);
/* -1 == "no portal"; only PUT/GET carry a portal index in the header */
539 unsigned int ptl = -1;
541 /* NB: called with hold of lnet_net_lock */
544 * NB: if Portal is specified, then only PUT and GET will be
545 * filtered by delay rule
547 if (typ == LNET_MSG_PUT)
548 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
549 else if (typ == LNET_MSG_GET)
550 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
/* first matching rule wins; the message is queued by delay_rule_match */
552 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
553 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
560 /** check out delayed messages for send */
/*
 * Move messages whose delay has expired (or all of them, if \a all)
 * from \a rule's queue onto \a msg_list, and re-arm or cancel the
 * rule's timer depending on what is left queued.
 */
562 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
563 struct list_head *msg_list)
565 struct lnet_msg *msg;
566 struct lnet_msg *tmp;
567 unsigned long now = cfs_time_current();
/* fast path: nothing can have expired yet */
569 if (!all && rule->dl_msg_send > now)
572 spin_lock(&rule->dl_lock);
573 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
574 if (!all && msg->msg_delay_send > now)
577 msg->msg_delay_send = 0;
578 list_move_tail(&msg->msg_list, msg_list);
581 if (list_empty(&rule->dl_msg_list)) {
/* queue drained: stop the timer; -1 marks "no send pending" */
582 del_timer(&rule->dl_timer);
583 rule->dl_msg_send = -1;
585 } else if (!list_empty(msg_list)) {
587 * dequeued some timedout messages, update timer for the
588 * next delayed message on rule
590 msg = list_entry(rule->dl_msg_list.next,
591 struct lnet_msg, msg_list);
592 rule->dl_msg_send = msg->msg_delay_send;
593 mod_timer(&rule->dl_timer, rule->dl_msg_send);
595 spin_unlock(&rule->dl_lock);
/*
 * Deliver (or, if \a drop, discard) every message on \a msg_list.
 * Local messages are re-parsed for local delivery; routed ones are
 * forwarded; failures are dropped and finalized with the error code.
 */
599 delayed_msg_process(struct list_head *msg_list, bool drop)
601 struct lnet_msg *msg;
603 while (!list_empty(msg_list)) {
608 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
609 LASSERT(msg->msg_rxpeer);
611 ni = msg->msg_rxpeer->lp_ni;
612 cpt = msg->msg_rx_cpt;
614 list_del_init(&msg->msg_list);
618 } else if (!msg->msg_routing) {
619 rc = lnet_parse_local(ni, msg);
625 rc = lnet_parse_forward_locked(ni, msg);
626 lnet_net_unlock(cpt);
630 lnet_ni_recv(ni, msg->msg_private, msg, 0,
631 0, msg->msg_len, msg->msg_len);
632 case LNET_CREDIT_WAIT:
634 default: /* failures */
639 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
640 lnet_finalize(ni, msg, rc);
645 * Process delayed messages for scheduled rules
646 * This function can either be called by delay_rule_daemon, or by lnet_finalise
649 lnet_delay_rule_check(void)
651 struct lnet_delay_rule *rule;
652 struct list_head msgs;
654 INIT_LIST_HEAD(&msgs);
/* lock-free peek first; recheck under dd_lock before dequeueing */
656 if (list_empty(&delay_dd.dd_sched_rules))
659 spin_lock_bh(&delay_dd.dd_lock);
660 if (list_empty(&delay_dd.dd_sched_rules)) {
661 spin_unlock_bh(&delay_dd.dd_lock);
665 rule = list_entry(delay_dd.dd_sched_rules.next,
666 struct lnet_delay_rule, dl_sched_link);
667 list_del_init(&rule->dl_sched_link);
668 spin_unlock_bh(&delay_dd.dd_lock);
670 delayed_msg_check(rule, false, &msgs);
671 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
674 if (!list_empty(&msgs))
675 delayed_msg_process(&msgs, false);
678 /** daemon thread to handle delayed messages */
680 lnet_delay_rule_daemon(void *arg)
/* signal the controller in lnet_delay_rule_add() that we are up */
682 delay_dd.dd_running = 1;
683 wake_up(&delay_dd.dd_ctl_waitq);
685 while (delay_dd.dd_running) {
686 wait_event_interruptible(delay_dd.dd_waitq,
687 !delay_dd.dd_running ||
688 !list_empty(&delay_dd.dd_sched_rules));
689 lnet_delay_rule_check();
692 /* in case more rules have been enqueued after my last check */
693 lnet_delay_rule_check();
/* signal lnet_delay_rule_del() that shutdown is complete */
694 delay_dd.dd_stopped = 1;
695 wake_up(&delay_dd.dd_ctl_waitq);
/*
 * Per-rule timer callback: schedule the rule onto dd_sched_rules (with
 * a reference, released by lnet_delay_rule_check) and wake the daemon.
 * Skipped if the rule is already scheduled or the daemon has stopped.
 */
701 delay_timer_cb(unsigned long arg)
703 struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
705 spin_lock_bh(&delay_dd.dd_lock);
706 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
707 atomic_inc(&rule->dl_refcount);
708 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
709 wake_up(&delay_dd.dd_waitq);
711 spin_unlock_bh(&delay_dd.dd_lock);
715 * Add a new delay rule to LNet
716 * There is no check for duplicated delay rule, all rules will be checked for
720 lnet_delay_rule_add(struct lnet_fault_attr *attr)
722 struct lnet_delay_rule *rule;
/*
 * NOTE(review): bitwise '&' only rejects rate/interval pairs with
 * overlapping bits (e.g. la_rate=2, la_interval=1 passes even though
 * both are set); logical '&&' looks intended -- confirm upstream.
 */
725 if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
726 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
727 attr->u.delay.la_rate, attr->u.delay.la_interval);
731 if (!attr->u.delay.la_latency) {
732 CDEBUG(D_NET, "delay latency cannot be zero\n");
736 if (lnet_fault_attr_validate(attr))
/* start the single shared daemon on the first rule ever added */
743 mutex_lock(&delay_dd.dd_mutex);
744 if (!delay_dd.dd_running) {
745 struct task_struct *task;
748 * NB: although LND threads will process delayed message
749 * in lnet_finalize, but there is no guarantee that LND
750 * threads will be woken up if no other message needs to
752 * Only one daemon thread, performance is not the concern
753 * of this simulation module.
755 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
760 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
763 setup_timer(&rule->dl_timer, delay_timer_cb, (unsigned long)rule);
765 spin_lock_init(&rule->dl_lock);
766 INIT_LIST_HEAD(&rule->dl_msg_list);
767 INIT_LIST_HEAD(&rule->dl_sched_link);
769 rule->dl_attr = *attr;
770 if (attr->u.delay.la_interval) {
/* time-based: randomize the first delay slot inside the first interval */
771 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
772 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
773 attr->u.delay.la_interval);
775 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
/* -1 == no delayed send pending yet (see delayed_msg_check) */
778 rule->dl_msg_send = -1;
780 lnet_net_lock(LNET_LOCK_EX);
781 atomic_set(&rule->dl_refcount, 1);
782 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
783 lnet_net_unlock(LNET_LOCK_EX);
/*
 * NOTE(review): fa_src is printed twice below; the second
 * libcfs_nid2str() argument should presumably be attr->fa_dst -- verify.
 */
785 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
786 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
787 attr->u.delay.la_rate);
789 mutex_unlock(&delay_dd.dd_mutex);
792 mutex_unlock(&delay_dd.dd_mutex);
798 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
799 * and \a dst are zero, all rules will be removed, otherwise only matched rules
801 * If \a src is zero, then all rules that have \a dst as destination will be removed
802 * If \a dst is zero, then all rules that have \a src as source will be removed
804 * When a delay rule is removed, all delayed messages of this rule will be
805 * processed immediately.
808 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
810 struct lnet_delay_rule *rule;
811 struct lnet_delay_rule *tmp;
812 struct list_head rule_list;
813 struct list_head msg_list;
817 INIT_LIST_HEAD(&rule_list);
818 INIT_LIST_HEAD(&msg_list);
825 mutex_lock(&delay_dd.dd_mutex);
826 lnet_net_lock(LNET_LOCK_EX);
828 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
829 if (rule->dl_attr.fa_src != src && src)
832 if (rule->dl_attr.fa_dst != dst && dst)
835 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
836 libcfs_nid2str(rule->dl_attr.fa_src),
837 libcfs_nid2str(rule->dl_attr.fa_dst),
838 rule->dl_attr.u.delay.la_rate,
839 rule->dl_attr.u.delay.la_interval);
840 /* refcount is taken over by rule_list */
841 list_move(&rule->dl_link, &rule_list);
844 /* check if we need to shutdown delay_daemon */
845 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
846 !list_empty(&rule_list);
847 lnet_net_unlock(LNET_LOCK_EX);
/* outside the net lock: flush each rule's timer and queued messages */
849 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
850 list_del_init(&rule->dl_link);
852 del_timer_sync(&rule->dl_timer);
853 delayed_msg_check(rule, true, &msg_list);
854 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
858 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
859 LASSERT(delay_dd.dd_running);
860 delay_dd.dd_running = 0;
861 wake_up(&delay_dd.dd_waitq);
863 while (!delay_dd.dd_stopped)
864 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
866 mutex_unlock(&delay_dd.dd_mutex);
/* on shutdown the flushed messages are dropped instead of delivered */
868 if (!list_empty(&msg_list))
869 delayed_msg_process(&msg_list, shutdown);
875 * List Delay Rule at position of \a pos
/*
 * Copy attributes and statistics of the pos'th delay rule into \a attr
 * and \a stat; same snapshot pattern as lnet_drop_rule_list().
 */
878 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
879 struct lnet_fault_stat *stat)
881 struct lnet_delay_rule *rule;
886 cpt = lnet_net_lock_current();
887 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
891 spin_lock(&rule->dl_lock);
892 *attr = rule->dl_attr;
893 *stat = rule->dl_stat;
894 spin_unlock(&rule->dl_lock);
899 lnet_net_unlock(cpt);
904 * reset counters for all Delay Rules
907 lnet_delay_rule_reset(void)
909 struct lnet_delay_rule *rule;
912 cpt = lnet_net_lock_current();
914 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
915 struct lnet_fault_attr *attr = &rule->dl_attr;
917 spin_lock(&rule->dl_lock);
/* zero the statistics and re-randomize the next delay point */
919 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
920 if (attr->u.delay.la_rate) {
921 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
923 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
924 attr->u.delay.la_interval);
925 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
927 spin_unlock(&rule->dl_lock);
930 lnet_net_unlock(cpt);
/*
 * ioctl dispatcher for the fault-simulation module: add/del/reset/list
 * for both drop and delay rules.  ioc_inlbuf1 carries the attributes,
 * ioc_inlbuf2 (list ops only) receives the statistics.
 */
934 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
936 struct lnet_fault_attr *attr;
937 struct lnet_fault_stat *stat;
939 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
945 case LNET_CTL_DROP_ADD:
949 return lnet_drop_rule_add(attr);
951 case LNET_CTL_DROP_DEL:
955 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
959 case LNET_CTL_DROP_RESET:
960 lnet_drop_rule_reset();
963 case LNET_CTL_DROP_LIST:
964 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
968 return lnet_drop_rule_list(data->ioc_count, attr, stat);
970 case LNET_CTL_DELAY_ADD:
974 return lnet_delay_rule_add(attr);
976 case LNET_CTL_DELAY_DEL:
980 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
981 attr->fa_dst, false);
984 case LNET_CTL_DELAY_RESET:
985 lnet_delay_rule_reset();
988 case LNET_CTL_DELAY_LIST:
989 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
993 return lnet_delay_rule_list(data->ioc_count, attr, stat);
/*
 * Module init: compile-time checks that the LNET_*_BIT masks line up
 * with the message-type enum (lnet_fault_attr_match depends on
 * 1 << type indexing), then initialize the delay-daemon state.
 */
998 lnet_fault_init(void)
1000 CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1001 CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1002 CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1003 CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1005 mutex_init(&delay_dd.dd_mutex);
1006 spin_lock_init(&delay_dd.dd_lock);
1007 init_waitqueue_head(&delay_dd.dd_waitq);
1008 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1009 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
/*
 * Module teardown: remove every drop and delay rule (src/dst of 0 acts
 * as a wildcard; shutdown=true drops any still-queued delayed
 * messages) and assert that all rule lists are empty afterwards.
 */
1017 lnet_fault_fini(void)
1018 lnet_drop_rule_del(0, 0);
1019 lnet_delay_rule_del(0, 0, true);
1020 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1021 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1022 LASSERT(list_empty(&delay_dd.dd_sched_rules));