open-trade лет назад: 4
Родитель
Сommit
fc7f790def
4 измененных файлов с 47 добавлено и 18 удалено
  1. 6 7
      src/hbbr/main.rs
  2. 17 2
      src/hbbr/relay_server.rs
  3. 6 8
      src/main.rs
  4. 18 1
      src/rendezvous_server.rs

+ 6 - 7
src/hbbr/main.rs

@@ -1,12 +1,10 @@
1
 use clap::App;
1
 use clap::App;
2
 mod relay_server;
2
 mod relay_server;
3
-use hbb_common::{env_logger::*, tokio, ResultType};
4
-use relay_server::start;
3
+use hbb_common::{env_logger::*, ResultType};
4
+use relay_server::*;
5
+use std::sync::{Arc, Mutex};
5
 
6
 
6
-const DEFAULT_PORT: &'static str = "21117";
7
-
8
-#[tokio::main]
9
-async fn main() -> ResultType<()> {
7
+fn main() -> ResultType<()> {
10
     init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
8
     init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
11
     let args = format!(
9
     let args = format!(
12
         "-p, --port=[NUMBER(default={})] 'Sets the listening port'",
10
         "-p, --port=[NUMBER(default={})] 'Sets the listening port'",
@@ -18,6 +16,7 @@ async fn main() -> ResultType<()> {
18
         .about("RustDesk Relay Server")
16
         .about("RustDesk Relay Server")
19
         .args_from_usage(&args)
17
         .args_from_usage(&args)
20
         .get_matches();
18
         .get_matches();
21
-    start(matches.value_of("port").unwrap_or(DEFAULT_PORT)).await?;
19
+    let stop: Arc<Mutex<bool>> = Default::default();
20
+    start(matches.value_of("port").unwrap_or(DEFAULT_PORT), stop)?;
22
     Ok(())
21
     Ok(())
23
 }
22
 }

+ 17 - 2
src/hbbr/relay_server.rs

@@ -4,7 +4,11 @@ use hbb_common::{
4
     rendezvous_proto::*,
4
     rendezvous_proto::*,
5
     sleep,
5
     sleep,
6
     tcp::{new_listener, FramedStream},
6
     tcp::{new_listener, FramedStream},
7
-    tokio, ResultType,
7
+    tokio::{
8
+        self,
9
+        time::{interval, Duration},
10
+    },
11
+    ResultType,
8
 };
12
 };
9
 use std::{
13
 use std::{
10
     collections::HashMap,
14
     collections::HashMap,
@@ -16,9 +20,13 @@ lazy_static::lazy_static! {
16
     static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
20
     static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
17
 }
21
 }
18
 
22
 
19
-pub async fn start(port: &str) -> ResultType<()> {
23
+pub const DEFAULT_PORT: &'static str = "21117";
24
+
25
+#[tokio::main(basic_scheduler)]
26
+pub async fn start(port: &str, stop: Arc<Mutex<bool>>) -> ResultType<()> {
20
     let addr = format!("0.0.0.0:{}", port);
27
     let addr = format!("0.0.0.0:{}", port);
21
     log::info!("Listening on {}", addr);
28
     log::info!("Listening on {}", addr);
29
+    let mut timer = interval(Duration::from_millis(300));
22
     let mut listener = new_listener(addr, false).await?;
30
     let mut listener = new_listener(addr, false).await?;
23
     loop {
31
     loop {
24
         tokio::select! {
32
         tokio::select! {
@@ -27,8 +35,15 @@ pub async fn start(port: &str) -> ResultType<()> {
27
                     make_pair(FramedStream::from(stream), addr).await.ok();
35
                     make_pair(FramedStream::from(stream), addr).await.ok();
28
                 });
36
                 });
29
             }
37
             }
38
+            _ = timer.tick() => {
39
+                if *stop.lock().unwrap() {
40
+                    log::info!("Stopped");
41
+                    break;
42
+                }
43
+            }
30
         }
44
         }
31
     }
45
     }
46
+    Ok(())
32
 }
47
 }
33
 
48
 
34
 async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
