Browse Source

tested, will refactor to spawn

open-trade 5 years ago
parent
commit
5be70826aa
3 changed files with 50 additions and 61 deletions
  1. 7 46
      Cargo.lock
  2. 1 1
      Cargo.toml
  3. 42 14
      src/rendezvous_server.rs

+ 7 - 46
Cargo.lock

@@ -23,11 +23,6 @@ dependencies = [
23 23
  "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
24 24
 ]
25 25
 
26
-[[package]]
27
-name = "autocfg"
28
-version = "1.0.0"
29
-source = "registry+https://github.com/rust-lang/crates.io-index"
30
-
31 26
 [[package]]
32 27
 name = "bitflags"
33 28
 version = "1.2.1"
@@ -43,16 +38,6 @@ name = "cfg-if"
43 38
 version = "0.1.10"
44 39
 source = "registry+https://github.com/rust-lang/crates.io-index"
45 40
 
46
-[[package]]
47
-name = "crossbeam-utils"
48
-version = "0.7.2"
49
-source = "registry+https://github.com/rust-lang/crates.io-index"
50
-dependencies = [
51
- "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
52
- "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
53
- "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
54
-]
55
-
56 41
 [[package]]
57 42
 name = "env_logger"
58 43
 version = "0.7.1"
@@ -84,11 +69,6 @@ name = "fuchsia-zircon-sys"
84 69
 version = "0.3.3"
85 70
 source = "registry+https://github.com/rust-lang/crates.io-index"
86 71
 
87
-[[package]]
88
-name = "futures"
89
-version = "0.1.29"
90
-source = "registry+https://github.com/rust-lang/crates.io-index"
91
-
92 72
 [[package]]
93 73
 name = "futures"
94 74
 version = "0.3.4"
@@ -181,8 +161,8 @@ dependencies = [
181 161
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
182 162
  "protobuf 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
183 163
  "protobuf-codegen-pure 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
164
+ "simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
184 165
  "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
185
- "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
186 166
  "tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
187 167
 ]
188 168
 
@@ -418,6 +398,11 @@ dependencies = [
418 398
  "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)",
419 399
 ]
420 400
 
401
+[[package]]
402
+name = "simple-error"
403
+version = "0.2.1"
404
+source = "registry+https://github.com/rust-lang/crates.io-index"
405
+
421 406
 [[package]]
422 407
 name = "slab"
423 408
 version = "0.4.2"
@@ -483,15 +468,6 @@ dependencies = [
483 468
  "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
484 469
 ]
485 470
 
486
-[[package]]
487
-name = "tokio-executor"
488
-version = "0.1.10"
489
-source = "registry+https://github.com/rust-lang/crates.io-index"
490
-dependencies = [
491
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
492
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
493
-]
494
-
495 471
 [[package]]
496 472
 name = "tokio-macros"
497 473
 version = "0.2.5"
@@ -502,17 +478,6 @@ dependencies = [
502 478
  "syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)",
503 479
 ]
504 480
 
505
-[[package]]
506
-name = "tokio-timer"
507
-version = "0.2.13"
508
-source = "registry+https://github.com/rust-lang/crates.io-index"
509
-dependencies = [
510
- "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
511
- "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
512
- "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
513
- "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
514
-]
515
-
516 481
 [[package]]
517 482
 name = "tokio-util"
518 483
 version = "0.3.0"
