tokio_dual_stack

Dual-stack TcpListener.
git clone https://git.philomathiclife.com/repos/tokio_dual_stack
Log | Files | Refs | README

lib.rs (21631B)


      1 //! [![git]](https://git.philomathiclife.com/tokio_dual_stack/log.html) [![crates-io]](https://crates.io/crates/tokio_dual_stack) [![docs-rs]](crate)
      2 //!
      3 //! [git]: https://git.philomathiclife.com/git_badge.svg
      4 //! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust
      5 //! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs
      6 //!
      7 //! `tokio_dual_stack` is a library that adds a "dual-stack" [`TcpListener`].
      8 //!
      9 //! ## Why is this useful?
     10 //!
     11 //! Only certain platforms offer the ability for one socket to handle both IPv6 and IPv4 requests
     12 //! (e.g., OpenBSD does not). For the platforms that do, it is often dependent on runtime configuration
     13 //! (e.g., [`IPV6_V6ONLY`](https://www.man7.org/linux/man-pages/man7/ipv6.7.html)). Additionally those platforms
     14 //! that support it often require the "wildcard" IPv6 address to be used (i.e., `::`) which has the unfortunate
     15 //! consequence of preventing other services from using the same protocol port.
     16 //!
     17 //! There are a few ways to work around this issue. One is to deploy the same service twice: one that uses
     18 //! an IPv6 socket and the other that uses an IPv4 socket. This can complicate deployments (e.g., the application
     19 //! may not have been written with the expectation that multiple deployments could be running at the same time) in
     20 //! addition to using more resources. Another is for the application to manually handle each socket (e.g.,
     21 //! [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html)/[`join`](https://docs.rs/tokio/latest/tokio/macro.join.html)
     22 //! each [`TcpListener::accept`]).
     23 //!
     24 //! [`DualStackTcpListener`] chooses an implementation similar to what the equivalent `select` would do while
     25 //! also ensuring that one socket does not "starve" another by ensuring each socket is fairly given an opportunity
     26 //! to `TcpListener::accept` a connection. This has the nice benefit of having a similar API to what a single
     27 //! `TcpListener` would have as well as having similar performance to a socket that does handle both IPv6 and
     28 //! IPv4 requests.
     29 #![expect(clippy::multiple_crate_versions, reason = "windows-sys")]
     30 use core::{
     31     net::{SocketAddr, SocketAddrV4, SocketAddrV6},
     32     pin::Pin,
     33     sync::atomic::{AtomicBool, Ordering},
     34     task::{Context, Poll},
     35 };
     36 use pin_project_lite::pin_project;
     37 use std::io::{Error, ErrorKind, Result};
     38 pub use tokio;
     39 use tokio::net::{self, TcpListener, TcpSocket, TcpStream, ToSocketAddrs};
/// Prevents [`Sealed`] from being publicly implementable.
///
/// The module itself is private, so although the trait inside it is declared `pub`, it is not
/// re-exported publicly and cannot be implemented through this crate's public API.
mod private {
    /// Marker trait to prevent [`super::Tcp`] from being publicly implementable.
    ///
    /// [`super::Tcp`] lists this trait as a supertrait, so a type must implement `Sealed` before it
    /// can implement [`super::Tcp`].
    #[expect(unnameable_types, reason = "want Tcp to be 'sealed'")]
    pub trait Sealed {}
}
     46 use private::Sealed;