49
 async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {

+ 6 - 8
src/main.rs

@@ -2,13 +2,12 @@
2
 // https://blog.csdn.net/bytxl/article/details/44344855
2
 // https://blog.csdn.net/bytxl/article/details/44344855
3
 
3
 
4
 use clap::App;
4
 use clap::App;
5
-use hbb_common::{env_logger::*, log, tokio, ResultType};
5
+use hbb_common::{env_logger::*, log, ResultType};
6
 use hbbs::*;
6
 use hbbs::*;
7
 use ini::Ini;
7
 use ini::Ini;
8
-const DEFAULT_PORT: &'static str = "21116";
8
+use std::sync::{Arc, Mutex};
9
 
9
 
10
-#[tokio::main]
11
-async fn main() -> ResultType<()> {
10
+fn main() -> ResultType<()> {
12
     init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
11
     init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
13
     let args = format!(
12
     let args = format!(
14
         "-c --config=[FILE] +takes_value 'Sets a custom config file'
13
         "-c --config=[FILE] +takes_value 'Sets a custom config file'
@@ -72,12 +71,11 @@ async fn main() -> ResultType<()> {
72
         .map(|x| x.to_owned())
71
         .map(|x| x.to_owned())
73
         .collect();
72
         .collect();
74
     let addr = format!("0.0.0.0:{}", port);
73
     let addr = format!("0.0.0.0:{}", port);
75
-    log::info!("Listening on {}", addr);
76
     let addr2 = format!("0.0.0.0:{}", port.parse::<i32>().unwrap_or(0) - 1);
74
     let addr2 = format!("0.0.0.0:{}", port.parse::<i32>().unwrap_or(0) - 1);
77
-    log::info!("Listening on {}, extra port for NAT test", addr2);
78
     log::info!("relay-servers={:?}", relay_servers);
75
     log::info!("relay-servers={:?}", relay_servers);
79
     log::info!("serial={}", serial);
76
     log::info!("serial={}", serial);
80
     log::info!("rendezvous-servers={:?}", rendezvous_servers);
77
     log::info!("rendezvous-servers={:?}", rendezvous_servers);
78
+    let stop: Arc<Mutex<bool>> = Default::default();
81
     RendezvousServer::start(
79
     RendezvousServer::start(
82
         &addr,
80
         &addr,
83
         &addr2,
81
         &addr2,
@@ -85,7 +83,7 @@ async fn main() -> ResultType<()> {
85
         serial,
83
         serial,
86
         rendezvous_servers,
84
         rendezvous_servers,
87
         get_arg("software-url", ""),
85
         get_arg("software-url", ""),
88
-    )
89
-    .await?;
86
+        stop,
87
+    )?;
90
     Ok(())
88
     Ok(())
91
 }
89
 }

+ 18 - 1
src/rendezvous_server.rs

@@ -11,7 +11,12 @@ use hbb_common::{
11
     rendezvous_proto::*,
11
     rendezvous_proto::*,
12
     tcp::{new_listener, FramedStream},
12
     tcp::{new_listener, FramedStream},
13
     timeout,
13
     timeout,
14
-    tokio::{self, net::TcpStream, sync::mpsc},
14
+    tokio::{
15
+        self,
16
+        net::TcpStream,
17
+        sync::mpsc,
18
+        time::{interval, Duration},
19
+    },
15
     tokio_util::codec::Framed,
20
     tokio_util::codec::Framed,
16
     udp::FramedSocket,
21
     udp::FramedSocket,
17
     AddrMangle, ResultType,
22
     AddrMangle, ResultType,
@@ -61,6 +66,8 @@ struct PeerMap {
61
     db: super::SledAsync,
66
     db: super::SledAsync,
62
 }
67
 }
63
 
68
 
69
+pub const DEFAULT_PORT: &'static str = "21116";
70
+
64
 impl PeerMap {
71
 impl PeerMap {
65
     fn new() -> ResultType<Self> {
72
     fn new() -> ResultType<Self> {
66
         Ok(Self {
73
         Ok(Self {
@@ -135,6 +142,7 @@ pub struct RendezvousServer {
135
 }
142
 }
136
 
143
 
137
 impl RendezvousServer {
144
 impl RendezvousServer {
145
+    #[tokio::main(basic_scheduler)]
138
     pub async fn start(
146
     pub async fn start(
139
         addr: &str,
147
         addr: &str,
140
         addr2: &str,
148
         addr2: &str,
@@ -142,6 +150,7 @@ impl RendezvousServer {
142
         serial: i32,
150
         serial: i32,
143
         rendezvous_servers: Vec<String>,
151
         rendezvous_servers: Vec<String>,
144
         software_url: String,
152
         software_url: String,
153
+        stop: Arc<Mutex<bool>>,
145
     ) -> ResultType<()> {
154
     ) -> ResultType<()> {
146
         let mut socket = FramedSocket::new(addr).await?;
155
         let mut socket = FramedSocket::new(addr).await?;
147
         let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
156
         let (tx, mut rx) = mpsc::unbounded_channel::<(RendezvousMessage, SocketAddr)>();
@@ -161,8 +170,15 @@ impl RendezvousServer {
161
         };
170
         };
162
         let mut listener = new_listener(addr, false).await?;
171
         let mut listener = new_listener(addr, false).await?;
163
         let mut listener2 = new_listener(addr2, false).await?;
172
         let mut listener2 = new_listener(addr2, false).await?;
173
+        let mut timer = interval(Duration::from_millis(300));
164
         loop {
174
         loop {
165
             tokio::select! {
175
             tokio::select! {
176
+                _ = timer.tick() => {
177
+                    if *stop.lock().unwrap() {
178
+                        log::info!("Stopped");
179
+                        break;
180
+                    }
181
+                }
166
                 Some((msg, addr)) = rx.recv() => {
182
                 Some((msg, addr)) = rx.recv() => {
167
                     allow_err!(socket.send(&msg, addr).await);
183
                     allow_err!(socket.send(&msg, addr).await);
168
                 }
184
                 }
@@ -274,6 +290,7 @@ impl RendezvousServer {
274
                 }
290
                 }
275
             }
291
             }
276
         }
292
         }
293
+        Ok(())
277
     }
294
     }
278
 
295
 
279
     #[inline]
296
     #[inline]