GNU Linux-libre 5.4.257-gnu1
[releases.git] / net / rxrpc / recvmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25         struct rxrpc_sock *rx;
26         struct sock *sk;
27
28         _enter("%d", call->debug_id);
29
30         if (!list_empty(&call->recvmsg_link))
31                 return;
32
33         rcu_read_lock();
34
35         rx = rcu_dereference(call->socket);
36         sk = &rx->sk;
37         if (rx && sk->sk_state < RXRPC_CLOSE) {
38                 if (call->notify_rx) {
39                         spin_lock_bh(&call->notify_lock);
40                         call->notify_rx(sk, call, call->user_call_ID);
41                         spin_unlock_bh(&call->notify_lock);
42                 } else {
43                         write_lock_bh(&rx->recvmsg_lock);
44                         if (list_empty(&call->recvmsg_link)) {
45                                 rxrpc_get_call(call, rxrpc_call_got);
46                                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47                         }
48                         write_unlock_bh(&rx->recvmsg_lock);
49
50                         if (!sock_flag(sk, SOCK_DEAD)) {
51                                 _debug("call %ps", sk->sk_data_ready);
52                                 sk->sk_data_ready(sk);
53                         }
54                 }
55         }
56
57         rcu_read_unlock();
58         _leave("");
59 }
60
61 /*
62  * Pass a call terminating message to userspace.
63  */
64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
65 {
66         u32 tmp = 0;
67         int ret;
68
69         switch (call->completion) {
70         case RXRPC_CALL_SUCCEEDED:
71                 ret = 0;
72                 if (rxrpc_is_service_call(call))
73                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
74                 break;
75         case RXRPC_CALL_REMOTELY_ABORTED:
76                 tmp = call->abort_code;
77                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
78                 break;
79         case RXRPC_CALL_LOCALLY_ABORTED:
80                 tmp = call->abort_code;
81                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82                 break;
83         case RXRPC_CALL_NETWORK_ERROR:
84                 tmp = -call->error;
85                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
86                 break;
87         case RXRPC_CALL_LOCAL_ERROR:
88                 tmp = -call->error;
89                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
90                 break;
91         default:
92                 pr_err("Invalid terminal call state %u\n", call->state);
93                 BUG();
94                 break;
95         }
96
97         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
98                             call->rx_pkt_offset, call->rx_pkt_len, ret);
99         return ret;
100 }
101
102 /*
103  * Pass back notification of a new call.  The call is added to the
104  * to-be-accepted list.  This means that the next call to be accepted might not
105  * be the last call seen awaiting acceptance, but unless we leave this on the
106  * front of the queue and block all other messages until someone gives us a
107  * user_ID for it, there's not a lot we can do.
108  */
109 static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
110                                   struct rxrpc_call *call,
111                                   struct msghdr *msg, int flags)
112 {
113         int tmp = 0, ret;
114
115         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
116
117         if (ret == 0 && !(flags & MSG_PEEK)) {
118                 _debug("to be accepted");
119                 write_lock_bh(&rx->recvmsg_lock);
120                 list_del_init(&call->recvmsg_link);
121                 write_unlock_bh(&rx->recvmsg_lock);
122
123                 rxrpc_get_call(call, rxrpc_call_got);
124                 write_lock(&rx->call_lock);
125                 list_add_tail(&call->accept_link, &rx->to_be_accepted);
126                 write_unlock(&rx->call_lock);
127         }
128
129         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
130         return ret;
131 }
132
133 /*
134  * End the packet reception phase.
135  */
136 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
137 {
138         _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
139
140         trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
141         ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
142
143         if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
144                 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
145                                   rxrpc_propose_ack_terminal_ack);
146                 //rxrpc_send_ack_packet(call, false, NULL);
147         }
148
149         write_lock_bh(&call->state_lock);
150
151         switch (call->state) {
152         case RXRPC_CALL_CLIENT_RECV_REPLY:
153                 __rxrpc_call_completed(call);
154                 write_unlock_bh(&call->state_lock);
155                 break;
156
157         case RXRPC_CALL_SERVER_RECV_REQUEST:
158                 call->tx_phase = true;
159                 call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
160                 call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
161                 write_unlock_bh(&call->state_lock);
162                 rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
163                                   rxrpc_propose_ack_processing_op);
164                 break;
165         default:
166                 write_unlock_bh(&call->state_lock);
167                 break;
168         }
169 }
170
171 /*
172  * Discard a packet we've used up and advance the Rx window by one.
173  */
174 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
175 {
176         struct rxrpc_skb_priv *sp;
177         struct sk_buff *skb;
178         rxrpc_serial_t serial;
179         rxrpc_seq_t hard_ack, top;
180         bool last = false;
181         u8 subpacket;
182         int ix;
183
184         _enter("%d", call->debug_id);
185
186         hard_ack = call->rx_hard_ack;
187         top = smp_load_acquire(&call->rx_top);
188         ASSERT(before(hard_ack, top));
189
190         hard_ack++;
191         ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
192         skb = call->rxtx_buffer[ix];
193         rxrpc_see_skb(skb, rxrpc_skb_rotated);
194         sp = rxrpc_skb(skb);
195
196         subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
197         serial = sp->hdr.serial + subpacket;
198
199         if (subpacket == sp->nr_subpackets - 1 &&
200             sp->rx_flags & RXRPC_SKB_INCL_LAST)
201                 last = true;
202
203         call->rxtx_buffer[ix] = NULL;
204         call->rxtx_annotations[ix] = 0;
205         /* Barrier against rxrpc_input_data(). */
206         smp_store_release(&call->rx_hard_ack, hard_ack);
207
208         rxrpc_free_skb(skb, rxrpc_skb_freed);
209
210         trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
211         if (last) {
212                 rxrpc_end_rx_phase(call, serial);
213         } else {
214                 /* Check to see if there's an ACK that needs sending. */
215                 if (atomic_inc_return(&call->ackr_nr_consumed) > 2)
216                         rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
217                                           true, false,
218                                           rxrpc_propose_ack_rotate_rx);
219                 if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
220                         rxrpc_send_ack_packet(call, false, NULL);
221         }
222 }
223
224 /*
225  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
226  * padding, but if this is the case, the packet length will be resident in the
227  * socket buffer.  Note that we can't modify the master skb info as the skb may
228  * be the home to multiple subpackets.
229  */
230 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
231                                u8 annotation,
232                                unsigned int offset, unsigned int len)
233 {
234         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
235         rxrpc_seq_t seq = sp->hdr.seq;
236         u16 cksum = sp->hdr.cksum;
237         u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
238
239         _enter("");
240
241         /* For all but the head jumbo subpacket, the security checksum is in a
242          * jumbo header immediately prior to the data.
243          */
244         if (subpacket > 0) {
245                 __be16 tmp;
246                 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
247                         BUG();
248                 cksum = ntohs(tmp);
249                 seq += subpacket;
250         }
251
252         return call->security->verify_packet(call, skb, offset, len,
253                                              seq, cksum);
254 }
255
256 /*
257  * Locate the data within a packet.  This is complicated by:
258  *
259  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
260  *     subpacket.
261  *
262  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
263  *     contains an extra header which includes the true length of the data,
264  *     excluding any encrypted padding.
265  */
266 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
267                              u8 *_annotation,
268                              unsigned int *_offset, unsigned int *_len,
269                              bool *_last)
270 {
271         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
272         unsigned int offset = sizeof(struct rxrpc_wire_header);
273         unsigned int len;
274         bool last = false;
275         int ret;
276         u8 annotation = *_annotation;
277         u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
278
279         /* Locate the subpacket */
280         offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
281         len = skb->len - offset;
282         if (subpacket < sp->nr_subpackets - 1)
283                 len = RXRPC_JUMBO_DATALEN;
284         else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
285                 last = true;
286
287         if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
288                 ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
289                 if (ret < 0)
290                         return ret;
291                 *_annotation |= RXRPC_RX_ANNO_VERIFIED;
292         }
293
294         *_offset = offset;
295         *_len = len;
296         *_last = last;
297         call->security->locate_data(call, skb, _offset, _len);
298         return 0;
299 }
300
301 /*
302  * Deliver messages to a call.  This keeps processing packets until the buffer
303  * is filled and we find either more DATA (returns 0) or the end of the DATA
304  * (returns 1).  If more packets are required, it returns -EAGAIN.
305  */
306 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
307                               struct msghdr *msg, struct iov_iter *iter,
308                               size_t len, int flags, size_t *_offset)
309 {
310         struct rxrpc_skb_priv *sp;
311         struct sk_buff *skb;
312         rxrpc_serial_t serial;
313         rxrpc_seq_t hard_ack, top, seq;
314         size_t remain;
315         bool rx_pkt_last;
316         unsigned int rx_pkt_offset, rx_pkt_len;
317         int ix, copy, ret = -EAGAIN, ret2;
318
319         if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
320             call->ackr_reason)
321                 rxrpc_send_ack_packet(call, false, NULL);
322
323         rx_pkt_offset = call->rx_pkt_offset;
324         rx_pkt_len = call->rx_pkt_len;
325         rx_pkt_last = call->rx_pkt_last;
326
327         if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
328                 seq = call->rx_hard_ack;
329                 ret = 1;
330                 goto done;
331         }
332
333         /* Barriers against rxrpc_input_data(). */
334         hard_ack = call->rx_hard_ack;
335         seq = hard_ack + 1;
336
337         while (top = smp_load_acquire(&call->rx_top),
338                before_eq(seq, top)
339                ) {
340                 ix = seq & RXRPC_RXTX_BUFF_MASK;
341                 skb = call->rxtx_buffer[ix];
342                 if (!skb) {
343                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
344                                             rx_pkt_offset, rx_pkt_len, 0);
345                         break;
346                 }
347                 smp_rmb();
348                 rxrpc_see_skb(skb, rxrpc_skb_seen);
349                 sp = rxrpc_skb(skb);
350
351                 if (!(flags & MSG_PEEK)) {
352                         serial = sp->hdr.serial;
353                         serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
354                         trace_rxrpc_receive(call, rxrpc_receive_front,
355                                             serial, seq);
356                 }
357
358                 if (msg)
359                         sock_recv_timestamp(msg, sock->sk, skb);
360
361                 if (rx_pkt_offset == 0) {
362                         ret2 = rxrpc_locate_data(call, skb,
363                                                  &call->rxtx_annotations[ix],
364                                                  &rx_pkt_offset, &rx_pkt_len,
365                                                  &rx_pkt_last);
366                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
367                                             rx_pkt_offset, rx_pkt_len, ret2);
368                         if (ret2 < 0) {
369                                 ret = ret2;
370                                 goto out;
371                         }
372                 } else {
373                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
374                                             rx_pkt_offset, rx_pkt_len, 0);
375                 }
376
377                 /* We have to handle short, empty and used-up DATA packets. */
378                 remain = len - *_offset;
379                 copy = rx_pkt_len;
380                 if (copy > remain)
381                         copy = remain;
382                 if (copy > 0) {
383                         ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
384                                                       copy);
385                         if (ret2 < 0) {
386                                 ret = ret2;
387                                 goto out;
388                         }
389
390                         /* handle piecemeal consumption of data packets */
391                         rx_pkt_offset += copy;
392                         rx_pkt_len -= copy;
393                         *_offset += copy;
394                 }
395
396                 if (rx_pkt_len > 0) {
397                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
398                                             rx_pkt_offset, rx_pkt_len, 0);
399                         ASSERTCMP(*_offset, ==, len);
400                         ret = 0;
401                         break;
402                 }
403
404                 /* The whole packet has been transferred. */
405                 if (!(flags & MSG_PEEK))
406                         rxrpc_rotate_rx_window(call);
407                 rx_pkt_offset = 0;
408                 rx_pkt_len = 0;
409
410                 if (rx_pkt_last) {
411                         ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
412                         ret = 1;
413                         goto out;
414                 }
415
416                 seq++;
417         }
418
419 out:
420         if (!(flags & MSG_PEEK)) {
421                 call->rx_pkt_offset = rx_pkt_offset;
422                 call->rx_pkt_len = rx_pkt_len;
423                 call->rx_pkt_last = rx_pkt_last;
424         }
425 done:
426         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
427                             rx_pkt_offset, rx_pkt_len, ret);
428         if (ret == -EAGAIN)
429                 set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
430         return ret;
431 }
432
433 /*
434  * Receive a message from an RxRPC socket
435  * - we need to be careful about two or more threads calling recvmsg
436  *   simultaneously
437  */
438 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
439                   int flags)
440 {
441         struct rxrpc_call *call;
442         struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
443         struct list_head *l;
444         size_t copied = 0;
445         long timeo;
446         int ret;
447
448         DEFINE_WAIT(wait);
449
450         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
451
452         if (flags & (MSG_OOB | MSG_TRUNC))
453                 return -EOPNOTSUPP;
454
455         timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
456
457 try_again:
458         lock_sock(&rx->sk);
459
460         /* Return immediately if a client socket has no outstanding calls */
461         if (RB_EMPTY_ROOT(&rx->calls) &&
462             list_empty(&rx->recvmsg_q) &&
463             rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
464                 release_sock(&rx->sk);
465                 return -EAGAIN;
466         }
467
468         if (list_empty(&rx->recvmsg_q)) {
469                 ret = -EWOULDBLOCK;
470                 if (timeo == 0) {
471                         call = NULL;
472                         goto error_no_call;
473                 }
474
475                 release_sock(&rx->sk);
476
477                 /* Wait for something to happen */
478                 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
479                                           TASK_INTERRUPTIBLE);
480                 ret = sock_error(&rx->sk);
481                 if (ret)
482                         goto wait_error;
483
484                 if (list_empty(&rx->recvmsg_q)) {
485                         if (signal_pending(current))
486                                 goto wait_interrupted;
487                         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
488                                             0, 0, 0, 0);
489                         timeo = schedule_timeout(timeo);
490                 }
491                 finish_wait(sk_sleep(&rx->sk), &wait);
492                 goto try_again;
493         }
494
495         /* Find the next call and dequeue it if we're not just peeking.  If we
496          * do dequeue it, that comes with a ref that we will need to release.
497          */
498         write_lock_bh(&rx->recvmsg_lock);
499         l = rx->recvmsg_q.next;
500         call = list_entry(l, struct rxrpc_call, recvmsg_link);
501         if (!(flags & MSG_PEEK))
502                 list_del_init(&call->recvmsg_link);
503         else
504                 rxrpc_get_call(call, rxrpc_call_got);
505         write_unlock_bh(&rx->recvmsg_lock);
506
507         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
508
509         /* We're going to drop the socket lock, so we need to lock the call
510          * against interference by sendmsg.
511          */
512         if (!mutex_trylock(&call->user_mutex)) {
513                 ret = -EWOULDBLOCK;
514                 if (flags & MSG_DONTWAIT)
515                         goto error_requeue_call;
516                 ret = -ERESTARTSYS;
517                 if (mutex_lock_interruptible(&call->user_mutex) < 0)
518                         goto error_requeue_call;
519         }
520
521         release_sock(&rx->sk);
522
523         if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
524                 BUG();
525
526         if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
527                 if (flags & MSG_CMSG_COMPAT) {
528                         unsigned int id32 = call->user_call_ID;
529
530                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
531                                        sizeof(unsigned int), &id32);
532                 } else {
533                         unsigned long idl = call->user_call_ID;
534
535                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
536                                        sizeof(unsigned long), &idl);
537                 }
538                 if (ret < 0)
539                         goto error_unlock_call;
540         }
541
542         if (msg->msg_name && call->peer) {
543                 struct sockaddr_rxrpc *srx = msg->msg_name;
544                 size_t len = sizeof(call->peer->srx);
545
546                 memcpy(msg->msg_name, &call->peer->srx, len);
547                 srx->srx_service = call->service_id;
548                 msg->msg_namelen = len;
549         }
550
551         switch (READ_ONCE(call->state)) {
552         case RXRPC_CALL_SERVER_ACCEPTING:
553                 ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
554                 break;
555         case RXRPC_CALL_CLIENT_RECV_REPLY:
556         case RXRPC_CALL_SERVER_RECV_REQUEST:
557         case RXRPC_CALL_SERVER_ACK_REQUEST:
558                 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
559                                          flags, &copied);
560                 if (ret == -EAGAIN)
561                         ret = 0;
562
563                 if (after(call->rx_top, call->rx_hard_ack) &&
564                     call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
565                         rxrpc_notify_socket(call);
566                 break;
567         default:
568                 ret = 0;
569                 break;
570         }
571
572         if (ret < 0)
573                 goto error_unlock_call;
574
575         if (call->state == RXRPC_CALL_COMPLETE) {
576                 ret = rxrpc_recvmsg_term(call, msg);
577                 if (ret < 0)
578                         goto error_unlock_call;
579                 if (!(flags & MSG_PEEK))
580                         rxrpc_release_call(rx, call);
581                 msg->msg_flags |= MSG_EOR;
582                 ret = 1;
583         }
584
585         if (ret == 0)
586                 msg->msg_flags |= MSG_MORE;
587         else
588                 msg->msg_flags &= ~MSG_MORE;
589         ret = copied;
590
591 error_unlock_call:
592         mutex_unlock(&call->user_mutex);
593         rxrpc_put_call(call, rxrpc_call_put);
594         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
595         return ret;
596
597 error_requeue_call:
598         if (!(flags & MSG_PEEK)) {
599                 write_lock_bh(&rx->recvmsg_lock);
600                 list_add(&call->recvmsg_link, &rx->recvmsg_q);
601                 write_unlock_bh(&rx->recvmsg_lock);
602                 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
603         } else {
604                 rxrpc_put_call(call, rxrpc_call_put);
605         }
606 error_no_call:
607         release_sock(&rx->sk);
608 error_trace:
609         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
610         return ret;
611
612 wait_interrupted:
613         ret = sock_intr_errno(timeo);
614 wait_error:
615         finish_wait(sk_sleep(&rx->sk), &wait);
616         call = NULL;
617         goto error_trace;
618 }
619
620 /**
621  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
622  * @sock: The socket that the call exists on
623  * @call: The call to send data through
624  * @iter: The buffer to receive into
625  * @want_more: True if more data is expected to be read
626  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
627  * @_service: Where to store the actual service ID (may be upgraded)
628  *
629  * Allow a kernel service to receive data and pick up information about the
630  * state of a call.  Returns 0 if got what was asked for and there's more
631  * available, 1 if we got what was asked for and we're at the end of the data
632  * and -EAGAIN if we need more data.
633  *
634  * Note that we may return -EAGAIN to drain empty packets at the end of the
635  * data, even if we've already copied over the requested data.
636  *
637  * *_abort should also be initialised to 0.
638  */
639 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
640                            struct iov_iter *iter,
641                            bool want_more, u32 *_abort, u16 *_service)
642 {
643         size_t offset = 0;
644         int ret;
645
646         _enter("{%d,%s},%zu,%d",
647                call->debug_id, rxrpc_call_states[call->state],
648                iov_iter_count(iter), want_more);
649
650         ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
651
652         mutex_lock(&call->user_mutex);
653
654         switch (READ_ONCE(call->state)) {
655         case RXRPC_CALL_CLIENT_RECV_REPLY:
656         case RXRPC_CALL_SERVER_RECV_REQUEST:
657         case RXRPC_CALL_SERVER_ACK_REQUEST:
658                 ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
659                                          iov_iter_count(iter), 0,
660                                          &offset);
661                 if (ret < 0)
662                         goto out;
663
664                 /* We can only reach here with a partially full buffer if we
665                  * have reached the end of the data.  We must otherwise have a
666                  * full buffer or have been given -EAGAIN.
667                  */
668                 if (ret == 1) {
669                         if (iov_iter_count(iter) > 0)
670                                 goto short_data;
671                         if (!want_more)
672                                 goto read_phase_complete;
673                         ret = 0;
674                         goto out;
675                 }
676
677                 if (!want_more)
678                         goto excess_data;
679                 goto out;
680
681         case RXRPC_CALL_COMPLETE:
682                 goto call_complete;
683
684         default:
685                 ret = -EINPROGRESS;
686                 goto out;
687         }
688
689 read_phase_complete:
690         ret = 1;
691 out:
692         switch (call->ackr_reason) {
693         case RXRPC_ACK_IDLE:
694                 break;
695         case RXRPC_ACK_DELAY:
696                 if (ret != -EAGAIN)
697                         break;
698                 /* Fall through */
699         default:
700                 rxrpc_send_ack_packet(call, false, NULL);
701         }
702
703         if (_service)
704                 *_service = call->service_id;
705         mutex_unlock(&call->user_mutex);
706         _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
707         return ret;
708
709 short_data:
710         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
711         ret = -EBADMSG;
712         goto out;
713 excess_data:
714         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
715         ret = -EMSGSIZE;
716         goto out;
717 call_complete:
718         *_abort = call->abort_code;
719         ret = call->error;
720         if (call->completion == RXRPC_CALL_SUCCEEDED) {
721                 ret = 1;
722                 if (iov_iter_count(iter) > 0)
723                         ret = -ECONNRESET;
724         }
725         goto out;
726 }
727 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
728
729 /**
730  * rxrpc_kernel_get_reply_time - Get timestamp on first reply packet
731  * @sock: The socket that the call exists on
732  * @call: The call to query
733  * @_ts: Where to put the timestamp
734  *
735  * Retrieve the timestamp from the first DATA packet of the reply if it is
736  * in the ring.  Returns true if successful, false if not.
737  */
738 bool rxrpc_kernel_get_reply_time(struct socket *sock, struct rxrpc_call *call,
739                                  ktime_t *_ts)
740 {
741         struct sk_buff *skb;
742         rxrpc_seq_t hard_ack, top, seq;
743         bool success = false;
744
745         mutex_lock(&call->user_mutex);
746
747         if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_RECV_REPLY)
748                 goto out;
749
750         hard_ack = call->rx_hard_ack;
751         if (hard_ack != 0)
752                 goto out;
753
754         seq = hard_ack + 1;
755         top = smp_load_acquire(&call->rx_top);
756         if (after(seq, top))
757                 goto out;
758
759         skb = call->rxtx_buffer[seq & RXRPC_RXTX_BUFF_MASK];
760         if (!skb)
761                 goto out;
762
763         *_ts = skb_get_ktime(skb);
764         success = true;
765
766 out:
767         mutex_unlock(&call->user_mutex);
768         return success;
769 }
770 EXPORT_SYMBOL(rxrpc_kernel_get_reply_time);