vmm/devices/virtio/vsock/event_handler.rs

// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the THIRD-PARTY file.

use std::fmt::Debug;

/// The vsock object implements the runtime logic of our vsock device:
/// 1. Respond to TX queue events by wrapping virtio buffers into `VsockPacket`s, then sending
///    those packets to the `VsockBackend`;
/// 2. Forward backend FD event notifications to the `VsockBackend`;
/// 3. Fetch incoming packets from the `VsockBackend` and place them into the virtio RX queue;
/// 4. Whenever we have processed some virtio buffers (either TX or RX), let the driver know by
///    raising our assigned IRQ.
///
/// In a nutshell, the logic looks like this:
/// - on TX queue event:
///   - fetch all packets from the TX queue and send them to the backend; then
///   - if the backend has queued up any incoming packets, fetch them into any available RX
///     buffers.
/// - on RX queue event:
///   - fetch any incoming packets, queued up by the backend, into newly available RX buffers.
/// - on backend event:
///   - forward the event to the backend; then
///   - again, attempt to fetch any incoming packets queued by the backend into virtio RX
///     buffers.
use event_manager::{EventOps, Events, MutEventSubscriber};
use log::{error, warn};
use vmm_sys_util::epoll::EventSet;

use super::VsockBackend;
use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock};
use crate::devices::virtio::device::VirtioDevice;
use crate::devices::virtio::queue::InvalidAvailIdx;
use crate::devices::virtio::vsock::defs::VSOCK_NUM_QUEUES;
use crate::devices::virtio::vsock::metrics::METRICS;
use crate::logger::IncMetric;

impl<B> Vsock<B>
where
    B: Debug + VsockBackend + 'static,
{
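    // Event-source tokens: each event registered with the event manager is
    // tagged with one of these values, so `process()` can tell which source
    // fired.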
    const PROCESS_ACTIVATE: u32 = 0;
    const PROCESS_RXQ: u32 = 1;
    const PROCESS_TXQ: u32 = 2;
    const PROCESS_EVQ: u32 = 3;
    const PROCESS_NOTIFY_BACKEND: u32 = 4;

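    /// Handles an RX queue event (the guest driver has added new receive buffers).
    /// Returns the indices of any queues that had descriptors used, so the caller
    /// can signal them to the guest.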
    pub fn handle_rxq_event(&mut self, evset: EventSet) -> Vec<u16> {
        let mut used_queues = Vec::new();
        if evset != EventSet::IN {
            warn!("vsock: rxq unexpected event {:?}", evset);
            METRICS.rx_queue_event_fails.inc();
            return used_queues;
        }

        if let Err(err) = self.queue_events[RXQ_INDEX].read() {
            error!("Failed to get vsock rx queue event: {:?}", err);
            METRICS.rx_queue_event_fails.inc();
        } else if self.backend.has_pending_rx() {
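            // `process_rx()` can only fail with `InvalidAvailIdx`, which indicates
            // an unrecoverable guest driver bug, so unwrapping (and panicking) here
            // is deliberate.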
            if self.process_rx().unwrap() {
                used_queues.push(RXQ_INDEX.try_into().unwrap());
            }
            METRICS.rx_queue_event_count.inc();
        }
        used_queues
    }

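    /// Handles a TX queue event (the guest driver has queued packets for
    /// transmission). Since draining the TX queue may cause the backend to queue
    /// up responses, pending RX data is processed afterwards as well. Returns the
    /// indices of any queues that had descriptors used.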
    pub fn handle_txq_event(&mut self, evset: EventSet) -> Vec<u16> {
        let mut used_queues = Vec::new();
        if evset != EventSet::IN {
            warn!("vsock: txq unexpected event {:?}", evset);
            METRICS.tx_queue_event_fails.inc();
            return used_queues;
        }

        if let Err(err) = self.queue_events[TXQ_INDEX].read() {
            error!("Failed to get vsock tx queue event: {:?}", err);
            METRICS.tx_queue_event_fails.inc();
        } else {
            if self.process_tx().unwrap() {
                used_queues.push(TXQ_INDEX.try_into().unwrap());
            }
            METRICS.tx_queue_event_count.inc();
            // The backend may have queued up responses to the packets we sent during
            // TX queue processing. If that happened, we need to fetch those responses
            // and place them into RX buffers.
            if self.backend.has_pending_rx() && self.process_rx().unwrap() {
                used_queues.push(RXQ_INDEX.try_into().unwrap());
            }
        }
        used_queues
    }

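    /// Handles an event queue notification. Beyond consuming the eventfd, no
    /// processing is currently required for the event queue.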
    pub fn handle_evq_event(&mut self, evset: EventSet) {
        if evset != EventSet::IN {
            warn!("vsock: evq unexpected event {:?}", evset);
            METRICS.ev_queue_event_fails.inc();
            return;
        }

        if let Err(err) = self.queue_events[EVQ_INDEX].read() {
            error!("Failed to consume vsock evq event: {:?}", err);
            METRICS.ev_queue_event_fails.inc();
        }
    }

    /// Notify backend of new events.
    pub fn notify_backend(&mut self, evset: EventSet) -> Result<Vec<u16>, InvalidAvailIdx> {
        let mut used_queues = Vec::new();
        self.backend.notify(evset);
        // After the backend has been kicked, it might've freed up some resources, so we
        // can attempt to send it more data to process.
        // In particular, if `self.backend.send_pkt()` halted the TX queue processing (by
        // returning an error) at some point in the past, now is the time to try walking the
        // TX queue again.
        if self.process_tx()? {
            used_queues.push(TXQ_INDEX.try_into().unwrap());
        }
        if self.backend.has_pending_rx() && self.process_rx()? {
            used_queues.push(RXQ_INDEX.try_into().unwrap());
        }

        Ok(used_queues)
    }

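    /// Registers the event sources needed during normal operation: one eventfd per
    /// virtio queue, plus the backend's own pollable file descriptor.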
    fn register_runtime_events(&self, ops: &mut EventOps) {
        if let Err(err) = ops.add(Events::with_data(
            &self.queue_events[RXQ_INDEX],
            Self::PROCESS_RXQ,
            EventSet::IN,
        )) {
            error!("Failed to register rx queue event: {}", err);
        }
        if let Err(err) = ops.add(Events::with_data(
            &self.queue_events[TXQ_INDEX],
            Self::PROCESS_TXQ,
            EventSet::IN,
        )) {
            error!("Failed to register tx queue event: {}", err);
        }
        if let Err(err) = ops.add(Events::with_data(
            &self.queue_events[EVQ_INDEX],
            Self::PROCESS_EVQ,
            EventSet::IN,
        )) {
            error!("Failed to register ev queue event: {}", err);
        }
        if let Err(err) = ops.add(Events::with_data(
            &self.backend,
            Self::PROCESS_NOTIFY_BACKEND,
            self.backend.get_polled_evset(),
        )) {
            error!("Failed to register vsock backend event: {}", err);
        }
    }

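    /// Registers the activate eventfd, which fires once the guest driver activates
    /// the device.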
    fn register_activate_event(&self, ops: &mut EventOps) {
        if let Err(err) = ops.add(Events::with_data(
            &self.activate_evt,
            Self::PROCESS_ACTIVATE,
            EventSet::IN,
        )) {
            error!("Failed to register activate event: {}", err);
        }
    }

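    /// Consumes the activate event, then swaps the activate registration for the
    /// runtime event registrations.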
    fn handle_activate_event(&self, ops: &mut EventOps) {
        if let Err(err) = self.activate_evt.read() {
            error!("Failed to consume vsock activate event: {:?}", err);
        }
        self.register_runtime_events(ops);
        if let Err(err) = ops.remove(Events::with_data(
            &self.activate_evt,
            Self::PROCESS_ACTIVATE,
            EventSet::IN,
        )) {
            error!("Failed to un-register activate event: {}", err);
        }
    }
}

impl<B> MutEventSubscriber for Vsock<B>
where
    B: Debug + VsockBackend + 'static,
{
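    /// Dispatches an incoming event to the matching handler based on the token it
    /// was registered with, then signals any used queues to the guest. Events that
    /// arrive before the device is activated are ignored.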
    fn process(&mut self, event: Events, ops: &mut EventOps) {
        let source = event.data();
        let evset = event.event_set();

        if self.is_activated() {
            let used_queues = match source {
                Self::PROCESS_ACTIVATE => {
                    self.handle_activate_event(ops);
                    Vec::new()
                }
                Self::PROCESS_RXQ => self.handle_rxq_event(evset),
                Self::PROCESS_TXQ => self.handle_txq_event(evset),
                Self::PROCESS_EVQ => {
                    self.handle_evq_event(evset);
                    Vec::new()
                }
                Self::PROCESS_NOTIFY_BACKEND => self.notify_backend(evset).unwrap(),
                _ => {
                    warn!("Unexpected vsock event received: {:?}", source);
                    Vec::new()
                }
            };
            self.signal_used_queues(&used_queues)
                .expect("vsock: Could not trigger device interrupt");
        } else {
            warn!(
                "Vsock: The device is not yet activated. Spurious event received: {:?}",
                source
            );
        }
    }

    fn init(&mut self, ops: &mut EventOps) {
        // This function can be called during different points in the device lifetime:
        //  - shortly after device creation,
        //  - on device activation (is-activated already true at this point),
        //  - on device restore from snapshot.
        if self.is_activated() {
            self.register_runtime_events(ops);
        } else {
            self.register_activate_event(ops);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use event_manager::{EventManager, SubscriberOps};

    use super::super::*;
    use super::*;
    use crate::devices::virtio::vsock::test_utils::{EventHandlerContext, TestContext};

    #[test]
    fn test_txq_event() {
        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend has no pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.signal_txq_event();

            // The available TX descriptor should have been used.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // The available RX descriptor should be untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend also has some pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.signal_txq_event();

            // Both available RX and TX descriptors should have been used.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case:
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend errors out and cannot process the TX queue.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.device.backend.set_tx_err(Some(VsockError::NoData));
            ctx.signal_txq_event();

            // Both RX and TX queues should be untouched.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 0);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - the driver supplied a malformed TX buffer.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            // Invalidate the descriptor chain, by setting its length to 0.
            ctx.guest_txvq.dtable[0].len.set(0);
            ctx.guest_txvq.dtable[1].len.set(0);
            ctx.signal_txq_event();

            // The available descriptor should have been consumed, but no packet should have
            // reached the backend.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.device.backend.tx_ok_cnt, 0);
        }

        // Test case: spurious TXQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            let metric_before = METRICS.tx_queue_event_fails.count();
            ctx.device.handle_txq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.tx_queue_event_fails.count());
        }
    }

    #[test]
    fn test_rxq_event() {
        // Test case:
        // - there is pending RX data in the backend; and
        // - the driver makes RX buffers available; and
        // - the backend errors out when attempting to receive data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.device.backend.set_rx_err(Some(VsockError::NoData));
            ctx.signal_rxq_event();

            // The available RX buffer should've been left untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Test case:
        // - there is pending RX data in the backend; and
        // - the driver makes RX buffers available; and
        // - the backend successfully places its RX data into the queue.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.signal_rxq_event();

            // The available RX buffer should have been used.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case: the driver provided a malformed RX descriptor chain.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            // Invalidate the descriptor chain, by setting its length to 0.
            ctx.guest_rxvq.dtable[0].len.set(0);
            ctx.guest_rxvq.dtable[1].len.set(0);

            // The chain should've been processed, without employing the backend.
            assert!(ctx.device.process_rx().unwrap());
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
            assert_eq!(ctx.device.backend.rx_ok_cnt, 0);
        }

        // Test case: spurious RXQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());
            ctx.device.backend.set_pending_rx(false);
            let metric_before = METRICS.rx_queue_event_fails.count();
            ctx.device.handle_rxq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.rx_queue_event_fails.count());
        }
    }

    #[test]
    fn test_evq_event() {
        // Test case: spurious EVQ_EVENT.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.device.backend.set_pending_rx(false);
            let metric_before = METRICS.ev_queue_event_fails.count();
            ctx.device.handle_evq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.ev_queue_event_fails.count());
        }
    }

    #[test]
    fn test_backend_event() {
        // Test case:
        // - a backend event is received; and
        // - the backend has pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.device.notify_backend(EventSet::IN).unwrap();

            // The backend should've received this event.
            assert_eq!(ctx.device.backend.evset, Some(EventSet::IN));
            // TX queue processing should've been triggered.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // RX queue processing should've been triggered.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Test case:
        // - a backend event is received; and
        // - the backend doesn't have any pending RX data.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.device.notify_backend(EventSet::IN).unwrap();

            // The backend should've received this event.
            assert_eq!(ctx.device.backend.evset, Some(EventSet::IN));
            // TX queue processing should've been triggered.
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            // The RX queue should've been left untouched.
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }
    }

    // Creates an epoll handler context and attempts to assemble a VsockPkt from the descriptor
    // chains available on the rx and tx virtqueues, but first it will set the addr and len
    // of the descriptor specified by desc_idx to the provided values. We are only using this
    // function for testing error cases, so the asserts always expect is_err() to be true. When
    // desc_idx = 0 we are altering the header (first descriptor in the chain), and when
    // desc_idx = 1 we are altering the packet buffer.
    #[cfg(target_arch = "x86_64")]
    fn vsock_bof_helper(test_ctx: &mut TestContext, desc_idx: usize, addr: u64, len: u32) {
        use crate::vstate::memory::{Bytes, GuestAddress};

        assert!(desc_idx <= 1);

        {
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.guest_rxvq.dtable[desc_idx].addr.set(addr);
            ctx.guest_rxvq.dtable[desc_idx].len.set(len);
            // If the descriptor chain is already declared invalid, there's no reason to assemble
            // a packet.
            if let Some(rx_desc) = ctx.device.queues[RXQ_INDEX].pop().unwrap() {
                VsockPacketRx::new()
                    .unwrap()
                    .parse(&test_ctx.mem, rx_desc)
                    .unwrap_err();
            }
        }

        {
            let mut ctx = test_ctx.create_event_handler_context();

            // When modifying the buffer descriptor, make sure the len field is altered in the
            // vsock packet header descriptor as well.
            if desc_idx == 1 {
                // The vsock packet len field has offset 24 in the header.
                let hdr_len_addr = GuestAddress(ctx.guest_txvq.dtable[0].addr.get() + 24);
                test_ctx
                    .mem
                    .write_obj(len.to_le_bytes(), hdr_len_addr)
                    .unwrap();
            }

            ctx.guest_txvq.dtable[desc_idx].addr.set(addr);
            ctx.guest_txvq.dtable[desc_idx].len.set(len);

            if let Some(tx_desc) = ctx.device.queues[TXQ_INDEX].pop().unwrap() {
                VsockPacketTx::default()
                    .parse(&test_ctx.mem, tx_desc)
                    .unwrap_err();
            }
        }
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    #[allow(clippy::cast_possible_truncation)] /* casting of constants we know fit into u32 */
    fn test_vsock_bof() {
        use crate::arch::x86_64::layout::FIRST_ADDR_PAST_32BITS;
        use crate::arch::{MMIO32_MEM_SIZE, MMIO32_MEM_START};
        use crate::devices::virtio::vsock::packet::VSOCK_PKT_HDR_SIZE;
        use crate::test_utils::multi_region_mem;
        use crate::utils::mib_to_bytes;
        use crate::vstate::memory::GuestAddress;

        const MIB: usize = mib_to_bytes(1);

        let mut test_ctx = TestContext::new();
        test_ctx.mem = multi_region_mem(&[
            (GuestAddress(0), 8 * MIB),
            (GuestAddress(MMIO32_MEM_START - MIB as u64), MIB),
            (GuestAddress(FIRST_ADDR_PAST_32BITS), MIB),
        ]);

        // The default configured descriptor chains are valid.
        {
            let mut ctx = test_ctx.create_event_handler_context();
            let rx_desc = ctx.device.queues[RXQ_INDEX].pop().unwrap().unwrap();
            VsockPacketRx::new()
                .unwrap()
                .parse(&test_ctx.mem, rx_desc)
                .unwrap();
        }

        {
            let mut ctx = test_ctx.create_event_handler_context();
            let tx_desc = ctx.device.queues[TXQ_INDEX].pop().unwrap().unwrap();
            VsockPacketTx::default()
                .parse(&test_ctx.mem, tx_desc)
                .unwrap();
        }

        // Let's check what happens when the header descriptor is right before the gap.
        vsock_bof_helper(&mut test_ctx, 0, MMIO32_MEM_START - 1, VSOCK_PKT_HDR_SIZE);

        // Let's check what happens when the buffer descriptor crosses into the gap, but does
        // not go past its right edge.
        vsock_bof_helper(
            &mut test_ctx,
            1,
            MMIO32_MEM_START - 4,
            MMIO32_MEM_SIZE as u32 + 4,
        );

        // Let's modify the buffer descriptor addr and len such that it crosses over the MMIO gap,
        // and check we cannot assemble the VsockPkts.
        vsock_bof_helper(
            &mut test_ctx,
            1,
            MMIO32_MEM_START - 4,
            MMIO32_MEM_SIZE as u32 + 100,
        );
    }

    #[test]
    fn test_event_handler() {
        let mut event_manager = EventManager::new().unwrap();
        let test_ctx = TestContext::new();
        let EventHandlerContext {
            device,
            guest_rxvq,
            guest_txvq,
            ..
        } = test_ctx.create_event_handler_context();

        let vsock = Arc::new(Mutex::new(device));
        let _id = event_manager.add_subscriber(vsock.clone());

        // Push a queue event
        // - the driver has something to send (there's data in the TX queue); and
        // - the backend also has some pending RX data.
        {
            let mut device = vsock.lock().unwrap();
            device.backend.set_pending_rx(true);
            device.queue_events[TXQ_INDEX].write(1).unwrap();
        }

        // EventManager should report no events since vsock has only registered
        // its activation event so far (even though there is also a queue event pending).
        let ev_count = event_manager.run_with_timeout(50).unwrap();
        assert_eq!(ev_count, 0);

        // Manually force a queue event and check it's ignored pre-activation.
        {
            let device = vsock.lock().unwrap();

            // Artificially push event.
            device.queue_events[TXQ_INDEX].write(1).unwrap();
            let ev_count = event_manager.run_with_timeout(50).unwrap();
            assert_eq!(ev_count, 0);

            // Both available RX and TX descriptors should be untouched.
            assert_eq!(guest_rxvq.used.idx.get(), 0);
            assert_eq!(guest_txvq.used.idx.get(), 0);
        }

        // Now activate the device.
        vsock
            .lock()
            .unwrap()
            .activate(test_ctx.mem.clone(), test_ctx.interrupt.clone())
            .unwrap();
        // Process the activate event.
        let ev_count = event_manager.run_with_timeout(50).unwrap();
        assert_eq!(ev_count, 1);

        // Handle the previously pushed queue event through EventManager.
        {
            let ev_count = event_manager
                .run_with_timeout(100)
                .expect("Event timeout or error.");
            assert_eq!(ev_count, 1);
            // Both available RX and TX descriptors should have been used.
            assert_eq!(guest_rxvq.used.idx.get(), 1);
            assert_eq!(guest_txvq.used.idx.get(), 1);
        }
    }
}