rendezvous_server.rs 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. use hbb_common::{
  2. allow_err,
  3. bytes::{Bytes, BytesMut},
  4. bytes_codec::BytesCodec,
  5. futures_util::{
  6. sink::SinkExt,
  7. stream::{SplitSink, StreamExt},
  8. },
  9. log,
  10. protobuf::{Message as _, MessageField},
  11. rendezvous_proto::*,
  12. sleep,
  13. tcp::{new_listener, FramedStream},
  14. timeout,
  15. tokio::{
  16. self,
  17. net::{TcpListener, TcpStream},
  18. sync::mpsc,
  19. time::{interval, Duration},
  20. },
  21. tokio_util::codec::Framed,
  22. udp::FramedSocket,
  23. AddrMangle, ResultType,
  24. };
  25. use serde_derive::{Deserialize, Serialize};
  26. use std::{
  27. collections::HashMap,
  28. net::SocketAddr,
  29. sync::{Arc, Mutex, RwLock},
  30. time::Instant,
  31. };
  32. #[derive(Clone, Debug)]
  33. struct Peer {
  34. socket_addr: SocketAddr,
  35. last_reg_time: Instant,
  36. uuid: Vec<u8>,
  37. pk: Vec<u8>,
  38. }
  39. impl Default for Peer {
  40. fn default() -> Self {
  41. Self {
  42. socket_addr: "0.0.0.0:0".parse().unwrap(),
  43. last_reg_time: Instant::now()
  44. .checked_sub(std::time::Duration::from_secs(3600))
  45. .unwrap(),
  46. uuid: Vec::new(),
  47. pk: Vec::new(),
  48. }
  49. }
  50. }
  51. #[derive(Debug, Serialize, Deserialize, Default)]
  52. struct PeerSerde {
  53. #[serde(default)]
  54. ip: String,
  55. #[serde(default)]
  56. uuid: Vec<u8>,
  57. #[serde(default)]
  58. pk: Vec<u8>,
  59. }
  60. #[derive(Clone)]
  61. struct PeerMap {
  62. map: Arc<RwLock<HashMap<String, Peer>>>,
  63. db: super::SledAsync,
  64. }
  65. pub const DEFAULT_PORT: &'static str = "21116";
  66. impl PeerMap {
  67. fn new() -> ResultType<Self> {
  68. let mut db: String = "hbbs.db".to_owned();
  69. #[cfg(windows)]
  70. {
  71. if let Some(path) = hbb_common::config::Config::icon_path().parent() {
  72. db = format!("{}\\{}", path.to_str().unwrap_or("."), db);
  73. }
  74. }
  75. #[cfg(not(windows))]
  76. {
  77. db = format!("./{}", db);
  78. }
  79. Ok(Self {
  80. map: Default::default(),
  81. db: super::SledAsync::new(&db, true)?,
  82. })
  83. }
  84. #[inline]
  85. fn update_pk(&mut self, id: String, socket_addr: SocketAddr, uuid: Vec<u8>, pk: Vec<u8>) {
  86. log::info!("update_pk {} {:?} {:?} {:?}", id, socket_addr, uuid, pk);
  87. let mut lock = self.map.write().unwrap();
  88. lock.insert(
  89. id.clone(),
  90. Peer {
  91. socket_addr,
  92. last_reg_time: Instant::now(),
  93. uuid: uuid.clone(),
  94. pk: pk.clone(),
  95. },
  96. );
  97. drop(lock);
  98. let ip = socket_addr.ip().to_string();
  99. self.db.insert(id, PeerSerde { ip, uuid, pk });
  100. }
  101. #[inline]
  102. async fn get(&mut self, id: &str) -> Option<Peer> {
  103. let p = self.map.read().unwrap().get(id).map(|x| x.clone());
  104. if p.is_some() {
  105. return p;
  106. } else {
  107. let id = id.to_owned();
  108. let v = self.db.get(id.clone()).await;
  109. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  110. self.map.write().unwrap().insert(
  111. id,
  112. Peer {
  113. uuid: v.uuid,
  114. pk: v.pk,
  115. ..Default::default()
  116. },
  117. );
  118. return Some(Peer::default());
  119. }
  120. }
  121. None
  122. }
  123. #[inline]
  124. fn is_in_memory(&self, id: &str) -> bool {
  125. self.map.read().unwrap().contains_key(id)
  126. }
  127. }
  128. const REG_TIMEOUT: i32 = 30_000;
  129. type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  130. type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
  131. type Receiver = mpsc::UnboundedReceiver<(RendezvousMessage, SocketAddr)>;
  132. static mut ROTATION_RELAY_SERVER: usize = 0;
  133. #[derive(Clone)]
  134. pub struct RendezvousServer {
  135. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  136. pm: PeerMap,
  137. tx: Sender,
  138. relay_servers: Vec<String>,
  139. serial: i32,
  140. rendezvous_servers: Vec<String>,
  141. version: String,
  142. software_url: String,
  143. }
  144. impl RendezvousServer {
  145. #[tokio::main(basic_scheduler)]
  146. pub async fn start(
  147. addr: &str,
  148. addr2: &str,
  149. relay_servers: Vec<String>,
  150. serial: i32,
  151. rendezvous_servers: Vec<String>,
  152. software_url: String,
  153. key: &str,
  154. stop: Arc<Mutex<bool>>,
  155. id_change_support: bool,
  156. ) -> ResultType<()> {
  157. if !key.is_empty() {
  158. log::info!("Key: {}", key);
  159. }
  160. log::info!("Listening on tcp/udp {}", addr);
  161. log::info!("Listening on tcp {}, extra port for NAT test", addr2);
  162. log::info!("relay-servers={:?}", relay_servers);
  163. log::info!("change-id={:?}", id_change_support);
  164. let mut socket = FramedSocket::new(addr).await?;
  165. let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
  166. let version = hbb_common::get_version_from_url(&software_url);
  167. if !version.is_empty() {
  168. log::info!("software_url: {}, version: {}", software_url, version);
  169. }
  170. let mut rs = Self {
  171. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  172. pm: PeerMap::new()?,
  173. tx: tx.clone(),
  174. relay_servers,
  175. serial,
  176. rendezvous_servers,
  177. version,
  178. software_url,
  179. };
  180. let mut listener = new_listener(addr, false).await?;
  181. let mut listener2 = new_listener(addr2, false).await?;
  182. loop {
  183. if *stop.lock().unwrap() {
  184. sleep(0.1).await;
  185. continue;
  186. }
  187. log::info!("Start");
  188. rs.io_loop(
  189. &mut rx,
  190. &mut listener,
  191. &mut listener2,
  192. &mut socket,
  193. key,
  194. stop.clone(),
  195. id_change_support,
  196. )
  197. .await;
  198. }
  199. }
  200. async fn io_loop(
  201. &mut self,
  202. rx: &mut Receiver,
  203. listener: &mut TcpListener,
  204. listener2: &mut TcpListener,
  205. socket: &mut FramedSocket,
  206. key: &str,
  207. stop: Arc<Mutex<bool>>,
  208. id_change_support: bool,
  209. ) {
  210. let mut timer = interval(Duration::from_millis(100));
  211. loop {
  212. tokio::select! {
  213. _ = timer.tick() => {
  214. if *stop.lock().unwrap() {
  215. log::info!("Stopped");
  216. break;
  217. }
  218. }
  219. Some((msg, addr)) = rx.recv() => {
  220. allow_err!(socket.send(&msg, addr).await);
  221. }
  222. Some(Ok((bytes, addr))) = socket.next() => {
  223. allow_err!(self.handle_msg(&bytes, addr, socket, key).await);
  224. }
  225. Ok((stream, addr)) = listener2.accept() => {
  226. let stream = FramedStream::from(stream);
  227. tokio::spawn(async move {
  228. let mut stream = stream;
  229. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  230. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  231. if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
  232. let mut msg_out = RendezvousMessage::new();
  233. msg_out.set_test_nat_response(TestNatResponse {
  234. port: addr.port() as _,
  235. ..Default::default()
  236. });
  237. stream.send(&msg_out).await.ok();
  238. }
  239. }
  240. }
  241. });
  242. }
  243. Ok((stream, addr)) = listener.accept() => {
  244. log::debug!("Tcp connection from {:?}", addr);
  245. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  246. let tcp_punch = self.tcp_punch.clone();
  247. let mut rs = self.clone();
  248. let key = key.to_owned();
  249. tokio::spawn(async move {
  250. let mut sender = Some(a);
  251. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  252. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  253. match msg_in.union {
  254. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  255. // there maybe several attempt, so sender can be none
  256. if let Some(sender) = sender.take() {
  257. tcp_punch.lock().unwrap().insert(addr, sender);
  258. }
  259. allow_err!(rs.handle_tcp_punch_hole_request(addr, ph, &key).await);
  260. }
  261. Some(rendezvous_message::Union::request_relay(mut rf)) => {
  262. // there maybe several attempt, so sender can be none
  263. if let Some(sender) = sender.take() {
  264. tcp_punch.lock().unwrap().insert(addr, sender);
  265. }
  266. if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
  267. let mut msg_out = RendezvousMessage::new();
  268. rf.socket_addr = AddrMangle::encode(addr);
  269. msg_out.set_request_relay(rf);
  270. rs.tx.send((msg_out, peer.socket_addr)).ok();
  271. }
  272. }
  273. Some(rendezvous_message::Union::relay_response(mut rr)) => {
  274. let addr_b = AddrMangle::decode(&rr.socket_addr);
  275. rr.socket_addr = Default::default();
  276. let id = rr.get_id();
  277. if !id.is_empty() {
  278. if let Some(peer) = rs.pm.get(&id).await {
  279. rr.set_pk(peer.pk.clone());
  280. }
  281. }
  282. let mut msg_out = RendezvousMessage::new();
  283. msg_out.set_relay_response(rr);
  284. allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
  285. break;
  286. }
  287. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  288. allow_err!(rs.handle_hole_sent(phs, addr, None).await);
  289. break;
  290. }
  291. Some(rendezvous_message::Union::local_addr(la)) => {
  292. allow_err!(rs.handle_local_addr(la, addr, None).await);
  293. break;
  294. }
  295. Some(rendezvous_message::Union::test_nat_request(tar)) => {
  296. let mut msg_out = RendezvousMessage::new();
  297. let mut res = TestNatResponse {
  298. port: addr.port() as _,
  299. ..Default::default()
  300. };
  301. if rs.serial > tar.serial {
  302. let mut cu = ConfigUpdate::new();
  303. cu.serial = rs.serial;
  304. cu.rendezvous_servers = rs.rendezvous_servers.clone();
  305. res.cu = MessageField::from_option(Some(cu));
  306. }
  307. msg_out.set_test_nat_response(res);
  308. if let Some(tcp) = sender.as_mut() {
  309. if let Ok(bytes) = msg_out.write_to_bytes() {
  310. allow_err!(tcp.send(Bytes::from(bytes)).await);
  311. }
  312. }
  313. break;
  314. }
  315. Some(rendezvous_message::Union::register_pk(rk)) => {
  316. if rk.uuid.is_empty() {
  317. break;
  318. }
  319. let mut res = register_pk_response::Result::OK;
  320. if !id_change_support {
  321. res = register_pk_response::Result::NOT_SUPPORT;
  322. } else if !hbb_common::is_valid_custom_id(&rk.id) {
  323. res = register_pk_response::Result::INVALID_ID_FORMAT;
  324. } else if let Some(peer) = rs.pm.get(&rk.id).await {
  325. if peer.uuid != rk.uuid {
  326. res = register_pk_response::Result::ID_EXISTS;
  327. }
  328. }
  329. let mut msg_out = RendezvousMessage::new();
  330. msg_out.set_register_pk_response(RegisterPkResponse {
  331. result: res.into(),
  332. ..Default::default()
  333. });
  334. if let Some(tcp) = sender.as_mut() {
  335. if let Ok(bytes) = msg_out.write_to_bytes() {
  336. allow_err!(tcp.send(Bytes::from(bytes)).await);
  337. }
  338. }
  339. }
  340. _ => {
  341. break;
  342. }
  343. }
  344. } else {
  345. break;
  346. }
  347. }
  348. if sender.is_none() {
  349. rs.tcp_punch.lock().unwrap().remove(&addr);
  350. }
  351. log::debug!("Tcp connection from {:?} closed", addr);
  352. });
  353. }
  354. }
  355. }
  356. }
  357. #[inline]
  358. async fn handle_msg(
  359. &mut self,
  360. bytes: &BytesMut,
  361. addr: SocketAddr,
  362. socket: &mut FramedSocket,
  363. key: &str,
  364. ) -> ResultType<()> {
  365. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  366. match msg_in.union {
  367. Some(rendezvous_message::Union::register_peer(rp)) => {
  368. // B registered
  369. if rp.id.len() > 0 {
  370. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  371. self.update_addr(rp.id, addr, socket).await?;
  372. if self.serial > rp.serial {
  373. let mut msg_out = RendezvousMessage::new();
  374. msg_out.set_configure_update(ConfigUpdate {
  375. serial: self.serial,
  376. rendezvous_servers: self.rendezvous_servers.clone(),
  377. ..Default::default()
  378. });
  379. socket.send(&msg_out, addr).await?;
  380. }
  381. }
  382. }
  383. Some(rendezvous_message::Union::register_pk(rk)) => {
  384. if rk.uuid.is_empty() {
  385. return Ok(());
  386. }
  387. let id = rk.id;
  388. let mut res = register_pk_response::Result::OK;
  389. if let Some(peer) = self.pm.get(&id).await {
  390. if peer.uuid != rk.uuid {
  391. log::warn!(
  392. "Peer {} uuid mismatch: {:?} vs {:?}",
  393. id,
  394. rk.uuid,
  395. peer.uuid
  396. );
  397. res = register_pk_response::Result::UUID_MISMATCH;
  398. } else if peer.pk != rk.pk {
  399. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  400. }
  401. } else {
  402. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  403. }
  404. let mut msg_out = RendezvousMessage::new();
  405. msg_out.set_register_pk_response(RegisterPkResponse {
  406. result: res.into(),
  407. ..Default::default()
  408. });
  409. socket.send(&msg_out, addr).await?
  410. }
  411. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  412. if self.pm.is_in_memory(&ph.id) {
  413. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  414. } else {
  415. // not in memory, fetch from db with spawn in case blocking me
  416. let mut me = self.clone();
  417. let key = key.to_owned();
  418. tokio::spawn(async move {
  419. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  420. });
  421. }
  422. }
  423. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  424. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  425. }
  426. Some(rendezvous_message::Union::local_addr(la)) => {
  427. self.handle_local_addr(la, addr, Some(socket)).await?;
  428. }
  429. Some(rendezvous_message::Union::configure_update(mut cu)) => {
  430. if addr.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
  431. && cu.serial > self.serial
  432. {
  433. self.serial = cu.serial;
  434. self.rendezvous_servers = cu
  435. .rendezvous_servers
  436. .drain(..)
  437. .filter(|x| {
  438. !x.is_empty()
  439. && test_if_valid_server(x, "rendezvous-server").is_ok()
  440. })
  441. .collect();
  442. log::info!(
  443. "configure updated: serial={} rendezvous-servers={:?}",
  444. self.serial,
  445. self.rendezvous_servers
  446. );
  447. }
  448. }
  449. Some(rendezvous_message::Union::software_update(su)) => {
  450. if !self.version.is_empty() && su.url != self.version {
  451. let mut msg_out = RendezvousMessage::new();
  452. msg_out.set_software_update(SoftwareUpdate {
  453. url: self.software_url.clone(),
  454. ..Default::default()
  455. });
  456. socket.send(&msg_out, addr).await?;
  457. }
  458. }
  459. _ => {}
  460. }
  461. }
  462. Ok(())
  463. }
  464. #[inline]
  465. async fn update_addr(
  466. &mut self,
  467. id: String,
  468. socket_addr: SocketAddr,
  469. socket: &mut FramedSocket,
  470. ) -> ResultType<()> {
  471. let mut lock = self.pm.map.write().unwrap();
  472. let last_reg_time = Instant::now();
  473. if let Some(old) = lock.get_mut(&id) {
  474. old.socket_addr = socket_addr;
  475. old.last_reg_time = last_reg_time;
  476. let request_pk = old.pk.is_empty();
  477. drop(lock);
  478. let mut msg_out = RendezvousMessage::new();
  479. msg_out.set_register_peer_response(RegisterPeerResponse {
  480. request_pk,
  481. ..Default::default()
  482. });
  483. socket.send(&msg_out, socket_addr).await?;
  484. } else {
  485. drop(lock);
  486. let mut pm = self.pm.clone();
  487. let tx = self.tx.clone();
  488. tokio::spawn(async move {
  489. let v = pm.db.get(id.clone()).await;
  490. let (uuid, pk) = {
  491. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  492. (v.uuid, v.pk)
  493. } else {
  494. (Vec::new(), Vec::new())
  495. }
  496. };
  497. let mut msg_out = RendezvousMessage::new();
  498. msg_out.set_register_peer_response(RegisterPeerResponse {
  499. request_pk: pk.is_empty(),
  500. ..Default::default()
  501. });
  502. tx.send((msg_out, socket_addr)).ok();
  503. pm.map.write().unwrap().insert(
  504. id,
  505. Peer {
  506. socket_addr,
  507. last_reg_time,
  508. uuid,
  509. pk,
  510. },
  511. );
  512. });
  513. }
  514. Ok(())
  515. }
  516. #[inline]
  517. async fn handle_hole_sent<'a>(
  518. &mut self,
  519. phs: PunchHoleSent,
  520. addr: SocketAddr,
  521. socket: Option<&'a mut FramedSocket>,
  522. ) -> ResultType<()> {
  523. // punch hole sent from B, tell A that B is ready to be connected
  524. let addr_a = AddrMangle::decode(&phs.socket_addr);
  525. log::debug!(
  526. "{} punch hole response to {:?} from {:?}",
  527. if socket.is_none() { "TCP" } else { "UDP" },
  528. &addr_a,
  529. &addr
  530. );
  531. let mut msg_out = RendezvousMessage::new();
  532. let pk = match self.pm.get(&phs.id).await {
  533. Some(peer) => peer.pk,
  534. _ => Vec::new(),
  535. };
  536. let mut p = PunchHoleResponse {
  537. socket_addr: AddrMangle::encode(addr),
  538. pk,
  539. relay_server: phs.relay_server.clone(),
  540. ..Default::default()
  541. };
  542. if let Ok(t) = phs.nat_type.enum_value() {
  543. p.set_nat_type(t);
  544. }
  545. msg_out.set_punch_hole_response(p);
  546. if let Some(socket) = socket {
  547. socket.send(&msg_out, addr_a).await?;
  548. } else {
  549. self.send_to_tcp(&msg_out, addr_a).await;
  550. }
  551. Ok(())
  552. }
  553. #[inline]
  554. async fn handle_local_addr<'a>(
  555. &mut self,
  556. la: LocalAddr,
  557. addr: SocketAddr,
  558. socket: Option<&'a mut FramedSocket>,
  559. ) -> ResultType<()> {
  560. // relay local addrs of B to A
  561. let addr_a = AddrMangle::decode(&la.socket_addr);
  562. log::debug!(
  563. "{} local addrs response to {:?} from {:?}",
  564. if socket.is_none() { "TCP" } else { "UDP" },
  565. &addr_a,
  566. &addr
  567. );
  568. let mut msg_out = RendezvousMessage::new();
  569. let mut p = PunchHoleResponse {
  570. socket_addr: la.local_addr.clone(),
  571. relay_server: la.relay_server,
  572. ..Default::default()
  573. };
  574. p.set_is_local(true);
  575. msg_out.set_punch_hole_response(p);
  576. if let Some(socket) = socket {
  577. socket.send(&msg_out, addr_a).await?;
  578. } else {
  579. self.send_to_tcp(&msg_out, addr_a).await;
  580. }
  581. Ok(())
  582. }
  583. #[inline]
  584. async fn handle_punch_hole_request(
  585. &mut self,
  586. addr: SocketAddr,
  587. ph: PunchHoleRequest,
  588. key: &str,
  589. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  590. if !key.is_empty() && ph.licence_key != key {
  591. let mut msg_out = RendezvousMessage::new();
  592. msg_out.set_punch_hole_response(PunchHoleResponse {
  593. failure: punch_hole_response::Failure::LICENCE_MISMATCH.into(),
  594. ..Default::default()
  595. });
  596. return Ok((msg_out, None));
  597. }
  598. let id = ph.id;
  599. // punch hole request from A, relay to B,
  600. // check if in same intranet first,
  601. // fetch local addrs if in same intranet.
  602. // because punch hole won't work if in the same intranet,
  603. // all routers will drop such self-connections.
  604. if let Some(peer) = self.pm.get(&id).await {
  605. if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
  606. let mut msg_out = RendezvousMessage::new();
  607. msg_out.set_punch_hole_response(PunchHoleResponse {
  608. failure: punch_hole_response::Failure::OFFLINE.into(),
  609. ..Default::default()
  610. });
  611. return Ok((msg_out, None));
  612. }
  613. let mut msg_out = RendezvousMessage::new();
  614. let same_intranet = match peer.socket_addr {
  615. SocketAddr::V4(a) => match addr {
  616. SocketAddr::V4(b) => a.ip() == b.ip(),
  617. _ => false,
  618. },
  619. SocketAddr::V6(a) => match addr {
  620. SocketAddr::V6(b) => a.ip() == b.ip(),
  621. _ => false,
  622. },
  623. };
  624. let socket_addr = AddrMangle::encode(addr);
  625. let relay_server = {
  626. if self.relay_servers.is_empty() {
  627. "".to_owned()
  628. } else {
  629. let i = unsafe {
  630. ROTATION_RELAY_SERVER += 1;
  631. ROTATION_RELAY_SERVER % self.relay_servers.len()
  632. };
  633. self.relay_servers[i].clone()
  634. }
  635. };
  636. if same_intranet {
  637. log::debug!(
  638. "Fetch local addr {:?} {:?} request from {:?}",
  639. id,
  640. &peer.socket_addr,
  641. &addr
  642. );
  643. msg_out.set_fetch_local_addr(FetchLocalAddr {
  644. socket_addr,
  645. relay_server,
  646. ..Default::default()
  647. });
  648. } else {
  649. log::debug!(
  650. "Punch hole {:?} {:?} request from {:?}",
  651. id,
  652. &peer.socket_addr,
  653. &addr
  654. );
  655. msg_out.set_punch_hole(PunchHole {
  656. socket_addr,
  657. nat_type: ph.nat_type,
  658. relay_server,
  659. ..Default::default()
  660. });
  661. }
  662. return Ok((msg_out, Some(peer.socket_addr)));
  663. } else {
  664. let mut msg_out = RendezvousMessage::new();
  665. msg_out.set_punch_hole_response(PunchHoleResponse {
  666. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  667. ..Default::default()
  668. });
  669. return Ok((msg_out, None));
  670. }
  671. }
  672. #[inline]
  673. async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) {
  674. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  675. if let Some(mut tcp) = tcp {
  676. if let Ok(bytes) = msg.write_to_bytes() {
  677. tokio::spawn(async move {
  678. allow_err!(tcp.send(Bytes::from(bytes)).await);
  679. });
  680. }
  681. }
  682. }
  683. #[inline]
  684. async fn send_to_tcp_sync(
  685. &mut self,
  686. msg: &RendezvousMessage,
  687. addr: SocketAddr,
  688. ) -> ResultType<()> {
  689. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  690. if let Some(mut tcp) = tcp {
  691. if let Ok(bytes) = msg.write_to_bytes() {
  692. tcp.send(Bytes::from(bytes)).await?;
  693. }
  694. }
  695. Ok(())
  696. }
  697. #[inline]
  698. async fn handle_tcp_punch_hole_request(
  699. &mut self,
  700. addr: SocketAddr,
  701. ph: PunchHoleRequest,
  702. key: &str,
  703. ) -> ResultType<()> {
  704. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key).await?;
  705. if let Some(addr) = to_addr {
  706. self.tx.send((msg, addr))?;
  707. } else {
  708. self.send_to_tcp_sync(&msg, addr).await?;
  709. }
  710. Ok(())
  711. }
  712. #[inline]
  713. async fn handle_udp_punch_hole_request(
  714. &mut self,
  715. addr: SocketAddr,
  716. ph: PunchHoleRequest,
  717. key: &str,
  718. ) -> ResultType<()> {
  719. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key).await?;
  720. self.tx.send((
  721. msg,
  722. match to_addr {
  723. Some(addr) => addr,
  724. None => addr,
  725. },
  726. ))?;
  727. Ok(())
  728. }
  729. }
  730. pub fn test_if_valid_server(host: &str, name: &str) -> ResultType<SocketAddr> {
  731. let res = if host.contains(":") {
  732. hbb_common::to_socket_addr(host)
  733. } else {
  734. hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
  735. };
  736. if res.is_err() {
  737. log::error!("Invalid {} {}: {:?}", name, host, res);
  738. }
  739. res
  740. }