rendezvous_server.rs 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505
  1. use crate::common::*;
  2. use crate::peer::*;
  3. use hbb_common::bytes::BufMut;
  4. use hbb_common::{
  5. allow_err, bail,
  6. bytes::{Bytes, BytesMut},
  7. bytes_codec::BytesCodec,
  8. config,
  9. futures::future::join_all,
  10. futures_util::{
  11. sink::SinkExt,
  12. stream::{SplitSink, StreamExt},
  13. },
  14. log,
  15. protobuf::{Message as _, MessageField},
  16. rendezvous_proto::{
  17. register_pk_response::Result::{INVALID_ID_FORMAT, TOO_FREQUENT, UUID_MISMATCH},
  18. *,
  19. },
  20. sodiumoxide::crypto::{
  21. box_, box_::PublicKey, box_::SecretKey, secretbox, secretbox::Key, secretbox::Nonce, sign,
  22. },
  23. sodiumoxide::hex,
  24. tcp,
  25. tcp::Encrypt,
  26. tcp::{listen_any, FramedStream},
  27. timeout,
  28. tokio::{
  29. self,
  30. io::{AsyncReadExt, AsyncWriteExt},
  31. net::{TcpListener, TcpStream},
  32. sync::{mpsc, Mutex},
  33. time::{interval, Duration},
  34. },
  35. tokio_util::codec::Framed,
  36. try_into_v4,
  37. udp::FramedSocket,
  38. AddrMangle, ResultType,
  39. };
  40. use ipnetwork::Ipv4Network;
  41. use std::{
  42. collections::HashMap,
  43. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  44. sync::atomic::{AtomicBool, AtomicUsize, Ordering},
  45. sync::Arc,
  46. time::Instant,
  47. };
  /// Work items delivered to the I/O loop through the server's internal
  /// unbounded channel (see the `Sender`/`Receiver` aliases).
  48. #[derive(Clone, Debug)]
  49. enum Data {
  /// A rendezvous message the I/O loop sends over the UDP socket to the
  /// given address.
  50. Msg(Box<RendezvousMessage>, SocketAddr),
  /// A relay-server list in string form; the I/O loop hands it to
  /// `parse_relay_servers`.
  51. RelayServers0(String),
  /// A relay-server list that the I/O loop stores as the new
  /// `relay_servers` value.
  52. RelayServers(RelayServers),
  53. }
  /// Milliseconds since a peer's `last_reg_time` after which the peer is no
  /// longer reported as online / reachable for hole punching.
  54. const REG_TIMEOUT: i32 = 30_000;
  /// Sink half of a `BytesCodec`-framed TCP stream.
  55. type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  /// Sink half of a websocket stream running over TCP.
  56. type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
  /// The two kinds of outgoing connection halves the server can reply on.
  57. enum SinkType {
  /// Plain framed TCP connection.
  58. TcpStream(TcpStreamSink),
  /// Websocket connection.
  59. Ws(WsSink),
  60. }
  /// An outgoing connection half together with its optional encryption state.
  61. struct Sink {
  /// Where outgoing bytes are written.
  62. tx: SinkType,
  /// Set once a `KeyExchange` message succeeds; when present, outgoing
  /// bytes are passed through `Encrypt::enc` before being written.
  63. key: Arc<Mutex<Option<Encrypt>>>,
  64. }
  /// Sending half of the internal `Data` channel.
  65. type Sender = mpsc::UnboundedSender<Data>;
  /// Receiving half of the internal `Data` channel, drained by the I/O loop.
  66. type Receiver = mpsc::UnboundedReceiver<Data>;
  // NOTE(review): presumably a round-robin cursor used when picking a relay
  // server; its readers are outside this part of the file - confirm.
  67. static ROTATION_RELAY_SERVER: AtomicUsize = AtomicUsize::new(0);
  /// List of relay server addresses.
  68. type RelayServers = Vec<String>;
  /// Period, in milliseconds, of the I/O loop timer that triggers
  /// `check_relay_servers`.
  69. const CHECK_RELAY_TIMEOUT: u64 = 3_000;
  /// When set (environment variable `ALWAYS_USE_RELAY` equal to "Y",
  /// case-insensitive), punch-hole requests are marked with a symmetric NAT
  /// type to force relaying.
  70. static ALWAYS_USE_RELAY: AtomicBool = AtomicBool::new(false);
  /// When set (argument `must-login` or environment variable `MUST_LOGIN`
  /// equal to "Y"), punch-hole requests with an empty token are rejected.
  71. static MUST_LOGIN: AtomicBool = AtomicBool::new(false);
  /// Configuration shared by all clones of the server behind an `Arc`.
  72. #[derive(Clone)]
  73. struct Inner {
  /// Configuration serial; peers reporting a smaller serial are sent a
  /// `ConfigUpdate`.
  74. serial: i32,
  /// Version derived from `software_url` via `get_version_from_url`.
  75. version: String,
  /// Value of the `software-url` argument, echoed in `SoftwareUpdate` replies.
  76. software_url: String,
  /// Network parsed from the `mask` argument, if it parsed successfully.
  77. mask: Option<Ipv4Network>,
  /// Local address offered as relay server to LAN peers; empty when no
  /// mask is configured.
  78. local_ip: String,
  /// Secret signing key returned by `get_server_sk`, if any.
  79. sk: Option<sign::SecretKey>,
  /// Public half of the key pair generated at start-up with `box_::gen_keypair`.
  80. secure_tcp_pk_b: PublicKey,
  /// Secret half of that key pair; used to derive the symmetric key during
  /// a TCP `KeyExchange`.
  81. secure_tcp_sk_b: SecretKey,
  82. }
  /// The rendezvous server state. Cloning is cheap-ish: shared parts sit
  /// behind `Arc`s, and clones are handed to spawned tasks.
  83. #[derive(Clone)]
  84. pub struct RendezvousServer {
  /// TCP/websocket sinks parked by client address until a reply for that
  /// client arrives (`send_to_tcp` removes the entry it answers).
  85. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  /// Registry of known peers.
  86. pm: PeerMap,
  /// Sender side of the internal `Data` channel.
  87. tx: Sender,
  /// Relay list replaced whenever a `Data::RelayServers` item is received.
  88. relay_servers: Arc<RelayServers>,
  /// Relay list checked periodically by `check_relay_servers`.
  // NOTE(review): presumably the full configured list filled by
  // `parse_relay_servers` - confirm, that method is not visible here.
  89. relay_servers0: Arc<RelayServers>,
  /// Rendezvous servers advertised to peers in `ConfigUpdate` messages.
  90. rendezvous_servers: Arc<Vec<String>>,
  /// Shared configuration.
  91. inner: Arc<Inner>,
  92. }
  /// Tells `start` which endpoint failed inside `io_loop`, so that exactly
  /// that endpoint is dropped and recreated.
  93. enum LoopFailure {
  /// The UDP socket (main port).
  94. UdpSocket,
  /// The websocket TCP listener (main port + 2).
  95. Listener3,
  /// The NAT-test TCP listener (main port - 1).
  96. Listener2,
  /// The main TCP listener (main port).
  97. Listener,
  98. }
  99. impl RendezvousServer {
  100. #[tokio::main(flavor = "multi_thread")]
  101. pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
  102. let (key, sk) = Self::get_server_sk(key);
  103. let nat_port = port - 1;
  104. let ws_port = port + 2;
  105. let pm = PeerMap::new().await?;
  106. log::info!("serial={}", serial);
  107. let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
  108. log::info!("Listening on tcp/udp :{}", port);
  109. log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
  110. log::info!("Listening on websocket :{}", ws_port);
  111. let mut socket = create_udp_listener(port, rmem).await?;
  112. let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
  113. let software_url = get_arg("software-url");
  114. let version = hbb_common::get_version_from_url(&software_url);
  115. if !version.is_empty() {
  116. log::info!("software_url: {}, version: {}", software_url, version);
  117. }
  118. let mask = get_arg("mask").parse().ok();
  119. let local_ip = if mask.is_none() {
  120. "".to_owned()
  121. } else {
  122. get_arg_or(
  123. "local-ip",
  124. local_ip_address::local_ip()
  125. .map(|x| x.to_string())
  126. .unwrap_or_default(),
  127. )
  128. };
  129. // For privacy use per connection key pair
  130. let (secure_tcp_pk_b, secure_tcp_sk_b) = box_::gen_keypair();
  131. let mut rs = Self {
  132. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  133. pm,
  134. tx: tx.clone(),
  135. relay_servers: Default::default(),
  136. relay_servers0: Default::default(),
  137. rendezvous_servers: Arc::new(rendezvous_servers),
  138. inner: Arc::new(Inner {
  139. serial,
  140. version,
  141. software_url,
  142. sk,
  143. mask,
  144. local_ip,
  145. secure_tcp_pk_b,
  146. secure_tcp_sk_b,
  147. }),
  148. };
  149. log::info!("mask: {:?}", rs.inner.mask);
  150. log::info!("local-ip: {:?}", rs.inner.local_ip);
  151. std::env::set_var("PORT_FOR_API", port.to_string());
  152. rs.parse_relay_servers(&get_arg("relay-servers"));
  153. let mut listener = create_tcp_listener(port).await?;
  154. let mut listener2 = create_tcp_listener(nat_port).await?;
  155. let mut listener3 = create_tcp_listener(ws_port).await?;
  156. let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
  157. if std::env::var("ALWAYS_USE_RELAY")
  158. .unwrap_or_default()
  159. .to_uppercase()
  160. == "Y"
  161. {
  162. ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
  163. }
  164. log::info!(
  165. "ALWAYS_USE_RELAY={}",
  166. if ALWAYS_USE_RELAY.load(Ordering::SeqCst) {
  167. "Y"
  168. } else {
  169. "N"
  170. }
  171. );
  172. let must_login = get_arg("must-login");
  173. log::debug!("must_login={}", must_login);
  174. if must_login.to_uppercase() == "Y" ||
  175. (must_login == "" && std::env::var("MUST_LOGIN")
  176. .unwrap_or_default()
  177. .to_uppercase()
  178. == "Y") {
  179. MUST_LOGIN.store(true, Ordering::SeqCst);
  180. }
  181. log::info!(
  182. "MUST_LOGIN={}",
  183. if MUST_LOGIN.load(Ordering::SeqCst) {
  184. "Y"
  185. } else {
  186. "N"
  187. }
  188. );
  189. if test_addr.to_lowercase() != "no" {
  190. let test_addr = if test_addr.is_empty() {
  191. listener.local_addr()?
  192. } else {
  193. test_addr.parse()?
  194. };
  195. tokio::spawn(async move {
  196. if let Err(err) = test_hbbs(test_addr).await {
  197. if test_addr.is_ipv6() && test_addr.ip().is_unspecified() {
  198. let mut test_addr = test_addr;
  199. test_addr.set_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
  200. if let Err(err) = test_hbbs(test_addr).await {
  201. log::error!("Failed to run hbbs test with {test_addr}: {err}");
  202. std::process::exit(1);
  203. }
  204. } else {
  205. log::error!("Failed to run hbbs test with {test_addr}: {err}");
  206. std::process::exit(1);
  207. }
  208. }
  209. });
  210. };
  211. let main_task = async move {
  212. loop {
  213. log::info!("Start");
  214. match rs
  215. .io_loop(
  216. &mut rx,
  217. &mut listener,
  218. &mut listener2,
  219. &mut listener3,
  220. &mut socket,
  221. &key,
  222. )
  223. .await
  224. {
  225. LoopFailure::UdpSocket => {
  226. drop(socket);
  227. socket = create_udp_listener(port, rmem).await?;
  228. }
  229. LoopFailure::Listener => {
  230. drop(listener);
  231. listener = create_tcp_listener(port).await?;
  232. }
  233. LoopFailure::Listener2 => {
  234. drop(listener2);
  235. listener2 = create_tcp_listener(nat_port).await?;
  236. }
  237. LoopFailure::Listener3 => {
  238. drop(listener3);
  239. listener3 = create_tcp_listener(ws_port).await?;
  240. }
  241. }
  242. }
  243. };
  244. let listen_signal = listen_signal();
  245. tokio::select!(
  246. res = main_task => res,
  247. res = listen_signal => res,
  248. )
  249. }
  250. async fn io_loop(
  251. &mut self,
  252. rx: &mut Receiver,
  253. listener: &mut TcpListener,
  254. listener2: &mut TcpListener,
  255. listener3: &mut TcpListener,
  256. socket: &mut FramedSocket,
  257. key: &str,
  258. ) -> LoopFailure {
  259. let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
  260. loop {
  261. tokio::select! {
  262. _ = timer_check_relay.tick() => {
  263. if self.relay_servers0.len() > 1 {
  264. let rs = self.relay_servers0.clone();
  265. let tx = self.tx.clone();
  266. tokio::spawn(async move {
  267. check_relay_servers(rs, tx).await;
  268. });
  269. }
  270. }
  271. Some(data) = rx.recv() => {
  272. match data {
  273. Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
  274. Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
  275. Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
  276. }
  277. }
  278. res = socket.next() => {
  279. match res {
  280. Some(Ok((bytes, addr))) => {
  281. if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
  282. log::error!("udp failure: {}", err);
  283. return LoopFailure::UdpSocket;
  284. }
  285. }
  286. Some(Err(err)) => {
  287. log::error!("udp failure: {}", err);
  288. return LoopFailure::UdpSocket;
  289. }
  290. None => {
  291. // unreachable!() ?
  292. }
  293. }
  294. }
  295. res = listener2.accept() => {
  296. match res {
  297. Ok((stream, addr)) => {
  298. stream.set_nodelay(true).ok();
  299. self.handle_listener2(stream, addr).await;
  300. }
  301. Err(err) => {
  302. log::error!("listener2.accept failed: {}", err);
  303. return LoopFailure::Listener2;
  304. }
  305. }
  306. }
  307. res = listener3.accept() => {
  308. match res {
  309. Ok((stream, addr)) => {
  310. stream.set_nodelay(true).ok();
  311. self.handle_listener(stream, addr, key, true).await;
  312. }
  313. Err(err) => {
  314. log::error!("listener3.accept failed: {}", err);
  315. return LoopFailure::Listener3;
  316. }
  317. }
  318. }
  319. res = listener.accept() => {
  320. match res {
  321. Ok((stream, addr)) => {
  322. stream.set_nodelay(true).ok();
  323. self.handle_listener(stream, addr, key, false).await;
  324. }
  325. Err(err) => {
  326. log::error!("listener.accept failed: {}", err);
  327. return LoopFailure::Listener;
  328. }
  329. }
  330. }
  331. }
  332. }
  333. }
  334. #[inline]
  335. async fn handle_udp(
  336. &mut self,
  337. bytes: &BytesMut,
  338. addr: SocketAddr,
  339. socket: &mut FramedSocket,
  340. key: &str,
  341. ) -> ResultType<()> {
  342. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  343. match msg_in.union {
  344. Some(rendezvous_message::Union::RegisterPeer(rp)) => {
  345. // B registered
  346. if !rp.id.is_empty() {
  347. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  348. self.update_addr(rp.id, addr, socket).await?;
  349. if self.inner.serial > rp.serial {
  350. let mut msg_out = RendezvousMessage::new();
  351. msg_out.set_configure_update(ConfigUpdate {
  352. serial: self.inner.serial,
  353. rendezvous_servers: (*self.rendezvous_servers).clone(),
  354. ..Default::default()
  355. });
  356. socket.send(&msg_out, addr).await?;
  357. }
  358. }
  359. }
  360. Some(rendezvous_message::Union::RegisterPk(rk)) => {
  361. if rk.uuid.is_empty() || rk.pk.is_empty() {
  362. return Ok(());
  363. }
  364. let id = rk.id;
  365. let ip = addr.ip().to_string();
  366. if id.len() < 6 {
  367. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  368. } else if !self.check_ip_blocker(&ip, &id).await {
  369. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  370. }
  371. let peer = self.pm.get_or(&id).await;
  372. let (changed, ip_changed) = {
  373. let peer = peer.read().await;
  374. if peer.uuid.is_empty() {
  375. (true, false)
  376. } else {
  377. if peer.uuid == rk.uuid {
  378. if peer.info.ip != ip && peer.pk != rk.pk {
  379. log::warn!(
  380. "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
  381. id,
  382. ip,
  383. rk.pk,
  384. peer.info.ip,
  385. peer.pk,
  386. );
  387. drop(peer);
  388. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  389. }
  390. } else {
  391. log::warn!(
  392. "Peer {} uuid mismatch: {:?} vs {:?}",
  393. id,
  394. rk.uuid,
  395. peer.uuid
  396. );
  397. drop(peer);
  398. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  399. }
  400. let ip_changed = peer.info.ip != ip;
  401. (
  402. peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
  403. ip_changed,
  404. )
  405. }
  406. };
  407. let mut req_pk = peer.read().await.reg_pk;
  408. if req_pk.1.elapsed().as_secs() > 6 {
  409. req_pk.0 = 0;
  410. } else if req_pk.0 > 2 {
  411. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  412. }
  413. req_pk.0 += 1;
  414. req_pk.1 = Instant::now();
  415. peer.write().await.reg_pk = req_pk;
  416. if ip_changed {
  417. let mut lock = IP_CHANGES.lock().await;
  418. if let Some((tm, ips)) = lock.get_mut(&id) {
  419. if tm.elapsed().as_secs() > IP_CHANGE_DUR {
  420. *tm = Instant::now();
  421. ips.clear();
  422. ips.insert(ip.clone(), 1);
  423. } else if let Some(v) = ips.get_mut(&ip) {
  424. *v += 1;
  425. } else {
  426. ips.insert(ip.clone(), 1);
  427. }
  428. } else {
  429. lock.insert(
  430. id.clone(),
  431. (Instant::now(), HashMap::from([(ip.clone(), 1)])),
  432. );
  433. }
  434. }
  435. if changed {
  436. self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
  437. }
  438. let mut msg_out = RendezvousMessage::new();
  439. msg_out.set_register_pk_response(RegisterPkResponse {
  440. result: register_pk_response::Result::OK.into(),
  441. ..Default::default()
  442. });
  443. socket.send(&msg_out, addr).await?
  444. }
  445. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  446. if self.pm.is_in_memory(&ph.id).await {
  447. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  448. } else {
  449. // not in memory, fetch from db with spawn in case blocking me
  450. let mut me = self.clone();
  451. let key = key.to_owned();
  452. tokio::spawn(async move {
  453. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  454. });
  455. }
  456. }
  457. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  458. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  459. }
  460. Some(rendezvous_message::Union::LocalAddr(la)) => {
  461. self.handle_local_addr(la, addr, Some(socket)).await?;
  462. }
  463. Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
  464. if try_into_v4(addr).ip().is_loopback() && cu.serial > self.inner.serial {
  465. let mut inner: Inner = (*self.inner).clone();
  466. inner.serial = cu.serial;
  467. self.inner = Arc::new(inner);
  468. self.rendezvous_servers = Arc::new(
  469. cu.rendezvous_servers
  470. .drain(..)
  471. .filter(|x| {
  472. !x.is_empty()
  473. && test_if_valid_server(x, "rendezvous-server").is_ok()
  474. })
  475. .collect(),
  476. );
  477. log::info!(
  478. "configure updated: serial={} rendezvous-servers={:?}",
  479. self.inner.serial,
  480. self.rendezvous_servers
  481. );
  482. }
  483. }
  484. Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
  485. if !self.inner.version.is_empty() && su.url != self.inner.version {
  486. let mut msg_out = RendezvousMessage::new();
  487. msg_out.set_software_update(SoftwareUpdate {
  488. url: self.inner.software_url.clone(),
  489. ..Default::default()
  490. });
  491. socket.send(&msg_out, addr).await?;
  492. }
  493. }
  494. _ => {}
  495. }
  496. }
  497. Ok(())
  498. }
  499. #[inline]
  500. async fn handle_tcp(
  501. &mut self,
  502. bytes: &[u8],
  503. sink: &mut Option<Sink>,
  504. addr: SocketAddr,
  505. key: &str,
  506. ws: bool,
  507. ) -> bool {
  508. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  509. match msg_in.union {
  510. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  511. // there maybe several attempt, so sink can be none
  512. if let Some(sink) = sink.take() {
  513. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  514. }
  515. allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
  516. return true;
  517. }
  518. Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
  519. // there maybe several attempt, so sink can be none
  520. if let Some(sink) = sink.take() {
  521. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  522. }
  523. if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
  524. let mut msg_out = RendezvousMessage::new();
  525. rf.socket_addr = AddrMangle::encode(addr).into();
  526. msg_out.set_request_relay(rf);
  527. let peer_addr = peer.read().await.socket_addr;
  528. self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
  529. }
  530. return true;
  531. }
  532. Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
  533. let addr_b = AddrMangle::decode(&rr.socket_addr);
  534. rr.socket_addr = Default::default();
  535. let id = rr.id();
  536. if !id.is_empty() {
  537. let pk = self.get_pk(&rr.version, id.to_owned()).await;
  538. rr.set_pk(pk);
  539. }
  540. let mut msg_out = RendezvousMessage::new();
  541. if !rr.relay_server.is_empty() {
  542. if self.is_lan(addr_b) {
  543. // https://github.com/rustdesk/rustdesk-server/issues/24
  544. rr.relay_server = self.inner.local_ip.clone();
  545. } else if rr.relay_server == self.inner.local_ip {
  546. rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
  547. }
  548. }
  549. msg_out.set_relay_response(rr);
  550. allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
  551. }
  552. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  553. allow_err!(self.handle_hole_sent(phs, addr, None).await);
  554. }
  555. Some(rendezvous_message::Union::LocalAddr(la)) => {
  556. allow_err!(self.handle_local_addr(la, addr, None).await);
  557. }
  558. Some(rendezvous_message::Union::TestNatRequest(tar)) => {
  559. let mut msg_out = RendezvousMessage::new();
  560. let mut res = TestNatResponse {
  561. port: addr.port() as _,
  562. ..Default::default()
  563. };
  564. if self.inner.serial > tar.serial {
  565. let mut cu = ConfigUpdate::new();
  566. cu.serial = self.inner.serial;
  567. cu.rendezvous_servers = (*self.rendezvous_servers).clone();
  568. res.cu = MessageField::from_option(Some(cu));
  569. }
  570. msg_out.set_test_nat_response(res);
  571. Self::send_to_sink(sink, msg_out).await;
  572. }
  573. Some(rendezvous_message::Union::RegisterPk(_)) => {
  574. let res = register_pk_response::Result::NOT_SUPPORT;
  575. let mut msg_out = RendezvousMessage::new();
  576. msg_out.set_register_pk_response(RegisterPkResponse {
  577. result: res.into(),
  578. ..Default::default()
  579. });
  580. Self::send_to_sink(sink, msg_out).await;
  581. }
  582. Some(rendezvous_message::Union::KeyExchange(ex)) => {
  583. log::trace!("KeyExchange {:?} <- bytes: {:?}", addr, hex::encode(&bytes));
  584. if ex.keys.len() != 2 {
  585. log::error!("Handshake failed: invalid phase 2 key exchange message");
  586. return false;
  587. }
  588. log::trace!("KeyExchange their_pk: {:?}", hex::encode(&ex.keys[0]));
  589. log::trace!("KeyExchange box: {:?}", hex::encode(&ex.keys[1]));
  590. let their_pk: [u8; 32] = ex.keys[0].to_vec().try_into().unwrap();
  591. let cryptobox: [u8; 48] = ex.keys[1].to_vec().try_into().unwrap();
  592. let symetric_key = get_symetric_key_from_msg(
  593. self.inner.secure_tcp_sk_b.0,
  594. their_pk,
  595. &cryptobox,
  596. );
  597. log::debug!("KeyExchange symetric key: {:?}", hex::encode(&symetric_key));
  598. let key = secretbox::Key::from_slice(&symetric_key);
  599. match key {
  600. Some(key) => {
  601. if let Some(sink) = sink.as_mut() {
  602. sink.key.lock().await.replace(Encrypt::new(key));
  603. }
  604. log::debug!("KeyExchange symetric key set");
  605. return true;
  606. }
  607. None => {
  608. log::error!("KeyExchange symetric key NOT set");
  609. return false;
  610. }
  611. }
  612. }
  613. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  614. let mut states = BytesMut::zeroed((or.peers.len() + 7) / 8);
  615. for (i, peer_id) in or.peers.iter().enumerate() {
  616. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  617. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  618. // bytes index from left to right
  619. let states_idx = i / 8;
  620. let bit_idx = 7 - i % 8;
  621. if elapsed < REG_TIMEOUT {
  622. states[states_idx] |= 0x01 << bit_idx;
  623. }
  624. }
  625. }
  626. let mut msg_out = RendezvousMessage::new();
  627. msg_out.set_online_response(OnlineResponse {
  628. states: states.into(),
  629. ..Default::default()
  630. });
  631. Self::send_to_sink(sink, msg_out).await;
  632. }
  633. _ => {}
  634. }
  635. }
  636. false
  637. }
  638. #[inline]
  639. async fn update_addr(
  640. &mut self,
  641. id: String,
  642. socket_addr: SocketAddr,
  643. socket: &mut FramedSocket,
  644. ) -> ResultType<()> {
  645. let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
  646. let mut old = old.write().await;
  647. let ip = socket_addr.ip();
  648. let ip_change = if old.socket_addr.port() != 0 {
  649. ip != old.socket_addr.ip()
  650. } else {
  651. ip.to_string() != old.info.ip
  652. } && !ip.is_loopback();
  653. let request_pk = old.pk.is_empty() || ip_change;
  654. if !request_pk {
  655. old.socket_addr = socket_addr;
  656. old.last_reg_time = Instant::now();
  657. }
  658. let ip_change = if ip_change && old.reg_pk.0 <= 2 {
  659. Some(if old.socket_addr.port() == 0 {
  660. old.info.ip.clone()
  661. } else {
  662. old.socket_addr.to_string()
  663. })
  664. } else {
  665. None
  666. };
  667. (request_pk, ip_change)
  668. } else {
  669. (true, None)
  670. };
  671. if let Some(old) = ip_change {
  672. log::info!("IP change of {} from {} to {}", id, old, socket_addr);
  673. }
  674. let mut msg_out = RendezvousMessage::new();
  675. msg_out.set_register_peer_response(RegisterPeerResponse {
  676. request_pk,
  677. ..Default::default()
  678. });
  679. socket.send(&msg_out, socket_addr).await
  680. }
  681. #[inline]
  682. async fn handle_hole_sent<'a>(
  683. &mut self,
  684. phs: PunchHoleSent,
  685. addr: SocketAddr,
  686. socket: Option<&'a mut FramedSocket>,
  687. ) -> ResultType<()> {
  688. // punch hole sent from B, tell A that B is ready to be connected
  689. let addr_a = AddrMangle::decode(&phs.socket_addr);
  690. log::debug!(
  691. "{} punch hole response to {:?} from {:?}",
  692. if socket.is_none() { "TCP" } else { "UDP" },
  693. &addr_a,
  694. &addr
  695. );
  696. let mut msg_out = RendezvousMessage::new();
  697. let mut p = PunchHoleResponse {
  698. socket_addr: AddrMangle::encode(addr).into(),
  699. pk: self.get_pk(&phs.version, phs.id).await,
  700. relay_server: phs.relay_server.clone(),
  701. ..Default::default()
  702. };
  703. if let Ok(t) = phs.nat_type.enum_value() {
  704. p.set_nat_type(t);
  705. }
  706. msg_out.set_punch_hole_response(p);
  707. if let Some(socket) = socket {
  708. socket.send(&msg_out, addr_a).await?;
  709. } else {
  710. self.send_to_tcp(msg_out, addr_a).await;
  711. }
  712. Ok(())
  713. }
  714. #[inline]
  715. async fn handle_local_addr<'a>(
  716. &mut self,
  717. la: LocalAddr,
  718. addr: SocketAddr,
  719. socket: Option<&'a mut FramedSocket>,
  720. ) -> ResultType<()> {
  721. // relay local addrs of B to A
  722. let addr_a = AddrMangle::decode(&la.socket_addr);
  723. log::debug!(
  724. "{} local addrs response to {:?} from {:?}",
  725. if socket.is_none() { "TCP" } else { "UDP" },
  726. &addr_a,
  727. &addr
  728. );
  729. let mut msg_out = RendezvousMessage::new();
  730. let mut p = PunchHoleResponse {
  731. socket_addr: la.local_addr.clone(),
  732. pk: self.get_pk(&la.version, la.id).await,
  733. relay_server: la.relay_server,
  734. ..Default::default()
  735. };
  736. p.set_is_local(true);
  737. msg_out.set_punch_hole_response(p);
  738. if let Some(socket) = socket {
  739. socket.send(&msg_out, addr_a).await?;
  740. } else {
  741. self.send_to_tcp(msg_out, addr_a).await;
  742. }
  743. Ok(())
  744. }
  745. #[inline]
  746. async fn handle_punch_hole_request(
  747. &mut self,
  748. addr: SocketAddr,
  749. ph: PunchHoleRequest,
  750. key: &str,
  751. ws: bool,
  752. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  753. let mut ph = ph;
  754. if !key.is_empty() && ph.licence_key != key {
  755. let mut msg_out = RendezvousMessage::new();
  756. msg_out.set_punch_hole_response(PunchHoleResponse {
  757. failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
  758. ..Default::default()
  759. });
  760. return Ok((msg_out, None));
  761. }
  762. // Todo check token by jwt
  763. if ph.token.is_empty() && MUST_LOGIN.load(Ordering::SeqCst) {
  764. let mut msg_out = RendezvousMessage::new();
  765. msg_out.set_punch_hole_response(PunchHoleResponse {
  766. other_failure: String::from("Connection failed, please login first"),
  767. ..Default::default()
  768. });
  769. return Ok((msg_out, None));
  770. }
  771. let id = ph.id;
  772. // punch hole request from A, relay to B,
  773. // check if in same intranet first,
  774. // fetch local addrs if in same intranet.
  775. // because punch hole won't work if in the same intranet,
  776. // all routers will drop such self-connections.
  777. if let Some(peer) = self.pm.get(&id).await {
  778. let (elapsed, peer_addr) = {
  779. let r = peer.read().await;
  780. (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
  781. };
  782. if elapsed >= REG_TIMEOUT {
  783. let mut msg_out = RendezvousMessage::new();
  784. msg_out.set_punch_hole_response(PunchHoleResponse {
  785. failure: punch_hole_response::Failure::OFFLINE.into(),
  786. ..Default::default()
  787. });
  788. return Ok((msg_out, None));
  789. }
  790. let mut msg_out = RendezvousMessage::new();
  791. let peer_is_lan = self.is_lan(peer_addr);
  792. let is_lan = self.is_lan(addr);
  793. let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
  794. if ALWAYS_USE_RELAY.load(Ordering::SeqCst) || (peer_is_lan ^ is_lan) {
  795. if peer_is_lan {
  796. // https://github.com/rustdesk/rustdesk-server/issues/24
  797. relay_server = self.inner.local_ip.clone()
  798. }
  799. ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
  800. }
  801. let same_intranet: bool = !ws
  802. && (peer_is_lan && is_lan || {
  803. match (peer_addr, addr) {
  804. (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
  805. (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
  806. _ => false,
  807. }
  808. });
  809. let socket_addr = AddrMangle::encode(addr).into();
  810. if same_intranet {
  811. log::debug!(
  812. "Fetch local addr {:?} {:?} request from {:?}",
  813. id,
  814. peer_addr,
  815. addr
  816. );
  817. msg_out.set_fetch_local_addr(FetchLocalAddr {
  818. socket_addr,
  819. relay_server,
  820. ..Default::default()
  821. });
  822. } else {
  823. log::debug!(
  824. "Punch hole {:?} {:?} request from {:?}",
  825. id,
  826. peer_addr,
  827. addr
  828. );
  829. msg_out.set_punch_hole(PunchHole {
  830. socket_addr,
  831. nat_type: ph.nat_type,
  832. relay_server,
  833. ..Default::default()
  834. });
  835. }
  836. Ok((msg_out, Some(peer_addr)))
  837. } else {
  838. let mut msg_out = RendezvousMessage::new();
  839. msg_out.set_punch_hole_response(PunchHoleResponse {
  840. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  841. ..Default::default()
  842. });
  843. Ok((msg_out, None))
  844. }
  845. }
  846. #[inline]
  847. async fn handle_online_request(
  848. &mut self,
  849. stream: &mut FramedStream,
  850. peers: Vec<String>,
  851. ) -> ResultType<()> {
  852. let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
  853. for (i, peer_id) in peers.iter().enumerate() {
  854. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  855. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  856. // bytes index from left to right
  857. let states_idx = i / 8;
  858. let bit_idx = 7 - i % 8;
  859. if elapsed < REG_TIMEOUT {
  860. states[states_idx] |= 0x01 << bit_idx;
  861. }
  862. }
  863. }
  864. let mut msg_out = RendezvousMessage::new();
  865. msg_out.set_online_response(OnlineResponse {
  866. states: states.into(),
  867. ..Default::default()
  868. });
  869. stream.send(&msg_out).await?;
  870. Ok(())
  871. }
  872. #[inline]
  873. async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
  874. let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  875. tokio::spawn(async move {
  876. Self::send_to_sink(&mut tcp, msg).await;
  877. });
  878. }
  879. #[inline]
  880. async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
  881. if let Some(sink) = sink.as_mut() {
  882. if let Ok(mut bytes) = msg.write_to_bytes() {
  883. if let Some(enc) = &mut sink.key.lock().await.as_mut() {
  884. bytes = enc.enc(&bytes);
  885. }
  886. match &mut sink.tx {
  887. SinkType::TcpStream(s) => {
  888. allow_err!(s.send(Bytes::from(bytes)).await);
  889. }
  890. SinkType::Ws(ws) => {
  891. allow_err!(ws.send(tungstenite::Message::Binary(bytes)).await);
  892. }
  893. }
  894. }
  895. }
  896. }
  897. #[inline]
  898. async fn send_to_tcp_sync(
  899. &mut self,
  900. msg: RendezvousMessage,
  901. addr: SocketAddr,
  902. ) -> ResultType<()> {
  903. let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  904. Self::send_to_sink(&mut sink, msg).await;
  905. Ok(())
  906. }
  907. #[inline]
  908. async fn handle_tcp_punch_hole_request(
  909. &mut self,
  910. addr: SocketAddr,
  911. ph: PunchHoleRequest,
  912. key: &str,
  913. ws: bool,
  914. ) -> ResultType<()> {
  915. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
  916. if let Some(addr) = to_addr {
  917. self.tx.send(Data::Msg(msg.into(), addr))?;
  918. } else {
  919. self.send_to_tcp_sync(msg, addr).await?;
  920. }
  921. Ok(())
  922. }
  923. #[inline]
  924. async fn handle_udp_punch_hole_request(
  925. &mut self,
  926. addr: SocketAddr,
  927. ph: PunchHoleRequest,
  928. key: &str,
  929. ) -> ResultType<()> {
  930. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
  931. self.tx.send(Data::Msg(
  932. msg.into(),
  933. match to_addr {
  934. Some(addr) => addr,
  935. None => addr,
  936. },
  937. ))?;
  938. Ok(())
  939. }
  940. async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
  941. let mut lock = IP_BLOCKER.lock().await;
  942. let now = Instant::now();
  943. if let Some(old) = lock.get_mut(ip) {
  944. let counter = &mut old.0;
  945. if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
  946. counter.0 = 0;
  947. } else if counter.0 > 30 {
  948. return false;
  949. }
  950. counter.0 += 1;
  951. counter.1 = now;
  952. let counter = &mut old.1;
  953. let is_new = counter.0.get(id).is_none();
  954. if counter.1.elapsed().as_secs() > DAY_SECONDS {
  955. counter.0.clear();
  956. } else if counter.0.len() > 300 {
  957. return !is_new;
  958. }
  959. if is_new {
  960. counter.0.insert(id.to_owned());
  961. }
  962. counter.1 = now;
  963. } else {
  964. lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
  965. }
  966. true
  967. }
  968. fn parse_relay_servers(&mut self, relay_servers: &str) {
  969. let rs = get_servers(relay_servers, "relay-servers");
  970. self.relay_servers0 = Arc::new(rs);
  971. self.relay_servers = self.relay_servers0.clone();
  972. }
  973. fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
  974. if self.relay_servers.is_empty() {
  975. return "".to_owned();
  976. } else if self.relay_servers.len() == 1 {
  977. return self.relay_servers[0].clone();
  978. }
  979. let i = ROTATION_RELAY_SERVER.fetch_add(1, Ordering::SeqCst) % self.relay_servers.len();
  980. self.relay_servers[i].clone()
  981. }
  982. async fn check_cmd(&self, cmd: &str) -> String {
  983. use std::fmt::Write as _;
  984. let mut res = "".to_owned();
  985. let mut fds = cmd.trim().split(' ');
  986. match fds.next() {
  987. Some("h") => {
  988. res = format!(
  989. "{}\n{}\n{}\n{}\n{}\n{}\n{}\n",
  990. "relay-servers(rs) <separated by ,>",
  991. "reload-geo(rg)",
  992. "ip-blocker(ib) [<ip>|<number>] [-]",
  993. "ip-changes(ic) [<id>|<number>] [-]",
  994. "always-use-relay(aur) [Y|N]",
  995. "test-geo(tg) <ip1> <ip2>",
  996. "must-login(ml) [Y|N]",
  997. )
  998. }
  999. Some("relay-servers" | "rs") => {
  1000. if let Some(rs) = fds.next() {
  1001. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1002. } else {
  1003. for ip in self.relay_servers.iter() {
  1004. let _ = writeln!(res, "{ip}");
  1005. }
  1006. }
  1007. }
  1008. Some("ip-blocker" | "ib") => {
  1009. let mut lock = IP_BLOCKER.lock().await;
  1010. lock.retain(|&_, (a, b)| {
  1011. a.1.elapsed().as_secs() <= IP_BLOCK_DUR
  1012. || b.1.elapsed().as_secs() <= DAY_SECONDS
  1013. });
  1014. res = format!("{}\n", lock.len());
  1015. let ip = fds.next();
  1016. let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  1017. if start < 0 {
  1018. if let Some(ip) = ip {
  1019. if let Some((a, b)) = lock.get(ip) {
  1020. let _ = writeln!(
  1021. res,
  1022. "{}/{}s {}/{}s",
  1023. a.0,
  1024. a.1.elapsed().as_secs(),
  1025. b.0.len(),
  1026. b.1.elapsed().as_secs()
  1027. );
  1028. }
  1029. if fds.next() == Some("-") {
  1030. lock.remove(ip);
  1031. }
  1032. } else {
  1033. start = 0;
  1034. }
  1035. }
  1036. if start >= 0 {
  1037. let mut it = lock.iter();
  1038. for i in 0..(start + 10) {
  1039. let x = it.next();
  1040. if x.is_none() {
  1041. break;
  1042. }
  1043. if i < start {
  1044. continue;
  1045. }
  1046. if let Some((ip, (a, b))) = x {
  1047. let _ = writeln!(
  1048. res,
  1049. "{}: {}/{}s {}/{}s",
  1050. ip,
  1051. a.0,
  1052. a.1.elapsed().as_secs(),
  1053. b.0.len(),
  1054. b.1.elapsed().as_secs()
  1055. );
  1056. }
  1057. }
  1058. }
  1059. }
  1060. Some("ip-changes" | "ic") => {
  1061. let mut lock = IP_CHANGES.lock().await;
  1062. lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
  1063. res = format!("{}\n", lock.len());
  1064. let id = fds.next();
  1065. let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  1066. if !(0..=10_000_000).contains(&start) {
  1067. if let Some(id) = id {
  1068. if let Some((tm, ips)) = lock.get(id) {
  1069. let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
  1070. }
  1071. if fds.next() == Some("-") {
  1072. lock.remove(id);
  1073. }
  1074. } else {
  1075. start = 0;
  1076. }
  1077. }
  1078. if start >= 0 {
  1079. let mut it = lock.iter();
  1080. for i in 0..(start + 10) {
  1081. let x = it.next();
  1082. if x.is_none() {
  1083. break;
  1084. }
  1085. if i < start {
  1086. continue;
  1087. }
  1088. if let Some((id, (tm, ips))) = x {
  1089. let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
  1090. }
  1091. }
  1092. }
  1093. }
  1094. Some("always-use-relay" | "aur") => {
  1095. if let Some(rs) = fds.next() {
  1096. if rs.to_uppercase() == "Y" {
  1097. ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
  1098. } else {
  1099. ALWAYS_USE_RELAY.store(false, Ordering::SeqCst);
  1100. }
  1101. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1102. } else {
  1103. let _ = writeln!(
  1104. res,
  1105. "ALWAYS_USE_RELAY: {:?}",
  1106. ALWAYS_USE_RELAY.load(Ordering::SeqCst)
  1107. );
  1108. }
  1109. }
  1110. Some("test-geo" | "tg") => {
  1111. if let Some(rs) = fds.next() {
  1112. if let Ok(a) = rs.parse::<IpAddr>() {
  1113. if let Some(rs) = fds.next() {
  1114. if let Ok(b) = rs.parse::<IpAddr>() {
  1115. res = format!("{:?}", self.get_relay_server(a, b));
  1116. }
  1117. } else {
  1118. res = format!("{:?}", self.get_relay_server(a, a));
  1119. }
  1120. }
  1121. }
  1122. }
  1123. Some("must-login" | "ml") => {
  1124. if let Some(rs) = fds.next() {
  1125. if rs.to_uppercase() == "Y" {
  1126. MUST_LOGIN.store(true, Ordering::SeqCst);
  1127. } else {
  1128. MUST_LOGIN.store(false, Ordering::SeqCst);
  1129. }
  1130. } else {
  1131. let _ = writeln!(
  1132. res,
  1133. "MUST_LOGIN: {:?}",
  1134. MUST_LOGIN.load(Ordering::SeqCst)
  1135. );
  1136. }
  1137. }
  1138. _ => {}
  1139. }
  1140. res
  1141. }
  1142. async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
  1143. let mut rs = self.clone();
  1144. if addr.ip().is_loopback() {
  1145. tokio::spawn(async move {
  1146. let mut stream = stream;
  1147. let mut buffer = [0; 1024];
  1148. if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
  1149. if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
  1150. let res = rs.check_cmd(data).await;
  1151. stream.write(res.as_bytes()).await.ok();
  1152. }
  1153. }
  1154. });
  1155. return;
  1156. }
  1157. let stream = FramedStream::from(stream, addr);
  1158. tokio::spawn(async move {
  1159. let mut stream = stream;
  1160. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  1161. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1162. match msg_in.union {
  1163. Some(rendezvous_message::Union::TestNatRequest(_)) => {
  1164. let mut msg_out = RendezvousMessage::new();
  1165. msg_out.set_test_nat_response(TestNatResponse {
  1166. port: addr.port() as _,
  1167. ..Default::default()
  1168. });
  1169. stream.send(&msg_out).await.ok();
  1170. }
  1171. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  1172. allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
  1173. }
  1174. _ => {}
  1175. }
  1176. }
  1177. }
  1178. });
  1179. }
  1180. async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
  1181. log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
  1182. let mut rs = self.clone();
  1183. let key = key.to_owned();
  1184. tokio::spawn(async move {
  1185. allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
  1186. });
  1187. }
  1188. #[inline]
  1189. async fn handle_listener_inner(
  1190. &mut self,
  1191. stream: TcpStream,
  1192. mut addr: SocketAddr,
  1193. key: &str,
  1194. ws: bool,
  1195. ) -> ResultType<()> {
  1196. let mut sink;
  1197. if ws {
  1198. use tokio_tungstenite::tungstenite::handshake::server::{Request, Response};
  1199. let callback = |req: &Request, response: Response| {
  1200. let headers = req.headers();
  1201. let real_ip = headers
  1202. .get("X-Real-IP")
  1203. .or_else(|| headers.get("X-Forwarded-For"))
  1204. .and_then(|header_value| header_value.to_str().ok());
  1205. if let Some(ip) = real_ip {
  1206. if ip.contains('.') {
  1207. addr = format!("{ip}:0").parse().unwrap_or(addr);
  1208. } else {
  1209. addr = format!("[{ip}]:0").parse().unwrap_or(addr);
  1210. }
  1211. }
  1212. Ok(response)
  1213. };
  1214. let ws_stream = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
  1215. let (a, mut b) = ws_stream.split();
  1216. sink = Some(Sink {
  1217. tx: SinkType::Ws(a),
  1218. key: Arc::new(Mutex::new(None)),
  1219. });
  1220. while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
  1221. if let tungstenite::Message::Binary(bytes) = msg {
  1222. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1223. break;
  1224. }
  1225. }
  1226. }
  1227. } else {
  1228. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  1229. let enc = Arc::new(Mutex::new(None));
  1230. sink = Some(Sink {
  1231. tx: SinkType::TcpStream(a),
  1232. key: enc.clone(),
  1233. });
  1234. // Avoid key exchange if answering on nat helper port
  1235. if !key.is_empty() {
  1236. self.key_exchange_phase1(addr, &mut sink).await;
  1237. }
  1238. while let Ok(Some(Ok(mut bytes))) = timeout(30_000, b.next()).await {
  1239. let mut enc_lock = enc.lock().await;
  1240. if enc_lock.is_some() {
  1241. if let Err(err) = enc_lock.as_mut().unwrap().dec(&mut bytes) {
  1242. return Err(err.into());
  1243. }
  1244. }
  1245. drop(enc_lock);
  1246. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1247. break;
  1248. }
  1249. }
  1250. }
  1251. if sink.is_none() {
  1252. self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1253. }
  1254. log::debug!("Tcp connection from {:?} closed", addr);
  1255. Ok(())
  1256. }
  1257. #[inline]
  1258. async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
  1259. if version.is_empty() || self.inner.sk.is_none() {
  1260. Bytes::new()
  1261. } else {
  1262. match self.pm.get(&id).await {
  1263. Some(peer) => {
  1264. let pk = peer.read().await.pk.clone();
  1265. sign::sign(
  1266. &hbb_common::message_proto::IdPk {
  1267. id,
  1268. pk,
  1269. ..Default::default()
  1270. }
  1271. .write_to_bytes()
  1272. .unwrap_or_default(),
  1273. self.inner.sk.as_ref().unwrap(),
  1274. )
  1275. .into()
  1276. }
  1277. _ => Bytes::new(),
  1278. }
  1279. }
  1280. }
  1281. #[inline]
  1282. fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
  1283. let mut out_sk = None;
  1284. let mut key = key.to_owned();
  1285. if let Ok(sk) = base64::decode(&key) {
  1286. if sk.len() == sign::SECRETKEYBYTES {
  1287. log::info!("The key is a crypto private key");
  1288. key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
  1289. let mut tmp = [0u8; sign::SECRETKEYBYTES];
  1290. tmp[..].copy_from_slice(&sk);
  1291. out_sk = Some(sign::SecretKey(tmp));
  1292. }
  1293. }
  1294. if key.is_empty() || key == "-" || key == "_" {
  1295. let (pk, sk) = crate::common::gen_sk(0);
  1296. out_sk = sk;
  1297. if !key.is_empty() {
  1298. key = pk;
  1299. }
  1300. }
  1301. if !key.is_empty() {
  1302. log::info!("Key: {}", key);
  1303. }
  1304. (key, out_sk)
  1305. }
  1306. #[inline]
  1307. fn is_lan(&self, addr: SocketAddr) -> bool {
  1308. if let Some(network) = &self.inner.mask {
  1309. match addr {
  1310. SocketAddr::V4(v4_socket_addr) => {
  1311. return network.contains(*v4_socket_addr.ip());
  1312. }
  1313. SocketAddr::V6(v6_socket_addr) => {
  1314. if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4() {
  1315. return network.contains(v4_addr);
  1316. }
  1317. }
  1318. }
  1319. }
  1320. false
  1321. }
  1322. async fn key_exchange_phase1(&mut self, addr: SocketAddr, sink: &mut Option<Sink>) {
  1323. let mut msg_out = RendezvousMessage::new();
  1324. log::debug!("KeyExchange phase 1: send our pk for this tcp connection in a message signed with our server key");
  1325. let sk = &self.inner.sk;
  1326. match sk {
  1327. Some(sk) => {
  1328. let our_pk_b = self.inner.secure_tcp_pk_b.clone();
  1329. let sm = sign::sign(&our_pk_b.0, &sk);
  1330. let bytes_sm = Bytes::from(sm);
  1331. msg_out.set_key_exchange(KeyExchange {
  1332. keys: vec![bytes_sm],
  1333. ..Default::default()
  1334. });
  1335. log::trace!(
  1336. "KeyExchange {:?} -> bytes: {:?}",
  1337. addr,
  1338. hex::encode(Bytes::from(msg_out.write_to_bytes().unwrap()))
  1339. );
  1340. Self::send_to_sink(sink, msg_out).await;
  1341. }
  1342. None => {}
  1343. }
  1344. }
  1345. }
  1346. async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
  1347. let mut futs = Vec::new();
  1348. let rs = Arc::new(Mutex::new(Vec::new()));
  1349. for x in rs0.iter() {
  1350. let mut host = x.to_owned();
  1351. if !host.contains(':') {
  1352. host = format!("{}:{}", host, config::RELAY_PORT);
  1353. }
  1354. let rs = rs.clone();
  1355. let x = x.clone();
  1356. futs.push(tokio::spawn(async move {
  1357. if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
  1358. .await
  1359. .is_ok()
  1360. {
  1361. rs.lock().await.push(x);
  1362. }
  1363. }));
  1364. }
  1365. join_all(futs).await;
  1366. log::debug!("check_relay_servers");
  1367. let rs = std::mem::take(&mut *rs.lock().await);
  1368. if !rs.is_empty() {
  1369. tx.send(Data::RelayServers(rs)).ok();
  1370. }
  1371. }
  1372. // temp solution to solve udp socket failure
  1373. async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
  1374. let mut addr = addr;
  1375. if addr.ip().is_unspecified() {
  1376. addr.set_ip(if addr.is_ipv4() {
  1377. IpAddr::V4(Ipv4Addr::LOCALHOST)
  1378. } else {
  1379. IpAddr::V6(Ipv6Addr::LOCALHOST)
  1380. });
  1381. }
  1382. let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
  1383. let mut msg_out = RendezvousMessage::new();
  1384. msg_out.set_register_peer(RegisterPeer {
  1385. id: "(:test_hbbs:)".to_owned(),
  1386. ..Default::default()
  1387. });
  1388. let mut last_time_recv = Instant::now();
  1389. let mut timer = interval(Duration::from_secs(1));
  1390. loop {
  1391. tokio::select! {
  1392. _ = timer.tick() => {
  1393. if last_time_recv.elapsed().as_secs() > 12 {
  1394. bail!("Timeout of test_hbbs");
  1395. }
  1396. socket.send(&msg_out, addr).await?;
  1397. }
  1398. Some(Ok((bytes, _))) = socket.next() => {
  1399. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1400. log::trace!("Recv {:?} of test_hbbs", msg_in);
  1401. last_time_recv = Instant::now();
  1402. }
  1403. }
  1404. }
  1405. }
  1406. }
  1407. #[inline]
  1408. async fn send_rk_res(
  1409. socket: &mut FramedSocket,
  1410. addr: SocketAddr,
  1411. res: register_pk_response::Result,
  1412. ) -> ResultType<()> {
  1413. let mut msg_out = RendezvousMessage::new();
  1414. msg_out.set_register_pk_response(RegisterPkResponse {
  1415. result: res.into(),
  1416. ..Default::default()
  1417. });
  1418. socket.send(&msg_out, addr).await
  1419. }
  1420. async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
  1421. let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
  1422. if let Ok(s) = FramedSocket::new_reuse(&addr, true, rmem).await {
  1423. log::debug!("listen on udp {:?}", s.local_addr());
  1424. return Ok(s);
  1425. }
  1426. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
  1427. let s = FramedSocket::new_reuse(&addr, true, rmem).await?;
  1428. log::debug!("listen on udp {:?}", s.local_addr());
  1429. Ok(s)
  1430. }
  1431. #[inline]
  1432. async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
  1433. let s = listen_any(port as _, true).await?;
  1434. log::debug!("listen on tcp {:?}", s.local_addr());
  1435. Ok(s)
  1436. }
  1437. fn get_symetric_key_from_msg(
  1438. our_sk_b: [u8; 32],
  1439. their_pk_b: [u8; 32],
  1440. sealed_value: &[u8; 48],
  1441. ) -> [u8; 32] {
  1442. let their_pk_b = box_::PublicKey(their_pk_b);
  1443. let nonce = box_::Nonce([0u8; box_::NONCEBYTES]);
  1444. let sk = box_::SecretKey(our_sk_b);
  1445. let key = box_::open(sealed_value, &nonce, &their_pk_b, &sk);
  1446. match key {
  1447. Ok(key) => {
  1448. let mut key_array = [0u8; 32];
  1449. key_array.copy_from_slice(&key);
  1450. key_array
  1451. }
  1452. Err(e) => panic!("Error while opening the seal key{:?}", e),
  1453. }
  1454. }