tokio_dual_stack

Dual-stack TcpListener.
git clone https://git.philomathiclife.com/repos/tokio_dual_stack
Log | Files | Refs | README

lib.rs (21667B)


      1 //! [![git]](https://git.philomathiclife.com/tokio_dual_stack/log.html) [![crates-io]](https://crates.io/crates/tokio_dual_stack) [![docs-rs]](crate)
      2 //!
      3 //! [git]: https://git.philomathiclife.com/git_badge.svg
      4 //! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust
      5 //! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs
      6 //!
      7 //! `tokio_dual_stack` is a library that adds a "dual-stack" [`TcpListener`].
      8 //!
      9 //! ## Why is this useful?
     10 //!
     11 //! Only certain platforms offer the ability for one socket to handle both IPv6 and IPv4 requests
     12 //! (e.g., OpenBSD does not). For the platforms that do, it is often dependent on runtime configuration
     13 //! (e.g., [`IPV6_V6ONLY`](https://www.man7.org/linux/man-pages/man7/ipv6.7.html)). Additionally those platforms
     14 //! that support it often require the "wildcard" IPv6 address to be used (i.e., `::`) which has the unfortunate
     15 //! consequence of preventing other services from using the same protocol port.
     16 //!
     17 //! There are a few ways to work around this issue. One is to deploy the same service twice: one that uses
     18 //! an IPv6 socket and the other that uses an IPv4 socket. This can complicate deployments (e.g., the application
     19 //! may not have been written with the expectation that multiple deployments could be running at the same time) in
     20 //! addition to using more resources. Another is for the application to manually handle each socket (e.g.,
     21 //! [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html)/[`join`](https://docs.rs/tokio/latest/tokio/macro.join.html)
     22 //! each [`TcpListener::accept`]).
     23 //!
     24 //! [`DualStackTcpListener`] chooses an implementation similar to what the equivalent `select` would do while
     25 //! also ensuring that one socket does not "starve" another by ensuring each socket is fairly given an opportunity
     26 //! to `TcpListener::accept` a connection. This has the nice benefit of having a similar API to what a single
     27 //! `TcpListener` would have as well as having similar performance to a socket that does handle both IPv6 and
     28 //! IPv4 requests.
     29 #![deny(
     30     unknown_lints,
     31     future_incompatible,
     32     let_underscore,
     33     missing_docs,
     34     nonstandard_style,
     35     refining_impl_trait,
     36     rust_2018_compatibility,
     37     rust_2018_idioms,
     38     rust_2021_compatibility,
     39     rust_2024_compatibility,
     40     unsafe_code,
     41     unused,
     42     warnings,
     43     clippy::all,
     44     clippy::cargo,
     45     clippy::complexity,
     46     clippy::correctness,
     47     clippy::nursery,
     48     clippy::pedantic,
     49     clippy::perf,
     50     clippy::restriction,
     51     clippy::style,
     52     clippy::suspicious
     53 )]
     54 #![expect(
     55     clippy::arbitrary_source_item_ordering,
     56     clippy::blanket_clippy_restriction_lints,
     57     clippy::implicit_return,
     58     clippy::return_and_then,
     59     reason = "noisy, opinionated, and likely doesn't prevent bugs or improve APIs"
     60 )]
     61 use core::{
     62     future::Future,
     63     net::{SocketAddr, SocketAddrV4, SocketAddrV6},
     64     pin::Pin,
     65     sync::atomic::{AtomicBool, Ordering},
     66     task::{Context, Poll},
     67 };
     68 use pin_project_lite::pin_project;
     69 use std::io::{Error, ErrorKind, Result};
     70 use tokio::net::{self, TcpListener, TcpSocket, TcpStream, ToSocketAddrs};
     71 /// Prevents [`Sealed`] from being publicly implementable.
     72 mod private {
     73     /// Marker trait to prevent [`super::Tcp`] from being publicly implementable.
     74     pub trait Sealed {}
     75 }
     76 use private::Sealed;
     77 /// TCP "listener".
     78 ///
     79 /// This `trait` is sealed and cannot be implemented for types outside of `tokio_dual_stack`.
     80 ///
     81 /// This exists primarily as a way to define type constructors or polymorphic functions
     82 /// that can user either a [`TcpListener`] or [`DualStackTcpListener`].
     83 ///
     84 /// # Examples
     85 ///
     86 /// ```no_run
     87 /// # use core::convert::Infallible;
     88 /// # use tokio_dual_stack::Tcp;
     89 /// async fn main_loop<T: Tcp>(listener: T) -> Infallible {
     90 ///     loop {
     91 ///         match listener.accept().await {
     92 ///             Ok((_, socket)) => println!("Client socket: {socket}"),
     93 ///             Err(e) => println!("TCP connection failure: {e}"),
     94 ///         }
     95 ///     }
     96 /// }
     97 /// ```
     98 pub trait Tcp: Sealed + Sized {
     99     /// Creates a new TCP listener, which will be bound to the specified address(es).
    100     ///
    101     /// The returned listener is ready for accepting connections.
    102     ///
    103     /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
    104     /// The port allocated can be queried via the `local_addr` method.
    105     ///
    106     /// The address type can be any implementor of the [`ToSocketAddrs`] trait. If `addr` yields
    107     /// multiple addresses, bind will be attempted with each of the addresses until one succeeds
    108     /// and returns the listener. If none of the addresses succeed in creating a listener, the
    109     /// error returned from the last attempt (the last address) is returned.
    110     ///
    111     /// This function sets the `SO_REUSEADDR` option on the socket.
    112     ///
    113     /// # Examples
    114     ///
    115     /// ```no_run
    116     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    117     /// # use std::io::Result;
    118     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    119     /// #[tokio::main(flavor = "current_thread")]
    120     /// async fn main() -> Result<()> {
    121     ///     let listener = DualStackTcpListener::bind(
    122     ///         [
    123     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    124     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    125     ///         ]
    126     ///         .as_slice(),
    127     ///     )
    128     ///     .await?;
    129     ///     Ok(())
    130     /// }
    131     /// ```
    132     fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>>;
    133     /// Accepts a new incoming connection from this listener.
    134     ///
    135     /// This function will yield once a new TCP connection is established. When established,
    136     /// the corresponding `TcpStream` and the remote peer’s address will be returned.
    137     ///
    138     /// # Cancel safety
    139     ///
    140     /// This method is cancel safe. If the method is used as the event in a
    141     /// [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html)
    142     /// statement and some other branch completes first, then it is guaranteed that no new
    143     /// connections were accepted by this method.
    144     ///
    145     /// # Examples
    146     ///
    147     /// ```no_run
    148     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    149     /// # use std::io::Result;
    150     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    151     /// #[tokio::main(flavor = "current_thread")]
    152     /// async fn main() -> Result<()> {
    153     ///     match DualStackTcpListener::bind(
    154     ///         [
    155     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    156     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    157     ///         ]
    158     ///         .as_slice(),
    159     ///     )
    160     ///     .await?.accept().await {
    161     ///         Ok((_, addr)) => println!("new client: {addr}"),
    162     ///         Err(e) => println!("couldn't get client: {e}"),
    163     ///     }
    164     ///     Ok(())
    165     /// }
    166     /// ```
    167     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync;
    168     /// Polls to accept a new incoming connection to this listener.
    169     ///
    170     /// If there is no connection to accept, `Poll::Pending` is returned and the current task will be notified by
    171     /// a waker. Note that on multiple calls to `poll_accept`, only the `Waker` from the `Context` passed to the
    172     /// most recent call is scheduled to receive a wakeup.
    173     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>>;
    174 }
    175 impl Sealed for TcpListener {}
    176 impl Tcp for TcpListener {
    177     #[inline]
    178     fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>> {
    179         Self::bind(addr)
    180     }
    181     #[inline]
    182     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    183         self.accept()
    184     }
    185     #[inline]
    186     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    187         self.poll_accept(cx)
    188     }
    189 }
    190 /// "Dual-stack" TCP listener.
    191 ///
    192 /// IPv6 and IPv4 TCP listener.
    193 #[derive(Debug)]
    194 pub struct DualStackTcpListener {
    195     /// IPv6 TCP listener.
    196     ip6: TcpListener,
    197     /// IPv4 TCP listener.
    198     ip4: TcpListener,
    199     /// `true` iff [`Self::ip6::accept`] should be `poll`ed first; otherwise [`Self::ip4::accept`] is `poll`ed
    200     /// first.
    201     ///
    202     /// This exists to prevent one IP version from "starving" another. Each time [`Self::accept`] or
    203     /// [`Self::poll_accept`] is called, it's overwritten with the opposite `bool`.
    204     ///
    205     /// Note we could make this a `core::cell::Cell`; but for maximal flexibility and consistency with `TcpListener`,
    206     /// we use an `AtomicBool`. This among other things means `DualStackTcpListener` will implement `Sync`.
    207     ip6_first: AtomicBool,
    208 }
    209 impl DualStackTcpListener {
    210     /// Creates `Self` using the [`TcpListener`]s returned from [`TcpSocket::listen`].
    211     ///
    212     /// [`Self::bind`] is useful when the behavior of [`TcpListener::bind`] is sufficient; however if the underlying
    213     /// `TcpSocket`s need to be configured differently, then one must call this function instead.
    214     ///
    215     /// # Errors
    216     ///
    217     /// Errors iff [`TcpSocket::local_addr`] does for either socket, the underlying sockets use the same IP version,
    218     /// or [`TcpSocket::listen`] errors for either socket.
    219     ///
    220     /// Note on Windows-based platforms `TcpSocket::local_addr` will error if [`TcpSocket::bind`] was not called.
    221     ///
    222     /// # Examples
    223     ///
    224     /// ```no_run
    225     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    226     /// # use std::io::Result;
    227     /// # use tokio_dual_stack::DualStackTcpListener;
    228     /// # use tokio::net::TcpSocket;
    229     /// #[tokio::main(flavor = "current_thread")]
    230     /// async fn main() -> Result<()> {
    231     ///     let ip6 = TcpSocket::new_v6()?;
    232     ///     ip6.bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)))?;
    233     ///     let ip4 = TcpSocket::new_v4()?;
    234     ///     ip4.bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)))?;
    235     ///     let listener = DualStackTcpListener::from_sockets((ip6, 1024), (ip4, 1024))?;
    236     ///     Ok(())
    237     /// }
    238     /// ```
    239     #[inline]
    240     pub fn from_sockets(
    241         (socket_1, backlog_1): (TcpSocket, u32),
    242         (socket_2, backlog_2): (TcpSocket, u32),
    243     ) -> Result<Self> {
    244         socket_1.local_addr().and_then(|sock| {
    245             socket_2.local_addr().and_then(|sock_2| {
    246                 if sock.is_ipv6() {
    247                     if sock_2.is_ipv4() {
    248                         socket_1.listen(backlog_1).and_then(|ip6| {
    249                             socket_2.listen(backlog_2).map(|ip4| Self {
    250                                 ip6,
    251                                 ip4,
    252                                 ip6_first: AtomicBool::new(true),
    253                             })
    254                         })
    255                     } else {
    256                         Err(Error::new(
    257                             ErrorKind::InvalidData,
    258                             "TcpSockets are the same IP version",
    259                         ))
    260                     }
    261                 } else if sock_2.is_ipv6() {
    262                     socket_1.listen(backlog_1).and_then(|ip4| {
    263                         socket_2.listen(backlog_2).map(|ip6| Self {
    264                             ip6,
    265                             ip4,
    266                             ip6_first: AtomicBool::new(true),
    267                         })
    268                     })
    269                 } else {
    270                     Err(Error::new(
    271                         ErrorKind::InvalidData,
    272                         "TcpSockets are the same IP version",
    273                     ))
    274                 }
    275             })
    276         })
    277     }
    278     /// Returns the local address of each socket that the listeners are bound to.
    279     ///
    280     /// This can be useful, for example, when binding to port 0 to figure out which port was actually bound.
    281     ///
    282     /// # Errors
    283     ///
    284     /// Errors iff [`TcpListener::local_addr`] does for either listener.
    285     ///
    286     /// # Examples
    287     ///
    288     /// ```no_run
    289     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    290     /// # use std::io::Result;
    291     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    292     /// #[tokio::main(flavor = "current_thread")]
    293     /// async fn main() -> Result<()> {
    294     ///     let ip6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0);
    295     ///     let ip4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080);
    296     ///     assert_eq!(
    297     ///         DualStackTcpListener::bind([SocketAddr::V6(ip6), SocketAddr::V4(ip4)].as_slice())
    298     ///             .await?
    299     ///             .local_addr()?,
    300     ///         (ip6, ip4)
    301     ///     );
    302     ///     Ok(())
    303     /// }
    304     /// ```
    305     #[expect(clippy::unreachable, reason = "we want to crash when there is a bug")]
    306     #[inline]
    307     pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> {
    308         self.ip6.local_addr().and_then(|ip6| {
    309             self.ip4.local_addr().map(|ip4| {
    310                 (
    311                     if let SocketAddr::V6(sock6) = ip6 {
    312                         sock6
    313                     } else {
    314                         unreachable!("there is a bug in DualStackTcpListener::bind")
    315                     },
    316                     if let SocketAddr::V4(sock4) = ip4 {
    317                         sock4
    318                     } else {
    319                         unreachable!("there is a bug in DualStackTcpListener::bind")
    320                     },
    321                 )
    322             })
    323         })
    324     }
    325     /// Sets the value for the `IP_TTL` option on both sockets.
    326     ///
    327     /// This value sets the time-to-live field that is used in every packet sent from each socket.
    328     /// `ttl_ip6` is the `IP_TTL` value for the IPv6 socket and `ttl_ip4` is the `IP_TTL` value for the
    329     /// IPv4 socket.
    330     ///
    331     /// # Errors
    332     ///
    333     /// Errors iff [`TcpListener::set_ttl`] does for either listener.
    334     ///
    335     /// # Examples
    336     ///
    337     /// ```no_run
    338     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    339     /// # use std::io::Result;
    340     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    341     /// #[tokio::main(flavor = "current_thread")]
    342     /// async fn main() -> Result<()> {
    343     ///     DualStackTcpListener::bind(
    344     ///         [
    345     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    346     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    347     ///         ]
    348     ///         .as_slice(),
    349     ///     )
    350     ///     .await?.set_ttl(100, 100).expect("could not set TTL");
    351     ///     Ok(())
    352     /// }
    353     /// ```
    354     #[inline]
    355     pub fn set_ttl(&self, ttl_ip6: u32, ttl_ip4: u32) -> Result<()> {
    356         self.ip6
    357             .set_ttl(ttl_ip6)
    358             .and_then(|()| self.ip4.set_ttl(ttl_ip4))
    359     }
    360     /// Gets the values of the `IP_TTL` option for both sockets.
    361     ///
    362     /// The first `u32` represents the `IP_TTL` value for the IPv6 socket and the second `u32` is the
    363     /// `IP_TTL` value for the IPv4 socket. For more information about this option, see [`Self::set_ttl`].
    364     ///
    365     /// # Errors
    366     ///
    367     /// Errors iff [`TcpListener::ttl`] does for either listener.
    368     ///
    369     /// # Examples
    370     ///
    371     /// ```no_run
    372     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    373     /// # use std::io::Result;
    374     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    375     /// #[tokio::main(flavor = "current_thread")]
    376     /// async fn main() -> Result<()> {
    377     ///     let listener = DualStackTcpListener::bind(
    378     ///         [
    379     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    380     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    381     ///         ]
    382     ///         .as_slice(),
    383     ///     )
    384     ///     .await?;
    385     ///     listener.set_ttl(100, 100).expect("could not set TTL");
    386     ///     assert_eq!(listener.ttl()?, (100, 100));
    387     ///     Ok(())
    388     /// }
    389     /// ```
    390     #[inline]
    391     pub fn ttl(&self) -> Result<(u32, u32)> {
    392         self.ip6
    393             .ttl()
    394             .and_then(|ip6| self.ip4.ttl().map(|ip4| (ip6, ip4)))
    395     }
    396 }
    397 pin_project! {
    398     /// `Future` returned by [`DualStackTcpListener::accept]`.
    399     struct AcceptFut<
    400         F: Future<Output = Result<(TcpStream, SocketAddr)>>,
    401         F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    402     > {
    403         // Accept future for one `TcpListener`.
    404         #[pin]
    405         fut_1: F,
    406         // Accept future for the other `TcpListener`.
    407         #[pin]
    408         fut_2: F2,
    409     }
    410 }
    411 impl<
    412     F: Future<Output = Result<(TcpStream, SocketAddr)>>,
    413     F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    414 > Future for AcceptFut<F, F2>
    415 {
    416     type Output = Result<(TcpStream, SocketAddr)>;
    417     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    418         let this = self.project();
    419         match this.fut_1.poll(cx) {
    420             Poll::Ready(res) => Poll::Ready(res),
    421             Poll::Pending => this.fut_2.poll(cx),
    422         }
    423     }
    424 }
    425 impl Sealed for DualStackTcpListener {}
    426 impl Tcp for DualStackTcpListener {
    427     #[inline]
    428     async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self> {
    429         match net::lookup_host(addr).await {
    430             Ok(socks) => {
    431                 let mut last_err = None;
    432                 let mut ip6_opt = None;
    433                 let mut ip4_opt = None;
    434                 for sock in socks {
    435                     match ip6_opt {
    436                         None => match ip4_opt {
    437                             None => {
    438                                 let is_ip6 = sock.is_ipv6();
    439                                 match TcpListener::bind(sock).await {
    440                                     Ok(ip) => {
    441                                         if is_ip6 {
    442                                             ip6_opt = Some(ip);
    443                                         } else {
    444                                             ip4_opt = Some(ip);
    445                                         }
    446                                     }
    447                                     Err(err) => last_err = Some(err),
    448                                 }
    449                             }
    450                             Some(ip4) => {
    451                                 if sock.is_ipv6() {
    452                                     match TcpListener::bind(sock).await {
    453                                         Ok(ip6) => {
    454                                             return Ok(Self {
    455                                                 ip6,
    456                                                 ip4,
    457                                                 ip6_first: AtomicBool::new(true),
    458                                             });
    459                                         }
    460                                         Err(err) => last_err = Some(err),
    461                                     }
    462                                 }
    463                                 ip4_opt = Some(ip4);
    464                             }
    465                         },
    466                         Some(ip6) => {
    467                             if sock.is_ipv4() {
    468                                 match TcpListener::bind(sock).await {
    469                                     Ok(ip4) => {
    470                                         return Ok(Self {
    471                                             ip6,
    472                                             ip4,
    473                                             ip6_first: AtomicBool::new(true),
    474                                         });
    475                                     }
    476                                     Err(err) => last_err = Some(err),
    477                                 }
    478                             }
    479                             ip6_opt = Some(ip6);
    480                         }
    481                     }
    482                 }
    483                 Err(last_err.unwrap_or_else(|| {
    484                     Error::new(
    485                         ErrorKind::InvalidInput,
    486                         "could not resolve to an IPv6 and IPv4 address",
    487                     )
    488                 }))
    489             }
    490             Err(err) => Err(err),
    491         }
    492     }
    493     #[inline]
    494     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    495         // The correctness of code does not depend on `self.ip6_first`; therefore
    496         // we elect for the most performant `Ordering`.
    497         if self.ip6_first.swap(false, Ordering::Relaxed) {
    498             AcceptFut {
    499                 fut_1: self.ip6.accept(),
    500                 fut_2: self.ip4.accept(),
    501             }
    502         } else {
    503             // The correctness of code does not depend on `self.ip6_first`; therefore
    504             // we elect for the most performant `Ordering`.
    505             self.ip6_first.store(true, Ordering::Relaxed);
    506             AcceptFut {
    507                 fut_1: self.ip4.accept(),
    508                 fut_2: self.ip6.accept(),
    509             }
    510         }
    511     }
    512     #[inline]
    513     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    514         // The correctness of code does not depend on `self.ip6_first`; therefore
    515         // we elect for the most performant `Ordering`.
    516         if self.ip6_first.swap(false, Ordering::Relaxed) {
    517             self.ip6.poll_accept(cx)
    518         } else {
    519             // The correctness of code does not depend on `self.ip6_first`; therefore
    520             // we elect for the most performant `Ordering`.
    521             self.ip6_first.store(true, Ordering::Relaxed);
    522             self.ip4.poll_accept(cx)
    523         }
    524     }
    525 }