// vmm/devices/virtio/vsock/event_handler.rs

use std::fmt::Debug;
10use event_manager::{EventOps, Events, MutEventSubscriber};
30use log::{error, warn};
31use vmm_sys_util::epoll::EventSet;
32
33use super::VsockBackend;
34use super::device::{EVQ_INDEX, RXQ_INDEX, TXQ_INDEX, Vsock};
35use crate::devices::virtio::device::VirtioDevice;
36use crate::devices::virtio::queue::InvalidAvailIdx;
37use crate::devices::virtio::vsock::defs::VSOCK_NUM_QUEUES;
38use crate::devices::virtio::vsock::metrics::METRICS;
39use crate::logger::IncMetric;
40
41impl<B> Vsock<B>
42where
43 B: Debug + VsockBackend + 'static,
44{
45 const PROCESS_ACTIVATE: u32 = 0;
46 const PROCESS_RXQ: u32 = 1;
47 const PROCESS_TXQ: u32 = 2;
48 const PROCESS_EVQ: u32 = 3;
49 const PROCESS_NOTIFY_BACKEND: u32 = 4;
50
51 pub fn handle_rxq_event(&mut self, evset: EventSet) -> Vec<u16> {
52 let mut used_queues = Vec::new();
53 if evset != EventSet::IN {
54 warn!("vsock: rxq unexpected event {:?}", evset);
55 METRICS.rx_queue_event_fails.inc();
56 return used_queues;
57 }
58
59 if let Err(err) = self.queue_events[RXQ_INDEX].read() {
60 error!("Failed to get vsock rx queue event: {:?}", err);
61 METRICS.rx_queue_event_fails.inc();
62 } else if self.backend.has_pending_rx() {
63 if self.process_rx().unwrap() {
64 used_queues.push(RXQ_INDEX.try_into().unwrap());
65 }
66 METRICS.rx_queue_event_count.inc();
67 }
68 used_queues
69 }
70
71 pub fn handle_txq_event(&mut self, evset: EventSet) -> Vec<u16> {
72 let mut used_queues = Vec::new();
73 if evset != EventSet::IN {
74 warn!("vsock: txq unexpected event {:?}", evset);
75 METRICS.tx_queue_event_fails.inc();
76 return used_queues;
77 }
78
79 if let Err(err) = self.queue_events[TXQ_INDEX].read() {
80 error!("Failed to get vsock tx queue event: {:?}", err);
81 METRICS.tx_queue_event_fails.inc();
82 } else {
83 if self.process_tx().unwrap() {
84 used_queues.push(TXQ_INDEX.try_into().unwrap());
85 }
86 METRICS.tx_queue_event_count.inc();
87 if self.backend.has_pending_rx() && self.process_rx().unwrap() {
91 used_queues.push(RXQ_INDEX.try_into().unwrap());
92 }
93 }
94 used_queues
95 }
96
97 pub fn handle_evq_event(&mut self, evset: EventSet) {
98 if evset != EventSet::IN {
99 warn!("vsock: evq unexpected event {:?}", evset);
100 METRICS.ev_queue_event_fails.inc();
101 return;
102 }
103
104 if let Err(err) = self.queue_events[EVQ_INDEX].read() {
105 error!("Failed to consume vsock evq event: {:?}", err);
106 METRICS.ev_queue_event_fails.inc();
107 }
108 }
109
110 pub fn notify_backend(&mut self, evset: EventSet) -> Result<Vec<u16>, InvalidAvailIdx> {
112 let mut used_queues = Vec::new();
113 self.backend.notify(evset);
114 if self.process_tx()? {
120 used_queues.push(TXQ_INDEX.try_into().unwrap());
121 }
122 if self.backend.has_pending_rx() && self.process_rx()? {
123 used_queues.push(RXQ_INDEX.try_into().unwrap())
124 }
125
126 Ok(used_queues)
127 }
128
129 fn register_runtime_events(&self, ops: &mut EventOps) {
130 if let Err(err) = ops.add(Events::with_data(
131 &self.queue_events[RXQ_INDEX],
132 Self::PROCESS_RXQ,
133 EventSet::IN,
134 )) {
135 error!("Failed to register rx queue event: {}", err);
136 }
137 if let Err(err) = ops.add(Events::with_data(
138 &self.queue_events[TXQ_INDEX],
139 Self::PROCESS_TXQ,
140 EventSet::IN,
141 )) {
142 error!("Failed to register tx queue event: {}", err);
143 }
144 if let Err(err) = ops.add(Events::with_data(
145 &self.queue_events[EVQ_INDEX],
146 Self::PROCESS_EVQ,
147 EventSet::IN,
148 )) {
149 error!("Failed to register ev queue event: {}", err);
150 }
151 if let Err(err) = ops.add(Events::with_data(
152 &self.backend,
153 Self::PROCESS_NOTIFY_BACKEND,
154 self.backend.get_polled_evset(),
155 )) {
156 error!("Failed to register vsock backend event: {}", err);
157 }
158 }
159
160 fn register_activate_event(&self, ops: &mut EventOps) {
161 if let Err(err) = ops.add(Events::with_data(
162 &self.activate_evt,
163 Self::PROCESS_ACTIVATE,
164 EventSet::IN,
165 )) {
166 error!("Failed to register activate event: {}", err);
167 }
168 }
169
170 fn handle_activate_event(&self, ops: &mut EventOps) {
171 if let Err(err) = self.activate_evt.read() {
172 error!("Failed to consume net activate event: {:?}", err);
173 }
174 self.register_runtime_events(ops);
175 if let Err(err) = ops.remove(Events::with_data(
176 &self.activate_evt,
177 Self::PROCESS_ACTIVATE,
178 EventSet::IN,
179 )) {
180 error!("Failed to un-register activate event: {}", err);
181 }
182 }
183}
184
185impl<B> MutEventSubscriber for Vsock<B>
186where
187 B: Debug + VsockBackend + 'static,
188{
189 fn process(&mut self, event: Events, ops: &mut EventOps) {
190 let source = event.data();
191 let evset = event.event_set();
192
193 if self.is_activated() {
194 let used_queues = match source {
195 Self::PROCESS_ACTIVATE => {
196 self.handle_activate_event(ops);
197 Vec::new()
198 }
199 Self::PROCESS_RXQ => self.handle_rxq_event(evset),
200 Self::PROCESS_TXQ => self.handle_txq_event(evset),
201 Self::PROCESS_EVQ => {
202 self.handle_evq_event(evset);
203 Vec::new()
204 }
205 Self::PROCESS_NOTIFY_BACKEND => self.notify_backend(evset).unwrap(),
206 _ => {
207 warn!("Unexpected vsock event received: {:?}", source);
208 Vec::new()
209 }
210 };
211 self.signal_used_queues(&used_queues)
212 .expect("vsock: Could not trigger device interrupt");
213 } else {
214 warn!(
215 "Vsock: The device is not yet activated. Spurious event received: {:?}",
216 source
217 );
218 }
219 }
220
221 fn init(&mut self, ops: &mut EventOps) {
222 if self.is_activated() {
227 self.register_runtime_events(ops);
228 } else {
229 self.register_activate_event(ops);
230 }
231 }
232}
233
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use event_manager::{EventManager, SubscriberOps};

    use super::super::*;
    use super::*;
    use crate::devices::virtio::vsock::test_utils::{EventHandlerContext, TestContext};

    #[test]
    fn test_txq_event() {
        // Case: TX event with no pending RX on the backend.
        // Only the TX used ring should advance.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.signal_txq_event();

            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Case: TX event while the backend has pending RX.
        // handle_txq_event also runs RX processing, so both used rings advance.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.signal_txq_event();

            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Case: the backend errors on send (tx_err set).
        // No descriptor is placed in either used ring.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.device.backend.set_tx_err(Some(VsockError::NoData));
            ctx.signal_txq_event();

            assert_eq!(ctx.guest_txvq.used.idx.get(), 0);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Case: malformed TX descriptor chain (zero-length descriptors).
        // The chain is returned to the guest via the used ring, but nothing
        // reaches the backend (tx_ok_cnt stays 0).
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.guest_txvq.dtable[0].len.set(0);
            ctx.guest_txvq.dtable[1].len.set(0);
            ctx.signal_txq_event();

            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.device.backend.tx_ok_cnt, 0);
        }

        // Case: handler invoked without a prior eventfd write.
        // The queue eventfd read fails, bumping the failure metric.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            let metric_before = METRICS.tx_queue_event_fails.count();
            ctx.device.handle_txq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.tx_queue_event_fails.count());
        }
    }

    #[test]
    fn test_rxq_event() {
        // Case: backend has pending RX but errors on recv (rx_err set).
        // Nothing is placed in the RX used ring.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.device.backend.set_rx_err(Some(VsockError::NoData));
            ctx.signal_rxq_event();

            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }

        // Case: happy path — pending RX is delivered to the guest.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.signal_rxq_event();

            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Case: malformed RX descriptor chain (zero-length descriptors).
        // The chain is returned via the used ring without a backend recv
        // (rx_ok_cnt stays 0).
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.guest_rxvq.dtable[0].len.set(0);
            ctx.guest_rxvq.dtable[1].len.set(0);

            assert!(ctx.device.process_rx().unwrap());
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
            assert_eq!(ctx.device.backend.rx_ok_cnt, 0);
        }

        // Case: handler invoked without a prior eventfd write.
        // The queue eventfd read fails, bumping the failure metric.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());
            ctx.device.backend.set_pending_rx(false);
            let metric_before = METRICS.rx_queue_event_fails.count();
            ctx.device.handle_rxq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.rx_queue_event_fails.count());
        }
    }

    #[test]
    fn test_evq_event() {
        // Invoking the evq handler without a prior eventfd write fails the
        // eventfd read and bumps the failure metric.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.device.backend.set_pending_rx(false);
            let metric_before = METRICS.ev_queue_event_fails.count();
            ctx.device.handle_evq_event(EventSet::IN);
            assert_eq!(metric_before + 1, METRICS.ev_queue_event_fails.count());
        }
    }

    #[test]
    fn test_backend_event() {
        // Case: backend event with pending RX.
        // notify_backend processes TX then RX, so both used rings advance and
        // the event set is forwarded to the backend.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(true);
            ctx.device.notify_backend(EventSet::IN).unwrap();

            assert_eq!(ctx.device.backend.evset, Some(EventSet::IN));
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 1);
        }

        // Case: backend event with no pending RX.
        // Only TX is processed; the RX used ring stays untouched.
        {
            let test_ctx = TestContext::new();
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.mock_activate(test_ctx.mem.clone(), test_ctx.interrupt.clone());

            ctx.device.backend.set_pending_rx(false);
            ctx.device.notify_backend(EventSet::IN).unwrap();

            assert_eq!(ctx.device.backend.evset, Some(EventSet::IN));
            assert_eq!(ctx.guest_txvq.used.idx.get(), 1);
            assert_eq!(ctx.guest_rxvq.used.idx.get(), 0);
        }
    }

    // Helper for the buffer-overflow regression test below: points descriptor
    // `desc_idx` of both the RX and TX rings at (addr, len) and checks that
    // packet parsing rejects the chain instead of accessing it.
    #[cfg(target_arch = "x86_64")]
    fn vsock_bof_helper(test_ctx: &mut TestContext, desc_idx: usize, addr: u64, len: u32) {
        use crate::vstate::memory::{Bytes, GuestAddress};

        assert!(desc_idx <= 1);

        // RX side: parsing the tampered descriptor chain must fail.
        {
            let mut ctx = test_ctx.create_event_handler_context();
            ctx.guest_rxvq.dtable[desc_idx].addr.set(addr);
            ctx.guest_rxvq.dtable[desc_idx].len.set(len);
            if let Some(rx_desc) = ctx.device.queues[RXQ_INDEX].pop().unwrap() {
                VsockPacketRx::new()
                    .unwrap()
                    .parse(&test_ctx.mem, rx_desc)
                    .unwrap_err();
            }
        }

        // TX side: same check against the TX packet parser.
        {
            let mut ctx = test_ctx.create_event_handler_context();

            if desc_idx == 1 {
                // For the data descriptor (index 1), write `len` into the
                // packet header so the parser sees a matching payload length.
                // NOTE(review): offset 24 appears to be the header's `len`
                // field — confirm against the vsock packet header layout.
                let hdr_len_addr = GuestAddress(ctx.guest_txvq.dtable[0].addr.get() + 24);
                test_ctx
                    .mem
                    .write_obj(len.to_le_bytes(), hdr_len_addr)
                    .unwrap();
            }

            ctx.guest_txvq.dtable[desc_idx].addr.set(addr);
            ctx.guest_txvq.dtable[desc_idx].len.set(len);

            if let Some(tx_desc) = ctx.device.queues[TXQ_INDEX].pop().unwrap() {
                VsockPacketTx::default()
                    .parse(&test_ctx.mem, tx_desc)
                    .unwrap_err();
            }
        }
    }

    // Regression test: descriptors whose buffers cross out of guest memory
    // (into the 32-bit MMIO gap) must be rejected by packet parsing.
    #[test]
    #[cfg(target_arch = "x86_64")]
    #[allow(clippy::cast_possible_truncation)]
    fn test_vsock_bof() {
        use crate::arch::x86_64::layout::FIRST_ADDR_PAST_32BITS;
        use crate::arch::{MMIO32_MEM_SIZE, MMIO32_MEM_START};
        use crate::devices::virtio::vsock::packet::VSOCK_PKT_HDR_SIZE;
        use crate::test_utils::multi_region_mem;
        use crate::utils::mib_to_bytes;
        use crate::vstate::memory::GuestAddress;

        const MIB: usize = mib_to_bytes(1);

        // Guest memory with a hole over the 32-bit MMIO region: regions end
        // right at MMIO32_MEM_START and resume past the 32-bit boundary.
        let mut test_ctx = TestContext::new();
        test_ctx.mem = multi_region_mem(&[
            (GuestAddress(0), 8 * MIB),
            (GuestAddress(MMIO32_MEM_START - MIB as u64), MIB),
            (GuestAddress(FIRST_ADDR_PAST_32BITS), MIB),
        ]);

        // Sanity check: untampered descriptors parse fine on this layout.
        {
            let mut ctx = test_ctx.create_event_handler_context();
            let rx_desc = ctx.device.queues[RXQ_INDEX].pop().unwrap().unwrap();
            VsockPacketRx::new()
                .unwrap()
                .parse(&test_ctx.mem, rx_desc)
                .unwrap();
        }

        {
            let mut ctx = test_ctx.create_event_handler_context();
            let tx_desc = ctx.device.queues[TXQ_INDEX].pop().unwrap().unwrap();
            VsockPacketTx::default()
                .parse(&test_ctx.mem, tx_desc)
                .unwrap();
        }

        // Header descriptor starting 1 byte before the MMIO hole: the header
        // would straddle the memory boundary.
        vsock_bof_helper(&mut test_ctx, 0, MMIO32_MEM_START - 1, VSOCK_PKT_HDR_SIZE);

        // Data descriptor spanning the entire MMIO hole plus a few bytes on
        // each side.
        vsock_bof_helper(
            &mut test_ctx,
            1,
            MMIO32_MEM_START - 4,
            MMIO32_MEM_SIZE as u32 + 4,
        );

        // Data descriptor overshooting the far end of the MMIO hole.
        vsock_bof_helper(
            &mut test_ctx,
            1,
            MMIO32_MEM_START - 4,
            MMIO32_MEM_SIZE as u32 + 100,
        );
    }

    #[test]
    fn test_event_handler() {
        let mut event_manager = EventManager::new().unwrap();
        let test_ctx = TestContext::new();
        let EventHandlerContext {
            device,
            guest_rxvq,
            guest_txvq,
            ..
        } = test_ctx.create_event_handler_context();

        let vsock = Arc::new(Mutex::new(device));
        let _id = event_manager.add_subscriber(vsock.clone());

        // Queue a TX event before the device is activated.
        {
            let mut device = vsock.lock().unwrap();
            device.backend.set_pending_rx(true);
            device.queue_events[TXQ_INDEX].write(1).unwrap();
        }

        // Pre-activation, only the activate event is registered, so the queue
        // event is not dispatched.
        let ev_count = event_manager.run_with_timeout(50).unwrap();
        assert_eq!(ev_count, 0);

        // Same again: queue events remain unregistered and the used rings are
        // untouched while the device is inactive.
        {
            let device = vsock.lock().unwrap();

            device.queue_events[TXQ_INDEX].write(1).unwrap();
            let ev_count = event_manager.run_with_timeout(50).unwrap();
            assert_eq!(ev_count, 0);

            assert_eq!(guest_rxvq.used.idx.get(), 0);
            assert_eq!(guest_txvq.used.idx.get(), 0);
        }

        // Activate the device; the activate event fires and swaps in the
        // runtime event registrations.
        vsock
            .lock()
            .unwrap()
            .activate(test_ctx.mem.clone(), test_ctx.interrupt.clone())
            .unwrap();
        let ev_count = event_manager.run_with_timeout(50).unwrap();
        assert_eq!(ev_count, 1);

        // The pending TX queue event is now dispatched: TX is processed and,
        // with pending RX set, the RX ring advances too.
        {
            let ev_count = event_manager
                .run_with_timeout(100)
                .expect("Metrics event timeout or error.");
            assert_eq!(ev_count, 1);
            assert_eq!(guest_rxvq.used.idx.get(), 1);
            assert_eq!(guest_txvq.used.idx.get(), 1);
        }
    }
}