| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622 |
- use crate::common::*;
- use crate::peer::*;
- use hbb_common::bytes::BufMut;
- use hbb_common::{
- allow_err, bail,
- bytes::{Bytes, BytesMut},
- bytes_codec::BytesCodec,
- config,
- futures::future::join_all,
- futures_util::{
- sink::SinkExt,
- stream::{SplitSink, StreamExt},
- },
- log,
- protobuf::{Message as _, MessageField},
- rendezvous_proto::{
- register_pk_response::Result::{INVALID_ID_FORMAT, TOO_FREQUENT, UUID_MISMATCH},
- *,
- },
- sodiumoxide::crypto::{
- box_, box_::PublicKey, box_::SecretKey, secretbox, secretbox::Key, secretbox::Nonce, sign,
- },
- sodiumoxide::hex,
- tcp,
- tcp::Encrypt,
- tcp::{listen_any, FramedStream},
- timeout,
- tokio::{
- self,
- io::{AsyncReadExt, AsyncWriteExt},
- net::{TcpListener, TcpStream},
- sync::{mpsc, Mutex},
- time::{interval, Duration},
- },
- tokio_util::codec::Framed,
- try_into_v4,
- udp::FramedSocket,
- AddrMangle, ResultType,
- };
- use ipnetwork::Ipv4Network;
- use crate::jwt;
- use std::io::Error;
- use std::{
- collections::HashMap,
- net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
- sync::atomic::{AtomicBool, AtomicUsize, Ordering},
- sync::Arc,
- time::Instant,
- };
- #[derive(Clone, Debug)]
- enum Data {
- Msg(Box<RendezvousMessage>, SocketAddr),
- RelayServers0(String),
- RelayServers(RelayServers),
- }
- const REG_TIMEOUT: i32 = 30_000;
- type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
- type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
- struct SafeWsSink {
- sink: WsSink,
- encrypt: Option<Encrypt>,
- }
- struct SafeTcpStreamSink {
- sink: TcpStreamSink,
- encrypt: Option<Encrypt>,
- }
- enum Sink {
- // TcpStream(TcpStreamSink),
- // Ws(WsSink),
- Wss(SafeWsSink),
- Tss(SafeTcpStreamSink),
- }
- impl Sink {
- async fn send(&mut self, msg: &RendezvousMessage) {
- if let Ok(mut bytes) = msg.write_to_bytes() {
- match self {
- // Sink::TcpStream(mut s) => allow_err!(s.send(Bytes::from(bytes)).await),
- // Sink::Ws(mut s) => allow_err!(s.send(tungstenite::Message::Binary(bytes)).await),
- Sink::Wss(s) => {
- if let Some(key) = s.encrypt.as_mut() {
- bytes = key.enc(&bytes);
- }
- allow_err!(s.sink.send(tungstenite::Message::Binary(bytes)).await)
- }
- Sink::Tss(s) => {
- if let Some(key) = s.encrypt.as_mut() {
- bytes = key.enc(&bytes);
- }
- allow_err!(s.sink.send(Bytes::from(bytes)).await)
- }
- }
- }
- }
- }
- type Sender = mpsc::UnboundedSender<Data>;
- type Receiver = mpsc::UnboundedReceiver<Data>;
- static ROTATION_RELAY_SERVER: AtomicUsize = AtomicUsize::new(0);
- type RelayServers = Vec<String>;
- const CHECK_RELAY_TIMEOUT: u64 = 3_000;
- static ALWAYS_USE_RELAY: AtomicBool = AtomicBool::new(false);
- static MUST_LOGIN: AtomicBool = AtomicBool::new(false);
- #[derive(Clone)]
- struct Inner {
- serial: i32,
- version: String,
- software_url: String,
- mask: Option<Ipv4Network>,
- local_ip: String,
- sk: Option<sign::SecretKey>,
- secure_tcp_pk_b: PublicKey,
- secure_tcp_sk_b: SecretKey,
- }
- #[derive(Clone)]
- pub struct RendezvousServer {
- tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
- pm: PeerMap,
- tx: Sender,
- relay_servers: Arc<RelayServers>,
- relay_servers0: Arc<RelayServers>,
- rendezvous_servers: Arc<Vec<String>>,
- inner: Arc<Inner>,
- ws_map: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
- }
/// Identifies which I/O resource failed so the caller can re-create it.
enum LoopFailure {
    /// The UDP socket on the main port.
    UdpSocket,
    /// The WebSocket TCP listener.
    Listener3,
    /// The NAT-test TCP listener.
    Listener2,
    /// The TCP listener on the main port.
    Listener,
}
- impl RendezvousServer {
- #[tokio::main(flavor = "multi_thread")]
- pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
- let (key, sk) = Self::get_server_sk(key);
- let nat_port = port - 1;
- let ws_port = port + 2;
- let pm = PeerMap::new().await?;
- log::info!("serial={}", serial);
- let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
- log::info!("Listening on tcp/udp :{}", port);
- log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
- log::info!("Listening on websocket :{}", ws_port);
- let mut socket = create_udp_listener(port, rmem).await?;
- let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
- let software_url = get_arg("software-url");
- let version = hbb_common::get_version_from_url(&software_url);
- if !version.is_empty() {
- log::info!("software_url: {}, version: {}", software_url, version);
- }
- let mask = get_arg("mask").parse().ok();
- let local_ip = if mask.is_none() {
- "".to_owned()
- } else {
- get_arg_or(
- "local-ip",
- local_ip_address::local_ip()
- .map(|x| x.to_string())
- .unwrap_or_default(),
- )
- };
- // For privacy use per connection key pair
- let (secure_tcp_pk_b, secure_tcp_sk_b) = box_::gen_keypair();
- let mut rs = Self {
- tcp_punch: Arc::new(Mutex::new(HashMap::new())),
- pm,
- tx: tx.clone(),
- relay_servers: Default::default(),
- relay_servers0: Default::default(),
- rendezvous_servers: Arc::new(rendezvous_servers),
- inner: Arc::new(Inner {
- serial,
- version,
- software_url,
- sk,
- mask,
- local_ip,
- secure_tcp_pk_b,
- secure_tcp_sk_b,
- }),
- ws_map: Arc::new(Mutex::new(HashMap::new())),
- };
- log::info!("mask: {:?}", rs.inner.mask);
- log::info!("local-ip: {:?}", rs.inner.local_ip);
- std::env::set_var("PORT_FOR_API", port.to_string());
- rs.parse_relay_servers(&get_arg("relay-servers"));
- let mut listener = create_tcp_listener(port).await?;
- let mut listener2 = create_tcp_listener(nat_port).await?;
- let mut listener3 = create_tcp_listener(ws_port).await?;
- let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
- if std::env::var("ALWAYS_USE_RELAY")
- .unwrap_or_default()
- .to_uppercase()
- == "Y"
- {
- ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
- }
- log::info!(
- "ALWAYS_USE_RELAY={}",
- if ALWAYS_USE_RELAY.load(Ordering::SeqCst) {
- "Y"
- } else {
- "N"
- }
- );
- let must_login = get_arg("must-login");
- log::debug!("must_login={}", must_login);
- if must_login.to_uppercase() == "Y"
- || (must_login == ""
- && std::env::var("MUST_LOGIN")
- .unwrap_or_default()
- .to_uppercase()
- == "Y")
- {
- MUST_LOGIN.store(true, Ordering::SeqCst);
- }
- log::info!(
- "MUST_LOGIN={}",
- if MUST_LOGIN.load(Ordering::SeqCst) {
- "Y"
- } else {
- "N"
- }
- );
- if test_addr.to_lowercase() != "no" {
- let test_addr = if test_addr.is_empty() {
- listener.local_addr()?
- } else {
- test_addr.parse()?
- };
- tokio::spawn(async move {
- if let Err(err) = test_hbbs(test_addr).await {
- if test_addr.is_ipv6() && test_addr.ip().is_unspecified() {
- let mut test_addr = test_addr;
- test_addr.set_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
- if let Err(err) = test_hbbs(test_addr).await {
- log::error!("Failed to run hbbs test with {test_addr}: {err}");
- std::process::exit(1);
- }
- } else {
- log::error!("Failed to run hbbs test with {test_addr}: {err}");
- std::process::exit(1);
- }
- }
- });
- };
- let main_task = async move {
- loop {
- log::info!("Start");
- match rs
- .io_loop(
- &mut rx,
- &mut listener,
- &mut listener2,
- &mut listener3,
- &mut socket,
- &key,
- )
- .await
- {
- LoopFailure::UdpSocket => {
- drop(socket);
- socket = create_udp_listener(port, rmem).await?;
- }
- LoopFailure::Listener => {
- drop(listener);
- listener = create_tcp_listener(port).await?;
- }
- LoopFailure::Listener2 => {
- drop(listener2);
- listener2 = create_tcp_listener(nat_port).await?;
- }
- LoopFailure::Listener3 => {
- drop(listener3);
- listener3 = create_tcp_listener(ws_port).await?;
- }
- }
- }
- };
- let listen_signal = listen_signal();
- tokio::select!(
- res = main_task => res,
- res = listen_signal => res,
- )
- }
- async fn io_loop(
- &mut self,
- rx: &mut Receiver,
- listener: &mut TcpListener,
- listener2: &mut TcpListener,
- listener3: &mut TcpListener,
- socket: &mut FramedSocket,
- key: &str,
- ) -> LoopFailure {
- let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
- loop {
- tokio::select! {
- _ = timer_check_relay.tick() => {
- if self.relay_servers0.len() > 1 {
- let rs = self.relay_servers0.clone();
- let tx = self.tx.clone();
- tokio::spawn(async move {
- check_relay_servers(rs, tx).await;
- });
- }
- }
- Some(data) = rx.recv() => {
- match data {
- Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
- Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
- Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
- }
- }
- res = socket.next() => {
- match res {
- Some(Ok((bytes, addr))) => {
- if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
- log::error!("udp failure: {}", err);
- return LoopFailure::UdpSocket;
- }
- }
- Some(Err(err)) => {
- log::error!("udp failure: {}", err);
- return LoopFailure::UdpSocket;
- }
- None => {
- // unreachable!() ?
- }
- }
- }
- res = listener2.accept() => {
- match res {
- Ok((stream, addr)) => {
- stream.set_nodelay(true).ok();
- self.handle_listener2(stream, addr).await;
- }
- Err(err) => {
- log::error!("listener2.accept failed: {}", err);
- return LoopFailure::Listener2;
- }
- }
- }
- res = listener3.accept() => {
- match res {
- Ok((stream, addr)) => {
- stream.set_nodelay(true).ok();
- self.handle_listener(stream, addr, key, true).await;
- }
- Err(err) => {
- log::error!("listener3.accept failed: {}", err);
- return LoopFailure::Listener3;
- }
- }
- }
- res = listener.accept() => {
- match res {
- Ok((stream, addr)) => {
- stream.set_nodelay(true).ok();
- self.handle_listener(stream, addr, key, false).await;
- }
- Err(err) => {
- log::error!("listener.accept failed: {}", err);
- return LoopFailure::Listener;
- }
- }
- }
- }
- }
- }
- #[inline]
- async fn handle_udp(
- &mut self,
- bytes: &BytesMut,
- addr: SocketAddr,
- socket: &mut FramedSocket,
- key: &str,
- ) -> ResultType<()> {
- if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
- match msg_in.union {
- Some(rendezvous_message::Union::RegisterPeer(rp)) => {
- // B registered
- if !rp.id.is_empty() {
- log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
- let request_pk = self.update_addr(rp.id, addr).await;
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_peer_response(RegisterPeerResponse {
- request_pk,
- ..Default::default()
- });
- socket.send(&msg_out, addr).await?;
- if self.inner.serial > rp.serial {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_configure_update(ConfigUpdate {
- serial: self.inner.serial,
- rendezvous_servers: (*self.rendezvous_servers).clone(),
- ..Default::default()
- });
- socket.send(&msg_out, addr).await?;
- }
- }
- }
- Some(rendezvous_message::Union::RegisterPk(rk)) => {
- let response = self.handle_register_pk(rk, addr, false).await;
- match response {
- Err(err) => {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_pk_response(RegisterPkResponse {
- result: err.into(),
- ..Default::default()
- });
- socket.send(&msg_out, addr).await?;
- }
- Ok(res) => {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_pk_response(RegisterPkResponse {
- result: res.into(),
- ..Default::default()
- });
- socket.send(&msg_out, addr).await?;
- }
- }
- }
- Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
- if self.pm.is_in_memory(&ph.id).await {
- self.handle_udp_punch_hole_request(addr, ph, key).await?;
- } else {
- // not in memory, fetch from db with spawn in case blocking me
- let mut me = self.clone();
- let key = key.to_owned();
- tokio::spawn(async move {
- allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
- });
- }
- }
- Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
- self.handle_hole_sent(phs, addr, Some(socket)).await?;
- }
- Some(rendezvous_message::Union::LocalAddr(la)) => {
- self.handle_local_addr(la, addr, Some(socket)).await?;
- }
- Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
- if try_into_v4(addr).ip().is_loopback() && cu.serial > self.inner.serial {
- let mut inner: Inner = (*self.inner).clone();
- inner.serial = cu.serial;
- self.inner = Arc::new(inner);
- self.rendezvous_servers = Arc::new(
- cu.rendezvous_servers
- .drain(..)
- .filter(|x| {
- !x.is_empty()
- && test_if_valid_server(x, "rendezvous-server").is_ok()
- })
- .collect(),
- );
- log::info!(
- "configure updated: serial={} rendezvous-servers={:?}",
- self.inner.serial,
- self.rendezvous_servers
- );
- }
- }
- Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
- if !self.inner.version.is_empty() && su.url != self.inner.version {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_software_update(SoftwareUpdate {
- url: self.inner.software_url.clone(),
- ..Default::default()
- });
- socket.send(&msg_out, addr).await?;
- }
- }
- _ => {}
- }
- }
- Ok(())
- }
- #[inline]
- async fn handle_tcp(
- &mut self,
- bytes: &[u8],
- sink: &mut Option<Sink>,
- addr: SocketAddr,
- key: &str,
- ws: bool,
- ) -> bool {
- if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
- // log::debug!("Received TCP message from {}: {:?}", addr, msg_in);
- match msg_in.union {
- Some(rendezvous_message::Union::RegisterPeer(rp)) => {
- // B registered
- if !rp.id.is_empty() {
- log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
- let request_pk = self.update_addr(rp.id, addr).await;
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_peer_response(RegisterPeerResponse {
- request_pk,
- ..Default::default()
- });
- Self::send_to_sink(sink, msg_out).await;
- if self.inner.serial > rp.serial {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_configure_update(ConfigUpdate {
- serial: self.inner.serial,
- rendezvous_servers: (*self.rendezvous_servers).clone(),
- ..Default::default()
- });
- Self::send_to_sink(sink, msg_out).await;
- }
- }
- }
- Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
- // there maybe several attempt, so sink can be none
- if let Some(sink) = sink.take() {
- self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
- }
- allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
- return true;
- }
- Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
- // there maybe several attempt, so sink can be none
- if let Some(sink) = sink.take() {
- self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
- }
- if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
- let mut msg_out = RendezvousMessage::new();
- rf.socket_addr = AddrMangle::encode(addr).into();
- msg_out.set_request_relay(rf);
- let peer_addr = peer.read().await.socket_addr;
- self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
- }
- return true;
- }
- Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
- let addr_b = AddrMangle::decode(&rr.socket_addr);
- rr.socket_addr = Default::default();
- let id = rr.id();
- if !id.is_empty() {
- let pk = self.get_pk(&rr.version, id.to_owned()).await;
- rr.set_pk(pk);
- }
- let mut msg_out = RendezvousMessage::new();
- if !rr.relay_server.is_empty() {
- if self.is_lan(addr_b) {
- // https://github.com/rustdesk/rustdesk-server/issues/24
- rr.relay_server = self.inner.local_ip.clone();
- } else if rr.relay_server == self.inner.local_ip {
- rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
- }
- }
- msg_out.set_relay_response(rr);
- allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
- }
- Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
- allow_err!(self.handle_hole_sent(phs, addr, None).await);
- }
- Some(rendezvous_message::Union::LocalAddr(la)) => {
- allow_err!(self.handle_local_addr(la, addr, None).await);
- }
- Some(rendezvous_message::Union::TestNatRequest(tar)) => {
- let mut msg_out = RendezvousMessage::new();
- let mut res = TestNatResponse {
- port: addr.port() as _,
- ..Default::default()
- };
- if self.inner.serial > tar.serial {
- let mut cu = ConfigUpdate::new();
- cu.serial = self.inner.serial;
- cu.rendezvous_servers = (*self.rendezvous_servers).clone();
- res.cu = MessageField::from_option(Some(cu));
- }
- msg_out.set_test_nat_response(res);
- Self::send_to_sink(sink, msg_out).await;
- }
- Some(rendezvous_message::Union::RegisterPk(rk)) => {
- let response = self.handle_register_pk(rk, addr, ws).await;
- match response {
- Err(err) => {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_pk_response(RegisterPkResponse {
- result: err.into(),
- ..Default::default()
- });
- Self::send_to_sink(sink, msg_out).await;
- return false;
- }
- Ok(res) => {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_pk_response(RegisterPkResponse {
- result: res.into(),
- ..Default::default()
- });
- Self::send_to_sink(sink, msg_out).await;
- if ws {
- // for ws, we can only get addr when register_pk
- if let Some(sink) = sink.take() {
- self.ws_map.lock().await.insert(try_into_v4(addr), sink);
- }
- }
- return true;
- }
- }
- }
- Some(rendezvous_message::Union::KeyExchange(ex)) => {
- log::trace!("KeyExchange {:?} <- bytes: {:?}", addr, hex::encode(&bytes));
- if ex.keys.len() != 2 {
- log::error!("Handshake failed: invalid phase 2 key exchange message");
- return false;
- }
- log::trace!("KeyExchange their_pk: {:?}", hex::encode(&ex.keys[0]));
- log::trace!("KeyExchange box: {:?}", hex::encode(&ex.keys[1]));
- let their_pk: [u8; 32] = ex.keys[0].to_vec().try_into().unwrap();
- let cryptobox: [u8; 48] = ex.keys[1].to_vec().try_into().unwrap();
- let symetric_key = get_symetric_key_from_msg(
- self.inner.secure_tcp_sk_b.0,
- their_pk,
- &cryptobox,
- );
- log::debug!("KeyExchange symetric key: {:?}", hex::encode(&symetric_key));
- let key = secretbox::Key::from_slice(&symetric_key);
- match key {
- Some(key) => {
- if let Some(sink) = sink.as_mut() {
- match sink {
- Sink::Wss(s) => s.encrypt = Some(Encrypt::new(key)),
- Sink::Tss(s) => s.encrypt = Some(Encrypt::new(key)),
- }
- }
- log::debug!("KeyExchange symetric key set");
- return true;
- }
- None => {
- log::error!("KeyExchange symetric key NOT set");
- return false;
- }
- }
- }
- Some(rendezvous_message::Union::OnlineRequest(or)) => {
- let mut states = self.peers_online_state(or.peers).await;
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_online_response(OnlineResponse {
- states: states.into(),
- ..Default::default()
- });
- Self::send_to_sink(sink, msg_out).await;
- }
- _ => {}
- }
- }
- false
- }
- async fn peers_online_state(&mut self, peers: Vec<String>) -> BytesMut {
- let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
- for (i, peer_id) in peers.iter().enumerate() {
- if let Some(peer) = self.pm.get_in_memory(peer_id).await {
- let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
- // bytes index from left to right
- let states_idx = i / 8;
- let bit_idx = 7 - i % 8;
- if elapsed < REG_TIMEOUT {
- states[states_idx] |= 0x01 << bit_idx;
- }
- }
- }
- states
- }
- async fn handle_register_pk(
- &mut self,
- rk: RegisterPk,
- addr: SocketAddr,
- ws: bool,
- ) -> Result<register_pk_response::Result, register_pk_response::Result> {
- if rk.uuid.is_empty() || rk.pk.is_empty() {
- return Err(INVALID_ID_FORMAT);
- }
- let id = rk.id;
- let ip = addr.ip().to_string();
- if id.len() < 6 {
- return Err(UUID_MISMATCH);
- //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
- } else if !self.check_ip_blocker(&ip, &id).await {
- return Err(TOO_FREQUENT);
- //return Err(send_rk_res(socket, addr, TOO_FREQUENT).await);
- }
- let peer = self.pm.get_or(&id).await;
- let (changed, ip_changed) = {
- let peer = peer.read().await;
- if peer.uuid.is_empty() {
- (true, false)
- } else {
- if peer.uuid == rk.uuid {
- if peer.info.ip != ip && peer.pk != rk.pk {
- log::warn!(
- "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
- id,
- ip,
- rk.pk,
- peer.info.ip,
- peer.pk,
- );
- drop(peer);
- return Err(UUID_MISMATCH);
- //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
- }
- } else {
- log::warn!(
- "Peer {} uuid mismatch: {:?} vs {:?}",
- id,
- rk.uuid,
- peer.uuid
- );
- drop(peer);
- return Err(UUID_MISMATCH);
- //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
- }
- let ip_changed = peer.info.ip != ip;
- (
- peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
- ip_changed,
- )
- }
- };
- let mut req_pk = peer.read().await.reg_pk;
- if req_pk.1.elapsed().as_secs() > 6 {
- req_pk.0 = 0;
- } else if req_pk.0 > 2 {
- return Err(TOO_FREQUENT);
- //return Err(send_rk_res(socket, addr, TOO_FREQUENT).await);
- }
- req_pk.0 += 1;
- req_pk.1 = Instant::now();
- peer.write().await.reg_pk = req_pk;
- if ip_changed {
- let mut lock = IP_CHANGES.lock().await;
- if let Some((tm, ips)) = lock.get_mut(&id) {
- if tm.elapsed().as_secs() > IP_CHANGE_DUR {
- *tm = Instant::now();
- ips.clear();
- ips.insert(ip.clone(), 1);
- } else if let Some(v) = ips.get_mut(&ip) {
- *v += 1;
- } else {
- ips.insert(ip.clone(), 1);
- }
- } else {
- lock.insert(
- id.clone(),
- (Instant::now(), HashMap::from([(ip.clone(), 1)])),
- );
- }
- }
- if changed || ws {
- // update peer info,解决tcp过程中不更新在线时间的问题
- self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
- }
- Ok(register_pk_response::Result::OK)
- // let mut msg_out = RendezvousMessage::new();
- // msg_out.set_register_pk_response(RegisterPkResponse {
- // result: register_pk_response::Result::OK.into(),
- // ..Default::default()
- // });
- // Ok(msg_out)
- }
- #[inline]
- async fn update_addr(&mut self, id: String, socket_addr: SocketAddr) -> bool {
- let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
- let mut old = old.write().await;
- let ip = socket_addr.ip();
- let ip_change = if old.socket_addr.port() != 0 {
- ip != old.socket_addr.ip()
- } else {
- ip.to_string() != old.info.ip
- } && !ip.is_loopback();
- let request_pk = old.pk.is_empty() || ip_change;
- if !request_pk {
- old.socket_addr = socket_addr;
- old.last_reg_time = Instant::now();
- }
- let ip_change = if ip_change && old.reg_pk.0 <= 2 {
- Some(if old.socket_addr.port() == 0 {
- old.info.ip.clone()
- } else {
- old.socket_addr.to_string()
- })
- } else {
- None
- };
- (request_pk, ip_change)
- } else {
- (true, None)
- };
- if let Some(old) = ip_change {
- log::info!("IP change of {} from {} to {}", id, old, socket_addr);
- }
- request_pk
- // let mut msg_out = RendezvousMessage::new();
- // msg_out.set_register_peer_response(RegisterPeerResponse {
- // request_pk,
- // ..Default::default()
- // });
- // socket.send(&msg_out, socket_addr).await
- }
- #[inline]
- async fn handle_hole_sent<'a>(
- &mut self,
- phs: PunchHoleSent,
- addr: SocketAddr,
- socket: Option<&'a mut FramedSocket>,
- ) -> ResultType<()> {
- // punch hole sent from B, tell A that B is ready to be connected
- let addr_a = AddrMangle::decode(&phs.socket_addr);
- log::debug!(
- "{} punch hole response to {:?} from {:?}",
- if socket.is_none() { "TCP" } else { "UDP" },
- &addr_a,
- &addr
- );
- let mut msg_out = RendezvousMessage::new();
- let mut p = PunchHoleResponse {
- socket_addr: AddrMangle::encode(addr).into(),
- pk: self.get_pk(&phs.version, phs.id).await,
- relay_server: phs.relay_server.clone(),
- ..Default::default()
- };
- if let Ok(t) = phs.nat_type.enum_value() {
- p.set_nat_type(t);
- }
- msg_out.set_punch_hole_response(p);
- if let Some(socket) = socket {
- socket.send(&msg_out, addr_a).await?;
- } else {
- self.send_to_tcp(msg_out, addr_a).await;
- }
- Ok(())
- }
- #[inline]
- async fn handle_local_addr<'a>(
- &mut self,
- la: LocalAddr,
- addr: SocketAddr,
- socket: Option<&'a mut FramedSocket>,
- ) -> ResultType<()> {
- // relay local addrs of B to A
- let addr_a = AddrMangle::decode(&la.socket_addr);
- log::debug!(
- "{} local addrs response to {:?} from {:?}",
- if socket.is_none() { "TCP" } else { "UDP" },
- &addr_a,
- &addr
- );
- let mut msg_out = RendezvousMessage::new();
- let mut p = PunchHoleResponse {
- socket_addr: la.local_addr.clone(),
- pk: self.get_pk(&la.version, la.id).await,
- relay_server: la.relay_server,
- ..Default::default()
- };
- p.set_is_local(true);
- msg_out.set_punch_hole_response(p);
- if let Some(socket) = socket {
- socket.send(&msg_out, addr_a).await?;
- } else {
- self.send_to_tcp(msg_out, addr_a).await;
- }
- Ok(())
- }
- #[inline]
- async fn handle_punch_hole_request(
- &mut self,
- addr: SocketAddr,
- ph: PunchHoleRequest,
- key: &str,
- ws: bool,
- ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
- let mut ph = ph;
- if !key.is_empty() && ph.licence_key != key {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_punch_hole_response(PunchHoleResponse {
- failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
- ..Default::default()
- });
- return Ok((msg_out, None));
- }
- // if secret is not empty check token by jwt
- if MUST_LOGIN.load(Ordering::SeqCst) {
- if ph.token.is_empty() {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_punch_hole_response(PunchHoleResponse {
- other_failure: String::from("Connection failed, please login!"),
- ..Default::default()
- });
- return Ok((msg_out, None));
- } else if !jwt::SECRET.is_empty() {
- let token = ph.token;
- let token = jwt::verify_token(token.as_str());
- if token.is_err() {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_punch_hole_response(PunchHoleResponse {
- //提示重新登录
- other_failure: String::from("Token error, please log out and log back in!"),
- ..Default::default()
- });
- return Ok((msg_out, None));
- }
- }
- }
- let id = ph.id;
- // punch hole request from A, relay to B,
- // check if in same intranet first,
- // fetch local addrs if in same intranet.
- // because punch hole won't work if in the same intranet,
- // all routers will drop such self-connections.
- if let Some(peer) = self.pm.get(&id).await {
- let (elapsed, peer_addr) = {
- let r = peer.read().await;
- (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
- };
- if elapsed >= REG_TIMEOUT {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_punch_hole_response(PunchHoleResponse {
- failure: punch_hole_response::Failure::OFFLINE.into(),
- ..Default::default()
- });
- return Ok((msg_out, None));
- }
- let mut msg_out = RendezvousMessage::new();
- let peer_is_lan = self.is_lan(peer_addr);
- let is_lan = self.is_lan(addr);
- let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
- if ALWAYS_USE_RELAY.load(Ordering::SeqCst) || (peer_is_lan ^ is_lan) {
- if peer_is_lan {
- // https://github.com/rustdesk/rustdesk-server/issues/24
- relay_server = self.inner.local_ip.clone()
- }
- ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
- }
- let same_intranet: bool = !ws
- && (peer_is_lan && is_lan || {
- match (peer_addr, addr) {
- (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
- (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
- _ => false,
- }
- });
- let socket_addr = AddrMangle::encode(addr).into();
- if same_intranet {
- log::debug!(
- "Fetch local addr {:?} {:?} request from {:?}",
- id,
- peer_addr,
- addr
- );
- msg_out.set_fetch_local_addr(FetchLocalAddr {
- socket_addr,
- relay_server,
- ..Default::default()
- });
- } else {
- log::debug!(
- "Punch hole {:?} {:?} request from {:?}",
- id,
- peer_addr,
- addr
- );
- msg_out.set_punch_hole(PunchHole {
- socket_addr,
- nat_type: ph.nat_type,
- relay_server,
- ..Default::default()
- });
- }
- //
- Ok((msg_out, Some(peer_addr)))
- } else {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_punch_hole_response(PunchHoleResponse {
- failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
- ..Default::default()
- });
- Ok((msg_out, None))
- }
- }
- #[inline]
- async fn handle_online_request(
- &mut self,
- stream: &mut FramedStream,
- peers: Vec<String>,
- ) -> ResultType<()> {
- let mut states = self.peers_online_state(peers).await;
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_online_response(OnlineResponse {
- states: states.into(),
- ..Default::default()
- });
- stream.send(&msg_out).await?;
- Ok(())
- }
- #[inline]
- async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
- let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
- tokio::spawn(async move {
- Self::send_to_sink(&mut tcp, msg).await;
- });
- }
- #[inline]
- async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
- if let Some(sink) = sink.as_mut() {
- sink.send(&msg).await;
- }
- }
- #[inline]
- async fn send_to_tcp_sync(
- &mut self,
- msg: RendezvousMessage,
- addr: SocketAddr,
- ) -> ResultType<()> {
- let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
- Self::send_to_sink(&mut sink, msg).await;
- Ok(())
- }
- #[inline]
- async fn handle_tcp_punch_hole_request(
- &mut self,
- addr: SocketAddr,
- ph: PunchHoleRequest,
- key: &str,
- ws: bool,
- ) -> ResultType<()> {
- let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
- if let Some(addr) = to_addr {
- let mut sink = self.ws_map.lock().await.remove(&try_into_v4(addr));
- if let Some(s) = sink.as_mut() {
- s.send(&msg).await;
- } else {
- self.tx.send(Data::Msg(msg.into(), addr))?;
- }
- } else {
- self.send_to_tcp_sync(msg, addr).await?;
- }
- Ok(())
- }
- #[inline]
- async fn handle_udp_punch_hole_request(
- &mut self,
- addr: SocketAddr,
- ph: PunchHoleRequest,
- key: &str,
- ) -> ResultType<()> {
- let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
- self.tx.send(Data::Msg(
- msg.into(),
- match to_addr {
- Some(addr) => addr,
- None => addr,
- },
- ))?;
- Ok(())
- }
- async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
- let mut lock = IP_BLOCKER.lock().await;
- let now = Instant::now();
- if let Some(old) = lock.get_mut(ip) {
- let counter = &mut old.0;
- if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
- counter.0 = 0;
- } else if counter.0 > 30 {
- return false;
- }
- counter.0 += 1;
- counter.1 = now;
- let counter = &mut old.1;
- let is_new = counter.0.get(id).is_none();
- if counter.1.elapsed().as_secs() > DAY_SECONDS {
- counter.0.clear();
- } else if counter.0.len() > 300 {
- return !is_new;
- }
- if is_new {
- counter.0.insert(id.to_owned());
- }
- counter.1 = now;
- } else {
- lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
- }
- true
- }
- fn parse_relay_servers(&mut self, relay_servers: &str) {
- let rs = get_servers(relay_servers, "relay-servers");
- self.relay_servers0 = Arc::new(rs);
- self.relay_servers = self.relay_servers0.clone();
- }
- fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
- if self.relay_servers.is_empty() {
- return "".to_owned();
- } else if self.relay_servers.len() == 1 {
- return self.relay_servers[0].clone();
- }
- let i = ROTATION_RELAY_SERVER.fetch_add(1, Ordering::SeqCst) % self.relay_servers.len();
- self.relay_servers[i].clone()
- }
- async fn check_cmd(&self, cmd: &str) -> String {
- use std::fmt::Write as _;
- let mut res = "".to_owned();
- let mut fds = cmd.trim().split(' ');
- match fds.next() {
- Some("h") => {
- res = format!(
- "{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
- "relay-servers(rs) <separated by ,>",
- "reload-geo(rg)",
- "ip-blocker(ib) [<ip>|<number>] [-]",
- "ip-changes(ic) [<id>|<number>] [-]",
- "always-use-relay(aur) [Y|N]",
- "test-geo(tg) <ip1> <ip2>",
- "must-login(ml) [Y|N]",
- )
- }
- Some("relay-servers" | "rs") => {
- if let Some(rs) = fds.next() {
- self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
- } else {
- for ip in self.relay_servers.iter() {
- let _ = writeln!(res, "{ip}");
- }
- }
- }
- Some("ip-blocker" | "ib") => {
- let mut lock = IP_BLOCKER.lock().await;
- lock.retain(|&_, (a, b)| {
- a.1.elapsed().as_secs() <= IP_BLOCK_DUR
- || b.1.elapsed().as_secs() <= DAY_SECONDS
- });
- res = format!("{}\n", lock.len());
- let ip = fds.next();
- let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
- if start < 0 {
- if let Some(ip) = ip {
- if let Some((a, b)) = lock.get(ip) {
- let _ = writeln!(
- res,
- "{}/{}s {}/{}s",
- a.0,
- a.1.elapsed().as_secs(),
- b.0.len(),
- b.1.elapsed().as_secs()
- );
- }
- if fds.next() == Some("-") {
- lock.remove(ip);
- }
- } else {
- start = 0;
- }
- }
- if start >= 0 {
- let mut it = lock.iter();
- for i in 0..(start + 10) {
- let x = it.next();
- if x.is_none() {
- break;
- }
- if i < start {
- continue;
- }
- if let Some((ip, (a, b))) = x {
- let _ = writeln!(
- res,
- "{}: {}/{}s {}/{}s",
- ip,
- a.0,
- a.1.elapsed().as_secs(),
- b.0.len(),
- b.1.elapsed().as_secs()
- );
- }
- }
- }
- }
- Some("ip-changes" | "ic") => {
- let mut lock = IP_CHANGES.lock().await;
- lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
- res = format!("{}\n", lock.len());
- let id = fds.next();
- let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
- if !(0..=10_000_000).contains(&start) {
- if let Some(id) = id {
- if let Some((tm, ips)) = lock.get(id) {
- let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
- }
- if fds.next() == Some("-") {
- lock.remove(id);
- }
- } else {
- start = 0;
- }
- }
- if start >= 0 {
- let mut it = lock.iter();
- for i in 0..(start + 10) {
- let x = it.next();
- if x.is_none() {
- break;
- }
- if i < start {
- continue;
- }
- if let Some((id, (tm, ips))) = x {
- let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
- }
- }
- }
- }
- Some("always-use-relay" | "aur") => {
- if let Some(rs) = fds.next() {
- if rs.to_uppercase() == "Y" {
- ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
- } else {
- ALWAYS_USE_RELAY.store(false, Ordering::SeqCst);
- }
- self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
- } else {
- let _ = writeln!(
- res,
- "ALWAYS_USE_RELAY: {:?}",
- ALWAYS_USE_RELAY.load(Ordering::SeqCst)
- );
- }
- }
- Some("test-geo" | "tg") => {
- if let Some(rs) = fds.next() {
- if let Ok(a) = rs.parse::<IpAddr>() {
- if let Some(rs) = fds.next() {
- if let Ok(b) = rs.parse::<IpAddr>() {
- res = format!("{:?}", self.get_relay_server(a, b));
- }
- } else {
- res = format!("{:?}", self.get_relay_server(a, a));
- }
- }
- }
- }
- Some("must-login" | "ml") => {
- if let Some(rs) = fds.next() {
- if rs.to_uppercase() == "Y" {
- MUST_LOGIN.store(true, Ordering::SeqCst);
- } else {
- MUST_LOGIN.store(false, Ordering::SeqCst);
- }
- } else {
- let _ = writeln!(res, "MUST_LOGIN: {:?}", MUST_LOGIN.load(Ordering::SeqCst));
- }
- }
- _ => {}
- }
- res
- }
- async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
- let mut rs = self.clone();
- let ip = try_into_v4(addr).ip();
- if ip.is_loopback() {
- tokio::spawn(async move {
- let mut stream = stream;
- let mut buffer = [0; 1024];
- if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
- if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
- let res = rs.check_cmd(data).await;
- stream.write(res.as_bytes()).await.ok();
- }
- }
- });
- return;
- }
- let stream = FramedStream::from(stream, addr);
- tokio::spawn(async move {
- let mut stream = stream;
- if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
- if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- match msg_in.union {
- Some(rendezvous_message::Union::TestNatRequest(_)) => {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_test_nat_response(TestNatResponse {
- port: addr.port() as _,
- ..Default::default()
- });
- stream.send(&msg_out).await.ok();
- }
- Some(rendezvous_message::Union::OnlineRequest(or)) => {
- allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
- }
- _ => {}
- }
- }
- }
- });
- }
- async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
- log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
- let mut rs = self.clone();
- let key = key.to_owned();
- tokio::spawn(async move {
- allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
- });
- }
- #[inline]
- async fn handle_listener_inner(
- &mut self,
- stream: TcpStream,
- mut addr: SocketAddr,
- key: &str,
- ws: bool,
- ) -> ResultType<()> {
- let mut sink;
- if ws {
- use tokio_tungstenite::tungstenite::handshake::server::{Request, Response};
- let callback = |req: &Request, response: Response| {
- let headers = req.headers();
- let real_ip = headers
- .get("X-Real-IP")
- .or_else(|| headers.get("X-Forwarded-For"))
- .and_then(|header_value| header_value.to_str().ok());
- if let Some(ip) = real_ip {
- if ip.contains('.') {
- addr = format!("{ip}:0").parse().unwrap_or(addr);
- } else {
- addr = format!("[{ip}]:0").parse().unwrap_or(addr);
- }
- }
- Ok(response)
- };
- let ws_stream = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
- let (a, mut b) = ws_stream.split();
- sink = Some(Sink::Wss(SafeWsSink {
- sink: a,
- encrypt: None,
- }));
- while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
- if let tungstenite::Message::Binary(bytes) = msg {
- if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
- break;
- }
- }
- }
- } else {
- let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
- sink = Some(Sink::Tss(SafeTcpStreamSink {
- sink: a,
- encrypt: None,
- }));
- // Avoid key exchange if answering on nat helper port
- if !key.is_empty() {
- self.key_exchange_phase1(addr, &mut sink).await;
- }
- while let Ok(Some(Ok(mut bytes))) = timeout(30_000, b.next()).await {
- // log::debug!("receive tcp data from {:?} {:?}", addr, bytes);
- if let Some(Sink::Tss(s)) = sink.as_mut() {
- if let Some(key) = s.encrypt.as_mut() {
- if let Err(err) = key.dec(&mut bytes) {
- log::error!("dec tcp data from {:?} err: {:?}", addr, err);
- break;
- }
- }
- }
- if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
- break;
- }
- }
- }
- if sink.is_none() {
- self.tcp_punch.lock().await.remove(&try_into_v4(addr));
- }
- log::debug!("Tcp connection from {:?} closed", addr);
- Ok(())
- }
- #[inline]
- async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
- if version.is_empty() || self.inner.sk.is_none() {
- Bytes::new()
- } else {
- match self.pm.get(&id).await {
- Some(peer) => {
- let pk = peer.read().await.pk.clone();
- sign::sign(
- &hbb_common::message_proto::IdPk {
- id,
- pk,
- ..Default::default()
- }
- .write_to_bytes()
- .unwrap_or_default(),
- self.inner.sk.as_ref().unwrap(),
- )
- .into()
- }
- _ => Bytes::new(),
- }
- }
- }
- #[inline]
- fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
- let mut out_sk = None;
- let mut key = key.to_owned();
- if let Ok(sk) = base64::decode(&key) {
- if sk.len() == sign::SECRETKEYBYTES {
- log::info!("The key is a crypto private key");
- key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
- let mut tmp = [0u8; sign::SECRETKEYBYTES];
- tmp[..].copy_from_slice(&sk);
- out_sk = Some(sign::SecretKey(tmp));
- }
- }
- if key.is_empty() || key == "-" || key == "_" {
- let (pk, sk) = crate::common::gen_sk(0);
- out_sk = sk;
- if !key.is_empty() {
- key = pk;
- }
- }
- if !key.is_empty() {
- log::info!("Key: {}", key);
- }
- (key, out_sk)
- }
- #[inline]
- fn is_lan(&self, addr: SocketAddr) -> bool {
- if let Some(network) = &self.inner.mask {
- match addr {
- SocketAddr::V4(v4_socket_addr) => {
- return network.contains(*v4_socket_addr.ip());
- }
- SocketAddr::V6(v6_socket_addr) => {
- if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4() {
- return network.contains(v4_addr);
- }
- }
- }
- }
- false
- }
- async fn key_exchange_phase1(&mut self, addr: SocketAddr, sink: &mut Option<Sink>) {
- let mut msg_out = RendezvousMessage::new();
- log::debug!("KeyExchange phase 1: send our pk for this tcp connection in a message signed with our server key");
- let sk = &self.inner.sk;
- match sk {
- Some(sk) => {
- let our_pk_b = self.inner.secure_tcp_pk_b.clone();
- let sm = sign::sign(&our_pk_b.0, &sk);
- let bytes_sm = Bytes::from(sm);
- msg_out.set_key_exchange(KeyExchange {
- keys: vec![bytes_sm],
- ..Default::default()
- });
- log::trace!(
- "KeyExchange {:?} -> bytes: {:?}",
- addr,
- hex::encode(Bytes::from(msg_out.write_to_bytes().unwrap()))
- );
- Self::send_to_sink(sink, msg_out).await;
- }
- None => {}
- }
- }
- }
- async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
- let mut futs = Vec::new();
- let rs = Arc::new(Mutex::new(Vec::new()));
- for x in rs0.iter() {
- let mut host = x.to_owned();
- if !host.contains(':') {
- host = format!("{}:{}", host, config::RELAY_PORT);
- }
- let rs = rs.clone();
- let x = x.clone();
- futs.push(tokio::spawn(async move {
- if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
- .await
- .is_ok()
- {
- rs.lock().await.push(x);
- }
- }));
- }
- join_all(futs).await;
- log::debug!("check_relay_servers");
- let rs = std::mem::take(&mut *rs.lock().await);
- if !rs.is_empty() {
- tx.send(Data::RelayServers(rs)).ok();
- }
- }
- // temp solution to solve udp socket failure
- async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
- let mut addr = addr;
- if addr.ip().is_unspecified() {
- addr.set_ip(if addr.is_ipv4() {
- IpAddr::V4(Ipv4Addr::LOCALHOST)
- } else {
- IpAddr::V6(Ipv6Addr::LOCALHOST)
- });
- }
- let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_peer(RegisterPeer {
- id: "(:test_hbbs:)".to_owned(),
- ..Default::default()
- });
- let mut last_time_recv = Instant::now();
- let mut timer = interval(Duration::from_secs(1));
- loop {
- tokio::select! {
- _ = timer.tick() => {
- if last_time_recv.elapsed().as_secs() > 12 {
- bail!("Timeout of test_hbbs");
- }
- socket.send(&msg_out, addr).await?;
- }
- Some(Ok((bytes, _))) = socket.next() => {
- if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- log::trace!("Recv {:?} of test_hbbs", msg_in);
- last_time_recv = Instant::now();
- }
- }
- }
- }
- }
- #[inline]
- async fn send_rk_res(
- socket: &mut FramedSocket,
- addr: SocketAddr,
- res: register_pk_response::Result,
- ) -> ResultType<()> {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_register_pk_response(RegisterPkResponse {
- result: res.into(),
- ..Default::default()
- });
- socket.send(&msg_out, addr).await
- }
- async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
- let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
- if let Ok(s) = FramedSocket::new_reuse(&addr, true, rmem).await {
- log::debug!("listen on udp {:?}", s.local_addr());
- return Ok(s);
- }
- let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
- let s = FramedSocket::new_reuse(&addr, true, rmem).await?;
- log::debug!("listen on udp {:?}", s.local_addr());
- Ok(s)
- }
- #[inline]
- async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
- let s = listen_any(port as _).await?;
- log::debug!("listen on tcp {:?}", s.local_addr());
- Ok(s)
- }
- fn get_symetric_key_from_msg(
- our_sk_b: [u8; 32],
- their_pk_b: [u8; 32],
- sealed_value: &[u8; 48],
- ) -> [u8; 32] {
- let their_pk_b = box_::PublicKey(their_pk_b);
- let nonce = box_::Nonce([0u8; box_::NONCEBYTES]);
- let sk = box_::SecretKey(our_sk_b);
- let key = box_::open(sealed_value, &nonce, &their_pk_b, &sk);
- match key {
- Ok(key) => {
- let mut key_array = [0u8; 32];
- key_array.copy_from_slice(&key);
- key_array
- }
- Err(e) => panic!("Error while opening the seal key{:?}", e),
- }
- }
|