rendezvous_server.rs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. use hbb_common::{
  2. allow_err,
  3. bytes::{Bytes, BytesMut},
  4. bytes_codec::BytesCodec,
  5. futures_util::{
  6. sink::SinkExt,
  7. stream::{SplitSink, StreamExt},
  8. },
  9. log,
  10. protobuf::Message as _,
  11. rendezvous_proto::*,
  12. tcp::new_listener,
  13. timeout,
  14. tokio::{self, net::TcpStream, sync::mpsc},
  15. tokio_util::codec::Framed,
  16. udp::FramedSocket,
  17. AddrMangle, ResultType,
  18. };
  19. use serde_derive::{Deserialize, Serialize};
  20. use std::{
  21. collections::HashMap,
  22. net::SocketAddr,
  23. sync::{Arc, Mutex, RwLock},
  24. time::Instant,
  25. };
  26. #[derive(Clone, Debug)]
  27. struct Peer {
  28. socket_addr: SocketAddr,
  29. last_reg_time: Instant,
  30. uuid: Vec<u8>,
  31. pk: Vec<u8>,
  32. }
  33. impl Default for Peer {
  34. fn default() -> Self {
  35. Self {
  36. socket_addr: "0.0.0.0:0".parse().unwrap(),
  37. last_reg_time: Instant::now()
  38. .checked_sub(std::time::Duration::from_secs(3600))
  39. .unwrap(),
  40. uuid: Vec::new(),
  41. pk: Vec::new(),
  42. }
  43. }
  44. }
  45. #[derive(Debug, Serialize, Deserialize, Default)]
  46. struct PeerSerde {
  47. #[serde(default)]
  48. ip: String,
  49. #[serde(default)]
  50. uuid: Vec<u8>,
  51. #[serde(default)]
  52. pk: Vec<u8>,
  53. }
  54. #[derive(Clone)]
  55. struct PeerMap {
  56. map: Arc<RwLock<HashMap<String, Peer>>>,
  57. db: super::SledAsync,
  58. }
  59. impl PeerMap {
  60. fn new() -> ResultType<Self> {
  61. Ok(Self {
  62. map: Default::default(),
  63. db: super::SledAsync::new("./sled.db", true)?,
  64. })
  65. }
  66. #[inline]
  67. fn update_pk(&mut self, id: String, socket_addr: SocketAddr, uuid: Vec<u8>, pk: Vec<u8>) {
  68. log::info!("update_pk {} {:?} {:?} {:?}", id, socket_addr, uuid, pk);
  69. let mut lock = self.map.write().unwrap();
  70. lock.insert(
  71. id.clone(),
  72. Peer {
  73. socket_addr,
  74. last_reg_time: Instant::now(),
  75. uuid: uuid.clone(),
  76. pk: pk.clone(),
  77. },
  78. );
  79. drop(lock);
  80. let ip = socket_addr.ip().to_string();
  81. self.db.insert(id, PeerSerde { ip, uuid, pk });
  82. }
  83. #[inline]
  84. async fn get(&mut self, id: &str) -> Option<Peer> {
  85. let p = self.map.read().unwrap().get(id).map(|x| x.clone());
  86. if p.is_some() {
  87. return p;
  88. } else {
  89. let id = id.to_owned();
  90. let v = self.db.get(id.clone()).await;
  91. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  92. self.map.write().unwrap().insert(
  93. id,
  94. Peer {
  95. uuid: v.uuid,
  96. pk: v.pk,
  97. ..Default::default()
  98. },
  99. );
  100. return Some(Peer::default());
  101. }
  102. }
  103. None
  104. }
  105. #[inline]
  106. fn is_in_memory(&self, id: &str) -> bool {
  107. self.map.read().unwrap().contains_key(id)
  108. }
  109. }
  110. const REG_TIMEOUT: i32 = 30_000;
  111. type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  112. type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
  113. #[derive(Clone)]
  114. pub struct RendezvousServer {
  115. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  116. pm: PeerMap,
  117. tx: Sender,
  118. relay_server: String,
  119. serial: i32,
  120. rendezvous_servers: Vec<String>,
  121. version: String,
  122. software_url: String,
  123. }
  124. impl RendezvousServer {
  125. pub async fn start(
  126. addr: &str,
  127. relay_server: String,
  128. serial: i32,
  129. rendezvous_servers: Vec<String>,
  130. software_url: String,
  131. ) -> ResultType<()> {
  132. let mut socket = FramedSocket::new(addr).await?;
  133. let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
  134. let version = hbb_common::get_version_from_url(&software_url);
  135. if !version.is_empty() {
  136. log::info!("software_url: {}, version: {}", software_url, version);
  137. }
  138. let mut rs = Self {
  139. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  140. pm: PeerMap::new()?,
  141. tx: tx.clone(),
  142. relay_server,
  143. serial,
  144. rendezvous_servers,
  145. version,
  146. software_url,
  147. };
  148. let mut listener = new_listener(addr, false).await?;
  149. loop {
  150. tokio::select! {
  151. Some((msg, addr)) = rx.recv() => {
  152. allow_err!(socket.send(&msg, addr).await);
  153. }
  154. Some(Ok((bytes, addr))) = socket.next() => {
  155. allow_err!(rs.handle_msg(&bytes, addr, &mut socket).await);
  156. }
  157. Ok((stream, addr)) = listener.accept() => {
  158. log::debug!("Tcp connection from {:?}", addr);
  159. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  160. let tcp_punch = rs.tcp_punch.clone();
  161. let mut rs = rs.clone();
  162. tokio::spawn(async move {
  163. let mut sender = Some(a);
  164. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  165. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  166. match msg_in.union {
  167. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  168. if let Some(sender) = sender.take() {
  169. tcp_punch.lock().unwrap().insert(addr, sender);
  170. } else {
  171. break;
  172. }
  173. allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await);
  174. }
  175. Some(rendezvous_message::Union::request_relay(mut rf)) => {
  176. if let Some(sender) = sender.take() {
  177. tcp_punch.lock().unwrap().insert(addr, sender);
  178. } else {
  179. break;
  180. }
  181. if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
  182. let mut msg_out = RendezvousMessage::new();
  183. rf.socket_addr = AddrMangle::encode(addr);
  184. msg_out.set_request_relay(rf);
  185. rs.tx.send((msg_out, peer.socket_addr)).ok();
  186. }
  187. }
  188. Some(rendezvous_message::Union::request_relay_response(mut rfr)) => {
  189. let addr_b = AddrMangle::decode(&rfr.socket_addr);
  190. rfr.socket_addr = Default::default();
  191. let mut msg_out = RendezvousMessage::new();
  192. msg_out.set_request_relay_response(rfr);
  193. allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
  194. break;
  195. }
  196. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  197. allow_err!(rs.handle_hole_sent(phs, addr, None).await);
  198. break;
  199. }
  200. Some(rendezvous_message::Union::local_addr(la)) => {
  201. allow_err!(rs.handle_local_addr(la, addr, None).await);
  202. break;
  203. }
  204. Some(rendezvous_message::Union::test_nat_request(_)) => {
  205. let mut msg_out = RendezvousMessage::new();
  206. msg_out.set_test_nat_response(TestNatResponse {
  207. port: addr.port() as _,
  208. ..Default::default()
  209. });
  210. if let Some(tcp) = sender.as_mut() {
  211. if let Ok(bytes) = msg_out.write_to_bytes() {
  212. allow_err!(tcp.send(Bytes::from(bytes)).await);
  213. }
  214. }
  215. break;
  216. }
  217. _ => {
  218. break;
  219. }
  220. }
  221. } else {
  222. break;
  223. }
  224. }
  225. if sender.is_none() {
  226. rs.tcp_punch.lock().unwrap().remove(&addr);
  227. }
  228. log::debug!("Tcp connection from {:?} closed", addr);
  229. });
  230. }
  231. }
  232. }
  233. }
  234. #[inline]
  235. async fn handle_msg(
  236. &mut self,
  237. bytes: &BytesMut,
  238. addr: SocketAddr,
  239. socket: &mut FramedSocket,
  240. ) -> ResultType<()> {
  241. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  242. match msg_in.union {
  243. Some(rendezvous_message::Union::register_peer(rp)) => {
  244. // B registered
  245. if rp.id.len() > 0 {
  246. log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
  247. self.update_addr(rp.id, addr, socket).await?;
  248. if self.serial > rp.serial {
  249. let mut msg_out = RendezvousMessage::new();
  250. msg_out.set_configure_update(ConfigUpdate {
  251. serial: self.serial,
  252. rendezvous_servers: self.rendezvous_servers.clone(),
  253. ..Default::default()
  254. });
  255. socket.send(&msg_out, addr).await?;
  256. }
  257. }
  258. }
  259. Some(rendezvous_message::Union::register_pk(rk)) => {
  260. if rk.uuid.is_empty() {
  261. return Ok(());
  262. }
  263. let id = rk.id;
  264. let mut res = register_pk_response::Result::OK;
  265. if let Some(peer) = self.pm.get(&id).await {
  266. if !peer.uuid.is_empty() && peer.uuid != rk.uuid {
  267. log::warn!(
  268. "Peer {} uuid mismatch: {:?} vs {:?}",
  269. id,
  270. rk.uuid,
  271. peer.uuid
  272. );
  273. res = register_pk_response::Result::UUID_MISMATCH;
  274. } else if peer.uuid.is_empty() || peer.pk != rk.pk {
  275. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  276. }
  277. } else {
  278. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  279. }
  280. let mut msg_out = RendezvousMessage::new();
  281. msg_out.set_register_pk_response(RegisterPkResponse {
  282. result: res.into(),
  283. ..Default::default()
  284. });
  285. socket.send(&msg_out, addr).await?
  286. }
  287. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  288. let id = ph.id;
  289. if self.pm.is_in_memory(&id) {
  290. self.handle_udp_punch_hole_request(addr, id).await?;
  291. } else {
  292. // not in memory, fetch from db with spawn in case blocking me
  293. let mut me = self.clone();
  294. tokio::spawn(async move {
  295. allow_err!(me.handle_udp_punch_hole_request(addr, id).await);
  296. });
  297. }
  298. }
  299. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  300. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  301. }
  302. Some(rendezvous_message::Union::local_addr(la)) => {
  303. self.handle_local_addr(la, addr, Some(socket)).await?;
  304. }
  305. Some(rendezvous_message::Union::configure_update(mut cu)) => {
  306. if addr.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
  307. && cu.serial > self.serial
  308. {
  309. self.serial = cu.serial;
  310. self.rendezvous_servers = cu
  311. .rendezvous_servers
  312. .drain(..)
  313. .filter(|x| test_if_valid_server(x).is_ok())
  314. .collect();
  315. log::info!(
  316. "configure updated: serial={} rendezvous-servers={:?}",
  317. self.serial,
  318. self.rendezvous_servers
  319. );
  320. }
  321. }
  322. Some(rendezvous_message::Union::software_update(su)) => {
  323. if !self.version.is_empty() && su.url != self.version {
  324. let mut msg_out = RendezvousMessage::new();
  325. msg_out.set_software_update(SoftwareUpdate {
  326. url: self.software_url.clone(),
  327. ..Default::default()
  328. });
  329. socket.send(&msg_out, addr).await?;
  330. }
  331. }
  332. _ => {}
  333. }
  334. }
  335. Ok(())
  336. }
  337. #[inline]
  338. async fn update_addr(
  339. &mut self,
  340. id: String,
  341. socket_addr: SocketAddr,
  342. socket: &mut FramedSocket,
  343. ) -> ResultType<()> {
  344. let mut lock = self.pm.map.write().unwrap();
  345. let last_reg_time = Instant::now();
  346. if let Some(old) = lock.get_mut(&id) {
  347. old.socket_addr = socket_addr;
  348. old.last_reg_time = last_reg_time;
  349. let request_pk = old.pk.is_empty();
  350. drop(lock);
  351. let mut msg_out = RendezvousMessage::new();
  352. msg_out.set_register_peer_response(RegisterPeerResponse {
  353. request_pk,
  354. ..Default::default()
  355. });
  356. socket.send(&msg_out, socket_addr).await?;
  357. } else {
  358. drop(lock);
  359. let mut pm = self.pm.clone();
  360. let tx = self.tx.clone();
  361. tokio::spawn(async move {
  362. let v = pm.db.get(id.clone()).await;
  363. let (uuid, pk) = {
  364. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  365. (v.uuid, v.pk)
  366. } else {
  367. (Vec::new(), Vec::new())
  368. }
  369. };
  370. let mut msg_out = RendezvousMessage::new();
  371. msg_out.set_register_peer_response(RegisterPeerResponse {
  372. request_pk: pk.is_empty(),
  373. ..Default::default()
  374. });
  375. tx.send((msg_out, socket_addr)).ok();
  376. pm.map.write().unwrap().insert(
  377. id,
  378. Peer {
  379. socket_addr,
  380. last_reg_time,
  381. uuid,
  382. pk,
  383. },
  384. );
  385. });
  386. }
  387. Ok(())
  388. }
  389. #[inline]
  390. async fn handle_hole_sent<'a>(
  391. &mut self,
  392. phs: PunchHoleSent,
  393. addr: SocketAddr,
  394. socket: Option<&'a mut FramedSocket>,
  395. ) -> ResultType<()> {
  396. // punch hole sent from B, tell A that B is ready to be connected
  397. let addr_a = AddrMangle::decode(&phs.socket_addr);
  398. log::debug!(
  399. "{} punch hole response to {:?} from {:?}",
  400. if socket.is_none() { "TCP" } else { "UDP" },
  401. &addr_a,
  402. &addr
  403. );
  404. let mut msg_out = RendezvousMessage::new();
  405. let pk = match self.pm.get(&phs.id).await {
  406. Some(peer) => peer.pk,
  407. _ => Vec::new(),
  408. };
  409. let mut relay_server = phs.relay_server;
  410. if relay_server.is_empty() {
  411. relay_server = self.relay_server.clone();
  412. }
  413. let mut p = PunchHoleResponse {
  414. socket_addr: AddrMangle::encode(addr),
  415. pk,
  416. relay_server,
  417. ..Default::default()
  418. };
  419. if let Ok(t) = phs.nat_type.enum_value() {
  420. p.set_nat_type(t);
  421. }
  422. msg_out.set_punch_hole_response(p);
  423. if let Some(socket) = socket {
  424. socket.send(&msg_out, addr_a).await?;
  425. } else {
  426. self.send_to_tcp(&msg_out, addr_a).await;
  427. }
  428. Ok(())
  429. }
  430. #[inline]
  431. async fn handle_local_addr<'a>(
  432. &mut self,
  433. la: LocalAddr,
  434. addr: SocketAddr,
  435. socket: Option<&'a mut FramedSocket>,
  436. ) -> ResultType<()> {
  437. // relay local addrs of B to A
  438. let addr_a = AddrMangle::decode(&la.socket_addr);
  439. log::debug!(
  440. "{} local addrs response to {:?} from {:?}",
  441. if socket.is_none() { "TCP" } else { "UDP" },
  442. &addr_a,
  443. &addr
  444. );
  445. let mut msg_out = RendezvousMessage::new();
  446. let mut relay_server = la.relay_server;
  447. if relay_server.is_empty() {
  448. relay_server = self.relay_server.clone();
  449. }
  450. let mut p = PunchHoleResponse {
  451. socket_addr: la.local_addr.clone(),
  452. relay_server,
  453. ..Default::default()
  454. };
  455. p.set_is_local(true);
  456. msg_out.set_punch_hole_response(p);
  457. if let Some(socket) = socket {
  458. socket.send(&msg_out, addr_a).await?;
  459. } else {
  460. self.send_to_tcp(&msg_out, addr_a).await;
  461. }
  462. Ok(())
  463. }
  464. #[inline]
  465. async fn handle_punch_hole_request(
  466. &mut self,
  467. addr: SocketAddr,
  468. id: String,
  469. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  470. // punch hole request from A, relay to B,
  471. // check if in same intranet first,
  472. // fetch local addrs if in same intranet.
  473. // because punch hole won't work if in the same intranet,
  474. // all routers will drop such self-connections.
  475. if let Some(peer) = self.pm.get(&id).await {
  476. if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
  477. let mut msg_out = RendezvousMessage::new();
  478. msg_out.set_punch_hole_response(PunchHoleResponse {
  479. failure: punch_hole_response::Failure::OFFLINE.into(),
  480. ..Default::default()
  481. });
  482. return Ok((msg_out, None));
  483. }
  484. let mut msg_out = RendezvousMessage::new();
  485. let same_intranet = match peer.socket_addr {
  486. SocketAddr::V4(a) => match addr {
  487. SocketAddr::V4(b) => a.ip() == b.ip(),
  488. _ => false,
  489. },
  490. SocketAddr::V6(a) => match addr {
  491. SocketAddr::V6(b) => a.ip() == b.ip(),
  492. _ => false,
  493. },
  494. };
  495. let socket_addr = AddrMangle::encode(addr);
  496. if same_intranet {
  497. log::debug!(
  498. "Fetch local addr {:?} {:?} request from {:?}",
  499. id,
  500. &peer.socket_addr,
  501. &addr
  502. );
  503. msg_out.set_fetch_local_addr(FetchLocalAddr {
  504. socket_addr,
  505. ..Default::default()
  506. });
  507. } else {
  508. log::debug!(
  509. "Punch hole {:?} {:?} request from {:?}",
  510. id,
  511. &peer.socket_addr,
  512. &addr
  513. );
  514. msg_out.set_punch_hole(PunchHole {
  515. socket_addr,
  516. ..Default::default()
  517. });
  518. }
  519. return Ok((msg_out, Some(peer.socket_addr)));
  520. } else {
  521. let mut msg_out = RendezvousMessage::new();
  522. msg_out.set_punch_hole_response(PunchHoleResponse {
  523. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  524. ..Default::default()
  525. });
  526. return Ok((msg_out, None));
  527. }
  528. }
  529. #[inline]
  530. async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) {
  531. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  532. if let Some(mut tcp) = tcp {
  533. if let Ok(bytes) = msg.write_to_bytes() {
  534. tokio::spawn(async move {
  535. allow_err!(tcp.send(Bytes::from(bytes)).await);
  536. });
  537. }
  538. }
  539. }
  540. #[inline]
  541. async fn send_to_tcp_sync(
  542. &mut self,
  543. msg: &RendezvousMessage,
  544. addr: SocketAddr,
  545. ) -> ResultType<()> {
  546. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  547. if let Some(mut tcp) = tcp {
  548. if let Ok(bytes) = msg.write_to_bytes() {
  549. tcp.send(Bytes::from(bytes)).await?;
  550. }
  551. }
  552. Ok(())
  553. }
  554. #[inline]
  555. async fn handle_tcp_punch_hole_request(
  556. &mut self,
  557. addr: SocketAddr,
  558. id: String,
  559. ) -> ResultType<()> {
  560. let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?;
  561. if let Some(addr) = to_addr {
  562. self.tx.send((msg, addr))?;
  563. } else {
  564. self.send_to_tcp_sync(&msg, addr).await?;
  565. }
  566. Ok(())
  567. }
  568. #[inline]
  569. async fn handle_udp_punch_hole_request(
  570. &mut self,
  571. addr: SocketAddr,
  572. id: String,
  573. ) -> ResultType<()> {
  574. let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?;
  575. self.tx.send((
  576. msg,
  577. match to_addr {
  578. Some(addr) => addr,
  579. None => addr,
  580. },
  581. ))?;
  582. Ok(())
  583. }
  584. }
  585. pub fn test_if_valid_server(host: &str) -> ResultType<SocketAddr> {
  586. if host.contains(":") {
  587. hbb_common::to_socket_addr(host)
  588. } else {
  589. hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
  590. }
  591. }