1mod generated;
5pub mod operation;
6mod probe;
7mod queue;
8pub mod restriction;
9
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::fs::File;
13use std::io::Error as IOError;
14use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
15
16use generated::io_uring_params;
17use operation::{Cqe, FixedFd, OpCode, Operation};
18use probe::{PROBE_LEN, ProbeWrapper};
19pub use queue::completion::CQueueError;
20use queue::completion::CompletionQueue;
21pub use queue::submission::SQueueError;
22use queue::submission::SubmissionQueue;
23use restriction::Restriction;
24use vmm_sys_util::syscall::SyscallReturnCode;
25
26use crate::io_uring::generated::io_uring_register_op;
27
/// Operations that the host kernel must support for this ring to be usable;
/// checked at construction time via `IORING_REGISTER_PROBE`.
const REQUIRED_OPS: [OpCode; 2] = [OpCode::Read, OpCode::Write];
/// Upper bound on the number of fds that can be registered as fixed files.
/// NOTE(review): presumably mirrors the kernel's `IORING_MAX_FIXED_FILES`
/// (1 << 15 on older kernels) — confirm against the target kernel UAPI.
const IORING_MAX_FIXED_FILES: usize = 1 << 15;
32
/// io_uring errors.
///
/// Variant doc comments double as `Display` messages via `displaydoc::Display`
/// (`{0}` interpolates the wrapped value).
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum IoUringError {
    /// Error originating in the completion queue: {0}
    CQueue(CQueueError),
    /// Could not enable the io_uring instance: {0}
    Enable(IOError),
    /// A FamStructWrapper operation has failed: {0}
    Fam(vmm_sys_util::fam::Error),
    /// The number of ops in the ring is >= the completion queue size
    FullCQueue,
    /// Fd was not registered: {0}
    InvalidFixedFd(FixedFd),
    /// There are no registered fds
    NoRegisteredFds,
    /// Error probing the io_uring subsystem: {0}
    Probe(IOError),
    /// Could not register eventfd: {0}
    RegisterEventfd(IOError),
    /// Could not register file: {0}
    RegisterFile(IOError),
    /// Attempted to register too many files
    RegisterFileLimitExceeded,
    /// Could not register restrictions: {0}
    RegisterRestrictions(IOError),
    /// Error calling io_uring_setup: {0}
    Setup(IOError),
    /// Error originating in the submission queue: {0}
    SQueue(SQueueError),
    /// Required feature is not supported on the host kernel: {0}
    UnsupportedFeature(&'static str),
    /// Required operation is not supported on the host kernel: {0}
    UnsupportedOperation(&'static str),
}
67
68impl IoUringError {
69 pub fn is_throttling_err(&self) -> bool {
71 matches!(
72 self,
73 Self::FullCQueue | Self::SQueue(SQueueError::FullQueue)
74 )
75 }
76}
77
/// Main object representing an io_uring instance.
///
/// `T` is the caller-supplied `user_data` payload type carried by each
/// operation; payloads are parked in `slab` while their ops are in flight.
#[derive(Debug)]
pub struct IoUring<T> {
    /// Number of fds registered with `IORING_REGISTER_FILES`; used to
    /// validate the `FixedFd` of pushed operations.
    registered_fds_count: u32,
    /// The submission queue.
    squeue: SubmissionQueue,
    /// The completion queue.
    cqueue: CompletionQueue,
    /// The io_uring fd returned by `io_uring_setup`; closed on drop.
    fd: File,

    /// Number of in-flight ops: incremented on successful `push`,
    /// decremented on `pop`. Capped at the CQ size to avoid overflow.
    num_ops: u32,
    /// Owns the `user_data` payloads of in-flight operations, keyed by the
    /// slab index stored in each sqe's user_data field.
    slab: slab::Slab<T>,
}
95
impl<T: Debug> IoUring<T> {
    /// Construct a new io_uring instance with `num_entries` submission entries.
    ///
    /// The ring is created disabled (`IORING_SETUP_R_DISABLED`), then in order:
    /// required kernel features and opcodes are checked, the optional
    /// `eventfd` is registered for completion notifications, the syscall
    /// `restrictions` are installed, `files` are registered as fixed fds, and
    /// finally the ring is enabled.
    ///
    /// # Errors
    ///
    /// Returns an `IoUringError` if any setup or registration step fails, or
    /// if the host kernel lacks a required feature/operation.
    pub fn new(
        num_entries: u32,
        files: Vec<&File>,
        restrictions: Vec<Restriction>,
        eventfd: Option<RawFd>,
    ) -> Result<Self, IoUringError> {
        let mut params = io_uring_params {
            // Start disabled so restrictions can be registered before any
            // submission is possible.
            flags: generated::IORING_SETUP_R_DISABLED,

            ..Default::default()
        };

        // SAFETY: we pass a valid pointer to an initialized `io_uring_params`
        // that outlives the syscall; the kernel fills it in.
        let fd = SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_setup,
                num_entries,
                &mut params as *mut io_uring_params,
            )
        })
        .into_result()
        .map_err(IoUringError::Setup)?;
        // On success the kernel returns a valid fd, which fits in a RawFd.
        let fd = RawFd::try_from(fd).unwrap();

        // SAFETY: `fd` is a valid fd we exclusively own; wrapping it in a
        // `File` ensures it is closed when the instance is dropped.
        let file = unsafe { File::from_raw_fd(fd) };

        Self::check_features(params)?;

        let squeue = SubmissionQueue::new(fd, &params).map_err(IoUringError::SQueue)?;
        let cqueue = CompletionQueue::new(fd, &params).map_err(IoUringError::CQueue)?;
        // Size the slab so it can park user_data for every possible in-flight op.
        let slab =
            slab::Slab::with_capacity(params.sq_entries as usize + params.cq_entries as usize);

        let mut instance = Self {
            squeue,
            cqueue,
            fd: file,
            registered_fds_count: 0,
            num_ops: 0,
            slab,
        };

        instance.check_operations()?;

        // Register the completion-notification eventfd before enabling.
        if let Some(eventfd) = eventfd {
            instance.register_eventfd(eventfd)?;
        }

        instance.register_restrictions(restrictions)?;

        instance.register_files(files)?;

        instance.enable()?;

        Ok(instance)
    }

    /// Push an operation onto the submission queue.
    ///
    /// On failure the `user_data` payload is returned alongside the error so
    /// the caller regains ownership of it.
    pub fn push(&mut self, op: Operation<T>) -> Result<(), (IoUringError, T)> {
        let fd = op.fd();
        match self.registered_fds_count {
            // Operations target fixed (registered) fds only.
            0 => Err((IoUringError::NoRegisteredFds, op.user_data)),
            len if fd >= len => Err((IoUringError::InvalidFixedFd(fd), op.user_data)),
            _ => {
                // Never allow more in-flight ops than CQ slots, so (together
                // with IORING_FEAT_NODROP) completions can never be lost.
                if self.num_ops >= self.cqueue.count() {
                    return Err((IoUringError::FullCQueue, op.user_data));
                }
                self.squeue
                    .push(op.into_sqe(&mut self.slab))
                    .inspect(|_| {
                        self.num_ops += 1;
                    })
                    .map_err(|(sqe_err, user_data_key)| -> (IoUringError, T) {
                        (
                            IoUringError::SQueue(sqe_err),
                            // Reclaim the payload parked in the slab by
                            // `into_sqe` so ownership returns to the caller.
                            #[allow(clippy::cast_possible_truncation)]
                            self.slab.remove(user_data_key as usize),
                        )
                    })
            }
        }
    }

    /// Pop a completed entry off the completion queue, if one is available.
    ///
    /// Returns `Ok(None)` when the completion queue is empty.
    pub fn pop(&mut self) -> Result<Option<Cqe<T>>, IoUringError> {
        self.cqueue
            .pop(&mut self.slab)
            .map(|maybe_cqe| {
                maybe_cqe.inspect(|_| {
                    // One fewer op in flight; saturate defensively.
                    self.num_ops = self.num_ops.saturating_sub(1);
                })
            })
            .map_err(IoUringError::CQueue)
    }

    // Submit pending sqes, optionally blocking until `min_complete`
    // completions are posted. Returns the number of sqes submitted.
    fn do_submit(&mut self, min_complete: u32) -> Result<u32, IoUringError> {
        self.squeue
            .submit(min_complete)
            .map_err(IoUringError::SQueue)
    }

    /// Submit all pending sqes without waiting for completions.
    pub fn submit(&mut self) -> Result<u32, IoUringError> {
        self.do_submit(0)
    }

    /// Submit all pending sqes and block until every in-flight op completes.
    pub fn submit_and_wait_all(&mut self) -> Result<u32, IoUringError> {
        self.do_submit(self.num_ops)
    }

    /// Number of sqes pushed but not yet submitted to the kernel.
    pub fn pending_sqes(&self) -> Result<u32, IoUringError> {
        self.squeue.pending().map_err(IoUringError::SQueue)
    }

    /// Number of ops currently in flight (pushed and not yet popped).
    pub fn num_ops(&self) -> u32 {
        self.num_ops
    }

    // Enable the ring (it was created with IORING_SETUP_R_DISABLED).
    fn enable(&mut self) -> Result<(), IoUringError> {
        // SAFETY: io_uring_register(ENABLE_RINGS) takes no argument payload;
        // we pass a null pointer and zero length as the UAPI requires.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_ENABLE_RINGS,
                std::ptr::null::<libc::c_void>(),
                0,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::Enable)
    }

    // Register `files` as fixed files, enabling fixed-fd operations on them.
    fn register_files(&mut self, files: Vec<&File>) -> Result<(), IoUringError> {
        if files.is_empty() {
            // Nothing to register.
            return Ok(());
        }

        if (self.registered_fds_count as usize).saturating_add(files.len()) > IORING_MAX_FIXED_FILES
        {
            return Err(IoUringError::RegisterFileLimitExceeded);
        }

        // SAFETY: the collected Vec of raw fds is a temporary that lives
        // until the end of this statement, so the pointer handed to the
        // kernel remains valid for the duration of the syscall.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_FILES,
                files
                    .iter()
                    .map(|f| f.as_raw_fd())
                    .collect::<Vec<_>>()
                    .as_mut_slice()
                    .as_mut_ptr() as *const _,
                files.len(),
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterFile)?;

        // files.len() <= IORING_MAX_FIXED_FILES (checked above), so it fits in u32.
        self.registered_fds_count += u32::try_from(files.len()).unwrap();
        Ok(())
    }

    // Register an eventfd to be notified on completion events.
    fn register_eventfd(&self, fd: RawFd) -> Result<(), IoUringError> {
        // SAFETY: `&fd` points to a valid RawFd on the stack that outlives
        // the syscall; length 1 matches the single fd passed.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_EVENTFD,
                (&fd) as *const _,
                1,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterEventfd)
    }

    // Install syscall restrictions on the (still disabled) ring.
    fn register_restrictions(&self, restrictions: Vec<Restriction>) -> Result<(), IoUringError> {
        if restrictions.is_empty() {
            // Nothing to restrict.
            return Ok(());
        }
        // SAFETY: the collected Vec of io_uring_restriction structs is a
        // temporary that lives until the end of this statement, so the
        // pointer stays valid for the duration of the syscall.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_RESTRICTIONS,
                restrictions
                    .iter()
                    .map(generated::io_uring_restriction::from)
                    .collect::<Vec<_>>()
                    .as_mut_slice()
                    .as_mut_ptr(),
                restrictions.len(),
            )
        })
        .into_empty_result()
        .map_err(IoUringError::RegisterRestrictions)
    }

    // Verify the kernel reports every feature this wrapper relies on.
    fn check_features(params: io_uring_params) -> Result<(), IoUringError> {
        // NODROP is required: `push` caps in-flight ops at the CQ size and
        // assumes the kernel never drops completions.
        if (params.features & generated::IORING_FEAT_NODROP) == 0 {
            return Err(IoUringError::UnsupportedFeature("IORING_FEAT_NODROP"));
        }

        Ok(())
    }

    // Probe the kernel for supported opcodes and verify REQUIRED_OPS.
    fn check_operations(&self) -> Result<(), IoUringError> {
        let mut probes = ProbeWrapper::new(PROBE_LEN).map_err(IoUringError::Fam)?;

        // SAFETY: `probes` is a valid FAM struct with capacity for PROBE_LEN
        // entries, valid for writes for the duration of the syscall.
        SyscallReturnCode(unsafe {
            libc::syscall(
                libc::SYS_io_uring_register,
                self.fd.as_raw_fd(),
                io_uring_register_op::IORING_REGISTER_PROBE,
                probes.as_mut_fam_struct_ptr(),
                PROBE_LEN,
            )
        })
        .into_empty_result()
        .map_err(IoUringError::Probe)?;

        // Collect the opcodes the kernel marked as supported.
        let supported_opcodes: HashSet<u8> = probes
            .as_slice()
            .iter()
            .filter(|op| ((u32::from(op.flags)) & generated::IO_URING_OP_SUPPORTED) != 0)
            .map(|op| op.op)
            .collect();

        for opcode in REQUIRED_OPS.iter() {
            if !supported_opcodes.contains(&(*opcode as u8)) {
                return Err(IoUringError::UnsupportedOperation((*opcode).into()));
            }
        }

        Ok(())
    }
}
380
#[cfg(test)]
mod tests {
    #![allow(clippy::undocumented_unsafe_blocks)]
    use std::os::unix::fs::FileExt;

