vmm/io_uring/queue/submission.rs

// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use std::fmt::Debug;
use std::io::Error as IOError;
use std::mem;
use std::num::Wrapping;
use std::os::unix::io::RawFd;
use std::sync::atomic::Ordering;

use vm_memory::{VolatileMemory, VolatileMemoryError};
use vmm_sys_util::syscall::SyscallReturnCode;

use super::mmap::{MmapError, mmap};
use crate::io_uring::generated;
use crate::io_uring::operation::Sqe;
use crate::vstate::memory::{Bytes, MmapRegion};

#[derive(Debug, thiserror::Error, displaydoc::Display)]
/// SQueue Error.
pub enum SQueueError {
    /// The queue is full.
    FullQueue,
    /// Error mapping the ring: {0}
    Mmap(#[from] MmapError),
    /// Error reading/writing volatile memory: {0}
    VolatileMemory(#[from] VolatileMemoryError),
    /// Error returned by `io_uring_enter`: {0}
    Submit(#[from] IOError),
}

#[derive(Debug)]
pub(crate) struct SubmissionQueue {
    io_uring_fd: RawFd,

    // Offsets.
    head_off: usize,
    tail_off: usize,

    // Cached values.
    ring_mask: u32,
    count: u32,
    unmasked_tail: Wrapping<u32>,

    // Mmap-ed ring.
    ring: MmapRegion,
    // Mmap-ed sqes.
    sqes: MmapRegion,

    // Number of ops yet to be submitted.
    to_submit: u32,
}

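// Indexing note: the kernel consumes sqes at `head` while this struct produces
// them at `tail`. Both are free-running u32 counters shared through the mmaped
// ring, and a counter maps to a slot as `index = counter & ring_mask`. io_uring
// rounds the entry count up to a power of two, so the mask is `count - 1`: with
// 8 entries, for example, an unmasked tail of 9 lands in slot 9 & 7 = 1.
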
impl SubmissionQueue {
    pub(crate) fn new(
        io_uring_fd: RawFd,
        params: &generated::io_uring_params,
    ) -> Result<Self, SQueueError> {
        let (ring, sqes) = Self::mmap(io_uring_fd, params)?;
        let ring_slice = ring.as_volatile_slice();

        // Since we don't need the extra layer of indirection, we initialize the
        // index array to the identity mapping: array[i] = i.
        let sq_array = ring_slice.offset(params.sq_off.array as usize)?;
        for i in 0..params.sq_entries {
            sq_array.write_obj(i, mem::size_of::<u32>() * (i as usize))?;
        }

        let ring_mask = ring_slice.read_obj(params.sq_off.ring_mask as usize)?;

        Ok(Self {
            io_uring_fd,
            head_off: params.sq_off.head as usize,
            tail_off: params.sq_off.tail as usize,
            ring_mask,
            count: params.sq_entries,
            // We can init this to 0 and cache it because we are the only ones modifying it.
            unmasked_tail: Wrapping(0),
            ring,
            sqes,
            to_submit: 0,
        })
    }

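    /// Push an sqe onto the submission queue without notifying the kernel;
    /// a subsequent [`Self::submit`] makes the pushed entries visible to it.
    ///
    /// On failure, the sqe's `user_data` is returned alongside the error so the
    /// caller can identify the dropped operation. A hedged usage sketch (the
    /// `sqe` value is assumed to be prepared elsewhere):
    ///
    /// ```ignore
    /// if let Err((err, user_data)) = queue.push(sqe) {
    ///     // `user_data` identifies the operation that was not queued.
    /// }
    /// let submitted = queue.submit(0)?;
    /// ```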
    pub(crate) fn push(&mut self, sqe: Sqe) -> Result<(), (SQueueError, u64)> {
        let ring_slice = self.ring.as_volatile_slice();

        // Compute the masked tail, i.e. the slot index where this sqe goes.
        let tail = self.unmasked_tail.0 & self.ring_mask;

        // Get the number of sqes not yet consumed by the kernel.
        let pending = match self.pending() {
            Ok(n) => n,
            Err(err) => return Err((err, sqe.user_data())),
        };

        if pending >= self.count {
            return Err((SQueueError::FullQueue, sqe.user_data()));
        }

        // Write the sqe into its slot in the sqes array.
        if let Err(err) = self.sqes.as_volatile_slice().write_obj(
            sqe.0,
            (tail as usize) * mem::size_of::<generated::io_uring_sqe>(),
        ) {
            return Err((SQueueError::VolatileMemory(err), sqe.user_data()));
        }

        // Advance the cached tail and publish it with Release ordering, so that
        // the sqe write above is visible to the kernel before the new tail.
        self.unmasked_tail += Wrapping(1u32);

        if let Err(err) = ring_slice.store(self.unmasked_tail.0, self.tail_off, Ordering::Release) {
            return Err((SQueueError::VolatileMemory(err), sqe.user_data()));
        }

        // This cannot overflow, since we already checked that there is room in the queue.
        self.to_submit += 1;

        Ok(())
    }

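    /// Notify the kernel of the sqes pushed since the last call, via the
    /// `io_uring_enter` syscall, optionally blocking until at least
    /// `min_complete` operations have completed. Returns the number of sqes
    /// the kernel consumed.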
    pub(crate) fn submit(&mut self, min_complete: u32) -> Result<u32, SQueueError> {
        if self.to_submit == 0 && min_complete == 0 {
            // Nothing to submit and nothing to wait for.
            return Ok(0);
        }

        let mut flags = 0;

        if min_complete > 0 {
            flags |= generated::IORING_ENTER_GETEVENTS;
        }
        // SAFETY: Safe because the values are valid and we check the return value.
        let submitted = SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_enter,
                self.io_uring_fd,
                self.to_submit,
                min_complete,
                flags,
                std::ptr::null::<libc::sigset_t>(),
            )
        })
        .into_result()?;
        // It's safe to convert to u32 since the syscall didn't return an error.
        let submitted = u32::try_from(submitted).unwrap();

        // This is safe since submitted <= self.to_submit. However, we use a
        // saturating_sub for extra safety.
        self.to_submit = self.to_submit.saturating_sub(submitted);

        Ok(submitted)
    }

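    /// Map the two kernel-owned regions backing the submission side: the SQ
    /// ring metadata (head, tail, ring mask and the index array) at offset
    /// `IORING_OFF_SQ_RING`, and the array of sqes at `IORING_OFF_SQES`.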
    fn mmap(
        io_uring_fd: RawFd,
        params: &generated::io_uring_params,
    ) -> Result<(MmapRegion, MmapRegion), SQueueError> {
        // Map the SQ ring. Its size is the offset where the index array starts
        // (`sq_off.array`), plus the size of the array itself,
        // `sq_entries * size_of::<u32>()`, as per the io_uring specification.
        let sqe_ring_size =
            (params.sq_off.array as usize) + (params.sq_entries as usize) * mem::size_of::<u32>();

        let sqe_ring = mmap(
            sqe_ring_size,
            io_uring_fd,
            generated::IORING_OFF_SQ_RING.into(),
        )?;

        // Map the SQEs.
        let sqes_array_size =
            (params.sq_entries as usize) * mem::size_of::<generated::io_uring_sqe>();

        let sqes = mmap(
            sqes_array_size,
            io_uring_fd,
            generated::IORING_OFF_SQES.into(),
        )?;

        Ok((sqe_ring, sqes))
    }

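    /// Return the number of sqes pushed but not yet consumed by the kernel,
    /// computed as the wrapping difference between our cached tail and the
    /// head published by the kernel (see the test sketch at the bottom of
    /// this file).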
    pub(crate) fn pending(&self) -> Result<u32, SQueueError> {
        let ring_slice = self.ring.as_volatile_slice();
        // Load the unmasked head, i.e. the kernel's consumption counter, with
        // Acquire ordering to pair with the kernel's release-store of head.
        let unmasked_head = ring_slice.load::<u32>(self.head_off, Ordering::Acquire)?;

        Ok((self.unmasked_tail - Wrapping(unmasked_head)).0)
    }
}

impl Drop for SubmissionQueue {
    fn drop(&mut self) {
        // SAFETY: Safe because parameters are valid.
        unsafe { libc::munmap(self.ring.as_ptr().cast::<libc::c_void>(), self.ring.size()) };
        // SAFETY: Safe because parameters are valid.
        unsafe { libc::munmap(self.sqes.as_ptr().cast::<libc::c_void>(), self.sqes.size()) };
    }
}
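
#[cfg(test)]
mod tests {
    use std::num::Wrapping;

    // A minimal sketch (not part of the original file) of the wrapping
    // head/tail arithmetic that `pending()` relies on: the subtraction stays
    // correct even after the unmasked tail overflows past u32::MAX.
    #[test]
    fn pending_arithmetic_wraps_correctly() {
        let unmasked_head: u32 = u32::MAX - 1;
        // Tail advanced by 3 entries, wrapping past zero.
        let unmasked_tail = Wrapping(unmasked_head) + Wrapping(3u32);
        assert_eq!((unmasked_tail - Wrapping(unmasked_head)).0, 3);
    }
}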