1use std::collections::{HashMap, HashSet};
34use std::fmt::Debug;
35use std::io::Read;
36use std::os::unix::io::{AsRawFd, RawFd};
37use std::os::unix::net::{UnixListener, UnixStream};
38
39use log::{debug, error, info, warn};
40use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
41
42use super::super::csm::ConnState;
43use super::super::defs::uapi;
44use super::super::{VsockBackend, VsockChannel, VsockEpollListener, VsockError};
45use super::muxer_killq::MuxerKillQ;
46use super::muxer_rxq::MuxerRxQ;
47use super::{MuxerConnection, VsockUnixBackendError, defs};
48use crate::devices::virtio::vsock::metrics::METRICS;
49use crate::devices::virtio::vsock::packet::{VsockPacketRx, VsockPacketTx};
50use crate::logger::IncMetric;
51
/// Key under which a connection is stored in the muxer connection map: the
/// (host-side local port, guest-side peer port) pair.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ConnMapKey {
    /// Host-side port (the source port of packets the muxer sends to the guest).
    local_port: u32,
    /// Guest-side port (the destination port of packets the muxer sends to the guest).
    peer_port: u32,
}
59
/// An entry of the muxer RX queue, telling `recv_pkt` how to produce the next RX packet.
#[derive(Clone, Copy, Debug)]
pub enum MuxerRx {
    /// The connection stored under this key has RX pending; it builds the packet itself.
    ConnRx(ConnMapKey),
    /// The muxer must build an RST packet going from `local_port` to `peer_port`.
    RstPkt { local_port: u32, peer_port: u32 },
}
68
/// What a file descriptor registered with the muxer's epoll instance stands for.
#[derive(Debug)]
enum EpollListener {
    /// The FD of a connection stored in the connection map under `key`; `evset` is the
    /// event set it is currently registered with.
    Connection { key: ConnMapKey, evset: EventSet },
    /// The listening host Unix socket, on which host-initiated connections are accepted.
    HostSock,
    /// A stream accepted from the host socket whose connect request has not been read yet.
    LocalStream(UnixStream),
}
82
/// Multiplexes guest vsock traffic over host-side Unix sockets.
#[derive(Debug)]
pub struct VsockMuxer {
    /// CID used as the destination CID of RST packets the muxer builds, and passed to every
    /// connection it creates.
    cid: u64,
    /// Known connections, keyed by their (local port, peer port) pair.
    conn_map: HashMap<ConnMapKey, MuxerConnection>,
    /// Listeners registered with `epoll`, keyed by file descriptor.
    listener_map: HashMap<RawFd, EpollListener>,
    /// Queue of pending RX work. It can fall out of sync, in which case it gets rebuilt from
    /// `conn_map`.
    rxq: MuxerRxQ,
    /// Queue of connections scheduled for termination. It can also fall out of sync and be
    /// rebuilt from `conn_map`.
    killq: MuxerKillQ,
    /// Listening Unix socket accepting host-initiated connections.
    host_sock: UnixListener,
    /// Path `host_sock` is bound to; guest-initiated connections are forwarded to
    /// `<host_sock_path>_<port>`.
    pub(crate) host_sock_path: String,
    /// Epoll instance watching `host_sock`, pending local streams and connection FDs.
    epoll: Epoll,
    /// Local ports currently handed out by `allocate_local_port`.
    local_port_set: HashSet<u32>,
    /// The local port most recently produced by `allocate_local_port`.
    local_port_last: u32,
}
112
113impl VsockChannel for VsockMuxer {
114 fn recv_pkt(&mut self, pkt: &mut VsockPacketRx) -> Result<(), VsockError> {
120 if self.rxq.is_empty() && !self.rxq.is_synced() {
125 self.rxq = MuxerRxQ::from_conn_map(&self.conn_map);
126 }
127
128 while let Some(rx) = self.rxq.peek() {
129 let res = match rx {
130 MuxerRx::RstPkt {
132 local_port,
133 peer_port,
134 } => {
135 pkt.hdr
136 .set_op(uapi::VSOCK_OP_RST)
137 .set_src_cid(uapi::VSOCK_HOST_CID)
138 .set_dst_cid(self.cid)
139 .set_src_port(local_port)
140 .set_dst_port(peer_port)
141 .set_len(0)
142 .set_type(uapi::VSOCK_TYPE_STREAM)
143 .set_flags(0)
144 .set_buf_alloc(0)
145 .set_fwd_cnt(0);
146 self.rxq.pop().unwrap();
147 return Ok(());
148 }
149
150 MuxerRx::ConnRx(key) => {
153 let mut conn_res = Err(VsockError::NoData);
154 let mut do_pop = true;
155 self.apply_conn_mutation(key, |conn| {
156 conn_res = conn.recv_pkt(pkt);
157 do_pop = !conn.has_pending_rx();
158 });
159 if do_pop {
160 self.rxq.pop().unwrap();
161 }
162 conn_res
163 }
164 };
165
166 if res.is_ok() {
167 if pkt.hdr.op() == uapi::VSOCK_OP_RST {
171 self.remove_connection(ConnMapKey {
172 local_port: pkt.hdr.src_port(),
173 peer_port: pkt.hdr.dst_port(),
174 });
175 }
176
177 debug!("vsock muxer: RX pkt: {:?}", pkt.hdr);
178 return Ok(());
179 }
180 }
181
182 Err(VsockError::NoData)
183 }
184
185 fn send_pkt(&mut self, pkt: &VsockPacketTx) -> Result<(), VsockError> {
194 let conn_key = ConnMapKey {
195 local_port: pkt.hdr.dst_port(),
196 peer_port: pkt.hdr.src_port(),
197 };
198
199 debug!(
200 "vsock: muxer.send[rxq.len={}]: {:?}",
201 self.rxq.len(),
202 pkt.hdr
203 );
204
205 if pkt.hdr.type_() != uapi::VSOCK_TYPE_STREAM {
208 self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port());
209 return Ok(());
210 }
211
212 if pkt.hdr.dst_cid() != uapi::VSOCK_HOST_CID {
215 info!(
216 "vsock: dropping guest packet for unknown CID: {:?}",
217 pkt.hdr
218 );
219 return Ok(());
220 }
221
222 if !self.conn_map.contains_key(&conn_key) {
223 if pkt.hdr.op() == uapi::VSOCK_OP_REQUEST {
227 self.handle_peer_request_pkt(pkt);
229 } else {
230 self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port());
232 }
233 return Ok(());
234 }
235
236 if pkt.hdr.op() == uapi::VSOCK_OP_RST {
240 self.remove_connection(conn_key);
241 return Ok(());
242 }
243
244 let mut res: Result<(), VsockError> = Ok(());
246 self.apply_conn_mutation(conn_key, |conn| {
247 res = conn.send_pkt(pkt);
248 });
249
250 res
251 }
252
253 fn has_pending_rx(&self) -> bool {
256 !self.rxq.is_empty() || !self.rxq.is_synced()
257 }
258}
259
260impl AsRawFd for VsockMuxer {
261 fn as_raw_fd(&self) -> RawFd {
266 self.epoll.as_raw_fd()
267 }
268}
269
270impl VsockEpollListener for VsockMuxer {
271 fn get_polled_evset(&self) -> EventSet {
276 EventSet::IN
277 }
278
279 fn notify(&mut self, _: EventSet) {
281 let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
282 match self.epoll.wait(0, epoll_events.as_mut_slice()) {
283 Ok(ev_cnt) => {
284 for ev in &epoll_events[0..ev_cnt] {
285 self.handle_event(
286 ev.fd(),
287 EventSet::from_bits(ev.events).unwrap(),
291 );
292 }
293 }
294 Err(err) => {
295 warn!("vsock: failed to consume muxer epoll event: {}", err);
296 METRICS.muxer_event_fails.inc();
297 }
298 }
299 }
300}
301
// The impl body is empty: nothing beyond the trait impls above is provided here.
// NOTE(review): presumably `VsockBackend` is a marker combining `VsockChannel`,
// `VsockEpollListener` and friends; confirm against the trait definition.
impl VsockBackend for VsockMuxer {}
303
304impl VsockMuxer {
305 pub fn new(cid: u64, host_sock_path: String) -> Result<Self, VsockUnixBackendError> {
307 let host_sock = UnixListener::bind(&host_sock_path)
310 .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
311 .map_err(VsockUnixBackendError::UnixBind)?;
312
313 let mut muxer = Self {
314 cid,
315 host_sock,
316 host_sock_path,
317 epoll: Epoll::new().map_err(VsockUnixBackendError::EpollFdCreate)?,
318 rxq: MuxerRxQ::new(),
319 conn_map: HashMap::with_capacity(defs::MAX_CONNECTIONS),
320 listener_map: HashMap::with_capacity(defs::MAX_CONNECTIONS + 1),
321 killq: MuxerKillQ::new(),
322 local_port_last: (1u32 << 30) - 1,
323 local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS),
324 };
325
326 muxer.add_listener(muxer.host_sock.as_raw_fd(), EpollListener::HostSock)?;
328 Ok(muxer)
329 }
330
331 pub fn host_sock_path(&self) -> &str {
333 &self.host_sock_path
334 }
335
336 fn handle_event(&mut self, fd: RawFd, event_set: EventSet) {
338 debug!(
339 "vsock: muxer processing event: fd={}, evset={:?}",
340 fd, event_set
341 );
342
343 match self.listener_map.get_mut(&fd) {
344 Some(EpollListener::Connection { key, evset: _ }) => {
347 let key_copy = *key;
348 self.apply_conn_mutation(key_copy, |conn| {
353 conn.notify(event_set);
354 });
355 }
356
357 Some(EpollListener::HostSock) => {
359 if self.conn_map.len() == defs::MAX_CONNECTIONS {
360 warn!("vsock: connection limit reached; refusing new host connection");
363 self.host_sock.accept().map(|_| 0).unwrap_or(0);
364 return;
365 }
366 self.host_sock
367 .accept()
368 .map_err(VsockUnixBackendError::UnixAccept)
369 .and_then(|(stream, _)| {
370 stream
371 .set_nonblocking(true)
372 .map(|_| stream)
373 .map_err(VsockUnixBackendError::UnixAccept)
374 })
375 .and_then(|stream| {
376 self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream))
381 })
382 .unwrap_or_else(|err| {
383 warn!("vsock: unable to accept local connection: {:?}", err);
384 });
385 }
386
387 Some(EpollListener::LocalStream(_)) => {
390 if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
391 Self::read_local_stream_port(&mut stream)
392 .map(|peer_port| (self.allocate_local_port(), peer_port))
393 .and_then(|(local_port, peer_port)| {
394 self.add_connection(
395 ConnMapKey {
396 local_port,
397 peer_port,
398 },
399 MuxerConnection::new_local_init(
400 stream,
401 uapi::VSOCK_HOST_CID,
402 self.cid,
403 local_port,
404 peer_port,
405 ),
406 )
407 })
408 .unwrap_or_else(|err| {
409 info!("vsock: error adding local-init connection: {:?}", err);
410 })
411 }
412 }
413
414 _ => {
415 info!(
416 "vsock: unexpected event: fd={:?}, evset={:?}",
417 fd, event_set
418 );
419 METRICS.muxer_event_fails.inc();
420 }
421 }
422 }
423
424 fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32, VsockUnixBackendError> {
426 let mut buf = [0u8; 32];
427
428 const MIN_READ_LEN: usize = 10;
431
432 stream
434 .read_exact(&mut buf[..MIN_READ_LEN])
435 .map_err(VsockUnixBackendError::UnixRead)?;
436
437 let mut blen = MIN_READ_LEN;
441 while buf[blen - 1] != b'\n' && blen < buf.len() {
442 stream
443 .read_exact(&mut buf[blen..=blen])
444 .map_err(VsockUnixBackendError::UnixRead)?;
445 blen += 1;
446 }
447
448 let mut word_iter = std::str::from_utf8(&buf[..blen])
449 .map_err(|_| VsockUnixBackendError::InvalidPortRequest)?
450 .split_whitespace();
451
452 word_iter
453 .next()
454 .ok_or(VsockUnixBackendError::InvalidPortRequest)
455 .and_then(|word| {
456 if word.to_lowercase() == "connect" {
457 Ok(())
458 } else {
459 Err(VsockUnixBackendError::InvalidPortRequest)
460 }
461 })
462 .and_then(|_| {
463 word_iter
464 .next()
465 .ok_or(VsockUnixBackendError::InvalidPortRequest)
466 })
467 .and_then(|word| {
468 word.parse::<u32>()
469 .map_err(|_| VsockUnixBackendError::InvalidPortRequest)
470 })
471 .map_err(|_| VsockUnixBackendError::InvalidPortRequest)
472 }
473
474 fn add_connection(
476 &mut self,
477 key: ConnMapKey,
478 conn: MuxerConnection,
479 ) -> Result<(), VsockUnixBackendError> {
480 self.sweep_killq();
485
486 if self.conn_map.len() >= defs::MAX_CONNECTIONS {
487 info!(
488 "vsock: muxer connection limit reached ({})",
489 defs::MAX_CONNECTIONS
490 );
491 return Err(VsockUnixBackendError::TooManyConnections);
492 }
493
494 self.add_listener(
495 conn.as_raw_fd(),
496 EpollListener::Connection {
497 key,
498 evset: conn.get_polled_evset(),
499 },
500 )
501 .map(|_| {
502 if conn.has_pending_rx() {
503 self.rxq.push(MuxerRx::ConnRx(key));
507 }
508 self.conn_map.insert(key, conn);
509 METRICS.conns_added.inc();
510 })
511 }
512
513 fn remove_connection(&mut self, key: ConnMapKey) {
515 if let Some(conn) = self.conn_map.remove(&key) {
516 self.remove_listener(conn.as_raw_fd());
517 METRICS.conns_removed.inc();
518 }
519 self.free_local_port(key.local_port);
520 }
521
522 fn kill_connection(&mut self, key: ConnMapKey) {
526 let mut had_rx = false;
527 METRICS.conns_killed.inc();
528
529 self.conn_map.entry(key).and_modify(|conn| {
530 had_rx = conn.has_pending_rx();
531 conn.kill();
532 });
533 if !had_rx {
536 self.rxq.push(MuxerRx::ConnRx(key));
540 }
541 }
542
543 fn add_listener(
545 &mut self,
546 fd: RawFd,
547 listener: EpollListener,
548 ) -> Result<(), VsockUnixBackendError> {
549 let evset = match listener {
550 EpollListener::Connection { evset, .. } => evset,
551 EpollListener::LocalStream(_) => EventSet::IN,
552 EpollListener::HostSock => EventSet::IN,
553 };
554
555 self.epoll
556 .ctl(
557 ControlOperation::Add,
558 fd,
559 EpollEvent::new(evset, u64::try_from(fd).unwrap()),
560 )
561 .map(|_| {
562 self.listener_map.insert(fd, listener);
563 })
564 .map_err(VsockUnixBackendError::EpollAdd)?;
565
566 Ok(())
567 }
568
569 fn remove_listener(&mut self, fd: RawFd) -> Option<EpollListener> {
571 let maybe_listener = self.listener_map.remove(&fd);
572
573 if maybe_listener.is_some() {
574 self.epoll
575 .ctl(ControlOperation::Delete, fd, EpollEvent::default())
576 .unwrap_or_else(|err| {
577 warn!(
578 "vosck muxer: error removing epoll listener for fd {:?}: {:?}",
579 fd, err
580 );
581 });
582 }
583
584 maybe_listener
585 }
586
587 fn allocate_local_port(&mut self) -> u32 {
589 loop {
594 self.local_port_last = (self.local_port_last + 1) & !(1 << 31) | (1 << 30);
595 if self.local_port_set.insert(self.local_port_last) {
596 break;
597 }
598 }
599 self.local_port_last
600 }
601
602 fn free_local_port(&mut self, port: u32) {
604 self.local_port_set.remove(&port);
605 }
606
607 fn handle_peer_request_pkt(&mut self, pkt: &VsockPacketTx) {
614 let port_path = format!("{}_{}", self.host_sock_path, pkt.hdr.dst_port());
615
616 UnixStream::connect(port_path)
617 .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
618 .map_err(VsockUnixBackendError::UnixConnect)
619 .and_then(|stream| {
620 self.add_connection(
621 ConnMapKey {
622 local_port: pkt.hdr.dst_port(),
623 peer_port: pkt.hdr.src_port(),
624 },
625 MuxerConnection::new_peer_init(
626 stream,
627 uapi::VSOCK_HOST_CID,
628 self.cid,
629 pkt.hdr.dst_port(),
630 pkt.hdr.src_port(),
631 pkt.hdr.buf_alloc(),
632 ),
633 )
634 })
635 .unwrap_or_else(|_| self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port()));
636 }
637
    /// Run `mut_fn` on the connection stored under `key`, then bring the muxer state back in
    /// line with whatever changed on the connection: the host-side ack for freshly
    /// established local connections, the RX queue, the kill queue, and the epoll
    /// registration.
    ///
    /// If there is no connection under `key`, this does nothing and `mut_fn` is never called.
    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            // Snapshot the state we need to compare against after the mutation.
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // A host-initiated connection has just become established: tell the host end,
            // by writing "OK <local port>\n" to it.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // Partial write of the ack: kill the connection.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // Newly pending RX: schedule the connection in the RX queue. The push result is
            // not checked.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // Newly armed expiry: schedule the connection in the kill queue.
            if !was_expiring && conn.will_expire() {
                // NOTE(review): the unwrap relies on `will_expire()` implying that `expiry()`
                // is `Some` - confirm against `MuxerConnection`.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.as_raw_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // The connection is not interested in any event anymore: stop watching it.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // The set of events the connection wants has changed: update the epoll
                    // registration accordingly.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    self.epoll
                        .ctl(
                            ControlOperation::Modify,
                            fd,
                            EpollEvent::new(new_evset, u64::try_from(fd).unwrap()),
                        )
                        .unwrap_or_else(|err| {
                            // The registration could not be updated: kill the connection.
                            self.kill_connection(key);
                            error!(
                                "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                                key.local_port, key.peer_port, err
                            );
                            METRICS.muxer_event_fails.inc();
                        });
                }
            } else {
                // No connection listener is registered for this FD (e.g. it was removed
                // above, when the event set was empty), but the connection now wants events
                // again: register it anew.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                    METRICS.muxer_event_fails.inc();
                });
            }
        }
    }