    use proptest::prelude::*;
    use proptest::strategy::Strategy;
    use proptest::test_runner::{Config, TestRunner};
    use vm_memory::VolatileMemory;
    use vmm_sys_util::syscall::SyscallReturnCode;
    use vmm_sys_util::tempfile::TempFile;

    use super::*;
    use crate::vstate::memory::{Bytes, MmapRegion};

    // Pop every available completion; each op in these tests encodes its
    // requested length as user_data, so a successful completion must report
    // exactly that many bytes transferred.
    fn drain_cqueue(ring: &mut IoUring<u32>) {
        while let Some(entry) = ring.pop().unwrap() {
            entry.result().unwrap();

            // user_data == requested op length (set by arbitrary_rw_operation).
            let count = entry.result().unwrap();
            let user_data = entry.user_data();
            assert_eq!(count, user_data);
        }
    }

    // Anonymous private R/W mapping of `len` bytes used as an I/O buffer.
    fn setup_mem_region(len: usize) -> MmapRegion {
        const PROT: i32 = libc::PROT_READ | libc::PROT_WRITE;
        const FLAGS: i32 = libc::MAP_ANONYMOUS | libc::MAP_PRIVATE;

        let ptr = unsafe { libc::mmap(std::ptr::null_mut(), len, PROT, FLAGS, -1, 0) };

        // mmap returns MAP_FAILED (-1) on error.
        if (ptr as isize) < 0 {
            panic!("Mmap failed with {}", std::io::Error::last_os_error());
        }

        unsafe {
            MmapRegion::build_raw(ptr.cast::<u8>(), len, PROT, FLAGS).unwrap()
        }
    }

