vmm/io_uring/queue/
completion.rs1use std::fmt::Debug;
5use std::num::Wrapping;
6use std::os::unix::io::RawFd;
7use std::sync::atomic::Ordering;
8
9use vm_memory::{Bytes, VolatileMemory, VolatileMemoryError};
10
11use super::mmap::{MmapError, mmap};
12use crate::io_uring::generated;
13use crate::io_uring::operation::Cqe;
14use crate::vstate::memory::MmapRegion;
15
/// Errors produced while mapping or draining the io_uring completion queue.
///
/// NOTE: `displaydoc::Display` generates the `Display` impl from the variant
/// doc comments below, so they double as the user-facing error messages.
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum CQueueError {
    /// Error mapping the completion ring: {0}
    Mmap(#[from] MmapError),
    /// Error reading/writing volatile memory: {0}
    VolatileMemory(#[from] VolatileMemoryError),
    /// The CQE's `user_data` did not match a live entry in the slab
    SlabRemoveFailed,
}
26
/// Userspace handle to an io_uring completion queue (CQ) ring shared with
/// the kernel via a single mmap'ed region.
#[derive(Debug)]
pub(crate) struct CompletionQueue {
    // Byte offset within `cqes` of the kernel-shared head counter
    // (taken from `io_uring_params.cq_off.head`).
    head_off: usize,
    // Byte offset within `cqes` of the kernel-shared tail counter.
    tail_off: usize,
    // Byte offset within `cqes` of the start of the CQE array.
    cqes_off: usize,

    // Our free-running (unmasked) copy of the ring head; it is AND-ed with
    // `ring_mask` to index into the CQE array and published back to the
    // kernel after each pop.
    unmasked_head: Wrapping<u32>,

    // Number of entries in the ring (`params.cq_entries`).
    count: u32,
    // Index mask read from the kernel-populated `ring_mask` field of the
    // mapped ring.
    ring_mask: u32,

    // The mmap'ed region backing the entire CQ ring (header + CQE array).
    cqes: MmapRegion,
}
42
43impl CompletionQueue {
44 pub(crate) fn new(
45 io_uring_fd: RawFd,
46 params: &generated::io_uring_params,
47 ) -> Result<Self, CQueueError> {
48 let offsets = params.cq_off;
49
50 let ring_size = (params.cq_off.cqes as usize)
53 + (params.cq_entries as usize) * std::mem::size_of::<generated::io_uring_cqe>();
54 let cqes = mmap(ring_size, io_uring_fd, generated::IORING_OFF_CQ_RING.into())?;
55
56 let ring = cqes.as_volatile_slice();
57 let ring_mask = ring.read_obj(offsets.ring_mask as usize)?;
58
59 Ok(Self {
60 head_off: offsets.head as usize,
62 tail_off: offsets.tail as usize,
64 cqes_off: offsets.cqes as usize,
66 unmasked_head: Wrapping(0),
68 count: params.cq_entries,
69 ring_mask,
70 cqes,
71 })
72 }
73
74 pub(crate) fn count(&self) -> u32 {
75 self.count
76 }
77
78 pub(crate) fn pop<T: Debug>(
79 &mut self,
80 slab: &mut slab::Slab<T>,
81 ) -> Result<Option<Cqe<T>>, CQueueError> {
82 let ring = self.cqes.as_volatile_slice();
83 let head = self.unmasked_head.0 & self.ring_mask;
85 let unmasked_tail = ring.load::<u32>(self.tail_off, Ordering::Acquire)?;
86
87 if Wrapping(unmasked_tail) - self.unmasked_head > Wrapping(0) {
89 let cqe: generated::io_uring_cqe = ring.read_obj(
90 self.cqes_off + (head as usize) * std::mem::size_of::<generated::io_uring_cqe>(),
91 )?;
92
93 self.unmasked_head += Wrapping(1u32);
95 ring.store(self.unmasked_head.0, self.head_off, Ordering::Release)?;
96
97 let res = cqe.res;
98 #[allow(clippy::cast_possible_truncation)]
99 let index = cqe.user_data as usize;
100 match slab.try_remove(index) {
101 Some(user_data) => Ok(Some(Cqe::new(res, user_data))),
102 None => Err(CQueueError::SlabRemoveFailed),
103 }
104 } else {
105 Ok(None)
106 }
107 }
108}
109
110impl Drop for CompletionQueue {
111 fn drop(&mut self) {
112 unsafe { libc::munmap(self.cqes.as_ptr().cast::<libc::c_void>(), self.cqes.size()) };
114 }
115}