/// TCP "listener".
///
/// This `trait` is sealed and cannot be implemented for types outside of `tokio_dual_stack`.
///
/// This exists primarily as a way to define type constructors or polymorphic functions
/// that can use either a [`TcpListener`] or [`DualStackTcpListener`].
///
/// # Examples
///
/// ```no_run
/// # use core::convert::Infallible;
/// # use tokio_dual_stack::Tcp;
/// async fn main_loop<T: Tcp>(listener: T) -> Infallible {
///     loop {
///         match listener.accept().await {
///             Ok((_, socket)) => println!("Client socket: {socket}"),
///             Err(e) => println!("TCP connection failure: {e}"),
///         }
///     }
/// }
/// ```
pub trait Tcp: Sealed + Sized {
    /// Creates a new TCP listener, which will be bound to the specified address(es).
    ///
    /// The returned listener is ready for accepting connections.
    ///
    /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
    /// The port allocated can be queried via the `local_addr` method.
    ///
    /// The address type can be any implementor of the [`ToSocketAddrs`] trait. If `addr` yields
    /// multiple addresses, bind will be attempted with each of the addresses until one succeeds
    /// and returns the listener. If none of the addresses succeed in creating a listener, the
    /// error returned from the last attempt (the last address) is returned.
    ///
    /// This function sets the `SO_REUSEADDR` option on the socket.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    /// # use std::io::Result;
    /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    /// #[tokio::main(flavor = "current_thread")]
    /// async fn main() -> Result<()> {
    ///     let listener = DualStackTcpListener::bind(
    ///         [
    ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    ///         ]
    ///         .as_slice(),
    ///     )
    ///     .await?;
    ///     Ok(())
    /// }
    /// ```
    fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>>;
    /// Accepts a new incoming connection from this listener.
    ///
    /// This function will yield once a new TCP connection is established. When established,
    /// the corresponding `TcpStream` and the remote peer’s address will be returned.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If the method is used as the event in a
    /// [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html)
    /// statement and some other branch completes first, then it is guaranteed that no new
    /// connections were accepted by this method.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    /// # use std::io::Result;
    /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    /// #[tokio::main(flavor = "current_thread")]
    /// async fn main() -> Result<()> {
    ///     match DualStackTcpListener::bind(
    ///         [
    ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    ///         ]
    ///         .as_slice(),
    ///     )
    ///     .await?.accept().await {
    ///         Ok((_, addr)) => println!("new client: {addr}"),
    ///         Err(e) => println!("couldn't get client: {e}"),
    ///     }
    ///     Ok(())
    /// }
    /// ```
    fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync;
    /// Polls to accept a new incoming connection to this listener.
    ///
    /// If there is no connection to accept, `Poll::Pending` is returned and the current task will be notified by
    /// a waker. Note that on multiple calls to `poll_accept`, only the `Waker` from the `Context` passed to the
    /// most recent call is scheduled to receive a wakeup.
    fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>>;
}
    145 impl Sealed for TcpListener {}
    146 impl Tcp for TcpListener {
    147     #[inline]
    148     fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>> {
    149         Self::bind(addr)
    150     }
    151     #[inline]
    152     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    153         self.accept()
    154     }
    155     #[inline]
    156     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    157         self.poll_accept(cx)
    158     }
    159 }
