Browse Source

fix on request_relay

open-trade 5 years ago
parent
commit
0a1fbadb4f
4 changed files with 17 additions and 104 deletions
  1. 2 2
      Cargo.toml
  2. 1 1
      libs/hbb_common
  3. 0 84
      src/hbbf/main.rs
  4. 14 17
      src/rendezvous_server.rs

+ 2 - 2
Cargo.toml

@@ -5,8 +5,8 @@ authors = ["open-trade <info@opentradesolutions.com>"]
5
 edition = "2018"
5
 edition = "2018"
6
 
6
 
7
 [[bin]]
7
 [[bin]]
8
-name = "hbbf"
9
-path = "src/hbbf/main.rs"
8
+name = "hbbr"
9
+path = "src/hbbr/main.rs"
10
 
10
 
11
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
11
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
12
 
12
 

+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit fdaa6c740c475f6023c93a148bc851655d0093ac
1
+Subproject commit 4b0de359f90425ce9c72903b7a9e5fcbe505a5cc

+ 0 - 84
src/hbbf/main.rs

@@ -1,84 +0,0 @@
1
-use hbb_common::{
2
-    env_logger::*,
3
-    log,
4
-    protobuf::Message as _,
5
-    rendezvous_proto::*,
6
-    sleep,
7
-    tcp::{new_listener, FramedStream},
8
-    tokio, ResultType,
9
-};
10
-use std::{
11
-    collections::HashMap,
12
-    net::SocketAddr,
13
-    sync::{Arc, Mutex},
14
-};
15
-
16
-lazy_static::lazy_static! {
17
-    static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
18
-}
19
-
20
-#[tokio::main]
21
-async fn main() -> ResultType<()> {
22
-    init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
23
-    let addr = "0.0.0.0:21117";
24
-    log::info!("Listening on {}", addr);
25
-    let mut listener = new_listener(addr, true).await?;
26
-    loop {
27
-        tokio::select! {
28
-            Ok((stream, addr)) = listener.accept() => {
29
-                tokio::spawn(async move {
30
-                    make_pair(FramedStream::from(stream), addr).await.ok();
31
-                });
32
-            }
33
-        }
34
-    }
35
-}
36
-
37
-async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
38
-    let mut stream = stream;
39
-    if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
40
-        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
41
-            if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union {
42
-                if !rf.uuid.is_empty() {
43
-                    let peer = PEERS.lock().unwrap().remove(&rf.uuid);
44
-                    if let Some(peer) = peer {
45
-                        log::info!("Forward request {} from {} got paired", rf.uuid, addr);
46
-                        return relay(stream, peer).await;
47
-                    } else {
48
-                        log::info!("New relay request {} from {}", rf.uuid, addr);
49
-                        PEERS.lock().unwrap().insert(rf.uuid.clone(), stream);
50
-                        sleep(30.).await;
51
-                        PEERS.lock().unwrap().remove(&rf.uuid);
52
-                    }
53
-                }
54
-            }
55
-        }
56
-    }
57
-    Ok(())
58
-}
59
-
60
-async fn relay(stream: FramedStream, peer: FramedStream) -> ResultType<()> {
61
-    let mut peer = peer;
62
-    let mut stream = stream;
63
-    peer.set_raw();
64
-    stream.set_raw();
65
-    loop {
66
-        tokio::select! {
67
-            res = peer.next() => {
68
-                if let Some(Ok(bytes)) = res {
69
-                    stream.send_bytes(bytes.into()).await?;
70
-                } else {
71
-                    break;
72
-                }
73
-            },
74
-            res = stream.next() => {
75
-                if let Some(Ok(bytes)) = res {
76
-                    peer.send_bytes(bytes.into()).await?;
77
-                } else {
78
-                    break;
79
-                }
80
-            },
81
-        }
82
-    }
83
-    Ok(())
84
-}

+ 14 - 17
src/rendezvous_server.rs

