Browse Source

Merge pull request #76 from fufesou/peer_online_state

peer_online_state: serve online state
RustDesk 3 years ago
parent
commit
baecd45f27
3 changed files with 64 additions and 33 deletions
  1. 1 0
      .gitignore
  2. 11 0
      libs/hbb_common/protos/rendezvous.proto
  3. 52 33
      src/rendezvous_server.rs

+ 1 - 0
.gitignore

@@ -5,3 +5,4 @@ debian-build
5
 debian/.debhelper
5
 debian/.debhelper
6
 debian/debhelper-build-stamp
6
 debian/debhelper-build-stamp
7
 .DS_Store
7
 .DS_Store
8
+.vscode

+ 11 - 0
libs/hbb_common/protos/rendezvous.proto

@@ -148,6 +148,15 @@ message PeerDiscovery {
148
   string misc = 7;
148
   string misc = 7;
149
 }
149
 }
150
 
150
 
151
+message OnlineRequest {
152
+  string id = 1;
153
+  repeated string peers = 2;
154
+}
155
+
156
+message OnlineResponse {
157
+  bytes states = 1;
158
+}
159
+
151
 message RendezvousMessage {
160
 message RendezvousMessage {
152
   oneof union {
161
   oneof union {
153
     RegisterPeer register_peer = 6;
162
     RegisterPeer register_peer = 6;
@@ -167,5 +176,7 @@ message RendezvousMessage {
167
     TestNatRequest test_nat_request = 20;
176
     TestNatRequest test_nat_request = 20;
168
     TestNatResponse test_nat_response = 21;
177
     TestNatResponse test_nat_response = 21;
169
     PeerDiscovery peer_discovery = 22;
178
     PeerDiscovery peer_discovery = 22;
179
+    OnlineRequest online_request = 23;
180
+    OnlineResponse online_response = 24;
170
   }
181
   }
171
 }
182
 }

+ 52 - 33
src/rendezvous_server.rs

@@ -89,12 +89,7 @@ enum LoopFailure {
89
 
89
 
90
 impl RendezvousServer {
90
 impl RendezvousServer {
91
     #[tokio::main(flavor = "multi_thread")]
91
     #[tokio::main(flavor = "multi_thread")]
92
-    pub async fn start(
93
-        port: i32,
94
-        serial: i32,
95
-        key: &str,
96
-        rmem: usize,
97
-    ) -> ResultType<()> {
92
+    pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
98
         let (key, sk) = Self::get_server_sk(key);
93
         let (key, sk) = Self::get_server_sk(key);
99
         let addr = format!("0.0.0.0:{}", port);
94
         let addr = format!("0.0.0.0:{}", port);
100
         let addr2 = format!("0.0.0.0:{}", port - 1);
95
         let addr2 = format!("0.0.0.0:{}", port - 1);
@@ -756,6 +751,39 @@ impl RendezvousServer {
756
         }
751
         }
757
     }
752
     }
758
 
753
 
754
+    #[inline]
755
+    async fn handle_online_request(
756
+        &mut self,
757
+        stream: &mut FramedStream,
758
+        peers: Vec<String>,
759
+    ) -> ResultType<()> {
760
+        let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
761
+        for i in 0..peers.len() {
762
+            let peer_id = &peers[i];
763
+            // bytes index from left to right
764
+            let states_idx = i / 8;
765
+            let bit_idx = 7 - i % 8;
766
+            if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
767
+                let (elapsed, _) = {
768
+                    let r = peer.read().await;
769
+                    (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
770
+                };
771
+                if elapsed < REG_TIMEOUT {
772
+                    states[states_idx] |= 0x01 << bit_idx;
773
+                }
774
+            }
775
+        }
776
+
777
+        let mut msg_out = RendezvousMessage::new();
778
+        msg_out.set_online_response(OnlineResponse {
779
+            states: states.into(),
780
+            ..Default::default()
781
+        });
782
+        stream.send(&msg_out).await?;
783
+
784
+        Ok(())
785
+    }
786
+
759
     #[inline]
787
     #[inline]
760
     async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
788
     async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
761
         let mut tcp = self.tcp_punch.lock().await.remove(&addr);
789
         let mut tcp = self.tcp_punch.lock().await.remove(&addr);
@@ -1014,8 +1042,8 @@ impl RendezvousServer {
1014
     }
1042
     }
1015
 
1043
 
1016
     async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
1044
     async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
1045
+        let mut rs = self.clone();
1017
         if addr.ip().to_string() == "127.0.0.1" {
1046
         if addr.ip().to_string() == "127.0.0.1" {
1018
-            let rs = self.clone();
1019
             tokio::spawn(async move {
1047
             tokio::spawn(async move {
1020
                 let mut stream = stream;
1048
                 let mut stream = stream;
1021
                 let mut buffer = [0; 64];
1049
                 let mut buffer = [0; 64];
@@ -1033,34 +1061,31 @@ impl RendezvousServer {
1033
             let mut stream = stream;
1061
             let mut stream = stream;
1034
             if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
1062
             if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
1035
                 if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
1063
                 if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
1036
-                    if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
1037
-                        let mut msg_out = RendezvousMessage::new();
1038
-                        msg_out.set_test_nat_response(TestNatResponse {
1039
-                            port: addr.port() as _,
1040
-                            ..Default::default()
1041
-                        });
1042
-                        stream.send(&msg_out).await.ok();
1064
+                    match msg_in.union {
1065
+                        Some(rendezvous_message::Union::TestNatRequest(_)) => {
1066
+                            let mut msg_out = RendezvousMessage::new();
1067
+                            msg_out.set_test_nat_response(TestNatResponse {
1068
+                                port: addr.port() as _,
1069
+                                ..Default::default()
1070
+                            });
1071
+                            stream.send(&msg_out).await.ok();
1072
+                        }
1073
+                        Some(rendezvous_message::Union::OnlineRequest(or)) => {
1074
+                            allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
1075
+                        }
1076
+                        _ => {}
1043
                     }
1077
                     }
1044
                 }
1078
                 }
1045
             }
1079
             }
1046
         });
1080
         });
