Browse Source

tcp punch

open-trade 5 years ago
parent
commit
0034fd695b
3 changed files with 739 additions and 202 deletions
  1. 599 134
      Cargo.lock
  2. 1 1
      libs/hbb_common
  3. 139 67
      src/rendezvous_server.rs

File diff suppressed because it is too large
+ 599 - 134
Cargo.lock


+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit 3d3d1bd1432f14341c7b90393206a787b95a1638
1
+Subproject commit 56827464aa46ec0ecb83944d7b783fb95a93f74c

+ 139 - 67
src/rendezvous_server.rs

@@ -1,8 +1,27 @@
1 1
 use hbb_common::{
2
-    allow_err, bytes::BytesMut, log, protobuf::parse_from_bytes, rendezvous_proto::*,
3
-    tcp::new_listener, tokio, udp::FramedSocket, AddrMangle, ResultType,
2
+    allow_err,
3
+    bytes::Bytes,
4
+    bytes::BytesMut,
5
+    bytes_codec::BytesCodec,
6
+    futures_util::{
7
+        sink::SinkExt,
8
+        stream::{SplitSink, StreamExt},
9
+    },
10
+    log,
11
+    protobuf::{parse_from_bytes, Message as _},
12
+    rendezvous_proto::*,
13
+    tcp::new_listener,
14
+    tokio::{self, net::TcpStream, sync::mpsc},
15
+    tokio_util::codec::Framed,
16
+    udp::FramedSocket,
17
+    AddrMangle, ResultType,
18
+};
19
+use std::{
20
+    collections::HashMap,
21
+    net::SocketAddr,
22
+    sync::{Arc, Mutex},
23
+    time::Instant,
4 24
 };
5
-use std::{collections::HashMap, net::SocketAddr, time::Instant};
6 25
 
