Просмотр исходного кода

handle tcp punch response via tcp connection

open-trade лет назад: 5
Родитель
Сommit
4e07acdb6c
1 измененных файлов с 70 добавлено и 22 удалено
  1. 70 22
      src/rendezvous_server.rs

+ 70 - 22
src/rendezvous_server.rs

@@ -59,18 +59,30 @@ impl RendezvousServer {
59
                     let tcp_punch = rs.tcp_punch.clone();
59
                     let tcp_punch = rs.tcp_punch.clone();
60
                     tcp_punch.lock().unwrap().insert(addr, a);
60
                     tcp_punch.lock().unwrap().insert(addr, a);
61
                     let tx = tx.clone();
61
                     let tx = tx.clone();
62
+                    let mut rs = Self {
63
+                        peer_map: PeerMap::new(),
64
+                        tcp_punch: tcp_punch,
65
+                    };
62
                     tokio::spawn(async move {
66
                     tokio::spawn(async move {
63
                         while let Some(Ok(bytes)) = b.next().await {
67
                         while let Some(Ok(bytes)) = b.next().await {
64
                             if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
68
                             if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
65
                                 match msg_in.union {
69
                                 match msg_in.union {
66
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
70
                                     Some(rendezvous_message::Union::punch_hole_request(ph)) => {
67
-                                        tx.send((addr, ph.id)).ok();
71
+                                        allow_err!(tx.send((addr, ph.id)));
72
+                                    }
73
+                                    Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
74
+                                        allow_err!(rs.handle_hole_sent(&phs, addr, None).await);
75
+                                        break;
76
+                                    }
77
+                                    Some(rendezvous_message::Union::local_addr(la)) => {
78
+                                        allow_err!(rs.handle_local_addr(&la, addr, None).await);
79
+                                        break;
68
                                     }
80
                                     }
69
                                     _ => {}
81
                                     _ => {}
70
                                 }
82
                                 }
71
                             }
83
                             }
72
                         }
84
                         }
73
-                        tcp_punch.lock().unwrap().remove(&addr);
85
+                        rs.tcp_punch.lock().unwrap().remove(&addr);
74
                         log::debug!("Tcp connection from {:?} closed", addr);
86
                         log::debug!("Tcp connection from {:?} closed", addr);
75
                     });
87
                     });
76
                 }
88
                 }
@@ -106,28 +118,10 @@ impl RendezvousServer {
106
                     self.handle_punch_hole_request(addr, &ph.id, socket).await?;
118
                     self.handle_punch_hole_request(addr, &ph.id, socket).await?;
107
                 }
119
                 }
108
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
120
                 Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
109
-                    // punch hole sent from B, tell A that B is ready to be connected
110
-                    let addr_a = AddrMangle::decode(&phs.socket_addr);
111
-                    log::debug!("Punch hole response to {:?} from {:?}", &addr_a, &addr);
112
-                    let mut msg_out = RendezvousMessage::new();
113
-                    msg_out.set_punch_hole_response(PunchHoleResponse {
114
-                        socket_addr: AddrMangle::encode(addr),
115
-                        ..Default::default()
116
-                    });
117
-                    socket.send(&msg_out, addr_a).await?;
118
-                    self.send_to_tcp(&msg_out, addr_a).await?;
121
+                    self.handle_hole_sent(&phs, addr, Some(socket)).await?;
119
                 }
122
                 }
120
                 Some(rendezvous_message::Union::local_addr(la)) => {
123
                 Some(rendezvous_message::Union::local_addr(la)) => {
121
-                    // forward local addrs of B to A
122
-                    let addr_a = AddrMangle::decode(&la.socket_addr);
123
-                    log::debug!("Local addrs response to {:?} from {:?}", &addr_a, &addr);
124
-                    let mut msg_out = RendezvousMessage::new();
125
-                    msg_out.set_punch_hole_response(PunchHoleResponse {
126
-                        socket_addr: la.local_addr,
127
-                        ..Default::default()
128
-                    });
129
-                    socket.send(&msg_out, addr_a).await?;
130
-                    self.send_to_tcp(&msg_out, addr_a).await?;
124
+                    self.handle_local_addr(&la, addr, Some(socket)).await?;
131
                 }
125
                 }
132
                 _ => {}
126
                 _ => {}
133
             }
127
             }
@@ -135,6 +129,60 @@ impl RendezvousServer {
135
         Ok(())
129
         Ok(())
136
     }
130
     }
137
 
131
 
132
+    async fn handle_hole_sent<'a>(
133
+        &mut self,
134
+        phs: &PunchHoleSent,
135
+        addr: SocketAddr,
136
+        socket: Option<&'a mut FramedSocket>,
137
+    ) -> ResultType<()> {
138
+        // punch hole sent from B, tell A that B is ready to be connected
139
+        let addr_a = AddrMangle::decode(&phs.socket_addr);
140
+        log::debug!(
141
+            "{} punch hole response to {:?} from {:?}",
142
+            if socket.is_none() { "TCP" } else { "UDP" },
143
+            &addr_a,
144
+            &addr
145
+        );
146
+        let mut msg_out = RendezvousMessage::new();
147
+        msg_out.set_punch_hole_response(PunchHoleResponse {
148
+            socket_addr: AddrMangle::encode(addr),
149
+            ..Default::default()
150
+        });
151
+        if let Some(socket) = socket {
152
+            socket.send(&msg_out, addr_a).await?;
153
+        } else {
154
+            self.send_to_tcp(&msg_out, addr_a).await?;
155
+        }
156
+        Ok(())
157
+    }
158
+
159
+    async fn handle_local_addr<'a>(
160
+        &mut self,
161
+        la: &LocalAddr,
162
+        addr: SocketAddr,
163
+        socket: Option<&'a mut FramedSocket>,
164
+    ) -> ResultType<()> {
165
+        // forward local addrs of B to A
166
+        let addr_a = AddrMangle::decode(&la.socket_addr);
167
+        log::debug!(
168
+            "{} local addrs response to {:?} from {:?}",
169
+            if socket.is_none() { "TCP" } else { "UDP" },
170
+            &addr_a,
171
+            &addr
172
+        );
173
+        let mut msg_out = RendezvousMessage::new();
174
+        msg_out.set_punch_hole_response(PunchHoleResponse {
175
+            socket_addr: la.local_addr.clone(),
176
+            ..Default::default()
177
+        });
178
+        if let Some(socket) = socket {
179
+            socket.send(&msg_out, addr_a).await?;
180
+        } else {
181
+            self.send_to_tcp(&msg_out, addr_a).await?;
182
+        }
183
+        Ok(())
184
+    }
185
+
138
     async fn handle_punch_hole_request(
186
     async fn handle_punch_hole_request(
139
         &mut self,
187
         &mut self,
140
         addr: SocketAddr,
188
         addr: SocketAddr,