// rendezvous_server.rs (30 KB)
  1. use hbb_common::{
  2. allow_err,
  3. bytes::{Bytes, BytesMut},
  4. bytes_codec::BytesCodec,
  5. futures_util::{
  6. sink::SinkExt,
  7. stream::{SplitSink, StreamExt},
  8. },
  9. log,
  10. protobuf::{Message as _, MessageField},
  11. rendezvous_proto::*,
  12. sleep,
  13. tcp::{new_listener, FramedStream},
  14. timeout,
  15. tokio::{
  16. self,
  17. net::{TcpListener, TcpStream},
  18. sync::mpsc,
  19. time::{interval, Duration},
  20. },
  21. tokio_util::codec::Framed,
  22. udp::FramedSocket,
  23. AddrMangle, ResultType,
  24. };
  25. use serde_derive::{Deserialize, Serialize};
  26. use std::{
  27. collections::HashMap,
  28. net::SocketAddr,
  29. sync::{Arc, Mutex, RwLock},
  30. time::Instant,
  31. };
  /// In-memory record for one peer known to the rendezvous server.
  #[derive(Clone, Debug)]
  struct Peer {
      /// Address the peer was last seen at (`0.0.0.0:0` in the default value).
      socket_addr: SocketAddr,
      /// Time of the peer's last registration.
      last_reg_time: Instant,
      /// Identifier sent by the peer together with its public key.
      uuid: Vec<u8>,
      /// Public key registered by the peer.
      pk: Vec<u8>,
  }

  impl Default for Peer {
      /// Builds a placeholder peer: unspecified address, empty uuid/pk and a
      /// registration time one hour in the past.
      fn default() -> Self {
          let now = Instant::now();
          Self {
              socket_addr: SocketAddr::from(([0, 0, 0, 0], 0)),
              // `checked_sub` yields `None` when the monotonic clock cannot
              // represent "one hour ago" (e.g. shortly after boot on some
              // platforms). Falling back to `now` avoids panicking; the
              // trade-off is that such a placeholder is not yet stale.
              last_reg_time: now
                  .checked_sub(std::time::Duration::from_secs(3600))
                  .unwrap_or(now),
              uuid: Vec::new(),
              pk: Vec::new(),
          }
      }
  }
  51. #[derive(Debug, Serialize, Deserialize, Default)]
  52. struct PeerSerde {
  53. #[serde(default)]
  54. ip: String,
  55. #[serde(default)]
  56. uuid: Vec<u8>,
  57. #[serde(default)]
  58. pk: Vec<u8>,
  59. }
  60. #[derive(Clone)]
  61. struct PeerMap {
  62. map: Arc<RwLock<HashMap<String, Peer>>>,
  63. db: super::SledAsync,
  64. }
  /// Default port, as a string.
  pub const DEFAULT_PORT: &str = "21116";
  66. impl PeerMap {
  67. fn new() -> ResultType<Self> {
  68. let mut db: String = "hbbs.db".to_owned();
  69. #[cfg(windows)]
  70. {
  71. if let Some(path) = hbb_common::config::Config::icon_path().parent() {
  72. db = format!("{}\\{}", path.to_str().unwrap_or("."), db);
  73. }
  74. }
  75. #[cfg(not(windows))]
  76. {
  77. db = format!("./{}", db);
  78. }
  79. Ok(Self {
  80. map: Default::default(),
  81. db: super::SledAsync::new(&db, true)?,
  82. })
  83. }
  84. #[inline]
  85. fn update_pk(&mut self, id: String, socket_addr: SocketAddr, uuid: Vec<u8>, pk: Vec<u8>) {
  86. log::info!("update_pk {} {:?} {:?} {:?}", id, socket_addr, uuid, pk);
  87. let mut lock = self.map.write().unwrap();
  88. lock.insert(
  89. id.clone(),
  90. Peer {
  91. socket_addr,
  92. last_reg_time: Instant::now(),
  93. uuid: uuid.clone(),
  94. pk: pk.clone(),
  95. },
  96. );
  97. drop(lock);
  98. let ip = socket_addr.ip().to_string();
  99. self.db.insert(id, PeerSerde { ip, uuid, pk });
  100. }
  101. #[inline]
  102. async fn get(&mut self, id: &str) -> Option<Peer> {
  103. let p = self.map.read().unwrap().get(id).map(|x| x.clone());
  104. if p.is_some() {
  105. return p;
  106. } else {
  107. let id = id.to_owned();
  108. let v = self.db.get(id.clone()).await;
  109. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  110. self.map.write().unwrap().insert(
  111. id,
  112. Peer {
  113. uuid: v.uuid,
  114. pk: v.pk,
  115. ..Default::default()
  116. },
  117. );
  118. return Some(Peer::default());
  119. }
  120. }
  121. None
  122. }
  123. #[inline]
  124. fn is_in_memory(&self, id: &str) -> bool {
  125. self.map.read().unwrap().contains_key(id)
  126. }
  127. }
  128. const REG_TIMEOUT: i32 = 30_000;
  129. type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  130. type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
  131. type Receiver = mpsc::UnboundedReceiver<(RendezvousMessage, SocketAddr)>;
  132. static mut ROTATION_RELAY_SERVER: usize = 0;
  133. #[derive(Clone)]
  134. pub struct RendezvousServer {
  135. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  136. pm: PeerMap,
  137. tx: Sender,
  138. relay_servers: Vec<String>,
  139. serial: i32,
  140. rendezvous_servers: Vec<String>,
  141. version: String,
  142. software_url: String,
  143. }
  144. impl RendezvousServer {
  145. #[tokio::main(basic_scheduler)]
  146. pub async fn start(
  147. addr: &str,
  148. addr2: &str,
  149. relay_servers: Vec<String>,
  150. serial: i32,
  151. rendezvous_servers: Vec<String>,
  152. software_url: String,
  153. key: &str,
  154. stop: Arc<Mutex<bool>>,
  155. id_change_support: bool,
  156. ) -> ResultType<()> {
  157. if !key.is_empty() {
  158. log::info!("Key: {}", key);
  159. }
  160. log::info!("Listening on tcp/udp {}", addr);
  161. log::info!("Listening on tcp {}, extra port for NAT test", addr2);
  162. log::info!("relay-servers={:?}", relay_servers);
  163. log::info!("change-id={:?}", id_change_support);
  164. let mut socket = FramedSocket::new(addr).await?;
  165. let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
  166. let version = hbb_common::get_version_from_url(&software_url);
  167. if !version.is_empty() {
  168. log::info!("software_url: {}, version: {}", software_url, version);
  169. }
  170. let mut rs = Self {
  171. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  172. pm: PeerMap::new()?,
  173. tx: tx.clone(),
  174. relay_servers,
  175. serial,
  176. rendezvous_servers,
  177. version,
  178. software_url,
  179. };
  180. let mut listener = new_listener(addr, false).await?;
  181. let mut listener2 = new_listener(addr2, false).await?;
  182. loop {
  183. if *stop.lock().unwrap() {
  184. sleep(0.1).await;
  185. continue;
  186. }
  187. log::info!("Start");
  188. rs.io_loop(
  189. &mut rx,
  190. &mut listener,
  191. &mut listener2,
  192. &mut socket,
  193. key,
  194. stop.clone(),
  195. id_change_support,
  196. )
  197. .await;
  198. }
  199. }
  200. async fn io_loop(
  201. &mut self,
  202. rx: &mut Receiver,
  203. listener: &mut TcpListener,
  204. listener2: &mut TcpListener,
  205. socket: &mut FramedSocket,
  206. key: &str,
  207. stop: Arc<Mutex<bool>>,
  208. id_change_support: bool,
  209. ) {
  210. let mut timer = interval(Duration::from_millis(100));
  211. loop {
  212. tokio::select! {
  213. _ = timer.tick() => {
  214. if *stop.lock().unwrap() {
  215. log::info!("Stopped");
  216. break;
  217. }
  218. }
  219. Some((msg, addr)) = rx.recv() => {
  220. allow_err!(socket.send(&msg, addr).await);
  221. }
  222. Some(Ok((bytes, addr))) = socket.next() => {
  223. allow_err!(self.handle_msg(&bytes, addr, socket, key).await);
  224. }
  225. Ok((stream, addr)) = listener2.accept() => {
  226. let stream = FramedStream::from(stream);
  227. tokio::spawn(async move {
  228. let mut stream = stream;
  229. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  230. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  231. if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
  232. let mut msg_out = RendezvousMessage::new();
  233. msg_out.set_test_nat_response(TestNatResponse {
  234. port: addr.port() as _,
  235. ..Default::default()
  236. });
  237. stream.send(&msg_out).await.ok();
  238. }
  239. }
  240. }
  241. });
  242. }
  243. Ok((stream, addr)) = listener.accept() => {
  244. log::debug!("Tcp connection from {:?}", addr);
  245. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  246. let tcp_punch = self.tcp_punch.clone();
  247. let mut rs = self.clone();
  248. let key = key.to_owned();
  249. tokio::spawn(async move {
  250. let mut sender = Some(a);
  251. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  252. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  253. match msg_in.union {
  254. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  255. // there maybe several attempt, so sender can be none
  256. if let Some(sender) = sender.take() {
  257. tcp_punch.lock().unwrap().insert(addr, sender);
  258. }
  259. allow_err!(rs.handle_tcp_punch_hole_request(addr, ph, &key).await);
  260. }
  261. Some(rendezvous_message::Union::request_relay(mut rf)) => {
  262. // there maybe several attempt, so sender can be none
  263. if let Some(sender) = sender.take() {
  264. tcp_punch.lock().unwrap().insert(addr, sender);
  265. }
  266. if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
  267. let mut msg_out = RendezvousMessage::new();
  268. rf.socket_addr = AddrMangle::encode(addr);
  269. msg_out.set_request_relay(rf);
  270. rs.tx.send((msg_out, peer.socket_addr)).ok();
  271. }
  272. }
  273. Some(rendezvous_message::Union::relay_response(mut rr)) => {
  274. let addr_b = AddrMangle::decode(&rr.socket_addr);
  275. rr.socket_addr = Default::default();
  276. let id = rr.get_id();
  277. if !id.is_empty() {
  278. if let Some(peer) = rs.pm.get(&id).await {
  279. rr.set_pk(peer.pk.clone());
  280. }
  281. }
  282. let mut msg_out = RendezvousMessage::new();
  283. msg_out.set_relay_response(rr);
  284. allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
  285. break;
  286. }
  287. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  288. allow_err!(rs.handle_hole_sent(phs, addr, None).await);
  289. break;
  290. }
  291. Some(rendezvous_message::Union::local_addr(la)) => {
  292. allow_err!(rs.handle_local_addr(la, addr, None).await);
  293. break;
  294. }
  295. Some(rendezvous_message::Union::test_nat_request(tar)) => {
  296. let mut msg_out = RendezvousMessage::new();
  297. let mut res = TestNatResponse {
  298. port: addr.port() as _,
  299. ..Default::default()
  300. };
  301. if rs.serial > tar.serial {
  302. let mut cu = ConfigUpdate::new();
  303. cu.serial = rs.serial;
  304. cu.rendezvous_servers = rs.rendezvous_servers.clone();
  305. res.cu = MessageField::from_option(Some(cu));
  306. }
  307. msg_out.set_test_nat_response(res);
  308. if let Some(tcp) = sender.as_mut() {
  309. if let Ok(bytes) = msg_out.write_to_bytes() {
  310. allow_err!(tcp.send(Bytes::from(bytes)).await);
  311. }
  312. }
  313. break;
  314. }
  315. Some(rendezvous_message::Union::register_pk(rk)) => {
  316. if rk.uuid.is_empty() {
  317. break;
  318. }
  319. let mut res = register_pk_response::Result::OK;
  320. if !id_change_support {
  321. res = register_pk_response::Result::NOT_SUPPORT;
  322. } else if !hbb_common::is_valid_custom_id(&rk.id) {
  323. res = register_pk_response::Result::INVALID_ID_FORMAT;
  324. } else if let Some(peer) = rs.pm.get(&rk.id).await {
  325. if peer.uuid != rk.uuid {
  326. res = register_pk_response::Result::ID_EXISTS;
  327. }
  328. }
  329. let mut msg_out = RendezvousMessage::new();
  330. msg_out.set_register_pk_response(RegisterPkResponse {
  331. result: res.into(),
  332. ..Default::default()
  333. });
  334. if let Some(tcp) = sender.as_mut() {
  335. if let Ok(bytes) = msg_out.write_to_bytes() {
  336. allow_err!(tcp.send(Bytes::from(bytes)).await);
  337. }
  338. }
  339. }
  340. _ => {
  341. break;
  342. }
  343. }
  344. } else {
  345. break;
  346. }
  347. }
  348. if sender.is_none() {
  349. rs.tcp_punch.lock().unwrap().remove(&addr);
  350. }
  351. log::debug!("Tcp connection from {:?} closed", addr);
  352. });
  353. }
  354. }
  355. }
  356. }
  357. #[inline]
  358. async fn handle_msg(
  359. &mut self,
  360. bytes: &BytesMut,
  361. addr: SocketAddr,
  362. socket: &mut FramedSocket,
  363. key: &str,
  364. ) -> ResultType<()> {
  365. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  366. match msg_in.union {
  367. Some(rendezvous_message::Union::register_peer(rp)) => {
  368. // B registered
  369. if rp.id.len() > 0 {
  370. log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
  371. self.update_addr(rp.id, addr, socket).await?;
  372. if self.serial > rp.serial {
  373. let mut msg_out = RendezvousMessage::new();
  374. msg_out.set_configure_update(ConfigUpdate {
  375. serial: self.serial,
  376. rendezvous_servers: self.rendezvous_servers.clone(),
  377. ..Default::default()
  378. });
  379. socket.send(&msg_out, addr).await?;
  380. }
  381. }
  382. }
  383. Some(rendezvous_message::Union::register_pk(rk)) => {
  384. if rk.uuid.is_empty() {
  385. return Ok(());
  386. }
  387. let id = rk.id;
  388. let mut res = register_pk_response::Result::OK;
  389. if id.len() < 6 {
  390. res = register_pk_response::Result::UUID_MISMATCH;
  391. } else if let Some(peer) = self.pm.get(&id).await {
  392. if peer.uuid.is_empty() {
  393. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  394. } else if peer.uuid != rk.uuid {
  395. log::warn!(
  396. "Peer {} uuid mismatch: {:?} vs {:?}",
  397. id,
  398. rk.uuid,
  399. peer.uuid
  400. );
  401. res = register_pk_response::Result::UUID_MISMATCH;
  402. } else if peer.pk != rk.pk {
  403. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  404. }
  405. } else {
  406. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  407. }
  408. let mut msg_out = RendezvousMessage::new();
  409. msg_out.set_register_pk_response(RegisterPkResponse {
  410. result: res.into(),
  411. ..Default::default()
  412. });
  413. socket.send(&msg_out, addr).await?
  414. }
  415. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  416. if self.pm.is_in_memory(&ph.id) {
  417. self.handle_udp_punch_hole_request(addr, ph, key).await?;
  418. } else {
  419. // not in memory, fetch from db with spawn in case blocking me
  420. let mut me = self.clone();
  421. let key = key.to_owned();
  422. tokio::spawn(async move {
  423. allow_err!(me.handle_udp_punch_hole_request(addr, ph, &key).await);
  424. });
  425. }
  426. }
  427. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  428. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  429. }
  430. Some(rendezvous_message::Union::local_addr(la)) => {
  431. self.handle_local_addr(la, addr, Some(socket)).await?;
  432. }
  433. Some(rendezvous_message::Union::configure_update(mut cu)) => {
  434. if addr.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
  435. && cu.serial > self.serial
  436. {
  437. self.serial = cu.serial;
  438. self.rendezvous_servers = cu
  439. .rendezvous_servers
  440. .drain(..)
  441. .filter(|x| {
  442. !x.is_empty()
  443. && test_if_valid_server(x, "rendezvous-server").is_ok()
  444. })
  445. .collect();
  446. log::info!(
  447. "configure updated: serial={} rendezvous-servers={:?}",
  448. self.serial,
  449. self.rendezvous_servers
  450. );
  451. }
  452. }
  453. Some(rendezvous_message::Union::software_update(su)) => {
  454. if !self.version.is_empty() && su.url != self.version {
  455. let mut msg_out = RendezvousMessage::new();
  456. msg_out.set_software_update(SoftwareUpdate {
  457. url: self.software_url.clone(),
  458. ..Default::default()
  459. });
  460. socket.send(&msg_out, addr).await?;
  461. }
  462. }
  463. _ => {}
  464. }
  465. }
  466. Ok(())
  467. }
  468. #[inline]
  469. async fn update_addr(
  470. &mut self,
  471. id: String,
  472. socket_addr: SocketAddr,
  473. socket: &mut FramedSocket,
  474. ) -> ResultType<()> {
  475. let mut lock = self.pm.map.write().unwrap();
  476. let last_reg_time = Instant::now();
  477. if let Some(old) = lock.get_mut(&id) {
  478. old.socket_addr = socket_addr;
  479. old.last_reg_time = last_reg_time;
  480. let request_pk = old.pk.is_empty();
  481. drop(lock);
  482. let mut msg_out = RendezvousMessage::new();
  483. msg_out.set_register_peer_response(RegisterPeerResponse {
  484. request_pk,
  485. ..Default::default()
  486. });
  487. socket.send(&msg_out, socket_addr).await?;
  488. } else {
  489. drop(lock);
  490. let mut pm = self.pm.clone();
  491. let tx = self.tx.clone();
  492. tokio::spawn(async move {
  493. let v = pm.db.get(id.clone()).await;
  494. let (uuid, pk) = {
  495. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  496. (v.uuid, v.pk)
  497. } else {
  498. (Vec::new(), Vec::new())
  499. }
  500. };
  501. let mut msg_out = RendezvousMessage::new();
  502. msg_out.set_register_peer_response(RegisterPeerResponse {
  503. request_pk: pk.is_empty(),
  504. ..Default::default()
  505. });
  506. tx.send((msg_out, socket_addr)).ok();
  507. pm.map.write().unwrap().insert(
  508. id,
  509. Peer {
  510. socket_addr,
  511. last_reg_time,
  512. uuid,
  513. pk,
  514. },
  515. );
  516. });
  517. }
  518. Ok(())
  519. }
  520. #[inline]
  521. async fn handle_hole_sent<'a>(
  522. &mut self,
  523. phs: PunchHoleSent,
  524. addr: SocketAddr,
  525. socket: Option<&'a mut FramedSocket>,
  526. ) -> ResultType<()> {
  527. // punch hole sent from B, tell A that B is ready to be connected
  528. let addr_a = AddrMangle::decode(&phs.socket_addr);
  529. log::debug!(
  530. "{} punch hole response to {:?} from {:?}",
  531. if socket.is_none() { "TCP" } else { "UDP" },
  532. &addr_a,
  533. &addr
  534. );
  535. let mut msg_out = RendezvousMessage::new();
  536. let pk = match self.pm.get(&phs.id).await {
  537. Some(peer) => peer.pk,
  538. _ => Vec::new(),
  539. };
  540. let mut p = PunchHoleResponse {
  541. socket_addr: AddrMangle::encode(addr),
  542. pk,
  543. relay_server: phs.relay_server.clone(),
  544. ..Default::default()
  545. };
  546. if let Ok(t) = phs.nat_type.enum_value() {
  547. p.set_nat_type(t);
  548. }
  549. msg_out.set_punch_hole_response(p);
  550. if let Some(socket) = socket {
  551. socket.send(&msg_out, addr_a).await?;
  552. } else {
  553. self.send_to_tcp(&msg_out, addr_a).await;
  554. }
  555. Ok(())
  556. }
  557. #[inline]
  558. async fn handle_local_addr<'a>(
  559. &mut self,
  560. la: LocalAddr,
  561. addr: SocketAddr,
  562. socket: Option<&'a mut FramedSocket>,
  563. ) -> ResultType<()> {
  564. // relay local addrs of B to A
  565. let addr_a = AddrMangle::decode(&la.socket_addr);
  566. log::debug!(
  567. "{} local addrs response to {:?} from {:?}",
  568. if socket.is_none() { "TCP" } else { "UDP" },
  569. &addr_a,
  570. &addr
  571. );
  572. let mut msg_out = RendezvousMessage::new();
  573. let pk = if la.id.is_empty() {
  574. Vec::new()
  575. } else {
  576. match self.pm.get(&la.id).await {
  577. Some(peer) => peer.pk,
  578. _ => Vec::new(),
  579. }
  580. };
  581. let mut p = PunchHoleResponse {
  582. socket_addr: la.local_addr.clone(),
  583. pk,
  584. relay_server: la.relay_server,
  585. ..Default::default()
  586. };
  587. p.set_is_local(true);
  588. msg_out.set_punch_hole_response(p);
  589. if let Some(socket) = socket {
  590. socket.send(&msg_out, addr_a).await?;
  591. } else {
  592. self.send_to_tcp(&msg_out, addr_a).await;
  593. }
  594. Ok(())
  595. }
  596. #[inline]
  597. async fn handle_punch_hole_request(
  598. &mut self,
  599. addr: SocketAddr,
  600. ph: PunchHoleRequest,
  601. key: &str,
  602. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  603. if !key.is_empty() && ph.licence_key != key {
  604. let mut msg_out = RendezvousMessage::new();
  605. msg_out.set_punch_hole_response(PunchHoleResponse {
  606. failure: punch_hole_response::Failure::LICENCE_MISMATCH.into(),
  607. ..Default::default()
  608. });
  609. return Ok((msg_out, None));
  610. }
  611. let id = ph.id;
  612. // punch hole request from A, relay to B,
  613. // check if in same intranet first,
  614. // fetch local addrs if in same intranet.
  615. // because punch hole won't work if in the same intranet,
  616. // all routers will drop such self-connections.
  617. if let Some(peer) = self.pm.get(&id).await {
  618. if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
  619. let mut msg_out = RendezvousMessage::new();
  620. msg_out.set_punch_hole_response(PunchHoleResponse {
  621. failure: punch_hole_response::Failure::OFFLINE.into(),
  622. ..Default::default()
  623. });
  624. return Ok((msg_out, None));
  625. }
  626. let mut msg_out = RendezvousMessage::new();
  627. let same_intranet = match peer.socket_addr {
  628. SocketAddr::V4(a) => match addr {
  629. SocketAddr::V4(b) => a.ip() == b.ip(),
  630. _ => false,
  631. },
  632. SocketAddr::V6(a) => match addr {
  633. SocketAddr::V6(b) => a.ip() == b.ip(),
  634. _ => false,
  635. },
  636. };
  637. let socket_addr = AddrMangle::encode(addr);
  638. let relay_server = {
  639. if self.relay_servers.is_empty() {
  640. "".to_owned()
  641. } else {
  642. let i = unsafe {
  643. ROTATION_RELAY_SERVER += 1;
  644. ROTATION_RELAY_SERVER % self.relay_servers.len()
  645. };
  646. self.relay_servers[i].clone()
  647. }
  648. };
  649. if same_intranet {
  650. log::debug!(
  651. "Fetch local addr {:?} {:?} request from {:?}",
  652. id,
  653. &peer.socket_addr,
  654. &addr
  655. );
  656. msg_out.set_fetch_local_addr(FetchLocalAddr {
  657. socket_addr,
  658. relay_server,
  659. ..Default::default()
  660. });
  661. } else {
  662. log::debug!(
  663. "Punch hole {:?} {:?} request from {:?}",
  664. id,
  665. &peer.socket_addr,
  666. &addr
  667. );
  668. msg_out.set_punch_hole(PunchHole {
  669. socket_addr,
  670. nat_type: ph.nat_type,
  671. relay_server,
  672. ..Default::default()
  673. });
  674. }
  675. return Ok((msg_out, Some(peer.socket_addr)));
  676. } else {
  677. let mut msg_out = RendezvousMessage::new();
  678. msg_out.set_punch_hole_response(PunchHoleResponse {
  679. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  680. ..Default::default()
  681. });
  682. return Ok((msg_out, None));
  683. }
  684. }
  685. #[inline]
  686. async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) {
  687. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  688. if let Some(mut tcp) = tcp {
  689. if let Ok(bytes) = msg.write_to_bytes() {
  690. tokio::spawn(async move {
  691. allow_err!(tcp.send(Bytes::from(bytes)).await);
  692. });
  693. }
  694. }
  695. }
  696. #[inline]
  697. async fn send_to_tcp_sync(
  698. &mut self,
  699. msg: &RendezvousMessage,
  700. addr: SocketAddr,
  701. ) -> ResultType<()> {
  702. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  703. if let Some(mut tcp) = tcp {
  704. if let Ok(bytes) = msg.write_to_bytes() {
  705. tcp.send(Bytes::from(bytes)).await?;
  706. }
  707. }
  708. Ok(())
  709. }
  710. #[inline]
  711. async fn handle_tcp_punch_hole_request(
  712. &mut self,
  713. addr: SocketAddr,
  714. ph: PunchHoleRequest,
  715. key: &str,
  716. ) -> ResultType<()> {
  717. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key).await?;
  718. if let Some(addr) = to_addr {
  719. self.tx.send((msg, addr))?;
  720. } else {
  721. self.send_to_tcp_sync(&msg, addr).await?;
  722. }
  723. Ok(())
  724. }
  725. #[inline]
  726. async fn handle_udp_punch_hole_request(
  727. &mut self,
  728. addr: SocketAddr,
  729. ph: PunchHoleRequest,
  730. key: &str,
  731. ) -> ResultType<()> {
  732. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key).await?;
  733. self.tx.send((
  734. msg,
  735. match to_addr {
  736. Some(addr) => addr,
  737. None => addr,
  738. },
  739. ))?;
  740. Ok(())
  741. }
  742. }
  743. pub fn test_if_valid_server(host: &str, name: &str) -> ResultType<SocketAddr> {
  744. let res = if host.contains(":") {
  745. hbb_common::to_socket_addr(host)
  746. } else {
  747. hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
  748. };
  749. if res.is_err() {
  750. log::error!("Invalid {} {}: {:?}", name, host, res);
  751. }
  752. res
  753. }