tokio_dual_stack

Dual-stack TcpListener.
git clone https://git.philomathiclife.com/repos/tokio_dual_stack
Log | Files | Refs | README

lib.rs (21698B)


      1 //! [![git]](https://git.philomathiclife.com/tokio_dual_stack/log.html) [![crates-io]](https://crates.io/crates/tokio_dual_stack) [![docs-rs]](crate)
      2 //!
      3 //! [git]: https://git.philomathiclife.com/git_badge.svg
      4 //! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust
      5 //! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs
      6 //!
      7 //! `tokio_dual_stack` is a library that adds a "dual-stack" [`TcpListener`].
      8 //!
      9 //! ## Why is this useful?
     10 //!
     11 //! Only certain platforms offer the ability for one socket to handle both IPv6 and IPv4 requests
     12 //! (e.g., OpenBSD does not). For the platforms that do, it is often dependent on runtime configuration
     13 //! (e.g., [`IPV6_V6ONLY`](https://www.man7.org/linux/man-pages/man7/ipv6.7.html)). Additionally those platforms
     14 //! that support it often require the "wildcard" IPv6 address to be used (i.e., `::`) which has the unfortunate
     15 //! consequence of preventing other services from using the same protocol port.
     16 //!
     17 //! There are a few ways to work around this issue. One is to deploy the same service twice: one that uses
     18 //! an IPv6 socket and the other that uses an IPv4 socket. This can complicate deployments (e.g., the application
     19 //! may not have been written with the expectation that multiple deployments could be running at the same time) in
     20 //! addition to using more resources. Another is for the application to manually handle each socket (e.g.,
     21 //! [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html)/[`join`](https://docs.rs/tokio/latest/tokio/macro.join.html)
     22 //! each [`TcpListener::accept`]).
     23 //!
     24 //! [`DualStackTcpListener`] chooses an implementation similar to what the equivalent `select` would do while
     25 //! also ensuring that one socket does not "starve" another by ensuring each socket is fairly given an opportunity
     26 //! to `TcpListener::accept` a connection. This has the nice benefit of having a similar API to what a single
     27 //! `TcpListener` would have as well as having similar performance to a socket that does handle both IPv6 and
     28 //! IPv4 requests.
     29 #![expect(
     30     clippy::doc_paragraphs_missing_punctuation,
     31     reason = "false positive for crate documentation having image links"
     32 )]
     33 use core::{
     34     net::{SocketAddr, SocketAddrV4, SocketAddrV6},
     35     pin::Pin,
     36     sync::atomic::{AtomicBool, Ordering},
     37     task::{Context, Poll},
     38 };
     39 use pin_project_lite::pin_project;
     40 use std::io::{Error, ErrorKind, Result};
     41 pub use tokio;
     42 use tokio::net::{self, TcpListener, TcpSocket, TcpStream, ToSocketAddrs};
     43 /// Prevents [`Sealed`] from being publicly implementable.
     44 mod private {
     45     /// Marker trait to prevent [`super::Tcp`] from being publicly implementable.
     46     #[expect(unnameable_types, reason = "want Tcp to be 'sealed'")]
     47     pub trait Sealed {}
     48 }
     49 use private::Sealed;
     50 /// TCP "listener".
     51 ///
     52 /// This `trait` is sealed and cannot be implemented for types outside of `tokio_dual_stack`.
     53 ///
     54 /// This exists primarily as a way to define type constructors or polymorphic functions
     55 /// that can user either a [`TcpListener`] or [`DualStackTcpListener`].
     56 ///
     57 /// # Examples
     58 ///
     59 /// ```no_run
     60 /// # use core::convert::Infallible;
     61 /// # use tokio_dual_stack::Tcp;
     62 /// async fn main_loop<T: Tcp>(listener: T) -> Infallible {
     63 ///     loop {
     64 ///         match listener.accept().await {
     65 ///             Ok((_, socket)) => println!("Client socket: {socket}"),
     66 ///             Err(e) => println!("TCP connection failure: {e}"),
     67 ///         }
     68 ///     }
     69 /// }
     70 /// ```
     71 pub trait Tcp: Sealed + Sized {
     72     /// Creates a new TCP listener, which will be bound to the specified address(es).
     73     ///
     74     /// The returned listener is ready for accepting connections.
     75     ///
     76     /// Binding with a port number of 0 will request that the OS assigns a port to this listener.
     77     /// The port allocated can be queried via the `local_addr` method.
     78     ///
     79     /// The address type can be any implementor of the [`ToSocketAddrs`] trait. If `addr` yields
     80     /// multiple addresses, bind will be attempted with each of the addresses until one succeeds
     81     /// and returns the listener. If none of the addresses succeed in creating a listener, the
     82     /// error returned from the last attempt (the last address) is returned.
     83     ///
     84     /// This function sets the `SO_REUSEADDR` option on the socket.
     85     ///
     86     /// # Examples
     87     ///
     88     /// ```no_run
     89     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
     90     /// # use std::io::Result;
     91     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
     92     /// #[tokio::main(flavor = "current_thread")]
     93     /// async fn main() -> Result<()> {
     94     ///     let listener = DualStackTcpListener::bind(
     95     ///         [
     96     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
     97     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
     98     ///         ]
     99     ///         .as_slice(),
    100     ///     )
    101     ///     .await?;
    102     ///     Ok(())
    103     /// }
    104     /// ```
    105     fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>>;
    106     /// Accepts a new incoming connection from this listener.
    107     ///
    108     /// This function will yield once a new TCP connection is established. When established,
    109     /// the corresponding `TcpStream` and the remote peer’s address will be returned.
    110     ///
    111     /// # Cancel safety
    112     ///
    113     /// This method is cancel safe. If the method is used as the event in a
    114     /// [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html)
    115     /// statement and some other branch completes first, then it is guaranteed that no new
    116     /// connections were accepted by this method.
    117     ///
    118     /// # Examples
    119     ///
    120     /// ```no_run
    121     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    122     /// # use std::io::Result;
    123     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    124     /// #[tokio::main(flavor = "current_thread")]
    125     /// async fn main() -> Result<()> {
    126     ///     match DualStackTcpListener::bind(
    127     ///         [
    128     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    129     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    130     ///         ]
    131     ///         .as_slice(),
    132     ///     )
    133     ///     .await?.accept().await {
    134     ///         Ok((_, addr)) => println!("new client: {addr}"),
    135     ///         Err(e) => println!("couldn't get client: {e}"),
    136     ///     }
    137     ///     Ok(())
    138     /// }
    139     /// ```
    140     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync;
    141     /// Polls to accept a new incoming connection to this listener.
    142     ///
    143     /// If there is no connection to accept, `Poll::Pending` is returned and the current task will be notified by
    144     /// a waker. Note that on multiple calls to `poll_accept`, only the `Waker` from the `Context` passed to the
    145     /// most recent call is scheduled to receive a wakeup.
    146     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>>;
    147 }
    148 impl Sealed for TcpListener {}
    149 impl Tcp for TcpListener {
    150     #[inline]
    151     fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>> {
    152         Self::bind(addr)
    153     }
    154     #[inline]
    155     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    156         self.accept()
    157     }
    158     #[inline]
    159     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    160         self.poll_accept(cx)
    161     }
    162 }
    163 /// "Dual-stack" TCP listener.
    164 ///
    165 /// IPv6 and IPv4 TCP listener.
    166 #[derive(Debug)]
    167 pub struct DualStackTcpListener {
    168     /// IPv6 TCP listener.
    169     ip6: TcpListener,
    170     /// IPv4 TCP listener.
    171     ip4: TcpListener,
    172     /// `true` iff [`Self::ip6::accept`] should be `poll`ed first; otherwise [`Self::ip4::accept`] is `poll`ed
    173     /// first.
    174     ///
    175     /// This exists to prevent one IP version from "starving" another. Each time [`Self::accept`] or
    176     /// [`Self::poll_accept`] is called, it's overwritten with the opposite `bool`.
    177     ///
    178     /// Note we could make this a `core::cell::Cell`; but for maximal flexibility and consistency with `TcpListener`,
    179     /// we use an `AtomicBool`. This among other things means `DualStackTcpListener` will implement `Sync`.
    180     ip6_first: AtomicBool,
    181 }
    182 impl DualStackTcpListener {
    183     /// Creates `Self` using the [`TcpListener`]s returned from [`TcpSocket::listen`].
    184     ///
    185     /// [`Self::bind`] is useful when the behavior of [`TcpListener::bind`] is sufficient; however if the underlying
    186     /// `TcpSocket`s need to be configured differently, then one must call this function instead.
    187     ///
    188     /// # Errors
    189     ///
    190     /// Errors iff [`TcpSocket::local_addr`] does for either socket, the underlying sockets use the same IP version,
    191     /// or [`TcpSocket::listen`] errors for either socket.
    192     ///
    193     /// Note on Windows-based platforms `TcpSocket::local_addr` will error if [`TcpSocket::bind`] was not called.
    194     ///
    195     /// # Examples
    196     ///
    197     /// ```no_run
    198     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    199     /// # use std::io::Result;
    200     /// # use tokio_dual_stack::DualStackTcpListener;
    201     /// # use tokio::net::TcpSocket;
    202     /// #[tokio::main(flavor = "current_thread")]
    203     /// async fn main() -> Result<()> {
    204     ///     let ip6 = TcpSocket::new_v6()?;
    205     ///     ip6.bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)))?;
    206     ///     let ip4 = TcpSocket::new_v4()?;
    207     ///     ip4.bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)))?;
    208     ///     let listener = DualStackTcpListener::from_sockets((ip6, 1024), (ip4, 1024))?;
    209     ///     Ok(())
    210     /// }
    211     /// ```
    212     #[inline]
    213     pub fn from_sockets(
    214         (socket_1, backlog_1): (TcpSocket, u32),
    215         (socket_2, backlog_2): (TcpSocket, u32),
    216     ) -> Result<Self> {
    217         socket_1.local_addr().and_then(|sock| {
    218             socket_2.local_addr().and_then(|sock_2| {
    219                 if sock.is_ipv6() {
    220                     if sock_2.is_ipv4() {
    221                         socket_1.listen(backlog_1).and_then(|ip6| {
    222                             socket_2.listen(backlog_2).map(|ip4| Self {
    223                                 ip6,
    224                                 ip4,
    225                                 ip6_first: AtomicBool::new(true),
    226                             })
    227                         })
    228                     } else {
    229                         Err(Error::new(
    230                             ErrorKind::InvalidData,
    231                             "TcpSockets are the same IP version",
    232                         ))
    233                     }
    234                 } else if sock_2.is_ipv6() {
    235                     socket_1.listen(backlog_1).and_then(|ip4| {
    236                         socket_2.listen(backlog_2).map(|ip6| Self {
    237                             ip6,
    238                             ip4,
    239                             ip6_first: AtomicBool::new(true),
    240                         })
    241                     })
    242                 } else {
    243                     Err(Error::new(
    244                         ErrorKind::InvalidData,
    245                         "TcpSockets are the same IP version",
    246                     ))
    247                 }
    248             })
    249         })
    250     }
    251     /// Returns the local address of each socket that the listeners are bound to.
    252     ///
    253     /// This can be useful, for example, when binding to port 0 to figure out which port was actually bound.
    254     ///
    255     /// # Errors
    256     ///
    257     /// Errors iff [`TcpListener::local_addr`] does for either listener.
    258     ///
    259     /// # Examples
    260     ///
    261     /// ```no_run
    262     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    263     /// # use std::io::Result;
    264     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    265     /// #[tokio::main(flavor = "current_thread")]
    266     /// async fn main() -> Result<()> {
    267     ///     let ip6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0);
    268     ///     let ip4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080);
    269     ///     assert_eq!(
    270     ///         DualStackTcpListener::bind([SocketAddr::V6(ip6), SocketAddr::V4(ip4)].as_slice())
    271     ///             .await?
    272     ///             .local_addr()?,
    273     ///         (ip6, ip4)
    274     ///     );
    275     ///     Ok(())
    276     /// }
    277     /// ```
    278     #[expect(clippy::unreachable, reason = "we want to crash when there is a bug")]
    279     #[inline]
    280     pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> {
    281         self.ip6.local_addr().and_then(|ip6| {
    282             self.ip4.local_addr().map(|ip4| {
    283                 (
    284                     if let SocketAddr::V6(sock6) = ip6 {
    285                         sock6
    286                     } else {
    287                         unreachable!("there is a bug in DualStackTcpListener::bind")
    288                     },
    289                     if let SocketAddr::V4(sock4) = ip4 {
    290                         sock4
    291                     } else {
    292                         unreachable!("there is a bug in DualStackTcpListener::bind")
    293                     },
    294                 )
    295             })
    296         })
    297     }
    298     /// Sets the value for the `IP_TTL` option on both sockets.
    299     ///
    300     /// This value sets the time-to-live field that is used in every packet sent from each socket.
    301     /// `ttl_ip6` is the `IP_TTL` value for the IPv6 socket and `ttl_ip4` is the `IP_TTL` value for the
    302     /// IPv4 socket.
    303     ///
    304     /// # Errors
    305     ///
    306     /// Errors iff [`TcpListener::set_ttl`] does for either listener.
    307     ///
    308     /// # Examples
    309     ///
    310     /// ```no_run
    311     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    312     /// # use std::io::Result;
    313     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    314     /// #[tokio::main(flavor = "current_thread")]
    315     /// async fn main() -> Result<()> {
    316     ///     DualStackTcpListener::bind(
    317     ///         [
    318     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    319     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    320     ///         ]
    321     ///         .as_slice(),
    322     ///     )
    323     ///     .await?.set_ttl(100, 100).expect("could not set TTL");
    324     ///     Ok(())
    325     /// }
    326     /// ```
    327     #[inline]
    328     pub fn set_ttl(&self, ttl_ip6: u32, ttl_ip4: u32) -> Result<()> {
    329         self.ip6
    330             .set_ttl(ttl_ip6)
    331             .and_then(|()| self.ip4.set_ttl(ttl_ip4))
    332     }
    333     /// Gets the values of the `IP_TTL` option for both sockets.
    334     ///
    335     /// The first `u32` represents the `IP_TTL` value for the IPv6 socket and the second `u32` is the
    336     /// `IP_TTL` value for the IPv4 socket. For more information about this option, see [`Self::set_ttl`].
    337     ///
    338     /// # Errors
    339     ///
    340     /// Errors iff [`TcpListener::ttl`] does for either listener.
    341     ///
    342     /// # Examples
    343     ///
    344     /// ```no_run
    345     /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
    346     /// # use std::io::Result;
    347     /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _};
    348     /// #[tokio::main(flavor = "current_thread")]
    349     /// async fn main() -> Result<()> {
    350     ///     let listener = DualStackTcpListener::bind(
    351     ///         [
    352     ///             SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)),
    353     ///             SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)),
    354     ///         ]
    355     ///         .as_slice(),
    356     ///     )
    357     ///     .await?;
    358     ///     listener.set_ttl(100, 100).expect("could not set TTL");
    359     ///     assert_eq!(listener.ttl()?, (100, 100));
    360     ///     Ok(())
    361     /// }
    362     /// ```
    363     #[inline]
    364     pub fn ttl(&self) -> Result<(u32, u32)> {
    365         self.ip6
    366             .ttl()
    367             .and_then(|ip6| self.ip4.ttl().map(|ip4| (ip6, ip4)))
    368     }
    369 }
    370 pin_project! {
    371     /// `Future` returned by [`DualStackTcpListener::accept]`.
    372     struct AcceptFut<
    373         F: Future<Output = Result<(TcpStream, SocketAddr)>>,
    374         F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    375     > {
    376         // Accept future for one `TcpListener`.
    377         #[pin]
    378         fut_1: F,
    379         // Accept future for the other `TcpListener`.
    380         #[pin]
    381         fut_2: F2,
    382     }
    383 }
    384 impl<
    385     F: Future<Output = Result<(TcpStream, SocketAddr)>>,
    386     F2: Future<Output = Result<(TcpStream, SocketAddr)>>,
    387 > Future for AcceptFut<F, F2>
    388 {
    389     type Output = Result<(TcpStream, SocketAddr)>;
    390     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    391         let this = self.project();
    392         // Note we defer errors caused from polling a completed `Future` to the contained `tokio` `Future`s.
    393         // The only time this `Future` can be polled after completion without an error (due to `tokio`) is
    394         // if `fut_2` completes first, `self` is polled, then `fut_1` completes. We don't actually care
    395         // that this happens since the correctness of the code is still fine.
    396         // This means any bugs that could occur from polling this `Future` after completion are dependency-based
    397         // bugs where the correct solution is to fix the bugs in `tokio`.
    398         match this.fut_1.poll(cx) {
    399             Poll::Ready(res) => Poll::Ready(res),
    400             Poll::Pending => this.fut_2.poll(cx),
    401         }
    402     }
    403 }
    404 impl Sealed for DualStackTcpListener {}
    405 impl Tcp for DualStackTcpListener {
    406     #[inline]
    407     async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self> {
    408         match net::lookup_host(addr).await {
    409             Ok(socks) => {
    410                 let mut last_err = None;
    411                 let mut ip6_opt = None;
    412                 let mut ip4_opt = None;
    413                 for sock in socks {
    414                     match ip6_opt {
    415                         None => match ip4_opt {
    416                             None => {
    417                                 let is_ip6 = sock.is_ipv6();
    418                                 match TcpListener::bind(sock).await {
    419                                     Ok(ip) => {
    420                                         if is_ip6 {
    421                                             ip6_opt = Some(ip);
    422                                         } else {
    423                                             ip4_opt = Some(ip);
    424                                         }
    425                                     }
    426                                     Err(err) => last_err = Some(err),
    427                                 }
    428                             }
    429                             Some(ip4) => {
    430                                 if sock.is_ipv6() {
    431                                     match TcpListener::bind(sock).await {
    432                                         Ok(ip6) => {
    433                                             return Ok(Self {
    434                                                 ip6,
    435                                                 ip4,
    436                                                 ip6_first: AtomicBool::new(true),
    437                                             });
    438                                         }
    439                                         Err(err) => last_err = Some(err),
    440                                     }
    441                                 }
    442                                 ip4_opt = Some(ip4);
    443                             }
    444                         },
    445                         Some(ip6) => {
    446                             if sock.is_ipv4() {
    447                                 match TcpListener::bind(sock).await {
    448                                     Ok(ip4) => {
    449                                         return Ok(Self {
    450                                             ip6,
    451                                             ip4,
    452                                             ip6_first: AtomicBool::new(true),
    453                                         });
    454                                     }
    455                                     Err(err) => last_err = Some(err),
    456                                 }
    457                             }
    458                             ip6_opt = Some(ip6);
    459                         }
    460                     }
    461                 }
    462                 Err(last_err.unwrap_or_else(|| {
    463                     Error::new(
    464                         ErrorKind::InvalidInput,
    465                         "could not resolve to an IPv6 and IPv4 address",
    466                     )
    467                 }))
    468             }
    469             Err(err) => Err(err),
    470         }
    471     }
    472     #[inline]
    473     fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync {
    474         // The correctness of code does not depend on `self.ip6_first`; therefore
    475         // we elect for the most performant `Ordering`.
    476         if self.ip6_first.swap(false, Ordering::Relaxed) {
    477             AcceptFut {
    478                 fut_1: self.ip6.accept(),
    479                 fut_2: self.ip4.accept(),
    480             }
    481         } else {
    482             // The correctness of code does not depend on `self.ip6_first`; therefore
    483             // we elect for the most performant `Ordering`.
    484             self.ip6_first.store(true, Ordering::Relaxed);
    485             AcceptFut {
    486                 fut_1: self.ip4.accept(),
    487                 fut_2: self.ip6.accept(),
    488             }
    489         }
    490     }
    491     #[inline]
    492     fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> {
    493         // The correctness of code does not depend on `self.ip6_first`; therefore
    494         // we elect for the most performant `Ordering`.
    495         if self.ip6_first.swap(false, Ordering::Relaxed) {
    496             self.ip6.poll_accept(cx)
    497         } else {
    498             // The correctness of code does not depend on `self.ip6_first`; therefore
    499             // we elect for the most performant `Ordering`.
    500             self.ip6_first.store(true, Ordering::Relaxed);
    501             self.ip4.poll_accept(cx)
    502         }
    503     }
    504 }