Browse Source

refactor stop

open-trade 4 years ago
parent
commit
9132d85f74
2 changed files with 47 additions and 8 deletions
  1. 13 2
      src/relay_server.rs
  2. 34 6
      src/rendezvous_server.rs

+ 13 - 2
src/relay_server.rs

@@ -6,6 +6,7 @@ use hbb_common::{
6 6
     tcp::{new_listener, FramedStream},
7 7
     tokio::{
8 8
         self,
9
+        net::TcpListener,
9 10
         time::{interval, Duration},
10 11
     },
11 12
     ResultType,
@@ -26,8 +27,19 @@ pub const DEFAULT_PORT: &'static str = "21117";
26 27
 pub async fn start(port: &str, license: &str, stop: Arc<Mutex<bool>>) -> ResultType<()> {
27 28
     let addr = format!("0.0.0.0:{}", port);
28 29
     log::info!("Listening on tcp {}", addr);
29
-    let mut timer = interval(Duration::from_millis(300));
30 30
     let mut listener = new_listener(addr, false).await?;
31
+    loop {
32
+        if *stop.lock().unwrap() {
33
+            sleep(0.1).await;
34
+            continue;
35
+        }
36
+        log::info!("Start");
37
+        io_loop(&mut listener, license, stop.clone()).await;
38
+    }
39
+}
40
+
41
+async fn io_loop(listener: &mut TcpListener, license: &str, stop: Arc<Mutex<bool>>) {
42
+    let mut timer = interval(Duration::from_millis(100));
31 43
     loop {
32 44
         tokio::select! {
33 45
             Ok((stream, addr)) = listener.accept() => {
@@ -44,7 +56,6 @@ pub async fn start(port: &str, license: &str, stop: Arc<Mutex<bool>>) -> ResultT
44 56
             }
45 57
         }
46 58
     }
47
-    Ok(())
48 59
 }
49 60
 
50 61
 async fn make_pair(stream: FramedStream, addr: SocketAddr, license: &str) -> ResultType<()> {

+ 34 - 6
src/rendezvous_server.rs

@@ -9,11 +9,12 @@ use hbb_common::{
9 9
     log,
10 10
     protobuf::{Message as _, MessageField},
11 11
     rendezvous_proto::*,
12
+    sleep,
12 13
     tcp::{new_listener, FramedStream},
13 14
     timeout,
14 15
     tokio::{
15 16
         self,
16
-        net::TcpStream,
17
+        net::{TcpListener, TcpStream},
17 18
         sync::mpsc,
18 19
         time::{interval, Duration},
19 20
     },
@@ -137,6 +138,7 @@ impl PeerMap {
137 138
 const REG_TIMEOUT: i32 = 30_000;
138 139
 type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
139 140
 type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
141
+type Receiver = mpsc::UnboundedReceiver<(RendezvousMessage, SocketAddr)>;
140 142
 static mut ROTATION_RELAY_SERVER: usize = 0;
141 143
 
142 144
 #[derive(Clone)]
@@ -184,7 +186,34 @@ impl RendezvousServer {
184 186
         };
185 187
         let mut listener = new_listener(addr, false).await?;
186 188
         let mut listener2 = new_listener(addr2, false).await?;
187
-        let mut timer = interval(Duration::from_millis(300));
189
+        loop {
190
+            if *stop.lock().unwrap() {
191
+                sleep(0.1).await;
192
+                continue;
193
+            }
194
+            log::info!("Start");
195
+            rs.io_loop(
196
+                &mut rx,
197
+                &mut listener,
198
+                &mut listener2,
199
+                &mut socket,
200
+                license,
201
+                stop.clone(),
202
+            )
203
+            .await;
204
+        }
205
+    }
206
+
207
+    async fn io_loop(
208
+        &mut self,
209
+        rx: &mut Receiver,
210
+        listener: &mut TcpListener,
211
+        listener2: &mut TcpListener,
212
+        socket: &mut FramedSocket,
213
+        license: &str,
214
+        stop: Arc<Mutex<bool>>,
215
+    ) {
216
+        let mut timer = interval(Duration::from_millis(100));
188 217
         loop {
189 218
             tokio::select! {
190 219
                 _ = timer.tick() => {
@@ -197,7 +226,7 @@ impl RendezvousServer {
197 226
                     allow_err!(socket.send(&msg, addr).await);
198 227
                 }
199 228
                 Some(Ok((bytes, addr))) = socket.next() => {
200
-                    allow_err!(rs.handle_msg(&bytes, addr, &mut socket, license).await);
229
+                    allow_err!(self.handle_msg(&bytes, addr, socket, license).await);
201 230
                 }
202 231
                 Ok((stream, addr)) = listener2.accept() => {
203 232
                     let stream = FramedStream::from(stream);
@@ -220,8 +249,8 @@ impl RendezvousServer {
220 249
                 Ok((stream, addr)) = listener.accept() => {
221 250
                     log::debug!("Tcp connection from {:?}", addr);
222 251
                     let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
223
-                    let tcp_punch = rs.tcp_punch.clone();
224
-                    let mut rs = rs.clone();
252
+                    let tcp_punch = self.tcp_punch.clone();
253
+                    let mut rs = self.clone();
225 254
                     let license = license.to_owned();
226 255
                     tokio::spawn(async move {
227 256
                         let mut sender = Some(a);
@@ -305,7 +334,6 @@ impl RendezvousServer {
305 334
                 }
306 335
             }
307 336
         }
308
-        Ok(())
309 337
     }
310 338
 
311 339
     #[inline]