GNU Linux-libre 4.14.313-gnu1
[releases.git] / drivers / staging / lustre / lnet / lnet / router.c
1 /*
2  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Copyright (c) 2011, 2015, Intel Corporation.
5  *
6  *   This file is part of Portals
7  *   http://sourceforge.net/projects/sandiaportals/
8  *
9  *   Portals is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Portals is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  */
19
20 #define DEBUG_SUBSYSTEM S_LNET
21
22 #include <linux/completion.h>
23 #include <linux/lnet/lib-lnet.h>
24
25 #define LNET_NRB_TINY_MIN       512     /* min value for each CPT */
26 #define LNET_NRB_TINY           (LNET_NRB_TINY_MIN * 4)
27 #define LNET_NRB_SMALL_MIN      4096    /* min value for each CPT */
28 #define LNET_NRB_SMALL          (LNET_NRB_SMALL_MIN * 4)
29 #define LNET_NRB_SMALL_PAGES    1
30 #define LNET_NRB_LARGE_MIN      256     /* min value for each CPT */
31 #define LNET_NRB_LARGE          (LNET_NRB_LARGE_MIN * 4)
32 #define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_SIZE - 1) >> \
33                                  PAGE_SHIFT)
34
/* Module parameters: router buffer pool sizing and peer policy. */
static char *forwarding = "";
module_param(forwarding, charp, 0444);
MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");

/* Buffer pool sizes; <= 0 selects the built-in defaults above. */
static int tiny_router_buffers;
module_param(tiny_router_buffers, int, 0444);
MODULE_PARM_DESC(tiny_router_buffers, "# of 0 payload messages to buffer in the router");
static int small_router_buffers;
module_param(small_router_buffers, int, 0444);
MODULE_PARM_DESC(small_router_buffers, "# of small (1 page) messages to buffer in the router");
static int large_router_buffers;
module_param(large_router_buffers, int, 0444);
MODULE_PARM_DESC(large_router_buffers, "# of large messages to buffer in the router");
/* <= 0: fall back per-NI, see lnet_peer_buffer_credits() */
static int peer_buffer_credits;
module_param(peer_buffer_credits, int, 0444);
MODULE_PARM_DESC(peer_buffer_credits, "# router buffer credits per peer");

static int auto_down = 1;
module_param(auto_down, int, 0444);
MODULE_PARM_DESC(auto_down, "Automatically mark peers down on comms error");
55
56 int
57 lnet_peer_buffer_credits(struct lnet_ni *ni)
58 {
59         /* NI option overrides LNet default */
60         if (ni->ni_peerrtrcredits > 0)
61                 return ni->ni_peerrtrcredits;
62         if (peer_buffer_credits > 0)
63                 return peer_buffer_credits;
64
65         /*
66          * As an approximation, allow this peer the same number of router
67          * buffers as it is allowed outstanding sends
68          */
69         return ni->ni_peertxcredits;
70 }
71
/* forward ref's */
static int lnet_router_checker(void *);

/* If set, new routers start "down" until the checker has pinged them. */
static int check_routers_before_use;
module_param(check_routers_before_use, int, 0444);
MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");

int avoid_asym_router_failure = 1;
module_param(avoid_asym_router_failure, int, 0644);
MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");

/* Health-check tunables; runtime writable (mode 0644). */
static int dead_router_check_interval = 60;
module_param(dead_router_check_interval, int, 0644);
MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");

static int live_router_check_interval = 60;
module_param(live_router_check_interval, int, 0644);
MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");

static int router_ping_timeout = 50;
module_param(router_ping_timeout, int, 0644);
MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");
94
/*
 * Non-zero if peers should initially be assumed down, i.e. the
 * check_routers_before_use module parameter is set.
 */
int
lnet_peers_start_down(void)
{
	return check_routers_before_use;
}
100
/*
 * Record an aliveness report for peer @lp observed at time @when
 * (jiffies).  @alive is non-zero if the peer was seen alive;
 * @notifylnd requests an LND-level callback as well.
 *
 * Stale reports (older than lp_timestamp) and repeats of already-known
 * state are dropped; otherwise the new state is stored and flagged
 * (lp_notify) for later delivery by lnet_ni_notify_locked().  Caller
 * holds the net lock for @lp's CPT.
 */
void
lnet_notify_locked(struct lnet_peer *lp, int notifylnd, int alive,
		   unsigned long when)
{
	if (time_before(when, lp->lp_timestamp)) { /* out of date information */
		CDEBUG(D_NET, "Out of date\n");
		return;
	}

	lp->lp_timestamp = when;		/* update timestamp */
	lp->lp_ping_deadline = 0;		/* disable ping timeout */

	if (lp->lp_alive_count &&		/* got old news */
	    (!lp->lp_alive) == (!alive)) {	/* new date for old news */
		CDEBUG(D_NET, "Old news\n");
		return;
	}

	/* Flag that notification is outstanding */

	lp->lp_alive_count++;
	lp->lp_alive = !(!alive);		/* 1 bit! */
	lp->lp_notify = 1;
	lp->lp_notifylnd |= notifylnd;
	if (lp->lp_alive)
		lp->lp_ping_feats = LNET_PING_FEAT_INVAL; /* reset */

	CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lp_nid), alive);
}
130
/*
 * Deliver any pending aliveness notification for @lp to @ni's LND.
 * lp_notifying serializes notifiers; the net lock for @lp's CPT is
 * dropped around the LND callback, so fresh news arriving meanwhile is
 * picked up by the while loop on return.
 */
static void
lnet_ni_notify_locked(struct lnet_ni *ni, struct lnet_peer *lp)
{
	int alive;
	int notifylnd;

	/*
	 * Notify only in 1 thread at any time to ensure ordered notification.
	 * NB individual events can be missed; the only guarantee is that you
	 * always get the most recent news
	 */
	if (lp->lp_notifying || !ni)
		return;

	lp->lp_notifying = 1;

	while (lp->lp_notify) {
		/* snapshot the state before clearing the flags */
		alive = lp->lp_alive;
		notifylnd = lp->lp_notifylnd;

		lp->lp_notifylnd = 0;
		lp->lp_notify    = 0;

		if (notifylnd && ni->ni_lnd->lnd_notify) {
			lnet_net_unlock(lp->lp_cpt);

			/*
			 * A new notification could happen now; I'll handle it
			 * when control returns to me
			 */
			ni->ni_lnd->lnd_notify(ni, lp->lp_nid, alive);

			lnet_net_lock(lp->lp_cpt);
		}
	}

	lp->lp_notifying = 0;
}
169
/*
 * Take a router reference on @lp.  On the 0->1 transition the peer is
 * inserted into the global ln_routers list (kept sorted by ascending
 * NID) and a peer ref is taken on behalf of that list.
 */
static void
lnet_rtr_addref_locked(struct lnet_peer *lp)
{
	LASSERT(lp->lp_refcount > 0);
	LASSERT(lp->lp_rtr_refcount >= 0);

	/* lnet_net_lock must be exclusively locked */
	lp->lp_rtr_refcount++;
	if (lp->lp_rtr_refcount == 1) {
		struct list_head *pos;

		/* a simple insertion sort */
		list_for_each_prev(pos, &the_lnet.ln_routers) {
			struct lnet_peer *rtr;

			rtr = list_entry(pos, struct lnet_peer, lp_rtr_list);
			if (rtr->lp_nid < lp->lp_nid)
				break;
		}

		list_add(&lp->lp_rtr_list, pos);
		/* addref for the_lnet.ln_routers */
		lnet_peer_addref_locked(lp);
		the_lnet.ln_routers_version++;
	}
}
196
/*
 * Drop a router reference on @lp.  On the 1->0 transition the peer is
 * removed from ln_routers (releasing that list's peer ref) and any
 * attached router-checker data is queued on the deathrow list for
 * later destruction.
 */