744
745 fn sweep_killq(&mut self) {
748 while let Some(key) = self.killq.pop() {
749 let mut kill = false;
753 self.conn_map
754 .entry(key)
755 .and_modify(|conn| kill = conn.has_expired());
756 if kill {
757 self.kill_connection(key);
758 }
759 }
760
761 if self.killq.is_empty() && !self.killq.is_synced() {
762 self.killq = MuxerKillQ::from_conn_map(&self.conn_map);
763 METRICS.killq_resync.inc();
764 self.sweep_killq();
767 }
768 }
769
770 fn enq_rst(&mut self, local_port: u32, peer_port: u32) {
776 let pushed = self.rxq.push(MuxerRx::RstPkt {
777 local_port,
778 peer_port,
779 });
780 if !pushed {
781 warn!(
782 "vsock: muxer.rxq full; dropping RST packet for lp={}, pp={}",
783 local_port, peer_port
784 );
785 }
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use std::io::{Read, Write};
792 use std::ops::Drop;
793 use std::os::unix::net::{UnixListener, UnixStream};
794 use std::path::{Path, PathBuf};
795
796 use vmm_sys_util::tempfile::TempFile;
797
798 use super::super::super::csm::defs as csm_defs;
799 use super::*;
800 use crate::devices::virtio::vsock::device::{RXQ_INDEX, TXQ_INDEX};
801 use crate::devices::virtio::vsock::test_utils;
802 use crate::devices::virtio::vsock::test_utils::TestContext as VsockTestContext;
803
    /// CID of the (guest) peer: passed to `VsockMuxer::new` and used as the source CID of
    /// test TX packets.
    const PEER_CID: u64 = 3;
    /// `buf_alloc` value set on every TX packet built by `init_tx_pkt`.
    const PEER_BUF_ALLOC: u32 = 64 * 1024;
806
    /// Test fixture bundling a muxer with one RX and one TX packet to drive it with.
    #[derive(Debug)]
    struct MuxerTestContext {
        /// The vsock test context the packets were parsed from; never read, only kept alive.
        _vsock_test_ctx: VsockTestContext,
        /// Packet filled in by `recv()`.
        rx_pkt: VsockPacketRx,
        /// Packet handed to the muxer by `send()`.
        tx_pkt: VsockPacketTx,
        /// The muxer under test.
        muxer: VsockMuxer,
    }
815
816 impl Drop for MuxerTestContext {
817 fn drop(&mut self) {
818 std::fs::remove_file(self.muxer.host_sock_path.as_str()).unwrap();
819 }
820 }
821
822 fn get_file(fprefix: &str) -> String {
824 let listener_path = TempFile::new_with_prefix(fprefix).unwrap();
825 listener_path
826 .as_path()
827 .as_os_str()
828 .to_str()
829 .unwrap()
830 .to_owned()
831 }
832
833 impl MuxerTestContext {
        /// Build a fixture: parse one RX and one TX packet out of the test device queues,
        /// and create a muxer listening on a fresh temp path derived from `name`.
        fn new(name: &str) -> Self {
            let vsock_test_ctx = VsockTestContext::new();
            let mut handler_ctx = vsock_test_ctx.create_event_handler_context();
            // RX packet, parsed from the first descriptor chain of the RX queue.
            let mut rx_pkt = VsockPacketRx::new().unwrap();
            rx_pkt
                .parse(
                    &vsock_test_ctx.mem,
                    handler_ctx.device.queues[RXQ_INDEX].pop().unwrap().unwrap(),
                )
                .unwrap();
            // TX packet, parsed from the first descriptor chain of the TX queue.
            let mut tx_pkt = VsockPacketTx::default();
            tx_pkt
                .parse(
                    &vsock_test_ctx.mem,
                    handler_ctx.device.queues[TXQ_INDEX].pop().unwrap().unwrap(),
                )
                .unwrap();

            let muxer = VsockMuxer::new(PEER_CID, get_file(name)).unwrap();
            Self {
                _vsock_test_ctx: vsock_test_ctx,
                rx_pkt,
                tx_pkt,
                muxer,
            }
        }
860
861 fn init_tx_pkt(&mut self, local_port: u32, peer_port: u32, op: u16) -> &mut VsockPacketTx {
862 self.tx_pkt
863 .hdr
864 .set_type(uapi::VSOCK_TYPE_STREAM)
865 .set_src_cid(PEER_CID)
866 .set_dst_cid(uapi::VSOCK_HOST_CID)
867 .set_src_port(peer_port)
868 .set_dst_port(local_port)
869 .set_op(op)
870 .set_buf_alloc(PEER_BUF_ALLOC);
871 &mut self.tx_pkt
872 }
873
        /// Set up the TX packet as an RW (data) packet from `peer_port` to `local_port`,
        /// carrying `data`, and return it. Panics if `data` does not fit the TX buffer.
        fn init_data_tx_pkt(
            &mut self,
            local_port: u32,
            peer_port: u32,
            mut data: &[u8],
        ) -> &mut VsockPacketTx {
            assert!(data.len() <= self.tx_pkt.buf_size() as usize);
            let tx_pkt = self.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_RW);
            tx_pkt.hdr.set_len(u32::try_from(data.len()).unwrap());

            // Computed up front, so `data` is not borrowed twice in the call below.
            let data_len = data.len().try_into().unwrap();
            // NOTE(review): the payload is written through `rx_pkt`, not `tx_pkt`;
            // presumably both packets are backed by the same test buffer - confirm against
            // the vsock test context.
            self.rx_pkt
                .read_at_offset_from(&mut data, 0, data_len)
                .unwrap();
            &mut self.tx_pkt
        }
890
891 fn send(&mut self) {
892 self.muxer.send_pkt(&self.tx_pkt).unwrap();
893 }
894
895 fn recv(&mut self) {
896 self.muxer.recv_pkt(&mut self.rx_pkt).unwrap();
897 }
898
899 fn notify_muxer(&mut self) {
900 self.muxer.notify(EventSet::IN);
901 }
902
903 fn count_epoll_listeners(&self) -> (usize, usize) {
904 let mut local_lsn_count = 0usize;
905 let mut conn_lsn_count = 0usize;
906 for key in self.muxer.listener_map.values() {
907 match key {
908 EpollListener::LocalStream(_) => local_lsn_count += 1,
909 EpollListener::Connection { .. } => conn_lsn_count += 1,
910 _ => (),
911 };
912 }
913 (local_lsn_count, conn_lsn_count)
914 }
915
916 fn create_local_listener(&self, port: u32) -> LocalListener {
917 LocalListener::new(format!("{}_{}", self.muxer.host_sock_path, port))
918 }
919
        /// Walk through a complete host-initiated connection handshake towards `peer_port`,
        /// asserting on every step, and return the host-side stream along with the local
        /// port the muxer allocated for the connection.
        fn local_connect(&mut self, peer_port: u32) -> (UnixStream, u32) {
            let (init_local_lsn_count, init_conn_lsn_count) = self.count_epoll_listeners();

            // Connect to the host socket, and let the muxer accept the new stream.
            let mut stream = UnixStream::connect(self.muxer.host_sock_path.clone()).unwrap();
            stream.set_nonblocking(true).unwrap();
            self.notify_muxer();

            // The accepted stream should now be watched as a pending local stream.
            let (local_lsn_count, _) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count + 1);

            // Send the connect request, and let the muxer read it.
            let buf = format!("CONNECT {}\n", peer_port);
            stream.write_all(buf.as_bytes()).unwrap();
            self.notify_muxer();

            // The pending local stream should have been turned into a connection.
            let (local_lsn_count, conn_lsn_count) = self.count_epoll_listeners();
            assert_eq!(local_lsn_count, init_local_lsn_count);
            assert_eq!(conn_lsn_count, init_conn_lsn_count + 1);

            // The muxer allocated the local port; the connection should be stored under it.
            let local_port = self.muxer.local_port_last;
            let key = ConnMapKey {
                local_port,
                peer_port,
            };
            assert!(self.muxer.conn_map.contains_key(&key));
            assert!(self.muxer.local_port_set.contains(&local_port));

            // The new connection should yield a connection request for the guest.
            assert!(self.muxer.has_pending_rx());
            self.recv();
            assert_eq!(self.rx_pkt.hdr.op(), uapi::VSOCK_OP_REQUEST);
            assert_eq!(self.rx_pkt.hdr.dst_port(), peer_port);
            assert_eq!(self.rx_pkt.hdr.src_port(), local_port);

            // Have the guest accept the connection.
            self.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_RESPONSE);
            self.send();

            // The host end should receive the "OK <local port>" ack.
            let mut buf = [0u8; 32];
            let len = stream.read(&mut buf[..]).unwrap();
            assert_eq!(&buf[..len], format!("OK {}\n", local_port).as_bytes());

            (stream, local_port)
        }