    // Unmap a region created by setup_mem_region.
    fn free_mem_region(region: MmapRegion) {
        unsafe { libc::munmap(region.as_ptr().cast::<libc::c_void>(), region.len()) };
    }

    // Copy the full contents of `region` into a Vec for comparison.
    fn read_entire_mem_region(region: &MmapRegion) -> Vec<u8> {
        let mut result = vec![0u8; region.len()];
        let count = region.as_volatile_slice().read(&mut result[..], 0).unwrap();
        assert_eq!(count, region.len());
        result
    }

    // Strategy producing a random linked read/write Operation whose length,
    // file offset and buffer offset all fit within `file_len`. The op's
    // user_data is set to its length (relied upon by drain_cqueue).
    #[allow(clippy::let_with_type_underscore)]
    fn arbitrary_rw_operation(file_len: u32) -> impl Strategy<Value = Operation<u32>> {
        (
            // 0 => write, anything else => read.
            0..2,
            // Requested transfer length, strictly less than the file size.
            0u32..file_len,
        )
            .prop_flat_map(move |(op, len)| {
                (
                    Just(op),
                    Just(len),
                    // File offset such that offset + len stays in bounds.
                    (0u32..(file_len - len)),
                    // Buffer offset such that mem_off + len stays in bounds.
                    (0u32..(file_len - len)),
                )
            })
            .prop_map(move |(op, len, off, mem_off)| {
                // addr temporarily holds the buffer OFFSET; the test body
                // rebases it onto the actual mmap'd region before pushing.
                let mut operation = match op {
                    0 => Operation::write(0, mem_off as usize, len, off.into(), len),
                    _ => Operation::read(0, mem_off as usize, len, off.into(), len),
                };

                // Link ops so they execute in order, matching the sequential
                // synchronous pread/pwrite mirror.
                operation.set_linked();
                operation
            })
    }

