rendezvous_server.rs 50 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327
  1. use crate::common::*;
  2. use crate::peer::*;
  3. use hbb_common::{
  4. allow_err, bail,
  5. bytes::{Bytes, BytesMut},
  6. bytes_codec::BytesCodec,
  7. config,
  8. futures::future::join_all,
  9. futures_util::{
  10. sink::SinkExt,
  11. stream::{SplitSink, StreamExt},
  12. },
  13. log,
  14. protobuf::{Message as _, MessageField},
  15. rendezvous_proto::{
  16. register_pk_response::Result::{TOO_FREQUENT, UUID_MISMATCH},
  17. *,
  18. },
  19. tcp::{listen_any, FramedStream},
  20. timeout,
  21. tokio::{
  22. self,
  23. io::{AsyncReadExt, AsyncWriteExt},
  24. net::{TcpListener, TcpStream},
  25. sync::{mpsc, Mutex},
  26. time::{interval, Duration},
  27. },
  28. tokio_util::codec::Framed,
  29. try_into_v4,
  30. udp::FramedSocket,
  31. AddrMangle, ResultType,
  32. };
  33. use ipnetwork::Ipv4Network;
  34. use sodiumoxide::crypto::sign;
  35. use std::{
  36. collections::HashMap,
  37. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  38. sync::atomic::{AtomicBool, AtomicUsize, Ordering},
  39. sync::Arc,
  40. time::Instant,
  41. };
  42. #[derive(Clone, Debug)]
  43. enum Data {
  44. Msg(Box<RendezvousMessage>, SocketAddr),
  45. RelayServers0(String),
  46. RelayServers(RelayServers),
  47. }
  48. const REG_TIMEOUT: i32 = 30_000;
  49. type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  50. type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
  51. enum Sink {
  52. TcpStream(TcpStreamSink),
  53. Ws(WsSink),
  54. }
  55. type Sender = mpsc::UnboundedSender<Data>;
  56. type Receiver = mpsc::UnboundedReceiver<Data>;
  57. static ROTATION_RELAY_SERVER: AtomicUsize = AtomicUsize::new(0);
  58. type RelayServers = Vec<String>;
  59. const CHECK_RELAY_TIMEOUT: u64 = 3_000;
  60. static ALWAYS_USE_RELAY: AtomicBool = AtomicBool::new(false);
  61. #[derive(Clone)]
  62. struct Inner {
  63. serial: i32,
  64. version: String,
  65. software_url: String,
  66. mask: Option<Ipv4Network>,
  67. local_ip: String,
  68. sk: Option<sign::SecretKey>,
  69. }
  70. #[derive(Clone)]
  71. pub struct RendezvousServer {
  72. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  73. pm: PeerMap,
  74. tx: Sender,
  75. relay_servers: Arc<RelayServers>,
  76. relay_servers0: Arc<RelayServers>,
  77. rendezvous_servers: Arc<Vec<String>>,
  78. inner: Arc<Inner>,
  79. }
  80. enum LoopFailure {
  81. UdpSocket,
  82. Listener3,
  83. Listener2,
  84. Listener,
  85. }
  86. impl RendezvousServer {
  87. #[tokio::main(flavor = "multi_thread")]
  88. pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
  89. let (key, sk) = Self::get_server_sk(key);
  90. let nat_port = port - 1;
  91. let ws_port = port + 2;
  92. let pm = PeerMap::new().await?;
  93. log::info!("serial={}", serial);
  94. let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
  95. log::info!("Listening on tcp/udp :{}", port);
  96. log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
  97. log::info!("Listening on websocket :{}", ws_port);
  98. let mut socket = create_udp_listener(port, rmem).await?;
  99. let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
  100. let software_url = get_arg("software-url");
  101. let version = hbb_common::get_version_from_url(&software_url);
  102. if !version.is_empty() {
  103. log::info!("software_url: {}, version: {}", software_url, version);
  104. }
  105. let mask = get_arg("mask").parse().ok();
  106. let local_ip = if mask.is_none() {
  107. "".to_owned()
  108. } else {
  109. get_arg_or(
  110. "local-ip",
  111. local_ip_address::local_ip()
  112. .map(|x| x.to_string())
  113. .unwrap_or_default(),
  114. )
  115. };
  116. let mut rs = Self {
  117. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  118. pm,
  119. tx: tx.clone(),
  120. relay_servers: Default::default(),
  121. relay_servers0: Default::default(),
  122. rendezvous_servers: Arc::new(rendezvous_servers),
  123. inner: Arc::new(Inner {
  124. serial,
  125. version,
  126. software_url,
  127. sk,
  128. mask,
  129. local_ip,
  130. }),
  131. };
  132. log::info!("mask: {:?}", rs.inner.mask);
  133. log::info!("local-ip: {:?}", rs.inner.local_ip);
  134. std::env::set_var("PORT_FOR_API", port.to_string());
  135. rs.parse_relay_servers(&get_arg("relay-servers"));
  136. let mut listener = create_tcp_listener(port).await?;
  137. let mut listener2 = create_tcp_listener(nat_port).await?;
  138. let mut listener3 = create_tcp_listener(ws_port).await?;
  139. let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
  140. if std::env::var("ALWAYS_USE_RELAY")
  141. .unwrap_or_default()
  142. .to_uppercase()
  143. == "Y"
  144. {
  145. ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
  146. }
  147. log::info!(
  148. "ALWAYS_USE_RELAY={}",
  149. if ALWAYS_USE_RELAY.load(Ordering::SeqCst) {
  150. "Y"
  151. } else {
  152. "N"
  153. }
  154. );
  155. if test_addr.to_lowercase() != "no" {
  156. let test_addr = if test_addr.is_empty() {
  157. listener.local_addr()?
  158. } else {
  159. test_addr.parse()?
  160. };
  161. tokio::spawn(async move {
  162. if let Err(err) = test_hbbs(test_addr).await {
  163. if test_addr.is_ipv6() && test_addr.ip().is_unspecified() {
  164. let mut test_addr = test_addr;
  165. test_addr.set_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
  166. if let Err(err) = test_hbbs(test_addr).await {
  167. log::error!("Failed to run hbbs test with {test_addr}: {err}");
  168. std::process::exit(1);
  169. }
  170. } else {
  171. log::error!("Failed to run hbbs test with {test_addr}: {err}");
  172. std::process::exit(1);
  173. }
  174. }
  175. });
  176. };
  177. let main_task = async move {
  178. loop {
  179. log::info!("Start");
  180. match rs
  181. .io_loop(
  182. &mut rx,
  183. &mut listener,
  184. &mut listener2,
  185. &mut listener3,
  186. &mut socket,
  187. &key,
  188. )
  189. .await
  190. {
  191. LoopFailure::UdpSocket => {
  192. drop(socket);
  193. socket = create_udp_listener(port, rmem).await?;
  194. }
  195. LoopFailure::Listener => {
  196. drop(listener);
  197. listener = create_tcp_listener(port).await?;
  198. }
  199. LoopFailure::Listener2 => {
  200. drop(listener2);
  201. listener2 = create_tcp_listener(nat_port).await?;
  202. }
  203. LoopFailure::Listener3 => {
  204. drop(listener3);
  205. listener3 = create_tcp_listener(ws_port).await?;
  206. }
  207. }
  208. }
  209. };
  210. let listen_signal = listen_signal();
  211. tokio::select!(
  212. res = main_task => res,
  213. res = listen_signal => res,
  214. )
  215. }
  216. async fn io_loop(
  217. &mut self,
  218. rx: &mut Receiver,
  219. listener: &mut TcpListener,
  220. listener2: &mut TcpListener,
  221. listener3: &mut TcpListener,
  222. socket: &mut FramedSocket,
  223. key: &str,
  224. ) -> LoopFailure {
  225. let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
  226. loop {
  227. tokio::select! {
  228. _ = timer_check_relay.tick() => {
  229. if self.relay_servers0.len() > 1 {
  230. let rs = self.relay_servers0.clone();
  231. let tx = self.tx.clone();
  232. tokio::spawn(async move {
  233. check_relay_servers(rs, tx).await;
  234. });
  235. }
  236. }
  237. Some(data) = rx.recv() => {
  238. match data {
  239. Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
  240. Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
  241. Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
  242. }
  243. }
  244. res = socket.next() => {
  245. match res {
  246. Some(Ok((bytes, addr))) => {
  247. if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
  248. log::error!("udp failure: {}", err);
  249. return LoopFailure::UdpSocket;
  250. }
  251. }
  252. Some(Err(err)) => {
  253. log::error!("udp failure: {}", err);
  254. return LoopFailure::UdpSocket;
  255. }
  256. None => {
  257. // unreachable!() ?
  258. }
  259. }
  260. }
  261. res = listener2.accept() => {
  262. match res {
  263. Ok((stream, addr)) => {
  264. stream.set_nodelay(true).ok();
  265. self.handle_listener2(stream, addr).await;
  266. }
  267. Err(err) => {
  268. log::error!("listener2.accept failed: {}", err);
  269. return LoopFailure::Listener2;
  270. }
  271. }
  272. }
  273. res = listener3.accept() => {
  274. match res {
  275. Ok((stream, addr)) => {
  276. stream.set_nodelay(true).ok();
  277. self.handle_listener(stream, addr, key, true).await;
  278. }
  279. Err(err) => {
  280. log::error!("listener3.accept failed: {}", err);
  281. return LoopFailure::Listener3;
  282. }
  283. }
  284. }
  285. res = listener.accept() => {
  286. match res {
  287. Ok((stream, addr)) => {
  288. stream.set_nodelay(true).ok();
  289. self.handle_listener(stream, addr, key, false).await;
  290. }
  291. Err(err) => {
  292. log::error!("listener.accept failed: {}", err);
  293. return LoopFailure::Listener;
  294. }
  295. }
  296. }
  297. }
  298. }
  299. }
  300. #[inline]
  301. async fn handle_udp(
  302. &mut self,
  303. bytes: &BytesMut,
  304. addr: SocketAddr,
  305. socket: &mut FramedSocket,
  306. key: &str,
  307. ) -> ResultType<()> {
  308. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  309. match msg_in.union {
  310. Some(rendezvous_message::Union::RegisterPeer(rp)) => {
  311. // B registered
  312. if !rp.id.is_empty() {
  313. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  314. self.update_addr(rp.id, addr, socket).await?;
  315. if self.inner.serial > rp.serial {
  316. let mut msg_out = RendezvousMessage::new();
  317. msg_out.set_configure_update(ConfigUpdate {
  318. serial: self.inner.serial,
  319. rendezvous_servers: (*self.rendezvous_servers).clone(),
  320. ..Default::default()
  321. });
  322. socket.send(&msg_out, addr).await?;
  323. }
  324. }
  325. }
  326. Some(rendezvous_message::Union::RegisterPk(rk)) => {
  327. if rk.uuid.is_empty() || rk.pk.is_empty() {
  328. return Ok(());
  329. }
  330. let id = rk.id;
  331. let ip = addr.ip().to_string();
  332. if id.len() < 6 {
  333. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  334. } else if !self.check_ip_blocker(&ip, &id).await {
  335. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  336. }
  337. let peer = self.pm.get_or(&id).await;
  338. let (changed, ip_changed) = {
  339. let peer = peer.read().await;
  340. if peer.uuid.is_empty() {
  341. (true, false)
  342. } else {
  343. if peer.uuid == rk.uuid {
  344. if peer.info.ip != ip && peer.pk != rk.pk {
  345. log::warn!(
  346. "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
  347. id,
  348. ip,
  349. rk.pk,
  350. peer.info.ip,
  351. peer.pk,
  352. );
  353. drop(peer);
  354. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  355. }
  356. } else {
  357. log::warn!(
  358. "Peer {} uuid mismatch: {:?} vs {:?}",
  359. id,
  360. rk.uuid,
  361. peer.uuid
  362. );
  363. drop(peer);
  364. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  365. }
  366. let ip_changed = peer.info.ip != ip;
  367. (
  368. peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
  369. ip_changed,
  370. )
  371. }
  372. };
  373. let mut req_pk = peer.read().await.reg_pk;
  374. if req_pk.1.elapsed().as_secs() > 6 {
  375. req_pk.0 = 0;
  376. } else if req_pk.0 > 2 {
  377. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  378. }
  379. req_pk.0 += 1;
  380. req_pk.1 = Instant::now();
  381. peer.write().await.reg_pk = req_pk;
  382. if ip_changed {
  383. let mut lock = IP_CHANGES.lock().await;
  384. if let Some((tm, ips)) = lock.get_mut(&id) {
  385. if tm.elapsed().as_secs() > IP_CHANGE_DUR {
  386. *tm = Instant::now();
  387. ips.clear();
  388. ips.insert(ip.clone(), 1);
  389. } else if let Some(v) = ips.get_mut(&ip) {
  390. *v += 1;
  391. } else {
  392. ips.insert(ip.clone(), 1);
  393. }
  394. } else {
  395. lock.insert(
  396. id.clone(),
  397. (Instant::now(), HashMap::from([(ip.clone(), 1)])),
  398. );
  399. }
  400. }
  401. if changed {
  402. self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
  403. }
  404. let mut msg_out = RendezvousMessage::new();
  405. msg_out.set_register_pk_response(RegisterPkResponse {
  406. result: register_pk_response::Result::OK.into(),
  407. ..Default::default()
  408. });
  409. socket.send(&msg_out, addr).await?
  410. }
  411. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  412. if self.pm.is_in_memory(&ph.id).await {
  413. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  414. } else {
  415. // not in memory, fetch from db with spawn in case blocking me
  416. let mut me = self.clone();
  417. let key = key.to_owned();
  418. tokio::spawn(async move {
  419. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  420. });
  421. }
  422. }
  423. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  424. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  425. }
  426. Some(rendezvous_message::Union::LocalAddr(la)) => {
  427. self.handle_local_addr(la, addr, Some(socket)).await?;
  428. }
  429. Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
  430. if try_into_v4(addr).ip().is_loopback() && cu.serial > self.inner.serial {
  431. let mut inner: Inner = (*self.inner).clone();
  432. inner.serial = cu.serial;
  433. self.inner = Arc::new(inner);
  434. self.rendezvous_servers = Arc::new(
  435. cu.rendezvous_servers
  436. .drain(..)
  437. .filter(|x| {
  438. !x.is_empty()
  439. && test_if_valid_server(x, "rendezvous-server").is_ok()
  440. })
  441. .collect(),
  442. );
  443. log::info!(
  444. "configure updated: serial={} rendezvous-servers={:?}",
  445. self.inner.serial,
  446. self.rendezvous_servers
  447. );
  448. }
  449. }
  450. Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
  451. if !self.inner.version.is_empty() && su.url != self.inner.version {
  452. let mut msg_out = RendezvousMessage::new();
  453. msg_out.set_software_update(SoftwareUpdate {
  454. url: self.inner.software_url.clone(),
  455. ..Default::default()
  456. });
  457. socket.send(&msg_out, addr).await?;
  458. }
  459. }
  460. _ => {}
  461. }
  462. }
  463. Ok(())
  464. }
  465. #[inline]
  466. async fn handle_tcp(
  467. &mut self,
  468. bytes: &[u8],
  469. sink: &mut Option<Sink>,
  470. addr: SocketAddr,
  471. key: &str,
  472. ws: bool,
  473. ) -> bool {
  474. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  475. match msg_in.union {
  476. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  477. // there maybe several attempt, so sink can be none
  478. if let Some(sink) = sink.take() {
  479. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  480. }
  481. allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
  482. return true;
  483. }
  484. Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
  485. // there maybe several attempt, so sink can be none
  486. if let Some(sink) = sink.take() {
  487. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  488. }
  489. if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
  490. let mut msg_out = RendezvousMessage::new();
  491. rf.socket_addr = AddrMangle::encode(addr).into();
  492. msg_out.set_request_relay(rf);
  493. let peer_addr = peer.read().await.socket_addr;
  494. self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
  495. }
  496. return true;
  497. }
  498. Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
  499. let addr_b = AddrMangle::decode(&rr.socket_addr);
  500. rr.socket_addr = Default::default();
  501. let id = rr.id();
  502. if !id.is_empty() {
  503. let pk = self.get_pk(&rr.version, id.to_owned()).await;
  504. rr.set_pk(pk);
  505. }
  506. let mut msg_out = RendezvousMessage::new();
  507. if !rr.relay_server.is_empty() {
  508. if self.is_lan(addr_b) {
  509. // https://github.com/rustdesk/rustdesk-server/issues/24
  510. rr.relay_server = self.inner.local_ip.clone();
  511. } else if rr.relay_server == self.inner.local_ip {
  512. rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
  513. }
  514. }
  515. msg_out.set_relay_response(rr);
  516. allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
  517. }
  518. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  519. allow_err!(self.handle_hole_sent(phs, addr, None).await);
  520. }
  521. Some(rendezvous_message::Union::LocalAddr(la)) => {
  522. allow_err!(self.handle_local_addr(la, addr, None).await);
  523. }
  524. Some(rendezvous_message::Union::TestNatRequest(tar)) => {
  525. let mut msg_out = RendezvousMessage::new();
  526. let mut res = TestNatResponse {
  527. port: addr.port() as _,
  528. ..Default::default()
  529. };
  530. if self.inner.serial > tar.serial {
  531. let mut cu = ConfigUpdate::new();
  532. cu.serial = self.inner.serial;
  533. cu.rendezvous_servers = (*self.rendezvous_servers).clone();
  534. res.cu = MessageField::from_option(Some(cu));
  535. }
  536. msg_out.set_test_nat_response(res);
  537. Self::send_to_sink(sink, msg_out).await;
  538. }
  539. Some(rendezvous_message::Union::RegisterPk(_)) => {
  540. let res = register_pk_response::Result::NOT_SUPPORT;
  541. let mut msg_out = RendezvousMessage::new();
  542. msg_out.set_register_pk_response(RegisterPkResponse {
  543. result: res.into(),
  544. ..Default::default()
  545. });
  546. Self::send_to_sink(sink, msg_out).await;
  547. }
  548. _ => {}
  549. }
  550. }
  551. false
  552. }
  553. #[inline]
  554. async fn update_addr(
  555. &mut self,
  556. id: String,
  557. socket_addr: SocketAddr,
  558. socket: &mut FramedSocket,
  559. ) -> ResultType<()> {
  560. let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
  561. let mut old = old.write().await;
  562. let ip = socket_addr.ip();
  563. let ip_change = if old.socket_addr.port() != 0 {
  564. ip != old.socket_addr.ip()
  565. } else {
  566. ip.to_string() != old.info.ip
  567. } && !ip.is_loopback();
  568. let request_pk = old.pk.is_empty() || ip_change;
  569. if !request_pk {
  570. old.socket_addr = socket_addr;
  571. old.last_reg_time = Instant::now();
  572. }
  573. let ip_change = if ip_change && old.reg_pk.0 <= 2 {
  574. Some(if old.socket_addr.port() == 0 {
  575. old.info.ip.clone()
  576. } else {
  577. old.socket_addr.to_string()
  578. })
  579. } else {
  580. None
  581. };
  582. (request_pk, ip_change)
  583. } else {
  584. (true, None)
  585. };
  586. if let Some(old) = ip_change {
  587. log::info!("IP change of {} from {} to {}", id, old, socket_addr);
  588. }
  589. let mut msg_out = RendezvousMessage::new();
  590. msg_out.set_register_peer_response(RegisterPeerResponse {
  591. request_pk,
  592. ..Default::default()
  593. });
  594. socket.send(&msg_out, socket_addr).await
  595. }
  596. #[inline]
  597. async fn handle_hole_sent<'a>(
  598. &mut self,
  599. phs: PunchHoleSent,
  600. addr: SocketAddr,
  601. socket: Option<&'a mut FramedSocket>,
  602. ) -> ResultType<()> {
  603. // punch hole sent from B, tell A that B is ready to be connected
  604. let addr_a = AddrMangle::decode(&phs.socket_addr);
  605. log::debug!(
  606. "{} punch hole response to {:?} from {:?}",
  607. if socket.is_none() { "TCP" } else { "UDP" },
  608. &addr_a,
  609. &addr
  610. );
  611. let mut msg_out = RendezvousMessage::new();
  612. let mut p = PunchHoleResponse {
  613. socket_addr: AddrMangle::encode(addr).into(),
  614. pk: self.get_pk(&phs.version, phs.id).await,
  615. relay_server: phs.relay_server.clone(),
  616. ..Default::default()
  617. };
  618. if let Ok(t) = phs.nat_type.enum_value() {
  619. p.set_nat_type(t);
  620. }
  621. msg_out.set_punch_hole_response(p);
  622. if let Some(socket) = socket {
  623. socket.send(&msg_out, addr_a).await?;
  624. } else {
  625. self.send_to_tcp(msg_out, addr_a).await;
  626. }
  627. Ok(())
  628. }
  629. #[inline]
  630. async fn handle_local_addr<'a>(
  631. &mut self,
  632. la: LocalAddr,
  633. addr: SocketAddr,
  634. socket: Option<&'a mut FramedSocket>,
  635. ) -> ResultType<()> {
  636. // relay local addrs of B to A
  637. let addr_a = AddrMangle::decode(&la.socket_addr);
  638. log::debug!(
  639. "{} local addrs response to {:?} from {:?}",
  640. if socket.is_none() { "TCP" } else { "UDP" },
  641. &addr_a,
  642. &addr
  643. );
  644. let mut msg_out = RendezvousMessage::new();
  645. let mut p = PunchHoleResponse {
  646. socket_addr: la.local_addr.clone(),
  647. pk: self.get_pk(&la.version, la.id).await,
  648. relay_server: la.relay_server,
  649. ..Default::default()
  650. };
  651. p.set_is_local(true);
  652. msg_out.set_punch_hole_response(p);
  653. if let Some(socket) = socket {
  654. socket.send(&msg_out, addr_a).await?;
  655. } else {
  656. self.send_to_tcp(msg_out, addr_a).await;
  657. }
  658. Ok(())
  659. }
  660. #[inline]
  661. async fn handle_punch_hole_request(
  662. &mut self,
  663. addr: SocketAddr,
  664. ph: PunchHoleRequest,
  665. key: &str,
  666. ws: bool,
  667. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  668. let mut ph = ph;
  669. if !key.is_empty() && ph.licence_key != key {
  670. let mut msg_out = RendezvousMessage::new();
  671. msg_out.set_punch_hole_response(PunchHoleResponse {
  672. failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
  673. ..Default::default()
  674. });
  675. return Ok((msg_out, None));
  676. }
  677. let id = ph.id;
  678. // punch hole request from A, relay to B,
  679. // check if in same intranet first,
  680. // fetch local addrs if in same intranet.
  681. // because punch hole won't work if in the same intranet,
  682. // all routers will drop such self-connections.
  683. if let Some(peer) = self.pm.get(&id).await {
  684. let (elapsed, peer_addr) = {
  685. let r = peer.read().await;
  686. (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
  687. };
  688. if elapsed >= REG_TIMEOUT {
  689. let mut msg_out = RendezvousMessage::new();
  690. msg_out.set_punch_hole_response(PunchHoleResponse {
  691. failure: punch_hole_response::Failure::OFFLINE.into(),
  692. ..Default::default()
  693. });
  694. return Ok((msg_out, None));
  695. }
  696. let mut msg_out = RendezvousMessage::new();
  697. let peer_is_lan = self.is_lan(peer_addr);
  698. let is_lan = self.is_lan(addr);
  699. let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
  700. if ALWAYS_USE_RELAY.load(Ordering::SeqCst) || (peer_is_lan ^ is_lan) {
  701. if peer_is_lan {
  702. // https://github.com/rustdesk/rustdesk-server/issues/24
  703. relay_server = self.inner.local_ip.clone()
  704. }
  705. ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
  706. }
  707. let same_intranet: bool = !ws
  708. && (peer_is_lan && is_lan || {
  709. match (peer_addr, addr) {
  710. (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
  711. (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
  712. _ => false,
  713. }
  714. });
  715. let socket_addr = AddrMangle::encode(addr).into();
  716. if same_intranet {
  717. log::debug!(
  718. "Fetch local addr {:?} {:?} request from {:?}",
  719. id,
  720. peer_addr,
  721. addr
  722. );
  723. msg_out.set_fetch_local_addr(FetchLocalAddr {
  724. socket_addr,
  725. relay_server,
  726. ..Default::default()
  727. });
  728. } else {
  729. log::debug!(
  730. "Punch hole {:?} {:?} request from {:?}",
  731. id,
  732. peer_addr,
  733. addr
  734. );
  735. msg_out.set_punch_hole(PunchHole {
  736. socket_addr,
  737. nat_type: ph.nat_type,
  738. relay_server,
  739. ..Default::default()
  740. });
  741. }
  742. Ok((msg_out, Some(peer_addr)))
  743. } else {
  744. let mut msg_out = RendezvousMessage::new();
  745. msg_out.set_punch_hole_response(PunchHoleResponse {
  746. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  747. ..Default::default()
  748. });
  749. Ok((msg_out, None))
  750. }
  751. }
  752. #[inline]
  753. async fn handle_online_request(
  754. &mut self,
  755. stream: &mut FramedStream,
  756. peers: Vec<String>,
  757. ) -> ResultType<()> {
  758. let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
  759. for (i, peer_id) in peers.iter().enumerate() {
  760. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  761. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  762. // bytes index from left to right
  763. let states_idx = i / 8;
  764. let bit_idx = 7 - i % 8;
  765. if elapsed < REG_TIMEOUT {
  766. states[states_idx] |= 0x01 << bit_idx;
  767. }
  768. }
  769. }
  770. let mut msg_out = RendezvousMessage::new();
  771. msg_out.set_online_response(OnlineResponse {
  772. states: states.into(),
  773. ..Default::default()
  774. });
  775. stream.send(&msg_out).await?;
  776. Ok(())
  777. }
  778. #[inline]
  779. async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
  780. let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  781. tokio::spawn(async move {
  782. Self::send_to_sink(&mut tcp, msg).await;
  783. });
  784. }
  785. #[inline]
  786. async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
  787. if let Some(sink) = sink.as_mut() {
  788. if let Ok(bytes) = msg.write_to_bytes() {
  789. match sink {
  790. Sink::TcpStream(s) => {
  791. allow_err!(s.send(Bytes::from(bytes)).await);
  792. }
  793. Sink::Ws(ws) => {
  794. allow_err!(ws.send(tungstenite::Message::Binary(bytes)).await);
  795. }
  796. }
  797. }
  798. }
  799. }
  800. #[inline]
  801. async fn send_to_tcp_sync(
  802. &mut self,
  803. msg: RendezvousMessage,
  804. addr: SocketAddr,
  805. ) -> ResultType<()> {
  806. let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  807. Self::send_to_sink(&mut sink, msg).await;
  808. Ok(())
  809. }
  810. #[inline]
  811. async fn handle_tcp_punch_hole_request(
  812. &mut self,
  813. addr: SocketAddr,
  814. ph: PunchHoleRequest,
  815. key: &str,
  816. ws: bool,
  817. ) -> ResultType<()> {
  818. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
  819. if let Some(addr) = to_addr {
  820. self.tx.send(Data::Msg(msg.into(), addr))?;
  821. } else {
  822. self.send_to_tcp_sync(msg, addr).await?;
  823. }
  824. Ok(())
  825. }
  826. #[inline]
  827. async fn handle_udp_punch_hole_request(
  828. &mut self,
  829. addr: SocketAddr,
  830. ph: PunchHoleRequest,
  831. key: &str,
  832. ) -> ResultType<()> {
  833. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
  834. self.tx.send(Data::Msg(
  835. msg.into(),
  836. match to_addr {
  837. Some(addr) => addr,
  838. None => addr,
  839. },
  840. ))?;
  841. Ok(())
  842. }
  843. async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
  844. let mut lock = IP_BLOCKER.lock().await;
  845. let now = Instant::now();
  846. if let Some(old) = lock.get_mut(ip) {
  847. let counter = &mut old.0;
  848. if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
  849. counter.0 = 0;
  850. } else if counter.0 > 30 {
  851. return false;
  852. }
  853. counter.0 += 1;
  854. counter.1 = now;
  855. let counter = &mut old.1;
  856. let is_new = counter.0.get(id).is_none();
  857. if counter.1.elapsed().as_secs() > DAY_SECONDS {
  858. counter.0.clear();
  859. } else if counter.0.len() > 300 {
  860. return !is_new;
  861. }
  862. if is_new {
  863. counter.0.insert(id.to_owned());
  864. }
  865. counter.1 = now;
  866. } else {
  867. lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
  868. }
  869. true
  870. }
  871. fn parse_relay_servers(&mut self, relay_servers: &str) {
  872. let rs = get_servers(relay_servers, "relay-servers");
  873. self.relay_servers0 = Arc::new(rs);
  874. self.relay_servers = self.relay_servers0.clone();
  875. }
  876. fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
  877. if self.relay_servers.is_empty() {
  878. return "".to_owned();
  879. } else if self.relay_servers.len() == 1 {
  880. return self.relay_servers[0].clone();
  881. }
  882. let i = ROTATION_RELAY_SERVER.fetch_add(1, Ordering::SeqCst) % self.relay_servers.len();
  883. self.relay_servers[i].clone()
  884. }
  885. async fn check_cmd(&self, cmd: &str) -> String {
  886. use std::fmt::Write as _;
  887. let mut res = "".to_owned();
  888. let mut fds = cmd.trim().split(' ');
  889. match fds.next() {
  890. Some("h") => {
  891. res = format!(
  892. "{}\n{}\n{}\n{}\n{}\n{}\n",
  893. "relay-servers(rs) <separated by ,>",
  894. "reload-geo(rg)",
  895. "ip-blocker(ib) [<ip>|<number>] [-]",
  896. "ip-changes(ic) [<id>|<number>] [-]",
  897. "always-use-relay(aur)",
  898. "test-geo(tg) <ip1> <ip2>"
  899. )
  900. }
  901. Some("relay-servers" | "rs") => {
  902. if let Some(rs) = fds.next() {
  903. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  904. } else {
  905. for ip in self.relay_servers.iter() {
  906. let _ = writeln!(res, "{ip}");
  907. }
  908. }
  909. }
  910. Some("ip-blocker" | "ib") => {
  911. let mut lock = IP_BLOCKER.lock().await;
  912. lock.retain(|&_, (a, b)| {
  913. a.1.elapsed().as_secs() <= IP_BLOCK_DUR
  914. || b.1.elapsed().as_secs() <= DAY_SECONDS
  915. });
  916. res = format!("{}\n", lock.len());
  917. let ip = fds.next();
  918. let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  919. if start < 0 {
  920. if let Some(ip) = ip {
  921. if let Some((a, b)) = lock.get(ip) {
  922. let _ = writeln!(
  923. res,
  924. "{}/{}s {}/{}s",
  925. a.0,
  926. a.1.elapsed().as_secs(),
  927. b.0.len(),
  928. b.1.elapsed().as_secs()
  929. );
  930. }
  931. if fds.next() == Some("-") {
  932. lock.remove(ip);
  933. }
  934. } else {
  935. start = 0;
  936. }
  937. }
  938. if start >= 0 {
  939. let mut it = lock.iter();
  940. for i in 0..(start + 10) {
  941. let x = it.next();
  942. if x.is_none() {
  943. break;
  944. }
  945. if i < start {
  946. continue;
  947. }
  948. if let Some((ip, (a, b))) = x {
  949. let _ = writeln!(
  950. res,
  951. "{}: {}/{}s {}/{}s",
  952. ip,
  953. a.0,
  954. a.1.elapsed().as_secs(),
  955. b.0.len(),
  956. b.1.elapsed().as_secs()
  957. );
  958. }
  959. }
  960. }
  961. }
  962. Some("ip-changes" | "ic") => {
  963. let mut lock = IP_CHANGES.lock().await;
  964. lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
  965. res = format!("{}\n", lock.len());
  966. let id = fds.next();
  967. let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  968. if !(0..=10_000_000).contains(&start) {
  969. if let Some(id) = id {
  970. if let Some((tm, ips)) = lock.get(id) {
  971. let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
  972. }
  973. if fds.next() == Some("-") {
  974. lock.remove(id);
  975. }
  976. } else {
  977. start = 0;
  978. }
  979. }
  980. if start >= 0 {
  981. let mut it = lock.iter();
  982. for i in 0..(start + 10) {
  983. let x = it.next();
  984. if x.is_none() {
  985. break;
  986. }
  987. if i < start {
  988. continue;
  989. }
  990. if let Some((id, (tm, ips))) = x {
  991. let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
  992. }
  993. }
  994. }
  995. }
  996. Some("always-use-relay" | "aur") => {
  997. if let Some(rs) = fds.next() {
  998. if rs.to_uppercase() == "Y" {
  999. ALWAYS_USE_RELAY.store(true, Ordering::SeqCst);
  1000. } else {
  1001. ALWAYS_USE_RELAY.store(false, Ordering::SeqCst);
  1002. }
  1003. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  1004. } else {
  1005. let _ = writeln!(
  1006. res,
  1007. "ALWAYS_USE_RELAY: {:?}",
  1008. ALWAYS_USE_RELAY.load(Ordering::SeqCst)
  1009. );
  1010. }
  1011. }
  1012. Some("test-geo" | "tg") => {
  1013. if let Some(rs) = fds.next() {
  1014. if let Ok(a) = rs.parse::<IpAddr>() {
  1015. if let Some(rs) = fds.next() {
  1016. if let Ok(b) = rs.parse::<IpAddr>() {
  1017. res = format!("{:?}", self.get_relay_server(a, b));
  1018. }
  1019. } else {
  1020. res = format!("{:?}", self.get_relay_server(a, a));
  1021. }
  1022. }
  1023. }
  1024. }
  1025. _ => {}
  1026. }
  1027. res
  1028. }
  1029. async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
  1030. let mut rs = self.clone();
  1031. let ip = try_into_v4(addr).ip();
  1032. if ip.is_loopback() {
  1033. tokio::spawn(async move {
  1034. let mut stream = stream;
  1035. let mut buffer = [0; 1024];
  1036. if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
  1037. if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
  1038. let res = rs.check_cmd(data).await;
  1039. stream.write(res.as_bytes()).await.ok();
  1040. }
  1041. }
  1042. });
  1043. return;
  1044. }
  1045. let stream = FramedStream::from(stream, addr);
  1046. tokio::spawn(async move {
  1047. let mut stream = stream;
  1048. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  1049. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1050. match msg_in.union {
  1051. Some(rendezvous_message::Union::TestNatRequest(_)) => {
  1052. let mut msg_out = RendezvousMessage::new();
  1053. msg_out.set_test_nat_response(TestNatResponse {
  1054. port: addr.port() as _,
  1055. ..Default::default()
  1056. });
  1057. stream.send(&msg_out).await.ok();
  1058. }
  1059. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  1060. allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
  1061. }
  1062. _ => {}
  1063. }
  1064. }
  1065. }
  1066. });
  1067. }
  1068. async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
  1069. log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
  1070. let mut rs = self.clone();
  1071. let key = key.to_owned();
  1072. tokio::spawn(async move {
  1073. allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
  1074. });
  1075. }
  1076. #[inline]
  1077. async fn handle_listener_inner(
  1078. &mut self,
  1079. stream: TcpStream,
  1080. mut addr: SocketAddr,
  1081. key: &str,
  1082. ws: bool,
  1083. ) -> ResultType<()> {
  1084. let mut sink;
  1085. if ws {
  1086. use tokio_tungstenite::tungstenite::handshake::server::{Request, Response};
  1087. let callback = |req: &Request, response: Response| {
  1088. let headers = req.headers();
  1089. let real_ip = headers
  1090. .get("X-Real-IP")
  1091. .or_else(|| headers.get("X-Forwarded-For"))
  1092. .and_then(|header_value| header_value.to_str().ok());
  1093. if let Some(ip) = real_ip {
  1094. if ip.contains('.') {
  1095. addr = format!("{ip}:0").parse().unwrap_or(addr);
  1096. } else {
  1097. addr = format!("[{ip}]:0").parse().unwrap_or(addr);
  1098. }
  1099. }
  1100. Ok(response)
  1101. };
  1102. let ws_stream = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
  1103. let (a, mut b) = ws_stream.split();
  1104. sink = Some(Sink::Ws(a));
  1105. while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
  1106. if let tungstenite::Message::Binary(bytes) = msg {
  1107. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1108. break;
  1109. }
  1110. }
  1111. }
  1112. } else {
  1113. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  1114. sink = Some(Sink::TcpStream(a));
  1115. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  1116. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1117. break;
  1118. }
  1119. }
  1120. }
  1121. if sink.is_none() {
  1122. self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1123. }
  1124. log::debug!("Tcp connection from {:?} closed", addr);
  1125. Ok(())
  1126. }
  1127. #[inline]
  1128. async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
  1129. if version.is_empty() || self.inner.sk.is_none() {
  1130. Bytes::new()
  1131. } else {
  1132. match self.pm.get(&id).await {
  1133. Some(peer) => {
  1134. let pk = peer.read().await.pk.clone();
  1135. sign::sign(
  1136. &hbb_common::message_proto::IdPk {
  1137. id,
  1138. pk,
  1139. ..Default::default()
  1140. }
  1141. .write_to_bytes()
  1142. .unwrap_or_default(),
  1143. self.inner.sk.as_ref().unwrap(),
  1144. )
  1145. .into()
  1146. }
  1147. _ => Bytes::new(),
  1148. }
  1149. }
  1150. }
  1151. #[inline]
  1152. fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
  1153. let mut out_sk = None;
  1154. let mut key = key.to_owned();
  1155. if let Ok(sk) = base64::decode(&key) {
  1156. if sk.len() == sign::SECRETKEYBYTES {
  1157. log::info!("The key is a crypto private key");
  1158. key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
  1159. let mut tmp = [0u8; sign::SECRETKEYBYTES];
  1160. tmp[..].copy_from_slice(&sk);
  1161. out_sk = Some(sign::SecretKey(tmp));
  1162. }
  1163. }
  1164. if key.is_empty() || key == "-" || key == "_" {
  1165. let (pk, sk) = crate::common::gen_sk(0);
  1166. out_sk = sk;
  1167. if !key.is_empty() {
  1168. key = pk;
  1169. }
  1170. }
  1171. if !key.is_empty() {
  1172. log::info!("Key: {}", key);
  1173. }
  1174. (key, out_sk)
  1175. }
  1176. #[inline]
  1177. fn is_lan(&self, addr: SocketAddr) -> bool {
  1178. if let Some(network) = &self.inner.mask {
  1179. match addr {
  1180. SocketAddr::V4(v4_socket_addr) => {
  1181. return network.contains(*v4_socket_addr.ip());
  1182. }
  1183. SocketAddr::V6(v6_socket_addr) => {
  1184. if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4() {
  1185. return network.contains(v4_addr);
  1186. }
  1187. }
  1188. }
  1189. }
  1190. false
  1191. }
  1192. }
  1193. async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
  1194. let mut futs = Vec::new();
  1195. let rs = Arc::new(Mutex::new(Vec::new()));
  1196. for x in rs0.iter() {
  1197. let mut host = x.to_owned();
  1198. if !host.contains(':') {
  1199. host = format!("{}:{}", host, config::RELAY_PORT);
  1200. }
  1201. let rs = rs.clone();
  1202. let x = x.clone();
  1203. futs.push(tokio::spawn(async move {
  1204. if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
  1205. .await
  1206. .is_ok()
  1207. {
  1208. rs.lock().await.push(x);
  1209. }
  1210. }));
  1211. }
  1212. join_all(futs).await;
  1213. log::debug!("check_relay_servers");
  1214. let rs = std::mem::take(&mut *rs.lock().await);
  1215. if !rs.is_empty() {
  1216. tx.send(Data::RelayServers(rs)).ok();
  1217. }
  1218. }
  1219. // temp solution to solve udp socket failure
  1220. async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
  1221. let mut addr = addr;
  1222. if addr.ip().is_unspecified() {
  1223. addr.set_ip(if addr.is_ipv4() {
  1224. IpAddr::V4(Ipv4Addr::LOCALHOST)
  1225. } else {
  1226. IpAddr::V6(Ipv6Addr::LOCALHOST)
  1227. });
  1228. }
  1229. let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
  1230. let mut msg_out = RendezvousMessage::new();
  1231. msg_out.set_register_peer(RegisterPeer {
  1232. id: "(:test_hbbs:)".to_owned(),
  1233. ..Default::default()
  1234. });
  1235. let mut last_time_recv = Instant::now();
  1236. let mut timer = interval(Duration::from_secs(1));
  1237. loop {
  1238. tokio::select! {
  1239. _ = timer.tick() => {
  1240. if last_time_recv.elapsed().as_secs() > 12 {
  1241. bail!("Timeout of test_hbbs");
  1242. }
  1243. socket.send(&msg_out, addr).await?;
  1244. }
  1245. Some(Ok((bytes, _))) = socket.next() => {
  1246. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1247. log::trace!("Recv {:?} of test_hbbs", msg_in);
  1248. last_time_recv = Instant::now();
  1249. }
  1250. }
  1251. }
  1252. }
  1253. }
  1254. #[inline]
  1255. async fn send_rk_res(
  1256. socket: &mut FramedSocket,
  1257. addr: SocketAddr,
  1258. res: register_pk_response::Result,
  1259. ) -> ResultType<()> {
  1260. let mut msg_out = RendezvousMessage::new();
  1261. msg_out.set_register_pk_response(RegisterPkResponse {
  1262. result: res.into(),
  1263. ..Default::default()
  1264. });
  1265. socket.send(&msg_out, addr).await
  1266. }
  1267. async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
  1268. let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
  1269. if let Ok(s) = FramedSocket::new_reuse(&addr, true, rmem).await {
  1270. log::debug!("listen on udp {:?}", s.local_addr());
  1271. return Ok(s);
  1272. }
  1273. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
  1274. let s = FramedSocket::new_reuse(&addr, true, rmem).await?;
  1275. log::debug!("listen on udp {:?}", s.local_addr());
  1276. Ok(s)
  1277. }
  1278. #[inline]
  1279. async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
  1280. let s = listen_any(port as _).await?;
  1281. log::debug!("listen on tcp {:?}", s.local_addr());
  1282. Ok(s)
  1283. }