open-trade 5 years ago
parent
commit
b32a948347
5 changed files with 91 additions and 2 deletions
  1. 1 0
      Cargo.lock
  2. 5 0
      Cargo.toml
  3. 1 1
      libs/hbb_common
  4. 83 0
      src/hbbf/main.rs
  5. 1 1
      src/main.rs

+ 1 - 0
Cargo.lock

@@ -497,6 +497,7 @@ name = "hbbs"
497
 version = "0.1.0"
497
 version = "0.1.0"
498
 dependencies = [
498
 dependencies = [
499
  "hbb_common 0.1.0",
499
  "hbb_common 0.1.0",
500
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
500
  "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
501
  "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
501
  "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
502
  "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)",
502
  "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",
503
  "serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",

+ 5 - 0
Cargo.toml

@@ -4,6 +4,10 @@ version = "0.1.0"
4
 authors = ["open-trade <info@opentradesolutions.com>"]
4
 authors = ["open-trade <info@opentradesolutions.com>"]
5
 edition = "2018"
5
 edition = "2018"
6
 
6
 
7
+[[bin]]
8
+name = "hbbf"
9
+path = "src/hbbf/main.rs"
10
+
7
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
11
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8
 
12
 
9
 [dependencies]
13
 [dependencies]
@@ -12,6 +16,7 @@ sled = "0.31"
12
 serde_derive = "1.0"
16
 serde_derive = "1.0"
13
 serde = "1.0"
17
 serde = "1.0"
14
 serde_json = "1.0"
18
 serde_json = "1.0"
19
+lazy_static = "1.4"
15
 
20
 
16
 [workspace]
21
 [workspace]
17
 members = ["libs/hbb_common"]
22
 members = ["libs/hbb_common"]

+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit dc39de26981c2916c053762eef6961f6b659fb0c
1
+Subproject commit 8209a1f2d1d11b7019e224fdc161f138415f6d53

+ 83 - 0
src/hbbf/main.rs

@@ -0,0 +1,83 @@
1
+use hbb_common::{
2
+    env_logger, log,
3
+    protobuf::Message as _,
4
+    rendezvous_proto::*,
5
+    sleep,
6
+    tcp::{new_listener, FramedStream},
7
+    tokio, ResultType,
8
+};
9
+use std::{
10
+    collections::HashMap,
11
+    net::SocketAddr,
12
+    sync::{Arc, Mutex},
13
+};
14
+
15
+lazy_static::lazy_static! {
16
+    static ref PEERS: Arc<Mutex<HashMap<String, FramedStream>>> = Arc::new(Mutex::new(HashMap::new()));
17
+}
18
+
19
+#[tokio::main]
20
+async fn main() -> ResultType<()> {
21
+    env_logger::init();
22
+    let addr = "0.0.0.0:21117";
23
+    log::info!("Listening on {}", addr);
24
+    let mut listener = new_listener(addr, true).await?;
25
+    loop {
26
+        tokio::select! {
27
+            Ok((stream, addr)) = listener.accept() => {
28
+                tokio::spawn(async move {
29
+                    make_pair(FramedStream::from(stream), addr).await.ok();
30
+                });
31
+            }
32
+        }
33
+    }
34
+}
35
+
36
+async fn make_pair(stream: FramedStream, addr: SocketAddr) -> ResultType<()> {
37
+    let mut stream = stream;
38
+    if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
39
+        if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
40
+            if let Some(rendezvous_message::Union::request_forward(rf)) = msg_in.union {
41
+                if !rf.uuid.is_empty() {
42
+                    let peer = PEERS.lock().unwrap().remove(&rf.uuid);
43
+                    if let Some(peer) = peer {
44
+                        log::info!("Forward request {} from {} got paired", rf.uuid, addr);
45
+                        return forward(stream, peer).await;
46
+                    } else {
47
+                        log::info!("New forward request {} from {}", rf.uuid, addr);
48
+                        PEERS.lock().unwrap().insert(rf.uuid.clone(), stream);
49
+                        sleep(30.).await;
50
+                        PEERS.lock().unwrap().remove(&rf.uuid);
51
+                    }
52
+                }
53
+            }
54
+        }
55
+    }
56
+    Ok(())
57
+}
58
+
59
+async fn forward(stream: FramedStream, peer: FramedStream) -> ResultType<()> {
60
+    let mut peer = peer;
61
+    let mut stream = stream;
62
+    peer.set_raw();
63
+    stream.set_raw();
64
+    loop {
65
+        tokio::select! {
66
+            res = peer.next() => {
67
+                if let Some(Ok(bytes)) = res {
68
+                    stream.send_bytes(bytes.into()).await?;
69
+                } else {
70
+                    break;
71
+                }
72
+            },
73
+            res = stream.next() => {
74
+                if let Some(Ok(bytes)) = res {
75
+                    peer.send_bytes(bytes.into()).await?;
76
+                } else {
77
+                    break;
78
+                }
79
+            },
80
+        }
81
+    }
82
+    Ok(())
83
+}

+ 1 - 1
src/main.rs

@@ -8,7 +8,7 @@ use hbbs::*;
8
 async fn main() -> ResultType<()> {
8
 async fn main() -> ResultType<()> {
9
     env_logger::init();
9
     env_logger::init();
10
     let addr = "0.0.0.0:21116";
10
     let addr = "0.0.0.0:21116";
11
-    log::info!("Start Server {}", addr);
11
+    log::info!("Listening on {}", addr);
12
     RendezvousServer::start(&addr).await?;
12
     RendezvousServer::start(&addr).await?;
13
     Ok(())
13
     Ok(())
14
 }
14
 }