    #[test]
    fn proptest_read_write_correctness() {
        // Performs every generated op both synchronously (pread/pwrite on
        // file_sync) and through the ring (on file_async), then asserts the
        // two files and the two read buffers end up identical.
        const FILE_LEN: u32 = 1024;
        const OPS_COUNT: usize = 2000;
        const RING_SIZE: u32 = 128;

        // Source buffer for writes (shared by sync and async paths).
        let write_mem_region = setup_mem_region(FILE_LEN as usize);

        // Destination buffer for synchronous reads.
        let sync_read_mem_region = setup_mem_region(FILE_LEN as usize);

        // Destination buffer for ring (async) reads.
        let async_read_mem_region = setup_mem_region(FILE_LEN as usize);

        // Fill the write buffer with a deterministic byte pattern.
        for i in 0..FILE_LEN {
            write_mem_region
                .as_volatile_slice()
                .write_obj(u8::try_from(i % u32::from(u8::MAX)).unwrap(), i as usize)
                .unwrap();
        }

        // Both files start zero-filled so they begin identical.
        let init_contents = [0u8; FILE_LEN as usize];
        let file_async = TempFile::new().unwrap().into_file();
        file_async.write_all_at(&init_contents, 0).unwrap();

        let file_sync = TempFile::new().unwrap().into_file();
        file_sync.write_all_at(&init_contents, 0).unwrap();

        // Fewer cases on aarch64 to keep CI runtime reasonable.
        let mut runner = TestRunner::new(Config {
            #[cfg(target_arch = "x86_64")]
            cases: 1000,
            #[cfg(target_arch = "aarch64")]
            cases: 500,
            ..Config::default()
        });

        runner
            .run(
                &proptest::collection::vec(arbitrary_rw_operation(FILE_LEN), OPS_COUNT),
                |set| {
                    let mut ring =
                        IoUring::new(RING_SIZE, vec![&file_async], vec![], None).unwrap();

                    for mut operation in set {
                        // Mirror the op synchronously on file_sync first.
                        // operation.addr still holds the buffer offset here.
                        let count = match operation.opcode {
                            OpCode::Write => u32::try_from(
                                SyscallReturnCode(unsafe {
                                    libc::pwrite(
                                        file_sync.as_raw_fd(),
                                        write_mem_region.as_ptr().add(operation.addr.unwrap())
                                            as *const libc::c_void,
                                        operation.len.unwrap() as usize,
                                        i64::try_from(operation.offset.unwrap()).unwrap(),
                                    )
                                })
                                .into_result()
                                .unwrap(),
                            )
                            .unwrap(),
                            OpCode::Read => u32::try_from(
                                SyscallReturnCode(unsafe {
                                    libc::pread(
                                        file_sync.as_raw_fd(),
                                        sync_read_mem_region
                                            .as_ptr()
                                            .add(operation.addr.unwrap())
                                            .cast::<libc::c_void>(),
                                        operation.len.unwrap() as usize,
                                        i64::try_from(operation.offset.unwrap()).unwrap(),
                                    )
                                })
                                .into_result()
                                .unwrap(),
                            )
                            _ => unreachable!(),
                        };

                        // The comparison below assumes full transfers.
                        if count < operation.len.unwrap() {
                            panic!("Synchronous partial operation: {:?}", operation);
                        }

                        // Rebase addr from a buffer offset to a real pointer
                        // into the appropriate mmap'd region for the ring.
                        match operation.opcode {
                            OpCode::Write => {
                                operation.addr = Some(unsafe {
                                    write_mem_region.as_ptr().add(operation.addr.unwrap()) as usize
                                })
                            }
                            OpCode::Read => {
                                operation.addr = Some(unsafe {
                                    async_read_mem_region.as_ptr().add(operation.addr.unwrap())
                                        as usize
                                })
                            }
                            _ => unreachable!(),
                        };

                        // Flush the ring when the submission queue fills up.
                        if ring.pending_sqes().unwrap() == RING_SIZE {
                            ring.submit_and_wait_all().unwrap();
                            drain_cqueue(&mut ring);
                        }
                        ring.push(operation).unwrap();
                    }

                    // Flush any remaining ops.
                    ring.submit_and_wait_all().unwrap();
                    drain_cqueue(&mut ring);

                    // The async file must match the synchronously-written one.
                    let mut async_result = [0u8; FILE_LEN as usize];
                    file_async.read_exact_at(&mut async_result, 0).unwrap();

                    let mut sync_result = [0u8; FILE_LEN as usize];
                    file_sync.read_exact_at(&mut sync_result, 0).unwrap();

                    assert_eq!(sync_result, async_result);

                    // And the read destinations must match byte-for-byte.
                    assert_eq!(
                        read_entire_mem_region(&sync_read_mem_region),
                        read_entire_mem_region(&async_read_mem_region)
                    );

                    Ok(())
                },
            )
            .unwrap();

        free_mem_region(write_mem_region);
        free_mem_region(sync_read_mem_region);
        free_mem_region(async_read_mem_region);
    }
}