lib.rs (21631B)
1 //! [![git]](https://git.philomathiclife.com/tokio_dual_stack/log.html) [![crates-io]](https://crates.io/crates/tokio_dual_stack) [![docs-rs]](crate) 2 //! 3 //! [git]: https://git.philomathiclife.com/git_badge.svg 4 //! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust 5 //! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs 6 //! 7 //! `tokio_dual_stack` is a library that adds a "dual-stack" [`TcpListener`]. 8 //! 9 //! ## Why is this useful? 10 //! 11 //! Only certain platforms offer the ability for one socket to handle both IPv6 and IPv4 requests 12 //! (e.g., OpenBSD does not). For the platforms that do, it is often dependent on runtime configuration 13 //! (e.g., [`IPV6_V6ONLY`](https://www.man7.org/linux/man-pages/man7/ipv6.7.html)). Additionally those platforms 14 //! that support it often require the "wildcard" IPv6 address to be used (i.e., `::`) which has the unfortunate 15 //! consequence of preventing other services from using the same protocol port. 16 //! 17 //! There are a few ways to work around this issue. One is to deploy the same service twice: one that uses 18 //! an IPv6 socket and the other that uses an IPv4 socket. This can complicate deployments (e.g., the application 19 //! may not have been written with the expectation that multiple deployments could be running at the same time) in 20 //! addition to using more resources. Another is for the application to manually handle each socket (e.g., 21 //! [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html)/[`join`](https://docs.rs/tokio/latest/tokio/macro.join.html) 22 //! each [`TcpListener::accept`]). 23 //! 24 //! [`DualStackTcpListener`] chooses an implementation similar to what the equivalent `select` would do while 25 //! also ensuring that one socket does not "starve" another by ensuring each socket is fairly given an opportunity 26 //! to `TcpListener::accept` a connection. This has the nice benefit of having a similar API to what a single 27 //! `TcpListener` would have as well as having similar performance to a socket that does handle both IPv6 and 28 //! IPv4 requests. 29 #![expect(clippy::multiple_crate_versions, reason = "windows-sys")] 30 use core::{ 31 net::{SocketAddr, SocketAddrV4, SocketAddrV6}, 32 pin::Pin, 33 sync::atomic::{AtomicBool, Ordering}, 34 task::{Context, Poll}, 35 }; 36 use pin_project_lite::pin_project; 37 use std::io::{Error, ErrorKind, Result}; 38 pub use tokio; 39 use tokio::net::{self, TcpListener, TcpSocket, TcpStream, ToSocketAddrs}; 40 /// Prevents [`Sealed`] from being publicly implementable. 41 mod private { 42 /// Marker trait to prevent [`super::Tcp`] from being publicly implementable. 43 #[expect(unnameable_types, reason = "want Tcp to be 'sealed'")] 44 pub trait Sealed {} 45 } 46 use private::Sealed; 47 /// TCP "listener". 48 /// 49 /// This `trait` is sealed and cannot be implemented for types outside of `tokio_dual_stack`. 50 /// 51 /// This exists primarily as a way to define type constructors or polymorphic functions 52 /// that can user either a [`TcpListener`] or [`DualStackTcpListener`]. 53 /// 54 /// # Examples 55 /// 56 /// ```no_run 57 /// # use core::convert::Infallible; 58 /// # use tokio_dual_stack::Tcp; 59 /// async fn main_loop<T: Tcp>(listener: T) -> Infallible { 60 /// loop { 61 /// match listener.accept().await { 62 /// Ok((_, socket)) => println!("Client socket: {socket}"), 63 /// Err(e) => println!("TCP connection failure: {e}"), 64 /// } 65 /// } 66 /// } 67 /// ``` 68 pub trait Tcp: Sealed + Sized { 69 /// Creates a new TCP listener, which will be bound to the specified address(es). 70 /// 71 /// The returned listener is ready for accepting connections. 72 /// 73 /// Binding with a port number of 0 will request that the OS assigns a port to this listener. 74 /// The port allocated can be queried via the `local_addr` method. 75 /// 76 /// The address type can be any implementor of the [`ToSocketAddrs`] trait. If `addr` yields 77 /// multiple addresses, bind will be attempted with each of the addresses until one succeeds 78 /// and returns the listener. If none of the addresses succeed in creating a listener, the 79 /// error returned from the last attempt (the last address) is returned. 80 /// 81 /// This function sets the `SO_REUSEADDR` option on the socket. 82 /// 83 /// # Examples 84 /// 85 /// ```no_run 86 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 87 /// # use std::io::Result; 88 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 89 /// #[tokio::main(flavor = "current_thread")] 90 /// async fn main() -> Result<()> { 91 /// let listener = DualStackTcpListener::bind( 92 /// [ 93 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 94 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 95 /// ] 96 /// .as_slice(), 97 /// ) 98 /// .await?; 99 /// Ok(()) 100 /// } 101 /// ``` 102 fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>>; 103 /// Accepts a new incoming connection from this listener. 104 /// 105 /// This function will yield once a new TCP connection is established. When established, 106 /// the corresponding `TcpStream` and the remote peer’s address will be returned. 107 /// 108 /// # Cancel safety 109 /// 110 /// This method is cancel safe. If the method is used as the event in a 111 /// [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html) 112 /// statement and some other branch completes first, then it is guaranteed that no new 113 /// connections were accepted by this method. 114 /// 115 /// # Examples 116 /// 117 /// ```no_run 118 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 119 /// # use std::io::Result; 120 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 121 /// #[tokio::main(flavor = "current_thread")] 122 /// async fn main() -> Result<()> { 123 /// match DualStackTcpListener::bind( 124 /// [ 125 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 126 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 127 /// ] 128 /// .as_slice(), 129 /// ) 130 /// .await?.accept().await { 131 /// Ok((_, addr)) => println!("new client: {addr}"), 132 /// Err(e) => println!("couldn't get client: {e}"), 133 /// } 134 /// Ok(()) 135 /// } 136 /// ``` 137 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync; 138 /// Polls to accept a new incoming connection to this listener. 139 /// 140 /// If there is no connection to accept, `Poll::Pending` is returned and the current task will be notified by 141 /// a waker. Note that on multiple calls to `poll_accept`, only the `Waker` from the `Context` passed to the 142 /// most recent call is scheduled to receive a wakeup. 143 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>>; 144 } 145 impl Sealed for TcpListener {} 146 impl Tcp for TcpListener { 147 #[inline] 148 fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>> { 149 Self::bind(addr) 150 } 151 #[inline] 152 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync { 153 self.accept() 154 } 155 #[inline] 156 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> { 157 self.poll_accept(cx) 158 } 159 } 160 /// "Dual-stack" TCP listener. 161 /// 162 /// IPv6 and IPv4 TCP listener. 163 #[derive(Debug)] 164 pub struct DualStackTcpListener { 165 /// IPv6 TCP listener. 166 ip6: TcpListener, 167 /// IPv4 TCP listener. 168 ip4: TcpListener, 169 /// `true` iff [`Self::ip6::accept`] should be `poll`ed first; otherwise [`Self::ip4::accept`] is `poll`ed 170 /// first. 171 /// 172 /// This exists to prevent one IP version from "starving" another. Each time [`Self::accept`] or 173 /// [`Self::poll_accept`] is called, it's overwritten with the opposite `bool`. 174 /// 175 /// Note we could make this a `core::cell::Cell`; but for maximal flexibility and consistency with `TcpListener`, 176 /// we use an `AtomicBool`. This among other things means `DualStackTcpListener` will implement `Sync`. 177 ip6_first: AtomicBool, 178 } 179 impl DualStackTcpListener { 180 /// Creates `Self` using the [`TcpListener`]s returned from [`TcpSocket::listen`]. 181 /// 182 /// [`Self::bind`] is useful when the behavior of [`TcpListener::bind`] is sufficient; however if the underlying 183 /// `TcpSocket`s need to be configured differently, then one must call this function instead. 184 /// 185 /// # Errors 186 /// 187 /// Errors iff [`TcpSocket::local_addr`] does for either socket, the underlying sockets use the same IP version, 188 /// or [`TcpSocket::listen`] errors for either socket. 189 /// 190 /// Note on Windows-based platforms `TcpSocket::local_addr` will error if [`TcpSocket::bind`] was not called. 191 /// 192 /// # Examples 193 /// 194 /// ```no_run 195 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 196 /// # use std::io::Result; 197 /// # use tokio_dual_stack::DualStackTcpListener; 198 /// # use tokio::net::TcpSocket; 199 /// #[tokio::main(flavor = "current_thread")] 200 /// async fn main() -> Result<()> { 201 /// let ip6 = TcpSocket::new_v6()?; 202 /// ip6.bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)))?; 203 /// let ip4 = TcpSocket::new_v4()?; 204 /// ip4.bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)))?; 205 /// let listener = DualStackTcpListener::from_sockets((ip6, 1024), (ip4, 1024))?; 206 /// Ok(()) 207 /// } 208 /// ``` 209 #[inline] 210 pub fn from_sockets( 211 (socket_1, backlog_1): (TcpSocket, u32), 212 (socket_2, backlog_2): (TcpSocket, u32), 213 ) -> Result<Self> { 214 socket_1.local_addr().and_then(|sock| { 215 socket_2.local_addr().and_then(|sock_2| { 216 if sock.is_ipv6() { 217 if sock_2.is_ipv4() { 218 socket_1.listen(backlog_1).and_then(|ip6| { 219 socket_2.listen(backlog_2).map(|ip4| Self { 220 ip6, 221 ip4, 222 ip6_first: AtomicBool::new(true), 223 }) 224 }) 225 } else { 226 Err(Error::new( 227 ErrorKind::InvalidData, 228 "TcpSockets are the same IP version", 229 )) 230 } 231 } else if sock_2.is_ipv6() { 232 socket_1.listen(backlog_1).and_then(|ip4| { 233 socket_2.listen(backlog_2).map(|ip6| Self { 234 ip6, 235 ip4, 236 ip6_first: AtomicBool::new(true), 237 }) 238 }) 239 } else { 240 Err(Error::new( 241 ErrorKind::InvalidData, 242 "TcpSockets are the same IP version", 243 )) 244 } 245 }) 246 }) 247 } 248 /// Returns the local address of each socket that the listeners are bound to. 249 /// 250 /// This can be useful, for example, when binding to port 0 to figure out which port was actually bound. 251 /// 252 /// # Errors 253 /// 254 /// Errors iff [`TcpListener::local_addr`] does for either listener. 255 /// 256 /// # Examples 257 /// 258 /// ```no_run 259 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 260 /// # use std::io::Result; 261 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 262 /// #[tokio::main(flavor = "current_thread")] 263 /// async fn main() -> Result<()> { 264 /// let ip6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); 265 /// let ip4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080); 266 /// assert_eq!( 267 /// DualStackTcpListener::bind([SocketAddr::V6(ip6), SocketAddr::V4(ip4)].as_slice()) 268 /// .await? 269 /// .local_addr()?, 270 /// (ip6, ip4) 271 /// ); 272 /// Ok(()) 273 /// } 274 /// ``` 275 #[expect(clippy::unreachable, reason = "we want to crash when there is a bug")] 276 #[inline] 277 pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> { 278 self.ip6.local_addr().and_then(|ip6| { 279 self.ip4.local_addr().map(|ip4| { 280 ( 281 if let SocketAddr::V6(sock6) = ip6 { 282 sock6 283 } else { 284 unreachable!("there is a bug in DualStackTcpListener::bind") 285 }, 286 if let SocketAddr::V4(sock4) = ip4 { 287 sock4 288 } else { 289 unreachable!("there is a bug in DualStackTcpListener::bind") 290 }, 291 ) 292 }) 293 }) 294 } 295 /// Sets the value for the `IP_TTL` option on both sockets. 296 /// 297 /// This value sets the time-to-live field that is used in every packet sent from each socket. 298 /// `ttl_ip6` is the `IP_TTL` value for the IPv6 socket and `ttl_ip4` is the `IP_TTL` value for the 299 /// IPv4 socket. 300 /// 301 /// # Errors 302 /// 303 /// Errors iff [`TcpListener::set_ttl`] does for either listener. 304 /// 305 /// # Examples 306 /// 307 /// ```no_run 308 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 309 /// # use std::io::Result; 310 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 311 /// #[tokio::main(flavor = "current_thread")] 312 /// async fn main() -> Result<()> { 313 /// DualStackTcpListener::bind( 314 /// [ 315 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 316 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 317 /// ] 318 /// .as_slice(), 319 /// ) 320 /// .await?.set_ttl(100, 100).expect("could not set TTL"); 321 /// Ok(()) 322 /// } 323 /// ``` 324 #[inline] 325 pub fn set_ttl(&self, ttl_ip6: u32, ttl_ip4: u32) -> Result<()> { 326 self.ip6 327 .set_ttl(ttl_ip6) 328 .and_then(|()| self.ip4.set_ttl(ttl_ip4)) 329 } 330 /// Gets the values of the `IP_TTL` option for both sockets. 331 /// 332 /// The first `u32` represents the `IP_TTL` value for the IPv6 socket and the second `u32` is the 333 /// `IP_TTL` value for the IPv4 socket. For more information about this option, see [`Self::set_ttl`]. 334 /// 335 /// # Errors 336 /// 337 /// Errors iff [`TcpListener::ttl`] does for either listener. 338 /// 339 /// # Examples 340 /// 341 /// ```no_run 342 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 343 /// # use std::io::Result; 344 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 345 /// #[tokio::main(flavor = "current_thread")] 346 /// async fn main() -> Result<()> { 347 /// let listener = DualStackTcpListener::bind( 348 /// [ 349 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 350 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 351 /// ] 352 /// .as_slice(), 353 /// ) 354 /// .await?; 355 /// listener.set_ttl(100, 100).expect("could not set TTL"); 356 /// assert_eq!(listener.ttl()?, (100, 100)); 357 /// Ok(()) 358 /// } 359 /// ``` 360 #[inline] 361 pub fn ttl(&self) -> Result<(u32, u32)> { 362 self.ip6 363 .ttl() 364 .and_then(|ip6| self.ip4.ttl().map(|ip4| (ip6, ip4))) 365 } 366 } 367 pin_project! { 368 /// `Future` returned by [`DualStackTcpListener::accept]`. 369 struct AcceptFut< 370 F: Future<Output = Result<(TcpStream, SocketAddr)>>, 371 F2: Future<Output = Result<(TcpStream, SocketAddr)>>, 372 > { 373 // Accept future for one `TcpListener`. 374 #[pin] 375 fut_1: F, 376 // Accept future for the other `TcpListener`. 377 #[pin] 378 fut_2: F2, 379 } 380 } 381 impl< 382 F: Future<Output = Result<(TcpStream, SocketAddr)>>, 383 F2: Future<Output = Result<(TcpStream, SocketAddr)>>, 384 > Future for AcceptFut<F, F2> 385 { 386 type Output = Result<(TcpStream, SocketAddr)>; 387 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 388 let this = self.project(); 389 // Note we defer errors caused from polling a completed `Future` to the contained `tokio` `Future`s. 390 // The only time this `Future` can be polled after completion without an error (due to `tokio`) is 391 // if `fut_2` completes first, `self` is polled, then `fut_1` completes. We don't actually care 392 // that this happens since the correctness of the code is still fine. 393 // This means any bugs that could occur from polling this `Future` after completion are dependency-based 394 // bugs where the correct solution is to fix the bugs in `tokio`. 395 match this.fut_1.poll(cx) { 396 Poll::Ready(res) => Poll::Ready(res), 397 Poll::Pending => this.fut_2.poll(cx), 398 } 399 } 400 } 401 impl Sealed for DualStackTcpListener {} 402 impl Tcp for DualStackTcpListener { 403 #[inline] 404 async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self> { 405 match net::lookup_host(addr).await { 406 Ok(socks) => { 407 let mut last_err = None; 408 let mut ip6_opt = None; 409 let mut ip4_opt = None; 410 for sock in socks { 411 match ip6_opt { 412 None => match ip4_opt { 413 None => { 414 let is_ip6 = sock.is_ipv6(); 415 match TcpListener::bind(sock).await { 416 Ok(ip) => { 417 if is_ip6 { 418 ip6_opt = Some(ip); 419 } else { 420 ip4_opt = Some(ip); 421 } 422 } 423 Err(err) => last_err = Some(err), 424 } 425 } 426 Some(ip4) => { 427 if sock.is_ipv6() { 428 match TcpListener::bind(sock).await { 429 Ok(ip6) => { 430 return Ok(Self { 431 ip6, 432 ip4, 433 ip6_first: AtomicBool::new(true), 434 }); 435 } 436 Err(err) => last_err = Some(err), 437 } 438 } 439 ip4_opt = Some(ip4); 440 } 441 }, 442 Some(ip6) => { 443 if sock.is_ipv4() { 444 match TcpListener::bind(sock).await { 445 Ok(ip4) => { 446 return Ok(Self { 447 ip6, 448 ip4, 449 ip6_first: AtomicBool::new(true), 450 }); 451 } 452 Err(err) => last_err = Some(err), 453 } 454 } 455 ip6_opt = Some(ip6); 456 } 457 } 458 } 459 Err(last_err.unwrap_or_else(|| { 460 Error::new( 461 ErrorKind::InvalidInput, 462 "could not resolve to an IPv6 and IPv4 address", 463 ) 464 })) 465 } 466 Err(err) => Err(err), 467 } 468 } 469 #[inline] 470 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync { 471 // The correctness of code does not depend on `self.ip6_first`; therefore 472 // we elect for the most performant `Ordering`. 473 if self.ip6_first.swap(false, Ordering::Relaxed) { 474 AcceptFut { 475 fut_1: self.ip6.accept(), 476 fut_2: self.ip4.accept(), 477 } 478 } else { 479 // The correctness of code does not depend on `self.ip6_first`; therefore 480 // we elect for the most performant `Ordering`. 481 self.ip6_first.store(true, Ordering::Relaxed); 482 AcceptFut { 483 fut_1: self.ip4.accept(), 484 fut_2: self.ip6.accept(), 485 } 486 } 487 } 488 #[inline] 489 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> { 490 // The correctness of code does not depend on `self.ip6_first`; therefore 491 // we elect for the most performant `Ordering`. 492 if self.ip6_first.swap(false, Ordering::Relaxed) { 493 self.ip6.poll_accept(cx) 494 } else { 495 // The correctness of code does not depend on `self.ip6_first`; therefore 496 // we elect for the most performant `Ordering`. 497 self.ip6_first.store(true, Ordering::Relaxed); 498 self.ip4.poll_accept(cx) 499 } 500 } 501 }