rendezvous_server.rs 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293
  1. use crate::common::*;
  2. use crate::peer::*;
  3. use hbb_common::{
  4. allow_err,
  5. bytes::{Bytes, BytesMut},
  6. bytes_codec::BytesCodec,
  7. config,
  8. futures::future::join_all,
  9. futures_util::{
  10. sink::SinkExt,
  11. stream::{SplitSink, StreamExt},
  12. },
  13. log,
  14. protobuf::{Message as _, MessageField},
  15. rendezvous_proto::{
  16. register_pk_response::Result::{TOO_FREQUENT, UUID_MISMATCH},
  17. *,
  18. },
  19. tcp::{listen_any, FramedStream},
  20. timeout,
  21. tokio::{
  22. self,
  23. io::{AsyncReadExt, AsyncWriteExt},
  24. net::{TcpListener, TcpStream},
  25. sync::{mpsc, Mutex},
  26. time::{interval, Duration},
  27. },
  28. tokio_util::codec::Framed,
  29. try_into_v4,
  30. udp::FramedSocket,
  31. AddrMangle, ResultType,
  32. };
  33. use ipnetwork::Ipv4Network;
  34. use sodiumoxide::crypto::sign;
  35. use std::{
  36. collections::HashMap,
  37. net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
  38. sync::Arc,
  39. time::Instant,
  40. };
  41. #[derive(Clone, Debug)]
  42. enum Data {
  43. Msg(Box<RendezvousMessage>, SocketAddr),
  44. RelayServers0(String),
  45. RelayServers(RelayServers),
  46. }
  47. const REG_TIMEOUT: i32 = 30_000;
  48. type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  49. type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
  50. enum Sink {
  51. TcpStream(TcpStreamSink),
  52. Ws(WsSink),
  53. }
  54. type Sender = mpsc::UnboundedSender<Data>;
  55. type Receiver = mpsc::UnboundedReceiver<Data>;
  56. static mut ROTATION_RELAY_SERVER: usize = 0;
  57. type RelayServers = Vec<String>;
  58. static CHECK_RELAY_TIMEOUT: u64 = 3_000;
  59. static mut ALWAYS_USE_RELAY: bool = false;
  60. #[derive(Clone)]
  61. struct Inner {
  62. serial: i32,
  63. version: String,
  64. software_url: String,
  65. mask: Option<Ipv4Network>,
  66. local_ip: String,
  67. sk: Option<sign::SecretKey>,
  68. }
  69. #[derive(Clone)]
  70. pub struct RendezvousServer {
  71. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  72. pm: PeerMap,
  73. tx: Sender,
  74. relay_servers: Arc<RelayServers>,
  75. relay_servers0: Arc<RelayServers>,
  76. rendezvous_servers: Arc<Vec<String>>,
  77. inner: Arc<Inner>,
  78. }
  79. enum LoopFailure {
  80. UdpSocket,
  81. Listener3,
  82. Listener2,
  83. Listener,
  84. }
  85. impl RendezvousServer {
  86. #[tokio::main(flavor = "multi_thread")]
  87. pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
  88. let (key, sk) = Self::get_server_sk(key);
  89. let nat_port = port - 1;
  90. let ws_port = port + 2;
  91. let pm = PeerMap::new().await?;
  92. log::info!("serial={}", serial);
  93. let rendezvous_servers = get_servers(&get_arg("rendezvous-servers"), "rendezvous-servers");
  94. log::info!("Listening on tcp/udp :{}", port);
  95. log::info!("Listening on tcp :{}, extra port for NAT test", nat_port);
  96. log::info!("Listening on websocket :{}", ws_port);
  97. let mut socket = create_udp_listener(port, rmem).await?;
  98. let (tx, mut rx) = mpsc::unbounded_channel::<Data>();
  99. let software_url = get_arg("software-url");
  100. let version = hbb_common::get_version_from_url(&software_url);
  101. if !version.is_empty() {
  102. log::info!("software_url: {}, version: {}", software_url, version);
  103. }
  104. let mask = get_arg("mask").parse().ok();
  105. let local_ip = if mask.is_none() {
  106. "".to_owned()
  107. } else {
  108. get_arg_or(
  109. "local-ip",
  110. local_ip_address::local_ip()
  111. .map(|x| x.to_string())
  112. .unwrap_or_default(),
  113. )
  114. };
  115. let mut rs = Self {
  116. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  117. pm,
  118. tx: tx.clone(),
  119. relay_servers: Default::default(),
  120. relay_servers0: Default::default(),
  121. rendezvous_servers: Arc::new(rendezvous_servers),
  122. inner: Arc::new(Inner {
  123. serial,
  124. version,
  125. software_url,
  126. sk,
  127. mask,
  128. local_ip,
  129. }),
  130. };
  131. log::info!("mask: {:?}", rs.inner.mask);
  132. log::info!("local-ip: {:?}", rs.inner.local_ip);
  133. std::env::set_var("PORT_FOR_API", port.to_string());
  134. rs.parse_relay_servers(&get_arg("relay-servers"));
  135. let mut listener = create_tcp_listener(port).await?;
  136. let mut listener2 = create_tcp_listener(nat_port).await?;
  137. let mut listener3 = create_tcp_listener(ws_port).await?;
  138. let test_addr = std::env::var("TEST_HBBS").unwrap_or_default();
  139. if std::env::var("ALWAYS_USE_RELAY")
  140. .unwrap_or_default()
  141. .to_uppercase()
  142. == "Y"
  143. {
  144. unsafe {
  145. ALWAYS_USE_RELAY = true;
  146. }
  147. }
  148. log::info!(
  149. "ALWAYS_USE_RELAY={}",
  150. if unsafe { ALWAYS_USE_RELAY } {
  151. "Y"
  152. } else {
  153. "N"
  154. }
  155. );
  156. if test_addr.to_lowercase() != "no" {
  157. let test_addr = if test_addr.is_empty() {
  158. listener.local_addr()?
  159. } else {
  160. test_addr.parse()?
  161. };
  162. tokio::spawn(async move {
  163. allow_err!(test_hbbs(test_addr).await);
  164. });
  165. };
  166. let main_task = async move {
  167. loop {
  168. log::info!("Start");
  169. match rs
  170. .io_loop(
  171. &mut rx,
  172. &mut listener,
  173. &mut listener2,
  174. &mut listener3,
  175. &mut socket,
  176. &key,
  177. )
  178. .await
  179. {
  180. LoopFailure::UdpSocket => {
  181. drop(socket);
  182. socket = create_udp_listener(port, rmem).await?;
  183. }
  184. LoopFailure::Listener => {
  185. drop(listener);
  186. listener = create_tcp_listener(port).await?;
  187. }
  188. LoopFailure::Listener2 => {
  189. drop(listener2);
  190. listener2 = create_tcp_listener(nat_port).await?;
  191. }
  192. LoopFailure::Listener3 => {
  193. drop(listener3);
  194. listener3 = create_tcp_listener(ws_port).await?;
  195. }
  196. }
  197. }
  198. };
  199. let listen_signal = listen_signal();
  200. tokio::select!(
  201. res = main_task => res,
  202. res = listen_signal => res,
  203. )
  204. }
  205. async fn io_loop(
  206. &mut self,
  207. rx: &mut Receiver,
  208. listener: &mut TcpListener,
  209. listener2: &mut TcpListener,
  210. listener3: &mut TcpListener,
  211. socket: &mut FramedSocket,
  212. key: &str,
  213. ) -> LoopFailure {
  214. let mut timer_check_relay = interval(Duration::from_millis(CHECK_RELAY_TIMEOUT));
  215. loop {
  216. tokio::select! {
  217. _ = timer_check_relay.tick() => {
  218. if self.relay_servers0.len() > 1 {
  219. let rs = self.relay_servers0.clone();
  220. let tx = self.tx.clone();
  221. tokio::spawn(async move {
  222. check_relay_servers(rs, tx).await;
  223. });
  224. }
  225. }
  226. Some(data) = rx.recv() => {
  227. match data {
  228. Data::Msg(msg, addr) => { allow_err!(socket.send(msg.as_ref(), addr).await); }
  229. Data::RelayServers0(rs) => { self.parse_relay_servers(&rs); }
  230. Data::RelayServers(rs) => { self.relay_servers = Arc::new(rs); }
  231. }
  232. }
  233. res = socket.next() => {
  234. match res {
  235. Some(Ok((bytes, addr))) => {
  236. if let Err(err) = self.handle_udp(&bytes, addr.into(), socket, key).await {
  237. log::error!("udp failure: {}", err);
  238. return LoopFailure::UdpSocket;
  239. }
  240. }
  241. Some(Err(err)) => {
  242. log::error!("udp failure: {}", err);
  243. return LoopFailure::UdpSocket;
  244. }
  245. None => {
  246. // unreachable!() ?
  247. }
  248. }
  249. }
  250. res = listener2.accept() => {
  251. match res {
  252. Ok((stream, addr)) => {
  253. stream.set_nodelay(true).ok();
  254. self.handle_listener2(stream, addr).await;
  255. }
  256. Err(err) => {
  257. log::error!("listener2.accept failed: {}", err);
  258. return LoopFailure::Listener2;
  259. }
  260. }
  261. }
  262. res = listener3.accept() => {
  263. match res {
  264. Ok((stream, addr)) => {
  265. stream.set_nodelay(true).ok();
  266. self.handle_listener(stream, addr, key, true).await;
  267. }
  268. Err(err) => {
  269. log::error!("listener3.accept failed: {}", err);
  270. return LoopFailure::Listener3;
  271. }
  272. }
  273. }
  274. res = listener.accept() => {
  275. match res {
  276. Ok((stream, addr)) => {
  277. stream.set_nodelay(true).ok();
  278. self.handle_listener(stream, addr, key, false).await;
  279. }
  280. Err(err) => {
  281. log::error!("listener.accept failed: {}", err);
  282. return LoopFailure::Listener;
  283. }
  284. }
  285. }
  286. }
  287. }
  288. }
  289. #[inline]
  290. async fn handle_udp(
  291. &mut self,
  292. bytes: &BytesMut,
  293. addr: SocketAddr,
  294. socket: &mut FramedSocket,
  295. key: &str,
  296. ) -> ResultType<()> {
  297. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  298. match msg_in.union {
  299. Some(rendezvous_message::Union::RegisterPeer(rp)) => {
  300. // B registered
  301. if !rp.id.is_empty() {
  302. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  303. self.update_addr(rp.id, addr, socket).await?;
  304. if self.inner.serial > rp.serial {
  305. let mut msg_out = RendezvousMessage::new();
  306. msg_out.set_configure_update(ConfigUpdate {
  307. serial: self.inner.serial,
  308. rendezvous_servers: (*self.rendezvous_servers).clone(),
  309. ..Default::default()
  310. });
  311. socket.send(&msg_out, addr).await?;
  312. }
  313. }
  314. }
  315. Some(rendezvous_message::Union::RegisterPk(rk)) => {
  316. if rk.uuid.is_empty() || rk.pk.is_empty() {
  317. return Ok(());
  318. }
  319. let id = rk.id;
  320. let ip = addr.ip().to_string();
  321. if id.len() < 6 {
  322. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  323. } else if !self.check_ip_blocker(&ip, &id).await {
  324. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  325. }
  326. let peer = self.pm.get_or(&id).await;
  327. let (changed, ip_changed) = {
  328. let peer = peer.read().await;
  329. if peer.uuid.is_empty() {
  330. (true, false)
  331. } else {
  332. if peer.uuid == rk.uuid {
  333. if peer.info.ip != ip && peer.pk != rk.pk {
  334. log::warn!(
  335. "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
  336. id,
  337. ip,
  338. rk.pk,
  339. peer.info.ip,
  340. peer.pk,
  341. );
  342. drop(peer);
  343. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  344. }
  345. } else {
  346. log::warn!(
  347. "Peer {} uuid mismatch: {:?} vs {:?}",
  348. id,
  349. rk.uuid,
  350. peer.uuid
  351. );
  352. drop(peer);
  353. return send_rk_res(socket, addr, UUID_MISMATCH).await;
  354. }
  355. let ip_changed = peer.info.ip != ip;
  356. (
  357. peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
  358. ip_changed,
  359. )
  360. }
  361. };
  362. let mut req_pk = peer.read().await.reg_pk;
  363. if req_pk.1.elapsed().as_secs() > 6 {
  364. req_pk.0 = 0;
  365. } else if req_pk.0 > 2 {
  366. return send_rk_res(socket, addr, TOO_FREQUENT).await;
  367. }
  368. req_pk.0 += 1;
  369. req_pk.1 = Instant::now();
  370. peer.write().await.reg_pk = req_pk;
  371. if ip_changed {
  372. let mut lock = IP_CHANGES.lock().await;
  373. if let Some((tm, ips)) = lock.get_mut(&id) {
  374. if tm.elapsed().as_secs() > IP_CHANGE_DUR {
  375. *tm = Instant::now();
  376. ips.clear();
  377. ips.insert(ip.clone(), 1);
  378. } else if let Some(v) = ips.get_mut(&ip) {
  379. *v += 1;
  380. } else {
  381. ips.insert(ip.clone(), 1);
  382. }
  383. } else {
  384. lock.insert(
  385. id.clone(),
  386. (Instant::now(), HashMap::from([(ip.clone(), 1)])),
  387. );
  388. }
  389. }
  390. if changed {
  391. self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
  392. }
  393. let mut msg_out = RendezvousMessage::new();
  394. msg_out.set_register_pk_response(RegisterPkResponse {
  395. result: register_pk_response::Result::OK.into(),
  396. ..Default::default()
  397. });
  398. socket.send(&msg_out, addr).await?
  399. }
  400. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  401. if self.pm.is_in_memory(&ph.id).await {
  402. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  403. } else {
  404. // not in memory, fetch from db with spawn in case blocking me
  405. let mut me = self.clone();
  406. let key = key.to_owned();
  407. tokio::spawn(async move {
  408. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  409. });
  410. }
  411. }
  412. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  413. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  414. }
  415. Some(rendezvous_message::Union::LocalAddr(la)) => {
  416. self.handle_local_addr(la, addr, Some(socket)).await?;
  417. }
  418. Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
  419. if addr.ip().is_loopback() && cu.serial > self.inner.serial {
  420. let mut inner: Inner = (*self.inner).clone();
  421. inner.serial = cu.serial;
  422. self.inner = Arc::new(inner);
  423. self.rendezvous_servers = Arc::new(
  424. cu.rendezvous_servers
  425. .drain(..)
  426. .filter(|x| {
  427. !x.is_empty()
  428. && test_if_valid_server(x, "rendezvous-server").is_ok()
  429. })
  430. .collect(),
  431. );
  432. log::info!(
  433. "configure updated: serial={} rendezvous-servers={:?}",
  434. self.inner.serial,
  435. self.rendezvous_servers
  436. );
  437. }
  438. }
  439. Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
  440. if !self.inner.version.is_empty() && su.url != self.inner.version {
  441. let mut msg_out = RendezvousMessage::new();
  442. msg_out.set_software_update(SoftwareUpdate {
  443. url: self.inner.software_url.clone(),
  444. ..Default::default()
  445. });
  446. socket.send(&msg_out, addr).await?;
  447. }
  448. }
  449. _ => {}
  450. }
  451. }
  452. Ok(())
  453. }
  454. #[inline]
  455. async fn handle_tcp(
  456. &mut self,
  457. bytes: &[u8],
  458. sink: &mut Option<Sink>,
  459. addr: SocketAddr,
  460. key: &str,
  461. ws: bool,
  462. ) -> bool {
  463. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
  464. match msg_in.union {
  465. Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
  466. // there maybe several attempt, so sink can be none
  467. if let Some(sink) = sink.take() {
  468. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  469. }
  470. allow_err!(self.handle_tcp_punch_hole_request(addr, ph, key, ws).await);
  471. return true;
  472. }
  473. Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
  474. // there maybe several attempt, so sink can be none
  475. if let Some(sink) = sink.take() {
  476. self.tcp_punch.lock().await.insert(try_into_v4(addr), sink);
  477. }
  478. if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
  479. let mut msg_out = RendezvousMessage::new();
  480. rf.socket_addr = AddrMangle::encode(addr).into();
  481. msg_out.set_request_relay(rf);
  482. let peer_addr = peer.read().await.socket_addr;
  483. self.tx.send(Data::Msg(msg_out.into(), peer_addr)).ok();
  484. }
  485. return true;
  486. }
  487. Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
  488. let addr_b = AddrMangle::decode(&rr.socket_addr);
  489. rr.socket_addr = Default::default();
  490. let id = rr.id();
  491. if !id.is_empty() {
  492. let pk = self.get_pk(&rr.version, id.to_owned()).await;
  493. rr.set_pk(pk);
  494. }
  495. let mut msg_out = RendezvousMessage::new();
  496. if !rr.relay_server.is_empty() {
  497. if self.is_lan(addr_b) {
  498. // https://github.com/rustdesk/rustdesk-server/issues/24
  499. rr.relay_server = self.inner.local_ip.clone();
  500. } else if rr.relay_server == self.inner.local_ip {
  501. rr.relay_server = self.get_relay_server(addr.ip(), addr_b.ip());
  502. }
  503. }
  504. msg_out.set_relay_response(rr);
  505. allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
  506. }
  507. Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
  508. allow_err!(self.handle_hole_sent(phs, addr, None).await);
  509. }
  510. Some(rendezvous_message::Union::LocalAddr(la)) => {
  511. allow_err!(self.handle_local_addr(la, addr, None).await);
  512. }
  513. Some(rendezvous_message::Union::TestNatRequest(tar)) => {
  514. let mut msg_out = RendezvousMessage::new();
  515. let mut res = TestNatResponse {
  516. port: addr.port() as _,
  517. ..Default::default()
  518. };
  519. if self.inner.serial > tar.serial {
  520. let mut cu = ConfigUpdate::new();
  521. cu.serial = self.inner.serial;
  522. cu.rendezvous_servers = (*self.rendezvous_servers).clone();
  523. res.cu = MessageField::from_option(Some(cu));
  524. }
  525. msg_out.set_test_nat_response(res);
  526. Self::send_to_sink(sink, msg_out).await;
  527. }
  528. Some(rendezvous_message::Union::RegisterPk(_)) => {
  529. let res = register_pk_response::Result::NOT_SUPPORT;
  530. let mut msg_out = RendezvousMessage::new();
  531. msg_out.set_register_pk_response(RegisterPkResponse {
  532. result: res.into(),
  533. ..Default::default()
  534. });
  535. Self::send_to_sink(sink, msg_out).await;
  536. }
  537. _ => {}
  538. }
  539. }
  540. false
  541. }
  542. #[inline]
  543. async fn update_addr(
  544. &mut self,
  545. id: String,
  546. socket_addr: SocketAddr,
  547. socket: &mut FramedSocket,
  548. ) -> ResultType<()> {
  549. let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
  550. let mut old = old.write().await;
  551. let ip = socket_addr.ip();
  552. let ip_change = if old.socket_addr.port() != 0 {
  553. ip != old.socket_addr.ip()
  554. } else {
  555. ip.to_string() != old.info.ip
  556. } && !ip.is_loopback();
  557. let request_pk = old.pk.is_empty() || ip_change;
  558. if !request_pk {
  559. old.socket_addr = socket_addr;
  560. old.last_reg_time = Instant::now();
  561. }
  562. let ip_change = if ip_change && old.reg_pk.0 <= 2 {
  563. Some(if old.socket_addr.port() == 0 {
  564. old.info.ip.clone()
  565. } else {
  566. old.socket_addr.to_string()
  567. })
  568. } else {
  569. None
  570. };
  571. (request_pk, ip_change)
  572. } else {
  573. (true, None)
  574. };
  575. if let Some(old) = ip_change {
  576. log::info!("IP change of {} from {} to {}", id, old, socket_addr);
  577. }
  578. let mut msg_out = RendezvousMessage::new();
  579. msg_out.set_register_peer_response(RegisterPeerResponse {
  580. request_pk,
  581. ..Default::default()
  582. });
  583. socket.send(&msg_out, socket_addr).await
  584. }
  585. #[inline]
  586. async fn handle_hole_sent<'a>(
  587. &mut self,
  588. phs: PunchHoleSent,
  589. addr: SocketAddr,
  590. socket: Option<&'a mut FramedSocket>,
  591. ) -> ResultType<()> {
  592. // punch hole sent from B, tell A that B is ready to be connected
  593. let addr_a = AddrMangle::decode(&phs.socket_addr);
  594. log::debug!(
  595. "{} punch hole response to {:?} from {:?}",
  596. if socket.is_none() { "TCP" } else { "UDP" },
  597. &addr_a,
  598. &addr
  599. );
  600. let mut msg_out = RendezvousMessage::new();
  601. let mut p = PunchHoleResponse {
  602. socket_addr: AddrMangle::encode(addr).into(),
  603. pk: self.get_pk(&phs.version, phs.id).await,
  604. relay_server: phs.relay_server.clone(),
  605. ..Default::default()
  606. };
  607. if let Ok(t) = phs.nat_type.enum_value() {
  608. p.set_nat_type(t);
  609. }
  610. msg_out.set_punch_hole_response(p);
  611. if let Some(socket) = socket {
  612. socket.send(&msg_out, addr_a).await?;
  613. } else {
  614. self.send_to_tcp(msg_out, addr_a).await;
  615. }
  616. Ok(())
  617. }
  618. #[inline]
  619. async fn handle_local_addr<'a>(
  620. &mut self,
  621. la: LocalAddr,
  622. addr: SocketAddr,
  623. socket: Option<&'a mut FramedSocket>,
  624. ) -> ResultType<()> {
  625. // relay local addrs of B to A
  626. let addr_a = AddrMangle::decode(&la.socket_addr);
  627. log::debug!(
  628. "{} local addrs response to {:?} from {:?}",
  629. if socket.is_none() { "TCP" } else { "UDP" },
  630. &addr_a,
  631. &addr
  632. );
  633. let mut msg_out = RendezvousMessage::new();
  634. let mut p = PunchHoleResponse {
  635. socket_addr: la.local_addr.clone(),
  636. pk: self.get_pk(&la.version, la.id).await,
  637. relay_server: la.relay_server,
  638. ..Default::default()
  639. };
  640. p.set_is_local(true);
  641. msg_out.set_punch_hole_response(p);
  642. if let Some(socket) = socket {
  643. socket.send(&msg_out, addr_a).await?;
  644. } else {
  645. self.send_to_tcp(msg_out, addr_a).await;
  646. }
  647. Ok(())
  648. }
  649. #[inline]
  650. async fn handle_punch_hole_request(
  651. &mut self,
  652. addr: SocketAddr,
  653. ph: PunchHoleRequest,
  654. key: &str,
  655. ws: bool,
  656. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  657. let mut ph = ph;
  658. if !key.is_empty() && ph.licence_key != key {
  659. let mut msg_out = RendezvousMessage::new();
  660. msg_out.set_punch_hole_response(PunchHoleResponse {
  661. failure: punch_hole_response::Failure::LICENSE_MISMATCH.into(),
  662. ..Default::default()
  663. });
  664. return Ok((msg_out, None));
  665. }
  666. let id = ph.id;
  667. // punch hole request from A, relay to B,
  668. // check if in same intranet first,
  669. // fetch local addrs if in same intranet.
  670. // because punch hole won't work if in the same intranet,
  671. // all routers will drop such self-connections.
  672. if let Some(peer) = self.pm.get(&id).await {
  673. let (elapsed, peer_addr) = {
  674. let r = peer.read().await;
  675. (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
  676. };
  677. if elapsed >= REG_TIMEOUT {
  678. let mut msg_out = RendezvousMessage::new();
  679. msg_out.set_punch_hole_response(PunchHoleResponse {
  680. failure: punch_hole_response::Failure::OFFLINE.into(),
  681. ..Default::default()
  682. });
  683. return Ok((msg_out, None));
  684. }
  685. let mut msg_out = RendezvousMessage::new();
  686. let peer_is_lan = self.is_lan(peer_addr);
  687. let is_lan = self.is_lan(addr);
  688. let mut relay_server = self.get_relay_server(addr.ip(), peer_addr.ip());
  689. if unsafe { ALWAYS_USE_RELAY } || (peer_is_lan ^ is_lan) {
  690. if peer_is_lan {
  691. // https://github.com/rustdesk/rustdesk-server/issues/24
  692. relay_server = self.inner.local_ip.clone()
  693. }
  694. ph.nat_type = NatType::SYMMETRIC.into(); // will force relay
  695. }
  696. let same_intranet: bool = !ws
  697. && (peer_is_lan && is_lan || {
  698. match (peer_addr, addr) {
  699. (SocketAddr::V4(a), SocketAddr::V4(b)) => a.ip() == b.ip(),
  700. (SocketAddr::V6(a), SocketAddr::V6(b)) => a.ip() == b.ip(),
  701. _ => false,
  702. }
  703. });
  704. let socket_addr = AddrMangle::encode(addr).into();
  705. if same_intranet {
  706. log::debug!(
  707. "Fetch local addr {:?} {:?} request from {:?}",
  708. id,
  709. peer_addr,
  710. addr
  711. );
  712. msg_out.set_fetch_local_addr(FetchLocalAddr {
  713. socket_addr,
  714. relay_server,
  715. ..Default::default()
  716. });
  717. } else {
  718. log::debug!(
  719. "Punch hole {:?} {:?} request from {:?}",
  720. id,
  721. peer_addr,
  722. addr
  723. );
  724. msg_out.set_punch_hole(PunchHole {
  725. socket_addr,
  726. nat_type: ph.nat_type,
  727. relay_server,
  728. ..Default::default()
  729. });
  730. }
  731. Ok((msg_out, Some(peer_addr)))
  732. } else {
  733. let mut msg_out = RendezvousMessage::new();
  734. msg_out.set_punch_hole_response(PunchHoleResponse {
  735. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  736. ..Default::default()
  737. });
  738. Ok((msg_out, None))
  739. }
  740. }
  741. #[inline]
  742. async fn handle_online_request(
  743. &mut self,
  744. stream: &mut FramedStream,
  745. peers: Vec<String>,
  746. ) -> ResultType<()> {
  747. let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
  748. for (i, peer_id) in peers.iter().enumerate() {
  749. if let Some(peer) = self.pm.get_in_memory(peer_id).await {
  750. let elapsed = peer.read().await.last_reg_time.elapsed().as_millis() as i32;
  751. // bytes index from left to right
  752. let states_idx = i / 8;
  753. let bit_idx = 7 - i % 8;
  754. if elapsed < REG_TIMEOUT {
  755. states[states_idx] |= 0x01 << bit_idx;
  756. }
  757. }
  758. }
  759. let mut msg_out = RendezvousMessage::new();
  760. msg_out.set_online_response(OnlineResponse {
  761. states: states.into(),
  762. ..Default::default()
  763. });
  764. stream.send(&msg_out).await?;
  765. Ok(())
  766. }
  767. #[inline]
  768. async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
  769. let mut tcp = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  770. tokio::spawn(async move {
  771. Self::send_to_sink(&mut tcp, msg).await;
  772. });
  773. }
  774. #[inline]
  775. async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
  776. if let Some(sink) = sink.as_mut() {
  777. if let Ok(bytes) = msg.write_to_bytes() {
  778. match sink {
  779. Sink::TcpStream(s) => {
  780. allow_err!(s.send(Bytes::from(bytes)).await);
  781. }
  782. Sink::Ws(ws) => {
  783. allow_err!(ws.send(tungstenite::Message::Binary(bytes)).await);
  784. }
  785. }
  786. }
  787. }
  788. }
  789. #[inline]
  790. async fn send_to_tcp_sync(
  791. &mut self,
  792. msg: RendezvousMessage,
  793. addr: SocketAddr,
  794. ) -> ResultType<()> {
  795. let mut sink = self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  796. Self::send_to_sink(&mut sink, msg).await;
  797. Ok(())
  798. }
  799. #[inline]
  800. async fn handle_tcp_punch_hole_request(
  801. &mut self,
  802. addr: SocketAddr,
  803. ph: PunchHoleRequest,
  804. key: &str,
  805. ws: bool,
  806. ) -> ResultType<()> {
  807. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
  808. if let Some(addr) = to_addr {
  809. self.tx.send(Data::Msg(msg.into(), addr))?;
  810. } else {
  811. self.send_to_tcp_sync(msg, addr).await?;
  812. }
  813. Ok(())
  814. }
  815. #[inline]
  816. async fn handle_udp_punch_hole_request(
  817. &mut self,
  818. addr: SocketAddr,
  819. ph: PunchHoleRequest,
  820. key: &str,
  821. ) -> ResultType<()> {
  822. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, false).await?;
  823. self.tx.send(Data::Msg(
  824. msg.into(),
  825. match to_addr {
  826. Some(addr) => addr,
  827. None => addr,
  828. },
  829. ))?;
  830. Ok(())
  831. }
  832. async fn check_ip_blocker(&self, ip: &str, id: &str) -> bool {
  833. let mut lock = IP_BLOCKER.lock().await;
  834. let now = Instant::now();
  835. if let Some(old) = lock.get_mut(ip) {
  836. let counter = &mut old.0;
  837. if counter.1.elapsed().as_secs() > IP_BLOCK_DUR {
  838. counter.0 = 0;
  839. } else if counter.0 > 30 {
  840. return false;
  841. }
  842. counter.0 += 1;
  843. counter.1 = now;
  844. let counter = &mut old.1;
  845. let is_new = counter.0.get(id).is_none();
  846. if counter.1.elapsed().as_secs() > DAY_SECONDS {
  847. counter.0.clear();
  848. } else if counter.0.len() > 300 {
  849. return !is_new;
  850. }
  851. if is_new {
  852. counter.0.insert(id.to_owned());
  853. }
  854. counter.1 = now;
  855. } else {
  856. lock.insert(ip.to_owned(), ((0, now), (Default::default(), now)));
  857. }
  858. true
  859. }
  860. fn parse_relay_servers(&mut self, relay_servers: &str) {
  861. let rs = get_servers(relay_servers, "relay-servers");
  862. self.relay_servers0 = Arc::new(rs);
  863. self.relay_servers = self.relay_servers0.clone();
  864. }
  865. fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
  866. if self.relay_servers.is_empty() {
  867. return "".to_owned();
  868. } else if self.relay_servers.len() == 1 {
  869. return self.relay_servers[0].clone();
  870. }
  871. let i = unsafe {
  872. ROTATION_RELAY_SERVER += 1;
  873. ROTATION_RELAY_SERVER % self.relay_servers.len()
  874. };
  875. self.relay_servers[i].clone()
  876. }
  877. async fn check_cmd(&self, cmd: &str) -> String {
  878. use std::fmt::Write as _;
  879. let mut res = "".to_owned();
  880. let mut fds = cmd.trim().split(' ');
  881. match fds.next() {
  882. Some("h") => {
  883. res = format!(
  884. "{}\n{}\n{}\n{}\n{}\n{}\n",
  885. "relay-servers(rs) <separated by ,>",
  886. "reload-geo(rg)",
  887. "ip-blocker(ib) [<ip>|<number>] [-]",
  888. "ip-changes(ic) [<id>|<number>] [-]",
  889. "always-use-relay(aur)",
  890. "test-geo(tg) <ip1> <ip2>"
  891. )
  892. }
  893. Some("relay-servers" | "rs") => {
  894. if let Some(rs) = fds.next() {
  895. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  896. } else {
  897. for ip in self.relay_servers.iter() {
  898. let _ = writeln!(res, "{ip}");
  899. }
  900. }
  901. }
  902. Some("ip-blocker" | "ib") => {
  903. let mut lock = IP_BLOCKER.lock().await;
  904. lock.retain(|&_, (a, b)| {
  905. a.1.elapsed().as_secs() <= IP_BLOCK_DUR
  906. || b.1.elapsed().as_secs() <= DAY_SECONDS
  907. });
  908. res = format!("{}\n", lock.len());
  909. let ip = fds.next();
  910. let mut start = ip.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  911. if start < 0 {
  912. if let Some(ip) = ip {
  913. if let Some((a, b)) = lock.get(ip) {
  914. let _ = writeln!(
  915. res,
  916. "{}/{}s {}/{}s",
  917. a.0,
  918. a.1.elapsed().as_secs(),
  919. b.0.len(),
  920. b.1.elapsed().as_secs()
  921. );
  922. }
  923. if fds.next() == Some("-") {
  924. lock.remove(ip);
  925. }
  926. } else {
  927. start = 0;
  928. }
  929. }
  930. if start >= 0 {
  931. let mut it = lock.iter();
  932. for i in 0..(start + 10) {
  933. let x = it.next();
  934. if x.is_none() {
  935. break;
  936. }
  937. if i < start {
  938. continue;
  939. }
  940. if let Some((ip, (a, b))) = x {
  941. let _ = writeln!(
  942. res,
  943. "{}: {}/{}s {}/{}s",
  944. ip,
  945. a.0,
  946. a.1.elapsed().as_secs(),
  947. b.0.len(),
  948. b.1.elapsed().as_secs()
  949. );
  950. }
  951. }
  952. }
  953. }
  954. Some("ip-changes" | "ic") => {
  955. let mut lock = IP_CHANGES.lock().await;
  956. lock.retain(|&_, v| v.0.elapsed().as_secs() < IP_CHANGE_DUR_X2 && v.1.len() > 1);
  957. res = format!("{}\n", lock.len());
  958. let id = fds.next();
  959. let mut start = id.map(|x| x.parse::<i32>().unwrap_or(-1)).unwrap_or(-1);
  960. if !(0..=10_000_000).contains(&start) {
  961. if let Some(id) = id {
  962. if let Some((tm, ips)) = lock.get(id) {
  963. let _ = writeln!(res, "{}s {:?}", tm.elapsed().as_secs(), ips);
  964. }
  965. if fds.next() == Some("-") {
  966. lock.remove(id);
  967. }
  968. } else {
  969. start = 0;
  970. }
  971. }
  972. if start >= 0 {
  973. let mut it = lock.iter();
  974. for i in 0..(start + 10) {
  975. let x = it.next();
  976. if x.is_none() {
  977. break;
  978. }
  979. if i < start {
  980. continue;
  981. }
  982. if let Some((id, (tm, ips))) = x {
  983. let _ = writeln!(res, "{}: {}s {:?}", id, tm.elapsed().as_secs(), ips,);
  984. }
  985. }
  986. }
  987. }
  988. Some("always-use-relay" | "aur") => {
  989. if let Some(rs) = fds.next() {
  990. if rs.to_uppercase() == "Y" {
  991. unsafe { ALWAYS_USE_RELAY = true };
  992. } else {
  993. unsafe { ALWAYS_USE_RELAY = false };
  994. }
  995. self.tx.send(Data::RelayServers0(rs.to_owned())).ok();
  996. } else {
  997. let _ = writeln!(res, "ALWAYS_USE_RELAY: {:?}", unsafe { ALWAYS_USE_RELAY });
  998. }
  999. }
  1000. Some("test-geo" | "tg") => {
  1001. if let Some(rs) = fds.next() {
  1002. if let Ok(a) = rs.parse::<IpAddr>() {
  1003. if let Some(rs) = fds.next() {
  1004. if let Ok(b) = rs.parse::<IpAddr>() {
  1005. res = format!("{:?}", self.get_relay_server(a, b));
  1006. }
  1007. } else {
  1008. res = format!("{:?}", self.get_relay_server(a, a));
  1009. }
  1010. }
  1011. }
  1012. }
  1013. _ => {}
  1014. }
  1015. res
  1016. }
  1017. async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
  1018. let mut rs = self.clone();
  1019. if addr.ip().is_loopback() {
  1020. tokio::spawn(async move {
  1021. let mut stream = stream;
  1022. let mut buffer = [0; 1024];
  1023. if let Ok(Ok(n)) = timeout(1000, stream.read(&mut buffer[..])).await {
  1024. if let Ok(data) = std::str::from_utf8(&buffer[..n]) {
  1025. let res = rs.check_cmd(data).await;
  1026. stream.write(res.as_bytes()).await.ok();
  1027. }
  1028. }
  1029. });
  1030. return;
  1031. }
  1032. let stream = FramedStream::from(stream, addr);
  1033. tokio::spawn(async move {
  1034. let mut stream = stream;
  1035. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  1036. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1037. match msg_in.union {
  1038. Some(rendezvous_message::Union::TestNatRequest(_)) => {
  1039. let mut msg_out = RendezvousMessage::new();
  1040. msg_out.set_test_nat_response(TestNatResponse {
  1041. port: addr.port() as _,
  1042. ..Default::default()
  1043. });
  1044. stream.send(&msg_out).await.ok();
  1045. }
  1046. Some(rendezvous_message::Union::OnlineRequest(or)) => {
  1047. allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
  1048. }
  1049. _ => {}
  1050. }
  1051. }
  1052. }
  1053. });
  1054. }
  1055. async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
  1056. log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
  1057. let mut rs = self.clone();
  1058. let key = key.to_owned();
  1059. tokio::spawn(async move {
  1060. allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
  1061. });
  1062. }
  1063. #[inline]
  1064. async fn handle_listener_inner(
  1065. &mut self,
  1066. stream: TcpStream,
  1067. addr: SocketAddr,
  1068. key: &str,
  1069. ws: bool,
  1070. ) -> ResultType<()> {
  1071. let mut sink;
  1072. if ws {
  1073. let ws_stream = tokio_tungstenite::accept_async(stream).await?;
  1074. let (a, mut b) = ws_stream.split();
  1075. sink = Some(Sink::Ws(a));
  1076. while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
  1077. if let tungstenite::Message::Binary(bytes) = msg {
  1078. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1079. break;
  1080. }
  1081. }
  1082. }
  1083. } else {
  1084. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  1085. sink = Some(Sink::TcpStream(a));
  1086. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  1087. if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
  1088. break;
  1089. }
  1090. }
  1091. }
  1092. if sink.is_none() {
  1093. self.tcp_punch.lock().await.remove(&try_into_v4(addr));
  1094. }
  1095. log::debug!("Tcp connection from {:?} closed", addr);
  1096. Ok(())
  1097. }
  1098. #[inline]
  1099. async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
  1100. if version.is_empty() || self.inner.sk.is_none() {
  1101. Bytes::new()
  1102. } else {
  1103. match self.pm.get(&id).await {
  1104. Some(peer) => {
  1105. let pk = peer.read().await.pk.clone();
  1106. sign::sign(
  1107. &hbb_common::message_proto::IdPk {
  1108. id,
  1109. pk,
  1110. ..Default::default()
  1111. }
  1112. .write_to_bytes()
  1113. .unwrap_or_default(),
  1114. self.inner.sk.as_ref().unwrap(),
  1115. )
  1116. .into()
  1117. }
  1118. _ => Bytes::new(),
  1119. }
  1120. }
  1121. }
  1122. #[inline]
  1123. fn get_server_sk(key: &str) -> (String, Option<sign::SecretKey>) {
  1124. let mut out_sk = None;
  1125. let mut key = key.to_owned();
  1126. if let Ok(sk) = base64::decode(&key) {
  1127. if sk.len() == sign::SECRETKEYBYTES {
  1128. log::info!("The key is a crypto private key");
  1129. key = base64::encode(&sk[(sign::SECRETKEYBYTES / 2)..]);
  1130. let mut tmp = [0u8; sign::SECRETKEYBYTES];
  1131. tmp[..].copy_from_slice(&sk);
  1132. out_sk = Some(sign::SecretKey(tmp));
  1133. }
  1134. }
  1135. if key.is_empty() || key == "-" || key == "_" {
  1136. let (pk, sk) = crate::common::gen_sk(0);
  1137. out_sk = sk;
  1138. if !key.is_empty() {
  1139. key = pk;
  1140. } else {
  1141. std::env::set_var("KEY_FOR_API", pk);
  1142. }
  1143. }
  1144. if !key.is_empty() {
  1145. log::info!("Key: {}", key);
  1146. std::env::set_var("KEY_FOR_API", key.clone());
  1147. }
  1148. (key, out_sk)
  1149. }
  1150. #[inline]
  1151. fn is_lan(&self, addr: SocketAddr) -> bool {
  1152. if let Some(network) = &self.inner.mask {
  1153. match addr {
  1154. SocketAddr::V4(v4_socket_addr) => {
  1155. return network.contains(*v4_socket_addr.ip());
  1156. }
  1157. SocketAddr::V6(v6_socket_addr) => {
  1158. if let Some(v4_addr) = v6_socket_addr.ip().to_ipv4_mapped() {
  1159. return network.contains(v4_addr);
  1160. }
  1161. }
  1162. }
  1163. }
  1164. false
  1165. }
  1166. }
  1167. async fn check_relay_servers(rs0: Arc<RelayServers>, tx: Sender) {
  1168. let mut futs = Vec::new();
  1169. let rs = Arc::new(Mutex::new(Vec::new()));
  1170. for x in rs0.iter() {
  1171. let mut host = x.to_owned();
  1172. if !host.contains(':') {
  1173. host = format!("{}:{}", host, config::RELAY_PORT);
  1174. }
  1175. let rs = rs.clone();
  1176. let x = x.clone();
  1177. futs.push(tokio::spawn(async move {
  1178. if FramedStream::new(&host, None, CHECK_RELAY_TIMEOUT)
  1179. .await
  1180. .is_ok()
  1181. {
  1182. rs.lock().await.push(x);
  1183. }
  1184. }));
  1185. }
  1186. join_all(futs).await;
  1187. log::debug!("check_relay_servers");
  1188. let rs = std::mem::take(&mut *rs.lock().await);
  1189. if !rs.is_empty() {
  1190. tx.send(Data::RelayServers(rs)).ok();
  1191. }
  1192. }
  1193. // temp solution to solve udp socket failure
  1194. async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
  1195. let mut socket = FramedSocket::new(config::Config::get_any_listen_addr(addr.is_ipv4())).await?;
  1196. let mut msg_out = RendezvousMessage::new();
  1197. msg_out.set_register_peer(RegisterPeer {
  1198. id: "(:test_hbbs:)".to_owned(),
  1199. ..Default::default()
  1200. });
  1201. let mut last_time_recv = Instant::now();
  1202. let mut timer = interval(Duration::from_secs(1));
  1203. loop {
  1204. tokio::select! {
  1205. _ = timer.tick() => {
  1206. if last_time_recv.elapsed().as_secs() > 12 {
  1207. log::error!("Timeout of test_hbbs");
  1208. std::process::exit(1);
  1209. }
  1210. socket.send(&msg_out, addr).await?;
  1211. }
  1212. Some(Ok((bytes, _))) = socket.next() => {
  1213. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  1214. log::trace!("Recv {:?} of test_hbbs", msg_in);
  1215. last_time_recv = Instant::now();
  1216. }
  1217. }
  1218. }
  1219. }
  1220. }
  1221. #[inline]
  1222. async fn send_rk_res(
  1223. socket: &mut FramedSocket,
  1224. addr: SocketAddr,
  1225. res: register_pk_response::Result,
  1226. ) -> ResultType<()> {
  1227. let mut msg_out = RendezvousMessage::new();
  1228. msg_out.set_register_pk_response(RegisterPkResponse {
  1229. result: res.into(),
  1230. ..Default::default()
  1231. });
  1232. socket.send(&msg_out, addr).await
  1233. }
  1234. async fn create_udp_listener(port: i32, rmem: usize) -> ResultType<FramedSocket> {
  1235. let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port as _);
  1236. if let Ok(s) = FramedSocket::new_reuse(&addr, false, rmem).await {
  1237. log::debug!("listen on udp {:?}", s.local_addr());
  1238. return Ok(s);
  1239. }
  1240. let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port as _);
  1241. let s = FramedSocket::new_reuse(&addr, false, rmem).await?;
  1242. log::debug!("listen on udp {:?}", s.local_addr());
  1243. Ok(s)
  1244. }
  1245. #[inline]
  1246. async fn create_tcp_listener(port: i32) -> ResultType<TcpListener> {
  1247. let s = listen_any(port as _).await?;
  1248. log::debug!("listen on tcp {:?}", s.local_addr());
  1249. Ok(s)
  1250. }