GNU Linux-libre 4.14.257-gnu1
drivers/staging/lustre/lnet/klnds/socklnd/socklnd_cb.c
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 *
 *   Author: Zach Brown <zab@zabbo.net>
 *   Author: Peter J. Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 *   Portals is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Portals is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 */

#include "socklnd.h"

struct ksock_tx *
ksocknal_alloc_tx(int type, int size)
{
        struct ksock_tx *tx = NULL;

        if (type == KSOCK_MSG_NOOP) {
                LASSERT(size == KSOCK_NOOP_TX_SIZE);

                /* searching for a noop tx in free list */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
                        tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next,
                                        struct ksock_tx, tx_list);
                        LASSERT(tx->tx_desc_size == size);
                        list_del(&tx->tx_list);
                }

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        }

        if (!tx)
                LIBCFS_ALLOC(tx, size);

        if (!tx)
                return NULL;

        atomic_set(&tx->tx_refcount, 1);
        tx->tx_zc_aborted = 0;
        tx->tx_zc_capable = 0;
        tx->tx_zc_checked = 0;
        tx->tx_desc_size  = size;

        atomic_inc(&ksocknal_data.ksnd_nactive_txs);

        return tx;
}

struct ksock_tx *
ksocknal_alloc_tx_noop(__u64 cookie, int nonblk)
{
        struct ksock_tx *tx;

        tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE);
        if (!tx) {
                CERROR("Can't allocate noop tx desc\n");
                return NULL;
        }

        tx->tx_conn    = NULL;
        tx->tx_lnetmsg = NULL;
        tx->tx_kiov    = NULL;
        tx->tx_nkiov   = 0;
        tx->tx_iov     = tx->tx_frags.virt.iov;
        tx->tx_niov    = 1;
        tx->tx_nonblk  = nonblk;

        tx->tx_msg.ksm_csum = 0;
        tx->tx_msg.ksm_type = KSOCK_MSG_NOOP;
        tx->tx_msg.ksm_zc_cookies[0] = 0;
        tx->tx_msg.ksm_zc_cookies[1] = cookie;

        return tx;
}

void
ksocknal_free_tx(struct ksock_tx *tx)
{
        atomic_dec(&ksocknal_data.ksnd_nactive_txs);

        if (!tx->tx_lnetmsg && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
                /* it's a noop tx */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs);

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        } else {
                LIBCFS_FREE(tx, tx->tx_desc_size);
        }
}
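
/*
 * Illustrative sketch (not part of the driver): NOOP descriptors cycle
 * through the ksnd_idle_noop_txs freelist rather than the allocator, so
 * an alloc/free pair for a ZC ACK looks like this; the cookie value is
 * made up for the example.
 *
 *	struct ksock_tx *tx = ksocknal_alloc_tx_noop(42, 1);
 *
 *	if (tx) {
 *		... queue it on a conn, or on failure ...
 *		ksocknal_free_tx(tx);	(* back to the freelist *)
 *	}
 */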

static int
ksocknal_send_iov(struct ksock_conn *conn, struct ksock_tx *tx)
{
        struct kvec *iov = tx->tx_iov;
        int nob;
        int rc;

        LASSERT(tx->tx_niov > 0);

        /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */
        rc = ksocknal_lib_send_iov(conn, tx);

        if (rc <= 0)                        /* sent nothing? */
                return rc;

        nob = rc;
        LASSERT(nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" iov */
        do {
                LASSERT(tx->tx_niov > 0);

                if (nob < (int)iov->iov_len) {
                        iov->iov_base = (void *)((char *)iov->iov_base + nob);
                        iov->iov_len -= nob;
                        return rc;
                }

                nob -= iov->iov_len;
                tx->tx_iov = ++iov;
                tx->tx_niov--;
        } while (nob);

        return rc;
}
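
/*
 * Worked example of the "consume" loop above (numbers invented): with
 * tx_niov == 2, iov lengths {100, 200} and a partial send of rc == 150,
 * the first iov is fully consumed (nob 150 -> 50, tx_iov advances,
 * tx_niov -> 1) and the second is trimmed in place to base+50/len 150,
 * so the next call resumes exactly at the first unsent byte.
 */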

static int
ksocknal_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx)
{
        struct bio_vec *kiov = tx->tx_kiov;
        int nob;
        int rc;

        LASSERT(!tx->tx_niov);
        LASSERT(tx->tx_nkiov > 0);

        /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */
        rc = ksocknal_lib_send_kiov(conn, tx);

        if (rc <= 0)                        /* sent nothing? */
                return rc;

        nob = rc;
        LASSERT(nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" kiov */
        do {
                LASSERT(tx->tx_nkiov > 0);

                if (nob < (int)kiov->bv_len) {
                        kiov->bv_offset += nob;
                        kiov->bv_len -= nob;
                        return rc;
                }

                nob -= (int)kiov->bv_len;
                tx->tx_kiov = ++kiov;
                tx->tx_nkiov--;
        } while (nob);

        return rc;
}

static int
ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx)
{
        int rc;
        int bufnob;

        if (ksocknal_data.ksnd_stall_tx) {
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_tx));
        }

        LASSERT(tx->tx_resid);

        rc = ksocknal_connsock_addref(conn);
        if (rc) {
                LASSERT(conn->ksnc_closing);
                return -ESHUTDOWN;
        }

        do {
                if (ksocknal_data.ksnd_enomem_tx > 0) {
                        /* testing... */
                        ksocknal_data.ksnd_enomem_tx--;
                        rc = -EAGAIN;
                } else if (tx->tx_niov) {
                        rc = ksocknal_send_iov(conn, tx);
                } else {
                        rc = ksocknal_send_kiov(conn, tx);
                }

                bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
                if (rc > 0)                  /* sent something? */
                        conn->ksnc_tx_bufnob += rc; /* account it */

                if (bufnob < conn->ksnc_tx_bufnob) {
                        /*
                         * allocated send buffer bytes < computed; infer
                         * something got ACKed
                         */
                        conn->ksnc_tx_deadline =
                                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
                        conn->ksnc_tx_bufnob = bufnob;
                        mb();
                }

                if (rc <= 0) { /* Didn't write anything? */

                        if (!rc) /* some stacks return 0 instead of -EAGAIN */
                                rc = -EAGAIN;

                        /* Check if EAGAIN is due to memory pressure */
                        if (rc == -EAGAIN && ksocknal_lib_memory_pressure(conn))
                                rc = -ENOMEM;

                        break;
                }

                /* socket's wmem_queued now includes 'rc' bytes */
                atomic_sub(rc, &conn->ksnc_tx_nob);
                rc = 0;

        } while (tx->tx_resid);

        ksocknal_connsock_decref(conn);
        return rc;
}
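
/*
 * Example of the ACK inference above (numbers invented): suppose
 * ksnc_tx_bufnob, our running count of bytes handed to the socket, is
 * 4096 after the previous pass.  If the TCP stack has since ACKed 1024
 * of those bytes, sk_wmem_queued reads 3072, the "bufnob < ksnc_tx_bufnob"
 * test fires, and the connection's tx deadline and the peer's last_alive
 * are refreshed without any explicit keepalive traffic.
 */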

static int
ksocknal_recv_iov(struct ksock_conn *conn)
{
        struct kvec *iov = conn->ksnc_rx_iov;
        int nob;
        int rc;

        LASSERT(conn->ksnc_rx_niov > 0);

        /*
         * Never touch conn->ksnc_rx_iov or change connection
         * status inside ksocknal_lib_recv_iov
         */
        rc = ksocknal_lib_recv_iov(conn);

        if (rc <= 0)
                return rc;

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb();                  /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        do {
                LASSERT(conn->ksnc_rx_niov > 0);

                if (nob < (int)iov->iov_len) {
                        iov->iov_len -= nob;
                        iov->iov_base += nob;
                        return -EAGAIN;
                }

                nob -= iov->iov_len;
                conn->ksnc_rx_iov = ++iov;
                conn->ksnc_rx_niov--;
        } while (nob);

        return rc;
}

static int
ksocknal_recv_kiov(struct ksock_conn *conn)
{
        struct bio_vec *kiov = conn->ksnc_rx_kiov;
        int nob;
        int rc;

        LASSERT(conn->ksnc_rx_nkiov > 0);

        /*
         * Never touch conn->ksnc_rx_kiov or change connection
         * status inside ksocknal_lib_recv_kiov
         */
        rc = ksocknal_lib_recv_kiov(conn);

        if (rc <= 0)
                return rc;

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb();                  /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        do {
                LASSERT(conn->ksnc_rx_nkiov > 0);

                if (nob < (int)kiov->bv_len) {
                        kiov->bv_offset += nob;
                        kiov->bv_len -= nob;
                        return -EAGAIN;
                }

                nob -= kiov->bv_len;
                conn->ksnc_rx_kiov = ++kiov;
                conn->ksnc_rx_nkiov--;
        } while (nob);

        return 1;
}

static int
ksocknal_receive(struct ksock_conn *conn)
{
        /*
         * Return 1 on success, 0 on EOF, < 0 on error.
         * Caller checks ksnc_rx_nob_wanted to determine
         * progress/completion.
         */
        int rc;

        if (ksocknal_data.ksnd_stall_rx) {
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_rx));
        }

        rc = ksocknal_connsock_addref(conn);
        if (rc) {
                LASSERT(conn->ksnc_closing);
                return -ESHUTDOWN;
        }

        for (;;) {
                if (conn->ksnc_rx_niov)
                        rc = ksocknal_recv_iov(conn);
                else
                        rc = ksocknal_recv_kiov(conn);

                if (rc <= 0) {
                        /* error/EOF or partial receive */
                        if (rc == -EAGAIN) {
                                rc = 1;
                        } else if (!rc && conn->ksnc_rx_started) {
                                /* EOF in the middle of a message */
                                rc = -EPROTO;
                        }
                        break;
                }

                /* Completed a fragment */

                if (!conn->ksnc_rx_nob_wanted) {
                        rc = 1;
                        break;
                }
        }

        ksocknal_connsock_decref(conn);
        return rc;
}
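
/*
 * Sketch of the caller's side of that contract (see
 * ksocknal_process_receive() below for the real thing):
 *
 *	rc = ksocknal_receive(conn);
 *	if (rc <= 0)
 *		... 0 is EOF, < 0 is a hard error; close the conn ...
 *	else if (conn->ksnc_rx_nob_wanted)
 *		... short read; come back when more data arrives ...
 *	else
 *		... whole fragment is in; advance the rx state machine ...
 */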

void
ksocknal_tx_done(struct lnet_ni *ni, struct ksock_tx *tx)
{
        struct lnet_msg *lnetmsg = tx->tx_lnetmsg;
        int rc = (!tx->tx_resid && !tx->tx_zc_aborted) ? 0 : -EIO;

        LASSERT(ni || tx->tx_conn);

        if (tx->tx_conn)
                ksocknal_conn_decref(tx->tx_conn);

        if (!ni && tx->tx_conn)
                ni = tx->tx_conn->ksnc_peer->ksnp_ni;

        ksocknal_free_tx(tx);
        if (lnetmsg) /* KSOCK_MSG_NOOP txs go without lnetmsg */
                lnet_finalize(ni, lnetmsg, rc);
}

void
ksocknal_txlist_done(struct lnet_ni *ni, struct list_head *txlist, int error)
{
        struct ksock_tx *tx;

        while (!list_empty(txlist)) {
                tx = list_entry(txlist->next, struct ksock_tx, tx_list);

                if (error && tx->tx_lnetmsg) {
                        CNETERR("Deleting packet type %d len %d %s->%s\n",
                                le32_to_cpu(tx->tx_lnetmsg->msg_hdr.type),
                                le32_to_cpu(tx->tx_lnetmsg->msg_hdr.payload_length),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
                } else if (error) {
                        CNETERR("Deleting noop packet\n");
                }

                list_del(&tx->tx_list);

                LASSERT(atomic_read(&tx->tx_refcount) == 1);
                ksocknal_tx_done(ni, tx);
        }
}

static void
ksocknal_check_zc_req(struct ksock_tx *tx)
{
        struct ksock_conn *conn = tx->tx_conn;
        struct ksock_peer *peer = conn->ksnc_peer;

        /*
         * Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx
         * to ksnp_zc_req_list if some fragment of this message should be sent
         * zero-copy.  Our peer will send an ACK containing this cookie when
         * it has received this message, telling us we can signal completion.
         * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on
         * ksnp_zc_req_list.
         */
        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT(tx->tx_zc_capable);

        tx->tx_zc_checked = 1;

        if (conn->ksnc_proto == &ksocknal_protocol_v1x ||
            !conn->ksnc_zc_capable)
                return;

        /*
         * assign cookie and queue tx to pending list, it will be released when
         * a matching ack is received. See ksocknal_handle_zcack()
         */
        ksocknal_tx_addref(tx);

        spin_lock(&peer->ksnp_lock);

        /* ZC_REQ is going to be pinned to the peer */
        tx->tx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);

        LASSERT(!tx->tx_msg.ksm_zc_cookies[0]);

        tx->tx_msg.ksm_zc_cookies[0] = peer->ksnp_zc_next_cookie++;

        if (!peer->ksnp_zc_next_cookie)
                peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;

        list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);

        spin_unlock(&peer->ksnp_lock);
}
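
/*
 * Note on the cookie wrap above: ksnp_zc_next_cookie is a 64-bit counter,
 * so if it ever wraps to 0 it restarts past SOCKNAL_KEEPALIVE_PING,
 * because 0 means "no cookie" and the keepalive value is reserved.
 * Assuming SOCKNAL_KEEPALIVE_PING is defined as 1 in socklnd.h (check
 * the header; the values here are illustrative), the first cookie issued
 * after a wrap would be 2.
 */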

static void
ksocknal_uncheck_zc_req(struct ksock_tx *tx)
{
        struct ksock_peer *peer = tx->tx_conn->ksnc_peer;

        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT(tx->tx_zc_capable);

        tx->tx_zc_checked = 0;

        spin_lock(&peer->ksnp_lock);

        if (!tx->tx_msg.ksm_zc_cookies[0]) {
                /* Not waiting for an ACK */
                spin_unlock(&peer->ksnp_lock);
                return;
        }

        tx->tx_msg.ksm_zc_cookies[0] = 0;
        list_del(&tx->tx_zc_list);

        spin_unlock(&peer->ksnp_lock);

        ksocknal_tx_decref(tx);
}

static int
ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx)
{
        int rc;

        if (tx->tx_zc_capable && !tx->tx_zc_checked)
                ksocknal_check_zc_req(tx);

        rc = ksocknal_transmit(conn, tx);

        CDEBUG(D_NET, "send(%d) %d\n", tx->tx_resid, rc);

        if (!tx->tx_resid) {
                /* Sent everything OK */
                LASSERT(!rc);

                return 0;
        }

        if (rc == -EAGAIN)
                return rc;

        if (rc == -ENOMEM) {
                static int counter;

                counter++;   /* exponential backoff warnings */
                if ((counter & (-counter)) == counter)
                        CWARN("%u ENOMEM conn %p\n", counter, conn);

                /* Queue on ksnd_enomem_conns for retry after a timeout */
                spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

                /* enomem list takes over scheduler's ref... */
                LASSERT(conn->ksnc_tx_scheduled);
                list_add_tail(&conn->ksnc_tx_list,
                              &ksocknal_data.ksnd_enomem_conns);
                if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),
                                                   SOCKNAL_ENOMEM_RETRY),
                                      ksocknal_data.ksnd_reaper_waketime))
                        wake_up(&ksocknal_data.ksnd_reaper_waitq);

                spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
                return rc;
        }

        /* Actual error */
        LASSERT(rc < 0);

        if (!conn->ksnc_closing) {
                switch (rc) {
                case -ECONNRESET:
                        LCONSOLE_WARN("Host %pI4h reset our connection while we were sending data; it may have rebooted.\n",
                                      &conn->ksnc_ipaddr);
                        break;
                default:
                        LCONSOLE_WARN("There was an unexpected network error while writing to %pI4h: %d.\n",
                                      &conn->ksnc_ipaddr, rc);
                        break;
                }
                CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pI4h:%d\n",
                       conn, rc,
                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                       &conn->ksnc_ipaddr,
                       conn->ksnc_port);
        }

        if (tx->tx_zc_checked)
                ksocknal_uncheck_zc_req(tx);

        /* it's not an error if conn is being closed */
        ksocknal_close_conn_and_siblings(conn, (conn->ksnc_closing) ? 0 : rc);

        return rc;
}
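
/*
 * Aside on the warning throttle above: (counter & -counter) isolates the
 * lowest set bit of counter, so it equals counter exactly when counter is
 * a power of two.  The CWARN therefore fires on the 1st, 2nd, 4th, 8th, ...
 * ENOMEM, i.e. exponentially less often as the condition persists.
 */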

static void
ksocknal_launch_connection_locked(struct ksock_route *route)
{
        /* called holding write lock on ksnd_global_lock */

        LASSERT(!route->ksnr_scheduled);
        LASSERT(!route->ksnr_connecting);
        LASSERT(ksocknal_route_mask() & ~route->ksnr_connected);

        route->ksnr_scheduled = 1;            /* scheduling conn for connd */
        ksocknal_route_addref(route);      /* extra ref for connd */

        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);

        list_add_tail(&route->ksnr_connd_list,
                      &ksocknal_data.ksnd_connd_routes);
        wake_up(&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
}

void
ksocknal_launch_all_connections_locked(struct ksock_peer *peer)
{
        struct ksock_route *route;

        /* called holding write lock on ksnd_global_lock */
        for (;;) {
                /* launch any/all connections that need it */
                route = ksocknal_find_connectable_route_locked(peer);
                if (!route)
                        return;

                ksocknal_launch_connection_locked(route);
        }
}

struct ksock_conn *
ksocknal_find_conn_locked(struct ksock_peer *peer, struct ksock_tx *tx,
                          int nonblk)
{
        struct list_head *tmp;
        struct ksock_conn *conn;
        struct ksock_conn *typed = NULL;
        struct ksock_conn *fallback = NULL;
        int tnob = 0;
        int fnob = 0;

        list_for_each(tmp, &peer->ksnp_conns) {
                struct ksock_conn *c;
                int nob, rc;

                c = list_entry(tmp, struct ksock_conn, ksnc_list);
                nob = atomic_read(&c->ksnc_tx_nob) +
                      c->ksnc_sock->sk->sk_wmem_queued;

                LASSERT(!c->ksnc_closing);
                LASSERT(c->ksnc_proto &&
                        c->ksnc_proto->pro_match_tx);

                rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk);

                switch (rc) {
                default:
                        LBUG();
                case SOCKNAL_MATCH_NO: /* protocol rejected the tx */
                        continue;

                case SOCKNAL_MATCH_YES: /* typed connection */
                        if (!typed || tnob > nob ||
                            (tnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             cfs_time_after(typed->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
                                typed = c;
                                tnob  = nob;
                        }
                        break;

                case SOCKNAL_MATCH_MAY: /* fallback connection */
                        if (!fallback || fnob > nob ||
                            (fnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             cfs_time_after(fallback->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
                                fallback = c;
                                fnob = nob;
                        }
                        break;
                }
        }

        /* prefer the typed selection */
        conn = (typed) ? typed : fallback;

        if (conn)
                conn->ksnc_tx_last_post = cfs_time_current();

        return conn;
}
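
/*
 * Selection sketch (byte counts invented): with two SOCKNAL_MATCH_YES
 * conns whose queued totals (ksnc_tx_nob + sk_wmem_queued) are 8192 and
 * 512, the 512-byte conn wins because "tnob > nob" prefers the
 * least-loaded connection; with equal totals and round-robin enabled,
 * the conn posted to least recently wins.  A SOCKNAL_MATCH_MAY conn is
 * only used when no SOCKNAL_MATCH_YES conn exists at all.
 */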

void
ksocknal_tx_prep(struct ksock_conn *conn, struct ksock_tx *tx)
{
        conn->ksnc_proto->pro_pack(tx);

        atomic_add(tx->tx_nob, &conn->ksnc_tx_nob);
        ksocknal_conn_addref(conn); /* +1 ref for tx */
        tx->tx_conn = conn;
}

void
ksocknal_queue_tx_locked(struct ksock_tx *tx, struct ksock_conn *conn)
{
        struct ksock_sched *sched = conn->ksnc_scheduler;
        struct ksock_msg *msg = &tx->tx_msg;
        struct ksock_tx *ztx = NULL;
        int bufnob = 0;

        /*
         * called holding global lock (read or irq-write) and caller may
         * not have dropped this lock between finding conn and calling me,
         * so we don't need the {get,put}connsock dance to deref
         * ksnc_sock...
         */
        LASSERT(!conn->ksnc_closing);

        CDEBUG(D_NET, "Sending to %s ip %pI4h:%d\n",
               libcfs_id2str(conn->ksnc_peer->ksnp_id),
               &conn->ksnc_ipaddr, conn->ksnc_port);

        ksocknal_tx_prep(conn, tx);

        /*
         * Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete ksocknal message header.
         */
        LASSERT(lnet_iov_nob(tx->tx_niov, tx->tx_iov) +
                lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) ==
                (unsigned int)tx->tx_nob);
        LASSERT(tx->tx_niov >= 1);
        LASSERT(tx->tx_resid == tx->tx_nob);

        CDEBUG(D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
               tx, (tx->tx_lnetmsg) ? tx->tx_lnetmsg->msg_hdr.type :
                                              KSOCK_MSG_NOOP,
               tx->tx_nob, tx->tx_niov, tx->tx_nkiov);

        /*
         * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__
         * but they're used inside spinlocks a lot.
         */
        bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
        spin_lock_bh(&sched->kss_lock);

        if (list_empty(&conn->ksnc_tx_queue) && !bufnob) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */
                        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
                conn->ksnc_tx_bufnob = 0;
                mb(); /* order with adding to tx_queue */
        }

        if (msg->ksm_type == KSOCK_MSG_NOOP) {
                /*
                 * The packet is a noop ZC ACK; try to piggyback the
                 * ack_cookie on a normal packet so I don't need to send it
                 */
                LASSERT(msg->ksm_zc_cookies[1]);
                LASSERT(conn->ksnc_proto->pro_queue_tx_zcack);

                /* ZC ACK piggybacked on ztx; release tx later */
                if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0))
                        ztx = tx;
        } else {
                /*
                 * It's a normal packet - can it piggyback a noop zc-ack that
                 * has been queued already?
                 */
                LASSERT(!msg->ksm_zc_cookies[1]);
                LASSERT(conn->ksnc_proto->pro_queue_tx_msg);

                ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx);
                /* ztx will be released later */
        }

        if (ztx) {
                atomic_sub(ztx->tx_nob, &conn->ksnc_tx_nob);
                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
        }

        if (conn->ksnc_tx_ready &&      /* able to send */
            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
                /* +1 ref for scheduler */
                ksocknal_conn_addref(conn);
                list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                wake_up(&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);
}

struct ksock_route *
ksocknal_find_connectable_route_locked(struct ksock_peer *peer)
{
        unsigned long now = cfs_time_current();
        struct list_head *tmp;
        struct ksock_route *route;

        list_for_each(tmp, &peer->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                LASSERT(!route->ksnr_connecting || route->ksnr_scheduled);

                /* connections being established */
                if (route->ksnr_scheduled)
                        continue;

                /* all route types connected ? */
                if (!(ksocknal_route_mask() & ~route->ksnr_connected))
                        continue;

                /*
                 * Skip if it's too soon to retry this route, unless this
                 * is the first attempt (ksnr_retry_interval == 0)
                 */
                if (route->ksnr_retry_interval &&
                    !cfs_time_aftereq(now, route->ksnr_timeout)) {
                        CDEBUG(D_NET,
                               "Too soon to retry route %pI4h (connected %d, interval %ld, %ld secs later)\n",
                               &route->ksnr_ipaddr,
                               route->ksnr_connected,
                               route->ksnr_retry_interval,
                               cfs_duration_sec(route->ksnr_timeout - now));
                        continue;
                }

                return route;
        }

        return NULL;
}
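
/*
 * Retry-gate example (times invented): after a failed connection attempt
 * a route carries a ksnr_timeout in the future, so with now == 100
 * jiffies and ksnr_timeout == 130 jiffies the route is skipped until
 * time 130.  A never-tried route (ksnr_retry_interval == 0) is always
 * eligible immediately.
 */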

struct ksock_route *
ksocknal_find_connecting_route_locked(struct ksock_peer *peer)
{
        struct list_head *tmp;
        struct ksock_route *route;

        list_for_each(tmp, &peer->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                LASSERT(!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)
                        return route;
        }

        return NULL;
}

int
ksocknal_launch_packet(struct lnet_ni *ni, struct ksock_tx *tx,
                       struct lnet_process_id id)
{
        struct ksock_peer *peer;
        struct ksock_conn *conn;
        rwlock_t *g_lock;
        int retry;
        int rc;

        LASSERT(!tx->tx_conn);

        g_lock = &ksocknal_data.ksnd_global_lock;

        for (retry = 0;; retry = 1) {
                read_lock(g_lock);
                peer = ksocknal_find_peer_locked(ni, id);
                if (peer) {
                        if (!ksocknal_find_connectable_route_locked(peer)) {
                                conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
                                if (conn) {
                                        /*
                                         * I've got no routes that need to be
                                         * connecting and I do have an actual
                                         * connection...
                                         */
                                        ksocknal_queue_tx_locked(tx, conn);
                                        read_unlock(g_lock);
                                        return 0;
                                }
                        }
                }

                /* I'll need a write lock... */
                read_unlock(g_lock);

                write_lock_bh(g_lock);

                peer = ksocknal_find_peer_locked(ni, id);
                if (peer)
                        break;

                write_unlock_bh(g_lock);

                if (id.pid & LNET_PID_USERFLAG) {
                        CERROR("Refusing to create a connection to userspace process %s\n",
                               libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                rc = ksocknal_add_peer(ni, id,
                                       LNET_NIDADDR(id.nid),
                                       lnet_acceptor_port());
                if (rc) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_id2str(id), rc);
                        return rc;
                }
        }

        ksocknal_launch_all_connections_locked(peer);

        conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
        if (conn) {
                /* Connection exists; queue message on it */
                ksocknal_queue_tx_locked(tx, conn);
                write_unlock_bh(g_lock);
                return 0;
        }

        if (peer->ksnp_accepting > 0 ||
            ksocknal_find_connecting_route_locked(peer)) {
                /* the message is going to be pinned to the peer */
                tx->tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);

                /* Queue the message until a connection is established */
                list_add_tail(&tx->tx_list, &peer->ksnp_tx_queue);
                write_unlock_bh(g_lock);
                return 0;
        }

        write_unlock_bh(g_lock);

        /* NB Routes may be ignored if connections to them failed recently */
        CNETERR("No usable routes to %s\n", libcfs_id2str(id));
        return -EHOSTUNREACH;
}
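
/*
 * Note on the locking pattern above: the fast path runs under the read
 * lock; because an rwlock can't be upgraded in place, the function drops
 * it, takes the write lock, and MUST then re-run
 * ksocknal_find_peer_locked(), since the peer table may have changed in
 * the window between the two locks.  The retry flag bounds this to one
 * ksocknal_add_peer() attempt before giving up with -EHOSTUNREACH.
 */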

int
ksocknal_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
{
        int mpflag = 1;
        int type = lntmsg->msg_type;
        struct lnet_process_id target = lntmsg->msg_target;
        unsigned int payload_niov = lntmsg->msg_niov;
        struct kvec *payload_iov = lntmsg->msg_iov;
        struct bio_vec *payload_kiov = lntmsg->msg_kiov;
        unsigned int payload_offset = lntmsg->msg_offset;
        unsigned int payload_nob = lntmsg->msg_len;
        struct ksock_tx *tx;
        int desc_size;
        int rc;

        /*
         * NB 'private' is different depending on what we're sending.
         * Just ignore it...
         */
        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT(!payload_nob || payload_niov > 0);
        LASSERT(payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT(!(payload_kiov && payload_iov));
        LASSERT(!in_interrupt());

        if (payload_iov)
                desc_size = offsetof(struct ksock_tx,
                                     tx_frags.virt.iov[1 + payload_niov]);
        else
                desc_size = offsetof(struct ksock_tx,
                                     tx_frags.paged.kiov[payload_niov]);

        if (lntmsg->msg_vmflush)
                mpflag = cfs_memory_pressure_get_and_set();
        tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
        if (!tx) {
                CERROR("Can't allocate tx desc type %d size %d\n",
                       type, desc_size);
                if (lntmsg->msg_vmflush)
                        cfs_memory_pressure_restore(mpflag);
                return -ENOMEM;
        }

        tx->tx_conn = NULL;                  /* set when assigned a conn */
        tx->tx_lnetmsg = lntmsg;

        if (payload_iov) {
                tx->tx_kiov = NULL;
                tx->tx_nkiov = 0;
                tx->tx_iov = tx->tx_frags.virt.iov;
                tx->tx_niov = 1 +
                              lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                               payload_niov, payload_iov,
                                               payload_offset, payload_nob);
        } else {
                tx->tx_niov = 1;
                tx->tx_iov = &tx->tx_frags.paged.iov;
                tx->tx_kiov = tx->tx_frags.paged.kiov;
                tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                 payload_niov, payload_kiov,
                                                 payload_offset, payload_nob);

                if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload)
                        tx->tx_zc_capable = 1;
        }

        tx->tx_msg.ksm_csum = 0;
        tx->tx_msg.ksm_type = KSOCK_MSG_LNET;
        tx->tx_msg.ksm_zc_cookies[0] = 0;
        tx->tx_msg.ksm_zc_cookies[1] = 0;

        /* The first fragment will be set later in pro_pack */
        rc = ksocknal_launch_packet(ni, tx, target);
        if (!mpflag)
                cfs_memory_pressure_restore(mpflag);

        if (!rc)
                return 0;

        ksocknal_free_tx(tx);
        return -EIO;
}
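
/*
 * Sizing sketch for the offsetof() computations above (frag count
 * invented): the tx descriptor is allocated with its fragment array
 * inline, so for an iovec payload of 3 frags the allocation covers
 * tx_frags.virt.iov[0] (reserved for the message header packed later in
 * pro_pack) plus iov[1..3], i.e. offsetof(struct ksock_tx,
 * tx_frags.virt.iov[4]) bytes.  A paged payload needs no extra header
 * slot because tx_frags.paged carries a separate single iov for it.
 */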

int
ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name)
{
        struct task_struct *task = kthread_run(fn, arg, "%s", name);

        if (IS_ERR(task))
                return PTR_ERR(task);

        write_lock_bh(&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads++;
        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
        return 0;
}

void
ksocknal_thread_fini(void)
{
        write_lock_bh(&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads--;
        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
}

int
ksocknal_new_packet(struct ksock_conn *conn, int nob_to_skip)
{
        static char ksocknal_slop_buffer[4096];

        int nob;
        unsigned int niov;
        int skipped;

        LASSERT(conn->ksnc_proto);

        if (*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) {
                /* Remind the socket to ack eagerly... */
                ksocknal_lib_eager_ack(conn);
        }

        if (!nob_to_skip) {      /* right at next packet boundary now */
                conn->ksnc_rx_started = 0;
                mb();                  /* racing with timeout thread */

                switch (conn->ksnc_proto->pro_version) {
                case  KSOCK_PROTO_V2:
                case  KSOCK_PROTO_V3:
                        conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg;

                        conn->ksnc_rx_nob_wanted = offsetof(struct ksock_msg, ksm_u);
                        conn->ksnc_rx_nob_left = offsetof(struct ksock_msg, ksm_u);
                        conn->ksnc_rx_iov[0].iov_len = offsetof(struct ksock_msg, ksm_u);
                        break;

                case KSOCK_PROTO_V1:
                        /* Receiving bare struct lnet_hdr */
                        conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                        conn->ksnc_rx_nob_wanted = sizeof(struct lnet_hdr);
                        conn->ksnc_rx_nob_left = sizeof(struct lnet_hdr);

                        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg.ksm_u.lnetmsg;
                        conn->ksnc_rx_iov[0].iov_len = sizeof(struct lnet_hdr);
                        break;

                default:
                        LBUG();
                }
                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_csum = ~0;
                return 1;
        }

        /*
         * Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again
         */
        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = min_t(int, nob_to_skip, sizeof(ksocknal_slop_buffer));

                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip &&    /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof(struct iovec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return 0;
}
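
/*
 * Slop-skip example (sizes invented): to discard 10000 bytes of unwanted
 * payload, the loop points successive rx iov entries at the shared
 * 4096-byte slop buffer with lengths 4096 + 4096 + 1808; each chunk
 * overwrites the same buffer since the data is thrown away.  If
 * nob_to_skip exceeded what the iov space can describe, ksnc_rx_nob_left
 * keeps the remainder and this function is simply called again once the
 * first batch has been drained.
 */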

static int
ksocknal_process_receive(struct ksock_conn *conn)
{
        struct lnet_hdr *lhdr;
        struct lnet_process_id *id;
        int rc;

        LASSERT(atomic_read(&conn->ksnc_conn_refcount) > 0);

        /* NB: sched lock NOT held */
        /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */
        LASSERT(conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
                conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
                conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
                conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
 again:
        if (conn->ksnc_rx_nob_wanted) {
                rc = ksocknal_receive(conn);

                if (rc <= 0) {
                        LASSERT(rc != -EAGAIN);

                        if (!rc)
                                CDEBUG(D_NET, "[%p] EOF from %s ip %pI4h:%d\n",
                                       conn,
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       &conn->ksnc_ipaddr,
                                       conn->ksnc_port);
                        else if (!conn->ksnc_closing)
                                CERROR("[%p] Error %d on read from %s ip %pI4h:%d\n",
                                       conn, rc,
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       &conn->ksnc_ipaddr,
                                       conn->ksnc_port);

                        /* it's not an error if conn is being closed */
                        ksocknal_close_conn_and_siblings(conn,
                                                         (conn->ksnc_closing) ? 0 : rc);
                        return (!rc ? -ESHUTDOWN : rc);
                }

                if (conn->ksnc_rx_nob_wanted) {
                        /* short read */
                        return -EAGAIN;
                }
        }
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_flip) {
                        __swab32s(&conn->ksnc_msg.ksm_type);
                        __swab32s(&conn->ksnc_msg.ksm_csum);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]);
                }

                if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) {
                        CERROR("%s: Unknown message type: %x\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_type);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return -EPROTO;
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_csum &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        /* NOOP Checksum error */
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return -EIO;
                }

                if (conn->ksnc_msg.ksm_zc_cookies[1]) {
                        __u64 cookie = 0;

                        LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);

                        if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP)
                                cookie = conn->ksnc_msg.ksm_zc_cookies[0];

                        rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie,
                                               conn->ksnc_msg.ksm_zc_cookies[1]);

                        if (rc) {
                                CERROR("%s: Unknown ZC-ACK cookie: %llu, %llu\n",
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       cookie, conn->ksnc_msg.ksm_zc_cookies[1]);
                                ksocknal_new_packet(conn, 0);
                                ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                return rc;
                        }
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
                        ksocknal_new_packet(conn, 0);
                        return 0;       /* NOOP is done; just return */
                }

                conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof(struct ksock_lnet_msg);
                conn->ksnc_rx_nob_left = sizeof(struct ksock_lnet_msg);

                conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg.ksm_u.lnetmsg;
                conn->ksnc_rx_iov[0].iov_len = sizeof(struct ksock_lnet_msg);

                conn->ksnc_rx_niov = 1;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;

                goto again;     /* read lnet header now */

        case SOCKNAL_RX_LNET_HEADER:
                /* unpack message header */
                conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);

                if (conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id = &conn->ksnc_peer->ksnp_id;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
                ksocknal_conn_addref(conn);     /* ++ref while parsing */

                rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
                                &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, rc);
                        ksocknal_conn_decref(conn);
                        return -EPROTO;
                }

                /* I'm racing with ksocknal_recv() */
                LASSERT(conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
                        conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);

                if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
                        return 0;

                /* ksocknal_recv() got called */
                goto again;

        case SOCKNAL_RX_LNET_PAYLOAD:
                /* payload all received */
                rc = 0;

                if (!conn->ksnc_rx_nob_left &&   /* not truncating */
                    conn->ksnc_msg.ksm_csum &&  /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        rc = -EIO;
                }

                if (!rc && conn->ksnc_msg.ksm_zc_cookies[0]) {
                        LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);

                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id = &conn->ksnc_peer->ksnp_id;

                        rc = conn->ksnc_proto->pro_handle_zcreq(conn,
                                        conn->ksnc_msg.ksm_zc_cookies[0],
                                        *ksocknal_tunables.ksnd_nonblk_zcack ||
                                        le64_to_cpu(lhdr->src_nid) != id->nid);
                }

                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);

                if (rc) {
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, rc);
                        return -EPROTO;
                }
                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet(conn, conn->ksnc_rx_nob_left))
                        return 0;       /* come back later */
                goto again;          /* try to finish reading slop now */

        default:
                break;
        }

        /* Not Reached */
        LBUG();
        return -EINVAL;                /* keep gcc happy */
}

int
ksocknal_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
              int delayed, struct iov_iter *to, unsigned int rlen)
{
        struct ksock_conn *conn = private;
        struct ksock_sched *sched = conn->ksnc_scheduler;

        LASSERT(iov_iter_count(to) <= rlen);
        LASSERT(to->nr_segs <= LNET_MAX_IOV);

        conn->ksnc_cookie = msg;
        conn->ksnc_rx_nob_wanted = iov_iter_count(to);
        conn->ksnc_rx_nob_left = rlen;

        if (to->type & ITER_KVEC) {
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
                conn->ksnc_rx_niov =
                        lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov,
                                         to->nr_segs, to->kvec,
                                         to->iov_offset, iov_iter_count(to));
        } else {
                conn->ksnc_rx_niov = 0;
                conn->ksnc_rx_iov = NULL;
                conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
                conn->ksnc_rx_nkiov =
                        lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,
                                          to->nr_segs, to->bvec,
                                          to->iov_offset, iov_iter_count(to));
        }

        LASSERT(conn->ksnc_rx_scheduled);

        spin_lock_bh(&sched->kss_lock);

        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_PARSE_WAIT:
                list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
                wake_up(&sched->kss_waitq);
                LASSERT(conn->ksnc_rx_ready);
                break;

        case SOCKNAL_RX_PARSE:
                /* scheduler hasn't noticed I'm parsing yet */
                break;
        }

        conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;

        spin_unlock_bh(&sched->kss_lock);
        ksocknal_conn_decref(conn);
        return 0;
}

static inline int
ksocknal_sched_cansleep(struct ksock_sched *sched)
{
        int rc;

        spin_lock_bh(&sched->kss_lock);

        rc = !ksocknal_data.ksnd_shuttingdown &&
              list_empty(&sched->kss_rx_conns) &&
              list_empty(&sched->kss_tx_conns);

        spin_unlock_bh(&sched->kss_lock);
        return rc;
}
1405
1406 int ksocknal_scheduler(void *arg)
1407 {
1408         struct ksock_sched_info *info;
1409         struct ksock_sched *sched;
1410         struct ksock_conn *conn;
1411         struct ksock_tx *tx;
1412         int rc;
1413         int nloops = 0;
1414         long id = (long)arg;
1415
1416         info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)];
1417         sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)];
1418
1419         cfs_block_allsigs();
1420
1421         rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt);
1422         if (rc) {
1423                 CWARN("Can't set CPU partition affinity to %d: %d\n",
1424                       info->ksi_cpt, rc);
1425         }
1426
1427         spin_lock_bh(&sched->kss_lock);
1428
1429         while (!ksocknal_data.ksnd_shuttingdown) {
1430                 int did_something = 0;
1431
1432                 /* Ensure I progress everything semi-fairly */
1433
1434                 if (!list_empty(&sched->kss_rx_conns)) {
1435                         conn = list_entry(sched->kss_rx_conns.next,
1436                                           struct ksock_conn, ksnc_rx_list);
1437                         list_del(&conn->ksnc_rx_list);
1438
1439                         LASSERT(conn->ksnc_rx_scheduled);
1440                         LASSERT(conn->ksnc_rx_ready);
1441
1442                         /*
1443                          * clear rx_ready in case receive isn't complete.
1444                          * Do it BEFORE we call process_recv, since
1445                          * data_ready can set it any time after we release
1446                          * kss_lock.
1447                          */
1448                         conn->ksnc_rx_ready = 0;
1449                         spin_unlock_bh(&sched->kss_lock);
1450
1451                         rc = ksocknal_process_receive(conn);
1452
1453                         spin_lock_bh(&sched->kss_lock);
1454
1455                         /* I'm the only one that can clear this flag */
1456                         LASSERT(conn->ksnc_rx_scheduled);
1457
1458                         /* Did process_receive get everything it wanted? */
1459                         if (!rc)
1460                                 conn->ksnc_rx_ready = 1;
1461
1462                         if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
1463                                 /*
1464                                  * Conn blocked waiting for ksocknal_recv()
1465                                  * I change its state (under lock) to signal
1466                                  * it can be rescheduled
1467                                  */
1468                                 conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
1469                         } else if (conn->ksnc_rx_ready) {
1470                                 /* reschedule for rx */
1471                                 list_add_tail(&conn->ksnc_rx_list,
1472                                               &sched->kss_rx_conns);
1473                         } else {
1474                                 conn->ksnc_rx_scheduled = 0;
1475                                 /* drop my ref */
1476                                 ksocknal_conn_decref(conn);
1477                         }
1478
1479                         did_something = 1;
1480                 }
1481
1482                 if (!list_empty(&sched->kss_tx_conns)) {
1483                         LIST_HEAD(zlist);
1484
1485                         if (!list_empty(&sched->kss_zombie_noop_txs)) {
1486                                 list_add(&zlist, &sched->kss_zombie_noop_txs);
1487                                 list_del_init(&sched->kss_zombie_noop_txs);
1488                         }
1489
1490                         conn = list_entry(sched->kss_tx_conns.next,
1491                                           struct ksock_conn, ksnc_tx_list);
1492                         list_del(&conn->ksnc_tx_list);
1493
1494                         LASSERT(conn->ksnc_tx_scheduled);
1495                         LASSERT(conn->ksnc_tx_ready);
1496                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1497
1498                         tx = list_entry(conn->ksnc_tx_queue.next,
1499                                         struct ksock_tx, tx_list);
1500
1501                         if (conn->ksnc_tx_carrier == tx)
1502                                 ksocknal_next_tx_carrier(conn);
1503
1504                         /* dequeue now so a non-empty list => more to send */
1505                         list_del(&tx->tx_list);
1506
1507                         /*
1508                          * Clear tx_ready in case send isn't complete.  Do
1509                          * it BEFORE we call process_transmit, since
1510                          * write_space can set it any time after we release
1511                          * kss_lock.
1512                          */
1513                         conn->ksnc_tx_ready = 0;
1514                         spin_unlock_bh(&sched->kss_lock);
1515
1516                         if (!list_empty(&zlist)) {
1517                                 /*
1518                                  * free zombie noop txs; it's fast because
1519                                  * noop txs are just put back on the freelist
1520                                  */
1521                                 ksocknal_txlist_done(NULL, &zlist, 0);
1522                         }
1523
1524                         rc = ksocknal_process_transmit(conn, tx);
1525
1526                         if (rc == -ENOMEM || rc == -EAGAIN) {
1527                                 /*
1528                                  * Incomplete send: replace tx on HEAD of
1529                                  * tx_queue
1530                                  */
1531                                 spin_lock_bh(&sched->kss_lock);
1532                                 list_add(&tx->tx_list, &conn->ksnc_tx_queue);
1533                         } else {
1534                                 /* Complete send; tx -ref */
1535                                 ksocknal_tx_decref(tx);
1536
1537                                 spin_lock_bh(&sched->kss_lock);
1538                                 /* assume space for more */
1539                                 conn->ksnc_tx_ready = 1;
1540                         }
1541
1542                         if (rc == -ENOMEM) {
1543                                 /*
1544                                  * Do nothing; after a short timeout, this
1545                                  * conn will be reposted on kss_tx_conns.
1546                                  */
1547                         } else if (conn->ksnc_tx_ready &&
1548                                    !list_empty(&conn->ksnc_tx_queue)) {
1549                                 /* reschedule for tx */
1550                                 list_add_tail(&conn->ksnc_tx_list,
1551                                               &sched->kss_tx_conns);
1552                         } else {
1553                                 conn->ksnc_tx_scheduled = 0;
1554                                 /* drop my ref */
1555                                 ksocknal_conn_decref(conn);
1556                         }
1557
1558                         did_something = 1;
1559                 }
1560                 if (!did_something ||      /* nothing to do */
1561                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1562                         spin_unlock_bh(&sched->kss_lock);
1563
1564                         nloops = 0;
1565
1566                         if (!did_something) {   /* wait for something to do */
1567                                 rc = wait_event_interruptible_exclusive(
1568                                         sched->kss_waitq,
1569                                         !ksocknal_sched_cansleep(sched));
1570                                 LASSERT(!rc);
1571                         } else {
1572                                 cond_resched();
1573                         }
1574
1575                         spin_lock_bh(&sched->kss_lock);
1576                 }
1577         }
1578
1579         spin_unlock_bh(&sched->kss_lock);
1580         ksocknal_thread_fini();
1581         return 0;
1582 }
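
/*
 * Illustrative sketch, not part of this driver: a minimal userspace
 * analogue of the scheduler loop above, assuming POSIX threads.  All
 * "toy_*" names are hypothetical.  It shows the same pattern: progress
 * at most one rx and one tx item per pass for semi-fairness, drop the
 * lock while doing real work, sleep only when there is nothing to do,
 * and yield after a fixed number of busy passes so one scheduler
 * cannot hog a CPU.
 */
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>

#define TOY_RESCHED 100 /* like SOCKNAL_RESCHED */

struct toy_sched {
        pthread_mutex_t lock;
        pthread_cond_t  waitq;
        int             rx_pending;     /* stand-in for kss_rx_conns */
        int             tx_pending;     /* stand-in for kss_tx_conns */
        bool            shutting_down;
};

static void *toy_scheduler(void *arg)
{
        struct toy_sched *s = arg;
        int nloops = 0;

        pthread_mutex_lock(&s->lock);
        while (!s->shutting_down) {
                int did_something = 0;

                if (s->rx_pending > 0) {
                        s->rx_pending--;
                        pthread_mutex_unlock(&s->lock);
                        /* ... process one receive here, unlocked ... */
                        pthread_mutex_lock(&s->lock);
                        did_something = 1;
                }
                if (s->tx_pending > 0) {
                        s->tx_pending--;
                        pthread_mutex_unlock(&s->lock);
                        /* ... process one transmit here, unlocked ... */
                        pthread_mutex_lock(&s->lock);
                        did_something = 1;
                }
                if (!did_something) {
                        /* nothing to do: sleep until woken */
                        nloops = 0;
                        while (!s->shutting_down &&
                               s->rx_pending == 0 && s->tx_pending == 0)
                                pthread_cond_wait(&s->waitq, &s->lock);
                } else if (++nloops == TOY_RESCHED) {
                        /* hogging the CPU?  yield and start over */
                        nloops = 0;
                        pthread_mutex_unlock(&s->lock);
                        sched_yield();
                        pthread_mutex_lock(&s->lock);
                }
        }
        pthread_mutex_unlock(&s->lock);
        return NULL;
}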
1583
1584 /*
1585  * Add connection to kss_rx_conns of scheduler
1586  * and wake up the scheduler.
1587  */
1588 void ksocknal_read_callback(struct ksock_conn *conn)
1589 {
1590         struct ksock_sched *sched;
1591
1592         sched = conn->ksnc_scheduler;
1593
1594         spin_lock_bh(&sched->kss_lock);
1595
1596         conn->ksnc_rx_ready = 1;
1597
1598         if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1599                 list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
1600                 conn->ksnc_rx_scheduled = 1;
1601                 /* extra ref for scheduler */
1602                 ksocknal_conn_addref(conn);
1603
1604                 wake_up(&sched->kss_waitq);
1605         }
1606         spin_unlock_bh(&sched->kss_lock);
1607 }
1608
1609 /*
1610  * Add connection to kss_tx_conns of scheduler
1611  * and wake up the scheduler.
1612  */
1613 void ksocknal_write_callback(struct ksock_conn *conn)
1614 {
1615         struct ksock_sched *sched;
1616
1617         sched = conn->ksnc_scheduler;
1618
1619         spin_lock_bh(&sched->kss_lock);
1620
1621         conn->ksnc_tx_ready = 1;
1622
1623         if (!conn->ksnc_tx_scheduled && /* not being progressed */
1624             !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
1625                 list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns);
1626                 conn->ksnc_tx_scheduled = 1;
1627                 /* extra ref for scheduler */
1628                 ksocknal_conn_addref(conn);
1629
1630                 wake_up(&sched->kss_waitq);
1631         }
1632
1633         spin_unlock_bh(&sched->kss_lock);
1634 }
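
/*
 * Illustrative sketch, not part of this driver: the callback discipline
 * used by ksocknal_read_callback() and ksocknal_write_callback() above,
 * in hypothetical userspace form.  The event always sets the "ready"
 * flag, but the connection is queued (with an extra reference) and the
 * scheduler woken only when it is not already scheduled, so exactly one
 * scheduler owns the connection until it is requeued or dropped.
 */
#include <pthread.h>
#include <stdbool.h>

struct toy_conn {
        bool ready;             /* like ksnc_rx_ready / ksnc_tx_ready */
        bool scheduled;         /* like ksnc_rx_scheduled / ksnc_tx_scheduled */
        int  refcount;
};

struct toy_queue {
        pthread_mutex_t lock;
        pthread_cond_t  waitq;
        struct toy_conn *pending;       /* one-slot stand-in for kss_rx_conns */
};

static void toy_data_ready(struct toy_queue *q, struct toy_conn *c)
{
        pthread_mutex_lock(&q->lock);
        c->ready = true;                /* always record the event */
        if (!c->scheduled) {            /* not being progressed: queue it */
                c->scheduled = true;
                c->refcount++;          /* extra ref held for the scheduler */
                q->pending = c;
                pthread_cond_signal(&q->waitq);
        }                               /* else the scheduler will see 'ready' */
        pthread_mutex_unlock(&q->lock);
}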
1635
1636 static struct ksock_proto *
1637 ksocknal_parse_proto_version(struct ksock_hello_msg *hello)
1638 {
1639         __u32 version = 0;
1640
1641         if (hello->kshm_magic == LNET_PROTO_MAGIC)
1642                 version = hello->kshm_version;
1643         else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
1644                 version = __swab32(hello->kshm_version);
1645
1646         if (version) {
1647 #if SOCKNAL_VERSION_DEBUG
1648                 if (*ksocknal_tunables.ksnd_protocol == 1)
1649                         return NULL;
1650
1651                 if (*ksocknal_tunables.ksnd_protocol == 2 &&
1652                     version == KSOCK_PROTO_V3)
1653                         return NULL;
1654 #endif
1655                 if (version == KSOCK_PROTO_V2)
1656                         return &ksocknal_protocol_v2x;
1657
1658                 if (version == KSOCK_PROTO_V3)
1659                         return &ksocknal_protocol_v3x;
1660
1661                 return NULL;
1662         }
1663
1664         if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
1665                 struct lnet_magicversion *hmv = (struct lnet_magicversion *)hello;
1666
1667                 BUILD_BUG_ON(sizeof(struct lnet_magicversion) !=
1668                              offsetof(struct ksock_hello_msg, kshm_src_nid));
1669
1670                 if (hmv->version_major == cpu_to_le16(KSOCK_PROTO_V1_MAJOR) &&
1671                     hmv->version_minor == cpu_to_le16(KSOCK_PROTO_V1_MINOR))
1672                         return &ksocknal_protocol_v1x;
1673         }
1674
1675         return NULL;
1676 }
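
/*
 * Illustrative sketch, not part of this driver: how a single magic word
 * lets ksocknal_parse_proto_version() detect both the protocol version
 * and the peer's endianness - the magic either matches directly or
 * matches after a 32-bit byte swap, in which case every other field
 * must be swapped too.  TOY_MAGIC and all names here are hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

#define TOY_MAGIC 0x45726963u

static uint32_t toy_swab32(uint32_t v)
{
        return ((v & 0x000000ffu) << 24) | ((v & 0x0000ff00u) << 8) |
               ((v & 0x00ff0000u) >> 8)  | ((v & 0xff000000u) >> 24);
}

/* Return the version in host byte order, or 0 if the magic is unknown. */
static uint32_t toy_parse_version(uint32_t magic, uint32_t version)
{
        if (magic == TOY_MAGIC)                 /* peer has my endianness */
                return version;
        if (magic == toy_swab32(TOY_MAGIC))     /* peer is byte-swapped */
                return toy_swab32(version);
        return 0;                               /* not this protocol at all */
}

int main(void)
{
        printf("%u\n", toy_parse_version(TOY_MAGIC, 3));                 /* 3 */
        printf("%u\n", toy_parse_version(toy_swab32(TOY_MAGIC),
                                         toy_swab32(3)));                /* 3 */
        return 0;
}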
1677
1678 int
1679 ksocknal_send_hello(struct lnet_ni *ni, struct ksock_conn *conn,
1680                     lnet_nid_t peer_nid, struct ksock_hello_msg *hello)
1681 {
1682         /* CAVEAT EMPTOR: this byte-flips 'ipaddrs' */
1683         struct ksock_net *net = (struct ksock_net *)ni->ni_data;
1684
1685         LASSERT(hello->kshm_nips <= LNET_MAX_INTERFACES);
1686
1687         /* rely on caller to hold a ref on the socket so it won't disappear */
1688         LASSERT(conn->ksnc_proto);
1689
1690         hello->kshm_src_nid = ni->ni_nid;
1691         hello->kshm_dst_nid = peer_nid;
1692         hello->kshm_src_pid = the_lnet.ln_pid;
1693
1694         hello->kshm_src_incarnation = net->ksnn_incarnation;
1695         hello->kshm_ctype = conn->ksnc_type;
1696
1697         return conn->ksnc_proto->pro_send_hello(conn, hello);
1698 }
1699
1700 static int
1701 ksocknal_invert_type(int type)
1702 {
1703         switch (type) {
1704         case SOCKLND_CONN_ANY:
1705         case SOCKLND_CONN_CONTROL:
1706                 return type;
1707         case SOCKLND_CONN_BULK_IN:
1708                 return SOCKLND_CONN_BULK_OUT;
1709         case SOCKLND_CONN_BULK_OUT:
1710                 return SOCKLND_CONN_BULK_IN;
1711         default:
1712                 return SOCKLND_CONN_NONE;
1713         }
1714 }
1715
1716 int
1717 ksocknal_recv_hello(struct lnet_ni *ni, struct ksock_conn *conn,
1718                     struct ksock_hello_msg *hello,
1719                     struct lnet_process_id *peerid,
1720                     __u64 *incarnation)
1721 {
1722         /* Return < 0        fatal error
1723          *        0          success
1724          *        EALREADY   lost connection race
1725          *        EPROTO     protocol version mismatch
1726          */
1727         struct socket *sock = conn->ksnc_sock;
1728         int active = !!conn->ksnc_proto;
1729         int timeout;
1730         int proto_match;
1731         int rc;
1732         struct ksock_proto *proto;
1733         struct lnet_process_id recv_id;
1734
1735         /* socket type set on active connections - not set on passive */
1736         LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));
1737
1738         timeout = active ? *ksocknal_tunables.ksnd_timeout :
1739                             lnet_acceptor_timeout();
1740
1741         rc = lnet_sock_read(sock, &hello->kshm_magic,
1742                             sizeof(hello->kshm_magic), timeout);
1743         if (rc) {
1744                 CERROR("Error %d reading HELLO from %pI4h\n",
1745                        rc, &conn->ksnc_ipaddr);
1746                 LASSERT(rc < 0);
1747                 return rc;
1748         }
1749
1750         if (hello->kshm_magic != LNET_PROTO_MAGIC &&
1751             hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) &&
1752             hello->kshm_magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
1753                 /* Unexpected magic! */
1754                 CERROR("Bad magic(1) %#08x (%#08x expected) from %pI4h\n",
1755                        __cpu_to_le32(hello->kshm_magic),
1756                        LNET_PROTO_TCP_MAGIC,
1757                        &conn->ksnc_ipaddr);
1758                 return -EPROTO;
1759         }
1760
1761         rc = lnet_sock_read(sock, &hello->kshm_version,
1762                             sizeof(hello->kshm_version), timeout);
1763         if (rc) {
1764                 CERROR("Error %d reading HELLO from %pI4h\n",
1765                        rc, &conn->ksnc_ipaddr);
1766                 LASSERT(rc < 0);
1767                 return rc;
1768         }
1769
1770         proto = ksocknal_parse_proto_version(hello);
1771         if (!proto) {
1772                 if (!active) {
1773                         /* unknown protocol from peer, tell peer my protocol */
1774                         conn->ksnc_proto = &ksocknal_protocol_v3x;
1775 #if SOCKNAL_VERSION_DEBUG
1776                         if (*ksocknal_tunables.ksnd_protocol == 2)
1777                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
1778                         else if (*ksocknal_tunables.ksnd_protocol == 1)
1779                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1780 #endif
1781                         hello->kshm_nips = 0;
1782                         ksocknal_send_hello(ni, conn, ni->ni_nid, hello);
1783                 }
1784
1785                 CERROR("Unknown protocol version (%d.x expected) from %pI4h\n",
1786                        conn->ksnc_proto->pro_version,
1787                        &conn->ksnc_ipaddr);
1788
1789                 return -EPROTO;
1790         }
1791
1792         proto_match = (conn->ksnc_proto == proto);
1793         conn->ksnc_proto = proto;
1794
1795         /* receive the rest of hello message anyway */
1796         rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout);
1797         if (rc) {
1798                 CERROR("Error %d reading or checking hello from %pI4h\n",
1799                        rc, &conn->ksnc_ipaddr);
1800                 LASSERT(rc < 0);
1801                 return rc;
1802         }
1803
1804         *incarnation = hello->kshm_src_incarnation;
1805
1806         if (hello->kshm_src_nid == LNET_NID_ANY) {
1807                 CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY from %pI4h\n",
1808                        &conn->ksnc_ipaddr);
1809                 return -EPROTO;
1810         }
1811
1812         if (!active &&
1813             conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
1814                 /* Userspace NAL assigns peer process ID from socket */
1815                 recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG;
1816                 recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
1817                                          conn->ksnc_ipaddr);
1818         } else {
1819                 recv_id.nid = hello->kshm_src_nid;
1820                 recv_id.pid = hello->kshm_src_pid;
1821         }
1822
1823         if (!active) {
1824                 *peerid = recv_id;
1825
1826                 /* peer determines type */
1827                 conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype);
1828                 if (conn->ksnc_type == SOCKLND_CONN_NONE) {
1829                         CERROR("Unexpected type %d from %s ip %pI4h\n",
1830                                hello->kshm_ctype, libcfs_id2str(*peerid),
1831                                &conn->ksnc_ipaddr);
1832                         return -EPROTO;
1833                 }
1834
1835                 return 0;
1836         }
1837
1838         if (peerid->pid != recv_id.pid ||
1839             peerid->nid != recv_id.nid) {
1840                 LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host %pI4h, but they claimed they were %s; please check your Lustre configuration.\n",
1841                                    libcfs_id2str(*peerid),
1842                                    &conn->ksnc_ipaddr,
1843                                    libcfs_id2str(recv_id));
1844                 return -EPROTO;
1845         }
1846
1847         if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
1848                 /* Possible protocol mismatch or I lost the connection race */
1849                 return proto_match ? EALREADY : EPROTO;
1850         }
1851
1852         if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) {
1853                 CERROR("Mismatched types: me %d, %s ip %pI4h %d\n",
1854                        conn->ksnc_type, libcfs_id2str(*peerid),
1855                        &conn->ksnc_ipaddr, hello->kshm_ctype);
1856                 return -EPROTO;
1857         }
1858
1859         return 0;
1860 }
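
/*
 * Illustrative sketch, not part of this driver: ksocknal_recv_hello()
 * deliberately mixes sign conventions - negative errno values are fatal,
 * while positive EALREADY/EPROTO ask the caller to retry or renegotiate.
 * A hypothetical caller would dispatch on the sign roughly like this.
 */
#include <errno.h>

static int toy_handle_hello_rc(int rc)
{
        if (rc < 0)             /* fatal error: tear the connection down */
                return rc;
        if (rc == EALREADY)     /* lost a connection race: retry later */
                return 0;
        if (rc == EPROTO)       /* version mismatch: renegotiate protocol */
                return 0;
        return 0;               /* rc == 0: handshake completed */
}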
1861
1862 static int
1863 ksocknal_connect(struct ksock_route *route)
1864 {
1865         LIST_HEAD(zombies);
1866         struct ksock_peer *peer = route->ksnr_peer;
1867         int type;
1868         int wanted;
1869         struct socket *sock;
1870         unsigned long deadline;
1871         int retry_later = 0;
1872         int rc = 0;
1873
1874         deadline = cfs_time_add(cfs_time_current(),
1875                                 cfs_time_seconds(*ksocknal_tunables.ksnd_timeout));
1876
1877         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1878
1879         LASSERT(route->ksnr_scheduled);
1880         LASSERT(!route->ksnr_connecting);
1881
1882         route->ksnr_connecting = 1;
1883
1884         for (;;) {
1885                 wanted = ksocknal_route_mask() & ~route->ksnr_connected;
1886
1887                 /*
1888                  * stop connecting if peer/route got closed under me, or
1889                  * route got connected while queued
1890                  */
1891                 if (peer->ksnp_closing || route->ksnr_deleted ||
1892                     !wanted) {
1893                         retry_later = 0;
1894                         break;
1895                 }
1896
1897                 /* reschedule if peer is connecting to me */
1898                 if (peer->ksnp_accepting > 0) {
1899                         CDEBUG(D_NET,
1900                                "peer %s(%d) already connecting to me, retry later.\n",
1901                                libcfs_nid2str(peer->ksnp_id.nid),
1902                                peer->ksnp_accepting);
1903                         retry_later = 1;
1904                 }
1905
1906                 if (retry_later) /* needs reschedule */
1907                         break;
1908
1909                 if (wanted & BIT(SOCKLND_CONN_ANY)) {
1910                         type = SOCKLND_CONN_ANY;
1911                 } else if (wanted & BIT(SOCKLND_CONN_CONTROL)) {
1912                         type = SOCKLND_CONN_CONTROL;
1913                 } else if (wanted & BIT(SOCKLND_CONN_BULK_IN)) {
1914                         type = SOCKLND_CONN_BULK_IN;
1915                 } else {
1916                         LASSERT(wanted & BIT(SOCKLND_CONN_BULK_OUT));
1917                         type = SOCKLND_CONN_BULK_OUT;
1918                 }
1919
1920                 write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1921
1922                 if (cfs_time_aftereq(cfs_time_current(), deadline)) {
1923                         rc = -ETIMEDOUT;
1924                         lnet_connect_console_error(rc, peer->ksnp_id.nid,
1925                                                    route->ksnr_ipaddr,
1926                                                    route->ksnr_port);
1927                         goto failed;
1928                 }
1929
1930                 rc = lnet_connect(&sock, peer->ksnp_id.nid,
1931                                   route->ksnr_myipaddr,
1932                                   route->ksnr_ipaddr, route->ksnr_port);
1933                 if (rc)
1934                         goto failed;
1935
1936                 rc = ksocknal_create_conn(peer->ksnp_ni, route, sock, type);
1937                 if (rc < 0) {
1938                         lnet_connect_console_error(rc, peer->ksnp_id.nid,
1939                                                    route->ksnr_ipaddr,
1940                                                    route->ksnr_port);
1941                         goto failed;
1942                 }
1943
1944                 /*
1945                  * A positive rc means I have to retry because I lost the
1946                  * connection race or I have to renegotiate the protocol version
1947                  */
1948                 retry_later = (rc != 0);
1949                 if (retry_later)
1950                         CDEBUG(D_NET, "peer %s: conn race, retry later.\n",
1951                                libcfs_nid2str(peer->ksnp_id.nid));
1952
1953                 write_lock_bh(&ksocknal_data.ksnd_global_lock);
1954         }
1955
1956         route->ksnr_scheduled = 0;
1957         route->ksnr_connecting = 0;
1958
1959         if (retry_later) {
1960                 /*
1961                  * re-queue for attention; this frees me up to handle
1962                  * the peer's incoming connection request
1963                  */
1964                 if (rc == EALREADY ||
1965                     (!rc && peer->ksnp_accepting > 0)) {
1966                         /*
1967                          * We want to introduce a delay before the next
1968                          * connection attempt if we lost the conn race,
1969                          * but the race is usually resolved quickly,
1970                          * so min_reconnectms should be a good heuristic
1971                          */
1972                         route->ksnr_retry_interval =
1973                                 cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms) / 1000;
1974                         route->ksnr_timeout = cfs_time_add(cfs_time_current(),
1975                                                            route->ksnr_retry_interval);
1976                 }
1977
1978                 ksocknal_launch_connection_locked(route);
1979         }
1980
1981         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1982         return retry_later;
1983
1984  failed:
1985         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1986
1987         route->ksnr_scheduled = 0;
1988         route->ksnr_connecting = 0;
1989
1990         /* This is a retry rather than a new connection */
1991         route->ksnr_retry_interval *= 2;
1992         route->ksnr_retry_interval =
1993                 max(route->ksnr_retry_interval,
1994                     cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms) / 1000);
1995         route->ksnr_retry_interval =
1996                 min(route->ksnr_retry_interval,
1997                     cfs_time_seconds(*ksocknal_tunables.ksnd_max_reconnectms) / 1000);
1998
1999         LASSERT(route->ksnr_retry_interval);
2000         route->ksnr_timeout = cfs_time_add(cfs_time_current(),
2001                                            route->ksnr_retry_interval);
2002
2003         if (!list_empty(&peer->ksnp_tx_queue) &&
2004             !peer->ksnp_accepting &&
2005             !ksocknal_find_connecting_route_locked(peer)) {
2006                 struct ksock_conn *conn;
2007
2008                 /*
2009                  * ksnp_tx_queue is moved onto a conn on successful
2010                  * connection for V1.x and V2.x
2011                  */
2012                 if (!list_empty(&peer->ksnp_conns)) {
2013                         conn = list_entry(peer->ksnp_conns.next,
2014                                           struct ksock_conn, ksnc_list);
2015                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
2016                 }
2017
2018                 /*
2019                  * take all the blocked packets while I've got the lock and
2020                  * complete below...
2021                  */
2022                 list_splice_init(&peer->ksnp_tx_queue, &zombies);
2023         }
2024
2025         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2026
2027         ksocknal_peer_failed(peer);
2028         ksocknal_txlist_done(peer->ksnp_ni, &zombies, 1);
2029         return 0;
2030 }
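
/*
 * Illustrative sketch, not part of this driver: the reconnect backoff in
 * the failure path above is a plain doubling clamped to [min, max].  A
 * standalone version (hypothetical name, intervals in milliseconds):
 */
static long toy_next_backoff(long cur_ms, long min_ms, long max_ms)
{
        cur_ms *= 2;
        if (cur_ms < min_ms)
                cur_ms = min_ms;        /* first failures start at the floor */
        if (cur_ms > max_ms)
                cur_ms = max_ms;        /* never wait longer than the cap */
        return cur_ms;
}
/*
 * Starting from 0 with min 1000 and max 64000, successive failures give
 * 1000, 2000, 4000, ..., 64000 and then stay at 64000.
 */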
2031
2032 /*
2033  * Check whether we need to create more connds.
2034  * It will try to create a new thread if necessary; @timeout can be
2035  * updated if thread creation fails, so the caller won't keep retrying
2036  * while we are short of resources.
2037  */
2038 static int
2039 ksocknal_connd_check_start(time64_t sec, long *timeout)
2040 {
2041         char name[16];
2042         int rc;
2043         int total = ksocknal_data.ksnd_connd_starting +
2044                     ksocknal_data.ksnd_connd_running;
2045
2046         if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2047                 /* still initializing */
2048                 return 0;
2049         }
2050
2051         if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
2052             total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
2053                 /*
2054                  * can't create more connds, or we still have enough
2055                  * threads to handle the pending connection requests
2056                  */
2057                 return 0;
2058         }
2059
2060         if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
2061                 /* no pending connecting request */
2062                 return 0;
2063         }
2064
2065         if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
2066                 /* may have run out of resources, retry later */
2067                 *timeout = cfs_time_seconds(1);
2068                 return 0;
2069         }
2070
2071         if (ksocknal_data.ksnd_connd_starting > 0) {
2072                 /* serialize starting to avoid flood */
2073                 return 0;
2074         }
2075
2076         ksocknal_data.ksnd_connd_starting_stamp = sec;
2077         ksocknal_data.ksnd_connd_starting++;
2078         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2079
2080         /* NB: total is the next id */
2081         snprintf(name, sizeof(name), "socknal_cd%02d", total);
2082         rc = ksocknal_thread_start(ksocknal_connd, NULL, name);
2083
2084         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2085         if (!rc)
2086                 return 1;
2087
2088         /* we tried ... */
2089         LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2090         ksocknal_data.ksnd_connd_starting--;
2091         ksocknal_data.ksnd_connd_failed_stamp = ktime_get_real_seconds();
2092
2093         return 1;
2094 }
2095
2096 /*
2097  * Check whether the current thread can exit; return 1 if there are too
2098  * many threads and none has been created in the past 120 seconds.
2099  * This function may also update @timeout to make the caller come back
2100  * again to recheck these conditions.
2101  */
2102 static int
2103 ksocknal_connd_check_stop(time64_t sec, long *timeout)
2104 {
2105         int val;
2106
2107         if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2108                 /* still initializing */
2109                 return 0;
2110         }
2111
2112         if (ksocknal_data.ksnd_connd_starting > 0) {
2113                 /* a new thread is being started */
2114                 return 0;
2115         }
2116
2117         if (ksocknal_data.ksnd_connd_running <=
2118             *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
2119                 return 0;
2120         }
2121
2122         /* was a thread created in the past 120 seconds? */
2123         val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
2124                     SOCKNAL_CONND_TIMEOUT - sec);
2125
2126         *timeout = (val > 0) ? cfs_time_seconds(val) :
2127                                cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
2128         if (val > 0)
2129                 return 0;
2130
2131         /* no thread created in the past 120 seconds */
2132
2133         return ksocknal_data.ksnd_connd_running >
2134                ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
2135 }
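
/*
 * Worked example for the check above (values hypothetical except the
 * 120-second window): with *ksocknal_tunables.ksnd_nconnds = 4,
 * connd_running = 8, connd_connecting = 2 and SOCKNAL_CONND_RESV = 1,
 * running (8) exceeds both the configured floor (4) and
 * connecting + reserve (3), so once no thread has been started for
 * 120 seconds this thread may exit.
 */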
2136
2137 /*
2138  * Go through the connd_routes queue looking for a route we can process
2139  * right now; @timeout_p can be updated if we need to come back later
2140  */
2141 static struct ksock_route *
2142 ksocknal_connd_get_route_locked(signed long *timeout_p)
2143 {
2144         struct ksock_route *route;
2145         unsigned long now;
2146
2147         now = cfs_time_current();
2148
2149         /* connd_routes can contain both pending and ordinary routes */
2150         list_for_each_entry(route, &ksocknal_data.ksnd_connd_routes,
2151                             ksnr_connd_list) {
2152                 if (!route->ksnr_retry_interval ||
2153                     cfs_time_aftereq(now, route->ksnr_timeout))
2154                         return route;
2155
2156                 if (*timeout_p == MAX_SCHEDULE_TIMEOUT ||
2157                     (int)*timeout_p > (int)(route->ksnr_timeout - now))
2158                         *timeout_p = (int)(route->ksnr_timeout - now);
2159         }
2160
2161         return NULL;
2162 }
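
/*
 * Illustrative sketch, not part of this driver: the scan above returns
 * the first route that is due now and otherwise shrinks *timeout_p to
 * the soonest deadline, so the caller sleeps exactly until the next
 * route becomes connectable.  A minimal array-based version with
 * hypothetical names:
 */
static int toy_next_due(const long *deadlines, int n, long now, long *timeout)
{
        int i;

        for (i = 0; i < n; i++) {
                if (deadlines[i] <= now)
                        return i;       /* due: process it right away */
                if (*timeout > deadlines[i] - now)
                        *timeout = deadlines[i] - now;  /* soonest wakeup */
        }
        return -1;                      /* nothing due yet */
}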
2163
2164 int
2165 ksocknal_connd(void *arg)
2166 {
2167         spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock;
2168         struct ksock_connreq *cr;
2169         wait_queue_entry_t wait;
2170         int nloops = 0;
2171         int cons_retry = 0;
2172
2173         cfs_block_allsigs();
2174
2175         init_waitqueue_entry(&wait, current);
2176
2177         spin_lock_bh(connd_lock);
2178
2179         LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2180         ksocknal_data.ksnd_connd_starting--;
2181         ksocknal_data.ksnd_connd_running++;
2182
2183         while (!ksocknal_data.ksnd_shuttingdown) {
2184                 struct ksock_route *route = NULL;
2185                 time64_t sec = ktime_get_real_seconds();
2186                 long timeout = MAX_SCHEDULE_TIMEOUT;
2187                 int dropped_lock = 0;
2188
2189                 if (ksocknal_connd_check_stop(sec, &timeout)) {
2190                         /* wake up another one to check stop */
2191                         wake_up(&ksocknal_data.ksnd_connd_waitq);
2192                         break;
2193                 }
2194
2195                 if (ksocknal_connd_check_start(sec, &timeout)) {
2196                         /* created new thread */
2197                         dropped_lock = 1;
2198                 }
2199
2200                 if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
2201                         /* Connection accepted by the listener */
2202                         cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next,
2203                                         struct ksock_connreq, ksncr_list);
2204
2205                         list_del(&cr->ksncr_list);
2206                         spin_unlock_bh(connd_lock);
2207                         dropped_lock = 1;
2208
2209                         ksocknal_create_conn(cr->ksncr_ni, NULL,
2210                                              cr->ksncr_sock, SOCKLND_CONN_NONE);
2211                         lnet_ni_decref(cr->ksncr_ni);
2212                         LIBCFS_FREE(cr, sizeof(*cr));
2213
2214                         spin_lock_bh(connd_lock);
2215                 }
2216
2217                 /*
2218                  * Only handle an outgoing connection request if there
2219                  * is a thread left to handle incoming connections and
2220                  * to create new connds
2221                  */
2222                 if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
2223                     ksocknal_data.ksnd_connd_running) {
2224                         route = ksocknal_connd_get_route_locked(&timeout);
2225                 }
2226                 if (route) {
2227                         list_del(&route->ksnr_connd_list);
2228                         ksocknal_data.ksnd_connd_connecting++;
2229                         spin_unlock_bh(connd_lock);
2230                         dropped_lock = 1;
2231
2232                         if (ksocknal_connect(route)) {
2233                                 /* consecutive retry */
2234                                 if (cons_retry++ > SOCKNAL_INSANITY_RECONN) {
2235                                         CWARN("massive consecutive re-connecting to %pI4h\n",
2236                                               &route->ksnr_ipaddr);
2237                                         cons_retry = 0;
2238                                 }
2239                         } else {
2240                                 cons_retry = 0;
2241                         }
2242
2243                         ksocknal_route_decref(route);
2244
2245                         spin_lock_bh(connd_lock);
2246                         ksocknal_data.ksnd_connd_connecting--;
2247                 }
2248
2249                 if (dropped_lock) {
2250                         if (++nloops < SOCKNAL_RESCHED)
2251                                 continue;
2252                         spin_unlock_bh(connd_lock);
2253                         nloops = 0;
2254                         cond_resched();
2255                         spin_lock_bh(connd_lock);
2256                         continue;
2257                 }
2258
2259                 /* Nothing to do for 'timeout' */
2260                 set_current_state(TASK_INTERRUPTIBLE);
2261                 add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq,
2262                                          &wait);
2263                 spin_unlock_bh(connd_lock);
2264
2265                 nloops = 0;
2266                 schedule_timeout(timeout);
2267
2268                 remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait);
2269                 spin_lock_bh(connd_lock);
2270         }
2271         ksocknal_data.ksnd_connd_running--;
2272         spin_unlock_bh(connd_lock);
2273
2274         ksocknal_thread_fini();
2275         return 0;
2276 }
2277
2278 static struct ksock_conn *
2279 ksocknal_find_timed_out_conn(struct ksock_peer *peer)
2280 {
2281         /* We're called with a shared lock on ksnd_global_lock */
2282         struct ksock_conn *conn;
2283         struct list_head *ctmp;
2284
2285         list_for_each(ctmp, &peer->ksnp_conns) {
2286                 int error;
2287
2288                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
2289
2290                 /* Don't need the {get,put}connsock dance to deref ksnc_sock */
2291                 LASSERT(!conn->ksnc_closing);
2292
2293                 /*
2294                  * SOCK_ERROR will reset the socket's error code on
2295                  * some platforms (like Darwin 8.x)
2296                  */
2297                 error = conn->ksnc_sock->sk->sk_err;
2298                 if (error) {
2299                         ksocknal_conn_addref(conn);
2300
2301                         switch (error) {
2302                         case ECONNRESET:
2303                                 CNETERR("A connection with %s (%pI4h:%d) was reset; it may have rebooted.\n",
2304                                         libcfs_id2str(peer->ksnp_id),
2305                                         &conn->ksnc_ipaddr,
2306                                         conn->ksnc_port);
2307                                 break;
2308                         case ETIMEDOUT:
2309                                 CNETERR("A connection with %s (%pI4h:%d) timed out; the network or node may be down.\n",
2310                                         libcfs_id2str(peer->ksnp_id),
2311                                         &conn->ksnc_ipaddr,
2312                                         conn->ksnc_port);
2313                                 break;
2314                         default:
2315                                 CNETERR("An unexpected network error %d occurred with %s (%pI4h:%d)\n",
2316                                         error,
2317                                         libcfs_id2str(peer->ksnp_id),
2318                                         &conn->ksnc_ipaddr,
2319                                         conn->ksnc_port);
2320                                 break;
2321                         }
2322
2323                         return conn;
2324                 }
2325
2326                 if (conn->ksnc_rx_started &&
2327                     cfs_time_aftereq(cfs_time_current(),
2328                                      conn->ksnc_rx_deadline)) {
2329                         /* Timed out incomplete incoming message */
2330                         ksocknal_conn_addref(conn);
2331                         CNETERR("Timeout receiving from %s (%pI4h:%d), state %d wanted %d left %d\n",
2332                                 libcfs_id2str(peer->ksnp_id),
2333                                 &conn->ksnc_ipaddr,
2334                                 conn->ksnc_port,
2335                                 conn->ksnc_rx_state,
2336                                 conn->ksnc_rx_nob_wanted,
2337                                 conn->ksnc_rx_nob_left);
2338                         return conn;
2339                 }
2340
2341                 if ((!list_empty(&conn->ksnc_tx_queue) ||
2342                      conn->ksnc_sock->sk->sk_wmem_queued) &&
2343                     cfs_time_aftereq(cfs_time_current(),
2344                                      conn->ksnc_tx_deadline)) {
2345                         /*
2346                          * Timed out messages queued for sending or
2347                          * buffered in the socket's send buffer
2348                          */
2349                         ksocknal_conn_addref(conn);
2350                         CNETERR("Timeout sending data to %s (%pI4h:%d); the network or that node may be down.\n",
2351                                 libcfs_id2str(peer->ksnp_id),
2352                                 &conn->ksnc_ipaddr,
2353                                 conn->ksnc_port);
2354                         return conn;
2355                 }
2356         }
2357
2358         return NULL;
2359 }
2360
2361 static inline void
2362 ksocknal_flush_stale_txs(struct ksock_peer *peer)
2363 {
2364         struct ksock_tx *tx;
2365         struct ksock_tx *tmp;
2366         LIST_HEAD(stale_txs);
2367
2368         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2369
2370         list_for_each_entry_safe(tx, tmp, &peer->ksnp_tx_queue, tx_list) {
2371                 if (!cfs_time_aftereq(cfs_time_current(),
2372                                       tx->tx_deadline))
2373                         break;
2374
2375                 list_del(&tx->tx_list);
2376                 list_add_tail(&tx->tx_list, &stale_txs);
2377         }
2378
2379         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2380
2381         ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
2382 }
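
/*
 * Illustrative sketch, not part of this driver: the "collect under the
 * lock, complete outside it" pattern used by ksocknal_flush_stale_txs()
 * above and by the zombie-noop handling in the scheduler.  Expired
 * entries move to a private list while the lock is held (the queue is
 * deadline-ordered, so the scan stops at the first live entry), then
 * are finalized unlocked.  All "toy_*" names are hypothetical.
 */
#include <pthread.h>
#include <stddef.h>

struct toy_tx {
        struct toy_tx *next;
        long           deadline;
};

static struct toy_tx *toy_collect_stale(pthread_mutex_t *lock,
                                        struct toy_tx **queue, long now)
{
        struct toy_tx *stale = NULL;
        struct toy_tx **tail = &stale;

        pthread_mutex_lock(lock);
        while (*queue && (*queue)->deadline <= now) {
                struct toy_tx *tx = *queue;

                *queue = tx->next;      /* unlink from the shared queue */
                tx->next = NULL;
                *tail = tx;             /* append to the private list */
                tail = &tx->next;
        }
        pthread_mutex_unlock(lock);
        return stale;                   /* caller completes these unlocked */
}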
2383
2384 static int
2385 ksocknal_send_keepalive_locked(struct ksock_peer *peer)
2386         __must_hold(&ksocknal_data.ksnd_global_lock)
2387 {
2388         struct ksock_sched *sched;
2389         struct ksock_conn *conn;
2390         struct ksock_tx *tx;
2391
2392         /* last_alive will be updated by create_conn */
2393         if (list_empty(&peer->ksnp_conns))
2394                 return 0;
2395
2396         if (peer->ksnp_proto != &ksocknal_protocol_v3x)
2397                 return 0;
2398
2399         if (*ksocknal_tunables.ksnd_keepalive <= 0 ||
2400             time_before(cfs_time_current(),
2401                         cfs_time_add(peer->ksnp_last_alive,
2402                                      cfs_time_seconds(*ksocknal_tunables.ksnd_keepalive))))
2403                 return 0;
2404
2405         if (time_before(cfs_time_current(), peer->ksnp_send_keepalive))
2406                 return 0;
2407
2408         /*
2409          * retry 10 seconds later, so we won't put pressure
2410          * on this peer if we failed to send a keepalive this time
2411          */
2412         peer->ksnp_send_keepalive = cfs_time_shift(10);
2413
2414         conn = ksocknal_find_conn_locked(peer, NULL, 1);
2415         if (conn) {
2416                 sched = conn->ksnc_scheduler;
2417
2418                 spin_lock_bh(&sched->kss_lock);
2419                 if (!list_empty(&conn->ksnc_tx_queue)) {
2420                         spin_unlock_bh(&sched->kss_lock);
2421                         /* there is a queued ACK; no keepalive needed */
2422                         return 0;
2423                 }
2424
2425                 spin_unlock_bh(&sched->kss_lock);
2426         }
2427
2428         read_unlock(&ksocknal_data.ksnd_global_lock);
2429
2430         /* cookie = 1 is reserved for keepalive PING */
2431         tx = ksocknal_alloc_tx_noop(1, 1);
2432         if (!tx) {
2433                 read_lock(&ksocknal_data.ksnd_global_lock);
2434                 return -ENOMEM;
2435         }
2436
2437         if (!ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id)) {
2438                 read_lock(&ksocknal_data.ksnd_global_lock);
2439                 return 1;
2440         }
2441
2442         ksocknal_free_tx(tx);
2443         read_lock(&ksocknal_data.ksnd_global_lock);
2444
2445         return -EIO;
2446 }
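
/*
 * Illustrative sketch, not part of this driver: the keepalive policy
 * above in miniature, with hypothetical names and monotonic seconds.
 * Send only if the peer has been idle past the keepalive interval, and
 * rate-limit attempts by stamping the next allowed time *before* trying,
 * so a failing send cannot be retried in a tight loop.
 */
#include <stdbool.h>
#include <time.h>

struct toy_peer {
        time_t last_alive;      /* last traffic seen from the peer */
        time_t next_keepalive;  /* earliest time we may try again */
};

static bool toy_should_send_keepalive(struct toy_peer *p, time_t now,
                                      int keepalive_secs)
{
        if (keepalive_secs <= 0)                        /* disabled */
                return false;
        if (now < p->last_alive + keepalive_secs)       /* recently alive */
                return false;
        if (now < p->next_keepalive)                    /* rate-limited */
                return false;
        p->next_keepalive = now + 10;                   /* retry in 10 s at most */
        return true;
}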
2447
2448 static void
2449 ksocknal_check_peer_timeouts(int idx)
2450 {
2451         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2452         struct ksock_peer *peer;
2453         struct ksock_conn *conn;
2454         struct ksock_tx *tx;
2455
2456  again:
2457         /*
2458          * NB. We expect to have a look at all the peers and not find any
2459          * connections to time out, so we just use a shared lock while we
2460          * take a look...
2461          */
2462         read_lock(&ksocknal_data.ksnd_global_lock);
2463
2464         list_for_each_entry(peer, peers, ksnp_list) {
2465                 unsigned long deadline = 0;
2466                 struct ksock_tx *tx_stale;
2467                 int resid = 0;
2468                 int n = 0;
2469
2470                 if (ksocknal_send_keepalive_locked(peer)) {
2471                         read_unlock(&ksocknal_data.ksnd_global_lock);
2472                         goto again;
2473                 }
2474
2475                 conn = ksocknal_find_timed_out_conn(peer);
2476
2477                 if (conn) {
2478                         read_unlock(&ksocknal_data.ksnd_global_lock);
2479
2480                         ksocknal_close_conn_and_siblings(conn, -ETIMEDOUT);
2481
2482                         /*
2483                          * NB we won't find this one again, but we can't
2484                          * just proceed with the next peer, since we dropped
2485                          * ksnd_global_lock and it might be dead already!
2486                          */
2487                         ksocknal_conn_decref(conn);
2488                         goto again;
2489                 }
2490
2491                 /*
2492                  * we can't process stale txs right here because we're
2493                  * holding only the shared lock
2494                  */
2495                 if (!list_empty(&peer->ksnp_tx_queue)) {
2496                         tx = list_entry(peer->ksnp_tx_queue.next,
2497                                         struct ksock_tx, tx_list);
2498
2499                         if (cfs_time_aftereq(cfs_time_current(),
2500                                              tx->tx_deadline)) {
2501                                 ksocknal_peer_addref(peer);
2502                                 read_unlock(&ksocknal_data.ksnd_global_lock);
2503
2504                                 ksocknal_flush_stale_txs(peer);
2505
2506                                 ksocknal_peer_decref(peer);
2507                                 goto again;
2508                         }
2509                 }
2510
2511                 if (list_empty(&peer->ksnp_zc_req_list))
2512                         continue;
2513
2514                 tx_stale = NULL;
2515                 spin_lock(&peer->ksnp_lock);
2516                 list_for_each_entry(tx, &peer->ksnp_zc_req_list, tx_zc_list) {
2517                         if (!cfs_time_aftereq(cfs_time_current(),
2518                                               tx->tx_deadline))
2519                                 break;
2520                         /* ignore the TX if connection is being closed */
2521                         if (tx->tx_conn->ksnc_closing)
2522                                 continue;
2523                         if (!tx_stale)
2524                                 tx_stale = tx;
2525                         n++;
2526                 }
2527
2528                 if (!tx_stale) {
2529                         spin_unlock(&peer->ksnp_lock);
2530                         continue;
2531                 }
2532
2533                 deadline = tx_stale->tx_deadline;
2534                 resid = tx_stale->tx_resid;
2535                 conn = tx_stale->tx_conn;
2536                 ksocknal_conn_addref(conn);
2537
2538                 spin_unlock(&peer->ksnp_lock);
2539                 read_unlock(&ksocknal_data.ksnd_global_lock);
2540
2541                 CERROR("Total %d stale ZC_REQs for peer %s detected; the oldest (%p) timed out %ld secs ago, resid: %d, wmem: %d\n",
2542                        n, libcfs_nid2str(peer->ksnp_id.nid), tx_stale,
2543                        cfs_duration_sec(cfs_time_current() - deadline),
2544                        resid, conn->ksnc_sock->sk->sk_wmem_queued);
2545
2546                 ksocknal_close_conn_and_siblings(conn, -ETIMEDOUT);
2547                 ksocknal_conn_decref(conn);
2548                 goto again;
2549         }
2550
2551         read_unlock(&ksocknal_data.ksnd_global_lock);
2552 }
2553
2554 int
2555 ksocknal_reaper(void *arg)
2556 {
2557         wait_queue_entry_t wait;
2558         struct ksock_conn *conn;
2559         struct ksock_sched *sched;
2560         struct list_head enomem_conns;
2561         int nenomem_conns;
2562         long timeout;
2563         int i;
2564         int peer_index = 0;
2565         unsigned long deadline = cfs_time_current();
2566
2567         cfs_block_allsigs();
2568
2569         INIT_LIST_HEAD(&enomem_conns);
2570         init_waitqueue_entry(&wait, current);
2571
2572         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2573
2574         while (!ksocknal_data.ksnd_shuttingdown) {
2575                 if (!list_empty(&ksocknal_data.ksnd_deathrow_conns)) {
2576                         conn = list_entry(ksocknal_data.ksnd_deathrow_conns.next,
2577                                           struct ksock_conn, ksnc_list);
2578                         list_del(&conn->ksnc_list);
2579
2580                         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2581
2582                         ksocknal_terminate_conn(conn);
2583                         ksocknal_conn_decref(conn);
2584
2585                         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2586                         continue;
2587                 }
2588
2589                 if (!list_empty(&ksocknal_data.ksnd_zombie_conns)) {
2590                         conn = list_entry(ksocknal_data.ksnd_zombie_conns.next,
2591                                           struct ksock_conn, ksnc_list);
2592                         list_del(&conn->ksnc_list);
2593
2594                         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2595
2596                         ksocknal_destroy_conn(conn);
2597
2598                         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2599                         continue;
2600                 }
2601
2602                 if (!list_empty(&ksocknal_data.ksnd_enomem_conns)) {
2603                         list_add(&enomem_conns,
2604                                  &ksocknal_data.ksnd_enomem_conns);
2605                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2606                 }
2607
2608                 spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2609
2610                 /* reschedule all the connections that stalled with ENOMEM... */
2611                 nenomem_conns = 0;
2612                 while (!list_empty(&enomem_conns)) {
2613                         conn = list_entry(enomem_conns.next, struct ksock_conn,
2614                                           ksnc_tx_list);
2615                         list_del(&conn->ksnc_tx_list);
2616
2617                         sched = conn->ksnc_scheduler;
2618
2619                         spin_lock_bh(&sched->kss_lock);
2620
2621                         LASSERT(conn->ksnc_tx_scheduled);
2622                         conn->ksnc_tx_ready = 1;
2623                         list_add_tail(&conn->ksnc_tx_list,
2624                                       &sched->kss_tx_conns);
2625                         wake_up(&sched->kss_waitq);
2626
2627                         spin_unlock_bh(&sched->kss_lock);
2628                         nenomem_conns++;
2629                 }
2630
2631                 /* careful with the jiffy wrap... */
2632                 while ((timeout = cfs_time_sub(deadline,
2633                                                cfs_time_current())) <= 0) {
2634                         const int n = 4;
2635                         const int p = 1;
2636                         int chunk = ksocknal_data.ksnd_peer_hash_size;
2637
2638                         /*
2639                          * Time to check for timeouts on a few more peers: I do
2640                          * checks every 'p' seconds on a proportion of the peer
2641                          * table and I need to check every connection 'n' times
2642                          * within a timeout interval, to ensure I detect a
2643                          * timeout on any connection within (n+1)/n times the
2644                          * timeout interval.
2645                          */
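                        /*
                         * Worked example (numbers hypothetical): with a
                         * peer hash of 1021 buckets, timeout = 50 s, n = 4
                         * and p = 1, chunk becomes 1021 * 4 * 1 / 50 = 81
                         * buckets per pass, so the whole table is scanned
                         * about four times per timeout interval.
                         */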
2646                         if (*ksocknal_tunables.ksnd_timeout > n * p)
2647                                 chunk = (chunk * n * p) /
2648                                         *ksocknal_tunables.ksnd_timeout;
2649                         if (!chunk)
2650                                 chunk = 1;
2651
2652                         for (i = 0; i < chunk; i++) {
2653                                 ksocknal_check_peer_timeouts(peer_index);
2654                                 peer_index = (peer_index + 1) %
2655                                              ksocknal_data.ksnd_peer_hash_size;
2656                         }
2657
2658                         deadline = cfs_time_add(deadline, cfs_time_seconds(p));
2659                 }
2660
2661                 if (nenomem_conns) {
2662                         /*
2663                          * Reduce my timeout if I rescheduled ENOMEM conns.
2664                          * This also prevents me getting woken immediately
2665                          * if any go back on my enomem list.
2666                          */
2667                         timeout = SOCKNAL_ENOMEM_RETRY;
2668                 }
2669                 ksocknal_data.ksnd_reaper_waketime =
2670                         cfs_time_add(cfs_time_current(), timeout);
2671
2672                 set_current_state(TASK_INTERRUPTIBLE);
2673                 add_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
2674
2675                 if (!ksocknal_data.ksnd_shuttingdown &&
2676                     list_empty(&ksocknal_data.ksnd_deathrow_conns) &&
2677                     list_empty(&ksocknal_data.ksnd_zombie_conns))
2678                         schedule_timeout(timeout);
2679
2680                 set_current_state(TASK_RUNNING);
2681                 remove_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
2682
2683                 spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2684         }
2685
2686         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2687
2688         ksocknal_thread_fini();
2689         return 0;
2690 }