tcp.rs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. use crate::{bail, bytes_codec::BytesCodec, ResultType};
  2. use anyhow::Context as AnyhowCtx;
  3. use bytes::{BufMut, Bytes, BytesMut};
  4. use futures::{SinkExt, StreamExt};
  5. use protobuf::Message;
  6. use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
  7. use std::{
  8. io::{self, Error, ErrorKind},
  9. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  10. ops::{Deref, DerefMut},
  11. pin::Pin,
  12. task::{Context, Poll},
  13. };
  14. use tokio::{
  15. io::{AsyncRead, AsyncWrite, ReadBuf},
  16. net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs},
  17. };
  18. use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs};
  19. use tokio_util::codec::Framed;
  20. pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
  21. pub struct DynTcpStream(Box<dyn TcpStreamTrait + Send + Sync>);
  22. pub struct FramedStream(
  23. Framed<DynTcpStream, BytesCodec>,
  24. SocketAddr,
  25. Option<(Key, u64, u64)>,
  26. u64,
  27. );
  28. impl Deref for FramedStream {
  29. type Target = Framed<DynTcpStream, BytesCodec>;
  30. fn deref(&self) -> &Self::Target {
  31. &self.0
  32. }
  33. }
  34. impl DerefMut for FramedStream {
  35. fn deref_mut(&mut self) -> &mut Self::Target {
  36. &mut self.0
  37. }
  38. }
  39. impl Deref for DynTcpStream {
  40. type Target = Box<dyn TcpStreamTrait + Send + Sync>;
  41. fn deref(&self) -> &Self::Target {
  42. &self.0
  43. }
  44. }
  45. impl DerefMut for DynTcpStream {
  46. fn deref_mut(&mut self) -> &mut Self::Target {
  47. &mut self.0
  48. }
  49. }
  50. fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
  51. let socket = match addr {
  52. std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?,
  53. std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?,
  54. };
  55. if reuse {
  56. // windows has no reuse_port, but it's reuse_address
  57. // almost equals to unix's reuse_port + reuse_address,
  58. // though may introduce nondeterministic behavior
  59. #[cfg(unix)]
  60. socket.set_reuseport(true)?;
  61. socket.set_reuseaddr(true)?;
  62. }
  63. socket.bind(addr)?;
  64. Ok(socket)
  65. }
  66. impl FramedStream {
  67. pub async fn new<T: ToSocketAddrs + std::fmt::Display>(
  68. remote_addr: T,
  69. local_addr: Option<SocketAddr>,
  70. ms_timeout: u64,
  71. ) -> ResultType<Self> {
  72. for remote_addr in lookup_host(&remote_addr).await? {
  73. let local = if let Some(addr) = local_addr {
  74. addr
  75. } else {
  76. crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4())
  77. };
  78. if let Ok(socket) = new_socket(local, true) {
  79. if let Ok(Ok(stream)) =
  80. super::timeout(ms_timeout, socket.connect(remote_addr)).await
  81. {
  82. stream.set_nodelay(true).ok();
  83. let addr = stream.local_addr()?;
  84. return Ok(Self(
  85. Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
  86. addr,
  87. None,
  88. 0,
  89. ));
  90. }
  91. }
  92. }
  93. bail!(format!("Failed to connect to {}", remote_addr));
  94. }
  95. pub async fn connect<'a, 't, P, T>(
  96. proxy: P,
  97. target: T,
  98. local_addr: Option<SocketAddr>,
  99. username: &'a str,
  100. password: &'a str,
  101. ms_timeout: u64,
  102. ) -> ResultType<Self>
  103. where
  104. P: ToProxyAddrs,
  105. T: IntoTargetAddr<'t>,
  106. {
  107. if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await {
  108. let local = if let Some(addr) = local_addr {
  109. addr
  110. } else {
  111. crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
  112. };
  113. let stream =
  114. super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??;
  115. stream.set_nodelay(true).ok();
  116. let stream = if username.trim().is_empty() {
  117. super::timeout(
  118. ms_timeout,
  119. Socks5Stream::connect_with_socket(stream, target),
  120. )
  121. .await??
  122. } else {
  123. super::timeout(
  124. ms_timeout,
  125. Socks5Stream::connect_with_password_and_socket(
  126. stream, target, username, password,
  127. ),
  128. )
  129. .await??
  130. };
  131. let addr = stream.local_addr()?;
  132. return Ok(Self(
  133. Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
  134. addr,
  135. None,
  136. 0,
  137. ));
  138. }
  139. bail!("could not resolve to any address");
  140. }
  141. pub fn local_addr(&self) -> SocketAddr {
  142. self.1
  143. }
  144. pub fn set_send_timeout(&mut self, ms: u64) {
  145. self.3 = ms;
  146. }
  147. pub fn from(stream: impl TcpStreamTrait + Send + Sync + 'static, addr: SocketAddr) -> Self {
  148. Self(
  149. Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
  150. addr,
  151. None,
  152. 0,
  153. )
  154. }
  155. pub fn set_raw(&mut self) {
  156. self.0.codec_mut().set_raw();
  157. self.2 = None;
  158. }
  159. pub fn is_secured(&self) -> bool {
  160. self.2.is_some()
  161. }
  162. #[inline]
  163. pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
  164. self.send_raw(msg.write_to_bytes()?).await
  165. }
  166. #[inline]
  167. pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
  168. let mut msg = msg;
  169. if let Some(key) = self.2.as_mut() {
  170. key.1 += 1;
  171. let nonce = Self::get_nonce(key.1);
  172. msg = secretbox::seal(&msg, &nonce, &key.0);
  173. }
  174. self.send_bytes(bytes::Bytes::from(msg)).await?;
  175. Ok(())
  176. }
  177. #[inline]
  178. pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
  179. if self.3 > 0 {
  180. super::timeout(self.3, self.0.send(bytes)).await??;
  181. } else {
  182. self.0.send(bytes).await?;
  183. }
  184. Ok(())
  185. }
  186. #[inline]
  187. pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
  188. let mut res = self.0.next().await;
  189. if let Some(key) = self.2.as_mut() {
  190. if let Some(Ok(bytes)) = res.as_mut() {
  191. key.2 += 1;
  192. let nonce = Self::get_nonce(key.2);
  193. match secretbox::open(bytes, &nonce, &key.0) {
  194. Ok(res) => {
  195. bytes.clear();
  196. bytes.put_slice(&res);
  197. }
  198. Err(()) => {
  199. return Some(Err(Error::new(ErrorKind::Other, "decryption error")));
  200. }
  201. }
  202. }
  203. }
  204. res
  205. }
  206. #[inline]
  207. pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
  208. if let Ok(res) = super::timeout(ms, self.next()).await {
  209. res
  210. } else {
  211. None
  212. }
  213. }
  214. pub fn set_key(&mut self, key: Key) {
  215. self.2 = Some((key, 0, 0));
  216. }
  217. fn get_nonce(seqnum: u64) -> Nonce {
  218. let mut nonce = Nonce([0u8; secretbox::NONCEBYTES]);
  219. nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_le_bytes());
  220. nonce
  221. }
  222. }
  223. const DEFAULT_BACKLOG: u32 = 128;
  224. pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType<TcpListener> {
  225. if !reuse {
  226. Ok(TcpListener::bind(addr).await?)
  227. } else {
  228. let addr = lookup_host(&addr)
  229. .await?
  230. .next()
  231. .context("could not resolve to any address")?;
  232. new_socket(addr, true)?
  233. .listen(DEFAULT_BACKLOG)
  234. .map_err(anyhow::Error::msg)
  235. }
  236. }
  237. pub async fn listen_any(port: u16) -> ResultType<TcpListener> {
  238. if let Ok(mut socket) = TcpSocket::new_v6() {
  239. #[cfg(unix)]
  240. {
  241. use std::os::unix::io::{FromRawFd, IntoRawFd};
  242. let raw_fd = socket.into_raw_fd();
  243. let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) };
  244. sock2.set_only_v6(false).ok();
  245. socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) };
  246. }
  247. #[cfg(windows)]
  248. {
  249. use std::os::windows::prelude::{FromRawSocket, IntoRawSocket};
  250. let raw_socket = socket.into_raw_socket();
  251. let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) };
  252. sock2.set_only_v6(false).ok();
  253. socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) };
  254. }
  255. if socket
  256. .bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port))
  257. .is_ok()
  258. {
  259. if let Ok(l) = socket.listen(DEFAULT_BACKLOG) {
  260. return Ok(l);
  261. }
  262. }
  263. }
  264. let s = TcpSocket::new_v4()?;
  265. s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?;
  266. Ok(s.listen(DEFAULT_BACKLOG)?)
  267. }
  268. impl Unpin for DynTcpStream {}
  269. impl AsyncRead for DynTcpStream {
  270. fn poll_read(
  271. mut self: Pin<&mut Self>,
  272. cx: &mut Context<'_>,
  273. buf: &mut ReadBuf<'_>,
  274. ) -> Poll<io::Result<()>> {
  275. AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf)
  276. }
  277. }
  278. impl AsyncWrite for DynTcpStream {
  279. fn poll_write(
  280. mut self: Pin<&mut Self>,
  281. cx: &mut Context<'_>,
  282. buf: &[u8],
  283. ) -> Poll<io::Result<usize>> {
  284. AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
  285. }
  286. fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
  287. AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
  288. }
  289. fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
  290. AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
  291. }
  292. }
  293. impl<R: AsyncRead + AsyncWrite + Unpin> TcpStreamTrait for R {}