vmm/dumbo/tcp/
handler.rs

1// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Exposes simple TCP over IPv4 listener functionality via the [`TcpIPv4Handler`] structure.
5//!
6//! [`TcpIPv4Handler`]: struct.TcpIPv4Handler.html
7
8use std::collections::{HashMap, HashSet};
9use std::fmt::Debug;
10use std::net::Ipv4Addr;
11use std::num::NonZeroUsize;
12
13use micro_http::{Request, Response};
14
15use crate::dumbo::pdu::bytes::NetworkBytes;
16use crate::dumbo::pdu::ipv4::{IPv4Packet, Ipv4Error as IPv4PacketError, PROTOCOL_TCP};
17use crate::dumbo::pdu::tcp::{Flags as TcpFlags, TcpError as TcpSegmentError, TcpSegment};
18use crate::dumbo::tcp::endpoint::Endpoint;
19use crate::dumbo::tcp::{NextSegmentStatus, RstConfig};
20
21// TODO: This is currently IPv4 specific. Maybe change it to a more generic implementation.
22
/// Describes events which may occur when the handler receives packets.
///
/// A value of this type is returned inside `Ok(..)` by `TcpIPv4Handler::receive_packet`.
#[derive(Debug, PartialEq, Eq)]
pub enum RecvEvent {
    /// The local endpoint is done communicating, and has been removed.
    EndpointDone,
    /// An error occurred while trying to create a new `Endpoint` object, based on an incoming
    /// `SYN` segment.
    FailedNewConnection,
    /// A new local `Endpoint` has been successfully created.
    NewConnectionSuccessful,
    /// Failed to add a local `Endpoint` because the handler is already at the maximum number of
    /// concurrent connections, and there are no evictable Endpoints. A `RST` reply is enqueued
    /// for the remote peer if the `RST` queue still has room.
    NewConnectionDropped,
    /// A new local `Endpoint` has been successfully created, but the handler had to make room by
    /// evicting an older `Endpoint`. A `RST` for the evicted connection is enqueued if the `RST`
    /// queue still has room.
    NewConnectionReplacing,
    /// Nothing interesting happened regarding the state of the handler.
    Nothing,
    /// The handler received a non-`SYN` segment which does not belong to any existing
    /// connection. Unless the segment itself carries the `RST` flag, a `RST` reply is enqueued
    /// (subject to the `RST` queue size limit).
    UnexpectedSegment,
}
45
/// Describes events which may occur when the handler writes packets.
///
/// A value of this type is returned, next to the number of bytes written, by
/// `TcpIPv4Handler::write_next_packet`.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteEvent {
    /// The local `Endpoint` transitioned to being done after this segment was written, and has
    /// been removed from the handler.
    EndpointDone,
    /// Nothing interesting happened.
    Nothing,
}
54
/// Describes errors which may be encountered by the [`receive_packet`] method from
/// [`TcpIPv4Handler`].
///
/// [`receive_packet`]: struct.TcpIPv4Handler.html#method.receive_packet
/// [`TcpIPv4Handler`]: struct.TcpIPv4Handler.html
// NOTE: `displaydoc::Display` builds each variant's `Display` output from its doc comment, so
// the variant doc comments below double as runtime error messages.
#[derive(Debug, PartialEq, Eq, thiserror::Error, displaydoc::Display)]
pub enum RecvError {
    /// The inner segment has an invalid destination port.
    InvalidPort,
    /// The handler encountered an error while parsing the inner TCP segment: {0}
    TcpSegment(#[from] TcpSegmentError),
}
67
/// Describes errors which may be encountered by the [`write_next_packet`] method from
/// [`TcpIPv4Handler`].
///
/// [`write_next_packet`]: struct.TcpIPv4Handler.html#method.write_next_packet
/// [`TcpIPv4Handler`]: struct.TcpIPv4Handler.html
// NOTE: `displaydoc::Display` builds each variant's `Display` output from its doc comment, so
// the variant doc comments below double as runtime error messages.
#[derive(Debug, PartialEq, Eq, thiserror::Error, displaydoc::Display)]
pub enum WriteNextError {
    /// There was an error while writing the contents of the IPv4 packet: {0}
    IPv4Packet(#[from] IPv4PacketError),
    /// There was an error while writing the contents of the inner TCP segment: {0}
    TcpSegment(#[from] TcpSegmentError),
}
80
// Generally speaking, a TCP/IPv4 connection is identified using the four-tuple (src_addr, src_port,
// dst_addr, dst_port). However, the IPv4 address and TCP port of the MMDS endpoint are fixed, so
// we can get away with uniquely identifying connections using just the remote address and port.
// The type is `Copy` and hashable because it is used both as a `HashMap` key and as a `HashSet`
// member by the handler.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
struct ConnectionTuple {
    // IPv4 address of the remote peer.
    remote_addr: Ipv4Addr,
    // TCP port of the remote peer.
    remote_port: u16,
}
89
90impl ConnectionTuple {
91    fn new(remote_addr: Ipv4Addr, remote_port: u16) -> Self {
92        ConnectionTuple {
93            remote_addr,
94            remote_port,
95        }
96    }
97}
98
/// Implements a minimalist TCP over IPv4 listener.
///
/// Forwards incoming TCP segments to the appropriate connection object, based on the associated
/// tuple, or attempts to establish new connections (when receiving `SYN` segments). Aside from
/// constructors, the handler operation is based on three methods:
///
/// * [`receive_packet`] examines an incoming IPv4 packet. It parses the inner TCP segment and
///   makes sure the destination port number is correct (the destination IPv4 address is not
///   checked by this method). Then, it steers valid segments towards existing connections,
///   creates new connections for incoming `SYN` segments, and enqueues `RST` replies in response
///   to any segments which cannot be associated with a connection (except other `RST` segments).
///   On success, also describes any internal status changes triggered by the reception of the
///   packet.
/// * [`write_next_packet`] writes the next IPv4 packet (if available) that would be sent by the
///   handler itself (right now it can only mean an enqueued `RST`), or one of the existing
///   connections. On success, also describes any internal status changes triggered as the packet
///   gets transmitted.
/// * [`next_segment_status`] describes whether the handler can send a packet immediately, or after
///   some retransmission timeout associated with a connection fires, or if there's nothing to send
///   for the moment. This is used to determine whether it's appropriate to call
///   [`write_next_packet`].
///
/// [`receive_packet`]: ../handler/struct.TcpIPv4Handler.html#method.receive_packet
/// [`write_next_packet`]: ../handler/struct.TcpIPv4Handler.html#method.write_next_packet
/// [`next_segment_status`]: ../handler/struct.TcpIPv4Handler.html#method.next_segment_status
#[derive(Debug)]
pub struct TcpIPv4Handler {
    // Handler IPv4 address used for every connection.
    local_ipv4_addr: Ipv4Addr,
    // Handler TCP port used for every connection.
    local_port: u16,
    // This map holds the currently active endpoints, identified by their connection tuple.
    connections: HashMap<ConnectionTuple, Endpoint>,
    // Maximum number of concurrent connections we are willing to handle.
    max_connections: NonZeroUsize,
    // Holds connections which are able to send segments immediately. Every member is expected to
    // also be a key of `connections`.
    active_connections: HashSet<ConnectionTuple>,
    // Remembers the closest timestamp into the future when one of the connections has to deal
    // with an RTO trigger, together with the tuple of that connection.
    next_timeout: Option<(u64, ConnectionTuple)>,
    // RST segments awaiting to be sent, paired with the peer they are addressed to.
    rst_queue: Vec<(ConnectionTuple, RstConfig)>,
    // Maximum size of the RST queue; further RSTs are dropped while the queue is full.
    max_pending_resets: NonZeroUsize,
}
143
// Only used locally, in the receive_packet method, to differentiate between different outcomes
// associated with processing incoming packets.
#[derive(Debug)]
enum RecvSegmentOutcome {
    // The segment was delivered to an existing endpoint, which is now done.
    EndpointDone,
    // The segment was delivered to an existing endpoint, which keeps running; carries the
    // endpoint's status regarding its next outgoing segment.
    EndpointRunning(NextSegmentStatus),
    // The segment matches no existing connection and its flags are exactly `SYN`.
    NewConnection,
    // The segment matches no existing connection and is not a plain `SYN`. The flag is true when
    // a RST reply should be enqueued (i.e. the segment does not carry the RST flag itself).
    UnexpectedSegment(bool),
}
153
154impl TcpIPv4Handler {
155    /// Creates a new `TcpIPv4Handler`.
156    ///
157    /// The handler acts as if bound to `local_addr`:`local_port`, and will accept at most
158    /// `max_connections` concurrent connections. `RST` segments generated by unexpected incoming
159    /// segments are placed in a queue which is at most `max_pending_resets` long.
160    #[inline]
161    pub fn new(
162        local_ipv4_addr: Ipv4Addr,
163        local_port: u16,
164        max_connections: NonZeroUsize,
165        max_pending_resets: NonZeroUsize,
166    ) -> Self {
167        TcpIPv4Handler {
168            local_ipv4_addr,
169            local_port,
170            connections: HashMap::with_capacity(max_connections.get()),
171            max_connections,
172            active_connections: HashSet::with_capacity(max_connections.get()),
173            next_timeout: None,
174            rst_queue: Vec::with_capacity(max_pending_resets.get()),
175            max_pending_resets,
176        }
177    }
178
    /// Setter for the local IPv4 address of this TCP handler.
    ///
    /// The new address is used as the source address of packets written from now on.
    pub fn set_local_ipv4_addr(&mut self, ipv4_addr: Ipv4Addr) {
        self.local_ipv4_addr = ipv4_addr;
    }
183
    /// Returns the local IPv4 address of this TCP handler, i.e. the source address of every
    /// packet it writes.
    pub fn local_ipv4_addr(&self) -> Ipv4Addr {
        self.local_ipv4_addr
    }
188
    /// Returns the local port of this TCP handler. Incoming segments addressed to any other
    /// destination port are rejected by `receive_packet`.
    pub fn local_port(&self) -> u16 {
        self.local_port
    }
193
    /// Returns the maximum number of concurrent connections this TCP handler accepts.
    pub fn max_connections(&self) -> NonZeroUsize {
        self.max_connections
    }
198
    /// Returns the maximum number of `RST` segments this TCP handler keeps queued for sending.
    pub fn max_pending_resets(&self) -> NonZeroUsize {
        self.max_pending_resets
    }
203
204    /// Contains logic for handling incoming segments.
205    ///
206    /// Any changes to the state of the handler are communicated through an `Ok(RecvEvent)`.
207    pub fn receive_packet<T: NetworkBytes + Debug, F: FnOnce(Request) -> Response>(
208        &mut self,
209        packet: &IPv4Packet<T>,
210        callback: F,
211    ) -> Result<RecvEvent, RecvError> {
212        // TODO: We skip verifying the checksum, just in case the device model relies on offloading
213        // checksum computation from the guest to some other entity. Clear this up at some point!
214        // (Issue #520)
215        let segment = TcpSegment::from_bytes(packet.payload(), None)?;
216
217        if segment.destination_port() != self.local_port {
218            return Err(RecvError::InvalidPort);
219        }
220
221        let tuple = ConnectionTuple::new(packet.source_address(), segment.source_port());
222
223        let outcome = if let Some(endpoint) = self.connections.get_mut(&tuple) {
224            endpoint.receive_segment(&segment, callback);
225            if endpoint.is_done() {
226                RecvSegmentOutcome::EndpointDone
227            } else {
228                RecvSegmentOutcome::EndpointRunning(endpoint.next_segment_status())
229            }
230        } else if segment.flags_after_ns() == TcpFlags::SYN {
231            RecvSegmentOutcome::NewConnection
232        } else {
233            // We should send a RST for every non-RST unexpected segment we receive.
234            RecvSegmentOutcome::UnexpectedSegment(
235                !segment.flags_after_ns().intersects(TcpFlags::RST),
236            )
237        };
238
239        match outcome {
240            RecvSegmentOutcome::EndpointDone => {
241                self.remove_connection(tuple);
242                Ok(RecvEvent::EndpointDone)
243            }
244            RecvSegmentOutcome::EndpointRunning(status) => {
245                if !self.check_next_segment_status(tuple, status) {
246                    // The connection may not have been a member of active_connection, but it's
247                    // more straightforward to cover both cases this way.
248                    self.active_connections.remove(&tuple);
249                }
250                Ok(RecvEvent::Nothing)
251            }
252            RecvSegmentOutcome::NewConnection => {
253                let endpoint = match Endpoint::new_with_defaults(&segment) {
254                    Ok(endpoint) => endpoint,
255                    Err(_) => return Ok(RecvEvent::FailedNewConnection),
256                };
257
258                if self.connections.len() >= self.max_connections.get() {
259                    if let Some(evict_tuple) = self.find_evictable_connection() {
260                        let rst_config = self.connections[&evict_tuple]
261                            .connection()
262                            .make_rst_config();
263                        self.enqueue_rst_config(evict_tuple, rst_config);
264                        self.remove_connection(evict_tuple);
265                        self.add_connection(tuple, endpoint);
266                        Ok(RecvEvent::NewConnectionReplacing)
267                    } else {
268                        // No room to accept the new connection. Try to enqueue a RST, and forget
269                        // about it.
270                        self.enqueue_rst(tuple, &segment);
271                        Ok(RecvEvent::NewConnectionDropped)
272                    }
273                } else {
274                    self.add_connection(tuple, endpoint);
275                    Ok(RecvEvent::NewConnectionSuccessful)
276                }
277            }
278            RecvSegmentOutcome::UnexpectedSegment(enqueue_rst) => {
279                if enqueue_rst {
280                    self.enqueue_rst(tuple, &segment);
281                }
282                Ok(RecvEvent::UnexpectedSegment)
283            }
284        }
285    }
286
287    fn check_timeout(&mut self, value: u64, tuple: ConnectionTuple) {
288        match self.next_timeout {
289            Some((t, _)) if t > value => self.next_timeout = Some((value, tuple)),
290            None => self.next_timeout = Some((value, tuple)),
291            _ => (),
292        };
293    }
294
295    fn find_next_timeout(&mut self) {
296        let mut next_timeout = None;
297        for (tuple, endpoint) in self.connections.iter() {
298            if let NextSegmentStatus::Timeout(value) = endpoint.next_segment_status() {
299                if let Some((t, _)) = next_timeout {
300                    if t > value {
301                        next_timeout = Some((value, *tuple));
302                    }
303                } else {
304                    next_timeout = Some((value, *tuple));
305                }
306            }
307        }
308        self.next_timeout = next_timeout;
309    }
310
311    // Returns true if the endpoint has been added to the set of active connections (it may have
312    // been there already).
313    fn check_next_segment_status(
314        &mut self,
315        tuple: ConnectionTuple,
316        status: NextSegmentStatus,
317    ) -> bool {
318        if let Some((_, timeout_tuple)) = self.next_timeout
319            && tuple == timeout_tuple
320        {
321            self.find_next_timeout();
322        }
323        match status {
324            NextSegmentStatus::Available => {
325                self.active_connections.insert(tuple);
326                return true;
327            }
328            NextSegmentStatus::Timeout(value) => self.check_timeout(value, tuple),
329            NextSegmentStatus::Nothing => (),
330        };
331
332        false
333    }
334
335    fn add_connection(&mut self, tuple: ConnectionTuple, endpoint: Endpoint) {
336        self.check_next_segment_status(tuple, endpoint.next_segment_status());
337        self.connections.insert(tuple, endpoint);
338    }
339
340    fn remove_connection(&mut self, tuple: ConnectionTuple) {
341        // Just in case it's in there somewhere.
342        self.active_connections.remove(&tuple);
343        self.connections.remove(&tuple);
344
345        if let Some((_, timeout_tuple)) = self.next_timeout
346            && timeout_tuple == tuple
347        {
348            self.find_next_timeout();
349        }
350    }
351
352    // TODO: I guess this should be refactored at some point to also remove the endpoint if found.
353    fn find_evictable_connection(&self) -> Option<ConnectionTuple> {
354        for (tuple, endpoint) in self.connections.iter() {
355            if endpoint.is_evictable() {
356                return Some(*tuple);
357            }
358        }
359        None
360    }
361
362    fn enqueue_rst_config(&mut self, tuple: ConnectionTuple, cfg: RstConfig) {
363        // We simply forgo sending any RSTs if the queue is already full.
364        if self.rst_queue.len() < self.max_pending_resets.get() {
365            self.rst_queue.push((tuple, cfg));
366        }
367    }
368
369    fn enqueue_rst<T: NetworkBytes + Debug>(&mut self, tuple: ConnectionTuple, s: &TcpSegment<T>) {
370        self.enqueue_rst_config(tuple, RstConfig::new(s));
371    }
372
    /// Attempts to write one packet, from either the `RST` queue or one of the existing endpoints,
    /// to `buf`.
    ///
    /// On success, the function returns a pair containing an `Option<NonZeroUsize>` and a
    /// `WriteEvent`. The option represents how many bytes have been written to `buf`, or
    /// that no packet can be sent presently (when equal to `None`). The `WriteEvent` describes
    /// whether any noteworthy state changes are associated with the write.
    ///
    /// # Errors
    ///
    /// Returns a `WriteNextError` if the IPv4 header or a pending `RST` segment cannot be written
    /// to `buf`. The IPv4 header is written before checking whether there is anything to send, so
    /// this can happen even when no packet is pending.
    ///
    /// # Panics
    ///
    /// Panics if a tuple from the active set or from the next timeout is missing from the
    /// connection map, which would be a broken internal invariant.
    pub fn write_next_packet(
        &mut self,
        buf: &mut [u8],
    ) -> Result<(Option<NonZeroUsize>, WriteEvent), WriteNextError> {
        let mut len = None;
        // Tuple of the endpoint which wrote a segment (if any), and whether it is done afterwards.
        let mut writer_status = None;
        let mut event = WriteEvent::Nothing;

        // Write an incomplete Ipv4 packet and complete it afterwards with missing information.
        // The localhost addresses are placeholders, overwritten once the destination is known.
        let mut packet =
            IPv4Packet::write_header(buf, PROTOCOL_TCP, Ipv4Addr::LOCALHOST, Ipv4Addr::LOCALHOST)?;

        // We set mss_reserved to 0, because we don't add any IP options.
        // TODO: Maybe get this nicely from packet at some point.
        let mss_reserved = 0;

        // We prioritize sending RSTs for now. The 10000 value for window size is just an arbitrary
        // number, and using mss_remaining = 0 is perfectly fine in this case, because we don't add
        // any TCP options, or a payload. Note that the most recently enqueued RST is sent first.
        if let Some((tuple, rst_cfg)) = self.rst_queue.pop() {
            let (seq, ack, flags_after_ns) = rst_cfg.seq_ack_tcp_flags();
            let segment_len = TcpSegment::write_incomplete_segment::<[u8]>(
                packet.inner_mut().payload_mut(),
                seq,
                ack,
                flags_after_ns,
                10000,
                None,
                0,
                None,
            )?
            .finalize(
                self.local_port,
                tuple.remote_port,
                Some((self.local_ipv4_addr, tuple.remote_addr)),
            )
            .len();

            packet
                .inner_mut()
                .set_source_address(self.local_ipv4_addr)
                .set_destination_address(tuple.remote_addr);

            let packet_len = packet.with_payload_len_unchecked(segment_len, true).len();
            // The unwrap() is safe because packet_len > 0.
            return Ok((
                Some(NonZeroUsize::new(packet_len).unwrap()),
                WriteEvent::Nothing,
            ));
        }

        // No RST is pending: give every active connection, and then the connection owning the
        // next timeout, a chance to write a segment. The first one which does so wins.
        for tuple in self
            .active_connections
            .iter()
            .chain(self.next_timeout.as_ref().map(|(_, x)| x))
        {
            // Tuples in self.active_connection or self.next_timeout should also appear as keys
            // in self.connections.
            let endpoint = self.connections.get_mut(tuple).unwrap();
            // We need this block to clearly delimit the lifetime of the mutable borrow started by
            // the following packet.inner_mut().
            let segment_len = {
                let maybe_segment =
                    endpoint.write_next_segment(packet.inner_mut().payload_mut(), mss_reserved);

                match maybe_segment {
                    Some(segment) => segment
                        .finalize(
                            self.local_port,
                            tuple.remote_port,
                            Some((self.local_ipv4_addr, tuple.remote_addr)),
                        )
                        .len(),
                    // This endpoint has nothing to write right now; try the next candidate.
                    None => continue,
                }
            };

            packet
                .inner_mut()
                .set_source_address(self.local_ipv4_addr)
                .set_destination_address(tuple.remote_addr);

            let ip_len = packet.with_payload_len_unchecked(segment_len, true).len();

            // The unwrap is safe because ip_len > 0.
            len = Some(NonZeroUsize::new(ip_len).unwrap());
            writer_status = Some((*tuple, endpoint.is_done()));

            break;
        }

        // The bookkeeping is updated only after the loop, once the borrows it holds are over.
        if let Some((tuple, is_done)) = writer_status {
            if is_done {
                self.remove_connection(tuple);
                event = WriteEvent::EndpointDone;
            } else {
                // Indexing cannot panic because tuple is present as a key in self.connections if
                // we got here.
                let status = self.connections[&tuple].next_segment_status();
                if !self.check_next_segment_status(tuple, status) {
                    self.active_connections.remove(&tuple);
                }
            }
        }

        Ok((len, event))
    }
487
488    /// Describes the status of the next segment to be sent by the handler.
489    #[inline]
490    pub fn next_segment_status(&self) -> NextSegmentStatus {
491        if !self.active_connections.is_empty() || !self.rst_queue.is_empty() {
492            return NextSegmentStatus::Available;
493        }
494
495        if let Some((value, _)) = self.next_timeout {
496            return NextSegmentStatus::Timeout(value);
497        }
498
499        NextSegmentStatus::Nothing
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use std::fmt::Debug;
506
507    use super::*;
508    use crate::dumbo::pdu::bytes::NetworkBytesMut;
509    use crate::dumbo::tcp::tests::mock_callback;
510
    // Returns a mutable view of the TCP segment carried by `p`, so tests can tweak its fields in
    // place. The checksum is not verified (`None`), and the helper panics if the payload cannot
    // be parsed as a TCP segment.
    fn inner_tcp_mut<'a, T: NetworkBytesMut + Debug>(
        p: &'a mut IPv4Packet<'_, T>,
    ) -> TcpSegment<'a, &'a mut [u8]> {
        TcpSegment::from_bytes(p.payload_mut(), None).unwrap()
    }
516
517    #[allow(clippy::type_complexity)]
518    fn write_next<'a>(
519        h: &mut TcpIPv4Handler,
520        buf: &'a mut [u8],
521    ) -> Result<(Option<IPv4Packet<'a, &'a mut [u8]>>, WriteEvent), WriteNextError> {
522        h.write_next_packet(buf).map(|(o, err)| {
523            (
524                o.map(move |len| {
525                    let len = len.get();
526                    IPv4Packet::from_bytes(buf.split_at_mut(len).0, true).unwrap()
527                }),
528                err,
529            )
530        })
531    }
532
533    fn next_written_segment<'a>(
534        h: &mut TcpIPv4Handler,
535        buf: &'a mut [u8],
536        expected_event: WriteEvent,
537    ) -> TcpSegment<'a, &'a mut [u8]> {
538        let (segment_start, segment_end) = {
539            let (o, event) = write_next(h, buf).unwrap();
540            assert_eq!(event, expected_event);
541            let p = o.unwrap();
542            (p.header_len(), p.len())
543        };
544
545        TcpSegment::from_bytes(&mut buf[segment_start.into()..segment_end], None).unwrap()
546    }
547
548    // Calls write_next_packet until either an error occurs, or there's nothing left to send.
549    // When successful, returns how many packets were written. The remote_addr argument is used
550    // to check the packets are sent to the appropriate destination.
551    fn drain_packets(
552        h: &mut TcpIPv4Handler,
553        src_addr: Ipv4Addr,
554        remote_addr: Ipv4Addr,
555    ) -> Result<usize, WriteNextError> {
556        let mut buf = [0u8; 2000];
557        let mut count: usize = 0;
558        loop {
559            let (o, _) = write_next(h, buf.as_mut())?;
560            if let Some(packet) = o {
561                count += 1;
562                assert_eq!(packet.source_address(), src_addr);
563                assert_eq!(packet.destination_address(), remote_addr);
564            } else {
565                break;
566            }
567        }
568        Ok(count)
569    }
570
    // End-to-end scenario exercising the handler: rejected segments and RST generation, the RST
    // queue limit, connection setup and teardown, retransmission timeouts, the connection limit,
    // and eviction.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_handler() {
        // `buf` backs the incoming packet `p`; `buf2` receives packets written by the handler.
        let mut buf = [0u8; 100];
        let mut buf2 = [0u8; 2000];