7 26
 pub struct Peer {
8 27
     socket_addr: SocketAddr,
@@ -14,6 +33,7 @@ const REG_TIMEOUT: i32 = 30_000;
14 33
 
15 34
 pub struct RendezvousServer {
16 35
     peer_map: PeerMap,
36
+    tcp_punch: Arc<Mutex<HashMap<SocketAddr, SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>>>,
17 37
 }
18 38
 
19 39
 impl RendezvousServer {
@@ -21,17 +41,38 @@ impl RendezvousServer {
21 41
         let mut socket = FramedSocket::new(addr).await?;
22 42
         let mut rs = Self {
23 43
             peer_map: PeerMap::new(),
44
+            tcp_punch: Arc::new(Mutex::new(HashMap::new())),
24 45
         };
25
-        // tcp listener used to test if udp/tcp share the same NAT port, yes in my test.
26
-        // also be used to help client to get local ip.
46
+        let (tx, mut rx) = mpsc::unbounded_channel::<(SocketAddr, String)>();
27 47
         let mut listener = new_listener(addr, true).await.unwrap();
28 48
         loop {
29 49
             tokio::select! {
50
+                Some((addr, id)) = rx.recv() => {
51
+                    allow_err!(rs.handle_punch_hole_request(addr, &id, &mut socket).await);
52
+                }
30 53
                 Some(Ok((bytes, addr))) = socket.next() => {
31 54
                     allow_err!(rs.handle_msg(&bytes, addr, &mut socket).await);
32 55
                 }
33
-                Ok((_, addr)) = listener.accept() => {
56
+                Ok((stream, addr)) = listener.accept() => {
34 57
                     log::debug!("Tcp connection from {:?}", addr);
58
+                    let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
59
+                    let tcp_punch = rs.tcp_punch.clone();
60
+                    tcp_punch.lock().unwrap().insert(addr, a);
61
+                    let tx = tx.clone();
62
+                    tokio::spawn(async move {
63
+                        while let Some(Ok(bytes)) = b.next().await {
64
+                            if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
65
+                                match msg_in.union {
66
+                                    Some(rendezvous_message::Union::punch_hole_request(ph)) => {
67
+                                        tx.send((addr, ph.id)).ok();
68
+                                    }
69
+                                    _ => {}
70
+                                }
71
+                            }
72
+                        }
73
+                        tcp_punch.lock().unwrap().remove(&addr);
74
+                        log::debug!("Tcp connection from {:?} closed", addr);
75
+                    });
35 76
                 }
36 77
             }
37 78
         }
@@ -49,71 +90,20 @@ impl RendezvousServer {
49 90
                     // B registered
50 91
                     if rp.id.len() > 0 {
51 92
                         log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
52
-                        self.peer_map.insert(rp.id, Peer { socket_addr: addr, last_reg_time: Instant::now() });
93
+                        self.peer_map.insert(
94
+                            rp.id,
95
+                            Peer {
96
+                                socket_addr: addr,
97
+                                last_reg_time: Instant::now(),
98
+                            },
99
+                        );
53 100
                         let mut msg_out = RendezvousMessage::new();
54 101
                         msg_out.set_register_peer_response(RegisterPeerResponse::default());
55 102
                         socket.send(&msg_out, addr).await?
56 103
                     }
57 104
                 }
58 105
                 Some(rendezvous_message::Union::punch_hole_request(ph)) => {
59
-                    // punch hole request from A, forward to B,
60
-                    // check if in same intranet first,
61
-                    // fetch local addrs if in same intranet.
62
-                    // because punch hole won't work if in the same intranet,
63
-                    // all routers will drop such self-connections.
64
-                    if let Some(peer) = self.peer_map.get(&ph.id) {
65
-                        if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
66
-                            let mut msg_out = RendezvousMessage::new();
67
-                            msg_out.set_punch_hole_response(PunchHoleResponse {
68
-                                failure: punch_hole_response::Failure::OFFLINE.into(),
69
-                                ..Default::default()
70
-                            });
71
-                            return socket.send(&msg_out, addr).await;
72
-                        }
73
-                        let mut msg_out = RendezvousMessage::new();
74
-                        let same_intranet = match peer.socket_addr {
75
-                            SocketAddr::V4(a) => match addr {
76
-                                SocketAddr::V4(b) => a.ip() == b.ip(),
77
-                                _ => false,
78
-                            },
79
-                            SocketAddr::V6(a) => match addr {
80
-                                SocketAddr::V6(b) => a.ip() == b.ip(),
81
-                                _ => false,
82
-                            },
83
-                        };
84
-                        let socket_addr = AddrMangle::encode(&addr);
85
-                        if same_intranet {
86
-                            log::debug!(
87
-                                "Fetch local addr {:?} {:?} request from {:?}",
88
-                                &ph.id,
89
-                                &peer.socket_addr,
90
-                                &addr
91
-                            );
92
-                            msg_out.set_fetch_local_addr(FetchLocalAddr {
93
-                                socket_addr,
94
-                                ..Default::default()
95
-                            });
96
-                        } else {
97
-                            log::debug!(
98
-                                "Punch hole {:?} {:?} request from {:?}",
99
-                                &ph.id,
100
-                                &peer.socket_addr,
101
-                                &addr
102
-                            );
103
-                            msg_out.set_punch_hole(PunchHole {
104
-                                socket_addr,
105
-                                ..Default::default()
106
-                            });
107
-                        }
108
-                        socket.send(&msg_out, peer.socket_addr).await?;
109
-                    } else {
110
-                        let mut msg_out = RendezvousMessage::new();
111
-                        msg_out.set_punch_hole_response(PunchHoleResponse {
112
-                            failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
113
-                            ..Default::default()
114
-                        });
115
-                        socket.send(&msg_out, addr).await?
116
-                    }
106
+                    self.handle_punch_hole_request(addr, &ph.id, socket).await?;
117 107
                 }
118 108
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
119 109
                     // punch hole sent from B, tell A that B is ready to be connected
@@ -121,10 +111,11 @@ impl RendezvousServer {
121 111
                     log::debug!("Punch hole response to {:?} from {:?}", &addr_a, &addr);
122 112
                     let mut msg_out = RendezvousMessage::new();
123 113
                     msg_out.set_punch_hole_response(PunchHoleResponse {
124
-                        socket_addr: AddrMangle::encode(&addr),
114
+                        socket_addr: AddrMangle::encode(addr),
125 115
                         ..Default::default()
126 116
                     });
127 117
                     socket.send(&msg_out, addr_a).await?;
118
+                    self.send_to_tcp(&msg_out, addr_a).await?;
128 119
                 }
129 120
                 Some(rendezvous_message::Union::local_addr(la)) => {
130 121
                     // forward local addrs of B to A
@@ -136,10 +127,91 @@ impl RendezvousServer {
136 127
                         ..Default::default()
137 128
                     });
138 129
                     socket.send(&msg_out, addr_a).await?;
130
+                    self.send_to_tcp(&msg_out, addr_a).await?;
139 131
                 }
140 132
                 _ => {}
141 133
             }