static void
lnet_rtr_decref_locked(struct lnet_peer *lp)
{
	LASSERT(lp->lp_refcount > 0);
	LASSERT(lp->lp_rtr_refcount > 0);

	/* lnet_net_lock must be exclusively locked */
	lp->lp_rtr_refcount--;
	if (!lp->lp_rtr_refcount) {
		/* all routes through this gateway must be gone already */
		LASSERT(list_empty(&lp->lp_routes));

		if (lp->lp_rcd) {
			list_add(&lp->lp_rcd->rcd_list,
				 &the_lnet.ln_rcd_deathrow);
			lp->lp_rcd = NULL;
		}

		list_del(&lp->lp_rtr_list);
		/* decref for the_lnet.ln_routers */
		lnet_peer_decref_locked(lp);
		the_lnet.ln_routers_version++;
	}
}
220
221 struct lnet_remotenet *
222 lnet_find_net_locked(__u32 net)
223 {
224         struct lnet_remotenet *rnet;
225         struct list_head *tmp;
226         struct list_head *rn_list;
227
228         LASSERT(!the_lnet.ln_shutdown);
229
230         rn_list = lnet_net2rnethash(net);
231         list_for_each(tmp, rn_list) {
232                 rnet = list_entry(tmp, struct lnet_remotenet, lrn_list);
233
234                 if (rnet->lrn_net == net)
235                         return rnet;
236         }
237         return NULL;
238 }
239
/*
 * Seed the cfs PRNG exactly once, mixing random bytes, the current
 * time and this node's non-loopback NIDs so that nodes otherwise in
 * identical state still shuffle routes differently.
 */
static void lnet_shuffle_seed(void)
{
	static int seeded;	/* one-shot latch */
	__u32 lnd_type, seed[2];
	struct timespec64 ts;
	struct lnet_ni *ni;
	struct list_head *tmp;

	if (seeded)
		return;

	cfs_get_random_bytes(seed, sizeof(seed));

	/*
	 * Nodes with small feet have little entropy
	 * the NID for this node gives the most entropy in the low bits
	 */
	list_for_each(tmp, &the_lnet.ln_nis) {
		ni = list_entry(tmp, struct lnet_ni, ni_list);
		lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));

		if (lnd_type != LOLND)
			seed[0] ^= (LNET_NIDADDR(ni->ni_nid) | lnd_type);
	}

	ktime_get_ts64(&ts);
	cfs_srand(ts.tv_sec ^ seed[0], ts.tv_nsec ^ seed[1]);
	seeded = 1;
}
269
/* NB expects LNET_LOCK held */
/*
 * Link @route into @rnet at a uniformly random position in its route
 * list (spreads load across equivalent routes), link it on its
 * gateway's lp_routes list and take a router ref on the gateway.
 */
static void
lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
{
	unsigned int len = 0;
	unsigned int offset = 0;
	struct list_head *e;

	lnet_shuffle_seed();

	/* count the existing routes */
	list_for_each(e, &rnet->lrn_routes) {
		len++;
	}

	/* len+1 positions to add a new entry, also prevents division by 0 */
	offset = cfs_rand() % (len + 1);
	list_for_each(e, &rnet->lrn_routes) {
		if (!offset)
			break;
		offset--;
	}
	list_add(&route->lr_list, e);
	list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);

	the_lnet.ln_remote_nets_version++;
	lnet_rtr_addref_locked(route->lr_gateway);
}
297
/*
 * Add a route to remote network @net via gateway @gateway.
 *
 * @net:      remote network to make reachable
 * @hops:     hop count (LNET_UNDEFINED_HOPS, or 1..255)
 * @gateway:  NID of the gateway; must be on a local network
 * @priority: route priority
 *
 * Return: 0 on success; -EINVAL on bad arguments; -EEXIST if @net is a
 * local network or an identical route already exists; -ENOMEM on
 * allocation failure; -EHOSTUNREACH if the gateway is not on any
 * local network.
 */
int
lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
	       unsigned int priority)
{
	struct list_head *e;
	struct lnet_remotenet *rnet;
	struct lnet_remotenet *rnet2;
	struct lnet_route *route;
	struct lnet_ni *ni;
	int add_route;
	int rc;

	CDEBUG(D_NET, "Add route: net %s hops %d priority %u gw %s\n",
	       libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));

	/* reject wildcards, loopback nets/gateways and bad hop counts */
	if (gateway == LNET_NID_ANY ||
	    LNET_NETTYP(LNET_NIDNET(gateway)) == LOLND ||
	    net == LNET_NIDNET(LNET_NID_ANY) ||
	    LNET_NETTYP(net) == LOLND ||
	    LNET_NIDNET(gateway) == net ||
	    (hops != LNET_UNDEFINED_HOPS && (hops < 1 || hops > 255)))
		return -EINVAL;

	if (lnet_islocalnet(net))	       /* it's a local network */
		return -EEXIST;

	/* Assume net, route, all new */
	LIBCFS_ALLOC(route, sizeof(*route));
	LIBCFS_ALLOC(rnet, sizeof(*rnet));
	if (!route || !rnet) {
		CERROR("Out of memory creating route %s %d %s\n",
		       libcfs_net2str(net), hops, libcfs_nid2str(gateway));
		if (route)
			LIBCFS_FREE(route, sizeof(*route));
		if (rnet)
			LIBCFS_FREE(rnet, sizeof(*rnet));
		return -ENOMEM;
	}

	INIT_LIST_HEAD(&rnet->lrn_routes);
	rnet->lrn_net = net;
	route->lr_hops = hops;
	route->lr_net = net;
	route->lr_priority = priority;

	lnet_net_lock(LNET_LOCK_EX);

	rc = lnet_nid2peer_locked(&route->lr_gateway, gateway, LNET_LOCK_EX);
	if (rc) {
		lnet_net_unlock(LNET_LOCK_EX);

		LIBCFS_FREE(route, sizeof(*route));
		LIBCFS_FREE(rnet, sizeof(*rnet));

		if (rc == -EHOSTUNREACH) /* gateway is not on a local net */
			return rc;	/* ignore the route entry */
		CERROR("Error %d creating route %s %d %s\n", rc,
		       libcfs_net2str(net), hops,
		       libcfs_nid2str(gateway));
		return rc;
	}

	LASSERT(!the_lnet.ln_shutdown);

	rnet2 = lnet_find_net_locked(net);
	if (!rnet2) {
		/* new network */
		list_add_tail(&rnet->lrn_list, lnet_net2rnethash(net));
		rnet2 = rnet;
	}

	/* Search for a duplicate route (it's a NOOP if it is) */
	add_route = 1;
	list_for_each(e, &rnet2->lrn_routes) {
		struct lnet_route *route2;

		route2 = list_entry(e, struct lnet_route, lr_list);
		if (route2->lr_gateway == route->lr_gateway) {
			add_route = 0;
			break;
		}

		/* our lookups must be true */
		LASSERT(route2->lr_gateway->lp_nid != gateway);
	}

	if (add_route) {
		lnet_peer_addref_locked(route->lr_gateway); /* +1 for notify */
		lnet_add_route_to_rnet(rnet2, route);

		ni = route->lr_gateway->lp_ni;
		lnet_net_unlock(LNET_LOCK_EX);

		/* XXX Assume alive */
		if (ni->ni_lnd->lnd_notify)
			ni->ni_lnd->lnd_notify(ni, gateway, 1);

		lnet_net_lock(LNET_LOCK_EX);
	}

	/* -1 for notify or !add_route */
	lnet_peer_decref_locked(route->lr_gateway);
	lnet_net_unlock(LNET_LOCK_EX);
	rc = 0;

	if (!add_route) {
		rc = -EEXIST;
		LIBCFS_FREE(route, sizeof(*route));
	}

	/* an existing rnet was used; free the one we allocated */
	if (rnet != rnet2)
		LIBCFS_FREE(rnet, sizeof(*rnet));

	/* indicate to startup the router checker if configured */
	wake_up(&the_lnet.ln_rc_waitq);

	return rc;
}
416
/*
 * Verify that no remote net is routed through gateways on different
 * local NIs — that configuration is unsupported.  Returns 0 if the
 * route table is valid, -EINVAL (after logging the first offending
 * pair) otherwise.
 */