        let wrong_local_addr = Ipv4Addr::new(123, 123, 123, 123);
        let local_addr = Ipv4Addr::new(169, 254, 169, 254);
        let local_port = 80;
        let remote_addr = Ipv4Addr::new(10, 0, 0, 1);
        let remote_port = 1012;
        let max_connections = 2;
        let max_pending_resets = 2;

        let mut h = TcpIPv4Handler::new(
            local_addr,
            local_port,
            NonZeroUsize::new(max_connections).unwrap(),
            NonZeroUsize::new(max_pending_resets).unwrap(),
        );

        // We start with a wrong destination address and destination port to check those error
        // conditions first.
        let mut p =
            IPv4Packet::write_header(buf.as_mut(), PROTOCOL_TCP, remote_addr, wrong_local_addr)
                .unwrap();

        let seq_number = 123;

        let s_len = {
            // We're going to use this simple segment to test stuff.
            let s = TcpSegment::write_segment::<[u8]>(
                p.inner_mut().payload_mut(),
                remote_port,
                // We use the wrong port here initially, to trigger an error.
                local_port + 1,
                seq_number,
                456,
                TcpFlags::empty(),
                10000,
                None,
                100,
                None,
                None,
            )
            .unwrap();
            s.len()
        };

        // The handler should have nothing to send at this point.
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(0));