972 }
973
    /// Test helper: a listening Unix socket whose socket file is removed on drop.
    #[derive(Debug)]
    struct LocalListener {
        /// Path the socket is bound to.
        path: PathBuf,
        /// The listening socket itself.
        sock: UnixListener,
    }
979 impl LocalListener {
980 fn new<P: AsRef<Path> + Clone + Debug>(path: P) -> Self {
981 let path_buf = path.as_ref().to_path_buf();
982 let sock = UnixListener::bind(path).unwrap();
983 sock.set_nonblocking(true).unwrap();
984 Self {
985 path: path_buf,
986 sock,
987 }
988 }
989 fn accept(&mut self) -> UnixStream {
990 let (stream, _) = self.sock.accept().unwrap();
991 stream.set_nonblocking(true).unwrap();
992 stream
993 }
994 }
995 impl Drop for LocalListener {
996 fn drop(&mut self) {
997 std::fs::remove_file(&self.path).unwrap();
998 }
999 }
1000
1001 #[test]
1002 fn test_muxer_epoll_listener() {
1003 let ctx = MuxerTestContext::new("muxer_epoll_listener");
1004 assert_eq!(ctx.muxer.as_raw_fd(), ctx.muxer.epoll.as_raw_fd());
1005 assert_eq!(ctx.muxer.get_polled_evset(), EventSet::IN);
1006 }
1007
    /// Delivering an event the connection did not ask for (OUT, while it only polls for IN)
    /// is expected to be counted as exactly one connection event failure.
    #[test]
    fn test_muxer_epoll_listener_regression() {
        let mut ctx = MuxerTestContext::new("muxer_epoll_listener");
        ctx.local_connect(1025);

        // The only connection in the map is the one just established.
        let (_, conn) = ctx.muxer.conn_map.iter().next().unwrap();

        assert_eq!(conn.get_polled_evset(), EventSet::IN);

        assert_eq!(METRICS.conn_event_fails.count(), 0);

        let conn_eventfd = conn.as_raw_fd();

        // Fake an OUT event on the connection FD.
        ctx.muxer.handle_event(conn_eventfd, EventSet::OUT);

        assert_eq!(METRICS.conn_event_fails.count(), 1);
    }
