rendezvous_server.rs 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313
  1. use crate::common::*;
  2. use crate::peer::*;
  3. use hbb_common::{
  4. allow_err, bail,
  5. bytes::{Bytes, BytesMut},
  6. bytes_codec::BytesCodec,
  7. config,
  8. futures::future::join_all,
  9. futures_util::{
  10. sink::SinkExt,
  11. stream::{SplitSink, StreamExt},
  12. },
  13. log,
  14. protobuf::{Message as _, MessageField},
  15. rendezvous_proto::{
  16. register_pk_response::Result::{TOO_FREQUENT, UUID_MISMATCH},
  17. *,
  18. },
  19. tcp::{listen_any, FramedStream},
  20. timeout,
  21. tokio::{
  22. self,
  23. io::{AsyncReadExt, AsyncWriteExt},
  24. net::{TcpListener, TcpStream},
  25. sync::{mpsc, Mutex},
  26. time::{interval, Duration},
  27. },
  28. tokio_util::codec::Framed,
  29. try_into_v4,
  30. udp::FramedSocket,
  31. AddrMangle, ResultType,
  32. };
  33. use ipnetwork::Ipv4Network;
  34. use sodiumoxide::crypto::sign;
  35. use std::{
  36. collections::HashMap,
  37. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  38. sync::Arc,
  39. time::Instant,
  40. };
