vmm/io_uring/queue/completion.rs

// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use std::fmt::Debug;
use std::num::Wrapping;
use std::os::unix::io::RawFd;
use std::sync::atomic::Ordering;

use vm_memory::{Bytes, VolatileMemory, VolatileMemoryError};

use super::mmap::{MmapError, mmap};
use crate::io_uring::generated;
use crate::io_uring::operation::Cqe;
use crate::vstate::memory::MmapRegion;
#[derive(Debug, thiserror::Error, displaydoc::Display)]
/// CQueue Error.
pub enum CQueueError {
    /// Error mapping the ring: {0}
    Mmap(#[from] MmapError),
    /// Error reading/writing volatile memory: {0}
    VolatileMemory(#[from] VolatileMemoryError),
    /// Error removing the user data from the slab
    SlabRemoveFailed,
}

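/// Wrapper over the io_uring completion queue (CQ) ring, mapped from the
/// kernel via `mmap`.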
#[derive(Debug)]
pub(crate) struct CompletionQueue {
    // Offsets.
    head_off: usize,
    tail_off: usize,
    cqes_off: usize,

    // Cached values.
    unmasked_head: Wrapping<u32>,
    count: u32,
    ring_mask: u32,

    // Mmap-ed cqes ring.
    cqes: MmapRegion,
}

impl CompletionQueue {
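    /// Map the completion queue ring for the given io_uring fd and cache the
    /// queue parameters (entry count and ring mask) needed to read completions.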
    pub(crate) fn new(
        io_uring_fd: RawFd,
        params: &generated::io_uring_params,
    ) -> Result<Self, CQueueError> {
        let offsets = params.cq_off;

        // Map the CQ ring. The actual size of the ring is `num_entries * size_of(entry_type)`.
        // To this we add an offset as per the io_uring specifications.
        let ring_size = (params.cq_off.cqes as usize)
            + (params.cq_entries as usize) * std::mem::size_of::<generated::io_uring_cqe>();
        let cqes = mmap(ring_size, io_uring_fd, generated::IORING_OFF_CQ_RING.into())?;

        let ring = cqes.as_volatile_slice();
        let ring_mask = ring.read_obj(offsets.ring_mask as usize)?;

        Ok(Self {
            // Safe because it's a u32 offset.
            head_off: offsets.head as usize,
            // Safe because it's a u32 offset.
            tail_off: offsets.tail as usize,
            // Safe because it's a u32 offset.
            cqes_off: offsets.cqes as usize,
            // We can init this to 0 and cache it because we are the only ones modifying it.
            unmasked_head: Wrapping(0),
            count: params.cq_entries,
            ring_mask,
            cqes,
        })
    }

    pub(crate) fn count(&self) -> u32 {
        self.count
    }

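    /// Pop the next completion entry off the ring, if any, resolving its
    /// `user_data` index back into the payload previously stored in the slab.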
    pub(crate) fn pop<T: Debug>(
        &mut self,
        slab: &mut slab::Slab<T>,
    ) -> Result<Option<Cqe<T>>, CQueueError> {
        let ring = self.cqes.as_volatile_slice();
        // Get the masked head and the (unmasked) tail.
        let head = self.unmasked_head.0 & self.ring_mask;
        let unmasked_tail = ring.load::<u32>(self.tail_off, Ordering::Acquire)?;

        // Validate that there is something to fetch. Wrapping subtraction keeps
        // this check correct even after the 32-bit tail counter overflows.
        if Wrapping(unmasked_tail) - self.unmasked_head > Wrapping(0) {
            let cqe: generated::io_uring_cqe = ring.read_obj(
                self.cqes_off + (head as usize) * std::mem::size_of::<generated::io_uring_cqe>(),
            )?;

            // Advance the head and publish it to the kernel.
            self.unmasked_head += Wrapping(1u32);
            ring.store(self.unmasked_head.0, self.head_off, Ordering::Release)?;

            let res = cqe.res;
            #[allow(clippy::cast_possible_truncation)]
            let index = cqe.user_data as usize;
            match slab.try_remove(index) {
                Some(user_data) => Ok(Some(Cqe::new(res, user_data))),
                None => Err(CQueueError::SlabRemoveFailed),
            }
        } else {
            Ok(None)
        }
    }
}

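// The CQ ring was mapped manually in `new()`, so it is unmapped explicitly
// here rather than relying on `MmapRegion` to release it.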
impl Drop for CompletionQueue {
    fn drop(&mut self) {
        // SAFETY: Safe because parameters are valid.
        unsafe { libc::munmap(self.cqes.as_ptr().cast::<libc::c_void>(), self.cqes.size()) };
    }
}
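
// A minimal sketch (not part of the original file) illustrating why the
// emptiness check in `pop()` uses `Wrapping` arithmetic: the pending-entry
// count stays correct even after the 32-bit tail counter overflows. The
// `tests` module name and its contents are assumptions for illustration only.
#[cfg(test)]
mod tests {
    use std::num::Wrapping;

    #[test]
    fn wrapping_pending_count() {
        // Head sits at the u32 overflow point; the tail has wrapped past zero.
        let head = Wrapping(u32::MAX);
        let tail = head + Wrapping(3);

        // Three entries are pending even though tail < head numerically.
        assert_eq!((tail - head).0, 3);
        assert!(tail - head > Wrapping(0));

        // head == tail means the ring is empty.
        assert!(!(head - head > Wrapping(0)));
    }
}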