udp.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. use crate::ResultType;
  2. use anyhow::{anyhow, Context};
  3. use bytes::{Bytes, BytesMut};
  4. use futures::{SinkExt, StreamExt};
  5. use protobuf::Message;
  6. use socket2::{Domain, Socket, Type};
  7. use std::net::SocketAddr;
  8. use tokio::net::{lookup_host, ToSocketAddrs, UdpSocket};
  9. use tokio_socks::{udp::Socks5UdpFramed, IntoTargetAddr, TargetAddr, ToProxyAddrs};
  10. use tokio_util::{codec::BytesCodec, udp::UdpFramed};
  11. pub enum FramedSocket {
  12. Direct(UdpFramed<BytesCodec>),
  13. ProxySocks(Socks5UdpFramed),
  14. }
  15. fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result<Socket, std::io::Error> {
  16. let socket = match addr {
  17. SocketAddr::V4(..) => Socket::new(Domain::ipv4(), Type::dgram(), None),
  18. SocketAddr::V6(..) => Socket::new(Domain::ipv6(), Type::dgram(), None),
  19. }?;
  20. if reuse {
  21. // windows has no reuse_port, but its reuse_address
  22. // almost equals to unix's reuse_port + reuse_address,
  23. // though may introduce nondeterministic behavior.
  24. // illumos has no support for SO_REUSEPORT
  25. #[cfg(all(unix, not(target_os = "illumos")))]
  26. socket.set_reuse_port(true)?;
  27. socket.set_reuse_address(true)?;
  28. }
  29. // only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full
  30. socket.set_nonblocking(true)?;
  31. if buf_size > 0 {
  32. socket.set_recv_buffer_size(buf_size).ok();
  33. }
  34. log::info!(
  35. "Receive buf size of udp {}: {:?}",
  36. addr,
  37. socket.recv_buffer_size()
  38. );
  39. if addr.is_ipv6() && addr.ip().is_unspecified() && addr.port() > 0 {
  40. socket.set_only_v6(false).ok();
  41. }
  42. socket.bind(&addr.into())?;
  43. Ok(socket)
  44. }
  45. impl FramedSocket {
  46. pub async fn new<T: ToSocketAddrs>(addr: T) -> ResultType<Self> {
  47. Self::new_reuse(addr, false, 0).await
  48. }
  49. pub async fn new_reuse<T: ToSocketAddrs>(
  50. addr: T,
  51. reuse: bool,
  52. buf_size: usize,
  53. ) -> ResultType<Self> {
  54. let addr = lookup_host(&addr)
  55. .await?
  56. .next()
  57. .context("could not resolve to any address")?;
  58. Ok(Self::Direct(UdpFramed::new(
  59. UdpSocket::from_std(new_socket(addr, reuse, buf_size)?.into_udp_socket())?,
  60. BytesCodec::new(),
  61. )))
  62. }
  63. pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T: ToSocketAddrs>(
  64. proxy: P,
  65. local: T,
  66. username: &'a str,
  67. password: &'a str,
  68. ms_timeout: u64,
  69. ) -> ResultType<Self> {
  70. let framed = if username.trim().is_empty() {
  71. super::timeout(ms_timeout, Socks5UdpFramed::connect(proxy, Some(local))).await??
  72. } else {
  73. super::timeout(
  74. ms_timeout,
  75. Socks5UdpFramed::connect_with_password(proxy, Some(local), username, password),
  76. )
  77. .await??
  78. };
  79. log::trace!(
  80. "Socks5 udp connected, local addr: {:?}, target addr: {}",
  81. framed.local_addr(),
  82. framed.socks_addr()
  83. );
  84. Ok(Self::ProxySocks(framed))
  85. }
  86. #[inline]
  87. pub async fn send(
  88. &mut self,
  89. msg: &impl Message,
  90. addr: impl IntoTargetAddr<'_>,
  91. ) -> ResultType<()> {
  92. let addr = addr.into_target_addr()?.to_owned();
  93. let send_data = Bytes::from(msg.write_to_bytes()?);
  94. match self {
  95. Self::Direct(f) => {
  96. if let TargetAddr::Ip(addr) = addr {
  97. f.send((send_data, addr)).await?
  98. }
  99. }
  100. Self::ProxySocks(f) => f.send((send_data, addr)).await?,
  101. };
  102. Ok(())
  103. }
  104. // https://stackoverflow.com/a/68733302/1926020
  105. #[inline]
  106. pub async fn send_raw(
  107. &mut self,
  108. msg: &'static [u8],
  109. addr: impl IntoTargetAddr<'static>,
  110. ) -> ResultType<()> {
  111. let addr = addr.into_target_addr()?.to_owned();
  112. match self {
  113. Self::Direct(f) => {
  114. if let TargetAddr::Ip(addr) = addr {
  115. f.send((Bytes::from(msg), addr)).await?
  116. }
  117. }
  118. Self::ProxySocks(f) => f.send((Bytes::from(msg), addr)).await?,
  119. };
  120. Ok(())
  121. }
  122. #[inline]
  123. pub async fn next(&mut self) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
  124. match self {
  125. Self::Direct(f) => match f.next().await {
  126. Some(Ok((data, addr))) => {
  127. Some(Ok((data, addr.into_target_addr().ok()?.to_owned())))
  128. }
  129. Some(Err(e)) => Some(Err(anyhow!(e))),
  130. None => None,
  131. },
  132. Self::ProxySocks(f) => match f.next().await {
  133. Some(Ok((data, _))) => Some(Ok((data.data, data.dst_addr))),
  134. Some(Err(e)) => Some(Err(anyhow!(e))),
  135. None => None,
  136. },
  137. }
  138. }
  139. #[inline]
  140. pub async fn next_timeout(
  141. &mut self,
  142. ms: u64,
  143. ) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
  144. if let Ok(res) =
  145. tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await
  146. {
  147. res
  148. } else {
  149. None
  150. }
  151. }
  152. pub fn local_addr(&self) -> Option<SocketAddr> {
  153. if let FramedSocket::Direct(x) = self {
  154. if let Ok(v) = x.get_ref().local_addr() {
  155. return Some(v);
  156. }
  157. }
  158. None
  159. }
  160. }