open-trade лет назад: 5
Родитель
Сommit
2812effd6d
2 измененных файлов с 86 добавлено и 56 удалено
  1. 1 1
      libs/hbb_common
  2. 85 55
      src/rendezvous_server.rs

+ 1 - 1
libs/hbb_common

@@ -1 +1 @@
1
-Subproject commit a9536bcc127a9d8057a9b7ffbe486506a7b963de
1
+Subproject commit 974815baaaa2efd886be73c3be8fb5931bcc29c4

+ 85 - 55
src/rendezvous_server.rs

@@ -3,19 +3,14 @@ use futures::SinkExt;
3 3
 use hbb_common::{
4 4
     message_proto::*,
5 5
     protobuf::{parse_from_bytes, Message as _},
6
-    V4AddrMangle,
7
-};
8
-use std::{
9
-    collections::HashMap,
10
-    error::Error,
11
-    net::{SocketAddr, SocketAddrV4},
12
-    time::Duration,
6
+    AddrMangle,
13 7
 };
8
+use std::{collections::HashMap, error::Error, net::SocketAddr, time::Duration};
14 9
 use tokio::{net::UdpSocket, stream::StreamExt, time::delay_for};
15 10
 use tokio_util::{codec::BytesCodec, udp::UdpFramed};
16 11
 
17 12
 pub struct Peer {
18
-    socket_addr: SocketAddrV4,
13
+    socket_addr: SocketAddr,
19 14
 }
20 15
 
21 16
 type PeerMap = HashMap<String, Peer>;
