Просмотр исходного кода

feat: 优化并加入websocket支持

lejianwen месяцев назад: 5
Родитель
Сommit
872f1f95c8
1 измененных файлов с 272 добавлено и 137 удалено
  1. 272 137
      src/rendezvous_server.rs

+ 272 - 137
src/rendezvous_server.rs

@@ -58,13 +58,42 @@ enum Data {
58 58
 const REG_TIMEOUT: i32 = 30_000;
59 59
 type TcpStreamSink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
60 60
 type WsSink = SplitSink<tokio_tungstenite::WebSocketStream<TcpStream>, tungstenite::Message>;
61
-enum SinkType {
62
-    TcpStream(TcpStreamSink),
63
-    Ws(WsSink),
61
+struct SafeWsSink {
62
+    sink: WsSink,
63
+    encrypt: Option<Encrypt>,
64 64
 }
65
-struct Sink {
66
-    tx: SinkType,
67
-    key: Arc<Mutex<Option<Encrypt>>>,
65
+struct SafeTcpStreamSink {
66
+    sink: TcpStreamSink,
67
+    encrypt: Option<Encrypt>,
68
+}
69
+enum Sink {
70
+    // TcpStream(TcpStreamSink),
71
+    // Ws(WsSink),
72
+    Wss(SafeWsSink),
73
+    Tss(SafeTcpStreamSink),
74
+}
75
+
76
+impl Sink {
77
+    async fn send(&mut self, msg: &RendezvousMessage) {
78
+        if let Ok(mut bytes) = msg.write_to_bytes() {
79
+            match self {
80
+                // Sink::TcpStream(mut s) => allow_err!(s.send(Bytes::from(bytes)).await),
81
+                // Sink::Ws(mut s) => allow_err!(s.send(tungstenite::Message::Binary(bytes)).await),
82
+                Sink::Wss(s) => {
83
+                    if let Some(key) = s.encrypt.as_mut() {
84
+                        bytes = key.enc(&bytes);
85
+                    }
86
+                    allow_err!(s.sink.send(tungstenite::Message::Binary(bytes)).await)
87
+                }
88
+                Sink::Tss(s) => {
89
+                    if let Some(key) = s.encrypt.as_mut() {
90
+                        bytes = key.enc(&bytes);
91
+                    }
92
+                    allow_err!(s.sink.send(Bytes::from(bytes)).await)
93
+                }
94
+            }
95
+        }
96
+    }
68 97
 }
69 98
 type Sender = mpsc::UnboundedSender<Data>;
70 99
 type Receiver = mpsc::UnboundedReceiver<Data>;
@@ -95,6 +124,7 @@ pub struct RendezvousServer {
95 124
     relay_servers0: Arc<RelayServers>,
96 125
     rendezvous_servers: Arc<Vec<String>>,
97 126
     inner: Arc<Inner>,
127
+    ws_map: Arc<Mutex<HashMap<SocketAddr, Sink>>>
98 128
 }
99 129
 
100 130
 enum LoopFailure {
@@ -153,6 +183,7 @@ impl RendezvousServer {
153 183
                 secure_tcp_pk_b,
154 184
                 secure_tcp_sk_b,
155 185
             }),
186
+            ws_map: Arc::new(Mutex::new(HashMap::new()))
156 187
         };
157 188
         log::info!("mask: {:?}", rs.inner.mask);
158 189
         log::info!("local-ip: {:?}", rs.inner.local_ip);
@@ -357,7 +388,13 @@ impl RendezvousServer {
357 388
                     // B registered
358 389
                     if !rp.id.is_empty() {
359 390
                         log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
360
-                        self.update_addr(rp.id, addr, socket).await?;
391
+                        let request_pk = self.update_addr(rp.id, addr).await;
392
+                        let mut msg_out = RendezvousMessage::new();
393
+                        msg_out.set_register_peer_response(RegisterPeerResponse {
394
+                            request_pk,
395
+                            ..Default::default()
396
+                        });
397
+                        socket.send(&msg_out, addr).await?;
361 398
                         if self.inner.serial > rp.serial {
362 399
                             let mut msg_out = RendezvousMessage::new();
363 400
                             msg_out.set_configure_update(ConfigUpdate {
@@ -370,89 +407,25 @@ impl RendezvousServer {
370 407
                     }
371 408
                 }
372 409
                 Some(rendezvous_message::Union::RegisterPk(rk)) => {
373
-                    if rk.uuid.is_empty() || rk.pk.is_empty() {
374
-                        return Ok(());
375
-                    }
376
-                    let id = rk.id;
377
-                    let ip = addr.ip().to_string();
378
-                    if id.len() < 6 {
379
-                        return send_rk_res(socket, addr, UUID_MISMATCH).await;
380
-                    } else if !self.check_ip_blocker(&ip, &id).await {
381
-                        return send_rk_res(socket, addr, TOO_FREQUENT).await;
382
-                    }
383
-                    let peer = self.pm.get_or(&id).await;
384
-                    let (changed, ip_changed) = {
385
-                        let peer = peer.read().await;
386
-                        if peer.uuid.is_empty() {
387
-                            (true, false)
388
-                        } else {
389
-                            if peer.uuid == rk.uuid {
390
-                                if peer.info.ip != ip && peer.pk != rk.pk {
391
-                                    log::warn!(
392
-                                        "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
393
-                                        id,
394
-                                        ip,
395
-                                        rk.pk,
396
-                                        peer.info.ip,
397
-                                        peer.pk,
398
-                                    );
399
-                                    drop(peer);
400
-                                    return send_rk_res(socket, addr, UUID_MISMATCH).await;
401
-                                }
402
-                            } else {
403
-                                log::warn!(
404
-                                    "Peer {} uuid mismatch: {:?} vs {:?}",
405
-                                    id,
406
-                                    rk.uuid,
407
-                                    peer.uuid
408
-                                );
409
-                                drop(peer);
410
-                                return send_rk_res(socket, addr, UUID_MISMATCH).await;
411
-                            }
412
-                            let ip_changed = peer.info.ip != ip;
413
-                            (
414
-                                peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
415
-                                ip_changed,
416
-                            )
410
+                    let response = self.handle_register_pk(rk, addr, false).await;
411
+                    match response {
412
+                        Err(err) => {
413
+                            let mut msg_out = RendezvousMessage::new();
414
+                            msg_out.set_register_pk_response(RegisterPkResponse {
415
+                                result: err.into(),
416
+                                ..Default::default()
417
+                            });
418
+                            socket.send(&msg_out, addr).await?;
417 419
                         }
418
-                    };
419
-                    let mut req_pk = peer.read().await.reg_pk;
420
-                    if req_pk.1.elapsed().as_secs() > 6 {
421
-                        req_pk.0 = 0;
422
-                    } else if req_pk.0 > 2 {
423
-                        return send_rk_res(socket, addr, TOO_FREQUENT).await;
424
-                    }
425
-                    req_pk.0 += 1;
426
-                    req_pk.1 = Instant::now();
427
-                    peer.write().await.reg_pk = req_pk;
428
-                    if ip_changed {
429
-                        let mut lock = IP_CHANGES.lock().await;
430
-                        if let Some((tm, ips)) = lock.get_mut(&id) {
431
-                            if tm.elapsed().as_secs() > IP_CHANGE_DUR {
432
-                                *tm = Instant::now();
433
-                                ips.clear();
434
-                                ips.insert(ip.clone(), 1);
435
-                            } else if let Some(v) = ips.get_mut(&ip) {
436
-                                *v += 1;
437
-                            } else {
438
-                                ips.insert(ip.clone(), 1);
439
-                            }
440
-                        } else {
441
-                            lock.insert(
442
-                                id.clone(),
443
-                                (Instant::now(), HashMap::from([(ip.clone(), 1)])),
444
-                            );
420
+                        Ok(res) => {
421
+                            let mut msg_out = RendezvousMessage::new();
422
+                            msg_out.set_register_pk_response(RegisterPkResponse {
423
+                                result: res.into(),
424
+                                ..Default::default()
425
+                            });
426
+                            socket.send(&msg_out, addr).await?;
445 427
                         }
446 428
                     }
447
-                    if changed {
448
-                        self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
449
-                    }
450
-                    let mut msg_out = RendezvousMessage::new();
451
-                    msg_out.set_register_pk_response(RegisterPkResponse {
452
-                        result: register_pk_response::Result::OK.into(),
453
-                        ..Default::default()
454
-                    });
455
-                    socket.send(&msg_out, addr).await?
456 429
                 }
457 430
                 Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
458 431
                     if self.pm.is_in_memory(&ph.id).await {
@@ -519,7 +492,30 @@ impl RendezvousServer {
519 492
         ws: bool,
520 493
     ) -> bool {
521 494
         if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(bytes) {
495
+            // log::debug!("Received TCP message from {}: {:?}", addr, msg_in);
522 496
             match msg_in.union {
497
+                Some(rendezvous_message::Union::RegisterPeer(rp)) => {
498
+                    // B registered
499
+                    if !rp.id.is_empty() {
500
+                        log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
501
+                        let request_pk =  self.update_addr(rp.id, addr).await;
502
+                        let mut msg_out = RendezvousMessage::new();
503
+                        msg_out.set_register_peer_response(RegisterPeerResponse {
504
+                            request_pk,
505
+                            ..Default::default()
506
+                        });
507
+                        Self::send_to_sink(sink, msg_out).await;
508
+                        if self.inner.serial > rp.serial {
509
+                            let mut msg_out = RendezvousMessage::new();
510
+                            msg_out.set_configure_update(ConfigUpdate {
511
+                                serial: self.inner.serial,
512
+                                rendezvous_servers: (*self.rendezvous_servers).clone(),
513
+                                ..Default::default()
514
+                            });
515
+                            Self::send_to_sink(sink, msg_out).await;
516
+                        }
517
+                    }
518
+                }
523 519
                 Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
524 520
                     // there maybe several attempt, so sink can be none
525 521
                     if let Some(sink) = sink.take() {
@@ -583,14 +579,34 @@ impl RendezvousServer {
583 579
                     msg_out.set_test_nat_response(res);
584 580
                     Self::send_to_sink(sink, msg_out).await;
585 581
                 }
586
-                Some(rendezvous_message::Union::RegisterPk(_)) => {
587
-                    let res = register_pk_response::Result::NOT_SUPPORT;
588
-                    let mut msg_out = RendezvousMessage::new();
589
-                    msg_out.set_register_pk_response(RegisterPkResponse {
590
-                        result: res.into(),
591
-                        ..Default::default()
592
-                    });
593
-                    Self::send_to_sink(sink, msg_out).await;
582
+                Some(rendezvous_message::Union::RegisterPk(rk)) => {
583
+                    let response = self.handle_register_pk(rk, addr, ws).await;
584
+                    match response {
585
+                        Err(err) => {
586
+                            let mut msg_out = RendezvousMessage::new();
587
+                            msg_out.set_register_pk_response(RegisterPkResponse {
588
+                                result: err.into(),
589
+                                ..Default::default()
590
+                            });
591
+                            Self::send_to_sink(sink, msg_out).await;
592
+                            return false;
593
+                        }
594
+                        Ok(res) => {
595
+                            let mut msg_out = RendezvousMessage::new();
596
+                            msg_out.set_register_pk_response(RegisterPkResponse {
597
+                                result: res.into(),
598
+                                ..Default::default()
599
+                            });
600
+                            Self::send_to_sink(sink, msg_out).await;
601
+                            if ws {
602
+                                // for ws, we can only get addr when register_pk
603
+                                if let Some(sink) = sink.take() {
604
+                                    self.ws_map.lock().await.insert(try_into_v4(addr), sink);
605
+                                }
606
+                            }
607
+                            return true;
608
+                        }
609
+                    }
594 610
                 }
595 611
                 Some(rendezvous_message::Union::KeyExchange(ex)) => {
596 612
                     log::trace!("KeyExchange {:?} <- bytes: {:?}", addr, hex::encode(&bytes));
@@ -612,7 +628,10 @@ impl RendezvousServer {
612 628
                     match key {
613 629
                         Some(key) => {
614 630
                             if let Some(sink) = sink.as_mut() {
615
-                                sink.key.lock().await.replace(Encrypt::new(key));
631
+                                match sink {
632
+                                    Sink::Wss(s) => s.encrypt = Some(Encrypt::new(key)),
633
+                                    Sink::Tss(s) => s.encrypt = Some(Encrypt::new(key)),
634
+                                }
616 635
                             }
617 636
                             log::debug!("KeyExchange symetric key set");
618 637
                             return true;
@@ -654,13 +673,110 @@ impl RendezvousServer {
654 673
         states
655 674
     }
656 675
 
676
+    async fn handle_register_pk(
677
+        &mut self,
678
+        rk: RegisterPk,
679
+        addr: SocketAddr,
680
+        ws: bool,
681
+    ) -> Result<register_pk_response::Result, register_pk_response::Result> {
682
+        if rk.uuid.is_empty() || rk.pk.is_empty() {
683
+            return Err(INVALID_ID_FORMAT);
684
+        }
685
+        let id = rk.id;
686
+        let ip = addr.ip().to_string();
687
+        if id.len() < 6 {
688
+            return Err(UUID_MISMATCH);
689
+            //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
690
+        } else if !self.check_ip_blocker(&ip, &id).await {
691
+            return Err(TOO_FREQUENT);
692
+            //return Err(send_rk_res(socket, addr, TOO_FREQUENT).await);
693
+        }
694
+        let peer = self.pm.get_or(&id).await;
695
+        let (changed, ip_changed) = {
696
+            let peer = peer.read().await;
697
+            if peer.uuid.is_empty() {
698
+                (true, false)
699
+            } else {
700
+                if peer.uuid == rk.uuid {
701
+                    if peer.info.ip != ip && peer.pk != rk.pk {
702
+                        log::warn!(
703
+                            "Peer {} ip/pk mismatch: {}/{:?} vs {}/{:?}",
704
+                            id,
705
+                            ip,
706
+                            rk.pk,
707
+                            peer.info.ip,
708
+                            peer.pk,
709
+                        );
710
+                        drop(peer);
711
+                        return Err(UUID_MISMATCH);
712
+                        //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
713
+                    }
714
+                } else {
715
+                    log::warn!(
716
+                        "Peer {} uuid mismatch: {:?} vs {:?}",
717
+                        id,
718
+                        rk.uuid,
719
+                        peer.uuid
720
+                    );
721
+                    drop(peer);
722
+                    return Err(UUID_MISMATCH);
723
+                    //return Err(send_rk_res(socket, addr, UUID_MISMATCH).await);
724
+                }
725
+                let ip_changed = peer.info.ip != ip;
726
+                (
727
+                    peer.uuid != rk.uuid || peer.pk != rk.pk || ip_changed,
728
+                    ip_changed,
729
+                )
730
+            }
731
+        };
732
+        let mut req_pk = peer.read().await.reg_pk;
733
+        if req_pk.1.elapsed().as_secs() > 6 {
734
+            req_pk.0 = 0;
735
+        } else if req_pk.0 > 2 {
736
+            return Err(TOO_FREQUENT);
737
+            //return Err(send_rk_res(socket, addr, TOO_FREQUENT).await);
738
+        }
739
+        req_pk.0 += 1;
740
+        req_pk.1 = Instant::now();
741
+        peer.write().await.reg_pk = req_pk;
742
+        if ip_changed {
743
+            let mut lock = IP_CHANGES.lock().await;
744
+            if let Some((tm, ips)) = lock.get_mut(&id) {
745
+                if tm.elapsed().as_secs() > IP_CHANGE_DUR {
746
+                    *tm = Instant::now();
747
+                    ips.clear();
748
+                    ips.insert(ip.clone(), 1);
749
+                } else if let Some(v) = ips.get_mut(&ip) {
750
+                    *v += 1;
751
+                } else {
752
+                    ips.insert(ip.clone(), 1);
753
+                }
754
+            } else {
755
+                lock.insert(
756
+                    id.clone(),
757
+                    (Instant::now(), HashMap::from([(ip.clone(), 1)])),
758
+                );
759
+            }
760
+        }
761
+        if changed || ws {
762
+            // update peer info,解决tcp过程中不更新在线时间的问题
763
+            self.pm.update_pk(id, peer, addr, rk.uuid, rk.pk, ip).await;
764
+        }
765
+        Ok(register_pk_response::Result::OK)
766
+        // let mut msg_out = RendezvousMessage::new();
767
+        // msg_out.set_register_pk_response(RegisterPkResponse {
768
+        //     result: register_pk_response::Result::OK.into(),
769
+        //     ..Default::default()
770
+        // });
771
+        // Ok(msg_out)
772
+    }
773
+
657 774
     #[inline]
658 775
     async fn update_addr(
659 776
         &mut self,
660 777
         id: String,
661 778
         socket_addr: SocketAddr,
662
-        socket: &mut FramedSocket,
663
-    ) -> ResultType<()> {
779
+    ) -> bool {
664 780
         let (request_pk, ip_change) = if let Some(old) = self.pm.get_in_memory(&id).await {
665 781
             let mut old = old.write().await;
666 782
             let ip = socket_addr.ip();
@@ -690,12 +806,13 @@ impl RendezvousServer {
690 806
         if let Some(old) = ip_change {
691 807
             log::info!("IP change of {} from {} to {}", id, old, socket_addr);
692 808
         }
693
-        let mut msg_out = RendezvousMessage::new();
694
-        msg_out.set_register_peer_response(RegisterPeerResponse {
695
-            request_pk,
696
-            ..Default::default()
697
-        });
698
-        socket.send(&msg_out, socket_addr).await
809
+        request_pk
810
+        // let mut msg_out = RendezvousMessage::new();
811
+        // msg_out.set_register_peer_response(RegisterPeerResponse {
812
+        //     request_pk,
813
+        //     ..Default::default()
814
+        // });
815
+        // socket.send(&msg_out, socket_addr).await
699 816
     }
700 817
 
701 818
     #[inline]
@@ -771,7 +888,7 @@ impl RendezvousServer {
771 888
         ph: PunchHoleRequest,
772 889
         key: &str,
773 890
         ws: bool,
774
-    ) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
891
+    ) -> ResultType<(RendezvousMessage, Option<(SocketAddr)>)> {
775 892
         let mut ph = ph;
776 893
         if !key.is_empty() && ph.licence_key != key {
777 894
             let mut msg_out = RendezvousMessage::new();
@@ -869,7 +986,8 @@ impl RendezvousServer {
869 986
                     ..Default::default()
870 987
                 });
871 988
             }
872
-            Ok((msg_out, Some(peer_addr)))
989
+            //
990
+            Ok((msg_out, Some((peer_addr))))
873 991
         } else {
874 992
             let mut msg_out = RendezvousMessage::new();
875 993
             msg_out.set_punch_hole_response(PunchHoleResponse {
@@ -909,19 +1027,7 @@ impl RendezvousServer {
909 1027
     #[inline]
910 1028
     async fn send_to_sink(sink: &mut Option<Sink>, msg: RendezvousMessage) {
911 1029
         if let Some(sink) = sink.as_mut() {
912
-            if let Ok(mut bytes) = msg.write_to_bytes() {
913
-                if let Some(enc) = &mut sink.key.lock().await.as_mut() {
914
-                    bytes = enc.enc(&bytes);
915
-                }
916
-                match &mut sink.tx {
917
-                    SinkType::TcpStream(s) => {
918
-                        allow_err!(s.send(Bytes::from(bytes)).await);
919
-                    }
920
-                    SinkType::Ws(ws) => {
921
-                        allow_err!(ws.send(tungstenite::Message::Binary(bytes)).await);
922
-                    }
923
-                }
924
-            }
1030
+            sink.send(&msg).await;
925 1031
         }
926 1032
     }
927 1033
 
@@ -946,13 +1052,50 @@ impl RendezvousServer {
946 1052
     ) -> ResultType<()> {
947 1053
         let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
948 1054
         if let Some(addr) = to_addr {
949
-            self.tx.send(Data::Msg(msg.into(), addr))?;
1055
+            let mut sink = self.ws_map.lock().await.remove(&try_into_v4(addr));
1056
+            if let Some(s) = sink.as_mut() {
1057
+                s.send(&msg).await;
1058
+            } else {
1059
+                self.tx.send(Data::Msg(msg.into(), addr))?;
1060
+            }
950 1061
         } else {
951 1062
             self.send_to_tcp_sync(msg, addr).await?;
952 1063
         }
953 1064
         Ok(())
954 1065
     }
955 1066
 
1067
+    /**
1068
+    async fn handle_tcp_punch_hole_request(
1069
+        &mut self,
1070
+        addr: SocketAddr,
1071
+        ph: PunchHoleRequest,
1072
+        key: &str,
1073
+        ws: bool,
1074
+    ) -> ResultType<()> {
1075
+        let (msg, to_addr) = self.handle_punch_hole_request(addr, ph, key, ws).await?;
1076
+        log::debug!(
1077
+            "handle_tcp_punch_hole_request: {:?} to {:?}",
1078
+            &msg, &to_addr
1079
+        );
1080
+        //
1081
+
1082
+        if let Some(addr) = to_addr {
1083
+            // self.send_to_tcp_sync(msg, addr).await?;
1084
+            //这里无法判断目标addr是否是ws,请求无法发送到目标peer
1085
+            if self.tcp_punch.lock().await.contains_key(&try_into_v4(addr)) {
1086
+                self.send_to_tcp_sync(msg, addr).await?; //这里如果client使用ws,可以,但如果不用ws就必须用下面的,怎么判断呢
1087
+                //这样判断还是不行,如果client从ws切换回udp,tcp_punch中还是有残留,会导致发送失败,应该做个新的hashmap存??
1088
+            } else {
1089
+                self.tx.send(Data::Msg(msg.into(), addr))?;
1090
+            }
1091
+            // self.tx.send(Data::Msg(msg.into(), addr))?;
1092
+
1093
+        } else {
1094
+            self.send_to_tcp_sync(msg, addr).await?;
1095
+        }
1096
+        Ok(())
1097
+    }**/
1098
+
956 1099
     #[inline]
957 1100
     async fn handle_udp_punch_hole_request(
958 1101
         &mut self,
@@ -1256,10 +1399,10 @@ impl RendezvousServer {
1256 1399
             };
1257 1400
             let ws_stream = tokio_tungstenite::accept_hdr_async(stream, callback).await?;
1258 1401
             let (a, mut b) = ws_stream.split();
1259
-            sink = Some(Sink {
1260
-                tx: SinkType::Ws(a),
1261
-                key: Arc::new(Mutex::new(None)),
1262
-            });
1402
+            sink = Some(Sink::Wss(SafeWsSink {
1403
+                sink: a,
1404
+                encrypt: None,
1405
+            }));
1263 1406
             while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
1264 1407
                 if let tungstenite::Message::Binary(bytes) = msg {
1265 1408
                     if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
@@ -1269,23 +1412,15 @@ impl RendezvousServer {
1269 1412
             }
1270 1413
         } else {
1271 1414
             let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
1272
-            let enc = Arc::new(Mutex::new(None));
1273
-            sink = Some(Sink {
1274
-                tx: SinkType::TcpStream(a),
1275
-                key: enc.clone(),
1276
-            });
1415
+            sink = Some(Sink::Tss(SafeTcpStreamSink {
1416
+                sink: a,
1417
+                encrypt: None,
1418
+            }));
1277 1419
             // Avoid key exchange if answering on nat helper port
1278 1420
             if !key.is_empty() {
1279 1421
                 self.key_exchange_phase1(addr, &mut sink).await;
1280 1422
             }
1281
-            while let Ok(Some(Ok(mut bytes))) = timeout(30_000, b.next()).await {
1282
-                let mut enc_lock = enc.lock().await;
1283
-                if enc_lock.is_some() {
1284
-                    if let Err(err) = enc_lock.as_mut().unwrap().dec(&mut bytes) {
1285
-                        return Err(err.into());
1286
-                    }
1287
-                }
1288
-                drop(enc_lock);
1423
+            while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
1289 1424
                 if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
1290 1425
                     break;
1291 1426
                 }