lejianwen месяцев назад: 5
Родитель
Сommit
b5e0484d03
1 измененных файлов с 27 добавлено и 54 удалено
  1. 27 54
      src/rendezvous_server.rs

+ 27 - 54
src/rendezvous_server.rs

@@ -39,6 +39,8 @@ use hbb_common::{
39
 };
39
 };
40
 use ipnetwork::Ipv4Network;
40
 use ipnetwork::Ipv4Network;
41
 
41
 
42
+use crate::jwt;
43
+use std::io::Error;
42
 use std::{
44
 use std::{
43
     collections::HashMap,
45
     collections::HashMap,
44
     net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
46
     net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
@@ -46,7 +48,6 @@ use std::{
46
     sync::Arc,
48
     sync::Arc,
47
     time::Instant,
49
     time::Instant,
48
 };
50
 };
49
-use crate::jwt;
50
 
51
 
51
 #[derive(Clone, Debug)]
52
 #[derive(Clone, Debug)]
52
 enum Data {
53
 enum Data {
@@ -62,6 +63,7 @@ struct SafeWsSink {
62
     sink: WsSink,
63
     sink: WsSink,
63
     encrypt: Option<Encrypt>,
64
     encrypt: Option<Encrypt>,
64
 }
65
 }
66
+
65
 struct SafeTcpStreamSink {
67
 struct SafeTcpStreamSink {
66
     sink: TcpStreamSink,
68
     sink: TcpStreamSink,
67
     encrypt: Option<Encrypt>,
69
     encrypt: Option<Encrypt>,
@@ -124,7 +126,7 @@ pub struct RendezvousServer {
124
     relay_servers0: Arc<RelayServers>,
126
     relay_servers0: Arc<RelayServers>,
125
     rendezvous_servers: Arc<Vec<String>>,
127
     rendezvous_servers: Arc<Vec<String>>,
126
     inner: Arc<Inner>,
128
     inner: Arc<Inner>,
127
-    ws_map: Arc<Mutex<HashMap<SocketAddr, Sink>>>
129
+    ws_map: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
128
 }
130
 }
129
 
131
 
130
 enum LoopFailure {
132
 enum LoopFailure {
@@ -183,7 +185,7 @@ impl RendezvousServer {
183
                 secure_tcp_pk_b,
185
                 secure_tcp_pk_b,
184
                 secure_tcp_sk_b,
186
                 secure_tcp_sk_b,
185
             }),
187
             }),
186
-            ws_map: Arc::new(Mutex::new(HashMap::new()))
188
+            ws_map: Arc::new(Mutex::new(HashMap::new())),
187
         };
189
         };
188
         log::info!("mask: {:?}", rs.inner.mask);
190
         log::info!("mask: {:?}", rs.inner.mask);
189
         log::info!("local-ip: {:?}", rs.inner.local_ip);
191
         log::info!("local-ip: {:?}", rs.inner.local_ip);