int
lnet_check_routes(void)
{
	struct lnet_remotenet *rnet;
	struct lnet_route *route;
	struct lnet_route *route2;
	struct list_head *e1;
	struct list_head *e2;
	int cpt;
	struct list_head *rn_list;
	int i;

	cpt = lnet_net_lock_current();

	for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
		rn_list = &the_lnet.ln_remote_nets_hash[i];
		list_for_each(e1, rn_list) {
			rnet = list_entry(e1, struct lnet_remotenet, lrn_list);

			route2 = NULL;	/* first route seen for this rnet */
			list_for_each(e2, &rnet->lrn_routes) {
				lnet_nid_t nid1;
				lnet_nid_t nid2;
				int net;

				route = list_entry(e2, struct lnet_route, lr_list);

				if (!route2) {
					route2 = route;
					continue;
				}

				/* routes sharing a local NI are fine */
				if (route->lr_gateway->lp_ni ==
				    route2->lr_gateway->lp_ni)
					continue;

				/* copy out before dropping the lock */
				nid1 = route->lr_gateway->lp_nid;
				nid2 = route2->lr_gateway->lp_nid;
				net = rnet->lrn_net;

				lnet_net_unlock(cpt);

				CERROR("Routes to %s via %s and %s not supported\n",
				       libcfs_net2str(net),
				       libcfs_nid2str(nid1),
				       libcfs_nid2str(nid2));
				return -EINVAL;
			}
		}
	}

	lnet_net_unlock(cpt);
	return 0;
}
471
/*
 * Delete route(s) to @net via gateway @gw_nid.  @net may be
 * LNET_NIDNET(LNET_NID_ANY) and/or @gw_nid may be LNET_NID_ANY to
 * wildcard-match every remote net and/or every gateway.
 *
 * Return: 0 if at least one route was removed, -ENOENT if none matched.
 */
int
lnet_del_route(__u32 net, lnet_nid_t gw_nid)
{
	struct lnet_peer *gateway;
	struct lnet_remotenet *rnet;
	struct lnet_route *route;
	struct list_head *e1;
	struct list_head *e2;
	int rc = -ENOENT;
	struct list_head *rn_list;
	int idx = 0;	/* current hash bucket when @net is wildcarded */

	CDEBUG(D_NET, "Del route: net %s : gw %s\n",
	       libcfs_net2str(net), libcfs_nid2str(gw_nid));

	/*
	 * NB Caller may specify either all routes via the given gateway
	 * or a specific route entry actual NIDs)
	 */
	lnet_net_lock(LNET_LOCK_EX);
	if (net == LNET_NIDNET(LNET_NID_ANY))
		rn_list = &the_lnet.ln_remote_nets_hash[0];
	else
		rn_list = lnet_net2rnethash(net);

 again:
	list_for_each(e1, rn_list) {
		rnet = list_entry(e1, struct lnet_remotenet, lrn_list);

		if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
		      net == rnet->lrn_net))
			continue;

		list_for_each(e2, &rnet->lrn_routes) {
			route = list_entry(e2, struct lnet_route, lr_list);

			gateway = route->lr_gateway;
			if (!(gw_nid == LNET_NID_ANY ||
			      gw_nid == gateway->lp_nid))
				continue;

			list_del(&route->lr_list);
			list_del(&route->lr_gwlist);
			the_lnet.ln_remote_nets_version++;

			/* drop the rnet too if this was its last route */
			if (list_empty(&rnet->lrn_routes))
				list_del(&rnet->lrn_list);
			else
				rnet = NULL;	/* still in use, don't free */

			lnet_rtr_decref_locked(gateway);
			lnet_peer_decref_locked(gateway);

			/* free outside the lock */
			lnet_net_unlock(LNET_LOCK_EX);

			LIBCFS_FREE(route, sizeof(*route));

			if (rnet)
				LIBCFS_FREE(rnet, sizeof(*rnet));

			rc = 0;
			/* lists may have changed while unlocked: rescan */
			lnet_net_lock(LNET_LOCK_EX);
			goto again;
		}
	}

	/* wildcard net: move on to the next hash bucket */
	if (net == LNET_NIDNET(LNET_NID_ANY) &&
	    ++idx < LNET_REMOTE_NETS_HASH_SIZE) {
		rn_list = &the_lnet.ln_remote_nets_hash[idx];
		goto again;
	}
	lnet_net_unlock(LNET_LOCK_EX);

	return rc;
}
547
/* Delete every configured route: wildcard both net and gateway. */
void
lnet_destroy_routes(void)
{
	lnet_del_route(LNET_NIDNET(LNET_NID_ANY), LNET_NID_ANY);
}
553
554 int lnet_get_rtr_pool_cfg(int idx, struct lnet_ioctl_pool_cfg *pool_cfg)
555 {
556         int i, rc = -ENOENT, j;
557
558         if (!the_lnet.ln_rtrpools)
559                 return rc;
560
561         for (i = 0; i < LNET_NRBPOOLS; i++) {
562                 struct lnet_rtrbufpool *rbp;
563
564                 lnet_net_lock(LNET_LOCK_EX);
565                 cfs_percpt_for_each(rbp, j, the_lnet.ln_rtrpools) {
566                         if (i++ != idx)
567                                 continue;
568
569                         pool_cfg->pl_pools[i].pl_npages = rbp[i].rbp_npages;
570                         pool_cfg->pl_pools[i].pl_nbuffers = rbp[i].rbp_nbuffers;
571                         pool_cfg->pl_pools[i].pl_credits = rbp[i].rbp_credits;
572                         pool_cfg->pl_pools[i].pl_mincredits = rbp[i].rbp_mincredits;
573                         rc = 0;
574                         break;
575                 }
576                 lnet_net_unlock(LNET_LOCK_EX);
577         }
578
579         lnet_net_lock(LNET_LOCK_EX);
580         pool_cfg->pl_routing = the_lnet.ln_routing;
581         lnet_net_unlock(LNET_LOCK_EX);
582
583         return rc;
584 }
585
586 int
587 lnet_get_route(int idx, __u32 *net, __u32 *hops,
588                lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
589 {
590         struct list_head *e1;
591         struct list_head *e2;
592         struct lnet_remotenet *rnet;
593         struct lnet_route *route;
594         int cpt;
595         int i;
596         struct list_head *rn_list;
597
598         cpt = lnet_net_lock_current();
599
600         for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
601                 rn_list = &the_lnet.ln_remote_nets_hash[i];
602                 list_for_each(e1, rn_list) {
603                         rnet = list_entry(e1, struct lnet_remotenet, lrn_list);
604
605                         list_for_each(e2, &rnet->lrn_routes) {
606                                 route = list_entry(e2, struct lnet_route,
607                                                    lr_list);
608
609                                 if (!idx--) {
610                                         *net      = rnet->lrn_net;
611                                         *hops     = route->lr_hops;
612                                         *priority = route->lr_priority;
613                                         *gateway  = route->lr_gateway->lp_nid;
614                                         *alive = lnet_is_route_alive(route);
615                                         lnet_net_unlock(cpt);
616                                         return 0;
617                                 }
618                         }
619                 }
620         }
621
622         lnet_net_unlock(cpt);
623         return -ENOENT;
624 }
625
626 void
627 lnet_swap_pinginfo(struct lnet_ping_info *info)
628 {
629         int i;
630         struct lnet_ni_status *stat;
631
632         __swab32s(&info->pi_magic);
633         __swab32s(&info->pi_features);
634         __swab32s(&info->pi_pid);
635         __swab32s(&info->pi_nnis);
636         for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
637                 stat = &info->pi_ni[i];
638                 __swab64s(&stat->ns_nid);
639                 __swab32s(&stat->ns_status);
640         }
641 }
642
/**
 * parse router-checker pinginfo, record number of down NIs for remote
 * networks on that router.
 *
 * Updates rte->lr_downis for every route through the gateway from the
 * per-NI status carried in the ping reply; malformed replies
 * invalidate the gateway's ping features instead of being trusted.
 */