1025
    /// Packets the muxer cannot handle are answered with an RST, or silently dropped when
    /// addressed to a CID other than the host.
    #[test]
    fn test_bad_peer_pkt() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;
        const SOCK_DGRAM: u16 = 2;

        // A packet of an unsupported (non-stream) type should be answered with an RST.
        let mut ctx = MuxerTestContext::new("bad_peer_pkt");
        let tx_pkt = ctx.init_tx_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        tx_pkt.hdr.set_type(SOCK_DGRAM);
        ctx.send();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.rx_pkt.hdr.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.rx_pkt.hdr.dst_cid(), PEER_CID);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), LOCAL_PORT);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), PEER_PORT);

        // Any of these operations, sent on a connection that does not exist, should be
        // answered with an RST as well.
        let bad_ops = [
            uapi::VSOCK_OP_RESPONSE,
            uapi::VSOCK_OP_CREDIT_REQUEST,
            uapi::VSOCK_OP_CREDIT_UPDATE,
            uapi::VSOCK_OP_SHUTDOWN,
            uapi::VSOCK_OP_RW,
        ];
        for op in bad_ops.iter() {
            ctx.init_tx_pkt(LOCAL_PORT, PEER_PORT, *op);
            ctx.send();
            assert!(ctx.muxer.has_pending_rx());
            ctx.recv();
            assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.rx_pkt.hdr.src_port(), LOCAL_PORT);
            assert_eq!(ctx.rx_pkt.hdr.dst_port(), PEER_PORT);
        }

        // A packet addressed to a CID other than the host should be dropped, with no reply.
        assert!(!ctx.muxer.has_pending_rx());
        let tx_pkt = ctx.init_tx_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        tx_pkt.hdr.set_dst_cid(uapi::VSOCK_HOST_CID + 1);
        ctx.send();
        assert!(!ctx.muxer.has_pending_rx());
    }