/// "Dual-stack" TCP listener.
///
/// IPv6 and IPv4 TCP listener.
///
/// Internally this is a pair of [`TcpListener`]s, one per IP version, plus a flag recording which of the
/// two is given the first opportunity to accept on the next call.
#[derive(Debug)]
pub struct DualStackTcpListener {
    /// IPv6 TCP listener.
    ip6: TcpListener,
    /// IPv4 TCP listener.
    ip4: TcpListener,
    /// `true` iff the IPv6 listener (`ip6`) should be `poll`ed first; otherwise the IPv4 listener (`ip4`) is
    /// `poll`ed first.
    ///
    /// This exists to prevent one IP version from "starving" another. Each time [`Self::accept`] or
    /// [`Self::poll_accept`] is called, it's overwritten with the opposite `bool`.
    ///
    /// Note we could make this a `core::cell::Cell`; but for maximal flexibility and consistency with `TcpListener`,
    /// we use an `AtomicBool`. This among other things means `DualStackTcpListener` will implement `Sync`.
    ip6_first: AtomicBool,
}
    179 impl DualStackTcpListener {
    180     /// Creates `Self` using the [`TcpListener`]s returned from [`TcpSocket::listen`].
    181     ///
    182     /// [`Self::bind`] is useful when the behavior of [`TcpListener::bind`] is sufficient; however if the underlying
    183     /// `TcpSocket`s need to be configured differently, then one must call this function instead.
    184     ///
    185     /// # Errors
    186     ///
    187     /// Errors iff [`TcpSocket::local_addr`] does for either socket, the underlying sockets use the same IP version,
    188     /// or [`TcpSocket::listen`] errors for either socket.
    189     ///
    190     /// Note on Windows-based platforms `TcpSocket::local_addr` will error if [`TcpSocket::bind`] was not called.
    191     ///
    192     /// # Examples
    193     ///
    194     /// ```no_run
    195     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    196     /// # use std::io::Result;
    197     /// # use tokio_dual_stack::DualStackTcpListener;
    198     /// # use tokio::net::TcpSocket;
    199     /// #[tokio::main(flavor = "current_thread")]
    200     /// async fn main() -> Result<()> {
    201     ///     let ip6 = TcpSocket::new_v6()?;
    202     ///     ip6.bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)))?;
    203     ///     let ip4 = TcpSocket::new_v4()?;
    204     ///     ip4.bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)))?;
    205     ///     let listener = DualStackTcpListener::from_sockets((ip6, 1024), (ip4, 1024))?;
    206     ///     Ok(())
    207     /// }
    208     /// ```
    209     #[inline]
    210     pub fn from_sockets(
    211         (socket_1, backlog_1): (TcpSocket, u32),
    212         (socket_2, backlog_2): (TcpSocket, u32),
    213     ) -> Result<Self> {
    214         socket_1.local_addr().and_then(|sock| {
    215             socket_2.local_addr().and_then(|sock_2| {
    216                 if sock.is_ipv6() {
    217                     if sock_2.is_ipv4() {
    218                         socket_1.listen(backlog_1).and_then(|ip6| {
    219                             socket_2.listen(backlog_2).map(|ip4| Self {
    220                                 ip6,
    221                                 ip4,
    222                                 ip6_first: AtomicBool::new(true),
    223                             })
    224                         })
    225                     } else {
    226                         Err(Error::new(
    227                             ErrorKind::InvalidData,
    228                             "TcpSockets are the same IP version",
    229                         ))
    230                     }
    231                 } else if sock_2.is_ipv6() {
    232                     socket_1.listen(backlog_1).and_then(|ip4| {
    233                         socket_2.listen(backlog_2).map(|ip6| Self {
    234                             ip6,
    235                             ip4,
    236                             ip6_first: AtomicBool::new(true),
    237                         })
    238                     })
    239                 } else {
    240                     Err(Error::new(
    241                         ErrorKind::InvalidData,
    242                         "TcpSockets are the same IP version",
    243                     ))
    244                 }
    245             })
    246         })
    247     }
    248     /// Returns the local address of each socket that the listeners are bound to.
    249     ///
    250     /// This can be useful, for example, when binding to port 0 to figure out which port was actually bound.
    251     ///
    252     /// # Errors
    253     ///
    254     /// Errors iff [`TcpListener::local_addr`] does for either listener.
    255     ///
    256     /// # Examples
    257     ///
    258     /// ```no_run
    259     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    260     /// # use std::io::Result;
    261     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    262     /// #[tokio::main(flavor = "current_thread")]
    263     /// async fn main() -> Result<()> {
    264     ///     let ip6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0);
    265     ///     let ip4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080);
    266     ///     assert_eq!(
    267     ///         DualStackTcpListener::bind([SocketAddr::V6(ip6), SocketAddr::V4(ip4)].as_slice())
    268     ///             .await?
    269     ///             .local_addr()?,
    270     ///         (ip6, ip4)
    271     ///     );
    272     ///     Ok(())
    273     /// }
    274     /// ```
    275     #[expect(clippy::unreachable, reason = "we want to crash when there is a bug")]
    276     #[inline]
    277     pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> {
    278         self.ip6.local_addr().and_then(|ip6| {
    279             self.ip4.local_addr().map(|ip4| {
    280                 (
    281                     if let SocketAddr::V6(sock6) = ip6 {
    282                         sock6
    283                     } else {
    284                         unreachable!("there is a bug in DualStackTcpListener::bind")
    285                     },
    286                     if let SocketAddr::V4(sock4) = ip4 {
    287                         sock4
    288                     } else {
    289                         unreachable!("there is a bug in DualStackTcpListener::bind")
    290                     },
    291                 )
    292             })
    293         })
    294     }
    295     /// Sets the value for the `IP_TTL` option on both sockets.
    296     ///
    297     /// This value sets the time-to-live field that is used in every packet sent from each socket.
    298     /// `ttl_ip6` is the `IP_TTL` value for the IPv6 socket and `ttl_ip4` is the `IP_TTL` value for the
    299     /// IPv4 socket.
    300     ///
    301     /// # Errors
    302     ///
    303     /// Errors iff [`TcpListener::set_ttl`] does for either listener.
    304     ///
    305     /// # Examples
    306     ///
    307     /// ```no_run
    308     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    309     /// # use std::io::Result;
    310     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    311     /// #[tokio::main(flavor = "current_thread")]
    312     /// async fn main() -> Result<()> {
    313     ///     DualStackTcpListener::bind(
    314     ///         [
    315     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    316     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    317     ///         ]
    318     ///         .as_slice(),
    319     ///     )
    320     ///     .await?.set_ttl(100, 100).expect("could not set TTL");
    321     ///     Ok(())
    322     /// }
    323     /// ```
    324     #[inline]
    325     pub fn set_ttl(&self, ttl_ip6: u32, ttl_ip4: u32) -> Result<()> {
    326         self.ip6
    327             .set_ttl(ttl_ip6)
    328             .and_then(|()| self.ip4.set_ttl(ttl_ip4))
    329     }
    330     /// Gets the values of the `IP_TTL` option for both sockets.
    331     ///
    332     /// The first `u32` represents the `IP_TTL` value for the IPv6 socket and the second `u32` is the
    333     /// `IP_TTL` value for the IPv4 socket. For more information about this option, see [`Self::set_ttl`].
    334     ///
    335     /// # Errors
    336     ///
    337     /// Errors iff [`TcpListener::ttl`] does for either listener.
    338     ///
    339     /// # Examples
    340     ///
    341     /// ```no_run
    342     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    343     /// # use std::io::Result;
    344     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    345     /// #[tokio::main(flavor = "current_thread")]
    346     /// async fn main() -> Result<()> {
    347     ///     let listener = DualStackTcpListener::bind(
    348     ///         [
    349     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    350     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    351     ///         ]
    352     ///         .as_slice(),
    353     ///     )
    354     ///     .await?;
    355     ///     listener.set_ttl(100, 100).expect("could not set TTL");
    356     ///     assert_eq!(listener.ttl()?, (100, 100));
    357     ///     Ok(())
    358     /// }
    359     /// ```
    360     #[inline]
    361     pub fn ttl(&self) -> Result<(u32, u32)> {
    362         self.ip6
    363             .ttl()
    364             .and_then(|ip6| self.ip4.ttl().map(|ip4| (ip6, ip4)))
    365     }
    366 }