static void
lnet_parse_rc_info(struct lnet_rc_data *rcd)
{
	struct lnet_ping_info *info = rcd->rcd_pinginfo;
	struct lnet_peer *gw = rcd->rcd_gateway;
	struct lnet_route *rte;

	if (!gw->lp_alive)
		return;

	/* the reply may arrive in the peer's byte order */
	if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
		lnet_swap_pinginfo(info);

	/* NB always racing with network! */
	if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
		CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
		       libcfs_nid2str(gw->lp_nid), info->pi_magic);
		gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
		return;
	}

	gw->lp_ping_feats = info->pi_features;
	if (!(gw->lp_ping_feats & LNET_PING_FEAT_MASK)) {
		CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
		       libcfs_nid2str(gw->lp_nid), gw->lp_ping_feats);
		return; /* nothing I can understand */
	}

	if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS))
		return; /* can't carry NI status info */

	list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
		int down = 0;	/* # of gateway NIs reported down */
		int up = 0;	/* NI on this route's target net is up */
		int i;

		/* the gateway reported routing disabled */
		if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) {
			rte->lr_downis = 1;
			continue;
		}

		for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
			struct lnet_ni_status *stat = &info->pi_ni[i];
			lnet_nid_t nid = stat->ns_nid;

			if (nid == LNET_NID_ANY) {
				CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
				       libcfs_nid2str(gw->lp_nid));
				gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
				return;
			}

			if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
				continue;

			if (stat->ns_status == LNET_NI_STATUS_DOWN) {
				down++;
				continue;
			}

			if (stat->ns_status == LNET_NI_STATUS_UP) {
				if (LNET_NIDNET(nid) == rte->lr_net) {
					up = 1;
					break;
				}
				continue;
			}

			CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
			       libcfs_nid2str(gw->lp_nid), stat->ns_status);
			gw->lp_ping_feats = LNET_PING_FEAT_INVAL;
			return;
		}

		if (up) { /* ignore downed NIs if NI for dest network is up */
			rte->lr_downis = 0;
			continue;
		}
		/**
		 * if @down is zero and this route is single-hop, it means
		 * we can't find NI for target network
		 */
		if (!down && rte->lr_hops == 1)
			down = 1;

		rte->lr_downis = down;
	}
}
735
/*
 * Event callback for router-checker ping MDs (SEND and REPLY events).
 * A successful REPLY marks the gateway alive; any failed event marks
 * it down.  User/LND notification is deferred to the router checker
 * thread.
 */
static void
lnet_router_checker_event(struct lnet_event *event)
{
	struct lnet_rc_data *rcd = event->md.user_ptr;
	struct lnet_peer *lp;

	LASSERT(rcd);

	if (event->unlinked) {
		/* the MD is gone; forget its handle */
		LNetInvalidateMDHandle(&rcd->rcd_mdh);
		return;
	}

	LASSERT(event->type == LNET_EVENT_SEND ||
		event->type == LNET_EVENT_REPLY);

	lp = rcd->rcd_gateway;
	LASSERT(lp);

	/*
	 * NB: it's called with holding lnet_res_lock, we have a few
	 * places need to hold both locks at the same time, please take
	 * care of lock ordering
	 */
	lnet_net_lock(lp->lp_cpt);
	if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
		/* ignore if no longer a router or rcd is replaced */
		goto out;
	}

	if (event->type == LNET_EVENT_SEND) {
		lp->lp_ping_notsent = 0;
		if (!event->status)	/* send OK: wait for the REPLY */
			goto out;
	}

	/* LNET_EVENT_REPLY */
	/*
	 * A successful REPLY means the router is up.  If _any_ comms
	 * to the router fail I assume it's down (this will happen if
	 * we ping alive routers to try to detect router death before
	 * apps get burned).
	 */
	lnet_notify_locked(lp, 1, !event->status, cfs_time_current());

	/*
	 * The router checker will wake up very shortly and do the
	 * actual notification.
	 * XXX If 'lp' stops being a router before then, it will still
	 * have the notification pending!!!
	 */
	if (avoid_asym_router_failure && !event->status)
		lnet_parse_rc_info(rcd);

 out:
	lnet_net_unlock(lp->lp_cpt);
}
793
/*
 * Block, polling once a second, until every router on ln_routers has
 * reported its aliveness at least once (lp_alive_count != 0).  Only
 * valid while the router checker is running.
 */
static void
lnet_wait_known_routerstate(void)
{
	struct lnet_peer *rtr;
	struct list_head *entry;
	int all_known;

	LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);

	for (;;) {
		int cpt = lnet_net_lock_current();

		all_known = 1;
		list_for_each(entry, &the_lnet.ln_routers) {
			rtr = list_entry(entry, struct lnet_peer, lp_rtr_list);

			/* no report from this router yet */
			if (!rtr->lp_alive_count) {
				all_known = 0;
				break;
			}
		}

		lnet_net_unlock(cpt);

		if (all_known)
			return;

		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));
	}
}
825
826 void
827 lnet_router_ni_update_locked(struct lnet_peer *gw, __u32 net)
828 {
829         struct lnet_route *rte;
830
831         if ((gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS)) {
832                 list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
833                         if (rte->lr_net == net) {
834                                 rte->lr_downis = 0;
835                                 break;
836                         }
837                 }
838         }
839 }
840
/*
 * Walk the local NIs and mark DOWN any non-loopback NI whose
 * ni_last_alive is older than the ping-timeout window.  Only runs
 * while this node is routing.
 */
static void
lnet_update_ni_status_locked(void)
{
	struct lnet_ni *ni;
	time64_t now;
	int timeout;	/* seconds of silence before an NI is down */

	LASSERT(the_lnet.ln_routing);

	timeout = router_ping_timeout +
		  max(live_router_check_interval, dead_router_check_interval);

	now = ktime_get_real_seconds();
	list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
		if (ni->ni_lnd->lnd_type == LOLND)
			continue;

		/* cheap unlocked check first */
		if (now < ni->ni_last_alive + timeout)
			continue;

		lnet_ni_lock(ni);
		/* re-check with lock */
		if (now < ni->ni_last_alive + timeout) {
			lnet_ni_unlock(ni);
			continue;
		}

		LASSERT(ni->ni_status);

		if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
			CDEBUG(D_NET, "NI(%s:%d) status changed to down\n",
			       libcfs_nid2str(ni->ni_nid), timeout);
			/*
			 * NB: so far, this is the only place to set
			 * NI status to "down"
			 */
			ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
		}
		lnet_ni_unlock(ni);
	}
}
882
883 static void
884 lnet_destroy_rc_data(struct lnet_rc_data *rcd)
885 {
886         LASSERT(list_empty(&rcd->rcd_list));
887         /* detached from network */
888         LASSERT(LNetMDHandleIsInvalid(rcd->rcd_mdh));
889
890         if (rcd->rcd_gateway) {
891                 int cpt = rcd->rcd_gateway->lp_cpt;
892
893                 lnet_net_lock(cpt);
894                 lnet_peer_decref_locked(rcd->rcd_gateway);
895                 lnet_net_unlock(cpt);
896         }
897
898         if (rcd->rcd_pinginfo)
899                 LIBCFS_FREE(rcd->rcd_pinginfo, LNET_PINGINFO_SIZE);
900
901         LIBCFS_FREE(rcd, sizeof(*rcd));
902 }
903
/*
 * Create and attach a router-checker descriptor (ping buffer + bound
 * MD) for @gateway.  Called and returns with the gateway CPT's net
 * lock held, but drops it across the allocations.  On failure, or if
 * we lose the race to another creator, returns whatever RCD the
 * gateway holds at that point (possibly NULL).
 */