1073
    /// Guest-initiated connections: refused with an RST when nobody listens on the host,
    /// established otherwise, with data flowing both ways afterwards.
    #[test]
    fn test_peer_connection() {
        const LOCAL_PORT: u32 = 1026;
        const PEER_PORT: u32 = 1025;

        let mut ctx = MuxerTestContext::new("peer_connection");

        // Nobody listens on the host side yet: the request should be answered with an RST.
        ctx.init_tx_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.rx_pkt.hdr.len(), 0);
        assert_eq!(ctx.rx_pkt.hdr.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.rx_pkt.hdr.dst_cid(), PEER_CID);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), LOCAL_PORT);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), PEER_PORT);

        // With a host-side listener in place, the same request should be accepted.
        let mut listener = ctx.create_local_listener(LOCAL_PORT);
        ctx.init_tx_pkt(LOCAL_PORT, PEER_PORT, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        assert_eq!(ctx.muxer.conn_map.len(), 1);
        let mut stream = listener.accept();
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.rx_pkt.hdr.len(), 0);
        assert_eq!(ctx.rx_pkt.hdr.src_cid(), uapi::VSOCK_HOST_CID);
        assert_eq!(ctx.rx_pkt.hdr.dst_cid(), PEER_CID);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), LOCAL_PORT);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), PEER_PORT);
        let key = ConnMapKey {
            local_port: LOCAL_PORT,
            peer_port: PEER_PORT,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // Guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_tx_pkt(LOCAL_PORT, PEER_PORT, &data);
        ctx.send();
        let mut buf = vec![0; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), data);

        // Host -> guest data flow.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();

        // The muxer only learns about the incoming data once notified.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), LOCAL_PORT);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), PEER_PORT);

        // NOTE(review): the received payload is read back through `tx_pkt`; presumably the
        // RX and TX test packets share the same buffer - confirm.
        let buf = test_utils::read_packet_data(&ctx.tx_pkt, 4);
        assert_eq!(&buf, &data);

        assert!(!ctx.muxer.has_pending_rx());
    }