/// Work items delivered to the main I/O loop over the unbounded channel.
#[derive(Clone, Debug)]
enum Data {
    /// A rendezvous message the I/O loop sends on the UDP socket to the given address.
    Msg(Box<RendezvousMessage>, SocketAddr),
    /// A raw relay-server list string; the I/O loop re-parses it with `parse_relay_servers`.
    RelayServers0(String),
    /// A replacement for the currently active relay-server list.
    RelayServers(RelayServers),
}
/// Registration freshness window; compared against the milliseconds elapsed
/// since a peer's `last_reg_time` to decide whether the peer counts as online.
const REG_TIMEOUT: i32 = 30_000;
/// Write half of a TCP connection framed with `BytesCodec`.
type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
/// Write half of a WebSocket connection running over a `TcpStream`.
type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
/// Outgoing half of a client connection, either plain TCP or WebSocket.
enum Sink {
    /// Raw TCP connection; messages are written as `Bytes` frames.
    TcpStream(TcpStreamSink),
    /// WebSocket connection; messages are written as binary WebSocket frames.
    Ws(WsSink),
}
/// Sending side of the channel feeding `Data` items to the I/O loop.
type Sender = mpsc::UnboundedSender<Data>;
/// Receiving side of the channel, drained by the I/O loop.
type Receiver = mpsc::UnboundedReceiver<Data>;
/// Rotation counter for choosing among several relay servers.
// NOTE(review): a `static mut` is accessed without synchronization; the runtime is multi-threaded.
static mut ROTATION_RELAY_SERVER: usize = 0;
/// A list of relay server addresses.
type RelayServers = Vec<String>;
/// Period, in milliseconds, of the relay-server check timer in the I/O loop.
static CHECK_RELAY_TIMEOUT: u64 = 3_000;
/// When true, punch-hole requests are forced onto a relay.
/// Set at startup from the `ALWAYS_USE_RELAY` environment variable.
static mut ALWAYS_USE_RELAY: bool = false;
/// Configuration shared by all clones of the server behind an `Arc`.
#[derive(Clone)]
struct Inner {
    // Configuration serial; clients reporting a lower serial are sent a config update.
    serial: i32,
    // Version string derived from `software_url`; empty when none could be derived.
    version: String,
    // Value of the `software-url` argument.
    software_url: String,
    // Parsed `mask` argument (IPv4 network); `None` when absent or unparsable.
    mask: Option<Ipv4Network>,
    // Address handed out as relay server for LAN peers; empty when no mask is set.
    local_ip: String,
    // Secret key returned by `get_server_sk`, if any.
    sk: Option<sign::SecretKey>,
}
/// Rendezvous server state. The struct is `Clone`, and a clone is moved into a
/// spawned task when a request needs slow work.
#[derive(Clone)]
pub struct RendezvousServer {
    // TCP/WebSocket sinks waiting for a reply, keyed by the client's address
    // (normalized with `try_into_v4`).
    tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
    // Registry of known peers.
    pm: PeerMap,
    // Sender used to hand work to the I/O loop.
    tx: Sender,
    // Relay servers currently handed out to clients.
    relay_servers: Arc<RelayServers>,
    // Relay servers as configured; this list is passed to the periodic relay check.
    relay_servers0: Arc<RelayServers>,
    // Rendezvous servers advertised to clients in config updates.
    rendezvous_servers: Arc<Vec<String>>,
    // Shared configuration.
    inner: Arc<Inner>,
}
/// Reason `io_loop` returned; tells `start` which socket to recreate.
enum LoopFailure {
    /// The UDP socket failed, or handling a UDP message returned an error.
    UdpSocket,
    /// `accept` failed on the WebSocket listener (`listener3`).
    Listener3,
    /// `accept` failed on the NAT-test listener (`listener2`).
    Listener2,
    /// `accept` failed on the main TCP listener.
    Listener,
}
  85. impl RendezvousServer {
    /// Starts the rendezvous server and runs it until a shutdown signal arrives
    /// or a socket cannot be recreated.
    ///
    /// Listens on `port` (UDP and TCP), `port - 1` (TCP, NAT test) and `port + 2`
    /// (WebSocket). `serial` is the configuration serial, `key` is passed to
    /// `get_server_sk`, and `rmem` is forwarded to `create_udp_listener`.
    ///
    /// # Errors
    /// Returns an error if the peer map or any listener cannot be created, if
    /// `TEST_HBBS` holds an unparsable address, or if recreating a failed socket fails.
    #[tokio::main(flavor = "multi_thread")]
    pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
        let (key, sk) = Self::get_server_sk(key);
        let nat_port = port - 1;
        let ws_port = port + 2;
        let pm = PeerMap::new().await?;
        log::info!("serial={}", serial);
        let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
        log::info!("Listening on tcp/udp :{}", port);
        log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
        log::info!("Listening on websocket :{}", ws_port);
        let mut socket = create_udp_listener(port, rmem).await?;
        let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
        let software_url = get_arg("software-url");
        let version = hbb_common::get_version_from_url(&software_url);
        if !version.is_empty() {
            log::info!("software_url: {}, version: {}", software_url, version);
        }
        let mask = get_arg("mask").parse().ok();
        // A local IP is only needed when a LAN mask is configured.
        let local_ip = if mask.is_none() {
            "".to_owned()
        } else {
            get_arg_or(
                "local-ip",
                local_ip_address::local_ip()
                    .map(|x| x.to_string())
                    .unwrap_or_default(),
            )
        };
        let mut rs = Self {
            tcp_punch: Arc::new(Mutex::new(HashMap::new())),
            pm,
            tx: tx.clone(),
            relay_servers: Default::default(),
            relay_servers0: Default::default(),
            rendezvous_servers: Arc::new(rendezvous_servers),
            inner: Arc::new(Inner {
                serial,
                version,
                software_url,
                sk,
                mask,
                local_ip,
            }),
        };
        log::info!("mask: {:?}", rs.inner.mask);
        log::info!("local-ip: {:?}", rs.inner.local_ip);
        std::env::set_var("PORT_FOR_API", port.to_string());
        rs.parse_relay_servers(&get_arg("relay-servers"));
        let mut listener = create_tcp_listener(port).await?;
        let mut listener2 = create_tcp_listener(nat_port).await?;
        let mut listener3 = create_tcp_listener(ws_port).await?;
        let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
        // ALWAYS_USE_RELAY=Y (case-insensitive) forces relayed connections.
        if std::env::var("ALWAYS_USE_RELAY")
            .unwrap_or_default()
            .to_uppercase()
            == "Y"
        {
            unsafe {
                ALWAYS_USE_RELAY = true;
            }
        }
        log::info!(
            "ALWAYS_USE_RELAY={}",
            if unsafe { ALWAYS_USE_RELAY } {
                "Y"
            } else {
                "N"
            }
        );
        // Self-test unless TEST_HBBS=no; an empty TEST_HBBS tests the main listener's own address.
        if test_addr.to_lowercase() != "no" {
            let test_addr = if test_addr.is_empty() {
                listener.local_addr()?
            } else {
                test_addr.parse()?
            };
            tokio::spawn(async move {
                if let Err(err) = test_hbbs(test_addr).await {
                    // An unspecified IPv6 address gets one retry with the unspecified IPv4 address.
                    if test_addr.is_ipv6() && test_addr.ip().is_unspecified() {
                        let mut test_addr = test_addr;
                        test_addr.set_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
                        if let Err(err) = test_hbbs(test_addr).await {
                            log::error!("Failed to run hbbs test with {test_addr}: {err}");
                            std::process::exit(1);
                        }
                    } else {
                        log::error!("Failed to run hbbs test with {test_addr}: {err}");
                        std::process::exit(1);
                    }
                }
            });
        };
        // Run the I/O loop forever; when it reports a failed socket, recreate
        // that socket and restart the loop.
        let main_task = async move {
            loop {
                log::info!("Start");
                match rs
                    .io_loop(
                        &mut rx,
                        &mut listener,
                        &mut listener2,
                        &mut listener3,
                        &mut socket,
                        &key,
                    )
                    .await
                {
                    LoopFailure::UdpSocket => {
                        drop(socket);
                        socket = create_udp_listener(port, rmem).await?;
                    }
                    LoopFailure::Listener => {
                        drop(listener);
                        listener = create_tcp_listener(port).await?;
                    }
                    LoopFailure::Listener2 => {
                        drop(listener2);
                        listener2 = create_tcp_listener(nat_port).await?;
                    }
                    LoopFailure::Listener3 => {
                        drop(listener3);
                        listener3 = create_tcp_listener(ws_port).await?;
                    }
                }
            }
        };
        let listen_signal = listen_signal();
        // Whichever finishes first (server failure or signal) decides the result.
        tokio::select!(
            res = main_task => res,
            res = listen_signal => res,
        )
    }
    /// Multiplexes the relay-check timer, the internal `Data` channel, the UDP
    /// socket and the three TCP listeners.
    ///
    /// Only returns when a socket fails (or UDP handling returns an error); the
    /// returned `LoopFailure` names the socket the caller should recreate.
    async fn io_loop(
        &mut self,
        rx: &mut Receiver,
        listener: &mut TcpListener,
        listener2: &mut TcpListener,
        listener3: &mut TcpListener,
        socket: &mut FramedSocket,
        key: &str,
    ) -> LoopFailure {
        let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
        loop {
            tokio::select! {
                _ = timer_check_relay.tick() => {
                    // Checking is only performed when more than one relay server is configured.
                    if self.relay_servers0.len() > 1 {
                        let rs = self.relay_servers0.clone();
                        let tx = self.tx.clone();
                        tokio::spawn(async move {
                            check_relay_servers(rs, tx).await;
                        });
                    }
                }
                Some(data) = rx.recv() => {
                    match data {
                        Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
                        Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
                        Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
                    }
                }
                res = socket.next() => {
                    match res {
                        Some(Ok((bytes, addr))) => {
                            // Any error from UDP handling is treated as a socket failure.
                            if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
                                log::error!("udp failure: {}", err);
                                return LoopFailure::UdpSocket;
                            }
                        }
                        Some(Err(err)) => {
                            log::error!("udp failure: {}", err);
                            return LoopFailure::UdpSocket;
                        }
                        None => {
                            // unreachable!() ?
                        }
                    }
                }
                res = listener2.accept() => {
                    match res {
                        Ok((stream, addr)) => {
                            stream.set_nodelay(true).ok();
                            self.handle_listener2(stream, addr).await;
                        }
                        Err(err) => {
                            log::error!("listener2.accept failed: {}", err);
                            return LoopFailure::Listener2;
                        }
                    }
                }
                res = listener3.accept() => {
                    match res {
                        Ok((stream, addr)) => {
                            stream.set_nodelay(true).ok();
                            // `true` marks the connection as WebSocket.
                            self.handle_listener(stream, addr, key, true).await;
                        }
                        Err(err) => {
                            log::error!("listener3.accept failed: {}", err);
                            return LoopFailure::Listener3;
                        }
                    }
                }
                res = listener.accept() => {
                    match res {
                        Ok((stream, addr)) => {
                            stream.set_nodelay(true).ok();
                            self.handle_listener(stream, addr, key, false).await;
                        }
                        Err(err) => {
                            log::error!("listener.accept failed: {}", err);
                            return LoopFailure::Listener;
                        }
                    }
                }
            }
        }
    }
    /// Handles one UDP datagram received from `addr`.
    ///
    /// Datagrams that do not parse as a `RendezvousMessage`, and message kinds
    /// not listed in the match, are ignored and yield `Ok(())`.
    ///
    /// # Errors
    /// Propagates errors from sending on `socket` and from the called handlers.
    #[inline]
    async fn handle_udp(
        &mut self,
        bytes: &BytesMut,
        addr: SocketAddr,
        socket: &mut FramedSocket,
        key: &str,
    ) -> ResultType<()> {
        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
            match msg_in.union {
                Some(rendezvous_message::Union::RegisterPeer(rp)) => {
                    // B registered
                    if !rp.id.is_empty() {
                        log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
                        self.update_addr(rp.id, addr, socket).await?;
                        // The client's config is older than ours: push the current one.
                        if self.inner.serial > rp.serial {
                            let mut msg_out = RendezvousMessage::new();
                            msg_out.set_configure_update(ConfigUpdate {
                                serial: self.inner.serial,
                                rendezvous_servers: (*self.rendezvous_servers).clone(),
                                ..Default::default()
                            });
                            socket.send(&msg_out, addr).await?;
                        }
                    }
                }
                Some(rendezvous_message::Union::RegisterPk(rk)) => {
                    // Requests without uuid or pk are dropped silently.
                    if rk.uuid.is_empty() || rk.pk.is_empty() {
                        return Ok(());
                    }
                    let id = rk.id;
                    let ip = addr.ip().to_string();
                    if id.len() < 6 {
                        return send_rk_res(socket, addr, UUID_MISMATCH).await;
                    } else if !self.check_ip_blocker(&ip, &id).await {
                        return send_rk_res(socket, addr, TOO_FREQUENT).await;
                    }
                    let peer = self.pm.get_or(&id).await;
                    // `changed`: the stored record must be updated;
                    // `ip_changed`: the request comes from a different IP than the stored one.
                    let (changed, ip_changed) = {
                        let peer = peer.read().await;
                        if peer.uuid.is_empty() {
                            // No uuid stored yet for this id.
                            (true, false)
                        } else {
                            if peer.uuid == rk.uuid {
                                // Same uuid, but both ip and pk differ: reject.
                                if peer.info.ip != ip && peer.pk != rk.pk {
                                    log::warn!(
                                        "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
                                        id,
                                        ip,
                                        rk.pk,
                                        peer.info.ip,
                                        peer.pk,
                                    );
                                    // Release the read guard before awaiting the send.
                                    drop(peer);
                                    return send_rk_res(socket, addr, UUID_MISMATCH).await;
                                }
                            } else {
                                log::warn!(
                                    "Peer {} uuid mismatch: {:?} vs {:?}",
                                    id,
                                    rk.uuid,
                                    peer.uuid
                                );
                                // Release the read guard before awaiting the send.
                                drop(peer);
                                return send_rk_res(socket, addr, UUID_MISMATCH).await;
                            }
                            let ip_changed = peer.info.ip != ip;
                            (
                                peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
                                ip_changed,
                            )
                        }
                    };
                    // Per-peer rate limit: the counter restarts once more than 6 s have
                    // passed since the last request; otherwise a counter above 2 is refused.
                    let mut req_pk = peer.read().await.reg_pk;
                    if req_pk.1.elapsed().as_secs() > 6 {
                        req_pk.0 = 0;
                    } else if req_pk.0 > 2 {
                        return send_rk_res(socket, addr, TOO_FREQUENT).await;
                    }
                    req_pk.0 += 1;
                    req_pk.1 = Instant::now();
                    peer.write().await.reg_pk = req_pk;
                    if ip_changed {
                        // Count, per id, how often each IP was seen inside the current window.
                        let mut lock = IP_CHANGES.lock().await;
                        if let Some((tm, ips)) = lock.get_mut(&id) {
                            if tm.elapsed().as_secs() > IP_CHANGE_DUR {
                                // Window expired: start a fresh one with this IP.
                                *tm = Instant::now();
                                ips.clear();
                                ips.insert(ip.clone(), 1);
                            } else if let Some(v) = ips.get_mut(&ip) {
                                *v += 1;
                            } else {
                                ips.insert(ip.clone(), 1);
                            }
                        } else {
                            lock.insert(
                                id.clone(),
                                (Instant::now(), HashMap::from([(ip.clone(), 1)])),
                            );
                        }
                    }
                    if changed {
                        self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
                    }
                    let mut msg_out = RendezvousMessage::new();
                    msg_out.set_register_pk_response(RegisterPkResponse {
                        result: register_pk_response::Result::OK.into(),
                        ..Default::default()
                    });
                    socket.send(&msg_out, addr).await?
                }
                Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
                    if self.pm.is_in_memory(&ph.id).await {
                        self.handle_udp_punch_hole_request(addr, ph, key).await?;
                    } else {
                        // not in memory, fetch from db with spawn in case blocking me
                        let mut me = self.clone();
                        let key = key.to_owned();
                        tokio::spawn(async move {
                            allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
                        });
                    }
                }
                Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
                    self.handle_hole_sent(phs, addr, Some(socket)).await?;
                }
                Some(rendezvous_message::Union::LocalAddr(la)) => {
                    self.handle_local_addr(la, addr, Some(socket)).await?;
                }
                Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
                    // Only accepted from a loopback sender and only when the serial is newer.
                    if try_into_v4(addr).ip().is_loopback() && cu.serial > self.inner.serial {
                        let mut inner: Inner = (*self.inner).clone();
                        inner.serial = cu.serial;
                        self.inner = Arc::new(inner);
                        // Keep only non-empty entries that pass `test_if_valid_server`.
                        self.rendezvous_servers = Arc::new(
                            cu.rendezvous_servers
                                .drain(..)
                                .filter(|x| {
                                    !x.is_empty()
                                        && test_if_valid_server(x, "rendezvous-server").is_ok()
                                })
                                .collect(),
                        );
                        log::info!(
                            "configure updated: serial={} rendezvous-servers={:?}",
                            self.inner.serial,
                            self.rendezvous_servers
                        );
                    }
                }
                Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
                    // Reply with the configured download URL when the value sent in `url`
                    // differs from the configured version string.
                    if !self.inner.version.is_empty() && su.url != self.inner.version {
                        let mut msg_out = RendezvousMessage::new();
                        msg_out.set_software_update(SoftwareUpdate {
                            url: self.inner.software_url.clone(),
                            ..Default::default()
                        });
                        socket.send(&msg_out, addr).await?;
                    }
                }
                _ => {}
            }
        }
        Ok(())
    }
    /// Handles one message received on a TCP or WebSocket connection from `addr`.
    ///
    /// `sink` is the write half of that connection. For punch-hole and relay
    /// requests it is taken out of the `Option` and stored in `tcp_punch` so the
    /// reply can be written later by another code path.
    ///
    /// Returns `true` for `PunchHoleRequest` and `RequestRelay`, `false` for every
    /// other message and for unparsable input.
    // NOTE(review): the caller is not visible here; presumably `true` tells it to
    // stop reading from this connection — confirm against `handle_listener`.
    #[inline]
    async fn handle_tcp(
        &mut self,
        bytes: &[u8],
        sink: &mut Option<Sink>,
        addr: SocketAddr,
        key: &str,
        ws: bool,
    ) -> bool {
        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
            match msg_in.union {
                Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
                    // there maybe several attempt, so sink can be none
                    if let Some(sink) = sink.take() {
                        self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
                    }
                    allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
                    return true;
                }
                Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
                    // there maybe several attempt, so sink can be none
                    if let Some(sink) = sink.take() {
                        self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
                    }
                    // Forward the request (tagged with the requester's address) to the
                    // target peer through the I/O loop's UDP socket.
                    if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
                        let mut msg_out = RendezvousMessage::new();
                        rf.socket_addr = AddrMangle::encode(addr).into();
                        msg_out.set_request_relay(rf);
                        let peer_addr = peer.read().await.socket_addr;
                        self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
                    }
                    return true;
                }
                Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
                    // `socket_addr` carries the address of the side waiting for this
                    // response; it is cleared before forwarding.
                    let addr_b = AddrMangle::decode(&rr.socket_addr);
                    rr.socket_addr = Default::default();
                    let id = rr.id();
                    if !id.is_empty() {
                        let pk = self.get_pk(&rr.version, id.to_owned()).await;
                        rr.set_pk(pk);
                    }
                    let mut msg_out = RendezvousMessage::new();
                    if !rr.relay_server.is_empty() {
                        if self.is_lan(addr_b) {
                            // https://github.com/rustdesk/rustdesk-server/issues/24
                            rr.relay_server = self.inner.local_ip.clone();
                        } else if rr.relay_server == self.inner.local_ip {
                            rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
                        }
                    }
                    msg_out.set_relay_response(rr);
                    allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
                }
                Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
                    allow_err!(self.handle_hole_sent(phs, addr, None).await);
                }
                Some(rendezvous_message::Union::LocalAddr(la)) => {
                    allow_err!(self.handle_local_addr(la, addr, None).await);
                }
                Some(rendezvous_message::Union::TestNatRequest(tar)) => {
                    // Report the source port we observed for this connection.
                    let mut msg_out = RendezvousMessage::new();
                    let mut res = TestNatResponse {
                        port: addr.port() as _,
                        ..Default::default()
                    };
                    // Attach a config update when the client's serial is older.
                    if self.inner.serial > tar.serial {
                        let mut cu = ConfigUpdate::new();
                        cu.serial = self.inner.serial;
                        cu.rendezvous_servers = (*self.rendezvous_servers).clone();
                        res.cu = MessageField::from_option(Some(cu));
                    }
                    msg_out.set_test_nat_response(res);
                    Self::send_to_sink(sink, msg_out).await;
                }
                Some(rendezvous_message::Union::RegisterPk(_)) => {
                    // Key registration over TCP is answered with NOT_SUPPORT.
                    let res = register_pk_response::Result::NOT_SUPPORT;
                    let mut msg_out = RendezvousMessage::new();
                    msg_out.set_register_pk_response(RegisterPkResponse {
                        result: res.into(),
                        ..Default::default()
                    });
                    Self::send_to_sink(sink, msg_out).await;
                }
                _ => {}
            }
        }
        false
    }
  554. #[inline]
  555. async fn update_addr(
  556. &mut self,
  557. id: String,
  558. socket_addr: SocketAddr,
  559. socket: &mut FramedSocket,
  560. ) -> ResultType<()> {
  561. let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
  562. let mut old = old.write().await;
  563. let ip = socket_addr.ip();
  564. let ip_change = if old.socket_addr.port() != 0 {
  565. ip != old.socket_addr.ip()
  566. } else {
  567. ip.to_string() != old.info.ip
  568. } && !ip.is_loopback();
  569. let request_pk = old.pk.is_empty() || ip_change;
  570. if !request_pk {
  571. old.socket_addr = socket_addr;
  572. old.last_reg_time = Instant::now();
  573. }
  574. let ip_change = if ip_change && old.reg_pk.0 <= 2 {
  575. Some(if old.socket_addr.port() == 0 {
  576. old.info.ip.clone()
  577. } else {
  578. old.socket_addr.to_string()
  579. })
  580. } else {
  581. None
  582. };
  583. (request_pk, ip_change)
  584. } else {
  585. (true, None)
  586. };
  587. if let Some(old) = ip_change {
  588. log::info!("IP change of {} from {} to {}", id, old, socket_addr);
  589. }
  590. let mut msg_out = RendezvousMessage::new();
  591. msg_out.set_register_peer_response(RegisterPeerResponse {
  592. request_pk,
  593. ..Default::default()
  594. });
  595. socket.send(&msg_out, socket_addr).await
  596. }
  597. #[inline]
  598. async fn handle_hole_sent<'a>(
  599. &mut self,
  600. phs: PunchHoleSent,
  601. addr: SocketAddr,
  602. socket: Option<&'a mut FramedSocket>,
  603. ) -> ResultType<()> {
  604. // punch hole sent from B, tell A that B is ready to be connected
  605. let addr_a = AddrMangle::decode(&phs.socket_addr);
  606. log::debug!(
  607. "{} punch hole response to {:?} from {:?}",
  608. if socket.is_none() { "TCP" } else { "UDP" },
  609. &addr_a,
  610. &addr
  611. );
  612. let mut msg_out = RendezvousMessage::new();
  613. let mut p = PunchHoleResponse {
  614. socket_addr: AddrMangle::encode(addr).into(),
  615. pk: self.get_pk(&phs.version, phs.id).await,
  616. relay_server: phs.relay_server.clone(),
  617. ..Default::default()
  618. };
  619. if let Ok(t) = phs.nat_type.enum_value() {
  620. p.set_nat_type(t);
  621. }
  622. msg_out.set_punch_hole_response(p);
  623. if let Some(socket) = socket {
  624. socket.send(&msg_out, addr_a).await?;
  625. } else {
  626. self.send_to_tcp(msg_out, addr_a).await;
  627. }
  628. Ok(())
  629. }
  630. #[inline]
  631. async fn handle_local_addr<'a>(
  632. &mut self,
  633. la: LocalAddr,
  634. addr: SocketAddr,
  635. socket: Option<&'a mut FramedSocket>,
  636. ) -> ResultType<()> {
  637. // relay local addrs of B to A
  638. let addr_a = AddrMangle::decode(&la.socket_addr);
  639. log::debug!(
  640. "{} local addrs response to {:?} from {:?}",
  641. if socket.is_none() { "TCP" } else { "UDP" },
  642. &addr_a,
  643. &addr
  644. );
  645. let mut msg_out = RendezvousMessage::new();
  646. let mut p = PunchHoleResponse {
  647. socket_addr: la.local_addr.clone(),
  648. pk: self.get_pk(&la.version, la.id).await,
  649. relay_server: la.relay_server,
  650. ..Default::default()
  651. };
  652. p.set_is_local(true);
  653. msg_out.set_punch_hole_response(p);
  654. if let Some(socket) = socket {
  655. socket.send(&msg_out, addr_a).await?;
  656. } else {
  657. self.send_to_tcp(msg_out, addr_a).await;
  658. }
  659. Ok(())
  660. }
  661. #[inline]
  662. async fn handle_punch_hole_request(
  663. &mut self,
  664. addr: SocketAddr,
  665. ph: PunchHoleRequest,
  666. key: &str,
  667. ws: bool,
  668. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  669. let mut ph = ph;
  670. if !key.is_empty() && ph.licence_key != key {
  671. let mut msg_out = RendezvousMessage::new();
  672. msg_out.set_punch_hole_response(PunchHoleResponse {
  673. failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
  674. ..Default::default()
  675. });
  676. return Ok((msg_out, None));
  677. }
  678. let id = ph.id;
  679. // punch hole request from A, relay to B,
  680. // check if in same intranet first,
  681. // fetch local addrs if in same intranet.
  682. // because punch hole won't work if in the same intranet,
  683. // all routers will drop such self-connections.
  684. if let Some(peer) = self.pm.get(&id).await {
  685. let (elapsed, peer_addr) = {
  686. let r = peer.read().await;
  687. (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
  688. };
  689. if elapsed >= REG_TIMEOUT {
  690. let mut msg_out = RendezvousMessage::new();
  691. msg_out.set_punch_hole_response(PunchHoleResponse {
  692. failure: punch_hole_response::Failure::OFFLINE.into(),
  693. ..Default::default()
  694. });
  695. return Ok((msg_out, None));
  696. }
  697. let mut msg_out = RendezvousMessage::new();
  698. let peer_is_lan = self.is_lan(peer_addr);
  699. let is_lan = self.is_lan(addr);
  700. let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
  701. if unsafe { ALWAYS_USE_RELAY } || (peer_is_lan ^ is_lan) {
  702. if peer_is_lan {
  703. // https://github.com/rustdesk/rustdesk-server/issues/24
  704. relay_server = self.inner.local_ip.clone()
  705. }
  706. ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
  707. }
  708. let same_intranet: bool = !ws
  709. && (peer_is_lan && is_lan || {
  710. match (peer_addr, addr) {
  711. (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
  712. (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
  713. _ => false,
  714. }
  715. });
  716. let socket_addr = AddrMangle::encode(addr).into();
  717. if same_intranet {
  718. log::debug!(
  719. "Fetch local addr {:?} {:?} request from {:?}",
  720. id,
  721. peer_addr,
  722. addr
  723. );
  724. msg_out.set_fetch_local_addr(FetchLocalAddr {
  725. socket_addr,
  726. relay_server,
  727. ..Default::default()
  728. });
  729. } else {
  730. log::debug!(
  731. "Punch hole {:?} {:?} request from {:?}",
  732. id,
  733. peer_addr,
  734. addr
  735. );
  736. msg_out.set_punch_hole(PunchHole {
  737. socket_addr,
  738. nat_type: ph.nat_type,
  739. relay_server,
  740. ..Default::default()
  741. });
  742. }
  743. Ok((msg_out, Some(peer_addr)))
  744. } else {
  745. let mut msg_out = RendezvousMessage::new();
  746. msg_out.set_punch_hole_response(PunchHoleResponse {
  747. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  748. ..Default::default()
  749. });
  750. Ok((msg_out, None))
  751. }
  752. }
  753. #[inline]
  754. async fn handle_online_request(
  755. &mut self,
  756. stream: &mut FramedStream,
  757. peers: Vec<String>,
  758. ) -> ResultType<()> {
  759. let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
  760. for (i, peer_id) in peers.iter().enumerate() {
  761. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  762. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  763. // bytes index from left to right
  764. let states_idx = i / 8;
  765. let bit_idx = 7 - i % 8;
  766. if elapsed < REG_TIMEOUT {
  767. states[states_idx] |= 0x01 << bit_idx;
  768. }
  769. }
  770. }
  771. let mut msg_out = RendezvousMessage::new();
  772. msg_out.set_online_response(OnlineResponse {
  773. states: states.into(),
  774. ..Default::default()
  775. });
  776. stream.send(&msg_out).await?;
  777. Ok(())
  778. }
  779. #[inline]
  780. async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
  781. let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  782. tokio::spawn(async move {
  783. Self::send_to_sink(&mut tcp, msg).await;
  784. });
  785. }
  786. #[inline]
  787. async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
  788. if let Some(sink) = sink.as_mut() {
  789. if let Ok(bytes) = msg.write_to_bytes() {
  790. match sink {
  791. Sink::TcpStream(s) => {
  792. allow_err!(s.send(Bytes::from(bytes)).await);
  793. }
  794. Sink::Ws(ws) => {
  795. allow_err!(ws.send(tungstenite::Message::Binary(bytes)).await);
  796. }
  797. }
  798. }
  799. }
  800. }
  801. #[inline]
  802. async fn send_to_tcp_sync(
  803. &mut self,
  804. msg: RendezvousMessage,
  805. addr: SocketAddr,
  806. ) -> ResultType<()> {
  807. let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  808. Self::send_to_sink(&mut sink, msg).await;
  809. Ok(())
  810. }
  811. #[inline]
  812. async fn handle_tcp_punch_hole_request(
  813. &mut self,
  814. addr: SocketAddr,
  815. ph: PunchHoleRequest,
  816. key: &str,
  817. ws: bool,
  818. ) -> ResultType<()> {
  819. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
  820. if let Some(addr) = to_addr {
  821. self.tx.send(Data::Msg(msg.into(), addr))?;
  822. } else {
  823. self.send_to_tcp_sync(msg, addr).await?;
  824. }
  825. Ok(())
  826. }
  827. #[inline]
  828. async fn handle_udp_punch_hole_request(
  829. &mut self,
  830. addr: SocketAddr,
  831. ph: PunchHoleRequest,
  832. key: &str,
  833. ) -> ResultType<()> {
  834. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
  835. self.tx.send(Data::Msg(
  836. msg.into(),
  837. match to_addr {
  838. Some(addr) => addr,
  839. None => addr,
  840. },
  841. ))?;
  842. Ok(())
  843. }
  844. async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
  845. let mut lock = IP_BLOCKER.lock().await;
  846. let now = Instant::now();
  847. if let Some(old) = lock.get_mut(ip) {
  848. let counter = &mut old.0;
  849. if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
  850. counter.0 = 0;
  851. } else if counter.0 > 30 {
  852. return false;
  853. }
  854. counter.0 += 1;
  855. counter.1 = now;
  856. let counter = &mut old.1;
  857. let is_new = counter.0.get(id).is_none();
  858. if counter.1.elapsed().as_secs() > DAY_SECONDS {
  859. counter.0.clear();
  860. } else if counter.0.len() > 300 {
  861. return !is_new;
  862. }
  863. if is_new {
  864. counter.0.insert(id.to_owned());
  865. }
  866. counter.1 = now;
  867. } else {
  868. lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
  869. }
  870. true
  871. }
  872. fn parse_relay_servers(&mut self, relay_servers: &str) {
  873. let rs = get_servers(relay_servers, "relay-servers");
  874. self.relay_servers0 = Arc::new(rs);
  875. self.relay_servers = self.relay_servers0.clone();
  876. }
  877. fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
  878. if self.relay_servers.is_empty() {
  879. return "".to_owned();
  880. } else if self.relay_servers.len() == 1 {
  881. return self.relay_servers[0].clone();
  882. }
  883. let i = unsafe {
  884. ROTATION_RELAY_SERVER += 1;
  885. ROTATION_RELAY_SERVER % self.relay_servers.len()
  886. };
  887. self.relay_servers[i].clone()
  888. }
  889. async fn check_cmd(&self, cmd: &str) -> String {
  890. use std::fmt::Write as _;
  891. let mut res = "".to_owned();
  892. let mut fds = cmd.trim().split(' ');
  893. match fds.next() {
  894. Some("h") => {
  895. res = format!(
  896. "{}\n{}\n{}\n{}\n{}\n{}\n",
  897. "relay-servers(rs) <separated by ,>",
  898. "reload-geo(rg)",
  899. "ip-blocker(ib) [<ip>|<number>] [-]",
  900. "ip-changes(ic) [<id>|<number>] [-]",
  901. "always-use-relay(aur)",
  902. "test-geo(tg) <ip1> <ip2>"
  903. )
  904. }
  905. Some("relay-servers" | "rs") => {
  906. if let Some(rs) = fds.next() {
  907. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  908. } else {
  909. for ip in self.relay_servers.iter() {
  910. let _ = writeln!(res, "{ip}");
  911. }
  912. }
  913. }
  914. Some("ip-blocker" | "ib") => {
  915. let mut lock = IP_BLOCKER.lock().await;
  916. lock.retain(|&_, (a, b)| {
  917. a.1.elapsed().as_secs() <= IP_BLOCK_DUR
  918. || b.1.elapsed().as_secs() <= DAY_SECONDS
  919. });
  920. res = format!("{}\n", lock.len());
  921. let ip = fds.next();
  922. let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  923. if start < 0 {
  924. if let Some(ip) = ip {
  925. if let Some((a, b)) = lock.get(ip) {
  926. let _ = writeln!(
  927. res,
  928. "{}/{}s {}/{}s",
  929. a.0,
  930. a.1.elapsed().as_secs(),
  931. b.0.len(),
  932. b.1.elapsed().as_secs()
  933. );
  934. }
  935. if fds.next() == Some("-") {
  936. lock.remove(ip);
  937. }
  938. } else {
  939. start = 0;
  940. }
  941. }
  942. if start >= 0 {
  943. let mut it = lock.iter();
  944. for i in 0..(start + 10) {
  945. let x = it.next();
  946. if x.is_none() {
  947. break;
  948. }
  949. if i < start {
  950. continue;
  951. }
  952. if let Some((ip, (a, b))) = x {
  953. let _ = writeln!(
  954. res,
  955. "{}: {}/{}s {}/{}s",
  956. ip,
  957. a.0,
  958. a.1.elapsed().as_secs(),
  959. b.0.len(),
  960. b.1.elapsed().as_secs()
  961. );
  962. }
  963. }
  964. }
  965. }
  966. Some("ip-changes" | "ic") => {
  967. let mut lock = IP_CHANGES.lock().await;
  968. lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
  969. res = format!("{}\n", lock.len());
  970. let id = fds.next();
  971. let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  972. if !(0..=10_000_000).contains(&start) {
  973. if let Some(id) = id {
  974. if let Some((tm, ips)) = lock.get(id) {
  975. let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
  976. }
  977. if fds.next() == Some("-") {
  978. lock.remove(id);
  979. }
  980. } else {
  981. start = 0;
  982. }
  983. }
  984. if start >= 0 {
  985. let mut it = lock.iter();
  986. for i in 0..(start + 10) {
  987. let x = it.next();
  988. if x.is_none() {
  989. break;
  990. }
  991. if i < start {
  992. continue;
  993. }
  994. if let Some((id, (tm, ips))) = x {
  995. let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
  996. }
  997. }
  998. }
  999. }
  1000. Some("always-use-relay" | "aur") => {
  1001. if let Some(rs) = fds.next() {
  1002. if rs.to_uppercase() == "Y" {
  1003. unsafe { ALWAYS_USE_RELAY = true };
  1004. } else {
  1005. unsafe { ALWAYS_USE_RELAY = false };
  1006. }
  1007. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1008. } else {
  1009. let _ = writeln!(res, "ALWAYS_USE_RELAY: {:?}", unsafe { ALWAYS_USE_RELAY });
  1010. }
  1011. }
  1012. Some("test-geo" | "tg") => {
  1013. if let Some(rs) = fds.next() {
  1014. if let Ok(a) = rs.parse::<IpAddr>() {
  1015. if let Some(rs) = fds.next() {
  1016. if let Ok(b) = rs.parse::<IpAddr>() {
  1017. res = format!("{:?}", self.get_relay_server(a, b));
  1018. }
  1019. } else {
  1020. res = format!("{:?}", self.get_relay_server(a, a));
  1021. }
  1022. }
  1023. }
  1024. }
  1025. _ => {}
  1026. }
  1027. res
  1028. }
  1029. async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
  1030. let mut rs = self.clone();
  1031. if addr.ip().is_loopback() {
  1032. tokio::spawn(async move {
  1033. let mut stream = stream;
  1034. let mut buffer = [0; 1024];
  1035. if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
  1036. if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
  1037. let res = rs.check_cmd(data).await;
  1038. stream.write(res.as_bytes()).await.ok();
  1039. }
  1040. }
  1041. });
  1042. return;
  1043. }
  1044. let stream = FramedStream::from(stream, addr);
  1045. tokio::spawn(async move {
  1046. let mut stream = stream;
  1047. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  1048. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1049. match msg_in.union {
  1050. Some(rendezvous_message::Union::TestNatRequest(_)) => {
  1051. let mut msg_out = RendezvousMessage::new();
  1052. msg_out.set_test_nat_response(TestNatResponse {
  1053. port: addr.port() as _,
  1054. ..Default::default()
  1055. });
  1056. stream.send(&msg_out).await.ok();
  1057. }
  1058. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  1059. allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
  1060. }
  1061. _ => {}
  1062. }
  1063. }
  1064. }
  1065. });
  1066. }
  1067. async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
  1068. log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
  1069. let mut rs = self.clone();
  1070. let key = key.to_owned();
  1071. tokio::spawn(async move {
  1072. allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
  1073. });
  1074. }
  1075. #[inline]
  1076. async fn handle_listener_inner(
  1077. &mut self,
  1078. stream: TcpStream,
  1079. addr: SocketAddr,
  1080. key: &str,
  1081. ws: bool,
  1082. ) -> ResultType<()> {
  1083. let mut sink;
  1084. if ws {
  1085. let ws_stream = tokio_tungstenite::accept_async(stream).await?;
  1086. let (a, mut b) = ws_stream.split();
  1087. sink = Some(Sink::Ws(a));
  1088. while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
  1089. if let tungstenite::Message::Binary(bytes) = msg {
  1090. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1091. break;
  1092. }
  1093. }
  1094. }
  1095. } else {
  1096. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  1097. sink = Some(Sink::TcpStream(a));
  1098. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  1099. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1100. break;
  1101. }
  1102. }
  1103. }
  1104. if sink.is_none() {
  1105. self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1106. }
  1107. log::debug!("Tcp connection from {:?} closed", addr);
  1108. Ok(())
  1109. }
  1110. #[inline]
  1111. async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
  1112. if version.is_empty() || self.inner.sk.is_none() {
  1113. Bytes::new()
  1114. } else {
  1115. match self.pm.get(&id).await {
  1116. Some(peer) => {
  1117. let pk = peer.read().await.pk.clone();
  1118. sign::sign(
  1119. &hbb_common::message_proto::IdPk {
  1120. id,
  1121. pk,
  1122. ..Default::default()
  1123. }
  1124. .write_to_bytes()
  1125. .unwrap_or_default(),
  1126. self.inner.sk.as_ref().unwrap(),
  1127. )
  1128. .into()
  1129. }
  1130. _ => Bytes::new(),
  1131. }
  1132. }
  1133. }
  1134. #[inline]
  1135. fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
  1136. let mut out_sk = None;
  1137. let mut key = key.to_owned();
  1138. if let Ok(sk) = base64::decode(&key) {
  1139. if sk.len() == sign::SECRETKEYBYTES {
  1140. log::info!("The key is a crypto private key");
  1141. key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
  1142. let mut tmp = [0u8; sign::SECRETKEYBYTES];
  1143. tmp[..].copy_from_slice(&sk);
  1144. out_sk = Some(sign::SecretKey(tmp));
  1145. }
  1146. }
  1147. if key.is_empty() || key == "-" || key == "_" {
  1148. let (pk, sk) = crate::common::gen_sk(0);
  1149. out_sk = sk;
  1150. if !key.is_empty() {
  1151. key = pk;
  1152. } else {
  1153. std::env::set_var("KEY_FOR_API", pk);
  1154. }
  1155. }
  1156. if !key.is_empty() {
  1157. log::info!("Key: {}", key);
  1158. std::env::set_var("KEY_FOR_API", key.clone());
  1159. }
  1160. (key, out_sk)
  1161. }
  1162. #[inline]
  1163. fn is_lan(&self, addr: SocketAddr) -> bool {
  1164. if let Some(network) = &self.inner.mask {
  1165. match addr {
  1166. SocketAddr::V4(v4_socket_addr) => {
  1167. return network.contains(*v4_socket_addr.ip());
  1168. }
  1169. SocketAddr::V6(v6_socket_addr) => {
  1170. if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4() {
  1171. return network.contains(v4_addr);
  1172. }
  1173. }
  1174. }
  1175. }
  1176. false
  1177. }
  1178. }
  1179. async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
  1180. let mut futs = Vec::new();
  1181. let rs = Arc::new(Mutex::new(Vec::new()));
  1182. for x in rs0.iter() {
  1183. let mut host = x.to_owned();
  1184. if !host.contains(':') {
  1185. host = format!("{}:{}", host, config::RELAY_PORT);
  1186. }
  1187. let rs = rs.clone();
  1188. let x = x.clone();
  1189. futs.push(tokio::spawn(async move {
  1190. if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
  1191. .await
  1192. .is_ok()
  1193. {
  1194. rs.lock().await.push(x);
  1195. }
  1196. }));
  1197. }
  1198. join_all(futs).await;
  1199. log::debug!("check_relay_servers");
  1200. let rs = std::mem::take(&mut *rs.lock().await);
  1201. if !rs.is_empty() {
  1202. tx.send(Data::RelayServers(rs)).ok();
  1203. }
  1204. }
  1205. // temp solution to solve udp socket failure
  1206. async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
  1207. let mut addr = addr;
  1208. if addr.ip().is_unspecified() {
  1209. addr.set_ip(if addr.is_ipv4() {
  1210. IpAddr::V4(Ipv4Addr::LOCALHOST)
  1211. } else {
  1212. IpAddr::V6(Ipv6Addr::LOCALHOST)
  1213. });
  1214. }
  1215. let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
  1216. let mut msg_out = RendezvousMessage::new();
  1217. msg_out.set_register_peer(RegisterPeer {
  1218. id: "(:test_hbbs:)".to_owned(),
  1219. ..Default::default()
  1220. });
  1221. let mut last_time_recv = Instant::now();
  1222. let mut timer = interval(Duration::from_secs(1));
  1223. loop {
  1224. tokio::select! {
  1225. _ = timer.tick() => {
  1226. if last_time_recv.elapsed().as_secs() > 12 {
  1227. bail!("Timeout of test_hbbs");
  1228. }
  1229. socket.send(&msg_out, addr).await?;
  1230. }
  1231. Some(Ok((bytes, _))) = socket.next() => {
  1232. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1233. log::trace!("Recv {:?} of test_hbbs", msg_in);
  1234. last_time_recv = Instant::now();
  1235. }
  1236. }
  1237. }
  1238. }
  1239. }
  1240. #[inline]
  1241. async fn send_rk_res(
  1242. socket: &mut FramedSocket,
  1243. addr: SocketAddr,
  1244. res: register_pk_response::Result,
  1245. ) -> ResultType<()> {
  1246. let mut msg_out = RendezvousMessage::new();
  1247. msg_out.set_register_pk_response(RegisterPkResponse {
  1248. result: res.into(),
  1249. ..Default::default()
  1250. });
  1251. socket.send(&msg_out, addr).await
  1252. }
  1253. async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
  1254. let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
  1255. if let Ok(s) = FramedSocket::new_reuse(&addr, false, rmem).await {
  1256. log::debug!("listen on udp {:?}", s.local_addr());
  1257. return Ok(s);
  1258. }
  1259. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
  1260. let s = FramedSocket::new_reuse(&addr, false, rmem).await?;
  1261. log::debug!("listen on udp {:?}", s.local_addr());
  1262. Ok(s)
  1263. }
  1264. #[inline]
  1265. async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
  1266. let s = listen_any(port as _).await?;
  1267. log::debug!("listen on tcp {:?}", s.local_addr());
  1268. Ok(s)
  1269. }