static struct lnet_rc_data *
lnet_create_rc_data_locked(struct lnet_peer *gateway)
{
	struct lnet_rc_data *rcd = NULL;
	struct lnet_ping_info *pi;
	struct lnet_md md;
	int rc;
	int i;

	/* drop the net lock while allocating and binding the MD */
	lnet_net_unlock(gateway->lp_cpt);

	LIBCFS_ALLOC(rcd, sizeof(*rcd));
	if (!rcd)
		goto out;

	LNetInvalidateMDHandle(&rcd->rcd_mdh);
	INIT_LIST_HEAD(&rcd->rcd_list);

	LIBCFS_ALLOC(pi, LNET_PINGINFO_SIZE);
	if (!pi)
		goto out;

	/* mark every NI slot invalid until a ping reply fills it in */
	for (i = 0; i < LNET_MAX_RTR_NIS; i++) {
		pi->pi_ni[i].ns_nid = LNET_NID_ANY;
		pi->pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
	}
	rcd->rcd_pinginfo = pi;

	/* bind an MD over the ping buffer; ping replies land here */
	md.start = pi;
	md.user_ptr = rcd;
	md.length = LNET_PINGINFO_SIZE;
	md.threshold = LNET_MD_THRESH_INF;
	md.options = LNET_MD_TRUNCATE;
	md.eq_handle = the_lnet.ln_rc_eqh;

	LASSERT(!LNetEQHandleIsInvalid(the_lnet.ln_rc_eqh));
	rc = LNetMDBind(md, LNET_UNLINK, &rcd->rcd_mdh);
	if (rc < 0) {
		CERROR("Can't bind MD: %d\n", rc);
		goto out;
	}
	LASSERT(!rc);

	lnet_net_lock(gateway->lp_cpt);
	/* router table changed or someone has created rcd for this gateway */
	if (!lnet_isrouter(gateway) || gateway->lp_rcd) {
		lnet_net_unlock(gateway->lp_cpt);
		goto out;
	}

	/* the RCD holds a reference on its gateway until destroyed */
	lnet_peer_addref_locked(gateway);
	rcd->rcd_gateway = gateway;
	gateway->lp_rcd = rcd;
	gateway->lp_ping_notsent = 0;

	return rcd;

 out:
	/* allocation failed or we lost the race: tear our copy down */
	if (rcd) {
		if (!LNetMDHandleIsInvalid(rcd->rcd_mdh)) {
			rc = LNetMDUnlink(rcd->rcd_mdh);
			LASSERT(!rc);
		}
		lnet_destroy_rc_data(rcd);
	}

	/* re-take the lock and return whatever RCD the gateway has now */
	lnet_net_lock(gateway->lp_cpt);
	return gateway->lp_rcd;
}
973
974 static int
975 lnet_router_check_interval(struct lnet_peer *rtr)
976 {
977         int secs;
978
979         secs = rtr->lp_alive ? live_router_check_interval :
980                                dead_router_check_interval;
981         if (secs < 0)
982                 secs = 0;
983
984         return secs;
985 }
986
987 static void
988 lnet_ping_router_locked(struct lnet_peer *rtr)
989 {
990         struct lnet_rc_data *rcd = NULL;
991         unsigned long now = cfs_time_current();
992         int secs;
993
994         lnet_peer_addref_locked(rtr);
995
996         if (rtr->lp_ping_deadline && /* ping timed out? */
997             cfs_time_after(now, rtr->lp_ping_deadline))
998                 lnet_notify_locked(rtr, 1, 0, now);
999
1000         /* Run any outstanding notifications */
1001         lnet_ni_notify_locked(rtr->lp_ni, rtr);
1002
1003         if (!lnet_isrouter(rtr) ||
1004             the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
1005                 /* router table changed or router checker is shutting down */
1006                 lnet_peer_decref_locked(rtr);
1007                 return;
1008         }
1009
1010         rcd = rtr->lp_rcd ?
1011               rtr->lp_rcd : lnet_create_rc_data_locked(rtr);
1012
1013         if (!rcd)
1014                 return;
1015
1016         secs = lnet_router_check_interval(rtr);
1017
1018         CDEBUG(D_NET,
1019                "rtr %s %d: deadline %lu ping_notsent %d alive %d alive_count %d lp_ping_timestamp %lu\n",
1020                libcfs_nid2str(rtr->lp_nid), secs,
1021                rtr->lp_ping_deadline, rtr->lp_ping_notsent,
1022                rtr->lp_alive, rtr->lp_alive_count, rtr->lp_ping_timestamp);
1023
1024         if (secs && !rtr->lp_ping_notsent &&
1025             cfs_time_after(now, cfs_time_add(rtr->lp_ping_timestamp,
1026                                              cfs_time_seconds(secs)))) {
1027                 int rc;
1028                 struct lnet_process_id id;
1029                 struct lnet_handle_md mdh;
1030
1031                 id.nid = rtr->lp_nid;
1032                 id.pid = LNET_PID_LUSTRE;
1033                 CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
1034
1035                 rtr->lp_ping_notsent   = 1;
1036                 rtr->lp_ping_timestamp = now;
1037
1038                 mdh = rcd->rcd_mdh;
1039
1040                 if (!rtr->lp_ping_deadline) {
1041                         rtr->lp_ping_deadline =
1042                                 cfs_time_shift(router_ping_timeout);
1043                 }
1044
1045                 lnet_net_unlock(rtr->lp_cpt);
1046
1047                 rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
1048                              LNET_PROTO_PING_MATCHBITS, 0);
1049
1050                 lnet_net_lock(rtr->lp_cpt);
1051                 if (rc)
1052                         rtr->lp_ping_notsent = 0; /* no event pending */
1053         }
1054
1055         lnet_peer_decref_locked(rtr);
1056 }
1057
1058 int
1059 lnet_router_checker_start(void)
1060 {
1061         struct task_struct *task;
1062         int rc;
1063         int eqsz = 0;
1064
1065         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1066
1067         if (check_routers_before_use &&
1068             dead_router_check_interval <= 0) {
1069                 LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be set if 'check_routers_before_use' is set\n");
1070                 return -EINVAL;
1071         }
1072
1073         init_completion(&the_lnet.ln_rc_signal);
1074
1075         rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
1076         if (rc) {
1077                 CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
1078                 return -ENOMEM;
1079         }
1080
1081         the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
1082         task = kthread_run(lnet_router_checker, NULL, "router_checker");
1083         if (IS_ERR(task)) {
1084                 rc = PTR_ERR(task);
1085                 CERROR("Can't start router checker thread: %d\n", rc);
1086                 /* block until event callback signals exit */
1087                 wait_for_completion(&the_lnet.ln_rc_signal);
1088                 rc = LNetEQFree(the_lnet.ln_rc_eqh);
1089                 LASSERT(!rc);
1090                 the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
1091                 return -ENOMEM;
1092         }
1093
1094         if (check_routers_before_use) {
1095                 /*
1096                  * Note that a helpful side-effect of pinging all known routers
1097                  * at startup is that it makes them drop stale connections they
1098                  * may have to a previous instance of me.
1099                  */
1100                 lnet_wait_known_routerstate();
1101         }
1102
1103         return 0;
1104 }
1105
1106 void
1107 lnet_router_checker_stop(void)
1108 {
1109         int rc;
1110
1111         if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
1112                 return;
1113
1114         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
1115         the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
1116         /* wakeup the RC thread if it's sleeping */
1117         wake_up(&the_lnet.ln_rc_waitq);
1118
1119         /* block until event callback signals exit */
1120         wait_for_completion(&the_lnet.ln_rc_signal);
1121         LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
1122
1123         rc = LNetEQFree(the_lnet.ln_rc_eqh);
1124         LASSERT(!rc);
1125 }
1126
/*
 * Unlink and free router-checker data (RCDs).  If @wait_unlink is
 * set, keep retrying until every zombie RCD's MD handle has been
 * invalidated; otherwise make a single pass.  Called without the net
 * lock held; takes and drops LNET_LOCK_EX internally.
 */
