udp.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. use crate::ResultType;
  2. use anyhow::{anyhow, Context};
  3. use bytes::{Bytes, BytesMut};
  4. use futures::{SinkExt, StreamExt};
  5. use protobuf::Message;
  6. use socket2::{Domain, Socket, Type};
  7. use std::net::SocketAddr;
  8. use tokio::net::{lookup_host, ToSocketAddrs, UdpSocket};
  9. use tokio_socks::{udp::Socks5UdpFramed, IntoTargetAddr, TargetAddr, ToProxyAddrs};
  10. use tokio_util::{codec::BytesCodec, udp::UdpFramed};
  11. pub enum FramedSocket {
  12. Direct(UdpFramed<BytesCodec>),
  13. ProxySocks(Socks5UdpFramed),
  14. }
  15. fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result<Socket, std::io::Error> {
  16. let socket = match addr {
  17. SocketAddr::V4(..) => Socket::new(Domain::ipv4(), Type::dgram(), None),
  18. SocketAddr::V6(..) => Socket::new(Domain::ipv6(), Type::dgram(), None),
  19. }?;
  20. if reuse {
  21. // windows has no reuse_port, but it's reuse_address
  22. // almost equals to unix's reuse_port + reuse_address,
  23. // though may introduce nondeterministic behavior
  24. #[cfg(unix)]
  25. socket.set_reuse_port(true)?;
  26. socket.set_reuse_address(true)?;
  27. }
  28. // only nonblocking work with tokio, https://stackoverflow.com/questions/64649405/receiver-on-tokiompscchannel-only-receives-messages-when-buffer-is-full
  29. socket.set_nonblocking(true)?;
  30. if buf_size > 0 {
  31. socket.set_recv_buffer_size(buf_size).ok();
  32. }
  33. log::info!(
  34. "Receive buf size of udp {}: {:?}",
  35. addr,
  36. socket.recv_buffer_size()
  37. );
  38. if addr.is_ipv6() && addr.ip().is_unspecified() && addr.port() > 0 {
  39. socket.set_only_v6(false).ok();
  40. }
  41. socket.bind(&addr.into())?;
  42. Ok(socket)
  43. }
  44. impl FramedSocket {
  45. pub async fn new<T: ToSocketAddrs>(addr: T) -> ResultType<Self> {
  46. Self::new_reuse(addr, false, 0).await
  47. }
  48. pub async fn new_reuse<T: ToSocketAddrs>(
  49. addr: T,
  50. reuse: bool,
  51. buf_size: usize,
  52. ) -> ResultType<Self> {
  53. let addr = lookup_host(&addr)
  54. .await?
  55. .next()
  56. .context("could not resolve to any address")?;
  57. Ok(Self::Direct(UdpFramed::new(
  58. UdpSocket::from_std(new_socket(addr, reuse, buf_size)?.into_udp_socket())?,
  59. BytesCodec::new(),
  60. )))
  61. }
  62. pub async fn new_proxy<'a, 't, P: ToProxyAddrs, T: ToSocketAddrs>(
  63. proxy: P,
  64. local: T,
  65. username: &'a str,
  66. password: &'a str,
  67. ms_timeout: u64,
  68. ) -> ResultType<Self> {
  69. let framed = if username.trim().is_empty() {
  70. super::timeout(ms_timeout, Socks5UdpFramed::connect(proxy, Some(local))).await??
  71. } else {
  72. super::timeout(
  73. ms_timeout,
  74. Socks5UdpFramed::connect_with_password(proxy, Some(local), username, password),
  75. )
  76. .await??
  77. };
  78. log::trace!(
  79. "Socks5 udp connected, local addr: {:?}, target addr: {}",
  80. framed.local_addr(),
  81. framed.socks_addr()
  82. );
  83. Ok(Self::ProxySocks(framed))
  84. }
  85. #[inline]
  86. pub async fn send(
  87. &mut self,
  88. msg: &impl Message,
  89. addr: impl IntoTargetAddr<'_>,
  90. ) -> ResultType<()> {
  91. let addr = addr.into_target_addr()?.to_owned();
  92. let send_data = Bytes::from(msg.write_to_bytes()?);
  93. match self {
  94. Self::Direct(f) => {
  95. if let TargetAddr::Ip(addr) = addr {
  96. f.send((send_data, addr)).await?
  97. }
  98. }
  99. Self::ProxySocks(f) => f.send((send_data, addr)).await?,
  100. };
  101. Ok(())
  102. }
  103. // https://stackoverflow.com/a/68733302/1926020
  104. #[inline]
  105. pub async fn send_raw(
  106. &mut self,
  107. msg: &'static [u8],
  108. addr: impl IntoTargetAddr<'static>,
  109. ) -> ResultType<()> {
  110. let addr = addr.into_target_addr()?.to_owned();
  111. match self {
  112. Self::Direct(f) => {
  113. if let TargetAddr::Ip(addr) = addr {
  114. f.send((Bytes::from(msg), addr)).await?
  115. }
  116. }
  117. Self::ProxySocks(f) => f.send((Bytes::from(msg), addr)).await?,
  118. };
  119. Ok(())
  120. }
  121. #[inline]
  122. pub async fn next(&mut self) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
  123. match self {
  124. Self::Direct(f) => match f.next().await {
  125. Some(Ok((data, addr))) => {
  126. Some(Ok((data, addr.into_target_addr().ok()?.to_owned())))
  127. }
  128. Some(Err(e)) => Some(Err(anyhow!(e))),
  129. None => None,
  130. },
  131. Self::ProxySocks(f) => match f.next().await {
  132. Some(Ok((data, _))) => Some(Ok((data.data, data.dst_addr))),
  133. Some(Err(e)) => Some(Err(anyhow!(e))),
  134. None => None,
  135. },
  136. }
  137. }
  138. #[inline]
  139. pub async fn next_timeout(
  140. &mut self,
  141. ms: u64,
  142. ) -> Option<ResultType<(BytesMut, TargetAddr<'static>)>> {
  143. if let Ok(res) =
  144. tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await
  145. {
  146. res
  147. } else {
  148. None
  149. }
  150. }
  151. pub fn local_addr(&self) -> Option<SocketAddr> {
  152. if let FramedSocket::Direct(x) = self {
  153. if let Ok(v) = x.get_ref().local_addr() {
  154. return Some(v);
  155. }
  156. }
  157. None
  158. }
  159. }