@@ -10,6 +10,7 @@ use hbb_common::{
10
     protobuf::Message as _,
10
     protobuf::Message as _,
11
     rendezvous_proto::*,
11
     rendezvous_proto::*,
12
     tcp::new_listener,
12
     tcp::new_listener,
13
+    timeout,
13
     tokio::{self, net::TcpStream, sync::mpsc},
14
     tokio::{self, net::TcpStream, sync::mpsc},
14
     tokio_util::codec::Framed,
15
     tokio_util::codec::Framed,
15
     udp::FramedSocket,
16
     udp::FramedSocket,
@@ -151,27 +152,28 @@ impl RendezvousServer {
151
                     tcp_punch.lock().unwrap().insert(addr, a);
152
                     tcp_punch.lock().unwrap().insert(addr, a);
152
                     let mut rs = rs.clone();
153
                     let mut rs = rs.clone();
153
                     tokio::spawn(async move {
154
                     tokio::spawn(async move {
154
-                        while let Some(Ok(bytes)) = b.next().await {
155
+                        while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
155
                             if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
156
                             if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
156
                                 match msg_in.union {
157
                                 match msg_in.union {
157
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
158
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
158
                                         allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await);
159
                                         allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await);
159
                                     }
160
                                     }
160
                                     Some(rendezvous_message::Union::request_relay(mut rf)) => {
161
                                     Some(rendezvous_message::Union::request_relay(mut rf)) => {
161
-                                        if !rs.pm.is_in_memory(&rf.id) {
162
-                                            break;
162
+                                        if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) {
163
+                                            let mut msg_out = RendezvousMessage::new();
164
+                                            rf.socket_addr = AddrMangle::encode(addr);
165
+                                            msg_out.set_request_relay(rf);
166
+                                            rs.tx.send((msg_out, peer.socket_addr)).ok();
163
                                         }
167
                                         }
164
-                                        let mut msg_out = RendezvousMessage::new();
165
-                                        rf.socket_addr = AddrMangle::encode(addr);
166
-                                        msg_out.set_request_relay(rf);
167
-                                        rs.tx.send((msg_out, addr)).ok();
168
                                     }
168
                                     }
169
                                     Some(rendezvous_message::Union::request_relay_response(mut rfr)) => {
169
                                     Some(rendezvous_message::Union::request_relay_response(mut rfr)) => {
170
                                         let addr_b = AddrMangle::decode(&rfr.socket_addr);
170
                                         let addr_b = AddrMangle::decode(&rfr.socket_addr);
171
                                         rfr.socket_addr = Default::default();
171
                                         rfr.socket_addr = Default::default();
172
-                                        let sender_b= rs.tcp_punch.lock().unwrap().remove(&addr_b);
172
+                                        let mut msg_out = RendezvousMessage::new();
173
+                                        msg_out.set_request_relay_response(rfr);
174
+                                        let sender_b = rs.tcp_punch.lock().unwrap().remove(&addr_b);
173
                                         if let Some(mut sender_b) = sender_b {
175
                                         if let Some(mut sender_b) = sender_b {
174
-                                            if let Ok(bytes) = rfr.write_to_bytes() {
176
+                                            if let Ok(bytes) = msg_out.write_to_bytes() {
175
                                                 allow_err!(sender_b.send(Bytes::from(bytes)).await);
177
                                                 allow_err!(sender_b.send(Bytes::from(bytes)).await);
176
                                             }
178
                                             }
177
                                         }
179
                                         }
@@ -185,7 +187,9 @@ impl RendezvousServer {
185
                                         allow_err!(rs.handle_local_addr(&la, addr, None).await);
187
                                         allow_err!(rs.handle_local_addr(&la, addr, None).await);
186
                                         break;
188
                                         break;
187
                                     }
189
                                     }
188
-                                    _ => {}
190
+                                    _ => {
191
+                                        break;
192
+                                    }
189
                                 }
193
                                 }
190
                             } else {
194
                             } else {
191
                                 break;
195
                                 break;
@@ -255,13 +259,6 @@ impl RendezvousServer {
255
                         });
259
                         });
256
                     }
260
                     }
257
                 }
261
                 }
258
-                Some(rendezvous_message::Union::request_relay(rf)) => {
259
-                    if self.pm.is_in_memory(&rf.id) {
260
-                        let mut msg_out = RendezvousMessage::new();
261
-                        msg_out.set_request_relay(rf);
262
-                        socket.send(&msg_out, addr).await?
263
-                    }
264
-                }
265
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
262
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
266
                     self.handle_hole_sent(phs, addr, Some(socket)).await?;
263
                     self.handle_hole_sent(phs, addr, Some(socket)).await?;
267
                 }
264
                 }