| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- use crate::{bail, bytes_codec::BytesCodec, ResultType};
- use anyhow::Context as AnyhowCtx;
- use bytes::{BufMut, Bytes, BytesMut};
- use futures::{SinkExt, StreamExt};
- use protobuf::Message;
- use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
- use std::{
- io::{self, Error, ErrorKind},
- net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
- ops::{Deref, DerefMut},
- pin::Pin,
- task::{Context, Poll},
- };
- use tokio::{
- io::{AsyncRead, AsyncWrite, ReadBuf},
- net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs},
- };
- use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs};
- use tokio_util::codec::Framed;
/// Marker trait bundling `AsyncRead + AsyncWrite + Unpin` under a single name,
/// so that such streams can be stored behind one trait object
/// (`dyn TcpStreamTrait`).
pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {}
/// Type-erased stream: a boxed `TcpStreamTrait` object that is also
/// `Send + Sync`.
pub struct DynTcpStream(Box<dyn TcpStreamTrait + Send + Sync>);
/// A `Framed<DynTcpStream, BytesCodec>` together with per-connection state.
pub struct FramedStream(
    // Framed transport; also the `Deref`/`DerefMut` target of this type.
    Framed<DynTcpStream, BytesCodec>,
    // Address reported by `local_addr()`.
    SocketAddr,
    // Optional encryption state: the secretbox key, a counter incremented by
    // `send_raw`, and a counter incremented by `next`. The counters are turned
    // into nonces by `get_nonce`.
    Option<(Key, u64, u64)>,
    // Send timeout in milliseconds used by `send_bytes`; 0 means no timeout.
    u64,
);
- impl Deref for FramedStream {
- type Target = Framed<DynTcpStream, BytesCodec>;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
- }
- impl DerefMut for FramedStream {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
- }
- impl Deref for DynTcpStream {
- type Target = Box<dyn TcpStreamTrait + Send + Sync>;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
- }
- impl DerefMut for DynTcpStream {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
- }
- fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std::io::Error> {
- let socket = match addr {
- std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?,
- std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?,
- };
- if reuse {
- // windows has no reuse_port, but it's reuse_address
- // almost equals to unix's reuse_port + reuse_address,
- // though may introduce nondeterministic behavior
- #[cfg(unix)]
- socket.set_reuseport(true)?;
- socket.set_reuseaddr(true)?;
- }
- socket.bind(addr)?;
- Ok(socket)
- }
- impl FramedStream {
- pub async fn new<T: ToSocketAddrs + std::fmt::Display>(
- remote_addr: T,
- local_addr: Option<SocketAddr>,
- ms_timeout: u64,
- ) -> ResultType<Self> {
- for remote_addr in lookup_host(&remote_addr).await? {
- let local = if let Some(addr) = local_addr {
- addr
- } else {
- crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4())
- };
- if let Ok(socket) = new_socket(local, true) {
- if let Ok(Ok(stream)) =
- super::timeout(ms_timeout, socket.connect(remote_addr)).await
- {
- stream.set_nodelay(true).ok();
- let addr = stream.local_addr()?;
- return Ok(Self(
- Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
- addr,
- None,
- 0,
- ));
- }
- }
- }
- bail!(format!("Failed to connect to {}", remote_addr));
- }
- pub async fn connect<'a, 't, P, T>(
- proxy: P,
- target: T,
- local_addr: Option<SocketAddr>,
- username: &'a str,
- password: &'a str,
- ms_timeout: u64,
- ) -> ResultType<Self>
- where
- P: ToProxyAddrs,
- T: IntoTargetAddr<'t>,
- {
- if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await {
- let local = if let Some(addr) = local_addr {
- addr
- } else {
- crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
- };
- let stream =
- super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??;
- stream.set_nodelay(true).ok();
- let stream = if username.trim().is_empty() {
- super::timeout(
- ms_timeout,
- Socks5Stream::connect_with_socket(stream, target),
- )
- .await??
- } else {
- super::timeout(
- ms_timeout,
- Socks5Stream::connect_with_password_and_socket(
- stream, target, username, password,
- ),
- )
- .await??
- };
- let addr = stream.local_addr()?;
- return Ok(Self(
- Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
- addr,
- None,
- 0,
- ));
- }
- bail!("could not resolve to any address");
- }
- pub fn local_addr(&self) -> SocketAddr {
- self.1
- }
- pub fn set_send_timeout(&mut self, ms: u64) {
- self.3 = ms;
- }
- pub fn from(stream: impl TcpStreamTrait + Send + Sync + 'static, addr: SocketAddr) -> Self {
- Self(
- Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
- addr,
- None,
- 0,
- )
- }
- pub fn set_raw(&mut self) {
- self.0.codec_mut().set_raw();
- self.2 = None;
- }
- pub fn is_secured(&self) -> bool {
- self.2.is_some()
- }
- #[inline]
- pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
- self.send_raw(msg.write_to_bytes()?).await
- }
- #[inline]
- pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
- let mut msg = msg;
- if let Some(key) = self.2.as_mut() {
- key.1 += 1;
- let nonce = Self::get_nonce(key.1);
- msg = secretbox::seal(&msg, &nonce, &key.0);
- }
- self.send_bytes(bytes::Bytes::from(msg)).await?;
- Ok(())
- }
- #[inline]
- pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
- if self.3 > 0 {
- super::timeout(self.3, self.0.send(bytes)).await??;
- } else {
- self.0.send(bytes).await?;
- }
- Ok(())
- }
- #[inline]
- pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
- let mut res = self.0.next().await;
- if let Some(key) = self.2.as_mut() {
- if let Some(Ok(bytes)) = res.as_mut() {
- key.2 += 1;
- let nonce = Self::get_nonce(key.2);
- match secretbox::open(bytes, &nonce, &key.0) {
- Ok(res) => {
- bytes.clear();
- bytes.put_slice(&res);
- }
- Err(()) => {
- return Some(Err(Error::new(ErrorKind::Other, "decryption error")));
- }
- }
- }
- }
- res
- }
- #[inline]
- pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
- if let Ok(res) = super::timeout(ms, self.next()).await {
- res
- } else {
- None
- }
- }
- pub fn set_key(&mut self, key: Key) {
- self.2 = Some((key, 0, 0));
- }
- fn get_nonce(seqnum: u64) -> Nonce {
- let mut nonce = Nonce([0u8; secretbox::NONCEBYTES]);
- nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_le_bytes());
- nonce
- }
- }
/// Backlog value passed to `TcpSocket::listen` by the listener helpers in this
/// file.
const DEFAULT_BACKLOG: u32 = 128;
- pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType<TcpListener> {
- if !reuse {
- Ok(TcpListener::bind(addr).await?)
- } else {
- let addr = lookup_host(&addr)
- .await?
- .next()
- .context("could not resolve to any address")?;
- new_socket(addr, true)?
- .listen(DEFAULT_BACKLOG)
- .map_err(anyhow::Error::msg)
- }
- }
- pub async fn listen_any(port: u16) -> ResultType<TcpListener> {
- if let Ok(mut socket) = TcpSocket::new_v6() {
- #[cfg(unix)]
- {
- use std::os::unix::io::{FromRawFd, IntoRawFd};
- let raw_fd = socket.into_raw_fd();
- let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) };
- sock2.set_only_v6(false).ok();
- socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) };
- }
- #[cfg(windows)]
- {
- use std::os::windows::prelude::{FromRawSocket, IntoRawSocket};
- let raw_socket = socket.into_raw_socket();
- let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) };
- sock2.set_only_v6(false).ok();
- socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) };
- }
- if socket
- .bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port))
- .is_ok()
- {
- if let Ok(l) = socket.listen(DEFAULT_BACKLOG) {
- return Ok(l);
- }
- }
- }
- let s = TcpSocket::new_v4()?;
- s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?;
- Ok(s.listen(DEFAULT_BACKLOG)?)
- }
// Explicit `Unpin`: the `AsyncRead`/`AsyncWrite` impls for this type reach the
// inner box through `Pin<&mut Self>`, which requires `Self: Unpin`.
impl Unpin for DynTcpStream {}
- impl AsyncRead for DynTcpStream {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf)
- }
- }
- impl AsyncWrite for DynTcpStream {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
- }
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- AsyncWrite::poll_flush(Pin::new(&mut self.0), cx)
- }
- fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx)
- }
- }
// Blanket impl: every `AsyncRead + AsyncWrite + Unpin` type is a
// `TcpStreamTrait`.
impl<R: AsyncRead + AsyncWrite + Unpin> TcpStreamTrait for R {}
|