1140
    /// Host-initiated connections: data flows both ways once the connection is established.
    #[test]
    fn test_local_connection() {
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Guest -> host data flow.
        let data = [1, 2, 3, 4];
        ctx.init_data_tx_pkt(local_port, peer_port, &data);
        ctx.send();

        let mut buf = vec![0u8; data.len()];
        stream.read_exact(buf.as_mut_slice()).unwrap();
        assert_eq!(buf.as_slice(), &data);

        // Host -> guest data flow, checked on a fresh context and connection.
        let mut ctx = MuxerTestContext::new("local_connection");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        let data = [5, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RW);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);

        // NOTE(review): the received payload is read back through `tx_pkt`; presumably the
        // RX and TX test packets share the same buffer - confirm.
        let buf = test_utils::read_packet_data(&ctx.tx_pkt, 4);
        assert_eq!(&buf, &data);
    }
1174
    /// When the host end closes the stream, the guest gets a SHUTDOWN; once the guest
    /// replies with an RST, the connection and its local port are gone.
    #[test]
    fn test_local_close() {
        let peer_port = 1025;
        let mut ctx = MuxerTestContext::new("local_close");
        let local_port;
        {
            // The stream is dropped (i.e. closed) at the end of this scope.
            let (_stream, local_port_) = ctx.local_connect(peer_port);
            local_port = local_port_;
        }
        // Let the muxer notice the closed stream; a full shutdown should be sent out.
        ctx.notify_muxer();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_SHUTDOWN);
        assert_ne!(ctx.rx_pkt.hdr.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_SEND, 0);
        assert_ne!(ctx.rx_pkt.hdr.flags() & uapi::VSOCK_FLAGS_SHUTDOWN_RCV, 0);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);

        // The guest replies with an RST: the connection should be removed, and its local
        // port released.
        ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));
        assert!(!ctx.muxer.local_port_set.contains(&local_port));
    }
