rendezvous_server.rs 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622
  1. use crate::common::*;
  2. use crate::peer::*;
  3. use hbb_common::bytes::BufMut;
  4. use hbb_common::{
  5. allow_err, bail,
  6. bytes::{Bytes, BytesMut},
  7. bytes_codec::BytesCodec,
  8. config,
  9. futures::future::join_all,
  10. futures_util::{
  11. sink::SinkExt,
  12. stream::{SplitSink, StreamExt},
  13. },
  14. log,
  15. protobuf::{Message as _, MessageField},
  16. rendezvous_proto::{
  17. register_pk_response::Result::{INVALID_ID_FORMAT, TOO_FREQUENT, UUID_MISMATCH},
  18. *,
  19. },
  20. sodiumoxide::crypto::{
  21. box_, box_::PublicKey, box_::SecretKey, secretbox, secretbox::Key, secretbox::Nonce, sign,
  22. },
  23. sodiumoxide::hex,
  24. tcp,
  25. tcp::Encrypt,
  26. tcp::{listen_any, FramedStream},
  27. timeout,
  28. tokio::{
  29. self,
  30. io::{AsyncReadExt, AsyncWriteExt},
  31. net::{TcpListener, TcpStream},
  32. sync::{mpsc, Mutex},
  33. time::{interval, Duration},
  34. },
  35. tokio_util::codec::Framed,
  36. try_into_v4,
  37. udp::FramedSocket,
  38. AddrMangle, ResultType,
  39. };
  40. use ipnetwork::Ipv4Network;
  41. use crate::jwt;
  42. use std::io::Error;
  43. use std::{
  44. collections::HashMap,
  45. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  46. sync::atomic::{AtomicBool, AtomicUsize, Ordering},
  47. sync::Arc,
  48. time::Instant,
  49. };
  /// Work items delivered to the I/O loop through the unbounded channel.
  #[derive(Clone, Debug)]
  enum Data {
      /// A message the I/O loop sends through the UDP socket to the given address.
      Msg(Box<RendezvousMessage>, SocketAddr),
      /// A raw relay-server string that the I/O loop hands to `parse_relay_servers`.
      RelayServers0(String),
      /// A replacement for the current `relay_servers` list.
      RelayServers(RelayServers),
  }
  /// Threshold, in milliseconds, used by `peers_online_state`: a peer whose last
  /// registration is more recent than this is reported as online.
  const REG_TIMEOUT: i32 = 30_000;
  /// Write half of a TCP connection framed with `BytesCodec`.
  type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  /// Write half of a WebSocket connection running over a `TcpStream`.
  type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
  /// WebSocket write half together with its optional encryption state.
  struct SafeWsSink {
      /// Where outgoing binary frames are written.
      sink: WsSink,
      /// When `Some`, outgoing payloads are passed through `Encrypt::enc` before sending.
      encrypt: Option<Encrypt>,
  }
  /// TCP write half together with its optional encryption state.
  struct SafeTcpStreamSink {
      /// Where outgoing frames are written.
      sink: TcpStreamSink,
      /// When `Some`, outgoing payloads are passed through `Encrypt::enc` before sending.
      encrypt: Option<Encrypt>,
  }
  /// Outgoing side of a client connection, over either WebSocket or plain TCP.
  ///
  /// Both variants carry an optional `Encrypt` state; the earlier unencrypted
  /// `TcpStream` / `Ws` variants are no longer used.
  enum Sink {
      /// WebSocket connection.
      Wss(SafeWsSink),
      /// Framed TCP connection.
      Tss(SafeTcpStreamSink),
  }
  73. impl Sink {
  74. async fn send(&mut self, msg: &RendezvousMessage) {
  75. if let Ok(mut bytes) = msg.write_to_bytes() {
  76. match self {
  77. // Sink::TcpStream(mut s) => allow_err!(s.send(Bytes::from(bytes)).await),
  78. // Sink::Ws(mut s) => allow_err!(s.send(tungstenite::Message::Binary(bytes)).await),
  79. Sink::Wss(s) => {
  80. if let Some(key) = s.encrypt.as_mut() {
  81. bytes = key.enc(&bytes);
  82. }
  83. allow_err!(s.sink.send(tungstenite::Message::Binary(bytes)).await)
  84. }
  85. Sink::Tss(s) => {
  86. if let Some(key) = s.encrypt.as_mut() {
  87. bytes = key.enc(&bytes);
  88. }
  89. allow_err!(s.sink.send(Bytes::from(bytes)).await)
  90. }
  91. }
  92. }
  93. }
  94. }
  /// Sending half of the channel that feeds `Data` items to the I/O loop.
  type Sender = mpsc::UnboundedSender<Data>;
  /// Receiving half of the channel, drained by the I/O loop.
  type Receiver = mpsc::UnboundedReceiver<Data>;
  // NOTE(review): presumably a rotating index used when picking a relay server;
  // its use is outside the part of the file visible here — confirm.
  static ROTATION_RELAY_SERVER: AtomicUsize = AtomicUsize::new(0);
  /// A list of relay server addresses.
  type RelayServers = Vec<String>;
  /// Period, in milliseconds, of the relay-check timer in the I/O loop.
  const CHECK_RELAY_TIMEOUT: u64 = 3_000;
  /// Set to `true` at startup when the `ALWAYS_USE_RELAY` environment variable is "Y" (case-insensitive).
  static ALWAYS_USE_RELAY: AtomicBool = AtomicBool::new(false);
  /// Set to `true` at startup when the `must-login` argument is "Y", or when that
  /// argument is empty and the `MUST_LOGIN` environment variable is "Y" (case-insensitive).
  static MUST_LOGIN: AtomicBool = AtomicBool::new(false);
  /// Server settings shared behind an `Arc`; cloned and replaced as a whole when
  /// the configuration serial is updated.
  #[derive(Clone)]
  struct Inner {
      /// Configuration serial; peers reporting a lower serial are sent a `ConfigUpdate`.
      serial: i32,
      /// Version string derived from `software_url` at startup.
      version: String,
      /// Value of the `software-url` argument, sent back in `SoftwareUpdate` replies.
      software_url: String,
      /// Parsed value of the `mask` argument, `None` when absent or unparsable.
      mask: Option<Ipv4Network>,
      /// Local IP of this server; empty when no `mask` is configured.
      local_ip: String,
      /// Signing secret key returned by `get_server_sk`, if any.
      sk: Option<sign::SecretKey>,
      /// Public half of the box key pair generated at startup.
      secure_tcp_pk_b: PublicKey,
      /// Secret half of that key pair, used when processing `KeyExchange` messages.
      secure_tcp_sk_b: SecretKey,
  }
  /// The rendezvous server state. Cloning is cheap for the `Arc` fields, which
  /// are shared between clones.
  #[derive(Clone)]
  pub struct RendezvousServer {
      /// Sinks of TCP/WebSocket connections that sent a punch-hole or relay
      /// request, keyed by the requester's address after `try_into_v4`.
      tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
      /// Registry of known peers.
      pm: PeerMap,
      /// Sender used to queue `Data` items for the I/O loop.
      tx: Sender,
      /// Relay servers currently in use; replaced when `Data::RelayServers` arrives.
      relay_servers: Arc<RelayServers>,
      /// Relay servers that are periodically checked when more than one is present.
      relay_servers0: Arc<RelayServers>,
      /// Rendezvous servers advertised to peers in `ConfigUpdate` messages.
      rendezvous_servers: Arc<Vec<String>>,
      /// Shared settings; see `Inner`.
      inner: Arc<Inner>,
      /// Sinks of WebSocket connections, stored after a successful `RegisterPk`,
      /// keyed by the client's address after `try_into_v4`.
      ws_map: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  }
  /// Identifies which endpoint failed inside `io_loop`, so that `start` can
  /// drop and recreate just that endpoint.
  enum LoopFailure {
      /// The UDP socket (or handling of a UDP datagram) failed.
      UdpSocket,
      /// `accept` failed on the WebSocket listener (`port + 2`).
      Listener3,
      /// `accept` failed on the NAT-test listener (`port - 1`).
      Listener2,
      /// `accept` failed on the main TCP listener (`port`).
      Listener,
  }
  130. impl RendezvousServer {
  /// Starts the rendezvous server and runs it until either the main loop
  /// returns an error or the future returned by `listen_signal` completes.
  ///
  /// * `port` - base port: UDP and TCP listen on `port`, the NAT-test TCP
  ///   listener on `port - 1`, and the WebSocket listener on `port + 2`.
  /// * `serial` - configuration serial stored in `Inner`.
  /// * `key` - passed to `get_server_sk`, which yields the key string and the
  ///   optional signing secret key.
  /// * `rmem` - forwarded to `create_udp_listener`.
  ///
  /// # Errors
  /// Propagates failures from creating the peer map, creating (or recreating)
  /// any listener, and from resolving or parsing the self-test address.
  #[tokio::main(flavor = "multi_thread")]
  pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
      let (key, sk) = Self::get_server_sk(key);
      let nat_port = port - 1;
      let ws_port = port + 2;
      let pm = PeerMap::new().await?;
      log::info!("serial={}", serial);
      let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
      log::info!("Listening on tcp/udp :{}", port);
      log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
      log::info!("Listening on websocket :{}", ws_port);
      let mut socket = create_udp_listener(port, rmem).await?;
      let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
      let software_url = get_arg("software-url");
      let version = hbb_common::get_version_from_url(&software_url);
      if !version.is_empty() {
          log::info!("software_url: {}, version: {}", software_url, version);
      }
      // An absent or unparsable `mask` argument becomes `None`.
      let mask = get_arg("mask").parse().ok();
      // The local IP is only looked up when a mask is configured; otherwise it stays empty.
      let local_ip = if mask.is_none() {
          "".to_owned()
      } else {
          get_arg_or(
              "local-ip",
              local_ip_address::local_ip()
                  .map(|x| x.to_string())
                  .unwrap_or_default(),
          )
      };
      // Key pair for the secure-TCP key exchange, kept in `Inner`.
      // NOTE(review): the previous comment said "per connection key pair", but
      // this pair is generated once here at startup.
      let (secure_tcp_pk_b, secure_tcp_sk_b) = box_::gen_keypair();
      let mut rs = Self {
          tcp_punch: Arc::new(Mutex::new(HashMap::new())),
          pm,
          tx: tx.clone(),
          relay_servers: Default::default(),
          relay_servers0: Default::default(),
          rendezvous_servers: Arc::new(rendezvous_servers),
          inner: Arc::new(Inner {
              serial,
              version,
              software_url,
              sk,
              mask,
              local_ip,
              secure_tcp_pk_b,
              secure_tcp_sk_b,
          }),
          ws_map: Arc::new(Mutex::new(HashMap::new())),
      };
      log::info!("mask: {:?}", rs.inner.mask);
      log::info!("local-ip: {:?}", rs.inner.local_ip);
      // Publish the base port through the process environment.
      std::env::set_var("PORT_FOR_API", port.to_string());
      rs.parse_relay_servers(&get_arg("relay-servers"));
      let mut listener = create_tcp_listener(port).await?;
      let mut listener2 = create_tcp_listener(nat_port).await?;
      let mut listener3 = create_tcp_listener(ws_port).await?;
      let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
      // ALWAYS_USE_RELAY is enabled only by the environment variable ("Y"/"y").
      if std::env::var("ALWAYS_USE_RELAY")
          .unwrap_or_default()
          .to_uppercase()
          == "Y"
      {
          ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
      }
      log::info!(
          "ALWAYS_USE_RELAY={}",
          if ALWAYS_USE_RELAY.load(Ordering::SeqCst) {
              "Y"
          } else {
              "N"
          }
      );
      // MUST_LOGIN: the `must-login` argument wins; the environment variable is
      // consulted only when the argument is empty.
      let must_login = get_arg("must-login");
      log::debug!("must_login={}", must_login);
      if must_login.to_uppercase() == "Y"
          || (must_login == ""
              && std::env::var("MUST_LOGIN")
                  .unwrap_or_default()
                  .to_uppercase()
                  == "Y")
      {
          MUST_LOGIN.store(true, Ordering::SeqCst);
      }
      log::info!(
          "MUST_LOGIN={}",
          if MUST_LOGIN.load(Ordering::SeqCst) {
              "Y"
          } else {
              "N"
          }
      );
      // Self-test, skipped when TEST_HBBS is "no". An empty TEST_HBBS tests the
      // main listener's own address; anything else is parsed as a socket address.
      if test_addr.to_lowercase() != "no" {
          let test_addr = if test_addr.is_empty() {
              listener.local_addr()?
          } else {
              test_addr.parse()?
          };
          tokio::spawn(async move {
              if let Err(err) = test_hbbs(test_addr).await {
                  // For an unspecified IPv6 address, retry once with the
                  // unspecified IPv4 address before giving up.
                  if test_addr.is_ipv6() && test_addr.ip().is_unspecified() {
                      let mut test_addr = test_addr;
                      test_addr.set_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
                      if let Err(err) = test_hbbs(test_addr).await {
                          log::error!("Failed to run hbbs test with {test_addr}: {err}");
                          std::process::exit(1);
                      }
                  } else {
                      log::error!("Failed to run hbbs test with {test_addr}: {err}");
                      std::process::exit(1);
                  }
              }
          });
      };
      // Run the I/O loop forever; whenever it reports a failed endpoint, drop
      // and recreate that endpoint, then restart the loop. A failure to
      // recreate the endpoint ends the task with that error.
      let main_task = async move {
          loop {
              log::info!("Start");
              match rs
                  .io_loop(
                      &mut rx,
                      &mut listener,
                      &mut listener2,
                      &mut listener3,
                      &mut socket,
                      &key,
                  )
                  .await
              {
                  LoopFailure::UdpSocket => {
                      drop(socket);
                      socket = create_udp_listener(port, rmem).await?;
                  }
                  LoopFailure::Listener => {
                      drop(listener);
                      listener = create_tcp_listener(port).await?;
                  }
                  LoopFailure::Listener2 => {
                      drop(listener2);
                      listener2 = create_tcp_listener(nat_port).await?;
                  }
                  LoopFailure::Listener3 => {
                      drop(listener3);
                      listener3 = create_tcp_listener(ws_port).await?;
                  }
              }
          }
      };
      let listen_signal = listen_signal();
      // Whichever future finishes first determines the result.
      tokio::select!(
          res = main_task => res,
          res = listen_signal => res,
      )
  }
  /// Multiplexes all server I/O until one of the endpoints fails.
  ///
  /// Each iteration waits for whichever comes first: the relay-check timer, a
  /// queued `Data` item on `rx`, a UDP datagram on `socket`, or an incoming
  /// connection on one of the three TCP listeners (`listener` = main,
  /// `listener2` = NAT test, `listener3` = WebSocket).
  ///
  /// Returns the `LoopFailure` variant naming the endpoint that failed; the
  /// loop never returns otherwise.
  async fn io_loop(
      &mut self,
      rx: &mut Receiver,
      listener: &mut TcpListener,
      listener2: &mut TcpListener,
      listener3: &mut TcpListener,
      socket: &mut FramedSocket,
      key: &str,
  ) -> LoopFailure {
      let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
      loop {
          tokio::select! {
              _ = timer_check_relay.tick() => {
                  // Relay checking only runs when there is more than one
                  // candidate; it is spawned so it does not stall this loop.
                  if self.relay_servers0.len() > 1 {
                      let rs = self.relay_servers0.clone();
                      let tx = self.tx.clone();
                      tokio::spawn(async move {
                          check_relay_servers(rs, tx).await;
                      });
                  }
              }
              Some(data) = rx.recv() => {
                  match data {
                      // Outgoing UDP message queued by another task; send errors are only logged.
                      Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
                      Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
                      Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
                  }
              }
              res = socket.next() => {
                  match res {
                      Some(Ok((bytes, addr))) => {
                          // Any error from the UDP handler is treated as a socket failure.
                          if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
                              log::error!("udp failure: {}", err);
                              return LoopFailure::UdpSocket;
                          }
                      }
                      Some(Err(err)) => {
                          log::error!("udp failure: {}", err);
                          return LoopFailure::UdpSocket;
                      }
                      None => {
                          // unreachable!() ?
                      }
                  }
              }
              res = listener2.accept() => {
                  match res {
                      Ok((stream, addr)) => {
                          // Failure to set TCP_NODELAY is ignored.
                          stream.set_nodelay(true).ok();
                          self.handle_listener2(stream, addr).await;
                      }
                      Err(err) => {
                          log::error!("listener2.accept failed: {}", err);
                          return LoopFailure::Listener2;
                      }
                  }
              }
              res = listener3.accept() => {
                  match res {
                      Ok((stream, addr)) => {
                          stream.set_nodelay(true).ok();
                          // `true` marks the connection as WebSocket.
                          self.handle_listener(stream, addr, key, true).await;
                      }
                      Err(err) => {
                          log::error!("listener3.accept failed: {}", err);
                          return LoopFailure::Listener3;
                      }
                  }
              }
              res = listener.accept() => {
                  match res {
                      Ok((stream, addr)) => {
                          stream.set_nodelay(true).ok();
                          // `false` marks the connection as plain TCP.
                          self.handle_listener(stream, addr, key, false).await;
                      }
                      Err(err) => {
                          log::error!("listener.accept failed: {}", err);
                          return LoopFailure::Listener;
                      }
                  }
              }
          }
      }
  }
  368. #[inline]
  369. async fn handle_udp(
  370. &mut self,
  371. bytes: &BytesMut,
  372. addr: SocketAddr,
  373. socket: &mut FramedSocket,
  374. key: &str,
  375. ) -> ResultType<()> {
  376. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  377. match msg_in.union {
  378. Some(rendezvous_message::Union::RegisterPeer(rp)) => {
  379. // B registered
  380. if !rp.id.is_empty() {
  381. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  382. let request_pk = self.update_addr(rp.id, addr).await;
  383. let mut msg_out = RendezvousMessage::new();
  384. msg_out.set_register_peer_response(RegisterPeerResponse {
  385. request_pk,
  386. ..Default::default()
  387. });
  388. socket.send(&msg_out, addr).await?;
  389. if self.inner.serial > rp.serial {
  390. let mut msg_out = RendezvousMessage::new();
  391. msg_out.set_configure_update(ConfigUpdate {
  392. serial: self.inner.serial,
  393. rendezvous_servers: (*self.rendezvous_servers).clone(),
  394. ..Default::default()
  395. });
  396. socket.send(&msg_out, addr).await?;
  397. }
  398. }
  399. }
  400. Some(rendezvous_message::Union::RegisterPk(rk)) => {
  401. let response = self.handle_register_pk(rk, addr, false).await;
  402. match response {
  403. Err(err) => {
  404. let mut msg_out = RendezvousMessage::new();
  405. msg_out.set_register_pk_response(RegisterPkResponse {
  406. result: err.into(),
  407. ..Default::default()
  408. });
  409. socket.send(&msg_out, addr).await?;
  410. }
  411. Ok(res) => {
  412. let mut msg_out = RendezvousMessage::new();
  413. msg_out.set_register_pk_response(RegisterPkResponse {
  414. result: res.into(),
  415. ..Default::default()
  416. });
  417. socket.send(&msg_out, addr).await?;
  418. }
  419. }
  420. }
  421. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  422. if self.pm.is_in_memory(&ph.id).await {
  423. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  424. } else {
  425. // not in memory, fetch from db with spawn in case blocking me
  426. let mut me = self.clone();
  427. let key = key.to_owned();
  428. tokio::spawn(async move {
  429. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  430. });
  431. }
  432. }
  433. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  434. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  435. }
  436. Some(rendezvous_message::Union::LocalAddr(la)) => {
  437. self.handle_local_addr(la, addr, Some(socket)).await?;
  438. }
  439. Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
  440. if try_into_v4(addr).ip().is_loopback() && cu.serial > self.inner.serial {
  441. let mut inner: Inner = (*self.inner).clone();
  442. inner.serial = cu.serial;
  443. self.inner = Arc::new(inner);
  444. self.rendezvous_servers = Arc::new(
  445. cu.rendezvous_servers
  446. .drain(..)
  447. .filter(|x| {
  448. !x.is_empty()
  449. && test_if_valid_server(x, "rendezvous-server").is_ok()
  450. })
  451. .collect(),
  452. );
  453. log::info!(
  454. "configure updated: serial={} rendezvous-servers={:?}",
  455. self.inner.serial,
  456. self.rendezvous_servers
  457. );
  458. }
  459. }
  460. Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
  461. if !self.inner.version.is_empty() && su.url != self.inner.version {
  462. let mut msg_out = RendezvousMessage::new();
  463. msg_out.set_software_update(SoftwareUpdate {
  464. url: self.inner.software_url.clone(),
  465. ..Default::default()
  466. });
  467. socket.send(&msg_out, addr).await?;
  468. }
  469. }
  470. _ => {}
  471. }
  472. }
  473. Ok(())
  474. }
  475. #[inline]
  476. async fn handle_tcp(
  477. &mut self,
  478. bytes: &[u8],
  479. sink: &mut Option<Sink>,
  480. addr: SocketAddr,
  481. key: &str,
  482. ws: bool,
  483. ) -> bool {
  484. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  485. // log::debug!("Received TCP message from {}: {:?}", addr, msg_in);
  486. match msg_in.union {
  487. Some(rendezvous_message::Union::RegisterPeer(rp)) => {
  488. // B registered
  489. if !rp.id.is_empty() {
  490. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  491. let request_pk = self.update_addr(rp.id, addr).await;
  492. let mut msg_out = RendezvousMessage::new();
  493. msg_out.set_register_peer_response(RegisterPeerResponse {
  494. request_pk,
  495. ..Default::default()
  496. });
  497. Self::send_to_sink(sink, msg_out).await;
  498. if self.inner.serial > rp.serial {
  499. let mut msg_out = RendezvousMessage::new();
  500. msg_out.set_configure_update(ConfigUpdate {
  501. serial: self.inner.serial,
  502. rendezvous_servers: (*self.rendezvous_servers).clone(),
  503. ..Default::default()
  504. });
  505. Self::send_to_sink(sink, msg_out).await;
  506. }
  507. }
  508. }
  509. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  510. // there maybe several attempt, so sink can be none
  511. if let Some(sink) = sink.take() {
  512. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  513. }
  514. allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
  515. return true;
  516. }
  517. Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
  518. // there maybe several attempt, so sink can be none
  519. if let Some(sink) = sink.take() {
  520. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  521. }
  522. if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
  523. let mut msg_out = RendezvousMessage::new();
  524. rf.socket_addr = AddrMangle::encode(addr).into();
  525. msg_out.set_request_relay(rf);
  526. let peer_addr = peer.read().await.socket_addr;
  527. self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
  528. }
  529. return true;
  530. }
  531. Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
  532. let addr_b = AddrMangle::decode(&rr.socket_addr);
  533. rr.socket_addr = Default::default();
  534. let id = rr.id();
  535. if !id.is_empty() {
  536. let pk = self.get_pk(&rr.version, id.to_owned()).await;
  537. rr.set_pk(pk);
  538. }
  539. let mut msg_out = RendezvousMessage::new();
  540. if !rr.relay_server.is_empty() {
  541. if self.is_lan(addr_b) {
  542. // https://github.com/rustdesk/rustdesk-server/issues/24
  543. rr.relay_server = self.inner.local_ip.clone();
  544. } else if rr.relay_server == self.inner.local_ip {
  545. rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
  546. }
  547. }
  548. msg_out.set_relay_response(rr);
  549. allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
  550. }
  551. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  552. allow_err!(self.handle_hole_sent(phs, addr, None).await);
  553. }
  554. Some(rendezvous_message::Union::LocalAddr(la)) => {
  555. allow_err!(self.handle_local_addr(la, addr, None).await);
  556. }
  557. Some(rendezvous_message::Union::TestNatRequest(tar)) => {
  558. let mut msg_out = RendezvousMessage::new();
  559. let mut res = TestNatResponse {
  560. port: addr.port() as _,
  561. ..Default::default()
  562. };
  563. if self.inner.serial > tar.serial {
  564. let mut cu = ConfigUpdate::new();
  565. cu.serial = self.inner.serial;
  566. cu.rendezvous_servers = (*self.rendezvous_servers).clone();
  567. res.cu = MessageField::from_option(Some(cu));
  568. }
  569. msg_out.set_test_nat_response(res);
  570. Self::send_to_sink(sink, msg_out).await;
  571. }
  572. Some(rendezvous_message::Union::RegisterPk(rk)) => {
  573. let response = self.handle_register_pk(rk, addr, ws).await;
  574. match response {
  575. Err(err) => {
  576. let mut msg_out = RendezvousMessage::new();
  577. msg_out.set_register_pk_response(RegisterPkResponse {
  578. result: err.into(),
  579. ..Default::default()
  580. });
  581. Self::send_to_sink(sink, msg_out).await;
  582. return false;
  583. }
  584. Ok(res) => {
  585. let mut msg_out = RendezvousMessage::new();
  586. msg_out.set_register_pk_response(RegisterPkResponse {
  587. result: res.into(),
  588. ..Default::default()
  589. });
  590. Self::send_to_sink(sink, msg_out).await;
  591. if ws {
  592. // for ws, we can only get addr when register_pk
  593. if let Some(sink) = sink.take() {
  594. self.ws_map.lock().await.insert(try_into_v4(addr), sink);
  595. }
  596. }
  597. return true;
  598. }
  599. }
  600. }
  601. Some(rendezvous_message::Union::KeyExchange(ex)) => {
  602. log::trace!("KeyExchange {:?} <- bytes: {:?}", addr, hex::encode(&bytes));
  603. if ex.keys.len() != 2 {
  604. log::error!("Handshake failed: invalid phase 2 key exchange message");
  605. return false;
  606. }
  607. log::trace!("KeyExchange their_pk: {:?}", hex::encode(&ex.keys[0]));
  608. log::trace!("KeyExchange box: {:?}", hex::encode(&ex.keys[1]));
  609. let their_pk: [u8; 32] = ex.keys[0].to_vec().try_into().unwrap();
  610. let cryptobox: [u8; 48] = ex.keys[1].to_vec().try_into().unwrap();
  611. let symetric_key = get_symetric_key_from_msg(
  612. self.inner.secure_tcp_sk_b.0,
  613. their_pk,
  614. &cryptobox,
  615. );
  616. log::debug!("KeyExchange symetric key: {:?}", hex::encode(&symetric_key));
  617. let key = secretbox::Key::from_slice(&symetric_key);
  618. match key {
  619. Some(key) => {
  620. if let Some(sink) = sink.as_mut() {
  621. match sink {
  622. Sink::Wss(s) => s.encrypt = Some(Encrypt::new(key)),
  623. Sink::Tss(s) => s.encrypt = Some(Encrypt::new(key)),
  624. }
  625. }
  626. log::debug!("KeyExchange symetric key set");
  627. return true;
  628. }
  629. None => {
  630. log::error!("KeyExchange symetric key NOT set");
  631. return false;
  632. }
  633. }
  634. }
  635. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  636. let mut states = self.peers_online_state(or.peers).await;
  637. let mut msg_out = RendezvousMessage::new();
  638. msg_out.set_online_response(OnlineResponse {
  639. states: states.into(),
  640. ..Default::default()
  641. });
  642. Self::send_to_sink(sink, msg_out).await;
  643. }
  644. _ => {}
  645. }
  646. }
  647. false
  648. }
  649. async fn peers_online_state(&mut self, peers: Vec<String>) -> BytesMut {
  650. let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
  651. for (i, peer_id) in peers.iter().enumerate() {
  652. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  653. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  654. // bytes index from left to right
  655. let states_idx = i / 8;
  656. let bit_idx = 7 - i % 8;
  657. if elapsed < REG_TIMEOUT {
  658. states[states_idx] |= 0x01 << bit_idx;
  659. }
  660. }
  661. }
  662. states
  663. }
  /// Validates a public-key registration and, when accepted, stores it.
  ///
  /// * `rk` - the registration (id, uuid, pk).
  /// * `addr` - address the registration came from.
  /// * `ws` - when `true`, the peer record is refreshed even if nothing changed.
  ///
  /// Returns `Ok(OK)` when accepted. Rejections are returned as `Err` with the
  /// result code to send back:
  /// * `INVALID_ID_FORMAT` - empty uuid or pk;
  /// * `UUID_MISMATCH` - id shorter than 6 characters, uuid differs from the
  ///   stored one, or uuid matches but both IP and pk differ;
  /// * `TOO_FREQUENT` - blocked by `check_ip_blocker`, or more than the allowed
  ///   number of registrations within the rate-limit window.
  async fn handle_register_pk(
      &mut self,
      rk: RegisterPk,
      addr: SocketAddr,
      ws: bool,
  ) -> Result<register_pk_response::Result, register_pk_response::Result> {
      if rk.uuid.is_empty() || rk.pk.is_empty() {
          return Err(INVALID_ID_FORMAT);
      }
      let id = rk.id;
      let ip = addr.ip().to_string();
      if id.len() < 6 {
          return Err(UUID_MISMATCH);
      } else if !self.check_ip_blocker(&ip, &id).await {
          return Err(TOO_FREQUENT);
      }
      let peer = self.pm.get_or(&id).await;
      // `changed`: the stored record differs from this registration.
      // `ip_changed`: specifically the IP differs (never set for a peer with no stored uuid).
      let (changed, ip_changed) = {
          let peer = peer.read().await;
          if peer.uuid.is_empty() {
              // No uuid stored yet for this peer.
              (true, false)
          } else {
              if peer.uuid == rk.uuid {
                  // Same uuid: a change of IP or of pk alone is tolerated, but
                  // not both at the same time.
                  if peer.info.ip != ip && peer.pk != rk.pk {
                      log::warn!(
                          "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
                          id,
                          ip,
                          rk.pk,
                          peer.info.ip,
                          peer.pk,
                      );
                      drop(peer);
                      return Err(UUID_MISMATCH);
                  }
              } else {
                  log::warn!(
                      "Peer {} uuid mismatch: {:?} vs {:?}",
                      id,
                      rk.uuid,
                      peer.uuid
                  );
                  drop(peer);
                  return Err(UUID_MISMATCH);
              }
              let ip_changed = peer.info.ip != ip;
              (
                  peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
                  ip_changed,
              )
          }
      };
      // Rate limit on `reg_pk` = (count, time of last registration): the count
      // resets once more than 6 seconds have passed; otherwise a count above 2
      // rejects the request.
      let mut req_pk = peer.read().await.reg_pk;
      if req_pk.1.elapsed().as_secs() > 6 {
          req_pk.0 = 0;
      } else if req_pk.0 > 2 {
          return Err(TOO_FREQUENT);
      }
      req_pk.0 += 1;
      req_pk.1 = Instant::now();
      peer.write().await.reg_pk = req_pk;
      if ip_changed {
          // Count, per peer id, how often each IP was seen; the per-id table
          // is restarted once it is older than IP_CHANGE_DUR seconds.
          let mut lock = IP_CHANGES.lock().await;
          if let Some((tm, ips)) = lock.get_mut(&id) {
              if tm.elapsed().as_secs() > IP_CHANGE_DUR {
                  *tm = Instant::now();
                  ips.clear();
                  ips.insert(ip.clone(), 1);
              } else if let Some(v) = ips.get_mut(&ip) {
                  *v += 1;
              } else {
                  ips.insert(ip.clone(), 1);
              }
          } else {
              lock.insert(
                  id.clone(),
                  (Instant::now(), HashMap::from([(ip.clone(), 1)])),
              );
          }
      }
      if changed || ws {
          // Update the peer info. Done unconditionally for `ws` to fix the
          // online time not being refreshed during TCP sessions.
          self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
      }
      Ok(register_pk_response::Result::OK)
  }
  761. #[inline]
  762. async fn update_addr(&mut self, id: String, socket_addr: SocketAddr) -> bool {
  763. let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
  764. let mut old = old.write().await;
  765. let ip = socket_addr.ip();
  766. let ip_change = if old.socket_addr.port() != 0 {
  767. ip != old.socket_addr.ip()
  768. } else {
  769. ip.to_string() != old.info.ip
  770. } && !ip.is_loopback();
  771. let request_pk = old.pk.is_empty() || ip_change;
  772. if !request_pk {
  773. old.socket_addr = socket_addr;
  774. old.last_reg_time = Instant::now();
  775. }
  776. let ip_change = if ip_change && old.reg_pk.0 <= 2 {
  777. Some(if old.socket_addr.port() == 0 {
  778. old.info.ip.clone()
  779. } else {
  780. old.socket_addr.to_string()
  781. })
  782. } else {
  783. None
  784. };
  785. (request_pk, ip_change)
  786. } else {
  787. (true, None)
  788. };
  789. if let Some(old) = ip_change {
  790. log::info!("IP change of {} from {} to {}", id, old, socket_addr);
  791. }
  792. request_pk
  793. // let mut msg_out = RendezvousMessage::new();
  794. // msg_out.set_register_peer_response(RegisterPeerResponse {
  795. // request_pk,
  796. // ..Default::default()
  797. // });
  798. // socket.send(&msg_out, socket_addr).await
  799. }
  800. #[inline]
  801. async fn handle_hole_sent<'a>(
  802. &mut self,
  803. phs: PunchHoleSent,
  804. addr: SocketAddr,
  805. socket: Option<&'a mut FramedSocket>,
  806. ) -> ResultType<()> {
  807. // punch hole sent from B, tell A that B is ready to be connected
  808. let addr_a = AddrMangle::decode(&phs.socket_addr);
  809. log::debug!(
  810. "{} punch hole response to {:?} from {:?}",
  811. if socket.is_none() { "TCP" } else { "UDP" },
  812. &addr_a,
  813. &addr
  814. );
  815. let mut msg_out = RendezvousMessage::new();
  816. let mut p = PunchHoleResponse {
  817. socket_addr: AddrMangle::encode(addr).into(),
  818. pk: self.get_pk(&phs.version, phs.id).await,
  819. relay_server: phs.relay_server.clone(),
  820. ..Default::default()
  821. };
  822. if let Ok(t) = phs.nat_type.enum_value() {
  823. p.set_nat_type(t);
  824. }
  825. msg_out.set_punch_hole_response(p);
  826. if let Some(socket) = socket {
  827. socket.send(&msg_out, addr_a).await?;
  828. } else {
  829. self.send_to_tcp(msg_out, addr_a).await;
  830. }
  831. Ok(())
  832. }
  833. #[inline]
  834. async fn handle_local_addr<'a>(
  835. &mut self,
  836. la: LocalAddr,
  837. addr: SocketAddr,
  838. socket: Option<&'a mut FramedSocket>,
  839. ) -> ResultType<()> {
  840. // relay local addrs of B to A
  841. let addr_a = AddrMangle::decode(&la.socket_addr);
  842. log::debug!(
  843. "{} local addrs response to {:?} from {:?}",
  844. if socket.is_none() { "TCP" } else { "UDP" },
  845. &addr_a,
  846. &addr
  847. );
  848. let mut msg_out = RendezvousMessage::new();
  849. let mut p = PunchHoleResponse {
  850. socket_addr: la.local_addr.clone(),
  851. pk: self.get_pk(&la.version, la.id).await,
  852. relay_server: la.relay_server,
  853. ..Default::default()
  854. };
  855. p.set_is_local(true);
  856. msg_out.set_punch_hole_response(p);
  857. if let Some(socket) = socket {
  858. socket.send(&msg_out, addr_a).await?;
  859. } else {
  860. self.send_to_tcp(msg_out, addr_a).await;
  861. }
  862. Ok(())
  863. }
  864. #[inline]
  865. async fn handle_punch_hole_request(
  866. &mut self,
  867. addr: SocketAddr,
  868. ph: PunchHoleRequest,
  869. key: &str,
  870. ws: bool,
  871. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  872. let mut ph = ph;
  873. if !key.is_empty() && ph.licence_key != key {
  874. let mut msg_out = RendezvousMessage::new();
  875. msg_out.set_punch_hole_response(PunchHoleResponse {
  876. failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
  877. ..Default::default()
  878. });
  879. return Ok((msg_out, None));
  880. }
  881. // if secret is not empty check token by jwt
  882. if MUST_LOGIN.load(Ordering::SeqCst) {
  883. if ph.token.is_empty() {
  884. let mut msg_out = RendezvousMessage::new();
  885. msg_out.set_punch_hole_response(PunchHoleResponse {
  886. other_failure: String::from("Connection failed, please login!"),
  887. ..Default::default()
  888. });
  889. return Ok((msg_out, None));
  890. } else if !jwt::SECRET.is_empty() {
  891. let token = ph.token;
  892. let token = jwt::verify_token(token.as_str());
  893. if token.is_err() {
  894. let mut msg_out = RendezvousMessage::new();
  895. msg_out.set_punch_hole_response(PunchHoleResponse {
  896. //提示重新登录
  897. other_failure: String::from("Token error, please log out and log back in!"),
  898. ..Default::default()
  899. });
  900. return Ok((msg_out, None));
  901. }
  902. }
  903. }
  904. let id = ph.id;
  905. // punch hole request from A, relay to B,
  906. // check if in same intranet first,
  907. // fetch local addrs if in same intranet.
  908. // because punch hole won't work if in the same intranet,
  909. // all routers will drop such self-connections.
  910. if let Some(peer) = self.pm.get(&id).await {
  911. let (elapsed, peer_addr) = {
  912. let r = peer.read().await;
  913. (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
  914. };
  915. if elapsed >= REG_TIMEOUT {
  916. let mut msg_out = RendezvousMessage::new();
  917. msg_out.set_punch_hole_response(PunchHoleResponse {
  918. failure: punch_hole_response::Failure::OFFLINE.into(),
  919. ..Default::default()
  920. });
  921. return Ok((msg_out, None));
  922. }
  923. let mut msg_out = RendezvousMessage::new();
  924. let peer_is_lan = self.is_lan(peer_addr);
  925. let is_lan = self.is_lan(addr);
  926. let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
  927. if ALWAYS_USE_RELAY.load(Ordering::SeqCst) || (peer_is_lan ^ is_lan) {
  928. if peer_is_lan {
  929. // https://github.com/rustdesk/rustdesk-server/issues/24
  930. relay_server = self.inner.local_ip.clone()
  931. }
  932. ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
  933. }
  934. let same_intranet: bool = !ws
  935. && (peer_is_lan && is_lan || {
  936. match (peer_addr, addr) {
  937. (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
  938. (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
  939. _ => false,
  940. }
  941. });
  942. let socket_addr = AddrMangle::encode(addr).into();
  943. if same_intranet {
  944. log::debug!(
  945. "Fetch local addr {:?} {:?} request from {:?}",
  946. id,
  947. peer_addr,
  948. addr
  949. );
  950. msg_out.set_fetch_local_addr(FetchLocalAddr {
  951. socket_addr,
  952. relay_server,
  953. ..Default::default()
  954. });
  955. } else {
  956. log::debug!(
  957. "Punch hole {:?} {:?} request from {:?}",
  958. id,
  959. peer_addr,
  960. addr
  961. );
  962. msg_out.set_punch_hole(PunchHole {
  963. socket_addr,
  964. nat_type: ph.nat_type,
  965. relay_server,
  966. ..Default::default()
  967. });
  968. }
  969. //
  970. Ok((msg_out, Some(peer_addr)))
  971. } else {
  972. let mut msg_out = RendezvousMessage::new();
  973. msg_out.set_punch_hole_response(PunchHoleResponse {
  974. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  975. ..Default::default()
  976. });
  977. Ok((msg_out, None))
  978. }
  979. }
  980. #[inline]
  981. async fn handle_online_request(
  982. &mut self,
  983. stream: &mut FramedStream,
  984. peers: Vec<String>,
  985. ) -> ResultType<()> {
  986. let mut states = self.peers_online_state(peers).await;
  987. let mut msg_out = RendezvousMessage::new();
  988. msg_out.set_online_response(OnlineResponse {
  989. states: states.into(),
  990. ..Default::default()
  991. });
  992. stream.send(&msg_out).await?;
  993. Ok(())
  994. }
  995. #[inline]
  996. async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
  997. let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  998. tokio::spawn(async move {
  999. Self::send_to_sink(&mut tcp, msg).await;
  1000. });
  1001. }
  1002. #[inline]
  1003. async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
  1004. if let Some(sink) = sink.as_mut() {
  1005. sink.send(&msg).await;
  1006. }
  1007. }
  1008. #[inline]
  1009. async fn send_to_tcp_sync(
  1010. &mut self,
  1011. msg: RendezvousMessage,
  1012. addr: SocketAddr,
  1013. ) -> ResultType<()> {
  1014. let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1015. Self::send_to_sink(&mut sink, msg).await;
  1016. Ok(())
  1017. }
  1018. #[inline]
  1019. async fn handle_tcp_punch_hole_request(
  1020. &mut self,
  1021. addr: SocketAddr,
  1022. ph: PunchHoleRequest,
  1023. key: &str,
  1024. ws: bool,
  1025. ) -> ResultType<()> {
  1026. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
  1027. if let Some(addr) = to_addr {
  1028. let mut sink = self.ws_map.lock().await.remove(&try_into_v4(addr));
  1029. if let Some(s) = sink.as_mut() {
  1030. s.send(&msg).await;
  1031. } else {
  1032. self.tx.send(Data::Msg(msg.into(), addr))?;
  1033. }
  1034. } else {
  1035. self.send_to_tcp_sync(msg, addr).await?;
  1036. }
  1037. Ok(())
  1038. }
  1039. #[inline]
  1040. async fn handle_udp_punch_hole_request(
  1041. &mut self,
  1042. addr: SocketAddr,
  1043. ph: PunchHoleRequest,
  1044. key: &str,
  1045. ) -> ResultType<()> {
  1046. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
  1047. self.tx.send(Data::Msg(
  1048. msg.into(),
  1049. match to_addr {
  1050. Some(addr) => addr,
  1051. None => addr,
  1052. },
  1053. ))?;
  1054. Ok(())
  1055. }
  1056. async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
  1057. let mut lock = IP_BLOCKER.lock().await;
  1058. let now = Instant::now();
  1059. if let Some(old) = lock.get_mut(ip) {
  1060. let counter = &mut old.0;
  1061. if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
  1062. counter.0 = 0;
  1063. } else if counter.0 > 30 {
  1064. return false;
  1065. }
  1066. counter.0 += 1;
  1067. counter.1 = now;
  1068. let counter = &mut old.1;
  1069. let is_new = counter.0.get(id).is_none();
  1070. if counter.1.elapsed().as_secs() > DAY_SECONDS {
  1071. counter.0.clear();
  1072. } else if counter.0.len() > 300 {
  1073. return !is_new;
  1074. }
  1075. if is_new {
  1076. counter.0.insert(id.to_owned());
  1077. }
  1078. counter.1 = now;
  1079. } else {
  1080. lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
  1081. }
  1082. true
  1083. }
  1084. fn parse_relay_servers(&mut self, relay_servers: &str) {
  1085. let rs = get_servers(relay_servers, "relay-servers");
  1086. self.relay_servers0 = Arc::new(rs);
  1087. self.relay_servers = self.relay_servers0.clone();
  1088. }
  1089. fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
  1090. if self.relay_servers.is_empty() {
  1091. return "".to_owned();
  1092. } else if self.relay_servers.len() == 1 {
  1093. return self.relay_servers[0].clone();
  1094. }
  1095. let i = ROTATION_RELAY_SERVER.fetch_add(1, Ordering::SeqCst) % self.relay_servers.len();
  1096. self.relay_servers[i].clone()
  1097. }
  1098. async fn check_cmd(&self, cmd: &str) -> String {
  1099. use std::fmt::Write as _;
  1100. let mut res = "".to_owned();
  1101. let mut fds = cmd.trim().split(' ');
  1102. match fds.next() {
  1103. Some("h") => {
  1104. res = format!(
  1105. "{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
  1106. "relay-servers(rs) <separated by ,>",
  1107. "reload-geo(rg)",
  1108. "ip-blocker(ib) [<ip>|<number>] [-]",
  1109. "ip-changes(ic) [<id>|<number>] [-]",
  1110. "always-use-relay(aur) [Y|N]",
  1111. "test-geo(tg) <ip1> <ip2>",
  1112. "must-login(ml) [Y|N]",
  1113. )
  1114. }
  1115. Some("relay-servers" | "rs") => {
  1116. if let Some(rs) = fds.next() {
  1117. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1118. } else {
  1119. for ip in self.relay_servers.iter() {
  1120. let _ = writeln!(res, "{ip}");
  1121. }
  1122. }
  1123. }
  1124. Some("ip-blocker" | "ib") => {
  1125. let mut lock = IP_BLOCKER.lock().await;
  1126. lock.retain(|&_, (a, b)| {
  1127. a.1.elapsed().as_secs() <= IP_BLOCK_DUR
  1128. || b.1.elapsed().as_secs() <= DAY_SECONDS
  1129. });
  1130. res = format!("{}\n", lock.len());
  1131. let ip = fds.next();
  1132. let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  1133. if start < 0 {
  1134. if let Some(ip) = ip {
  1135. if let Some((a, b)) = lock.get(ip) {
  1136. let _ = writeln!(
  1137. res,
  1138. "{}/{}s {}/{}s",
  1139. a.0,
  1140. a.1.elapsed().as_secs(),
  1141. b.0.len(),
  1142. b.1.elapsed().as_secs()
  1143. );
  1144. }
  1145. if fds.next() == Some("-") {
  1146. lock.remove(ip);
  1147. }
  1148. } else {
  1149. start = 0;
  1150. }
  1151. }
  1152. if start >= 0 {
  1153. let mut it = lock.iter();
  1154. for i in 0..(start + 10) {
  1155. let x = it.next();
  1156. if x.is_none() {
  1157. break;
  1158. }
  1159. if i < start {
  1160. continue;
  1161. }
  1162. if let Some((ip, (a, b))) = x {
  1163. let _ = writeln!(
  1164. res,
  1165. "{}: {}/{}s {}/{}s",
  1166. ip,
  1167. a.0,
  1168. a.1.elapsed().as_secs(),
  1169. b.0.len(),
  1170. b.1.elapsed().as_secs()
  1171. );
  1172. }
  1173. }
  1174. }
  1175. }
  1176. Some("ip-changes" | "ic") => {
  1177. let mut lock = IP_CHANGES.lock().await;
  1178. lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
  1179. res = format!("{}\n", lock.len());
  1180. let id = fds.next();
  1181. let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  1182. if !(0..=10_000_000).contains(&start) {
  1183. if let Some(id) = id {
  1184. if let Some((tm, ips)) = lock.get(id) {
  1185. let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
  1186. }
  1187. if fds.next() == Some("-") {
  1188. lock.remove(id);
  1189. }
  1190. } else {
  1191. start = 0;
  1192. }
  1193. }
  1194. if start >= 0 {
  1195. let mut it = lock.iter();
  1196. for i in 0..(start + 10) {
  1197. let x = it.next();
  1198. if x.is_none() {
  1199. break;
  1200. }
  1201. if i < start {
  1202. continue;
  1203. }
  1204. if let Some((id, (tm, ips))) = x {
  1205. let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
  1206. }
  1207. }
  1208. }
  1209. }
  1210. Some("always-use-relay" | "aur") => {
  1211. if let Some(rs) = fds.next() {
  1212. if rs.to_uppercase() == "Y" {
  1213. ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
  1214. } else {
  1215. ALWAYS_USE_RELAY.store(false, Ordering::SeqCst);
  1216. }
  1217. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1218. } else {
  1219. let _ = writeln!(
  1220. res,
  1221. "ALWAYS_USE_RELAY: {:?}",
  1222. ALWAYS_USE_RELAY.load(Ordering::SeqCst)
  1223. );
  1224. }
  1225. }
  1226. Some("test-geo" | "tg") => {
  1227. if let Some(rs) = fds.next() {
  1228. if let Ok(a) = rs.parse::<IpAddr>() {
  1229. if let Some(rs) = fds.next() {
  1230. if let Ok(b) = rs.parse::<IpAddr>() {
  1231. res = format!("{:?}", self.get_relay_server(a, b));
  1232. }
  1233. } else {
  1234. res = format!("{:?}", self.get_relay_server(a, a));
  1235. }
  1236. }
  1237. }
  1238. }
  1239. Some("must-login" | "ml") => {
  1240. if let Some(rs) = fds.next() {
  1241. if rs.to_uppercase() == "Y" {
  1242. MUST_LOGIN.store(true, Ordering::SeqCst);
  1243. } else {
  1244. MUST_LOGIN.store(false, Ordering::SeqCst);
  1245. }
  1246. } else {
  1247. let _ = writeln!(res, "MUST_LOGIN: {:?}", MUST_LOGIN.load(Ordering::SeqCst));
  1248. }
  1249. }
  1250. _ => {}
  1251. }
  1252. res
  1253. }
  1254. async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
  1255. let mut rs = self.clone();
  1256. let ip = try_into_v4(addr).ip();
  1257. if ip.is_loopback() {
  1258. tokio::spawn(async move {
  1259. let mut stream = stream;
  1260. let mut buffer = [0; 1024];
  1261. if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
  1262. if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
  1263. let res = rs.check_cmd(data).await;
  1264. stream.write(res.as_bytes()).await.ok();
  1265. }
  1266. }
  1267. });
  1268. return;
  1269. }
  1270. let stream = FramedStream::from(stream, addr);
  1271. tokio::spawn(async move {
  1272. let mut stream = stream;
  1273. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  1274. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1275. match msg_in.union {
  1276. Some(rendezvous_message::Union::TestNatRequest(_)) => {
  1277. let mut msg_out = RendezvousMessage::new();
  1278. msg_out.set_test_nat_response(TestNatResponse {
  1279. port: addr.port() as _,
  1280. ..Default::default()
  1281. });
  1282. stream.send(&msg_out).await.ok();
  1283. }
  1284. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  1285. allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
  1286. }
  1287. _ => {}
  1288. }
  1289. }
  1290. }
  1291. });
  1292. }
  1293. async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
  1294. log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
  1295. let mut rs = self.clone();
  1296. let key = key.to_owned();
  1297. tokio::spawn(async move {
  1298. allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
  1299. });
  1300. }
  1301. #[inline]
  1302. async fn handle_listener_inner(
  1303. &mut self,
  1304. stream: TcpStream,
  1305. mut addr: SocketAddr,
  1306. key: &str,
  1307. ws: bool,
  1308. ) -> ResultType<()> {
  1309. let mut sink;
  1310. if ws {
  1311. use tokio_tungstenite::tungstenite::handshake::server::{Request, Response};
  1312. let callback = |req: &Request, response: Response| {
  1313. let headers = req.headers();
  1314. let real_ip = headers
  1315. .get("X-Real-IP")
  1316. .or_else(|| headers.get("X-Forwarded-For"))
  1317. .and_then(|header_value| header_value.to_str().ok());
  1318. if let Some(ip) = real_ip {
  1319. if ip.contains('.') {
  1320. addr = format!("{ip}:0").parse().unwrap_or(addr);
  1321. } else {
  1322. addr = format!("[{ip}]:0").parse().unwrap_or(addr);
  1323. }
  1324. }
  1325. Ok(response)
  1326. };
  1327. let ws_stream = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
  1328. let (a, mut b) = ws_stream.split();
  1329. sink = Some(Sink::Wss(SafeWsSink {
  1330. sink: a,
  1331. encrypt: None,
  1332. }));
  1333. while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
  1334. if let tungstenite::Message::Binary(bytes) = msg {
  1335. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1336. break;
  1337. }
  1338. }
  1339. }
  1340. } else {
  1341. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  1342. sink = Some(Sink::Tss(SafeTcpStreamSink {
  1343. sink: a,
  1344. encrypt: None,
  1345. }));
  1346. // Avoid key exchange if answering on nat helper port
  1347. if !key.is_empty() {
  1348. self.key_exchange_phase1(addr, &mut sink).await;
  1349. }
  1350. while let Ok(Some(Ok(mut bytes))) = timeout(30_000, b.next()).await {
  1351. // log::debug!("receive tcp data from {:?} {:?}", addr, bytes);
  1352. if let Some(Sink::Tss(s)) = sink.as_mut() {
  1353. if let Some(key) = s.encrypt.as_mut() {
  1354. if let Err(err) = key.dec(&mut bytes) {
  1355. log::error!("dec tcp data from {:?} err: {:?}", addr, err);
  1356. break;
  1357. }
  1358. }
  1359. }
  1360. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1361. break;
  1362. }
  1363. }
  1364. }
  1365. if sink.is_none() {
  1366. self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1367. }
  1368. log::debug!("Tcp connection from {:?} closed", addr);
  1369. Ok(())
  1370. }
  1371. #[inline]
  1372. async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
  1373. if version.is_empty() || self.inner.sk.is_none() {
  1374. Bytes::new()
  1375. } else {
  1376. match self.pm.get(&id).await {
  1377. Some(peer) => {
  1378. let pk = peer.read().await.pk.clone();
  1379. sign::sign(
  1380. &hbb_common::message_proto::IdPk {
  1381. id,
  1382. pk,
  1383. ..Default::default()
  1384. }
  1385. .write_to_bytes()
  1386. .unwrap_or_default(),
  1387. self.inner.sk.as_ref().unwrap(),
  1388. )
  1389. .into()
  1390. }
  1391. _ => Bytes::new(),
  1392. }
  1393. }
  1394. }
  1395. #[inline]
  1396. fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
  1397. let mut out_sk = None;
  1398. let mut key = key.to_owned();
  1399. if let Ok(sk) = base64::decode(&key) {
  1400. if sk.len() == sign::SECRETKEYBYTES {
  1401. log::info!("The key is a crypto private key");
  1402. key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
  1403. let mut tmp = [0u8; sign::SECRETKEYBYTES];
  1404. tmp[..].copy_from_slice(&sk);
  1405. out_sk = Some(sign::SecretKey(tmp));
  1406. }
  1407. }
  1408. if key.is_empty() || key == "-" || key == "_" {
  1409. let (pk, sk) = crate::common::gen_sk(0);
  1410. out_sk = sk;
  1411. if !key.is_empty() {
  1412. key = pk;
  1413. }
  1414. }
  1415. if !key.is_empty() {
  1416. log::info!("Key: {}", key);
  1417. }
  1418. (key, out_sk)
  1419. }
  1420. #[inline]
  1421. fn is_lan(&self, addr: SocketAddr) -> bool {
  1422. if let Some(network) = &self.inner.mask {
  1423. match addr {
  1424. SocketAddr::V4(v4_socket_addr) => {
  1425. return network.contains(*v4_socket_addr.ip());
  1426. }
  1427. SocketAddr::V6(v6_socket_addr) => {
  1428. if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4() {
  1429. return network.contains(v4_addr);
  1430. }
  1431. }
  1432. }
  1433. }
  1434. false
  1435. }
  1436. async fn key_exchange_phase1(&mut self, addr: SocketAddr, sink: &mut Option<Sink>) {
  1437. let mut msg_out = RendezvousMessage::new();
  1438. log::debug!("KeyExchange phase 1: send our pk for this tcp connection in a message signed with our server key");
  1439. let sk = &self.inner.sk;
  1440. match sk {
  1441. Some(sk) => {
  1442. let our_pk_b = self.inner.secure_tcp_pk_b.clone();
  1443. let sm = sign::sign(&our_pk_b.0, &sk);
  1444. let bytes_sm = Bytes::from(sm);
  1445. msg_out.set_key_exchange(KeyExchange {
  1446. keys: vec![bytes_sm],
  1447. ..Default::default()
  1448. });
  1449. log::trace!(
  1450. "KeyExchange {:?} -> bytes: {:?}",
  1451. addr,
  1452. hex::encode(Bytes::from(msg_out.write_to_bytes().unwrap()))
  1453. );
  1454. Self::send_to_sink(sink, msg_out).await;
  1455. }
  1456. None => {}
  1457. }
  1458. }
  1459. }
  1460. async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
  1461. let mut futs = Vec::new();
  1462. let rs = Arc::new(Mutex::new(Vec::new()));
  1463. for x in rs0.iter() {
  1464. let mut host = x.to_owned();
  1465. if !host.contains(':') {
  1466. host = format!("{}:{}", host, config::RELAY_PORT);
  1467. }
  1468. let rs = rs.clone();
  1469. let x = x.clone();
  1470. futs.push(tokio::spawn(async move {
  1471. if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
  1472. .await
  1473. .is_ok()
  1474. {
  1475. rs.lock().await.push(x);
  1476. }
  1477. }));
  1478. }
  1479. join_all(futs).await;
  1480. log::debug!("check_relay_servers");
  1481. let rs = std::mem::take(&mut *rs.lock().await);
  1482. if !rs.is_empty() {
  1483. tx.send(Data::RelayServers(rs)).ok();
  1484. }
  1485. }
  1486. // temp solution to solve udp socket failure
  1487. async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
  1488. let mut addr = addr;
  1489. if addr.ip().is_unspecified() {
  1490. addr.set_ip(if addr.is_ipv4() {
  1491. IpAddr::V4(Ipv4Addr::LOCALHOST)
  1492. } else {
  1493. IpAddr::V6(Ipv6Addr::LOCALHOST)
  1494. });
  1495. }
  1496. let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
  1497. let mut msg_out = RendezvousMessage::new();
  1498. msg_out.set_register_peer(RegisterPeer {
  1499. id: "(:test_hbbs:)".to_owned(),
  1500. ..Default::default()
  1501. });
  1502. let mut last_time_recv = Instant::now();
  1503. let mut timer = interval(Duration::from_secs(1));
  1504. loop {
  1505. tokio::select! {
  1506. _ = timer.tick() => {
  1507. if last_time_recv.elapsed().as_secs() > 12 {
  1508. bail!("Timeout of test_hbbs");
  1509. }
  1510. socket.send(&msg_out, addr).await?;
  1511. }
  1512. Some(Ok((bytes, _))) = socket.next() => {
  1513. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1514. log::trace!("Recv {:?} of test_hbbs", msg_in);
  1515. last_time_recv = Instant::now();
  1516. }
  1517. }
  1518. }
  1519. }
  1520. }
  1521. #[inline]
  1522. async fn send_rk_res(
  1523. socket: &mut FramedSocket,
  1524. addr: SocketAddr,
  1525. res: register_pk_response::Result,
  1526. ) -> ResultType<()> {
  1527. let mut msg_out = RendezvousMessage::new();
  1528. msg_out.set_register_pk_response(RegisterPkResponse {
  1529. result: res.into(),
  1530. ..Default::default()
  1531. });
  1532. socket.send(&msg_out, addr).await
  1533. }
  1534. async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
  1535. let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
  1536. if let Ok(s) = FramedSocket::new_reuse(&addr, true, rmem).await {
  1537. log::debug!("listen on udp {:?}", s.local_addr());
  1538. return Ok(s);
  1539. }
  1540. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
  1541. let s = FramedSocket::new_reuse(&addr, true, rmem).await?;
  1542. log::debug!("listen on udp {:?}", s.local_addr());
  1543. Ok(s)
  1544. }
  1545. #[inline]
  1546. async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
  1547. let s = listen_any(port as _).await?;
  1548. log::debug!("listen on tcp {:?}", s.local_addr());
  1549. Ok(s)
  1550. }
  1551. fn get_symetric_key_from_msg(
  1552. our_sk_b: [u8; 32],
  1553. their_pk_b: [u8; 32],
  1554. sealed_value: &[u8; 48],
  1555. ) -> [u8; 32] {
  1556. let their_pk_b = box_::PublicKey(their_pk_b);
  1557. let nonce = box_::Nonce([0u8; box_::NONCEBYTES]);
  1558. let sk = box_::SecretKey(our_sk_b);
  1559. let key = box_::open(sealed_value, &nonce, &their_pk_b, &sk);
  1560. match key {
  1561. Ok(key) => {
  1562. let mut key_array = [0u8; 32];
  1563. key_array.copy_from_slice(&key);
  1564. key_array
  1565. }
  1566. Err(e) => panic!("Error while opening the seal key{:?}", e),
  1567. }
  1568. }