open-trade 5 years ago
parent
commit
d8b829818e
1 changed files with 33 additions and 37 deletions
  1. 33 37
      src/rendezvous_server.rs

+ 33 - 37
src/rendezvous_server.rs

@@ -1,5 +1,5 @@
1 1
 use super::message_proto::*;
2
-use bytes::Bytes;
2
+use bytes::{Bytes, BytesMut};
3 3
 use futures::SinkExt;
4 4
 use protobuf::{parse_from_bytes, Message as _};
5 5
 use std::{
@@ -8,11 +8,7 @@ use std::{
8 8
     net::{Ipv4Addr, SocketAddr, SocketAddrV4},
9 9
     time::{Duration, SystemTime, UNIX_EPOCH},
10 10
 };
11
-use tokio::{
12
-    net::UdpSocket,
13
-    stream::StreamExt,
14
-    time::{self, delay_for},
15
-};
11
+use tokio::{net::UdpSocket, stream::StreamExt, time::delay_for};
16 12
 use tokio_util::{codec::BytesCodec, udp::UdpFramed};
17 13
 
18 14
 /// Certain router and firewalls scan the packet and if they
@@ -74,43 +70,43 @@ impl RendezvousServer {
74 70
             peer_map: PeerMap::new(),
75 71
         };
76 72
         while let Some(Ok((bytes, addr))) = socket.next().await {
77
-            if let SocketAddr::V4(addr_v4) = addr {
78
-                if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
79
-                    match msg_in.union {
80
-                        Some(Message_oneof_union::register_peer(rp)) => {
81
-                            if rp.hbb_addr.len() > 0 {
82
-                                rs.peer_map.insert(
83
-                                    rp.hbb_addr,
84
-                                    Peer {
85
-                                        socket_addr: addr_v4,
86
-                                    },
87
-                                );
88
-                            }
89
-                        }
90
-                        Some(Message_oneof_union::peek_peer(pp)) => {
91
-                            rs.handle_peek_peer(&pp, addr, &mut socket).await?;
92
-                        }
93
-                        _ => {}
94
-                    }
95
-                }
96
-            }
73
+            rs.handle_msg(&bytes, addr, &mut socket).await?;
97 74
         }
98 75
         Ok(())
99 76
     }
100 77
 
101
-    pub async fn handle_peek_peer(
102
-        &self,
103
-        pp: &PeekPeer,
78
+    pub async fn handle_msg(
79
+        &mut self,
80
+        bytes: &BytesMut,
104 81
         addr: SocketAddr,
105 82
         socket: &mut FramedSocket,
106 83
     ) -> ResultType {
107
-        if let Some(peer) = self.peer_map.get(&pp.hbb_addr) {
108
-            let mut msg_out = Message::new();
109
-            msg_out.set_peek_peer_response(PeekPeerResponse {
110
-                socket_addr: V4AddrMangle::encode(&peer.socket_addr),
111
-                ..Default::default()
112
-            });
113
-            send_to(&msg_out, addr, socket).await?;
84
+        if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
85
+            if let SocketAddr::V4(addr_v4) = addr {
86
+                match msg_in.union {
87
+                    Some(Message_oneof_union::register_peer(rp)) => {
88
+                        if rp.hbb_addr.len() > 0 {
89
+                            self.peer_map.insert(
90
+                                rp.hbb_addr,
91
+                                Peer {
92
+                                    socket_addr: addr_v4,
93
+                                },
94
+                            );
95
+                        }
96
+                    }
97
+                    Some(Message_oneof_union::peek_peer(pp)) => {
98
+                        if let Some(peer) = self.peer_map.get(&pp.hbb_addr) {
99
+                            let mut msg_out = Message::new();
100
+                            msg_out.set_peek_peer_response(PeekPeerResponse {
101
+                                socket_addr: V4AddrMangle::encode(&peer.socket_addr),
102
+                                ..Default::default()
103
+                            });
104
+                            send_to(&msg_out, addr, socket).await?;
105
+                        }
106
+                    }
107
+                    _ => {}
108
+                }
109
+            }
114 110
         }
115 111
         Ok(())
116 112
     }
@@ -159,7 +155,7 @@ mod tests {
159 155
             });
160 156
             send_to(&msg_out, to_addr, &mut socket).await;
161 157
             if let Ok(Some(Ok((bytes, _)))) =
162
-                time::timeout(Duration::from_millis(1), socket.next()).await
158
+                tokio::time::timeout(Duration::from_millis(1), socket.next()).await
163 159
             {
164 160
                 if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
165 161
                     assert_eq!(