        let mut p = p.with_payload_len_unchecked(s_len, false);

        // The destination address is fixed before the first receive; only the port is still wrong.
        p.set_destination_address(local_addr);
        assert_eq!(
            h.receive_packet(&p, mock_callback).unwrap_err(),
            RecvError::InvalidPort
        );

        // Let's fix the port. However, the segment is not a valid SYN, so we should get an
        // UnexpectedSegment status, and the handler should write a RST.
        assert_eq!(h.rst_queue.len(), 0);
        inner_tcp_mut(&mut p).set_destination_port(local_port);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::UnexpectedSegment)
        );
        assert_eq!(h.rst_queue.len(), 1);
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
        {
            let s = next_written_segment(&mut h, buf2.as_mut(), WriteEvent::Nothing);
            assert!(s.flags_after_ns().intersects(TcpFlags::RST));
            assert_eq!(s.source_port(), local_port);
            assert_eq!(s.destination_port(), remote_port);
        }

        assert_eq!(h.rst_queue.len(), 0);
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);

        // Let's check we can only enqueue max_pending_resets resets.
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::UnexpectedSegment)
        );
        assert_eq!(h.rst_queue.len(), 1);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::UnexpectedSegment)
        );
        assert_eq!(h.rst_queue.len(), 2);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::UnexpectedSegment)
        );
        assert_eq!(h.rst_queue.len(), 2);

        // Drain the resets.
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(2));
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Nothing);

        // Ok now let's send a valid SYN.
        assert_eq!(h.connections.len(), 0);
        inner_tcp_mut(&mut p).set_flags_after_ns(TcpFlags::SYN);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::NewConnectionSuccessful)
        );
        assert_eq!(h.connections.len(), 1);
        assert_eq!(h.active_connections.len(), 1);

        // Let's immediately send a RST to the newly initiated connection. This should
        // terminate it.
        inner_tcp_mut(&mut p)
            .set_flags_after_ns(TcpFlags::RST)
            .set_sequence_number(seq_number.wrapping_add(1));
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::EndpointDone)
        );
        assert_eq!(h.connections.len(), 0);
        assert_eq!(h.active_connections.len(), 0);

        // Now, let's restore the previous SYN, and resend it to initiate a connection.
        inner_tcp_mut(&mut p)
            .set_flags_after_ns(TcpFlags::SYN)
            .set_sequence_number(seq_number);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::NewConnectionSuccessful)
        );
        assert_eq!(h.connections.len(), 1);
        assert_eq!(h.active_connections.len(), 1);

        // There will be a SYNACK in response.
        assert_eq!(h.next_segment_status(), NextSegmentStatus::Available);
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));

        let remote_tuple = ConnectionTuple::new(remote_addr, remote_port);
        let remote_tuple2 = ConnectionTuple::new(remote_addr, remote_port + 1);

        // Also, there should be a retransmission timer associated with the previous SYNACK now.
        assert_eq!(h.active_connections.len(), 0);
        let old_timeout_value = if let Some((t, tuple)) = h.next_timeout {
            assert_eq!(tuple, remote_tuple);
            t
        } else {
            panic!("missing first expected timeout");
        };

        // Using the same SYN again will route the packet to the previous connection, and not
        // create a new one.
        assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
        assert_eq!(h.connections.len(), 1);
        // SYNACK retransmission.
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));

        // The timeout value should've gotten updated.
        assert_eq!(h.active_connections.len(), 0);
        if let Some((t, tuple)) = h.next_timeout {
            assert_eq!(tuple, remote_tuple);
            // The current Endpoint implementation gets timestamps using timestamp_cycles(), which
            // increases VERY fast so the following inequality is guaranteed to be true. If the
            // timestamp source gets coarser at some point, we might need an explicit wait before
            // the previous h.receive_packet() :-s
            assert!(t > old_timeout_value);
        } else {
            panic!("missing second expected timeout");
        };

        // Let's ACK the SYNACK.
        {
            let seq = h.connections[&remote_tuple].connection().first_not_sent().0;
            inner_tcp_mut(&mut p)
                .set_flags_after_ns(TcpFlags::ACK)
                .set_ack_number(seq);
            assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
        }

        // There should be no more active connections now, and also no pending timeout.
        assert_eq!(h.active_connections.len(), 0);
        assert_eq!(h.next_timeout, None);

        // Make p a SYN packet again.
        inner_tcp_mut(&mut p).set_flags_after_ns(TcpFlags::SYN);

        // Create a new connection, from a different remote_port.
        inner_tcp_mut(&mut p).set_source_port(remote_port + 1);
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::NewConnectionSuccessful)
        );
        assert_eq!(h.connections.len(), 2);
        assert_eq!(h.active_connections.len(), 1);
        // SYNACK
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));

        // The timeout associated with the SYNACK of the second connection should be next.
        assert_eq!(h.active_connections.len(), 0);
        if let Some((_, tuple)) = h.next_timeout {
            assert_ne!(tuple, ConnectionTuple::new(remote_addr, remote_port));
        } else {
            panic!("missing third expected timeout");
        }

        // No more room for another one.
        {
            let port = remote_port + 2;
            inner_tcp_mut(&mut p).set_source_port(port);
            assert_eq!(
                h.receive_packet(&p, mock_callback),
                Ok(RecvEvent::NewConnectionDropped)
            );
            assert_eq!(h.connections.len(), 2);

            // We should get a RST.
            assert_eq!(h.rst_queue.len(), 1);
            let s = next_written_segment(&mut h, buf2.as_mut(), WriteEvent::Nothing);
            assert!(s.flags_after_ns().intersects(TcpFlags::RST));
            assert_eq!(s.destination_port(), port);
        }

        // Let's make the second endpoint evictable.
        h.connections
            .get_mut(&remote_tuple2)
            .unwrap()
            .set_eviction_threshold(0);

        // The new connection will replace the old one.
        assert_eq!(
            h.receive_packet(&p, mock_callback),
            Ok(RecvEvent::NewConnectionReplacing)
        );
        assert_eq!(h.connections.len(), 2);
        assert_eq!(h.active_connections.len(), 1);

        // One SYNACK for the new connection, and one RST for the old one.
        assert_eq!(h.rst_queue.len(), 1);
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(2));
        assert_eq!(h.rst_queue.len(), 0);
        assert_eq!(h.active_connections.len(), 0);

        // Let's send another SYN to the first connection. This should make it reappear among the
        // active connections (because it will have a RST to send), and then cause it to be removed
        // altogether after sending the RST (because is_done() will be true).
        inner_tcp_mut(&mut p).set_source_port(remote_port);
        assert_eq!(h.receive_packet(&p, mock_callback), Ok(RecvEvent::Nothing));
        assert_eq!(h.active_connections.len(), 1);
        assert_eq!(drain_packets(&mut h, local_addr, remote_addr), Ok(1));
        assert_eq!(h.connections.len(), 1);
        assert_eq!(h.active_connections.len(), 0);
    }
825}