Browse Source

working on key

open-trade 5 years ago
parent
commit
d837379a13
4 changed files with 307 additions and 176 deletions
  1. 243 154
      Cargo.lock
  2. 1 1
      libs/hbb_common
  3. 62 20
      src/rendezvous_server.rs
  4. 1 1
      src/sled_async.rs

File diff suppressed because it is too large
+ 243 - 154
Cargo.lock


+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit 1ba798488119d8d31950efca724440feb87cba54
1
+Subproject commit 5e977383041f35f856ee2dfc5fcf07c8300e2da5

+ 62 - 20
src/rendezvous_server.rs

@@ -27,6 +27,7 @@ use std::{
27 27
 struct Peer {
28 28
     socket_addr: SocketAddr,
29 29
     last_reg_time: Instant,
30
+    pk: Vec<u8>,
30 31
 }
31 32
 
32 33
 impl Default for Peer {
@@ -36,6 +37,7 @@ impl Default for Peer {
36 37
             last_reg_time: Instant::now()
37 38
                 .checked_sub(std::time::Duration::from_secs(3600))
38 39
                 .unwrap(),
40
+            pk: Vec::new(),
39 41
         }
40 42
     }
41 43
 }
@@ -44,6 +46,8 @@ impl Default for Peer {
44 46
 struct PeerSerde {
45 47
     #[serde(default)]
46 48
     ip: String,
49
+    #[serde(default)]
50
+    pk: Vec<u8>,
47 51
 }
48 52
 
49 53
 #[derive(Clone)]
@@ -61,27 +65,63 @@ impl PeerMap {
61 65
     }
62 66
 
63 67
     #[inline]
64
-    fn insert(&mut self, key: String, peer: Peer) {
65
-        let ip = peer.socket_addr.ip();
66
-        if self
67
-            .map
68
-            .write()
69
-            .unwrap()
70
-            .insert(key.clone(), peer)
71
-            .is_none()
72
-        {
73
-            let ip = ip.to_string();
74
-            self.db.insert(key, PeerSerde { ip });
68
+    async fn update_addr(&mut self, key: String, socket_addr: SocketAddr) {
69
+        let mut lock = self.map.write().unwrap();
70
+        let last_reg_time = Instant::now();
71
+        if let Some(old) = lock.get_mut(&key) {
72
+            old.socket_addr = socket_addr;
73
+            old.last_reg_time = last_reg_time;
74
+        } else {
75
+            let mut me = self.clone();
76
+            tokio::spawn(async move {
77
+                let v = me.db.get(key.clone()).await;
78
+                let pk = if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
79
+                    v.pk
80
+                } else {
81
+                    Vec::new()
82
+                };
83
+                me.map.write().unwrap().insert(
84
+                    key,
85
+                    Peer {
86
+                        socket_addr,
87
+                        last_reg_time,
88
+                        pk,
89
+                    },
90
+                );
91
+            });
75 92
         }
76 93
     }
77 94
 
95
+    #[inline]
96
+    fn update_key(&mut self, key: String, socket_addr: SocketAddr, pk: Vec<u8>) {
97
+        let mut lock = self.map.write().unwrap();
98
+        lock.insert(
99
+            key.clone(),
100
+            Peer {
101
+                socket_addr,
102
+                last_reg_time: Instant::now(),
103
+                pk: pk.clone(),
104
+            },
105
+        );
106
+        let ip = socket_addr.ip().to_string();
107
+        self.db.insert(key, PeerSerde { ip, pk });
108
+    }
109
+
78 110
     #[inline]
79 111
     async fn get(&mut self, key: String) -> Option<Peer> {
80 112
         let p = self.map.read().unwrap().get(&key).map(|x| x.clone());
81 113
         if p.is_some() {
82 114
             return p;
83 115
         } else {
84
-            if let Some(_) = self.db.get(key).await {
116
+            let v = self.db.get(key.clone()).await;
117
+            if let Some(v) = super::SledAsync::deserialize::<PeerSerde>(&v) {
118
+                self.map.write().unwrap().insert(
119
+                    key,
120
+                    Peer {
121
+                        pk: v.pk,
122
+                        ..Default::default()
123
+                    },
124
+                );
85 125
                 return Some(Peer::default());
86 126
             }
87 127
         }
@@ -168,18 +208,20 @@ impl RendezvousServer {
168 208
                     // B registered
169 209
                     if rp.id.len() > 0 {
170 210
                         log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
171
-                        self.pm.insert(
172
-                            rp.id,
173
-                            Peer {
174
-                                socket_addr: addr,
175
-                                last_reg_time: Instant::now(),
176
-                            },
177
-                        );
211
+                        self.pm.update_addr(rp.id, addr).await;
178 212
                         let mut msg_out = RendezvousMessage::new();
179 213
                         msg_out.set_register_peer_response(RegisterPeerResponse::default());
180 214
                         socket.send(&msg_out, addr).await?
181 215
                     }
182 216
                 }
217
+                Some(rendezvous_message::Union::register_key(rk)) => {
218
+                    let id = rk.id;
219
+                    if let Some(peer) = self.pm.get(id.clone()).await {
220
+                        if peer.pk.is_empty() {
221
+                            self.pm.update_key(id, addr, rk.key);
222
+                        }
223
+                    }
224
+                }
183 225
                 Some(rendezvous_message::Union::punch_hole_request(ph)) => {
184 226
                     let id = ph.id;
185 227
                     if self.pm.is_in_memory(&id) {
@@ -199,7 +241,7 @@ impl RendezvousServer {
199 241
                     self.handle_local_addr(&la, addr, Some(socket)).await?;
200 242
                 }
201 243
                 Some(rendezvous_message::Union::system_info(info)) => {
202
-                    log::info!("{}", info.value);                    
244
+                    log::info!("{}", info.value);
203 245
                 }
204 246
                 _ => {}
205 247
             }

+ 1 - 1
src/sled_async.rs

@@ -79,7 +79,7 @@ impl SledAsync {
79 79
     }
80 80
 
81 81
     #[inline]
82
-    pub fn _deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<sled::IVec>) -> Option<T> {
82
+    pub fn deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<sled::IVec>) -> Option<T> {
83 83
         if let Some(v) = v {
84 84
             if let Ok(v) = std::str::from_utf8(v) {
85 85
                 if let Ok(v) = serde_json::from_str::<T>(&v) {