static void
lnet_prune_rc_data(int wait_unlink)
{
	struct lnet_rc_data *rcd;
	struct lnet_rc_data *tmp;
	struct lnet_peer *lp;
	struct list_head head;
	int i = 2;	/* seeds the power-of-two log-throttling below */

	/* fast path: checker still running and nothing queued for reaping */
	if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
		   list_empty(&the_lnet.ln_rcd_deathrow) &&
		   list_empty(&the_lnet.ln_rcd_zombie)))
		return;

	INIT_LIST_HEAD(&head);

	lnet_net_lock(LNET_LOCK_EX);

	if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
		/* router checker is stopping, prune all */
		list_for_each_entry(lp, &the_lnet.ln_routers,
				    lp_rtr_list) {
			if (!lp->lp_rcd)
				continue;

			LASSERT(list_empty(&lp->lp_rcd->rcd_list));
			list_add(&lp->lp_rcd->rcd_list,
				 &the_lnet.ln_rcd_deathrow);
			lp->lp_rcd = NULL;
		}
	}

	/* unlink all RCDs on deathrow list */
	list_splice_init(&the_lnet.ln_rcd_deathrow, &head);

	if (!list_empty(&head)) {
		/* drop the net lock across the LNetMDUnlink() calls */
		lnet_net_unlock(LNET_LOCK_EX);

		list_for_each_entry(rcd, &head, rcd_list)
			LNetMDUnlink(rcd->rcd_mdh);

		lnet_net_lock(LNET_LOCK_EX);
	}

	list_splice_init(&head, &the_lnet.ln_rcd_zombie);

	/* release all zombie RCDs */
	while (!list_empty(&the_lnet.ln_rcd_zombie)) {
		/* collect RCDs whose MD handle is already invalidated */
		list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
					 rcd_list) {
			if (LNetMDHandleIsInvalid(rcd->rcd_mdh))
				list_move(&rcd->rcd_list, &head);
		}

		wait_unlink = wait_unlink &&
			      !list_empty(&the_lnet.ln_rcd_zombie);

		lnet_net_unlock(LNET_LOCK_EX);

		/* destroy the collected RCDs outside the lock */
		while (!list_empty(&head)) {
			rcd = list_entry(head.next,
					 struct lnet_rc_data, rcd_list);
			list_del_init(&rcd->rcd_list);
			lnet_destroy_rc_data(rcd);
		}

		if (!wait_unlink)
			return;

		i++;
		/* log at D_WARNING only when i is a power of two */
		CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
		       "Waiting for rc buffers to unlink\n");
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1) / 4);

		lnet_net_lock(LNET_LOCK_EX);
	}

	lnet_net_unlock(LNET_LOCK_EX);
}
1207
1208 /*
1209  * This function is called to check if the RC should block indefinitely.
1210  * It's called from lnet_router_checker() as well as being passed to
1211  * wait_event_interruptible() to avoid the lost wake_up problem.
1212  *
1213  * When it's called from wait_event_interruptible() it is necessary to
1214  * also not sleep if the rc state is not running to avoid a deadlock
1215  * when the system is shutting down
1216  */
1217 static inline bool
1218 lnet_router_checker_active(void)
1219 {
1220         if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING)
1221                 return true;
1222
1223         /*
1224          * Router Checker thread needs to run when routing is enabled in
1225          * order to call lnet_update_ni_status_locked()
1226          */
1227         if (the_lnet.ln_routing)
1228                 return true;
1229
1230         return !list_empty(&the_lnet.ln_routers) &&
1231                 (live_router_check_interval > 0 ||
1232                  dead_router_check_interval > 0);
1233 }
1234
/*
 * Main loop of the router-checker kthread: periodically ping every
 * known router, refresh local NI status when routing is enabled, and
 * reap stale RCDs, until lnet_router_checker_stop() asks it to exit.
 */
static int
lnet_router_checker(void *arg)
{
	struct lnet_peer *rtr;
	struct list_head *entry;

	cfs_block_allsigs();

	while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
		__u64 version;
		int cpt;
		int cpt2;

		cpt = lnet_net_lock_current();
rescan:
		/* snapshot the router-list version to detect changes */
		version = the_lnet.ln_routers_version;

		list_for_each(entry, &the_lnet.ln_routers) {
			rtr = list_entry(entry, struct lnet_peer, lp_rtr_list);

			/* switch to the CPT lock covering this router */
			cpt2 = lnet_cpt_of_nid_locked(rtr->lp_nid);
			if (cpt != cpt2) {
				lnet_net_unlock(cpt);
				cpt = cpt2;
				lnet_net_lock(cpt);
				/* the routers list has changed */
				if (version != the_lnet.ln_routers_version)
					goto rescan;
			}

			lnet_ping_router_locked(rtr);

			/* NB dropped lock */
			if (version != the_lnet.ln_routers_version) {
				/* the routers list has changed */
				goto rescan;
			}
		}

		if (the_lnet.ln_routing)
			lnet_update_ni_status_locked();

		lnet_net_unlock(cpt);

		lnet_prune_rc_data(0); /* don't wait for UNLINK */

		/*
		 * Call schedule_timeout() here always adds 1 to load average
		 * because kernel counts # active tasks as nr_running
		 * + nr_uninterruptible.
		 */
		/*
		 * if there are any routes then wakeup every second.  If
		 * there are no routes then sleep indefinitely until woken
		 * up by a user adding a route
		 */
		if (!lnet_router_checker_active())
			wait_event_interruptible(the_lnet.ln_rc_waitq,
						 lnet_router_checker_active());
		else
			wait_event_interruptible_timeout(the_lnet.ln_rc_waitq,
							 false,
							 cfs_time_seconds(1));
	}

	lnet_prune_rc_data(1); /* wait for UNLINK */

	the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
	complete(&the_lnet.ln_rc_signal);
	/* The unlink event callback will signal final completion */
	return 0;
}
1307
1308 void
1309 lnet_destroy_rtrbuf(struct lnet_rtrbuf *rb, int npages)
1310 {
1311         int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1312
1313         while (--npages >= 0)
1314                 __free_page(rb->rb_kiov[npages].bv_page);
1315
1316         LIBCFS_FREE(rb, sz);
1317 }
1318
1319 static struct lnet_rtrbuf *
1320 lnet_new_rtrbuf(struct lnet_rtrbufpool *rbp, int cpt)
1321 {
1322         int npages = rbp->rbp_npages;
1323         int sz = offsetof(struct lnet_rtrbuf, rb_kiov[npages]);
1324         struct page *page;
1325         struct lnet_rtrbuf *rb;
1326         int i;
1327
1328         LIBCFS_CPT_ALLOC(rb, lnet_cpt_table(), cpt, sz);
1329         if (!rb)
1330                 return NULL;
1331
1332         rb->rb_pool = rbp;
1333
1334         for (i = 0; i < npages; i++) {
1335                 page = alloc_pages_node(
1336                                 cfs_cpt_spread_node(lnet_cpt_table(), cpt),
1337                                 GFP_KERNEL | __GFP_ZERO, 0);
1338                 if (!page) {
1339                         while (--i >= 0)
1340                                 __free_page(rb->rb_kiov[i].bv_page);
1341
1342                         LIBCFS_FREE(rb, sz);
1343                         return NULL;
1344                 }
1345
1346                 rb->rb_kiov[i].bv_len = PAGE_SIZE;
1347                 rb->rb_kiov[i].bv_offset = 0;
1348                 rb->rb_kiov[i].bv_page = page;
1349         }
1350
1351         return rb;
1352 }
1353
1354 static void
1355 lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
1356 {
1357         int npages = rbp->rbp_npages;
1358         struct list_head tmp;
1359         struct lnet_rtrbuf *rb;
1360         struct lnet_rtrbuf *temp;
1361
1362         if (!rbp->rbp_nbuffers) /* not initialized or already freed */
1363                 return;
1364
1365         INIT_LIST_HEAD(&tmp);
1366
1367         lnet_net_lock(cpt);
1368         lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
1369         list_splice_init(&rbp->rbp_bufs, &tmp);
1370         rbp->rbp_req_nbuffers = 0;
1371         rbp->rbp_nbuffers = 0;
1372         rbp->rbp_credits = 0;
1373         rbp->rbp_mincredits = 0;
1374         lnet_net_unlock(cpt);
1375
1376         /* Free buffers on the free list. */
1377         list_for_each_entry_safe(rb, temp, &tmp, rb_list) {
1378                 list_del(&rb->rb_list);
1379                 lnet_destroy_rtrbuf(rb, npages);
1380         }
1381 }
1382
/*
 * Resize @rbp so it holds @nbufs buffers on CPT @cpt.  Growth is done
 * by allocating buffers off-lock and splicing them in; shrinkage is
 * lazy, handled as buffers are returned to the pool.  Returns 0 on
 * success or -ENOMEM.
 */