@@ -48,41 +43,35 @@ impl RendezvousServer {
48 43
         socket: &mut FramedSocket,
49 44
     ) -> ResultType {
50 45
         if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
51
-            if let SocketAddr::V4(addr_v4) = addr {
52
-                match msg_in.union {
53
-                    Some(Message_oneof_union::register_peer(rp)) => {
54
-                        if rp.hbb_addr.len() > 0 {
55
-                            self.peer_map.insert(
56
-                                rp.hbb_addr,
57
-                                Peer {
58
-                                    socket_addr: addr_v4,
59
-                                },
60
-                            );
61
-                        }
62
-                    }
63
-                    Some(Message_oneof_union::punch_hole_request(ph)) => {
64
-                        // punch hole request from A, forward to B
65
-                        if let Some(peer) = self.peer_map.get(&ph.hbb_addr) {
66
-                            let mut msg_out = Message::new();
67
-                            msg_out.set_punch_hole(PunchHole {
68
-                                socket_addr: V4AddrMangle::encode(&peer.socket_addr),
69
-                                ..Default::default()
70
-                            });
71
-                            send_to(&msg_out, addr, socket).await?;
72
-                        }
46
+            match msg_in.union {
47
+                Some(Message_oneof_union::register_peer(rp)) => {
48
+                    if rp.hbb_addr.len() > 0 {
49
+                        self.peer_map
50
+                            .insert(rp.hbb_addr, Peer { socket_addr: addr });
73 51
                     }
74
-                    Some(Message_oneof_union::punch_hole_sent(phs)) => {
75
-                        // punch hole sent from B, tell A that B ready
76
-                        let addr_a = V4AddrMangle::decode(&phs.socket_addr);
52
+                }
53
+                Some(Message_oneof_union::punch_hole_request(ph)) => {
54
+                    // punch hole request from A, forward to B
55
+                    if let Some(peer) = self.peer_map.get(&ph.hbb_addr) {
77 56
                         let mut msg_out = Message::new();
78
-                        msg_out.set_punch_hole_response(PunchHoleResponse {
79
-                            socket_addr: V4AddrMangle::encode(&addr_v4),
57
+                        msg_out.set_punch_hole(PunchHole {
58
+                            socket_addr: AddrMangle::encode(&addr),
80 59
                             ..Default::default()
81 60
                         });
82
-                        send_to(&msg_out, SocketAddr::V4(addr_a), socket).await?;
61
+                        send_to(&msg_out, peer.socket_addr, socket).await?;
83 62
                     }
84
-                    _ => {}
85 63
                 }
64
+                Some(Message_oneof_union::punch_hole_sent(phs)) => {
65
+                    // punch hole sent from B, tell A that B ready
66
+                    let addr_a = AddrMangle::decode(&phs.socket_addr);
67
+                    let mut msg_out = Message::new();
68
+                    msg_out.set_punch_hole_response(PunchHoleResponse {
69
+                        socket_addr: AddrMangle::encode(&addr),
70
+                        ..Default::default()
71
+                    });
72
+                    send_to(&msg_out, addr_a, socket).await?;
73
+                }
74
+                _ => {}
86 75
             }
87 76
         }
88 77
         Ok(())
@@ -109,36 +98,77 @@ mod tests {
109 98
     #[allow(unused_must_use)]
110 99
     #[tokio::main]
111 100
     async fn test_rs_async() {
112
-        let server_addr = "0.0.0.0:21116";
113
-        let f1 = RendezvousServer::start(server_addr);
114
-        let to_addr = server_addr.parse().unwrap();
101
+        let mut port_server: u16 = 0;
102
+        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
103
+        if let SocketAddr::V4(addr) = socket.local_addr().unwrap() {
104
+            port_server = addr.port();
105
+        }
106
+        drop(socket);
107
+        let addr_server = format!("127.0.0.1:{}", port_server);
108
+        let f1 = RendezvousServer::start(&addr_server);
109
+        let addr_server = addr_server.parse().unwrap();
115 110
         let f2 = async {
116
-            let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
117
-            let local_addr = socket.local_addr().unwrap();
118
-            let mut socket = UdpFramed::new(socket, BytesCodec::new());
111
+            // B register it to server
112
+            let socket_b = UdpSocket::bind("127.0.0.1:0").await.unwrap();
113
+            let local_addr_b = socket_b.local_addr().unwrap();
114
+            let mut socket_b = UdpFramed::new(socket_b, BytesCodec::new());
119 115
             let mut msg_out = Message::new();
120 116
             msg_out.set_register_peer(RegisterPeer {
121 117
                 hbb_addr: "123".to_string(),
122 118
                 ..Default::default()
123 119
             });
124
-            send_to(&msg_out, to_addr, &mut socket).await;
120
+            send_to(&msg_out, addr_server, &mut socket_b).await;
121
+
122
+            // A send punch request to server
123
+            let socket_a = UdpSocket::bind("127.0.0.1:0").await.unwrap();
124
+            let local_addr_a = socket_a.local_addr().unwrap();
125
+            let mut socket_a = UdpFramed::new(socket_a, BytesCodec::new());
125 126
             msg_out.set_punch_hole_request(PunchHoleRequest {
126 127
                 hbb_addr: "123".to_string(),
127 128
                 ..Default::default()
128 129
             });
129
-            send_to(&msg_out, to_addr, &mut socket).await;
130
-            if let Ok(Some(Ok((bytes, _)))) =
131
-                tokio::time::timeout(Duration::from_millis(1), socket.next()).await
130
+            send_to(&msg_out, addr_server, &mut socket_a).await;
131
+
132
+            println!(
133
+                "A {:?} request punch hole to B {:?} via server {:?}",
134
+                local_addr_a, local_addr_b, addr_server,
135
+            );
136
+
137
+            // on B side, responsed to A's punch request forwarded from server
138
+            if let Ok(Some(Ok((bytes, addr)))) =
139
+                tokio::time::timeout(Duration::from_millis(1000), socket_b.next()).await
132 140
             {
133
-                if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
134
-                    assert_eq!(
135
-                        local_addr,
136
-                        SocketAddr::V4(V4AddrMangle::decode(
137
-                            &msg_in.get_punch_hole_response().socket_addr[..]
138
-                        ))
139
-                    );
140
-                }
141
+                assert_eq!(addr_server, addr);
142
+                let msg_in = parse_from_bytes::<Message>(&bytes).unwrap();
143
+                let remote_addr_a = AddrMangle::decode(&msg_in.get_punch_hole().socket_addr[..]);
144
+                assert_eq!(local_addr_a, remote_addr_a);
145
+
146
+                // B punch A
147
+                socket_b
148
+                    .get_mut()
149
+                    .send_to(&b"SYN"[..], &remote_addr_a)
150
+                    .await;
151
+
152
+                msg_out.set_punch_hole_sent(PunchHoleSent {
153
+                    socket_addr: AddrMangle::encode(&remote_addr_a),
154
+                    ..Default::default()
155
+                });
156
+                send_to(&msg_out, addr_server, &mut socket_b).await;
141 157
             }
158
+
159
+            // on A side
160
+            socket_a.next().await; // skip "SYN"
161
+            if let Ok(Some(Ok((bytes, addr)))) =
162
+                tokio::time::timeout(Duration::from_millis(1000), socket_a.next()).await
163
+            {
164
+                assert_eq!(addr_server, addr);
165
+                let msg_in = parse_from_bytes::<Message>(&bytes).unwrap();
166
+                let remote_addr_b =
167
+                    AddrMangle::decode(&msg_in.get_punch_hole_response().socket_addr[..]);
168
+                println!("{:?}", msg_in);
169
+                assert_eq!(local_addr_b, remote_addr_b);
170
+            }
171
+
142 172
             if true {
143 173
                 Err(Box::new(simple_error::SimpleError::new("done")))
144 174
             } else {