rendezvous_server.rs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. use hbb_common::{
  2. allow_err,
  3. bytes::{Bytes, BytesMut},
  4. bytes_codec::BytesCodec,
  5. futures_util::{
  6. sink::SinkExt,
  7. stream::{SplitSink, StreamExt},
  8. },
  9. log,
  10. protobuf::{Message as _, MessageField},
  11. rendezvous_proto::*,
  12. tcp::{new_listener, FramedStream},
  13. timeout,
  14. tokio::{
  15. self,
  16. net::TcpStream,
  17. sync::mpsc,
  18. time::{interval, Duration},
  19. },
  20. tokio_util::codec::Framed,
  21. udp::FramedSocket,
  22. AddrMangle, ResultType,
  23. };
  24. use serde_derive::{Deserialize, Serialize};
  25. use std::{
  26. collections::HashMap,
  27. net::SocketAddr,
  28. sync::{Arc, Mutex, RwLock},
  29. time::Instant,
  30. };
  31. #[derive(Clone, Debug)]
  32. struct Peer {
  33. socket_addr: SocketAddr,
  34. last_reg_time: Instant,
  35. uuid: Vec<u8>,
  36. pk: Vec<u8>,
  37. }
  38. impl Default for Peer {
  39. fn default() -> Self {
  40. Self {
  41. socket_addr: "0.0.0.0:0".parse().unwrap(),
  42. last_reg_time: Instant::now()
  43. .checked_sub(std::time::Duration::from_secs(3600))
  44. .unwrap(),
  45. uuid: Vec::new(),
  46. pk: Vec::new(),
  47. }
  48. }
  49. }
  50. #[derive(Debug, Serialize, Deserialize, Default)]
  51. struct PeerSerde {
  52. #[serde(default)]
  53. ip: String,
  54. #[serde(default)]
  55. uuid: Vec<u8>,
  56. #[serde(default)]
  57. pk: Vec<u8>,
  58. }
  59. #[derive(Clone)]
  60. struct PeerMap {
  61. map: Arc<RwLock<HashMap<String, Peer>>>,
  62. db: super::SledAsync,
  63. }
  64. lazy_static::lazy_static! {
  65. static ref PUBLIC_IP: Arc<RwLock<String>> = Default::default();
  66. }
  67. pub const DEFAULT_PORT: &'static str = "21116";
  68. impl PeerMap {
  69. fn new() -> ResultType<Self> {
  70. let mut db: String = "hbbs.db".to_owned();
  71. #[cfg(windows)]
  72. {
  73. if let Some(path) = hbb_common::config::Config::icon_path().parent() {
  74. db = format!("{}\\{}", path.to_str().unwrap_or("."), db);
  75. }
  76. }
  77. #[cfg(not(windows))]
  78. {
  79. db = format!("./{}", db);
  80. }
  81. Ok(Self {
  82. map: Default::default(),
  83. db: super::SledAsync::new(&db, true)?,
  84. })
  85. }
  86. #[inline]
  87. fn update_pk(&mut self, id: String, socket_addr: SocketAddr, uuid: Vec<u8>, pk: Vec<u8>) {
  88. log::info!("update_pk {} {:?} {:?} {:?}", id, socket_addr, uuid, pk);
  89. let mut lock = self.map.write().unwrap();
  90. lock.insert(
  91. id.clone(),
  92. Peer {
  93. socket_addr,
  94. last_reg_time: Instant::now(),
  95. uuid: uuid.clone(),
  96. pk: pk.clone(),
  97. },
  98. );
  99. drop(lock);
  100. let ip = socket_addr.ip().to_string();
  101. self.db.insert(id, PeerSerde { ip, uuid, pk });
  102. }
  103. #[inline]
  104. async fn get(&mut self, id: &str) -> Option<Peer> {
  105. let p = self.map.read().unwrap().get(id).map(|x| x.clone());
  106. if p.is_some() {
  107. return p;
  108. } else {
  109. let id = id.to_owned();
  110. let v = self.db.get(id.clone()).await;
  111. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  112. self.map.write().unwrap().insert(
  113. id,
  114. Peer {
  115. uuid: v.uuid,
  116. pk: v.pk,
  117. ..Default::default()
  118. },
  119. );
  120. return Some(Peer::default());
  121. }
  122. }
  123. None
  124. }
  125. #[inline]
  126. fn is_in_memory(&self, id: &str) -> bool {
  127. self.map.read().unwrap().contains_key(id)
  128. }
  129. }
  130. const REG_TIMEOUT: i32 = 30_000;
  131. type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  132. type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
  133. static mut ROTATION_RELAY_SERVER: usize = 0;
  134. #[derive(Clone)]
  135. pub struct RendezvousServer {
  136. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  137. pm: PeerMap,
  138. tx: Sender,
  139. relay_servers: Vec<String>,
  140. serial: i32,
  141. rendezvous_servers: Vec<String>,
  142. version: String,
  143. software_url: String,
  144. }
  145. impl RendezvousServer {
  146. #[tokio::main(basic_scheduler)]
  147. pub async fn start(
  148. addr: &str,
  149. addr2: &str,
  150. relay_servers: Vec<String>,
  151. serial: i32,
  152. rendezvous_servers: Vec<String>,
  153. software_url: String,
  154. license: &str,
  155. stop: Arc<Mutex<bool>>,
  156. ) -> ResultType<()> {
  157. log::info!("Listening on tcp/udp {}", addr);
  158. log::info!("Listening on tcp {}, extra port for NAT test", addr2);
  159. let mut socket = FramedSocket::new(addr).await?;
  160. let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
  161. let version = hbb_common::get_version_from_url(&software_url);
  162. if !version.is_empty() {
  163. log::info!("software_url: {}, version: {}", software_url, version);
  164. }
  165. let mut rs = Self {
  166. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  167. pm: PeerMap::new()?,
  168. tx: tx.clone(),
  169. relay_servers,
  170. serial,
  171. rendezvous_servers,
  172. version,
  173. software_url,
  174. };
  175. let mut listener = new_listener(addr, false).await?;
  176. let mut listener2 = new_listener(addr2, false).await?;
  177. let mut timer = interval(Duration::from_millis(300));
  178. loop {
  179. tokio::select! {
  180. _ = timer.tick() => {
  181. if *stop.lock().unwrap() {
  182. log::info!("Stopped");
  183. break;
  184. }
  185. }
  186. Some((msg, addr)) = rx.recv() => {
  187. allow_err!(socket.send(&msg, addr).await);
  188. }
  189. Some(Ok((bytes, addr))) = socket.next() => {
  190. allow_err!(rs.handle_msg(&bytes, addr, &mut socket, license).await);
  191. }
  192. Ok((stream, addr)) = listener2.accept() => {
  193. let stream = FramedStream::from(stream);
  194. tokio::spawn(async move {
  195. let mut stream = stream;
  196. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  197. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  198. if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
  199. let mut msg_out = RendezvousMessage::new();
  200. msg_out.set_test_nat_response(TestNatResponse {
  201. port: addr.port() as _,
  202. ..Default::default()
  203. });
  204. stream.send(&msg_out).await.ok();
  205. }
  206. }
  207. }
  208. });
  209. }
  210. Ok((stream, addr)) = listener.accept() => {
  211. log::debug!("Tcp connection from {:?}", addr);
  212. if let Ok(local_addr) = stream.local_addr() {
  213. *PUBLIC_IP.write().unwrap() = local_addr.ip().to_string();
  214. }
  215. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  216. let tcp_punch = rs.tcp_punch.clone();
  217. let mut rs = rs.clone();
  218. let license = license.to_owned();
  219. tokio::spawn(async move {
  220. let mut sender = Some(a);
  221. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  222. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  223. match msg_in.union {
  224. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  225. // there maybe several attempt, so sender can be none
  226. if let Some(sender) = sender.take() {
  227. tcp_punch.lock().unwrap().insert(addr, sender);
  228. }
  229. allow_err!(rs.handle_tcp_punch_hole_request(addr, ph, &license).await);
  230. }
  231. Some(rendezvous_message::Union::request_relay(mut rf)) => {
  232. // there maybe several attempt, so sender can be none
  233. if let Some(sender) = sender.take() {
  234. tcp_punch.lock().unwrap().insert(addr, sender);
  235. }
  236. if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
  237. let mut msg_out = RendezvousMessage::new();
  238. rf.socket_addr = AddrMangle::encode(addr);
  239. msg_out.set_request_relay(rf);
  240. rs.tx.send((msg_out, peer.socket_addr)).ok();
  241. }
  242. }
  243. Some(rendezvous_message::Union::relay_response(mut rr)) => {
  244. let addr_b = AddrMangle::decode(&rr.socket_addr);
  245. rr.socket_addr = Default::default();
  246. let id = rr.get_id();
  247. if !id.is_empty() {
  248. if let Some(peer) = rs.pm.get(&id).await {
  249. rr.set_pk(peer.pk.clone());
  250. }
  251. }
  252. let mut msg_out = RendezvousMessage::new();
  253. msg_out.set_relay_response(rr);
  254. allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
  255. break;
  256. }
  257. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  258. allow_err!(rs.handle_hole_sent(phs, addr, None).await);
  259. break;
  260. }
  261. Some(rendezvous_message::Union::local_addr(la)) => {
  262. allow_err!(rs.handle_local_addr(la, addr, None).await);
  263. break;
  264. }
  265. Some(rendezvous_message::Union::test_nat_request(tar)) => {
  266. let mut msg_out = RendezvousMessage::new();
  267. let mut res = TestNatResponse {
  268. port: addr.port() as _,
  269. ..Default::default()
  270. };
  271. if rs.serial > tar.serial {
  272. let mut cu = ConfigUpdate::new();
  273. cu.serial = rs.serial;
  274. cu.rendezvous_servers = rs.rendezvous_servers.clone();
  275. res.cu = MessageField::from_option(Some(cu));
  276. }
  277. msg_out.set_test_nat_response(res);
  278. if let Some(tcp) = sender.as_mut() {
  279. if let Ok(bytes) = msg_out.write_to_bytes() {
  280. allow_err!(tcp.send(Bytes::from(bytes)).await);
  281. }
  282. }
  283. break;
  284. }
  285. _ => {
  286. break;
  287. }
  288. }
  289. } else {
  290. break;
  291. }
  292. }
  293. if sender.is_none() {
  294. rs.tcp_punch.lock().unwrap().remove(&addr);
  295. }
  296. log::debug!("Tcp connection from {:?} closed", addr);
  297. });
  298. }
  299. }
  300. }
  301. Ok(())
  302. }
  303. #[inline]
  304. async fn handle_msg(
  305. &mut self,
  306. bytes: &BytesMut,
  307. addr: SocketAddr,
  308. socket: &mut FramedSocket,
  309. license: &str,
  310. ) -> ResultType<()> {
  311. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  312. match msg_in.union {
  313. Some(rendezvous_message::Union::register_peer(rp)) => {
  314. // B registered
  315. if rp.id.len() > 0 {
  316. log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
  317. self.update_addr(rp.id, addr, socket).await?;
  318. if self.serial > rp.serial {
  319. let mut msg_out = RendezvousMessage::new();
  320. msg_out.set_configure_update(ConfigUpdate {
  321. serial: self.serial,
  322. rendezvous_servers: self.rendezvous_servers.clone(),
  323. ..Default::default()
  324. });
  325. socket.send(&msg_out, addr).await?;
  326. }
  327. }
  328. }
  329. Some(rendezvous_message::Union::register_pk(rk)) => {
  330. if rk.uuid.is_empty() {
  331. return Ok(());
  332. }
  333. let id = rk.id;
  334. let mut res = register_pk_response::Result::OK;
  335. if let Some(peer) = self.pm.get(&id).await {
  336. if !peer.uuid.is_empty() && peer.uuid != rk.uuid {
  337. log::warn!(
  338. "Peer {} uuid mismatch: {:?} vs {:?}",
  339. id,
  340. rk.uuid,
  341. peer.uuid
  342. );
  343. res = register_pk_response::Result::UUID_MISMATCH;
  344. } else if peer.uuid.is_empty() || peer.pk != rk.pk {
  345. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  346. }
  347. } else {
  348. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  349. }
  350. let mut msg_out = RendezvousMessage::new();
  351. msg_out.set_register_pk_response(RegisterPkResponse {
  352. result: res.into(),
  353. ..Default::default()
  354. });
  355. socket.send(&msg_out, addr).await?
  356. }
  357. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  358. if self.pm.is_in_memory(&ph.id) {
  359. self.handle_udp_punch_hole_request(addr, ph, license)
  360. .await?;
  361. } else {
  362. // not in memory, fetch from db with spawn in case blocking me
  363. let mut me = self.clone();
  364. let license = license.to_owned();
  365. tokio::spawn(async move {
  366. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &license).await);
  367. });
  368. }
  369. }
  370. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  371. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  372. }
  373. Some(rendezvous_message::Union::local_addr(la)) => {
  374. self.handle_local_addr(la, addr, Some(socket)).await?;
  375. }
  376. Some(rendezvous_message::Union::configure_update(mut cu)) => {
  377. if addr.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
  378. && cu.serial > self.serial
  379. {
  380. self.serial = cu.serial;
  381. self.rendezvous_servers = cu
  382. .rendezvous_servers
  383. .drain(..)
  384. .filter(|x| {
  385. !x.is_empty()
  386. && test_if_valid_server(x, "rendezvous-server").is_ok()
  387. })
  388. .collect();
  389. log::info!(
  390. "configure updated: serial={} rendezvous-servers={:?}",
  391. self.serial,
  392. self.rendezvous_servers
  393. );
  394. }
  395. }
  396. Some(rendezvous_message::Union::software_update(su)) => {
  397. if !self.version.is_empty() && su.url != self.version {
  398. let mut msg_out = RendezvousMessage::new();
  399. msg_out.set_software_update(SoftwareUpdate {
  400. url: self.software_url.clone(),
  401. ..Default::default()
  402. });
  403. socket.send(&msg_out, addr).await?;
  404. }
  405. }
  406. _ => {}
  407. }
  408. }
  409. Ok(())
  410. }
  411. #[inline]
  412. async fn update_addr(
  413. &mut self,
  414. id: String,
  415. socket_addr: SocketAddr,
  416. socket: &mut FramedSocket,
  417. ) -> ResultType<()> {
  418. let mut lock = self.pm.map.write().unwrap();
  419. let last_reg_time = Instant::now();
  420. if let Some(old) = lock.get_mut(&id) {
  421. old.socket_addr = socket_addr;
  422. old.last_reg_time = last_reg_time;
  423. let request_pk = old.pk.is_empty();
  424. drop(lock);
  425. let mut msg_out = RendezvousMessage::new();
  426. msg_out.set_register_peer_response(RegisterPeerResponse {
  427. request_pk,
  428. ..Default::default()
  429. });
  430. socket.send(&msg_out, socket_addr).await?;
  431. } else {
  432. drop(lock);
  433. let mut pm = self.pm.clone();
  434. let tx = self.tx.clone();
  435. tokio::spawn(async move {
  436. let v = pm.db.get(id.clone()).await;
  437. let (uuid, pk) = {
  438. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  439. (v.uuid, v.pk)
  440. } else {
  441. (Vec::new(), Vec::new())
  442. }
  443. };
  444. let mut msg_out = RendezvousMessage::new();
  445. msg_out.set_register_peer_response(RegisterPeerResponse {
  446. request_pk: pk.is_empty(),
  447. ..Default::default()
  448. });
  449. tx.send((msg_out, socket_addr)).ok();
  450. pm.map.write().unwrap().insert(
  451. id,
  452. Peer {
  453. socket_addr,
  454. last_reg_time,
  455. uuid,
  456. pk,
  457. },
  458. );
  459. });
  460. }
  461. Ok(())
  462. }
  463. #[inline]
  464. async fn handle_hole_sent<'a>(
  465. &mut self,
  466. phs: PunchHoleSent,
  467. addr: SocketAddr,
  468. socket: Option<&'a mut FramedSocket>,
  469. ) -> ResultType<()> {
  470. // punch hole sent from B, tell A that B is ready to be connected
  471. let addr_a = AddrMangle::decode(&phs.socket_addr);
  472. log::debug!(
  473. "{} punch hole response to {:?} from {:?}",
  474. if socket.is_none() { "TCP" } else { "UDP" },
  475. &addr_a,
  476. &addr
  477. );
  478. let mut msg_out = RendezvousMessage::new();
  479. let pk = match self.pm.get(&phs.id).await {
  480. Some(peer) => peer.pk,
  481. _ => Vec::new(),
  482. };
  483. let mut p = PunchHoleResponse {
  484. socket_addr: AddrMangle::encode(addr),
  485. pk,
  486. relay_server: phs.relay_server.clone(),
  487. ..Default::default()
  488. };
  489. if let Ok(t) = phs.nat_type.enum_value() {
  490. p.set_nat_type(t);
  491. }
  492. msg_out.set_punch_hole_response(p);
  493. if let Some(socket) = socket {
  494. socket.send(&msg_out, addr_a).await?;
  495. } else {
  496. self.send_to_tcp(&msg_out, addr_a).await;
  497. }
  498. Ok(())
  499. }
  500. #[inline]
  501. async fn handle_local_addr<'a>(
  502. &mut self,
  503. la: LocalAddr,
  504. addr: SocketAddr,
  505. socket: Option<&'a mut FramedSocket>,
  506. ) -> ResultType<()> {
  507. // relay local addrs of B to A
  508. let addr_a = AddrMangle::decode(&la.socket_addr);
  509. log::debug!(
  510. "{} local addrs response to {:?} from {:?}",
  511. if socket.is_none() { "TCP" } else { "UDP" },
  512. &addr_a,
  513. &addr
  514. );
  515. let mut msg_out = RendezvousMessage::new();
  516. let mut p = PunchHoleResponse {
  517. socket_addr: la.local_addr.clone(),
  518. relay_server: la.relay_server,
  519. ..Default::default()
  520. };
  521. p.set_is_local(true);
  522. msg_out.set_punch_hole_response(p);
  523. if let Some(socket) = socket {
  524. socket.send(&msg_out, addr_a).await?;
  525. } else {
  526. self.send_to_tcp(&msg_out, addr_a).await;
  527. }
  528. Ok(())
  529. }
  530. #[inline]
  531. async fn handle_punch_hole_request(
  532. &mut self,
  533. addr: SocketAddr,
  534. ph: PunchHoleRequest,
  535. license: &str,
  536. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  537. if !license.is_empty() && ph.licence_key != license {
  538. let mut msg_out = RendezvousMessage::new();
  539. msg_out.set_punch_hole_response(PunchHoleResponse {
  540. failure: punch_hole_response::Failure::LICENCE_MISMATCH.into(),
  541. ..Default::default()
  542. });
  543. return Ok((msg_out, None));
  544. }
  545. let id = ph.id;
  546. // punch hole request from A, relay to B,
  547. // check if in same intranet first,
  548. // fetch local addrs if in same intranet.
  549. // because punch hole won't work if in the same intranet,
  550. // all routers will drop such self-connections.
  551. if let Some(peer) = self.pm.get(&id).await {
  552. if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
  553. let mut msg_out = RendezvousMessage::new();
  554. msg_out.set_punch_hole_response(PunchHoleResponse {
  555. failure: punch_hole_response::Failure::OFFLINE.into(),
  556. ..Default::default()
  557. });
  558. return Ok((msg_out, None));
  559. }
  560. let mut msg_out = RendezvousMessage::new();
  561. let same_intranet = match peer.socket_addr {
  562. SocketAddr::V4(a) => match addr {
  563. SocketAddr::V4(b) => a.ip() == b.ip(),
  564. _ => false,
  565. },
  566. SocketAddr::V6(a) => match addr {
  567. SocketAddr::V6(b) => a.ip() == b.ip(),
  568. _ => false,
  569. },
  570. };
  571. let socket_addr = AddrMangle::encode(addr);
  572. if same_intranet {
  573. log::debug!(
  574. "Fetch local addr {:?} {:?} request from {:?}",
  575. id,
  576. &peer.socket_addr,
  577. &addr
  578. );
  579. let i = unsafe {
  580. ROTATION_RELAY_SERVER += 1;
  581. ROTATION_RELAY_SERVER % self.relay_servers.len()
  582. };
  583. msg_out.set_fetch_local_addr(FetchLocalAddr {
  584. socket_addr,
  585. relay_server: check_relay_server(&self.relay_servers[i]),
  586. ..Default::default()
  587. });
  588. } else {
  589. log::debug!(
  590. "Punch hole {:?} {:?} request from {:?}",
  591. id,
  592. &peer.socket_addr,
  593. &addr
  594. );
  595. let i = unsafe {
  596. ROTATION_RELAY_SERVER += 1;
  597. ROTATION_RELAY_SERVER % self.relay_servers.len()
  598. };
  599. msg_out.set_punch_hole(PunchHole {
  600. socket_addr,
  601. nat_type: ph.nat_type,
  602. relay_server: check_relay_server(&self.relay_servers[i]),
  603. ..Default::default()
  604. });
  605. }
  606. return Ok((msg_out, Some(peer.socket_addr)));
  607. } else {
  608. let mut msg_out = RendezvousMessage::new();
  609. msg_out.set_punch_hole_response(PunchHoleResponse {
  610. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  611. ..Default::default()
  612. });
  613. return Ok((msg_out, None));
  614. }
  615. }
  616. #[inline]
  617. async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) {
  618. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  619. if let Some(mut tcp) = tcp {
  620. if let Ok(bytes) = msg.write_to_bytes() {
  621. tokio::spawn(async move {
  622. allow_err!(tcp.send(Bytes::from(bytes)).await);
  623. });
  624. }
  625. }
  626. }
  627. #[inline]
  628. async fn send_to_tcp_sync(
  629. &mut self,
  630. msg: &RendezvousMessage,
  631. addr: SocketAddr,
  632. ) -> ResultType<()> {
  633. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  634. if let Some(mut tcp) = tcp {
  635. if let Ok(bytes) = msg.write_to_bytes() {
  636. tcp.send(Bytes::from(bytes)).await?;
  637. }
  638. }
  639. Ok(())
  640. }
  641. #[inline]
  642. async fn handle_tcp_punch_hole_request(
  643. &mut self,
  644. addr: SocketAddr,
  645. ph: PunchHoleRequest,
  646. license: &str,
  647. ) -> ResultType<()> {
  648. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, license).await?;
  649. if let Some(addr) = to_addr {
  650. self.tx.send((msg, addr))?;
  651. } else {
  652. self.send_to_tcp_sync(&msg, addr).await?;
  653. }
  654. Ok(())
  655. }
  656. #[inline]
  657. async fn handle_udp_punch_hole_request(
  658. &mut self,
  659. addr: SocketAddr,
  660. ph: PunchHoleRequest,
  661. license: &str,
  662. ) -> ResultType<()> {
  663. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, license).await?;
  664. self.tx.send((
  665. msg,
  666. match to_addr {
  667. Some(addr) => addr,
  668. None => addr,
  669. },
  670. ))?;
  671. Ok(())
  672. }
  673. }
  674. pub fn test_if_valid_server(host: &str, name: &str) -> ResultType<SocketAddr> {
  675. let res = if host.contains(":") {
  676. hbb_common::to_socket_addr(host)
  677. } else {
  678. hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
  679. };
  680. if res.is_err() {
  681. log::error!("Invalid {} {}: {:?}", name, host, res);
  682. }
  683. res
  684. }
  685. fn check_relay_server(addr: &str) -> String {
  686. if addr.contains("0.0.0.0") {
  687. return addr.replace("0.0.0.0", &*PUBLIC_IP.read().unwrap());
  688. }
  689. addr.to_owned()
  690. }