open-trade лет назад: 5
Родитель
Сommit
f3554b153d
5 измененных файлов с 176 добавлено и 13 удалено
  1. 43 0
      Cargo.lock
  2. 3 0
      Cargo.toml
  3. 2 0
      src/lib.rs
  4. 40 13
      src/rendezvous_server.rs
  5. 88 0
      src/sled_async.rs

+ 43 - 0
Cargo.lock

@@ -319,6 +319,9 @@ name = "hbbs"
319 319
 version = "0.1.0"
320 320
 dependencies = [
321 321
  "hbb_common 0.1.0",
322
+ "serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)",
323
+ "serde_derive 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)",
324
+ "serde_json 1.0.53 (registry+https://github.com/rust-lang/crates.io-index)",
322 325
  "sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
323 326
 ]
324 327
 
@@ -346,6 +349,11 @@ dependencies = [
346 349
  "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)",
347 350
 ]
348 351
 
352
+[[package]]
353
+name = "itoa"
354
+version = "0.4.5"
355
+source = "registry+https://github.com/rust-lang/crates.io-index"
356
+
349 357
 [[package]]
350 358
 name = "jobserver"
351 359
 version = "0.1.21"
@@ -774,6 +782,11 @@ dependencies = [
774 782
  "syn 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
775 783
 ]
776 784
 
785
+[[package]]
786
+name = "ryu"
787
+version = "1.0.4"
788
+source = "registry+https://github.com/rust-lang/crates.io-index"
789
+
777 790
 [[package]]
778 791
 name = "schannel"
779 792
 version = "0.1.19"
@@ -818,6 +831,31 @@ dependencies = [
818 831
  "libc 0.2.69 (registry+https://github.com/rust-lang/crates.io-index)",
819 832
 ]
820 833
 
834
+[[package]]
835
+name = "serde"
836
+version = "1.0.110"
837
+source = "registry+https://github.com/rust-lang/crates.io-index"
838
+
839
+[[package]]
840
+name = "serde_derive"
841
+version = "1.0.110"
842
+source = "registry+https://github.com/rust-lang/crates.io-index"
843
+dependencies = [
844
+ "proc-macro2 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)",
845
+ "quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
846
+ "syn 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
847
+]
848
+
849
+[[package]]
850
+name = "serde_json"
851
+version = "1.0.53"
852
+source = "registry+https://github.com/rust-lang/crates.io-index"
853
+dependencies = [
854
+ "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
855
+ "ryu 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
856
+ "serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)",
857
+]
858
+
821 859
 [[package]]
822 860
 name = "signal-hook-registry"
823 861
 version = "1.2.0"
