rendezvous_server.rs 24 KB


  1. use hbb_common::{
  2. allow_err,
  3. bytes::{Bytes, BytesMut},
  4. bytes_codec::BytesCodec,
  5. futures_util::{
  6. sink::SinkExt,
  7. stream::{SplitSink, StreamExt},
  8. },
  9. log,
  10. protobuf::{Message as _, MessageField},
  11. rendezvous_proto::*,
  12. tcp::{new_listener, FramedStream},
  13. timeout,
  14. tokio::{self, net::TcpStream, sync::mpsc},
  15. tokio_util::codec::Framed,
  16. udp::FramedSocket,
  17. AddrMangle, ResultType,
  18. };
  19. use serde_derive::{Deserialize, Serialize};
  20. use std::{
  21. collections::HashMap,
  22. net::SocketAddr,
  23. sync::{Arc, Mutex, RwLock},
  24. time::Instant,
  25. };
  26. #[derive(Clone, Debug)]
  27. struct Peer {
  28. socket_addr: SocketAddr,
  29. last_reg_time: Instant,
  30. uuid: Vec<u8>,
  31. pk: Vec<u8>,
  32. }
  33. impl Default for Peer {
  34. fn default() -> Self {
  35. Self {
  36. socket_addr: "0.0.0.0:0".parse().unwrap(),
  37. last_reg_time: Instant::now()
  38. .checked_sub(std::time::Duration::from_secs(3600))
  39. .unwrap(),
  40. uuid: Vec::new(),
  41. pk: Vec::new(),
  42. }
  43. }
  44. }
  45. #[derive(Debug, Serialize, Deserialize, Default)]
  46. struct PeerSerde {
  47. #[serde(default)]
  48. ip: String,
  49. #[serde(default)]
  50. uuid: Vec<u8>,
  51. #[serde(default)]
  52. pk: Vec<u8>,
  53. }
  54. #[derive(Clone)]
  55. struct PeerMap {
  56. map: Arc<RwLock<HashMap<String, Peer>>>,
  57. db: super::SledAsync,
  58. }
  59. impl PeerMap {
  60. fn new() -> ResultType<Self> {
  61. Ok(Self {
  62. map: Default::default(),
  63. db: super::SledAsync::new("./sled.db", true)?,
  64. })
  65. }
  66. #[inline]
  67. fn update_pk(&mut self, id: String, socket_addr: SocketAddr, uuid: Vec<u8>, pk: Vec<u8>) {
  68. log::info!("update_pk {} {:?} {:?} {:?}", id, socket_addr, uuid, pk);
  69. let mut lock = self.map.write().unwrap();
  70. lock.insert(
  71. id.clone(),
  72. Peer {
  73. socket_addr,
  74. last_reg_time: Instant::now(),
  75. uuid: uuid.clone(),
  76. pk: pk.clone(),
  77. },
  78. );
  79. drop(lock);
  80. let ip = socket_addr.ip().to_string();
  81. self.db.insert(id, PeerSerde { ip, uuid, pk });
  82. }
  83. #[inline]
  84. async fn get(&mut self, id: &str) -> Option<Peer> {
  85. let p = self.map.read().unwrap().get(id).map(|x| x.clone());
  86. if p.is_some() {
  87. return p;
  88. } else {
  89. let id = id.to_owned();
  90. let v = self.db.get(id.clone()).await;
  91. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  92. self.map.write().unwrap().insert(
  93. id,
  94. Peer {
  95. uuid: v.uuid,
  96. pk: v.pk,
  97. ..Default::default()
  98. },
  99. );
  100. return Some(Peer::default());
  101. }
  102. }
  103. None
  104. }
  105. #[inline]
  106. fn is_in_memory(&self, id: &str) -> bool {
  107. self.map.read().unwrap().contains_key(id)
  108. }
  109. }
  110. const REG_TIMEOUT: i32 = 30_000;
  111. type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
  112. type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
  113. #[derive(Clone)]
  114. pub struct RendezvousServer {
  115. tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
  116. pm: PeerMap,
  117. tx: Sender,
  118. relay_server: String,
  119. serial: i32,
  120. rendezvous_servers: Vec<String>,
  121. version: String,
  122. software_url: String,
  123. }
  124. impl RendezvousServer {
  125. pub async fn start(
  126. addr: &str,
  127. addr2: &str,
  128. relay_server: String,
  129. serial: i32,
  130. rendezvous_servers: Vec<String>,
  131. software_url: String,
  132. ) -> ResultType<()> {
  133. let mut socket = FramedSocket::new(addr).await?;
  134. let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
  135. let version = hbb_common::get_version_from_url(&software_url);
  136. if !version.is_empty() {
  137. log::info!("software_url: {}, version: {}", software_url, version);
  138. }
  139. let mut rs = Self {
  140. tcp_punch: Arc::new(Mutex::new(HashMap::new())),
  141. pm: PeerMap::new()?,
  142. tx: tx.clone(),
  143. relay_server,
  144. serial,
  145. rendezvous_servers,
  146. version,
  147. software_url,
  148. };
  149. let mut listener = new_listener(addr, false).await?;
  150. let mut listener2 = new_listener(addr2, false).await?;
  151. loop {
  152. tokio::select! {
  153. Some((msg, addr)) = rx.recv() => {
  154. allow_err!(socket.send(&msg, addr).await);
  155. }
  156. Some(Ok((bytes, addr))) = socket.next() => {
  157. allow_err!(rs.handle_msg(&bytes, addr, &mut socket).await);
  158. }
  159. Ok((stream, addr)) = listener2.accept() => {
  160. let stream = FramedStream::from(stream);
  161. tokio::spawn(async move {
  162. let mut stream = stream;
  163. if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
  164. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  165. if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
  166. let mut msg_out = RendezvousMessage::new();
  167. msg_out.set_test_nat_response(TestNatResponse {
  168. port: addr.port() as _,
  169. ..Default::default()
  170. });
  171. stream.send(&msg_out).await.ok();
  172. }
  173. }
  174. }
  175. });
  176. }
  177. Ok((stream, addr)) = listener.accept() => {
  178. log::debug!("Tcp connection from {:?}", addr);
  179. let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
  180. let tcp_punch = rs.tcp_punch.clone();
  181. let mut rs = rs.clone();
  182. tokio::spawn(async move {
  183. let mut sender = Some(a);
  184. while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
  185. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  186. match msg_in.union {
  187. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  188. // there maybe several attempt, so sender can be none
  189. if let Some(sender) = sender.take() {
  190. tcp_punch.lock().unwrap().insert(addr, sender);
  191. }
  192. allow_err!(rs.handle_tcp_punch_hole_request(addr, ph).await);
  193. }
  194. Some(rendezvous_message::Union::request_relay(mut rf)) => {
  195. // there maybe several attempt, so sender can be none
  196. if let Some(sender) = sender.take() {
  197. tcp_punch.lock().unwrap().insert(addr, sender);
  198. }
  199. if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
  200. let mut msg_out = RendezvousMessage::new();
  201. rf.socket_addr = AddrMangle::encode(addr);
  202. msg_out.set_request_relay(rf);
  203. rs.tx.send((msg_out, peer.socket_addr)).ok();
  204. }
  205. }
  206. Some(rendezvous_message::Union::relay_response(mut rr)) => {
  207. let addr_b = AddrMangle::decode(&rr.socket_addr);
  208. rr.socket_addr = Default::default();
  209. let id = rr.get_id();
  210. if !id.is_empty() {
  211. if let Some(peer) = rs.pm.get(&id).await {
  212. rr.set_pk(peer.pk.clone());
  213. }
  214. }
  215. let mut msg_out = RendezvousMessage::new();
  216. msg_out.set_relay_response(rr);
  217. allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await);
  218. break;
  219. }
  220. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  221. allow_err!(rs.handle_hole_sent(phs, addr, None).await);
  222. break;
  223. }
  224. Some(rendezvous_message::Union::local_addr(la)) => {
  225. allow_err!(rs.handle_local_addr(la, addr, None).await);
  226. break;
  227. }
  228. Some(rendezvous_message::Union::test_nat_request(tar)) => {
  229. let mut msg_out = RendezvousMessage::new();
  230. let mut res = TestNatResponse {
  231. port: addr.port() as _,
  232. ..Default::default()
  233. }
  234. if rs.serial > tar.serial {
  235. let mut cu = ConfigUpdate::new();
  236. cu.serial = rs.serial;
  237. cu.rendezvous_servers = rs.rendezvous_servers.clone();
  238. res.cu = MessageField::from_option(Some(cu));
  239. }
  240. msg_out.set_test_nat_response(res);
  241. if let Some(tcp) = sender.as_mut() {
  242. if let Ok(bytes) = msg_out.write_to_bytes() {
  243. allow_err!(tcp.send(Bytes::from(bytes)).await);
  244. }
  245. }
  246. break;
  247. }
  248. _ => {
  249. break;
  250. }
  251. }
  252. } else {
  253. break;
  254. }
  255. }
  256. if sender.is_none() {
  257. rs.tcp_punch.lock().unwrap().remove(&addr);
  258. }
  259. log::debug!("Tcp connection from {:?} closed", addr);
  260. });
  261. }
  262. }
  263. }
  264. }
  265. #[inline]
  266. async fn handle_msg(
  267. &mut self,
  268. bytes: &BytesMut,
  269. addr: SocketAddr,
  270. socket: &mut FramedSocket,
  271. ) -> ResultType<()> {
  272. if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
  273. match msg_in.union {
  274. Some(rendezvous_message::Union::register_peer(rp)) => {
  275. // B registered
  276. if rp.id.len() > 0 {
  277. log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
  278. self.update_addr(rp.id, addr, socket).await?;
  279. if self.serial > rp.serial {
  280. let mut msg_out = RendezvousMessage::new();
  281. msg_out.set_configure_update(ConfigUpdate {
  282. serial: self.serial,
  283. rendezvous_servers: self.rendezvous_servers.clone(),
  284. ..Default::default()
  285. });
  286. socket.send(&msg_out, addr).await?;
  287. }
  288. }
  289. }
  290. Some(rendezvous_message::Union::register_pk(rk)) => {
  291. if rk.uuid.is_empty() {
  292. return Ok(());
  293. }
  294. let id = rk.id;
  295. let mut res = register_pk_response::Result::OK;
  296. if let Some(peer) = self.pm.get(&id).await {
  297. if !peer.uuid.is_empty() && peer.uuid != rk.uuid {
  298. log::warn!(
  299. "Peer {} uuid mismatch: {:?} vs {:?}",
  300. id,
  301. rk.uuid,
  302. peer.uuid
  303. );
  304. res = register_pk_response::Result::UUID_MISMATCH;
  305. } else if peer.uuid.is_empty() || peer.pk != rk.pk {
  306. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  307. }
  308. } else {
  309. self.pm.update_pk(id, addr, rk.uuid, rk.pk);
  310. }
  311. let mut msg_out = RendezvousMessage::new();
  312. msg_out.set_register_pk_response(RegisterPkResponse {
  313. result: res.into(),
  314. ..Default::default()
  315. });
  316. socket.send(&msg_out, addr).await?
  317. }
  318. Some(rendezvous_message::Union::punch_hole_request(ph)) => {
  319. if self.pm.is_in_memory(&ph.id) {
  320. self.handle_udp_punch_hole_request(addr, ph).await?;
  321. } else {
  322. // not in memory, fetch from db with spawn in case blocking me
  323. let mut me = self.clone();
  324. tokio::spawn(async move {
  325. allow_err!(me.handle_udp_punch_hole_request(addr, ph).await);
  326. });
  327. }
  328. }
  329. Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
  330. self.handle_hole_sent(phs, addr, Some(socket)).await?;
  331. }
  332. Some(rendezvous_message::Union::local_addr(la)) => {
  333. self.handle_local_addr(la, addr, Some(socket)).await?;
  334. }
  335. Some(rendezvous_message::Union::configure_update(mut cu)) => {
  336. if addr.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
  337. && cu.serial > self.serial
  338. {
  339. self.serial = cu.serial;
  340. self.rendezvous_servers = cu
  341. .rendezvous_servers
  342. .drain(..)
  343. .filter(|x| test_if_valid_server(x).is_ok())
  344. .collect();
  345. log::info!(
  346. "configure updated: serial={} rendezvous-servers={:?}",
  347. self.serial,
  348. self.rendezvous_servers
  349. );
  350. }
  351. }
  352. Some(rendezvous_message::Union::software_update(su)) => {
  353. if !self.version.is_empty() && su.url != self.version {
  354. let mut msg_out = RendezvousMessage::new();
  355. msg_out.set_software_update(SoftwareUpdate {
  356. url: self.software_url.clone(),
  357. ..Default::default()
  358. });
  359. socket.send(&msg_out, addr).await?;
  360. }
  361. }
  362. _ => {}
  363. }
  364. }
  365. Ok(())
  366. }
  367. #[inline]
  368. async fn update_addr(
  369. &mut self,
  370. id: String,
  371. socket_addr: SocketAddr,
  372. socket: &mut FramedSocket,
  373. ) -> ResultType<()> {
  374. let mut lock = self.pm.map.write().unwrap();
  375. let last_reg_time = Instant::now();
  376. if let Some(old) = lock.get_mut(&id) {
  377. old.socket_addr = socket_addr;
  378. old.last_reg_time = last_reg_time;
  379. let request_pk = old.pk.is_empty();
  380. drop(lock);
  381. let mut msg_out = RendezvousMessage::new();
  382. msg_out.set_register_peer_response(RegisterPeerResponse {
  383. request_pk,
  384. ..Default::default()
  385. });
  386. socket.send(&msg_out, socket_addr).await?;
  387. } else {
  388. drop(lock);
  389. let mut pm = self.pm.clone();
  390. let tx = self.tx.clone();
  391. tokio::spawn(async move {
  392. let v = pm.db.get(id.clone()).await;
  393. let (uuid, pk) = {
  394. if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
  395. (v.uuid, v.pk)
  396. } else {
  397. (Vec::new(), Vec::new())
  398. }
  399. };
  400. let mut msg_out = RendezvousMessage::new();
  401. msg_out.set_register_peer_response(RegisterPeerResponse {
  402. request_pk: pk.is_empty(),
  403. ..Default::default()
  404. });
  405. tx.send((msg_out, socket_addr)).ok();
  406. pm.map.write().unwrap().insert(
  407. id,
  408. Peer {
  409. socket_addr,
  410. last_reg_time,
  411. uuid,
  412. pk,
  413. },
  414. );
  415. });
  416. }
  417. Ok(())
  418. }
  419. #[inline]
  420. async fn handle_hole_sent<'a>(
  421. &mut self,
  422. phs: PunchHoleSent,
  423. addr: SocketAddr,
  424. socket: Option<&'a mut FramedSocket>,
  425. ) -> ResultType<()> {
  426. // punch hole sent from B, tell A that B is ready to be connected
  427. let addr_a = AddrMangle::decode(&phs.socket_addr);
  428. log::debug!(
  429. "{} punch hole response to {:?} from {:?}",
  430. if socket.is_none() { "TCP" } else { "UDP" },
  431. &addr_a,
  432. &addr
  433. );
  434. let mut msg_out = RendezvousMessage::new();
  435. let pk = match self.pm.get(&phs.id).await {
  436. Some(peer) => peer.pk,
  437. _ => Vec::new(),
  438. };
  439. let mut p = PunchHoleResponse {
  440. socket_addr: AddrMangle::encode(addr),
  441. pk,
  442. relay_server: phs.relay_server.clone(),
  443. ..Default::default()
  444. };
  445. if let Ok(t) = phs.nat_type.enum_value() {
  446. p.set_nat_type(t);
  447. }
  448. msg_out.set_punch_hole_response(p);
  449. if let Some(socket) = socket {
  450. socket.send(&msg_out, addr_a).await?;
  451. } else {
  452. self.send_to_tcp(&msg_out, addr_a).await;
  453. }
  454. Ok(())
  455. }
  456. #[inline]
  457. async fn handle_local_addr<'a>(
  458. &mut self,
  459. la: LocalAddr,
  460. addr: SocketAddr,
  461. socket: Option<&'a mut FramedSocket>,
  462. ) -> ResultType<()> {
  463. // relay local addrs of B to A
  464. let addr_a = AddrMangle::decode(&la.socket_addr);
  465. log::debug!(
  466. "{} local addrs response to {:?} from {:?}",
  467. if socket.is_none() { "TCP" } else { "UDP" },
  468. &addr_a,
  469. &addr
  470. );
  471. let mut msg_out = RendezvousMessage::new();
  472. let mut p = PunchHoleResponse {
  473. socket_addr: la.local_addr.clone(),
  474. relay_server: la.relay_server,
  475. ..Default::default()
  476. };
  477. p.set_is_local(true);
  478. msg_out.set_punch_hole_response(p);
  479. if let Some(socket) = socket {
  480. socket.send(&msg_out, addr_a).await?;
  481. } else {
  482. self.send_to_tcp(&msg_out, addr_a).await;
  483. }
  484. Ok(())
  485. }
  486. #[inline]
  487. async fn handle_punch_hole_request(
  488. &mut self,
  489. addr: SocketAddr,
  490. ph: PunchHoleRequest,
  491. ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
  492. let id = ph.id;
  493. // punch hole request from A, relay to B,
  494. // check if in same intranet first,
  495. // fetch local addrs if in same intranet.
  496. // because punch hole won't work if in the same intranet,
  497. // all routers will drop such self-connections.
  498. if let Some(peer) = self.pm.get(&id).await {
  499. if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
  500. let mut msg_out = RendezvousMessage::new();
  501. msg_out.set_punch_hole_response(PunchHoleResponse {
  502. failure: punch_hole_response::Failure::OFFLINE.into(),
  503. ..Default::default()
  504. });
  505. return Ok((msg_out, None));
  506. }
  507. let mut msg_out = RendezvousMessage::new();
  508. let same_intranet = match peer.socket_addr {
  509. SocketAddr::V4(a) => match addr {
  510. SocketAddr::V4(b) => a.ip() == b.ip(),
  511. _ => false,
  512. },
  513. SocketAddr::V6(a) => match addr {
  514. SocketAddr::V6(b) => a.ip() == b.ip(),
  515. _ => false,
  516. },
  517. };
  518. let socket_addr = AddrMangle::encode(addr);
  519. if same_intranet {
  520. log::debug!(
  521. "Fetch local addr {:?} {:?} request from {:?}",
  522. id,
  523. &peer.socket_addr,
  524. &addr
  525. );
  526. msg_out.set_fetch_local_addr(FetchLocalAddr {
  527. socket_addr,
  528. relay_server: self.relay_server.clone(),
  529. ..Default::default()
  530. });
  531. } else {
  532. log::debug!(
  533. "Punch hole {:?} {:?} request from {:?}",
  534. id,
  535. &peer.socket_addr,
  536. &addr
  537. );
  538. msg_out.set_punch_hole(PunchHole {
  539. socket_addr,
  540. nat_type: ph.nat_type,
  541. relay_server: self.relay_server.clone(),
  542. ..Default::default()
  543. });
  544. }
  545. return Ok((msg_out, Some(peer.socket_addr)));
  546. } else {
  547. let mut msg_out = RendezvousMessage::new();
  548. msg_out.set_punch_hole_response(PunchHoleResponse {
  549. failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
  550. ..Default::default()
  551. });
  552. return Ok((msg_out, None));
  553. }
  554. }
  555. #[inline]
  556. async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) {
  557. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  558. if let Some(mut tcp) = tcp {
  559. if let Ok(bytes) = msg.write_to_bytes() {
  560. tokio::spawn(async move {
  561. allow_err!(tcp.send(Bytes::from(bytes)).await);
  562. });
  563. }
  564. }
  565. }
  566. #[inline]
  567. async fn send_to_tcp_sync(
  568. &mut self,
  569. msg: &RendezvousMessage,
  570. addr: SocketAddr,
  571. ) -> ResultType<()> {
  572. let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
  573. if let Some(mut tcp) = tcp {
  574. if let Ok(bytes) = msg.write_to_bytes() {
  575. tcp.send(Bytes::from(bytes)).await?;
  576. }
  577. }
  578. Ok(())
  579. }
  580. #[inline]
  581. async fn handle_tcp_punch_hole_request(
  582. &mut self,
  583. addr: SocketAddr,
  584. ph: PunchHoleRequest,
  585. ) -> ResultType<()> {
  586. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph).await?;
  587. if let Some(addr) = to_addr {
  588. self.tx.send((msg, addr))?;
  589. } else {
  590. self.send_to_tcp_sync(&msg, addr).await?;
  591. }
  592. Ok(())
  593. }
  594. #[inline]
  595. async fn handle_udp_punch_hole_request(
  596. &mut self,
  597. addr: SocketAddr,
  598. ph: PunchHoleRequest,
  599. ) -> ResultType<()> {
  600. let (msg, to_addr) = self.handle_punch_hole_request(addr, ph).await?;
  601. self.tx.send((
  602. msg,
  603. match to_addr {
  604. Some(addr) => addr,
  605. None => addr,
  606. },
  607. ))?;
  608. Ok(())
  609. }
  610. }
  611. pub fn test_if_valid_server(host: &str) -> ResultType<SocketAddr> {
  612. if host.contains(":") {
  613. hbb_common::to_socket_addr(host)
  614. } else {
  615. hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
  616. }
  617. }