Browse Source

prototyping forward request

open-trade 5 years ago
parent
commit
1d19007a46
4 changed files with 37 additions and 8 deletions
  1. 1 1
      libs/hbb_common
  2. 3 2
      src/hbbf/main.rs
  3. 2 2
      src/main.rs
  4. 31 3
      src/rendezvous_server.rs

+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit 8209a1f2d1d11b7019e224fdc161f138415f6d53
1
+Subproject commit 6ccb0d96e7bb37ea43df5c53e07983d6c07804e5

+ 3 - 2
src/hbbf/main.rs

@@ -1,5 +1,6 @@
1 1
 use hbb_common::{
2
-    env_logger, log,
2
+    env_logger::*,
3
+    log,
3 4
     protobuf::Message as _,
4 5
     rendezvous_proto::*,
5 6
     sleep,
@@ -18,7 +19,7 @@ lazy_static::lazy_static! {
18 19
 
19 20
 #[tokio::main]
20 21
 async fn main() -> ResultType<()> {
21
-    env_logger::init();
22
+    init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
22 23
     let addr = "0.0.0.0:21117";
23 24
     log::info!("Listening on {}", addr);
24 25
     let mut listener = new_listener(addr, true).await?;

+ 2 - 2
src/main.rs

@@ -1,12 +1,12 @@
1 1
 // https://tools.ietf.org/rfc/rfc5128.txt
2 2
 // https://blog.csdn.net/bytxl/article/details/44344855
3 3
 
4
-use hbb_common::{env_logger, log, tokio, ResultType};
4
+use hbb_common::{env_logger::*, log, tokio, ResultType};
5 5
 use hbbs::*;
6 6
 
7 7
 #[tokio::main]
8 8
 async fn main() -> ResultType<()> {
9
-    env_logger::init();
9
+    init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info"));
10 10
     let addr = "0.0.0.0:21116";
11 11
     log::info!("Listening on {}", addr);
12 12
     RendezvousServer::start(&addr).await?;

+ 31 - 3
src/rendezvous_server.rs

@@ -157,8 +157,27 @@ impl RendezvousServer {
157 157
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
158 158
                                         allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await);
159 159
                                     }
160
+                                    Some(rendezvous_message::Union::request_forward(mut rf)) => {
161
+                                        if !rs.pm.is_in_memory(&rf.id) {
162
+                                            break;
163
+                                        }
164
+                                        let mut msg_out = RendezvousMessage::new();
165
+                                        rf.socket_addr = AddrMangle::encode(addr);
166
+                                        msg_out.set_request_forward(rf);
167
+                                        rs.tx.send((msg_out, addr)).ok();
168
+                                    }
169
+                                    Some(rendezvous_message::Union::request_forward_response(rfr)) => {
170
+                                        let addr_b = AddrMangle::decode(&rfr.socket_addr);
171
+                                        let sender_b= rs.tcp_punch.lock().unwrap().remove(&addr_b);
172
+                                        if let Some(mut sender_b) = sender_b {
173
+                                            if let Ok(bytes) = rfr.write_to_bytes() {
174
+                                                allow_err!(sender_b.send(Bytes::from(bytes)).await);
175
+                                            }
176
+                                        }
177
+                                        break;
178
+                                    }
160 179
                                     Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
161
-                                        allow_err!(rs.handle_hole_sent(&phs, addr, None).await);
180
+                                        allow_err!(rs.handle_hole_sent(phs, addr, None).await);
162 181
                                         break;
163 182
                                     }
164 183
                                     Some(rendezvous_message::Union::local_addr(la)) => {
@@ -233,8 +252,15 @@ impl RendezvousServer {
233 252
                         });
234 253
                     }
235 254
                 }
255
+                Some(rendezvous_message::Union::request_forward(rf)) => {
256
+                    if self.pm.is_in_memory(&rf.id) {
257
+                        let mut msg_out = RendezvousMessage::new();
258
+                        msg_out.set_request_forward(rf);
259
+                        socket.send(&msg_out, addr).await?
260
+                    }
261
+                }
236 262
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
237
-                    self.handle_hole_sent(&phs, addr, Some(socket)).await?;
263
+                    self.handle_hole_sent(phs, addr, Some(socket)).await?;
238 264
                 }
239 265
                 Some(rendezvous_message::Union::local_addr(la)) => {
240 266
                     self.handle_local_addr(&la, addr, Some(socket)).await?;
@@ -304,7 +330,7 @@ impl RendezvousServer {
304 330
     #[inline]
305 331
     async fn handle_hole_sent<'a>(
306 332
         &mut self,
307
-        phs: &PunchHoleSent,
333
+        phs: PunchHoleSent,
308 334
         addr: SocketAddr,
309 335
         socket: Option<&'a mut FramedSocket>,
310 336
     ) -> ResultType<()> {
@@ -321,9 +347,11 @@ impl RendezvousServer {
321 347
             Some(peer) => peer.pk,
322 348
             _ => Vec::new(),
323 349
         };
350
+        let forward_server = phs.forward_server;
324 351
         msg_out.set_punch_hole_response(PunchHoleResponse {
325 352
             socket_addr: AddrMangle::encode(addr),
326 353
             pk,
354
+            forward_server,
327 355
             ..Default::default()
328 356
         });
329 357
         if let Some(socket) = socket {