Просмотр исходного кода

seems ok, but need to optimize for database write and read in the future
with cache etc

open-trade лет назад: 5
Родитель
Сommit
2424dcad69
2 измененных файлов с 42 добавлено и 9 удалено
  1. 24 3
      src/rendezvous_server.rs
  2. 18 6
      src/sled_async.rs

+ 24 - 3
src/rendezvous_server.rs

@@ -29,6 +29,17 @@ struct Peer {
29 29
     last_reg_time: Instant,
30 30
 }
31 31
 
32
+impl Default for Peer {
33
+    fn default() -> Self {
34
+        Self {
35
+            socket_addr: "0.0.0.0:0".parse().unwrap(),
36
+            last_reg_time: Instant::now()
37
+                .checked_sub(std::time::Duration::from_secs(3600))
38
+                .unwrap(),
39
+        }
40
+    }
41
+}
42
+
32 43
 #[derive(Debug, Serialize, Deserialize, Default)]
33 44
 struct PeerSerde {
34 45
     #[serde(default)]
@@ -45,13 +56,23 @@ impl PeerMap {
45 56
     fn new() -> ResultType<Self> {
46 57
         Ok(Self {
47 58
             map: Default::default(),
48
-            db: super::SledAsync::new("./sled.db")?,
59
+            db: super::SledAsync::new("./sled.db", true)?,
49 60
         })
50 61
     }
51 62
 
52 63
     #[inline]
53 64
     fn insert(&mut self, key: String, peer: Peer) {
54
-        if self.map.write().unwrap().insert(key, peer).is_none() {}
65
+        let ip = peer.socket_addr.ip();
66
+        if self
67
+            .map
68
+            .write()
69
+            .unwrap()
70
+            .insert(key.clone(), peer)
71
+            .is_none()
72
+        {
73
+            let ip = ip.to_string();
74
+            self.db.insert(key, PeerSerde { ip });
75
+        }
55 76
     }
56 77
 
57 78
     #[inline]
@@ -61,7 +82,7 @@ impl PeerMap {
61 82
             return p;
62 83
         } else {
63 84
             if let Some(_) = self.db.get(key).await {
64
-                // to-do
85
+                return Some(Peer::default());
65 86
             }
66 87
         }
67 88
         None

+ 18 - 6
src/sled_async.rs

@@ -18,11 +18,15 @@ pub struct SledAsync {
18 18
 }
19 19
 
20 20
 impl SledAsync {
21
-    pub fn new(path: &str) -> ResultType<Self> {
22
-        Ok(Self {
21
+    pub fn new(path: &str, run: bool) -> ResultType<Self> {
22
+        let mut res = Self {
23 23
             db: sled::open(path)?,
24 24
             tx: None,
25
-        })
25
+        };
26
+        if run {
27
+            res.run();
28
+        }
29
+        Ok(res)
26 30
     }
27 31
 
28 32
     pub fn run(&mut self) -> std::thread::JoinHandle<()> {
@@ -31,6 +35,7 @@ impl SledAsync {
31 35
         let db = self.db.clone();
32 36
         std::thread::spawn(move || {
33 37
             Self::io_loop(db, rx);
38
+            log::debug!("Exit SledAsync loop");
34 39
         })
35 40
     }
36 41
 
@@ -55,6 +60,13 @@ impl SledAsync {
55 60
         }
56 61
     }
57 62
 
63
+    pub fn _close(self, j: std::thread::JoinHandle<()>) {
64
+        if let Some(tx) = &self.tx {
65
+            allow_err!(tx.send(Action::Close));
66
+        }
67
+        allow_err!(j.join());
68
+    }
69
+
58 70
     pub async fn get(&mut self, key: String) -> Option<sled::IVec> {
59 71
         if let Some(tx) = &self.tx {
60 72
             let (tx_once, mut rx) = mpsc::channel::<Option<sled::IVec>>(1);
@@ -67,7 +79,7 @@ impl SledAsync {
67 79
     }
68 80
 
69 81
     #[inline]
70
-    pub fn deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<sled::IVec>) -> Option<T> {
82
+    pub fn _deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<sled::IVec>) -> Option<T> {
71 83
         if let Some(v) = v {
72 84
             if let Ok(v) = std::str::from_utf8(v) {
73 85
                 if let Ok(v) = serde_json::from_str::<T>(&v) {
@@ -78,9 +90,9 @@ impl SledAsync {
78 90
         None
79 91
     }
80 92
 
81
-    pub fn insert<'a, T: serde::Serialize>(&mut self, key: String, v: &T) {
93
+    pub fn insert<T: serde::Serialize>(&mut self, key: String, v: T) {
82 94
         if let Some(tx) = &self.tx {
83
-            if let Ok(v) = serde_json::to_vec(v) {
95
+            if let Ok(v) = serde_json::to_vec(&v) {
84 96
                 allow_err!(tx.send(Action::Insert((key, v))));
85 97
             }
86 98
         }