142 134
         }
143 135
         Ok(())
144 136
     }
145
-}
137
+
138
+    async fn handle_punch_hole_request(
139
+        &mut self,
140
+        addr: SocketAddr,
141
+        id: &str,
142
+        socket: &mut FramedSocket,
143
+    ) -> ResultType<()> {
144
+        // punch hole request from A, forward to B,
145
+        // check if in same intranet first,
146
+        // fetch local addrs if in same intranet.
147
+        // because punch hole won't work if in the same intranet,
148
+        // all routers will drop such self-connections.
149
+        if let Some(peer) = self.peer_map.get(id) {
150
+            if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
151
+                let mut msg_out = RendezvousMessage::new();
152
+                msg_out.set_punch_hole_response(PunchHoleResponse {
153
+                    failure: punch_hole_response::Failure::OFFLINE.into(),
154
+                    ..Default::default()
155
+                });
156
+                return socket.send(&msg_out, addr).await;
157
+            }
158
+            let mut msg_out = RendezvousMessage::new();
159
+            let same_intranet = match peer.socket_addr {
160
+                SocketAddr::V4(a) => match addr {
161
+                    SocketAddr::V4(b) => a.ip() == b.ip(),
162
+                    _ => false,
163
+                },
164
+                SocketAddr::V6(a) => match addr {
165
+                    SocketAddr::V6(b) => a.ip() == b.ip(),
166
+                    _ => false,
167
+                },
168
+            };
169
+            let socket_addr = AddrMangle::encode(addr);
170
+            if same_intranet {
171
+                log::debug!(
172
+                    "Fetch local addr {:?} {:?} request from {:?}",
173
+                    id,
174
+                    &peer.socket_addr,
175
+                    &addr
176
+                );
177
+                msg_out.set_fetch_local_addr(FetchLocalAddr {
178
+                    socket_addr,
179
+                    ..Default::default()
180
+                });
181
+            } else {
182
+                log::debug!(
183
+                    "Punch hole {:?} {:?} request from {:?}",
184
+                    id,
185
+                    &peer.socket_addr,
186
+                    &addr
187
+                );
188
+                msg_out.set_punch_hole(PunchHole {
189
+                    socket_addr,
190
+                    ..Default::default()
191
+                });
192
+            }
193
+            socket.send(&msg_out, peer.socket_addr).await?;
194
+        } else {
195
+            let mut msg_out = RendezvousMessage::new();
196
+            msg_out.set_punch_hole_response(PunchHoleResponse {
197
+                failure: punch_hole_response::Failure::ID_NOT_EXIST.into(),
198
+                ..Default::default()
199
+            });
200
+            socket.send(&msg_out, addr).await?
201
+        }
202
+        Ok(())
203
+    }
204
+
205
+    async fn send_to_tcp(&mut self, msg: &RendezvousMessage, addr: SocketAddr) -> ResultType<()> {
206
+        let tcp = self.tcp_punch.lock().unwrap().remove(&addr);
207
+        if let Some(mut tcp) = tcp {
208
+            if let Ok(bytes) = msg.write_to_bytes() {
209
+                tokio::spawn(async move {
210
+                    allow_err!(tcp.send(Bytes::from(bytes)).await);
211
+                    log::debug!("Send punch hole to {} via tcp", addr);
212
+                });
213
+            }
214
+        }
215
+        Ok(())
216
+    }
217
+}