1207
    /// When the guest shuts a connection down in both directions, the muxer answers with an
    /// RST, removes the connection, and the host end sees the stream closed.
    #[test]
    fn test_peer_close() {
        let peer_port = 1025;
        let local_port = 1026;
        let mut ctx = MuxerTestContext::new("peer_close");

        // Establish a guest-initiated connection.
        let mut sock = ctx.create_local_listener(local_port);
        ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
        ctx.send();
        let mut stream = sock.accept();

        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(ctx.muxer.conn_map.contains_key(&key));

        // The guest shuts the connection down, in both directions.
        let tx_pkt = ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_SHUTDOWN);
        tx_pkt.hdr.set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
        tx_pkt.hdr.set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV);
        ctx.send();

        // The muxer should answer with an RST, and drop the connection once that is out.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);
        assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);
        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        assert!(!ctx.muxer.conn_map.contains_key(&key));

        // A zero-length read means the host end of the stream was closed.
        let mut buf = vec![0u8; 16];
        assert_eq!(stream.read(buf.as_mut_slice()).unwrap(), 0);
    }
1252
    /// Overflowing the RX queue puts it out of sync; once drained, it is re-synchronized
    /// and the left-out connection gets its turn.
    #[test]
    fn test_muxer_rxq() {
        let mut ctx = MuxerTestContext::new("muxer_rxq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let mut listener = ctx.create_local_listener(local_port);
        // Keeps the accepted host-side streams alive for the duration of the test.
        let mut streams: Vec<UnixStream> = Vec::new();

        // Open MUXER_RXQ_SIZE connections, without reading any of their responses.
        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE {
            ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            streams.push(listener.accept());
        }

        // So far, the queue should still be in sync.
        assert!(ctx.muxer.rxq.is_synced());

        // One more connection should put the queue out of sync.
        ctx.init_tx_pkt(
            local_port,
            peer_port_first + defs::MUXER_RXQ_SIZE,
            uapi::VSOCK_OP_REQUEST,
        );
        ctx.send();
        assert!(!ctx.muxer.rxq.is_synced());

        // A request for a port nobody listens on makes the muxer enqueue an RST.
        ctx.init_tx_pkt(local_port + 1, peer_port_first, uapi::VSOCK_OP_REQUEST);
        ctx.send();

        // All but one of the queued entries should yield connection responses...
        for peer_port in peer_port_first..peer_port_first + defs::MUXER_RXQ_SIZE - 1 {
            ctx.recv();
            assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);
        }
        // ... and the one entry left should yield the RST.
        // NOTE(review): presumably the RST took the slot of a connection entry in the full
        // queue - confirm against `MuxerRxQ::push`.
        assert_eq!(ctx.muxer.rxq.len(), 1);
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);

        // The queue is now empty but out of sync, so RX is still reported as pending.
        assert!(ctx.muxer.rxq.is_empty());
        assert!(!ctx.muxer.rxq.is_synced());
        assert!(ctx.muxer.has_pending_rx());

        // The next read should re-sync the queue, and yield a response that was left out.
        ctx.recv();
        assert!(ctx.muxer.rxq.is_synced());
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);

        // One more response should still be pending.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
    }