static int
lnet_rtrpool_adjust_bufs(struct lnet_rtrbufpool *rbp, int nbufs, int cpt)
{
	struct list_head rb_list;
	struct lnet_rtrbuf *rb;
	int num_rb;
	int num_buffers = 0;
	int old_req_nbufs;
	int npages = rbp->rbp_npages;

	lnet_net_lock(cpt);
	/*
	 * If we are called for less buffers than already in the pool, we
	 * just lower the req_nbuffers number and excess buffers will be
	 * thrown away as they are returned to the free list.  Credits
	 * then get adjusted as well.
	 * If we already have enough buffers allocated to serve the
	 * increase requested, then we can treat that the same way as we
	 * do the decrease.
	 */
	num_rb = nbufs - rbp->rbp_nbuffers;
	if (nbufs <= rbp->rbp_req_nbuffers || num_rb <= 0) {
		rbp->rbp_req_nbuffers = nbufs;
		lnet_net_unlock(cpt);
		return 0;
	}
	/*
	 * store the older value of rbp_req_nbuffers and then set it to
	 * the new request to prevent lnet_return_rx_credits_locked() from
	 * freeing buffers that we need to keep around
	 */
	old_req_nbufs = rbp->rbp_req_nbuffers;
	rbp->rbp_req_nbuffers = nbufs;
	lnet_net_unlock(cpt);

	INIT_LIST_HEAD(&rb_list);

	/*
	 * allocate the buffers on a local list first.  If all buffers are
	 * allocated successfully then join this list to the rbp buffer
	 * list. If not then free all allocated buffers.
	 */
	while (num_rb-- > 0) {
		rb = lnet_new_rtrbuf(rbp, cpt);
		if (!rb) {
			CERROR("Failed to allocate %d route bufs of %d pages\n",
			       nbufs, npages);

			/* roll back the requested count before bailing */
			lnet_net_lock(cpt);
			rbp->rbp_req_nbuffers = old_req_nbufs;
			lnet_net_unlock(cpt);

			goto failed;
		}

		list_add(&rb->rb_list, &rb_list);
		num_buffers++;
	}

	lnet_net_lock(cpt);

	/* publish the new buffers and matching credits atomically */
	list_splice_tail(&rb_list, &rbp->rbp_bufs);
	rbp->rbp_nbuffers += num_buffers;
	rbp->rbp_credits += num_buffers;
	rbp->rbp_mincredits = rbp->rbp_credits;
	/*
	 * We need to schedule blocked msg using the newly
	 * added buffers.
	 */
	while (!list_empty(&rbp->rbp_bufs) &&
	       !list_empty(&rbp->rbp_msgs))
		lnet_schedule_blocked_locked(rbp);

	lnet_net_unlock(cpt);

	return 0;

failed:
	/* destroy everything allocated on the local list */
	while (!list_empty(&rb_list)) {
		rb = list_entry(rb_list.next, struct lnet_rtrbuf, rb_list);
		list_del(&rb->rb_list);
		lnet_destroy_rtrbuf(rb, npages);
	}

	return -ENOMEM;
}
1469
1470 static void
1471 lnet_rtrpool_init(struct lnet_rtrbufpool *rbp, int npages)
1472 {
1473         INIT_LIST_HEAD(&rbp->rbp_msgs);
1474         INIT_LIST_HEAD(&rbp->rbp_bufs);
1475
1476         rbp->rbp_npages = npages;
1477         rbp->rbp_credits = 0;
1478         rbp->rbp_mincredits = 0;
1479 }
1480
1481 void
1482 lnet_rtrpools_free(int keep_pools)
1483 {
1484         struct lnet_rtrbufpool *rtrp;
1485         int i;
1486
1487         if (!the_lnet.ln_rtrpools) /* uninitialized or freed */
1488                 return;
1489
1490         cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1491                 lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
1492                 lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
1493                 lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
1494         }
1495
1496         if (!keep_pools) {
1497                 cfs_percpt_free(the_lnet.ln_rtrpools);
1498                 the_lnet.ln_rtrpools = NULL;
1499         }
1500 }
1501
1502 static int
1503 lnet_nrb_tiny_calculate(void)
1504 {
1505         int nrbs = LNET_NRB_TINY;
1506
1507         if (tiny_router_buffers < 0) {
1508                 LCONSOLE_ERROR_MSG(0x10c,
1509                                    "tiny_router_buffers=%d invalid when routing enabled\n",
1510                                    tiny_router_buffers);
1511                 return -EINVAL;
1512         }
1513
1514         if (tiny_router_buffers > 0)
1515                 nrbs = tiny_router_buffers;
1516
1517         nrbs /= LNET_CPT_NUMBER;
1518         return max(nrbs, LNET_NRB_TINY_MIN);
1519 }
1520
1521 static int
1522 lnet_nrb_small_calculate(void)
1523 {
1524         int nrbs = LNET_NRB_SMALL;
1525
1526         if (small_router_buffers < 0) {
1527                 LCONSOLE_ERROR_MSG(0x10c,
1528                                    "small_router_buffers=%d invalid when routing enabled\n",
1529                                    small_router_buffers);
1530                 return -EINVAL;
1531         }
1532
1533         if (small_router_buffers > 0)
1534                 nrbs = small_router_buffers;
1535
1536         nrbs /= LNET_CPT_NUMBER;
1537         return max(nrbs, LNET_NRB_SMALL_MIN);
1538 }
1539
1540 static int
1541 lnet_nrb_large_calculate(void)
1542 {
1543         int nrbs = LNET_NRB_LARGE;
1544
1545         if (large_router_buffers < 0) {
1546                 LCONSOLE_ERROR_MSG(0x10c,
1547                                    "large_router_buffers=%d invalid when routing enabled\n",
1548                                    large_router_buffers);
1549                 return -EINVAL;
1550         }
1551
1552         if (large_router_buffers > 0)
1553                 nrbs = large_router_buffers;
1554
1555         nrbs /= LNET_CPT_NUMBER;
1556         return max(nrbs, LNET_NRB_LARGE_MIN);
1557 }
1558
1559 int
1560 lnet_rtrpools_alloc(int im_a_router)
1561 {
1562         struct lnet_rtrbufpool *rtrp;
1563         int nrb_tiny;
1564         int nrb_small;
1565         int nrb_large;
1566         int rc;
1567         int i;
1568
1569         if (!strcmp(forwarding, "")) {
1570                 /* not set either way */
1571                 if (!im_a_router)
1572                         return 0;
1573         } else if (!strcmp(forwarding, "disabled")) {
1574                 /* explicitly disabled */
1575                 return 0;
1576         } else if (!strcmp(forwarding, "enabled")) {
1577                 /* explicitly enabled */
1578         } else {
1579                 LCONSOLE_ERROR_MSG(0x10b, "'forwarding' not set to either 'enabled' or 'disabled'\n");
1580                 return -EINVAL;
1581         }
1582
1583         nrb_tiny = lnet_nrb_tiny_calculate();
1584         if (nrb_tiny < 0)
1585                 return -EINVAL;
1586
1587         nrb_small = lnet_nrb_small_calculate();
1588         if (nrb_small < 0)
1589                 return -EINVAL;
1590
1591         nrb_large = lnet_nrb_large_calculate();
1592         if (nrb_large < 0)
1593                 return -EINVAL;
1594
1595         the_lnet.ln_rtrpools = cfs_percpt_alloc(lnet_cpt_table(),
1596                                                 LNET_NRBPOOLS *
1597                                                 sizeof(struct lnet_rtrbufpool));
1598         if (!the_lnet.ln_rtrpools) {
1599                 LCONSOLE_ERROR_MSG(0x10c,
1600                                    "Failed to initialize router buffe pool\n");
1601                 return -ENOMEM;
1602         }
1603
1604         cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1605                 lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
1606                 rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
1607                                               nrb_tiny, i);
1608                 if (rc)
1609                         goto failed;
1610
1611                 lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
1612                                   LNET_NRB_SMALL_PAGES);
1613                 rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
1614                                               nrb_small, i);
1615                 if (rc)
1616                         goto failed;
1617
1618                 lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
1619                                   LNET_NRB_LARGE_PAGES);
1620                 rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
1621                                               nrb_large, i);
1622                 if (rc)
1623                         goto failed;
1624         }
1625
1626         lnet_net_lock(LNET_LOCK_EX);
1627         the_lnet.ln_routing = 1;
1628         lnet_net_unlock(LNET_LOCK_EX);
1629
1630         return 0;
1631
1632  failed:
1633         lnet_rtrpools_free(0);
1634         return rc;
1635 }
1636
1637 static int
1638 lnet_rtrpools_adjust_helper(int tiny, int small, int large)
1639 {
1640         int nrb = 0;
1641         int rc = 0;
1642         int i;
1643         struct lnet_rtrbufpool *rtrp;
1644
1645         /*
1646          * If the provided values for each buffer pool are different than the
1647          * configured values, we need to take action.
1648          */
1649         if (tiny >= 0) {
1650                 tiny_router_buffers = tiny;
1651                 nrb = lnet_nrb_tiny_calculate();
1652                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1653                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
1654                                                       nrb, i);
1655                         if (rc)
1656                                 return rc;
1657                 }
1658         }
1659         if (small >= 0) {
1660                 small_router_buffers = small;
1661                 nrb = lnet_nrb_small_calculate();
1662                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1663                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
1664                                                       nrb, i);
1665                         if (rc)
1666                                 return rc;
1667                 }
1668         }
1669         if (large >= 0) {
1670                 large_router_buffers = large;
1671                 nrb = lnet_nrb_large_calculate();
1672                 cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
1673                         rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
1674                                                       nrb, i);
1675                         if (rc)
1676                                 return rc;
1677                 }
1678         }
1679
1680         return 0;
1681 }
1682
1683 int
1684 lnet_rtrpools_adjust(int tiny, int small, int large)
1685 {
1686         /*
1687          * this function doesn't revert the changes if adding new buffers
1688          * failed.  It's up to the user space caller to revert the
1689          * changes.
1690          */
1691         if (!the_lnet.ln_routing)
1692                 return 0;
1693
1694         return lnet_rtrpools_adjust_helper(tiny, small, large);
1695 }
1696
1697 int
1698 lnet_rtrpools_enable(void)
1699 {
1700         int rc = 0;
1701
1702         if (the_lnet.ln_routing)
1703                 return 0;
1704
1705         if (!the_lnet.ln_rtrpools)
1706                 /*
1707                  * If routing is turned off, and we have never
1708                  * initialized the pools before, just call the
1709                  * standard buffer pool allocation routine as
1710                  * if we are just configuring this for the first
1711                  * time.
1712                  */
1713                 rc = lnet_rtrpools_alloc(1);
1714         else
1715                 rc = lnet_rtrpools_adjust_helper(0, 0, 0);
1716         if (rc)
1717                 return rc;
1718
1719         lnet_net_lock(LNET_LOCK_EX);
1720         the_lnet.ln_routing = 1;
1721
1722         the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
1723         lnet_net_unlock(LNET_LOCK_EX);
1724
1725         return rc;
1726 }
1727
1728 void
1729 lnet_rtrpools_disable(void)
1730 {
1731         if (!the_lnet.ln_routing)
1732                 return;
1733
1734         lnet_net_lock(LNET_LOCK_EX);
1735         the_lnet.ln_routing = 0;
1736         the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
1737
1738         tiny_router_buffers = 0;
1739         small_router_buffers = 0;
1740         large_router_buffers = 0;
1741         lnet_net_unlock(LNET_LOCK_EX);
1742         lnet_rtrpools_free(1);
1743 }
1744
/**
 * lnet_notify - record a change in a peer's aliveness
 * @ni:    local NI the notification arrived on, or NULL for a
 *         userspace-originated notification
 * @nid:   NID of the peer whose state is being reported
 * @alive: nonzero if the peer was seen alive, zero if believed dead
 * @when:  time the state change was observed (cfs time units,
 *         compared against cfs_time_current())
 *
 * Called by LNDs and by userspace to report peer aliveness.  The
 * notification is validated (the reporting NI must share the peer's
 * network; @when must not lie in the future), LND death notices are
 * ignored when auto_down is disabled, and the peer state is then
 * updated under the per-CPT net lock.
 *
 * Return: 0 on success or when the notification is deliberately
 *         ignored (auto-down off, unknown peer); -EINVAL for an
 *         invalid notification; -ESHUTDOWN if LNet is shutting down.
 */
