vmm/io_uring/mod.rs

// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

mod generated;
pub mod operation;
mod probe;
mod queue;
pub mod restriction;

use std::collections::HashSet;
use std::fmt::Debug;
use std::fs::File;
use std::io::Error as IOError;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};

use generated::io_uring_params;
use operation::{Cqe, FixedFd, OpCode, Operation};
use probe::{PROBE_LEN, ProbeWrapper};
pub use queue::completion::CQueueError;
use queue::completion::CompletionQueue;
pub use queue::submission::SQueueError;
use queue::submission::SubmissionQueue;
use restriction::Restriction;
use vmm_sys_util::syscall::SyscallReturnCode;

use crate::io_uring::generated::io_uring_register_op;

// IO_uring operations that we require to be supported by the host kernel.
const REQUIRED_OPS: [OpCode; 2] = [OpCode::Read, OpCode::Write];
// Taken from linux/fs/io_uring.c
const IORING_MAX_FIXED_FILES: usize = 1 << 15;

#[derive(Debug, thiserror::Error, displaydoc::Display)]
/// IoUring Error.
pub enum IoUringError {
    /// Error originating in the completion queue: {0}
    CQueue(CQueueError),
    /// Could not enable the ring: {0}
    Enable(IOError),
    /// A FamStructWrapper operation has failed: {0}
    Fam(vmm_sys_util::fam::Error),
    /// The number of ops in the ring is >= CQ::count
    FullCQueue,
    /// Fd was not registered: {0}
    InvalidFixedFd(FixedFd),
    /// There are no registered fds.
    NoRegisteredFds,
    /// Error probing the io_uring subsystem: {0}
    Probe(IOError),
    /// Could not register eventfd: {0}
    RegisterEventfd(IOError),
    /// Could not register file: {0}
    RegisterFile(IOError),
    /// Attempted to register too many files.
    RegisterFileLimitExceeded,
    /// Could not register restrictions: {0}
    RegisterRestrictions(IOError),
    /// Error calling io_uring_setup: {0}
    Setup(IOError),
    /// Error originating in the submission queue: {0}
    SQueue(SQueueError),
    /// Required feature is not supported on the host kernel: {0}
    UnsupportedFeature(&'static str),
    /// Required operation is not supported on the host kernel: {0}
    UnsupportedOperation(&'static str),
}

impl IoUringError {
    /// Return true if this error is caused by a full submission or completion queue.
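    ///
    /// A caller can use this to tell transient queue back-pressure apart from hard failures.
    /// Illustrative sketch only; `ring` and `op` are placeholders, not items from this module:
    ///
    /// ```ignore
    /// if let Err((err, user_data)) = ring.push(op) {
    ///     if err.is_throttling_err() {
    ///         // The queues are full: submit, drain completions, then retry with `user_data`.
    ///     }
    /// }
    /// ```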
    pub fn is_throttling_err(&self) -> bool {
        matches!(
            self,
            Self::FullCQueue | Self::SQueue(SQueueError::FullQueue)
        )
    }
}

/// Main object representing an io_uring instance.
#[derive(Debug)]
pub struct IoUring<T> {
    registered_fds_count: u32,
    squeue: SubmissionQueue,
    cqueue: CompletionQueue,
    // Make sure the fd is declared after the queues, so that it isn't dropped before them.
    // If we drop the queues after the File, the associated kernel mem will never be freed.
    // The correct cleanup order is munmap(rings) -> close(fd).
    // We don't need to manually drop the fields in order, since Rust has a well-defined drop order.
    fd: File,

    // The total number of ops. This includes the ops on the submission queue, the in-flight ops
    // and the ops that are in the CQ, but haven't been popped yet.
    num_ops: u32,
    slab: slab::Slab<T>,
}

impl<T: Debug> IoUring<T> {
    /// Create a new instance.
    ///
    /// # Arguments
    ///
    /// * `num_entries` - Requested number of entries in the ring. Will be rounded up to the nearest
    ///   power of two.
    /// * `files` - Files to be registered for IO.
    /// * `restrictions` - Vector of [`Restriction`](restriction/enum.Restriction.html)s
    /// * `eventfd` - Optional eventfd for receiving completion notifications.
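    ///
    /// # Example
    ///
    /// A minimal sketch of creating a ring over one file, with no restrictions and no eventfd.
    /// The file path is a placeholder and the snippet assumes a host kernel with io_uring support:
    ///
    /// ```ignore
    /// let file = std::fs::File::open("/path/to/backing/file").unwrap();
    /// // `u32` here is the type of the per-operation `user_data`.
    /// let ring: IoUring<u32> = IoUring::new(128, vec![&file], vec![], None).unwrap();
    /// ```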
    pub fn new(
        num_entries: u32,
        files: Vec<&File>,
        restrictions: Vec<Restriction>,
        eventfd: Option<RawFd>,
    ) -> Result<Self, IoUringError> {
        let mut params = io_uring_params {
            // Create the ring as disabled, so that we may register restrictions.
            flags: generated::IORING_SETUP_R_DISABLED,

            ..Default::default()
        };

        // SAFETY: Safe because values are valid and we check the return value.
        let fd = SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_setup,
                num_entries,
                &mut params as *mut io_uring_params,
            )
        })
        .into_result()
        .map_err(IoUringError::Setup)?;
        // Safe to unwrap because the fd is valid.
        let fd = RawFd::try_from(fd).unwrap();

        // SAFETY: Safe because the fd is valid and because this struct owns the fd.
        let file = unsafe { File::from_raw_fd(fd) };

        Self::check_features(params)?;

        let squeue = SubmissionQueue::new(fd, &params).map_err(IoUringError::SQueue)?;
        let cqueue = CompletionQueue::new(fd, &params).map_err(IoUringError::CQueue)?;
        let slab =
            slab::Slab::with_capacity(params.sq_entries as usize + params.cq_entries as usize);

        let mut instance = Self {
            squeue,
            cqueue,
            fd: file,
            registered_fds_count: 0,
            num_ops: 0,
            slab,
        };

        instance.check_operations()?;

        if let Some(eventfd) = eventfd {
            instance.register_eventfd(eventfd)?;
        }

        instance.register_restrictions(restrictions)?;

        instance.register_files(files)?;

        instance.enable()?;

        Ok(instance)
    }

    /// Push an [`Operation`](operation/struct.Operation.html) onto the submission queue.
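    ///
    /// # Example
    ///
    /// A sketch of the push/submit/pop cycle, following the same calls used in this module's
    /// tests; `ring`, `buffer_addr` and `user_data` are placeholders:
    ///
    /// ```ignore
    /// // Fixed fd 0 refers to the first file registered in `IoUring::new`.
    /// let op = Operation::read(0, buffer_addr, 512, 0, user_data);
    /// ring.push(op).unwrap();
    /// ring.submit_and_wait_all().unwrap();
    /// while let Some(cqe) = ring.pop().unwrap() {
    ///     // `result()` yields the number of bytes transferred, or the operation's error.
    ///     let _count = cqe.result().unwrap();
    /// }
    /// ```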
    pub fn push(&mut self, op: Operation<T>) -> Result<(), (IoUringError, T)> {
        // Validate that we actually did register fds.
        let fd = op.fd();
        match self.registered_fds_count {
            0 => Err((IoUringError::NoRegisteredFds, op.user_data)),
            len if fd >= len => Err((IoUringError::InvalidFixedFd(fd), op.user_data)),
            _ => {
                if self.num_ops >= self.cqueue.count() {
                    return Err((IoUringError::FullCQueue, op.user_data));
                }
                self.squeue
                    .push(op.into_sqe(&mut self.slab))
                    .inspect(|_| {
                        // This is safe since self.num_ops < IORING_MAX_CQ_ENTRIES (65536)
                        self.num_ops += 1;
                    })
                    .map_err(|(sqe_err, user_data_key)| -> (IoUringError, T) {
                        (
                            IoUringError::SQueue(sqe_err),
                            // We use `slab.remove` instead of `slab.try_remove` for two reasons:
                            // 1. The user_data was just inserted into the slab by `op.into_sqe`,
                            //    right before this push, so the key is valid and `slab.remove()`
                            //    will not panic.
                            // 2. `slab.try_remove()` returns an Option, which would force us to
                            //    produce a fallback value of the generic type T; requiring
                            //    `Default` or `Clone` on T would leak into the crates that use
                            //    this API.
                            #[allow(clippy::cast_possible_truncation)]
                            self.slab.remove(user_data_key as usize),
                        )
                    })
            }
        }
    }

    /// Pop a completed entry off the completion queue. Returns `Ok(None)` if there are no entries.
    /// The type `T` must be the same as the `user_data` type used for `push`-ing the operation.
    pub fn pop(&mut self) -> Result<Option<Cqe<T>>, IoUringError> {
        self.cqueue
            .pop(&mut self.slab)
            .map(|maybe_cqe| {
                maybe_cqe.inspect(|_| {
                    // This is safe since the popped CQEs have been previously pushed. However
                    // we use a saturating_sub for extra safety.
                    self.num_ops = self.num_ops.saturating_sub(1);
                })
            })
            .map_err(IoUringError::CQueue)
    }

    fn do_submit(&mut self, min_complete: u32) -> Result<u32, IoUringError> {
        self.squeue
            .submit(min_complete)
            .map_err(IoUringError::SQueue)
    }

    /// Submit all operations but don't wait for any completions.
    pub fn submit(&mut self) -> Result<u32, IoUringError> {
        self.do_submit(0)
    }

    /// Submit all operations and wait for their completion.
    pub fn submit_and_wait_all(&mut self) -> Result<u32, IoUringError> {
        self.do_submit(self.num_ops)
    }

    /// Return the number of operations currently on the submission queue.
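    ///
    /// This can be used to batch submissions, mirroring the pattern in this module's tests
    /// (sketch only; `ring`, `op` and `RING_SIZE` are placeholders):
    ///
    /// ```ignore
    /// if ring.pending_sqes().unwrap() == RING_SIZE {
    ///     // The submission queue is full: flush it and drain completions before pushing more.
    ///     ring.submit_and_wait_all().unwrap();
    ///     while let Some(cqe) = ring.pop().unwrap() {
    ///         cqe.result().unwrap();
    ///     }
    /// }
    /// ring.push(op).unwrap();
    /// ```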
    pub fn pending_sqes(&self) -> Result<u32, IoUringError> {
        self.squeue.pending().map_err(IoUringError::SQueue)
    }

    /// Return the total number of ops in the submission and completion queues, plus the
    /// in-flight ops.
    pub fn num_ops(&self) -> u32 {
        self.num_ops
    }

    fn enable(&mut self) -> Result<(), IoUringError> {
        // SAFETY: Safe because values are valid and we check the return value.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_ENABLE_RINGS,
                std::ptr::null::<libc::c_void>(),
                0,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::Enable)
    }

    fn register_files(&mut self, files: Vec<&File>) -> Result<(), IoUringError> {
        if files.is_empty() {
            // No-op.
            return Ok(());
        }

        if (self.registered_fds_count as usize).saturating_add(files.len()) > IORING_MAX_FIXED_FILES
        {
            return Err(IoUringError::RegisterFileLimitExceeded);
        }

        // SAFETY: Safe because values are valid and we check the return value.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_FILES,
                files
                    .iter()
                    .map(|f| f.as_raw_fd())
                    .collect::<Vec<_>>()
                    .as_mut_slice()
                    .as_mut_ptr() as *const _,
                files.len(),
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterFile)?;

        // Safe to unwrap since files.len() <= IORING_MAX_FIXED_FILES, which fits in a u32.
        self.registered_fds_count += u32::try_from(files.len()).unwrap();
        Ok(())
    }

    fn register_eventfd(&self, fd: RawFd) -> Result<(), IoUringError> {
        // SAFETY: Safe because values are valid and we check the return value.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_EVENTFD,
                (&fd) as *const _,
                1,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterEventfd)
    }

    fn register_restrictions(&self, restrictions: Vec<Restriction>) -> Result<(), IoUringError> {
        if restrictions.is_empty() {
            // No-op.
            return Ok(());
        }
        // SAFETY: Safe because values are valid and we check the return value.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_RESTRICTIONS,
                restrictions
                    .iter()
                    .map(generated::io_uring_restriction::from)
                    .collect::<Vec<_>>()
                    .as_mut_slice()
                    .as_mut_ptr(),
                restrictions.len(),
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterRestrictions)
    }

    fn check_features(params: io_uring_params) -> Result<(), IoUringError> {
        // We require that the host kernel will never drop completed entries due to an (unlikely)
        // overflow in the completion queue.
        // This feature is supported for kernels greater than 5.7.
        // An alternative fix would be to keep an internal counter that tracks the number of
        // submitted entries that haven't been completed and makes sure it doesn't exceed
        // (2 * num_entries).
        if (params.features & generated::IORING_FEAT_NODROP) == 0 {
            return Err(IoUringError::UnsupportedFeature("IORING_FEAT_NODROP"));
        }

        Ok(())
    }

    fn check_operations(&self) -> Result<(), IoUringError> {
        let mut probes = ProbeWrapper::new(PROBE_LEN).map_err(IoUringError::Fam)?;

        // SAFETY: Safe because values are valid and we check the return value.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_PROBE,
                probes.as_mut_fam_struct_ptr(),
                PROBE_LEN,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::Probe)?;

        let supported_opcodes: HashSet<u8> = probes
            .as_slice()
            .iter()
            .filter(|op| ((u32::from(op.flags)) & generated::IO_URING_OP_SUPPORTED) != 0)
            .map(|op| op.op)
            .collect();

        for opcode in REQUIRED_OPS.iter() {
            if !supported_opcodes.contains(&(*opcode as u8)) {
                return Err(IoUringError::UnsupportedOperation((*opcode).into()));
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    #![allow(clippy::undocumented_unsafe_blocks)]
    use std::os::unix::fs::FileExt;

    use proptest::prelude::*;
    use proptest::strategy::Strategy;
    use proptest::test_runner::{Config, TestRunner};
    use vm_memory::VolatileMemory;
    use vmm_sys_util::syscall::SyscallReturnCode;
    use vmm_sys_util::tempfile::TempFile;

    /// -------------------------------------
    /// BEGIN PROPERTY BASED TESTING
    use super::*;
    use crate::vstate::memory::{Bytes, MmapRegion};

    fn drain_cqueue(ring: &mut IoUring<u32>) {
        while let Some(entry) = ring.pop().unwrap() {
            entry.result().unwrap();

            // Assert that there were no partial writes.
            let count = entry.result().unwrap();
            let user_data = entry.user_data();
            assert_eq!(count, user_data);
        }
    }

    fn setup_mem_region(len: usize) -> MmapRegion {
        const PROT: i32 = libc::PROT_READ | libc::PROT_WRITE;
        const FLAGS: i32 = libc::MAP_ANONYMOUS | libc::MAP_PRIVATE;

        let ptr = unsafe { libc::mmap(std::ptr::null_mut(), len, PROT, FLAGS, -1, 0) };

        if (ptr as isize) < 0 {
            panic!("Mmap failed with {}", std::io::Error::last_os_error());
        }

        unsafe {
            // Use the raw version because we want to unmap memory ourselves.
            MmapRegion::build_raw(ptr.cast::<u8>(), len, PROT, FLAGS).unwrap()
        }
    }

    fn free_mem_region(region: MmapRegion) {
        unsafe { libc::munmap(region.as_ptr().cast::<libc::c_void>(), region.len()) };
    }

    fn read_entire_mem_region(region: &MmapRegion) -> Vec<u8> {
        let mut result = vec![0u8; region.len()];
        let count = region.as_volatile_slice().read(&mut result[..], 0).unwrap();
        assert_eq!(count, region.len());
        result
    }

    #[allow(clippy::let_with_type_underscore)]
    fn arbitrary_rw_operation(file_len: u32) -> impl Strategy<Value = Operation<u32>> {
        (
            // OpCode: 0 -> Write, 1 -> Read.
            0..2,
            // Length of the operation.
            0u32..file_len,
        )
            .prop_flat_map(move |(op, len)| {
                (
                    // op
                    Just(op),
                    // len
                    Just(len),
                    // offset
                    (0u32..(file_len - len)),
                    // mem region offset
                    (0u32..(file_len - len)),
                )
            })
            .prop_map(move |(op, len, off, mem_off)| {
                // We actually use an offset instead of an address, because we later need to modify
                // the memory region on which the operation is performed, based on the opcode.
                let mut operation = match op {
                    0 => Operation::write(0, mem_off as usize, len, off.into(), len),
                    _ => Operation::read(0, mem_off as usize, len, off.into(), len),
                };

                // Make sure the operations are executed in-order, so that they are equivalent to
                // their sync counterparts.
                operation.set_linked();
                operation
            })
    }

    #[test]
    fn proptest_read_write_correctness() {
        // Performs a sequence of random read and write operations on two files, with sync and
        // async IO, respectively.
        // Verifies that the files are identical afterwards and that the read operations returned
        // the same values.

        const FILE_LEN: u32 = 1024;
        // The number of arbitrary operations in a testrun.
        const OPS_COUNT: usize = 2000;
        const RING_SIZE: u32 = 128;

        // Allocate and init memory for holding the data that will be written into the file.
        let write_mem_region = setup_mem_region(FILE_LEN as usize);

        let sync_read_mem_region = setup_mem_region(FILE_LEN as usize);

        let async_read_mem_region = setup_mem_region(FILE_LEN as usize);

        // Init the write buffers with 0,1,2,...
        for i in 0..FILE_LEN {
            write_mem_region
                .as_volatile_slice()
                .write_obj(u8::try_from(i % u32::from(u8::MAX)).unwrap(), i as usize)
                .unwrap();
        }

        // Create two files and init their contents to zeros.
        let init_contents = [0u8; FILE_LEN as usize];
        let file_async = TempFile::new().unwrap().into_file();
        file_async.write_all_at(&init_contents, 0).unwrap();

        let file_sync = TempFile::new().unwrap().into_file();
        file_sync.write_all_at(&init_contents, 0).unwrap();

        // Create a custom test runner since we had to add some state buildup to the test.
        // (Referring to the above initializations).
        let mut runner = TestRunner::new(Config {
            #[cfg(target_arch = "x86_64")]
            cases: 1000, // Should run for about a minute.
            // Lower the cases on ARM since they take longer and cause coverage test timeouts.
            #[cfg(target_arch = "aarch64")]
            cases: 500,
            ..Config::default()
        });

        runner
            .run(
                &proptest::collection::vec(arbitrary_rw_operation(FILE_LEN), OPS_COUNT),
                |set| {
                    let mut ring =
                        IoUring::new(RING_SIZE, vec![&file_async], vec![], None).unwrap();

                    for mut operation in set {
                        // Perform the sync op.
                        let count = match operation.opcode {
                            OpCode::Write => u32::try_from(
                                SyscallReturnCode(unsafe {
                                    libc::pwrite(
                                        file_sync.as_raw_fd(),
                                        write_mem_region.as_ptr().add(operation.addr.unwrap())
                                            as *const libc::c_void,
                                        operation.len.unwrap() as usize,
                                        i64::try_from(operation.offset.unwrap()).unwrap(),
                                    )
                                })
                                .into_result()
                                .unwrap(),
                            )
                            .unwrap(),
                            OpCode::Read => u32::try_from(
                                SyscallReturnCode(unsafe {
                                    libc::pread(
                                        file_sync.as_raw_fd(),
                                        sync_read_mem_region
                                            .as_ptr()
                                            .add(operation.addr.unwrap())
                                            .cast::<libc::c_void>(),
                                        operation.len.unwrap() as usize,
                                        i64::try_from(operation.offset.unwrap()).unwrap(),
                                    )
                                })
                                .into_result()
                                .unwrap(),
                            )
                            .unwrap(),
                            _ => unreachable!(),
                        };

                        if count < operation.len.unwrap() {
                            panic!("Synchronous partial operation: {:?}", operation);
                        }

                        // Perform the async op.

                        // Modify the operation address based on the opcode.
                        match operation.opcode {
                            OpCode::Write => {
                                operation.addr = Some(unsafe {
                                    write_mem_region.as_ptr().add(operation.addr.unwrap()) as usize
                                })
                            }
                            OpCode::Read => {
                                operation.addr = Some(unsafe {
                                    async_read_mem_region.as_ptr().add(operation.addr.unwrap())
                                        as usize
                                })
                            }
                            _ => unreachable!(),
                        };

                        // If the ring is full, submit and wait.
                        if ring.pending_sqes().unwrap() == RING_SIZE {
                            ring.submit_and_wait_all().unwrap();
                            drain_cqueue(&mut ring);
                        }
                        ring.push(operation).unwrap();
                    }

                    // Submit any left async ops and wait.
                    ring.submit_and_wait_all().unwrap();
                    drain_cqueue(&mut ring);

                    // Get the write result for async IO.
                    let mut async_result = [0u8; FILE_LEN as usize];
                    file_async.read_exact_at(&mut async_result, 0).unwrap();

                    // Get the write result for sync IO.
                    let mut sync_result = [0u8; FILE_LEN as usize];
                    file_sync.read_exact_at(&mut sync_result, 0).unwrap();

                    // Now compare the write results.
                    assert_eq!(sync_result, async_result);

                    // Now compare the read results for sync and async IO.
                    assert_eq!(
                        read_entire_mem_region(&sync_read_mem_region),
                        read_entire_mem_region(&async_read_mem_region)
                    );

                    Ok(())
                },
            )
            .unwrap();

        // Clean up the memory.
        free_mem_region(write_mem_region);
        free_mem_region(sync_read_mem_region);
        free_mem_region(async_read_mem_region);
    }
621}