open-trade лет назад: 5
Родитель
Сommit
378729c94d
1 измененных файлов с 100 добавлено и 0 удалено
  1. 100 0
      src/hbbr/main.rs

+ 100 - 0
src/hbbr/main.rs

@@ -0,0 +1,100 @@
1
+use clap::App;
2
+use hbb_common::{
3
+    env_logger::*,
4
+    log,
5
+    protobuf::Message as _,
6
+    rendezvous_proto::*,
7
+    sleep,
8
+    tcp::{new_listener, FramedStream},
9
+    tokio, ResultType,
10
+};
11
+use std::{
12
+    collections::HashMap,
13
+    net::SocketAddr,
14
+    sync::{Arc, Mutex},
15
+};
16
+
17
+lazy_static::lazy_static! {
18
+    static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
19
+}
20
+
21
+const DEFAULT_PORT: &'static str = "21117";
22
+
23
+#[tokio::main]
24
+async fn main() -> ResultType<()> {
25
+    init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
26
+    let args = format!(
27
+        "-p, --port=[NUMBER(default={})] 'Sets the listening port'",
28
+        DEFAULT_PORT
29
+    );
30
+    let matches = App::new("hbbr")
31
+        .version("1.0")
32
+        .author("Zhou Huabing <info@rustdesk.com>")
33
+        .about("RustDesk Relay Server")
34
+        .args_from_usage(&args)
35
+        .get_matches();
36
+    let addr = format!(
37
+        "0.0.0.0:{}",
38
+        matches.value_of("port").unwrap_or(DEFAULT_PORT)
39
+    );
40
+    log::info!("Listening on {}", addr);
41
+    let mut listener = new_listener(addr, false).await?;
42
+    loop {
43
+        tokio::select! {
44
+            Ok((stream, addr)) = listener.accept() => {
45
+                tokio::spawn(async move {
46
+                    make_pair(FramedStream::from(stream), addr).await.ok();
47
+                });
48
+            }
49
+        }
50
+    }
51
+}
52
+
53
+async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
54
+    let mut stream = stream;
55
+    if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
56
+        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
57
+            if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union {
58
+                if !rf.uuid.is_empty() {
59
+                    let peer = PEERS.lock().unwrap().remove(&rf.uuid);
60
+                    if let Some(peer) = peer {
61
+                        log::info!("Forward request {} from {} got paired", rf.uuid, addr);
62
+                        return relay(stream, peer).await;
63
+                    } else {
64
+                        log::info!("New relay request {} from {}", rf.uuid, addr);
65
+                        PEERS.lock().unwrap().insert(rf.uuid.clone(), stream);
66
+                        sleep(30.).await;
67
+                        PEERS.lock().unwrap().remove(&rf.uuid);
68
+                    }
69
+                }
70
+            }
71
+        }
72
+    }
73
+    Ok(())
74
+}
75
+
76
+async fn relay(stream: FramedStream, peer: FramedStream) -> ResultType<()> {
77
+    let mut peer = peer;
78
+    let mut stream = stream;
79
+    peer.set_raw();
80
+    stream.set_raw();
81
+    loop {
82
+        tokio::select! {
83
+            res = peer.next() => {
84
+                if let Some(Ok(bytes)) = res {
85
+                    stream.send_bytes(bytes.into()).await?;
86
+                } else {
87
+                    break;
88
+                }
89
+            },
90
+            res = stream.next() => {
91
+                if let Some(Ok(bytes)) = res {
92
+                    peer.send_bytes(bytes.into()).await?;
93
+                } else {
94
+                    break;
95
+                }
96
+            },
97
+        }
98
+    }
99
+    Ok(())
100
+}