pin_project! {
    /// `Future` returned by [`DualStackTcpListener::accept`].
    ///
    /// Wraps the two per-listener accept futures; `fut_1` is the one polled first.
    struct AcceptFut<
        F: Future<Output = Result<(TcpStream, SocketAddr)>>,
        F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    > {
        // Accept future for one `TcpListener`. This one is polled first.
        #[pin]
        fut_1: F,
        // Accept future for the other `TcpListener`. Polled only when `fut_1` is pending.
        #[pin]
        fut_2: F2,
    }
}
    381 impl<
    382     F: Future<Output = Result<(TcpStream, SocketAddr)>>,
    383     F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    384 > Future for AcceptFut<F, F2>
    385 {
    386     type Output = Result<(TcpStream, SocketAddr)>;
    387     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    388         let this = self.project();
    389         // Note we defer errors caused from polling a completed `Future` to the contained `tokio` `Future`s.
    390         // The only time this `Future` can be polled after completion without an error (due to `tokio`) is
    391         // if `fut_2` completes first, `self` is polled, then `fut_1` completes. We don't actually care
    392         // that this happens since the correctness of the code is still fine.
    393         // This means any bugs that could occur from polling this `Future` after completion are dependency-based
    394         // bugs where the correct solution is to fix the bugs in `tokio`.
    395         match this.fut_1.poll(cx) {
    396             Poll::Ready(res) => Poll::Ready(res),
    397             Poll::Pending => this.fut_2.poll(cx),
    398         }
    399     }
    400 }
    401 impl Sealed for DualStackTcpListener {}
    402 impl Tcp for DualStackTcpListener {
    403     #[inline]
    404     async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self> {
    405         match net::lookup_host(addr).await {
    406             Ok(socks) => {
    407                 let mut last_err = None;
    408                 let mut ip6_opt = None;
    409                 let mut ip4_opt = None;
    410                 for sock in socks {
    411                     match ip6_opt {
    412                         None => match ip4_opt {
    413                             None => {
    414                                 let is_ip6 = sock.is_ipv6();
    415                                 match TcpListener::bind(sock).await {
    416                                     Ok(ip) => {
    417                                         if is_ip6 {
    418                                             ip6_opt = Some(ip);
    419                                         } else {
    420                                             ip4_opt = Some(ip);
    421                                         }
    422                                     }
    423                                     Err(err) => last_err = Some(err),
    424                                 }
    425                             }
    426                             Some(ip4) => {
    427                                 if sock.is_ipv6() {
    428                                     match TcpListener::bind(sock).await {
    429                                         Ok(ip6) => {
    430                                             return Ok(Self {
    431                                                 ip6,
    432                                                 ip4,
    433                                                 ip6_first: AtomicBool::new(true),
    434                                             });
    435                                         }
    436                                         Err(err) => last_err = Some(err),
    437                                     }
    438                                 }
    439                                 ip4_opt = Some(ip4);
    440                             }
    441                         },
    442                         Some(ip6) => {
    443                             if sock.is_ipv4() {
    444                                 match TcpListener::bind(sock).await {
    445                                     Ok(ip4) => {
    446                                         return Ok(Self {
    447                                             ip6,
    448                                             ip4,
    449                                             ip6_first: AtomicBool::new(true),
    450                                         });
    451                                     }
    452                                     Err(err) => last_err = Some(err),
    453                                 }
    454                             }
    455                             ip6_opt = Some(ip6);
    456                         }
    457                     }
    458                 }
    459                 Err(last_err.unwrap_or_else(|| {
    460                     Error::new(
    461                         ErrorKind::InvalidInput,
    462                         "could not resolve to an IPv6 and IPv4 address",
    463                     )
    464                 }))
    465             }
    466             Err(err) => Err(err),
    467         }
    468     }
    469     #[inline]
    470     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    471         // The correctness of code does not depend on `self.ip6_first`; therefore
    472         // we elect for the most performant `Ordering`.
    473         if self.ip6_first.swap(false, Ordering::Relaxed) {
    474             AcceptFut {
    475                 fut_1: self.ip6.accept(),
    476                 fut_2: self.ip4.accept(),
    477             }
    478         } else {
    479             // The correctness of code does not depend on `self.ip6_first`; therefore
    480             // we elect for the most performant `Ordering`.
    481             self.ip6_first.store(true, Ordering::Relaxed);
    482             AcceptFut {
    483                 fut_1: self.ip4.accept(),
    484                 fut_2: self.ip6.accept(),
    485             }
    486         }
    487     }
    488     #[inline]
    489     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    490         // The correctness of code does not depend on `self.ip6_first`; therefore
    491         // we elect for the most performant `Ordering`.
    492         if self.ip6_first.swap(false, Ordering::Relaxed) {
    493             self.ip6.poll_accept(cx)
    494         } else {
    495             // The correctness of code does not depend on `self.ip6_first`; therefore
    496             // we elect for the most performant `Ordering`.
    497             self.ip6_first.store(true, Ordering::Relaxed);
    498             self.ip4.poll_accept(cx)
    499         }
    500     }
    501 }