@@ -1190,6 +1228,7 @@ dependencies = [
1190 1228
 "checksum hermit-abi 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "61565ff7aaace3525556587bd2dc31d4a07071957be715e63ce7b1eccf51a8f4"
1191 1229
 "checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
1192 1230
 "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
1231
+"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
1193 1232
 "checksum jobserver 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2"
1194 1233
 "checksum js-sys 0.3.39 (registry+https://github.com/rust-lang/crates.io-index)" = "fa5a448de267e7358beaf4a5d849518fe9a0c13fce7afd44b06e68550e5562a7"
1195 1234
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
@@ -1239,11 +1278,15 @@ dependencies = [
1239 1278
 "checksum rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d4a31f5d68413404705d6982529b0e11a9aacd4839d1d6222ee3b8cb4015e1"
1240 1279
 "checksum rustls-native-certs 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a75ffeb84a6bd9d014713119542ce415db3a3e4748f0bfce1e1416cd224a23a5"
1241 1280
 "checksum rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6"
1281
+"checksum ryu 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1"
1242 1282
 "checksum schannel 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
1243 1283
 "checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
1244 1284
 "checksum sct 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c"
1245 1285
 "checksum security-framework 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535"
1246 1286
 "checksum security-framework-sys 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405"
1287
+"checksum serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)" = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c"
1288
+"checksum serde_derive 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)" = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984"
1289
+"checksum serde_json 1.0.53 (registry+https://github.com/rust-lang/crates.io-index)" = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2"
1247 1290
 "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
1248 1291
 "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
1249 1292
 "checksum sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb6824dde66ad33bf20c6e8476f5b82b871bc8bc3c129a10ea2f7dae5060fa3"

+ 3 - 0
Cargo.toml

@@ -9,6 +9,9 @@ edition = "2018"
9 9
 [dependencies]
10 10
 hbb_common = { path = "libs/hbb_common" }
11 11
 sled = "0.31"
12
+serde_derive = "1.0"
13
+serde = "1.0"
14
+serde_json = "1.0"
12 15
 
13 16
 [workspace]
14 17
 members = ["libs/hbb_common"]

+ 2 - 0
src/lib.rs

@@ -1,2 +1,4 @@
1 1
 mod rendezvous_server;
2
+mod sled_async;
3
+use sled_async::*;
2 4
 pub use rendezvous_server::*;

+ 40 - 13
src/rendezvous_server.rs

@@ -1,7 +1,6 @@
1 1
 use hbb_common::{
2 2
     allow_err,
3
-    bytes::Bytes,
4
-    bytes::BytesMut,
3
+    bytes::{Bytes, BytesMut},
5 4
     bytes_codec::BytesCodec,
6 5
     futures_util::{
7 6
         sink::SinkExt,
@@ -16,6 +15,7 @@ use hbb_common::{
16 15
     udp::FramedSocket,
17 16
     AddrMangle, ResultType,
18 17
 };
18
+use serde_derive::{Deserialize, Serialize};
19 19
 use std::{
20 20
     collections::HashMap,
21 21
     net::SocketAddr,
@@ -29,28 +29,47 @@ struct Peer {
29 29
     last_reg_time: Instant,
30 30
 }
31 31
 
32
+#[derive(Debug, Serialize, Deserialize, Default)]
33
+struct PeerSerde {
34
+    #[serde(default)]
35
+    ip: String,
36
+}
37
+
32 38
 #[derive(Clone)]
33 39
 struct PeerMap {
34 40
     map: Arc<RwLock<HashMap<String, Peer>>>,
35
-    db: sled::Db,
41
+    db: super::SledAsync,
36 42
 }
37 43
 
38 44
 impl PeerMap {
39 45
     fn new() -> ResultType<Self> {
40 46
         Ok(Self {
41 47
             map: Default::default(),
42
-            db: sled::open("./sled.db")?,
48
+            db: super::SledAsync::new("./sled.db")?,
43 49
         })
44 50
     }
45 51
 
46 52
     #[inline]
47 53
     fn insert(&mut self, key: String, peer: Peer) {
48
-        self.map.write().unwrap().insert(key, peer);
54
+        if self.map.write().unwrap().insert(key, peer).is_none() {}
49 55
     }
50 56
 
51 57
     #[inline]
52
-    fn get(&self, key: &str) -> Option<Peer> {
53
-        self.map.read().unwrap().get(key).map(|x| x.clone())
58
+    async fn get(&mut self, key: String) -> Option<Peer> {
59
+        let p = self.map.read().unwrap().get(&key).map(|x| x.clone());
60
+        if p.is_some() {
61
+            return p;
62
+        } else {
63
+            if let Some(_) = self.db.get(key).await {
64
+                // to-do
65
+            }
66
+        }
67
+        None
68
+    }
69
+
70
+    #[inline]
71
+    fn is_in_memory(&self, key: &str) -> bool {
72
+        self.map.read().unwrap().contains_key(key)
54 73
     }
55 74
 }
56 75
 
@@ -93,7 +112,7 @@ impl RendezvousServer {
93 112
                             if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
94 113
                                 match msg_in.union {
95 114
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
96
-                                        allow_err!(rs.handle_tcp_punch_hole_request(addr, &ph.id).await);
115
+                                        allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await);
97 116
                                     }
98 117
                                     Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
99 118
                                         allow_err!(rs.handle_hole_sent(&phs, addr, None).await);
@@ -141,7 +160,15 @@ impl RendezvousServer {
141 160
                     }
142 161
                 }
143 162
                 Some(rendezvous_message::Union::punch_hole_request(ph)) => {
144
-                    self.handle_udp_punch_hole_request(addr, &ph.id).await?;
163
+                    let id = ph.id;
164
+                    if self.pm.is_in_memory(&id) {
165
+                        self.handle_udp_punch_hole_request(addr, id).await?;
166
+                    } else {
167
+                        let mut me = self.clone();
168
+                        tokio::spawn(async move {
169
+                            allow_err!(me.handle_udp_punch_hole_request(addr, id).await);
170
+                        });
171
+                    }
145 172
                 }
146 173
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
147 174
                     self.handle_hole_sent(&phs, addr, Some(socket)).await?;
@@ -215,14 +242,14 @@ impl RendezvousServer {
215 242
     async fn handle_punch_hole_request(
216 243
         &mut self,
217 244
         addr: SocketAddr,
218
-        id: &str,
245
+        id: String,
219 246
     ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
220 247
         // punch hole request from A, forward to B,
221 248
         // check if in same intranet first,
222 249
         // fetch local addrs if in same intranet.
223 250
         // because punch hole won't work if in the same intranet,
224 251
         // all routers will drop such self-connections.
225
-        if let Some(peer) = self.pm.get(id) {
252
+        if let Some(peer) = self.pm.get(id.clone()).await {
226 253
             if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
227 254
                 let mut msg_out = RendezvousMessage::new();
228 255
                 msg_out.set_punch_hole_response(PunchHoleResponse {
@@ -308,7 +335,7 @@ impl RendezvousServer {
308 335
     async fn handle_tcp_punch_hole_request(
309 336
         &mut self,
310 337
         addr: SocketAddr,
311
-        id: &str,
338
+        id: String,
312 339
     ) -> ResultType<()> {
313 340
         let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?;
314 341
         if let Some(addr) = to_addr {
@@ -323,7 +350,7 @@ impl RendezvousServer {
323 350
     async fn handle_udp_punch_hole_request(
324 351
         &mut self,
325 352
         addr: SocketAddr,
326
-        id: &str,
353
+        id: String,
327 354
     ) -> ResultType<()> {
328 355
         let (msg, to_addr) = self.handle_punch_hole_request(addr, id).await?;
329 356
         self.tx.send((

+ 88 - 0
src/sled_async.rs

@@ -0,0 +1,88 @@
1
+use hbb_common::{
2
+    allow_err, log,
3
+    tokio::{self, sync::mpsc},
4
+    ResultType,
5
+};
6
+
7
+#[derive(Debug)]
8
+enum Action {
9
+    Insert((String, Vec<u8>)),
10
+    Get((String, mpsc::Sender<Option<sled::IVec>>)),
11
+    Close,
12
+}
13
+
14
+#[derive(Clone)]
15
+pub struct SledAsync {
16
+    db: sled::Db,
17
+    tx: Option<mpsc::UnboundedSender<Action>>,
18
+}
19
+
20
+impl SledAsync {
21
+    pub fn new(path: &str) -> ResultType<Self> {
22
+        Ok(Self {
23
+            db: sled::open(path)?,
24
+            tx: None,
25
+        })
26
+    }
27
+
28
+    pub fn run(&mut self) -> std::thread::JoinHandle<()> {
29
+        let (tx, rx) = mpsc::unbounded_channel::<Action>();
30
+        self.tx = Some(tx);
31
+        let db = self.db.clone();
32
+        std::thread::spawn(move || {
33
+            Self::io_loop(db, rx);
34
+        })
35
+    }
36
+
37
+    #[tokio::main(basic_scheduler)]
38
+    async fn io_loop(db: sled::Db, rx: mpsc::UnboundedReceiver<Action>) {
39
+        let mut rx = rx;
40
+        while let Some(x) = rx.recv().await {
41
+            match x {
42
+                Action::Insert((key, value)) => {
43
+                    allow_err!(db.insert(key, value));
44
+                }
45
+                Action::Get((key, sender)) => {
46
+                    let mut sender = sender;
47
+                    allow_err!(
48
+                        sender
49
+                            .send(if let Ok(v) = db.get(key) { v } else { None })
50
+                            .await
51
+                    );
52
+                }
53
+                Action::Close => break,
54
+            }
55
+        }
56
+    }
57
+
58
+    pub async fn get(&mut self, key: String) -> Option<sled::IVec> {
59
+        if let Some(tx) = &self.tx {
60
+            let (tx_once, mut rx) = mpsc::channel::<Option<sled::IVec>>(1);
61
+            allow_err!(tx.send(Action::Get((key, tx_once))));
62
+            if let Some(v) = rx.recv().await {
63
+                return v;
64
+            }
65
+        }
66
+        None
67
+    }
68
+
69
+    #[inline]
70
+    pub fn deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<sled::IVec>) -> Option<T> {
71
+        if let Some(v) = v {
72
+            if let Ok(v) = std::str::from_utf8(v) {
73
+                if let Ok(v) = serde_json::from_str::<T>(&v) {
74
+                    return Some(v);
75
+                }
76
+            }
77
+        }
78
+        None
79
+    }
80
+
81
+    pub fn insert<'a, T: serde::Serialize>(&mut self, key: String, v: &T) {
82
+        if let Some(tx) = &self.tx {
83
+            if let Ok(v) = serde_json::to_vec(v) {
84
+                allow_err!(tx.send(Action::Insert((key, v))));
85
+            }
86
+        }
87
+    }
88
+}