int
lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, int alive, unsigned long when)
{
	struct lnet_peer *lp = NULL;
	unsigned long now = cfs_time_current();
	int cpt = lnet_cpt_of_nid(nid);

	LASSERT(!in_interrupt());

	CDEBUG(D_NET, "%s notifying %s: %s\n",
	       !ni ? "userspace" : libcfs_nid2str(ni->ni_nid),
	       libcfs_nid2str(nid),
	       alive ? "up" : "down");

	/* an NI may only report on peers within its own network */
	if (ni &&
	    LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid)) {
		CWARN("Ignoring notification of %s %s by %s (different net)\n",
		      libcfs_nid2str(nid), alive ? "birth" : "death",
		      libcfs_nid2str(ni->ni_nid));
		return -EINVAL;
	}

	/* can't do predictions... reject timestamps in the future */
	if (cfs_time_after(when, now)) {
		CWARN("Ignoring prediction from %s of %s %s %ld seconds in the future\n",
		      !ni ? "userspace" : libcfs_nid2str(ni->ni_nid),
		      libcfs_nid2str(nid), alive ? "up" : "down",
		      cfs_duration_sec(cfs_time_sub(when, now)));
		return -EINVAL;
	}

	if (ni && !alive &&          /* LND telling me she's down */
	    !auto_down) {                      /* auto-down disabled */
		CDEBUG(D_NET, "Auto-down disabled\n");
		return 0;
	}

	lnet_net_lock(cpt);

	if (the_lnet.ln_shutdown) {
		lnet_net_unlock(cpt);
		return -ESHUTDOWN;
	}

	lp = lnet_find_peer_locked(the_lnet.ln_peer_tables[cpt], nid);
	if (!lp) {
		/* nid not found: nothing to update, not an error */
		lnet_net_unlock(cpt);
		CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
		return 0;
	}

	/*
	 * We can't fully trust the LND to report an exact last_alive
	 * time when it notifies us of a dead peer.  For example,
	 * ksocklnd can call us with when == _time_when_the_node_was_booted_
	 * if no connections were successfully established.  So never let
	 * a death notice move last_alive backwards.
	 */
	if (ni && !alive && when < lp->lp_last_alive)
		when = lp->lp_last_alive;

	lnet_notify_locked(lp, !ni, alive, when);

	if (ni)
		lnet_ni_notify_locked(ni, lp);

	/* drop the reference presumably taken by lnet_find_peer_locked() */
	lnet_peer_decref_locked(lp);

	lnet_net_unlock(cpt);
	return 0;
}
1816 EXPORT_SYMBOL(lnet_notify);