@@ -211,11 +213,13 @@ impl RendezvousServer {
211
 
213
 
212
         let must_login = get_arg("must-login");
214
         let must_login = get_arg("must-login");
213
         log::debug!("must_login={}", must_login);
215
         log::debug!("must_login={}", must_login);
214
-        if must_login.to_uppercase() == "Y" ||
215
-            (must_login == "" && std::env::var("MUST_LOGIN")
216
-            .unwrap_or_default()
217
-            .to_uppercase()
218
-            == "Y") {
216
+        if must_login.to_uppercase() == "Y"
217
+            || (must_login == ""
218
+                && std::env::var("MUST_LOGIN")
219
+                    .unwrap_or_default()
220
+                    .to_uppercase()
221
+                    == "Y")
222
+        {
219
             MUST_LOGIN.store(true, Ordering::SeqCst);
223
             MUST_LOGIN.store(true, Ordering::SeqCst);
220
         }
224
         }
221
 
225
 
@@ -498,7 +502,7 @@ impl RendezvousServer {
498
                     // B registered
502
                     // B registered
499
                     if !rp.id.is_empty() {
503
                     if !rp.id.is_empty() {
500
                         log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
504
                         log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
501
-                        let request_pk =  self.update_addr(rp.id, addr).await;
505
+                        let request_pk = self.update_addr(rp.id, addr).await;
502
                         let mut msg_out = RendezvousMessage::new();
506
                         let mut msg_out = RendezvousMessage::new();
503
                         msg_out.set_register_peer_response(RegisterPeerResponse {
507
                         msg_out.set_register_peer_response(RegisterPeerResponse {
504
                             request_pk,
508
                             request_pk,
@@ -772,11 +776,7 @@ impl RendezvousServer {
772
     }
776
     }
773
 
777
 
774
     #[inline]
778
     #[inline]
775
-    async fn update_addr(
776
-        &mut self,
777
-        id: String,
778
-        socket_addr: SocketAddr,
779
-    ) -> bool {
779
+    async fn update_addr(&mut self, id: String, socket_addr: SocketAddr) -> bool {
780
         let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
780
         let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
781
             let mut old = old.write().await;
781
             let mut old = old.write().await;
782
             let ip = socket_addr.ip();
782
             let ip = socket_addr.ip();
@@ -888,7 +888,7 @@ impl RendezvousServer {
888
         ph: PunchHoleRequest,
888
         ph: PunchHoleRequest,
889
         key: &str,
889
         key: &str,
890
         ws: bool,
890
         ws: bool,
891
-    ) -> ResultType<(RendezvousMessage, Option<(SocketAddr)>)> {
891
+    ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
892
         let mut ph = ph;
892
         let mut ph = ph;
893
         if !key.is_empty() && ph.licence_key != key {
893
         if !key.is_empty() && ph.licence_key != key {
894
             let mut msg_out = RendezvousMessage::new();
894
             let mut msg_out = RendezvousMessage::new();
@@ -987,7 +987,7 @@ impl RendezvousServer {
987
                 });
987
                 });
988
             }
988
             }
989
             //
989
             //
990
-            Ok((msg_out, Some((peer_addr))))
990
+            Ok((msg_out, Some(peer_addr)))
991
         } else {
991
         } else {
992
             let mut msg_out = RendezvousMessage::new();
992
             let mut msg_out = RendezvousMessage::new();
993
             msg_out.set_punch_hole_response(PunchHoleResponse {
993
             msg_out.set_punch_hole_response(PunchHoleResponse {
@@ -1064,38 +1064,6 @@ impl RendezvousServer {
1064
         Ok(())
1064
         Ok(())
1065
     }
1065
     }
1066
 
1066
 
1067
-    /**
1068
-    async fn handle_tcp_punch_hole_request(
1069
-        &mut self,
1070
-        addr: SocketAddr,
1071
-        ph: PunchHoleRequest,
1072
-        key: &str,
1073
-        ws: bool,
1074
-    ) -> ResultType<()> {
1075
-        let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
1076
-        log::debug!(
1077
-            "handle_tcp_punch_hole_request: {:?} to {:?}",
1078
-            &msg, &to_addr
1079
-        );
1080
-        //
1081
-
1082
-        if let Some(addr) = to_addr {
1083
-            // self.send_to_tcp_sync(msg, addr).await?;
1084
-            //这里无法判断目标addr是否是ws,请求无法发送到目标peer
1085
-            if self.tcp_punch.lock().await.contains_key(&try_into_v4(addr)) {
1086
-                self.send_to_tcp_sync(msg, addr).await?; //这里如果client使用ws,可以,但如果不用ws就必须用下面的,怎么判断呢
1087
-                //这样判断还是不行,如果client从ws切换回udp,tcp_punch中还是有残留,会导致发送失败,应该做个新的hashmap存??
1088
-            } else {
1089
-                self.tx.send(Data::Msg(msg.into(), addr))?;
1090
-            }
1091
-            // self.tx.send(Data::Msg(msg.into(), addr))?;
1092
-
1093
-        } else {
1094
-            self.send_to_tcp_sync(msg, addr).await?;
1095
-        }
1096
-        Ok(())
1097
-    }**/
1098
-
1099
     #[inline]
1067
     #[inline]
1100
     async fn handle_udp_punch_hole_request(
1068
     async fn handle_udp_punch_hole_request(
1101
         &mut self,
1069
         &mut self,
@@ -1310,11 +1278,7 @@ impl RendezvousServer {
1310
                         MUST_LOGIN.store(false, Ordering::SeqCst);
1278
                         MUST_LOGIN.store(false, Ordering::SeqCst);
1311
                     }
1279
                     }
1312
                 } else {
1280
                 } else {
1313
-                    let _ = writeln!(
1314
-                        res,
1315
-                        "MUST_LOGIN: {:?}",
1316
-                        MUST_LOGIN.load(Ordering::SeqCst)
1317
-                    );
1281
+                    let _ = writeln!(res, "MUST_LOGIN: {:?}", MUST_LOGIN.load(Ordering::SeqCst));
1318
                 }
1282
                 }
1319
             }
1283
             }
1320
             _ => {}
1284
             _ => {}
@@ -1420,7 +1384,16 @@ impl RendezvousServer {
1420
             if !key.is_empty() {
1384
             if !key.is_empty() {
1421
                 self.key_exchange_phase1(addr, &mut sink).await;
1385
                 self.key_exchange_phase1(addr, &mut sink).await;
1422
             }
1386
             }
1423
-            while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
1387
+            while let Ok(Some(Ok(mut bytes))) = timeout(30_000, b.next()).await {
1388
+                // log::debug!("receive tcp data from {:?} {:?}", addr, bytes);
1389
+                if let Some(Sink::Tss(s)) = sink.as_mut() {
1390
+                    if let Some(key) = s.encrypt.as_mut() {
1391
+                        if let Err(err) = key.dec(&mut bytes) {
1392
+                            log::error!("dec tcp data from {:?} err: {:?}", addr, err);
1393
+                            break;
1394
+                        }
1395
+                    }
1396
+                }
1424
                 if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
1397
                 if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
1425
                     break;
1398
                     break;
1426
                 }
1399
                 }