1047
     }
1081
     }
1048
 
1082
 
1049
-    async fn handle_listener(
1050
-        &self,
1051
-        stream: TcpStream,
1052
-        addr: SocketAddr,
1053
-        key: &str,
1054
-        ws: bool,
1055
-    ) {
1083
+    async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
1056
         log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
1084
         log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
1057
         let mut rs = self.clone();
1085
         let mut rs = self.clone();
1058
         let key = key.to_owned();
1086
         let key = key.to_owned();
1059
         tokio::spawn(async move {
1087
         tokio::spawn(async move {
1060
-            allow_err!(
1061
-                rs.handle_listener_inner(stream, addr, &key, ws)
1062
-                    .await
1063
-            );
1088
+            allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
1064
         });
1089
         });
1065
     }
1090
     }
1066
 
1091
 
@@ -1080,10 +1105,7 @@ impl RendezvousServer {
1080
             while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
1105
             while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
1081
                 match msg {
1106
                 match msg {
1082
                     tungstenite::Message::Binary(bytes) => {
1107
                     tungstenite::Message::Binary(bytes) => {
1083
-                        if !self
1084
-                            .handle_tcp(&bytes, &mut sink, addr, key, ws)
1085
-                            .await
1086
-                        {
1108
+                        if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
1087
                             break;
1109
                             break;
1088
                         }
1110
                         }
1089
                     }
1111
                     }
@@ -1094,10 +1116,7 @@ impl RendezvousServer {
1094
             let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
1116
             let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
1095
             sink = Some(Sink::TcpStream(a));
1117
             sink = Some(Sink::TcpStream(a));
1096
             while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
1118
             while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
1097
-                if !self
1098
-                    .handle_tcp(&bytes, &mut sink, addr, key, ws)
1099
-                    .await
1100
-                {
1119
+                if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
1101
                     break;
1120
                     break;
1102
                 }
1121
                 }
1103
             }
1122
             }