open-trade лет назад: 4
Родитель
Сommit
5f1259e131
2 измененных файлов с 85 добавлено и 81 удалено
  1. 4 81
      src/hbbr/main.rs
  2. 81 0
      src/hbbr/relay_server.rs

+ 4 - 81
src/hbbr/main.rs

@@ -1,22 +1,7 @@
1
 use clap::App;
1
 use clap::App;
2
-use hbb_common::{
3
-    env_logger::*,
4
-    log,
5
-    protobuf::Message as _,
6
-    rendezvous_proto::*,
7
-    sleep,
8
-    tcp::{new_listener, FramedStream},
9
-    tokio, ResultType,
10
-};
11
-use std::{
12
-    collections::HashMap,
13
-    net::SocketAddr,
14
-    sync::{Arc, Mutex},
15
-};
16
-
17
-lazy_static::lazy_static! {
18
-    static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
19
-}
2
+mod relay_server;
3
+use hbb_common::{env_logger::*, tokio, ResultType};
4
+use relay_server::start;
20
 
5
 
21
 const DEFAULT_PORT: &'static str = "21117";
6
 const DEFAULT_PORT: &'static str = "21117";
22
 
7
 
@@ -33,68 +18,6 @@ async fn main() -> ResultType<()> {
33
         .about("RustDesk Relay Server")
18
         .about("RustDesk Relay Server")
34
         .args_from_usage(&args)
19
         .args_from_usage(&args)
35
         .get_matches();
20
         .get_matches();
36
-    let addr = format!(
37
-        "0.0.0.0:{}",
38
-        matches.value_of("port").unwrap_or(DEFAULT_PORT)
39
-    );
40
-    log::info!("Listening on {}", addr);
41
-    let mut listener = new_listener(addr, false).await?;
42
-    loop {
43
-        tokio::select! {
44
-            Ok((stream, addr)) = listener.accept() => {
45
-                tokio::spawn(async move {
46
-                    make_pair(FramedStream::from(stream), addr).await.ok();
47
-                });
48
-            }
49
-        }
50
-    }
51
-}
52
-
53
-async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
54
-    let mut stream = stream;
55
-    if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
56
-        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
57
-            if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union {
58
-                if !rf.uuid.is_empty() {
59
-                    let peer = PEERS.lock().unwrap().remove(&rf.uuid);
60
-                    if let Some(peer) = peer {
61
-                        log::info!("Forward request {} from {} got paired", rf.uuid, addr);
62
-                        return relay(stream, peer).await;
63
-                    } else {
64
-                        log::info!("New relay request {} from {}", rf.uuid, addr);
65
-                        PEERS.lock().unwrap().insert(rf.uuid.clone(), stream);
66
-                        sleep(30.).await;
67
-                        PEERS.lock().unwrap().remove(&rf.uuid);
68
-                    }
69
-                }
70
-            }
71
-        }
72
-    }
73
-    Ok(())
74
-}
75
-
76
-async fn relay(stream: FramedStream, peer: FramedStream) -> ResultType<()> {
77
-    let mut peer = peer;
78
-    let mut stream = stream;
79
-    peer.set_raw();
80
-    stream.set_raw();
81
-    loop {
82
-        tokio::select! {
83
-            res = peer.next() => {
84
-                if let Some(Ok(bytes)) = res {
85
-                    stream.send_bytes(bytes.into()).await?;
86
-                } else {
87
-                    break;
88
-                }
89
-            },
90
-            res = stream.next() => {
91
-                if let Some(Ok(bytes)) = res {
92
-                    peer.send_bytes(bytes.into()).await?;
93
-                } else {
94
-                    break;
95
-                }
96
-            },
97
-        }
98
-    }
21
+    start(matches.value_of("port").unwrap_or(DEFAULT_PORT)).await?;
99
     Ok(())
22
     Ok(())
100
 }
23
 }

+ 81 - 0
src/hbbr/relay_server.rs

@@ -0,0 +1,81 @@
1
+use hbb_common::{
2
+    log,
3
+    protobuf::Message as _,
4
+    rendezvous_proto::*,
5
+    sleep,
6
+    tcp::{new_listener, FramedStream},
7
+    tokio, ResultType,
8
+};
9
+use std::{
10
+    collections::HashMap,
11
+    net::SocketAddr,
12
+    sync::{Arc, Mutex},
13
+};
14
+
15
+lazy_static::lazy_static! {
16
+    static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
17
+}
18
+
19
+pub async fn start(port: &str) -> ResultType<()> {
20
+    let addr = format!("0.0.0.0:{}", port);
21
+    log::info!("Listening on {}", addr);
22
+    let mut listener = new_listener(addr, false).await?;
23
+    loop {
24
+        tokio::select! {
25
+            Ok((stream, addr)) = listener.accept() => {
26
+                tokio::spawn(async move {
27
+                    make_pair(FramedStream::from(stream), addr).await.ok();
28
+                });
29
+            }
30
+        }
31
+    }
32
+}
33
+
34
+async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
35
+    let mut stream = stream;
36
+    if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
37
+        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
38
+            if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union {
39
+                if !rf.uuid.is_empty() {
40
+                    let peer = PEERS.lock().unwrap().remove(&rf.uuid);
41
+                    if let Some(peer) = peer {
42
+                        log::info!("Forward request {} from {} got paired", rf.uuid, addr);
43
+                        return relay(stream, peer).await;
44
+                    } else {
45
+                        log::info!("New relay request {} from {}", rf.uuid, addr);
46
+                        PEERS.lock().unwrap().insert(rf.uuid.clone(), stream);
47
+                        sleep(30.).await;
48
+                        PEERS.lock().unwrap().remove(&rf.uuid);
49
+                    }
50
+                }
51
+            }
52
+        }
53
+    }
54
+    Ok(())
55
+}
56
+
57
+async fn relay(stream: FramedStream, peer: FramedStream) -> ResultType<()> {
58
+    let mut peer = peer;
59
+    let mut stream = stream;
60
+    peer.set_raw();
61
+    stream.set_raw();
62
+    loop {
63
+        tokio::select! {
64
+            res = peer.next() => {
65
+                if let Some(Ok(bytes)) = res {
66
+                    stream.send_bytes(bytes.into()).await?;
67
+                } else {
68
+                    break;
69
+                }
70
+            },
71
+            res = stream.next() => {
72
+                if let Some(Ok(bytes)) = res {
73
+                    peer.send_bytes(bytes.into()).await?;
74
+                } else {
75
+                    break;
76
+                }
77
+            },
78
+        }
79
+    }
80
+    Ok(())
81
+}