@@ -582,16 +547,13 @@ dependencies = [
582 547
 "checksum aho-corasick 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)" = "d5e63fd144e18ba274ae7095c0197a870a7b9468abc801dd62f190d80817d2ec"
583 548
 "checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff"
584 549
 "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
585
-"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
586 550
 "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
587 551
 "checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
588 552
 "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
589
-"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
590 553
 "checksum env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
591 554
 "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
592 555
 "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
593 556
 "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
594
-"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef"
595 557
 "checksum futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780"
596 558
 "checksum futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8"
597 559
 "checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
@@ -630,15 +592,14 @@ dependencies = [
630 592
 "checksum regex 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "322cf97724bea3ee221b78fe25ac9c46114ebb51747ad5babd51a2fc6a8235a8"
631 593
 "checksum regex-syntax 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "1132f845907680735a84409c3bebc64d1364a5683ffbce899550cd09d5eaefc1"
632 594
 "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
595
+"checksum simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "339844c9af2d844b9230bb28e8f819a7790cbf20a29b5cbd2b59916a03a1ef51"
633 596
 "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
634 597
 "checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
635 598
 "checksum syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)" = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859"
636 599
 "checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
637 600
 "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
638 601
 "checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616"
639
-"checksum tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
640 602
 "checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
641
-"checksum tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296"
642 603
 "checksum tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8"
643 604
 "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
644 605
 "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"

+ 1 - 1
Cargo.toml

@@ -10,11 +10,11 @@ edition = "2018"
10 10
 tokio = { version = "0.2", features = ["full"] }
11 11
 protobuf = "2.10"
12 12
 tokio-util = { version = "0.3", features = ["full"] }
13
-tokio-timer = "0.2"
14 13
 log = "0.4"
15 14
 env_logger = "0.7"
16 15
 futures = "0.3"
17 16
 bytes = "0.5"
17
+simple-error = "0.2"
18 18
 
19 19
 [build-dependencies]
20 20
 protobuf-codegen-pure = "2.10"

+ 42 - 14
src/rendezvous_server.rs

@@ -1,24 +1,27 @@
1 1
 use super::message_proto::*;
2 2
 use bytes::Bytes;
3
-use futures::{FutureExt, SinkExt};
3
+use futures::SinkExt;
4 4
 use protobuf::{parse_from_bytes, Message as _};
5 5
 use std::{
6 6
     collections::HashMap,
7 7
     error::Error,
8 8
     net::{Ipv4Addr, SocketAddr, SocketAddrV4},
9
-    time::{SystemTime, UNIX_EPOCH},
9
+    time::{Duration, SystemTime, UNIX_EPOCH},
10
+};
11
+use tokio::{
12
+    net::UdpSocket,
13
+    stream::StreamExt,
14
+    time::{self, delay_for},
10 15
 };
11
-use tokio::net::UdpSocket;
12
-use tokio::stream::StreamExt;
13 16
 use tokio_util::{codec::BytesCodec, udp::UdpFramed};
14 17
 
15 18
 /// Certain router and firewalls scan the packet and if they
16 19
 /// find an IP address belonging to their pool that they use to do the NAT mapping/translation, so here we mangle the ip address
17 20
 
18
-pub struct V4AddrMangle(Vec<u8>);
21
+pub struct V4AddrMangle();
19 22
 
20 23
 impl V4AddrMangle {
21
-    pub fn encode(addr: &SocketAddrV4) -> Self {
24
+    pub fn encode(addr: &SocketAddrV4) -> Vec<u8> {
22 25
         let tm = (SystemTime::now()
23 26
             .duration_since(UNIX_EPOCH)
24 27
             .unwrap()
@@ -35,12 +38,12 @@ impl V4AddrMangle {
35 38
                 break;
36 39
             }
37 40
         }
38
-        Self(bytes[..(16 - n_padding)].to_vec())
41
+        bytes[..(16 - n_padding)].to_vec()
39 42
     }
40 43
 
41
-    pub fn decode(&self) -> SocketAddrV4 {
44
+    pub fn decode(bytes: &[u8]) -> SocketAddrV4 {
42 45
         let mut padded = [0u8; 16];
43
-        padded[..self.0.len()].copy_from_slice(&self.0);
46
+        padded[..bytes.len()].copy_from_slice(&bytes);
44 47
         let number = u128::from_ne_bytes(padded);
45 48
         let tm = (number >> 17) & (u32::max_value() as u128);
46 49
         let ip = (((number >> 49) - tm) as u32).to_ne_bytes();
@@ -83,11 +86,9 @@ impl RendezvousServer {
83 86
                                     },
84 87
                                 );
85 88
                             }
86
-                            tokio_timer::sleep(std::time::Duration::from_secs(60));
87 89
                         }
88 90
                         Some(Message_oneof_union::peek_peer(pp)) => {
89 91
                             rs.handle_peek_peer(&pp, addr, &mut socket).await?;
90
-                            tokio_timer::sleep(std::time::Duration::from_secs(60));
91 92
                         }
92 93
                         _ => {}
93 94
                     }
@@ -106,7 +107,7 @@ impl RendezvousServer {
106 107
         if let Some(peer) = self.peer_map.get(&pp.hbb_addr) {
107 108
             let mut msg_out = Message::new();
108 109
             msg_out.set_peek_peer_response(PeekPeerResponse {
109
-                socket_addr: V4AddrMangle::encode(&peer.socket_addr).0.to_vec(),
110
+                socket_addr: V4AddrMangle::encode(&peer.socket_addr),
110 111
                 ..Default::default()
111 112
             });
112 113
             send_to(&msg_out, addr, socket).await?;
@@ -123,13 +124,17 @@ pub async fn send_to(msg: &Message, addr: SocketAddr, socket: &mut FramedSocket)
123 124
     Ok(())
124 125
 }
125 126
 
127
+pub async fn sleep(sec: f32) {
128
+    delay_for(Duration::from_secs_f32(sec)).await;
129
+}
130
+
126 131
 #[cfg(test)]
127 132
 mod tests {
128 133
     use super::*;
129 134
     #[test]
130 135
     fn test_mangle() {
131 136
         let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116);
132
-        assert_eq!(addr, V4AddrMangle::encode(&addr).decode());
137
+        assert_eq!(addr, V4AddrMangle::decode(&V4AddrMangle::encode(&addr)[..]));
133 138
     }
134 139
 
135 140
     #[allow(unused_must_use)]
@@ -140,15 +145,38 @@ mod tests {
140 145
         let to_addr = server_addr.parse().unwrap();
141 146
         let f2 = async {
142 147
             let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
148
+            let local_addr = socket.local_addr().unwrap();
143 149
             let mut socket = UdpFramed::new(socket, BytesCodec::new());
144 150
             let mut msg_out = Message::new();
151
+            msg_out.set_register_peer(RegisterPeer {
152
+                hbb_addr: "123".to_string(),
153
+                ..Default::default()
154
+            });
155
+            send_to(&msg_out, to_addr, &mut socket).await;
145 156
             msg_out.set_peek_peer(PeekPeer {
146 157
                 hbb_addr: "123".to_string(),
147 158
                 ..Default::default()
148 159
             });
149 160
             send_to(&msg_out, to_addr, &mut socket).await;
161
+            if let Ok(Some(Ok((bytes, _)))) =
162
+                time::timeout(Duration::from_millis(1), socket.next()).await
163
+            {
164
+                if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
165
+                    assert_eq!(
166
+                        local_addr,
167
+                        SocketAddr::V4(V4AddrMangle::decode(
168
+                            &msg_in.get_peek_peer_response().socket_addr[..]
169
+                        ))
170
+                    );
171
+                }
172
+            }
173
+            if true {
174
+                Err(Box::new(simple_error::SimpleError::new("done")))
175
+            } else {
176
+                Ok(())
177
+            }
150 178
         };
151
-        tokio::join!(f1, f2);
179
+        tokio::try_join!(f1, f2);
152 180
     }
153 181
 
154 182
     #[test]