1316
    /// Overflowing the kill queue puts it out of sync; adding a connection after the
    /// shutdown timeout sweeps the queue, kills the expired connections, and re-syncs it.
    #[test]
    fn test_muxer_killq() {
        let mut ctx = MuxerTestContext::new("muxer_killq");
        let local_port = 1026;
        let peer_port_first = 1025;
        let peer_port_last = peer_port_first + defs::MUXER_KILLQ_SIZE;
        let mut listener = ctx.create_local_listener(local_port);

        // Metrics are global, so take a baseline and only assert on deltas.
        let conns_added = METRICS.conns_added.count();
        let conns_killed = METRICS.conns_killed.count();
        let conns_removed = METRICS.conns_removed.count();
        let killq_resync = METRICS.killq_resync.count();

        // Open MUXER_KILLQ_SIZE + 1 connections, closing the host end of each right away.
        for peer_port in peer_port_first..=peer_port_last {
            ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_REQUEST);
            ctx.send();
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
            assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
            assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);
            {
                // Accept, then immediately drop (close) the host end.
                let _stream = listener.accept();
            }
            // The muxer should notice the closed stream, and send out a shutdown.
            ctx.notify_muxer();
            ctx.recv();
            assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_SHUTDOWN);
            assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
            assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port);
            // The kill queue should stay in sync for the first MUXER_KILLQ_SIZE
            // connections, and go out of sync on the last one.
            assert_eq!(
                ctx.muxer.killq.is_synced(),
                peer_port < peer_port_first + defs::MUXER_KILLQ_SIZE
            );
        }

        assert!(!ctx.muxer.killq.is_synced());
        assert!(!ctx.muxer.has_pending_rx());

        // Wait for the connection shutdown timeout to elapse.
        std::thread::sleep(std::time::Duration::from_millis(
            csm_defs::CONN_SHUTDOWN_TIMEOUT_MS,
        ));

        // Adding one more connection makes the muxer sweep the kill queue.
        ctx.init_tx_pkt(local_port, peer_port_last + 1, uapi::VSOCK_OP_REQUEST);
        ctx.send();

        // MUXER_KILLQ_SIZE + 1 connections from the loop above, plus the one just added.
        assert_eq!(
            METRICS.conns_added.count(),
            conns_added + u64::from(defs::MUXER_KILLQ_SIZE) + 2
        );
        // Only MUXER_KILLQ_SIZE connections are expected to have been killed so far.
        assert_eq!(
            METRICS.conns_killed.count(),
            conns_killed + u64::from(defs::MUXER_KILLQ_SIZE)
        );
        // Killed connections are not removed before their RST has been sent out.
        assert_eq!(METRICS.conns_removed.count(), conns_removed);

        // The sweep should have re-synced the queue, exactly once.
        assert_eq!(METRICS.killq_resync.count(), killq_resync + 1);
        assert!(ctx.muxer.killq.is_synced());
        // Each killed connection should now yield an RST.
        assert!(ctx.muxer.has_pending_rx());
        for _p in peer_port_first..peer_port_last {
            ctx.recv();
            assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RST);
            assert_eq!(ctx.rx_pkt.hdr.src_port(), local_port);
        }

        // Sending out the RSTs should have removed the killed connections.
        assert_eq!(
            METRICS.conns_removed.count(),
            conns_removed + u64::from(defs::MUXER_KILLQ_SIZE)
        );

        // Last in line is the response for the connection added after the timeout.
        ctx.recv();
        assert_eq!(ctx.rx_pkt.hdr.op(), uapi::VSOCK_OP_RESPONSE);
        assert_eq!(ctx.rx_pkt.hdr.dst_port(), peer_port_last + 1);

        assert!(!ctx.muxer.has_pending_rx());
    }
1409
1410 #[test]
1411 fn test_regression_handshake() {
1412 let mut ctx = MuxerTestContext::new("regression_handshake");
1416 let peer_port = 1025;
1417
1418 let (_, local_port) = ctx.local_connect(peer_port);
1420
1421 let key = ConnMapKey {
1423 local_port,
1424 peer_port,
1425 };
1426 let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();
1427
1428 assert_eq!(conn.fwd_cnt().0, 0);
1430 }
1431
    /// Regression test: a connection with more than one packet to yield must stay in the
    /// RX queue until all of its packets have been read.
    #[test]
    fn test_regression_rxq_pop() {
        let mut ctx = MuxerTestContext::new("regression_rxq_pop");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // Give the connection some data to forward to the guest.
        let data = [5u8, 6, 7, 8];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();

        let key = ConnMapKey {
            local_port,
            peer_port,
        };
        let conn = ctx.muxer.conn_map.get_mut(&key).unwrap();

        // On top of the data, have the connection owe the guest a credit update.
        conn.insert_credit_update();

        // Two packets are expected: RX must still be pending after the first read, and be
        // all done after the second.
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();
        assert!(ctx.muxer.has_pending_rx());
        ctx.recv();

        assert!(!ctx.muxer.has_pending_rx());
    }
1468
    /// Packet, byte and connection metrics are updated as a connection gets established,
    /// used, and torn down.
    #[test]
    fn test_vsock_basic_metrics() {
        // Metrics are global, so take a baseline and only assert on deltas.
        let mut tx_packets_count = METRICS.tx_packets_count.count();
        let mut rx_packets_count = METRICS.rx_packets_count.count();

        let tx_bytes_count = METRICS.tx_bytes_count.count();
        let rx_bytes_count = METRICS.rx_bytes_count.count();

        let conns_added = METRICS.conns_added.count();
        let conns_removed = METRICS.conns_removed.count();

        let mut ctx = MuxerTestContext::new("vsock_basic_metrics");
        let peer_port = 1025;
        let (mut stream, local_port) = ctx.local_connect(peer_port);

        // The handshake carries no payload in the TX direction.
        assert_eq!(METRICS.tx_bytes_count.count(), tx_bytes_count);

        // The handshake takes one TX packet (the guest's response)...
        assert_eq!(METRICS.tx_packets_count.count(), tx_packets_count + 1);
        tx_packets_count = METRICS.tx_packets_count.count();

        // ... and one RX packet (the connection request).
        assert_eq!(METRICS.rx_packets_count.count(), rx_packets_count + 1);
        rx_packets_count = METRICS.rx_packets_count.count();

        assert_eq!(METRICS.conns_added.count(), conns_added + 1);

        // Guest -> host data: the payload bytes and one more packet are counted as TX.
        let data = [1, 2, 3, 4];
        ctx.init_data_tx_pkt(local_port, peer_port, &data);
        ctx.send();

        assert_eq!(
            METRICS.tx_bytes_count.count(),
            tx_bytes_count + data.len() as u64
        );

        assert_eq!(METRICS.tx_packets_count.count(), tx_packets_count + 1);

        // Host -> guest data: the payload bytes and one more packet are counted as RX.
        let data = [1, 2, 3, 4, 5, 6];
        stream.write_all(&data).unwrap();
        ctx.notify_muxer();
        ctx.recv();

        assert_eq!(METRICS.rx_packets_count.count(), rx_packets_count + 1);

        assert_eq!(
            METRICS.rx_bytes_count.count(),
            rx_bytes_count + data.len() as u64
        );

        // An RST from the guest removes the connection.
        ctx.init_tx_pkt(local_port, peer_port, uapi::VSOCK_OP_RST);
        ctx.send();

        assert_eq!(METRICS.conns_removed.count(), conns_removed + 1);
    }
1537}