lib.rs (21667B)
1 //! [![git]](https://git.philomathiclife.com/tokio_dual_stack/log.html) [![crates-io]](https://crates.io/crates/tokio_dual_stack) [![docs-rs]](crate) 2 //! 3 //! [git]: https://git.philomathiclife.com/git_badge.svg 4 //! [crates-io]: https://img.shields.io/badge/crates.io-fc8d62?style=for-the-badge&labelColor=555555&logo=rust 5 //! [docs-rs]: https://img.shields.io/badge/docs.rs-66c2a5?style=for-the-badge&labelColor=555555&logo=docs.rs 6 //! 7 //! `tokio_dual_stack` is a library that adds a "dual-stack" [`TcpListener`]. 8 //! 9 //! ## Why is this useful? 10 //! 11 //! Only certain platforms offer the ability for one socket to handle both IPv6 and IPv4 requests 12 //! (e.g., OpenBSD does not). For the platforms that do, it is often dependent on runtime configuration 13 //! (e.g., [`IPV6_V6ONLY`](https://www.man7.org/linux/man-pages/man7/ipv6.7.html)). Additionally those platforms 14 //! that support it often require the "wildcard" IPv6 address to be used (i.e., `::`) which has the unfortunate 15 //! consequence of preventing other services from using the same protocol port. 16 //! 17 //! There are a few ways to work around this issue. One is to deploy the same service twice: one that uses 18 //! an IPv6 socket and the other that uses an IPv4 socket. This can complicate deployments (e.g., the application 19 //! may not have been written with the expectation that multiple deployments could be running at the same time) in 20 //! addition to using more resources. Another is for the application to manually handle each socket (e.g., 21 //! [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html)/[`join`](https://docs.rs/tokio/latest/tokio/macro.join.html) 22 //! each [`TcpListener::accept`]). 23 //! 24 //! [`DualStackTcpListener`] chooses an implementation similar to what the equivalent `select` would do while 25 //! also ensuring that one socket does not "starve" another by ensuring each socket is fairly given an opportunity 26 //! to `TcpListener::accept` a connection. This has the nice benefit of having a similar API to what a single 27 //! `TcpListener` would have as well as having similar performance to a socket that does handle both IPv6 and 28 //! IPv4 requests. 29 #![deny( 30 unknown_lints, 31 future_incompatible, 32 let_underscore, 33 missing_docs, 34 nonstandard_style, 35 refining_impl_trait, 36 rust_2018_compatibility, 37 rust_2018_idioms, 38 rust_2021_compatibility, 39 rust_2024_compatibility, 40 unsafe_code, 41 unused, 42 warnings, 43 clippy::all, 44 clippy::cargo, 45 clippy::complexity, 46 clippy::correctness, 47 clippy::nursery, 48 clippy::pedantic, 49 clippy::perf, 50 clippy::restriction, 51 clippy::style, 52 clippy::suspicious 53 )] 54 #![expect( 55 clippy::arbitrary_source_item_ordering, 56 clippy::blanket_clippy_restriction_lints, 57 clippy::implicit_return, 58 clippy::return_and_then, 59 reason = "noisy, opinionated, and likely doesn't prevent bugs or improve APIs" 60 )] 61 use core::{ 62 future::Future, 63 net::{SocketAddr, SocketAddrV4, SocketAddrV6}, 64 pin::Pin, 65 sync::atomic::{AtomicBool, Ordering}, 66 task::{Context, Poll}, 67 }; 68 use pin_project_lite::pin_project; 69 use std::io::{Error, ErrorKind, Result}; 70 use tokio::net::{self, TcpListener, TcpSocket, TcpStream, ToSocketAddrs}; 71 /// Prevents [`Sealed`] from being publicly implementable. 72 mod private { 73 /// Marker trait to prevent [`super::Tcp`] from being publicly implementable. 74 pub trait Sealed {} 75 } 76 use private::Sealed; 77 /// TCP "listener". 78 /// 79 /// This `trait` is sealed and cannot be implemented for types outside of `tokio_dual_stack`. 80 /// 81 /// This exists primarily as a way to define type constructors or polymorphic functions 82 /// that can user either a [`TcpListener`] or [`DualStackTcpListener`]. 83 /// 84 /// # Examples 85 /// 86 /// ```no_run 87 /// # use core::convert::Infallible; 88 /// # use tokio_dual_stack::Tcp; 89 /// async fn main_loop<T: Tcp>(listener: T) -> Infallible { 90 /// loop { 91 /// match listener.accept().await { 92 /// Ok((_, socket)) => println!("Client socket: {socket}"), 93 /// Err(e) => println!("TCP connection failure: {e}"), 94 /// } 95 /// } 96 /// } 97 /// ``` 98 pub trait Tcp: Sealed + Sized { 99 /// Creates a new TCP listener, which will be bound to the specified address(es). 100 /// 101 /// The returned listener is ready for accepting connections. 102 /// 103 /// Binding with a port number of 0 will request that the OS assigns a port to this listener. 104 /// The port allocated can be queried via the `local_addr` method. 105 /// 106 /// The address type can be any implementor of the [`ToSocketAddrs`] trait. If `addr` yields 107 /// multiple addresses, bind will be attempted with each of the addresses until one succeeds 108 /// and returns the listener. If none of the addresses succeed in creating a listener, the 109 /// error returned from the last attempt (the last address) is returned. 110 /// 111 /// This function sets the `SO_REUSEADDR` option on the socket. 112 /// 113 /// # Examples 114 /// 115 /// ```no_run 116 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 117 /// # use std::io::Result; 118 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 119 /// #[tokio::main(flavor = "current_thread")] 120 /// async fn main() -> Result<()> { 121 /// let listener = DualStackTcpListener::bind( 122 /// [ 123 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 124 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 125 /// ] 126 /// .as_slice(), 127 /// ) 128 /// .await?; 129 /// Ok(()) 130 /// } 131 /// ``` 132 fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>>; 133 /// Accepts a new incoming connection from this listener. 134 /// 135 /// This function will yield once a new TCP connection is established. When established, 136 /// the corresponding `TcpStream` and the remote peer’s address will be returned. 137 /// 138 /// # Cancel safety 139 /// 140 /// This method is cancel safe. If the method is used as the event in a 141 /// [`tokio::select!`](https://docs.rs/tokio/latest/tokio/macro.select.html) 142 /// statement and some other branch completes first, then it is guaranteed that no new 143 /// connections were accepted by this method. 144 /// 145 /// # Examples 146 /// 147 /// ```no_run 148 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 149 /// # use std::io::Result; 150 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 151 /// #[tokio::main(flavor = "current_thread")] 152 /// async fn main() -> Result<()> { 153 /// match DualStackTcpListener::bind( 154 /// [ 155 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 156 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 157 /// ] 158 /// .as_slice(), 159 /// ) 160 /// .await?.accept().await { 161 /// Ok((_, addr)) => println!("new client: {addr}"), 162 /// Err(e) => println!("couldn't get client: {e}"), 163 /// } 164 /// Ok(()) 165 /// } 166 /// ``` 167 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync; 168 /// Polls to accept a new incoming connection to this listener. 169 /// 170 /// If there is no connection to accept, `Poll::Pending` is returned and the current task will be notified by 171 /// a waker. Note that on multiple calls to `poll_accept`, only the `Waker` from the `Context` passed to the 172 /// most recent call is scheduled to receive a wakeup. 173 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>>; 174 } 175 impl Sealed for TcpListener {} 176 impl Tcp for TcpListener { 177 #[inline] 178 fn bind<A: ToSocketAddrs>(addr: A) -> impl Future<Output = Result<Self>> { 179 Self::bind(addr) 180 } 181 #[inline] 182 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync { 183 self.accept() 184 } 185 #[inline] 186 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> { 187 self.poll_accept(cx) 188 } 189 } 190 /// "Dual-stack" TCP listener. 191 /// 192 /// IPv6 and IPv4 TCP listener. 193 #[derive(Debug)] 194 pub struct DualStackTcpListener { 195 /// IPv6 TCP listener. 196 ip6: TcpListener, 197 /// IPv4 TCP listener. 198 ip4: TcpListener, 199 /// `true` iff [`Self::ip6::accept`] should be `poll`ed first; otherwise [`Self::ip4::accept`] is `poll`ed 200 /// first. 201 /// 202 /// This exists to prevent one IP version from "starving" another. Each time [`Self::accept`] or 203 /// [`Self::poll_accept`] is called, it's overwritten with the opposite `bool`. 204 /// 205 /// Note we could make this a `core::cell::Cell`; but for maximal flexibility and consistency with `TcpListener`, 206 /// we use an `AtomicBool`. This among other things means `DualStackTcpListener` will implement `Sync`. 207 ip6_first: AtomicBool, 208 } 209 impl DualStackTcpListener { 210 /// Creates `Self` using the [`TcpListener`]s returned from [`TcpSocket::listen`]. 211 /// 212 /// [`Self::bind`] is useful when the behavior of [`TcpListener::bind`] is sufficient; however if the underlying 213 /// `TcpSocket`s need to be configured differently, then one must call this function instead. 214 /// 215 /// # Errors 216 /// 217 /// Errors iff [`TcpSocket::local_addr`] does for either socket, the underlying sockets use the same IP version, 218 /// or [`TcpSocket::listen`] errors for either socket. 219 /// 220 /// Note on Windows-based platforms `TcpSocket::local_addr` will error if [`TcpSocket::bind`] was not called. 221 /// 222 /// # Examples 223 /// 224 /// ```no_run 225 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 226 /// # use std::io::Result; 227 /// # use tokio_dual_stack::DualStackTcpListener; 228 /// # use tokio::net::TcpSocket; 229 /// #[tokio::main(flavor = "current_thread")] 230 /// async fn main() -> Result<()> { 231 /// let ip6 = TcpSocket::new_v6()?; 232 /// ip6.bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)))?; 233 /// let ip4 = TcpSocket::new_v4()?; 234 /// ip4.bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)))?; 235 /// let listener = DualStackTcpListener::from_sockets((ip6, 1024), (ip4, 1024))?; 236 /// Ok(()) 237 /// } 238 /// ``` 239 #[inline] 240 pub fn from_sockets( 241 (socket_1, backlog_1): (TcpSocket, u32), 242 (socket_2, backlog_2): (TcpSocket, u32), 243 ) -> Result<Self> { 244 socket_1.local_addr().and_then(|sock| { 245 socket_2.local_addr().and_then(|sock_2| { 246 if sock.is_ipv6() { 247 if sock_2.is_ipv4() { 248 socket_1.listen(backlog_1).and_then(|ip6| { 249 socket_2.listen(backlog_2).map(|ip4| Self { 250 ip6, 251 ip4, 252 ip6_first: AtomicBool::new(true), 253 }) 254 }) 255 } else { 256 Err(Error::new( 257 ErrorKind::InvalidData, 258 "TcpSockets are the same IP version", 259 )) 260 } 261 } else if sock_2.is_ipv6() { 262 socket_1.listen(backlog_1).and_then(|ip4| { 263 socket_2.listen(backlog_2).map(|ip6| Self { 264 ip6, 265 ip4, 266 ip6_first: AtomicBool::new(true), 267 }) 268 }) 269 } else { 270 Err(Error::new( 271 ErrorKind::InvalidData, 272 "TcpSockets are the same IP version", 273 )) 274 } 275 }) 276 }) 277 } 278 /// Returns the local address of each socket that the listeners are bound to. 279 /// 280 /// This can be useful, for example, when binding to port 0 to figure out which port was actually bound. 281 /// 282 /// # Errors 283 /// 284 /// Errors iff [`TcpListener::local_addr`] does for either listener. 285 /// 286 /// # Examples 287 /// 288 /// ```no_run 289 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 290 /// # use std::io::Result; 291 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 292 /// #[tokio::main(flavor = "current_thread")] 293 /// async fn main() -> Result<()> { 294 /// let ip6 = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0); 295 /// let ip4 = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080); 296 /// assert_eq!( 297 /// DualStackTcpListener::bind([SocketAddr::V6(ip6), SocketAddr::V4(ip4)].as_slice()) 298 /// .await? 299 /// .local_addr()?, 300 /// (ip6, ip4) 301 /// ); 302 /// Ok(()) 303 /// } 304 /// ``` 305 #[expect(clippy::unreachable, reason = "we want to crash when there is a bug")] 306 #[inline] 307 pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> { 308 self.ip6.local_addr().and_then(|ip6| { 309 self.ip4.local_addr().map(|ip4| { 310 ( 311 if let SocketAddr::V6(sock6) = ip6 { 312 sock6 313 } else { 314 unreachable!("there is a bug in DualStackTcpListener::bind") 315 }, 316 if let SocketAddr::V4(sock4) = ip4 { 317 sock4 318 } else { 319 unreachable!("there is a bug in DualStackTcpListener::bind") 320 }, 321 ) 322 }) 323 }) 324 } 325 /// Sets the value for the `IP_TTL` option on both sockets. 326 /// 327 /// This value sets the time-to-live field that is used in every packet sent from each socket. 328 /// `ttl_ip6` is the `IP_TTL` value for the IPv6 socket and `ttl_ip4` is the `IP_TTL` value for the 329 /// IPv4 socket. 330 /// 331 /// # Errors 332 /// 333 /// Errors iff [`TcpListener::set_ttl`] does for either listener. 334 /// 335 /// # Examples 336 /// 337 /// ```no_run 338 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 339 /// # use std::io::Result; 340 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 341 /// #[tokio::main(flavor = "current_thread")] 342 /// async fn main() -> Result<()> { 343 /// DualStackTcpListener::bind( 344 /// [ 345 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 346 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 347 /// ] 348 /// .as_slice(), 349 /// ) 350 /// .await?.set_ttl(100, 100).expect("could not set TTL"); 351 /// Ok(()) 352 /// } 353 /// ``` 354 #[inline] 355 pub fn set_ttl(&self, ttl_ip6: u32, ttl_ip4: u32) -> Result<()> { 356 self.ip6 357 .set_ttl(ttl_ip6) 358 .and_then(|()| self.ip4.set_ttl(ttl_ip4)) 359 } 360 /// Gets the values of the `IP_TTL` option for both sockets. 361 /// 362 /// The first `u32` represents the `IP_TTL` value for the IPv6 socket and the second `u32` is the 363 /// `IP_TTL` value for the IPv4 socket. For more information about this option, see [`Self::set_ttl`]. 364 /// 365 /// # Errors 366 /// 367 /// Errors iff [`TcpListener::ttl`] does for either listener. 368 /// 369 /// # Examples 370 /// 371 /// ```no_run 372 /// # use core::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; 373 /// # use std::io::Result; 374 /// # use tokio_dual_stack::{DualStackTcpListener, Tcp as _}; 375 /// #[tokio::main(flavor = "current_thread")] 376 /// async fn main() -> Result<()> { 377 /// let listener = DualStackTcpListener::bind( 378 /// [ 379 /// SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0)), 380 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)), 381 /// ] 382 /// .as_slice(), 383 /// ) 384 /// .await?; 385 /// listener.set_ttl(100, 100).expect("could not set TTL"); 386 /// assert_eq!(listener.ttl()?, (100, 100)); 387 /// Ok(()) 388 /// } 389 /// ``` 390 #[inline] 391 pub fn ttl(&self) -> Result<(u32, u32)> { 392 self.ip6 393 .ttl() 394 .and_then(|ip6| self.ip4.ttl().map(|ip4| (ip6, ip4))) 395 } 396 } 397 pin_project! { 398 /// `Future` returned by [`DualStackTcpListener::accept]`. 399 struct AcceptFut< 400 F: Future<Output = Result<(TcpStream, SocketAddr)>>, 401 F2: Future<Output = Result<(TcpStream, SocketAddr)>>, 402 > { 403 // Accept future for one `TcpListener`. 404 #[pin] 405 fut_1: F, 406 // Accept future for the other `TcpListener`. 407 #[pin] 408 fut_2: F2, 409 } 410 } 411 impl< 412 F: Future<Output = Result<(TcpStream, SocketAddr)>>, 413 F2: Future<Output = Result<(TcpStream, SocketAddr)>>, 414 > Future for AcceptFut<F, F2> 415 { 416 type Output = Result<(TcpStream, SocketAddr)>; 417 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 418 let this = self.project(); 419 match this.fut_1.poll(cx) { 420 Poll::Ready(res) => Poll::Ready(res), 421 Poll::Pending => this.fut_2.poll(cx), 422 } 423 } 424 } 425 impl Sealed for DualStackTcpListener {} 426 impl Tcp for DualStackTcpListener { 427 #[inline] 428 async fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self> { 429 match net::lookup_host(addr).await { 430 Ok(socks) => { 431 let mut last_err = None; 432 let mut ip6_opt = None; 433 let mut ip4_opt = None; 434 for sock in socks { 435 match ip6_opt { 436 None => match ip4_opt { 437 None => { 438 let is_ip6 = sock.is_ipv6(); 439 match TcpListener::bind(sock).await { 440 Ok(ip) => { 441 if is_ip6 { 442 ip6_opt = Some(ip); 443 } else { 444 ip4_opt = Some(ip); 445 } 446 } 447 Err(err) => last_err = Some(err), 448 } 449 } 450 Some(ip4) => { 451 if sock.is_ipv6() { 452 match TcpListener::bind(sock).await { 453 Ok(ip6) => { 454 return Ok(Self { 455 ip6, 456 ip4, 457 ip6_first: AtomicBool::new(true), 458 }); 459 } 460 Err(err) => last_err = Some(err), 461 } 462 } 463 ip4_opt = Some(ip4); 464 } 465 }, 466 Some(ip6) => { 467 if sock.is_ipv4() { 468 match TcpListener::bind(sock).await { 469 Ok(ip4) => { 470 return Ok(Self { 471 ip6, 472 ip4, 473 ip6_first: AtomicBool::new(true), 474 }); 475 } 476 Err(err) => last_err = Some(err), 477 } 478 } 479 ip6_opt = Some(ip6); 480 } 481 } 482 } 483 Err(last_err.unwrap_or_else(|| { 484 Error::new( 485 ErrorKind::InvalidInput, 486 "could not resolve to an IPv6 and IPv4 address", 487 ) 488 })) 489 } 490 Err(err) => Err(err), 491 } 492 } 493 #[inline] 494 fn accept(&self) -> impl Future<Output = Result<(TcpStream, SocketAddr)>> + Send + Sync { 495 // The correctness of code does not depend on `self.ip6_first`; therefore 496 // we elect for the most performant `Ordering`. 497 if self.ip6_first.swap(false, Ordering::Relaxed) { 498 AcceptFut { 499 fut_1: self.ip6.accept(), 500 fut_2: self.ip4.accept(), 501 } 502 } else { 503 // The correctness of code does not depend on `self.ip6_first`; therefore 504 // we elect for the most performant `Ordering`. 505 self.ip6_first.store(true, Ordering::Relaxed); 506 AcceptFut { 507 fut_1: self.ip4.accept(), 508 fut_2: self.ip6.accept(), 509 } 510 } 511 } 512 #[inline] 513 fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(TcpStream, SocketAddr)>> { 514 // The correctness of code does not depend on `self.ip6_first`; therefore 515 // we elect for the most performant `Ordering`. 516 if self.ip6_first.swap(false, Ordering::Relaxed) { 517 self.ip6.poll_accept(cx) 518 } else { 519 // The correctness of code does not depend on `self.ip6_first`; therefore 520 // we elect for the most performant `Ordering`. 521 self.ip6_first.store(true, Ordering::Relaxed); 522 self.ip4.poll_accept(cx) 523 } 524 } 525 }