quic.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. use crate::{allow_err, anyhow::anyhow, ResultType};
  2. use protobuf::Message;
  3. use std::{net::SocketAddr, sync::Arc};
  4. use tokio::{self, stream::StreamExt, sync::mpsc};
  5. const QUIC_HBB: &[&[u8]] = &[b"hbb"];
  6. const SERVER_NAME: &str = "hbb";
  7. type Sender = mpsc::UnboundedSender<Value>;
  8. type Receiver = mpsc::UnboundedReceiver<Value>;
  9. pub fn new_server(socket: std::net::UdpSocket) -> ResultType<(Server, SocketAddr)> {
  10. let mut transport_config = quinn::TransportConfig::default();
  11. transport_config.stream_window_uni(0);
  12. let mut server_config = quinn::ServerConfig::default();
  13. server_config.transport = Arc::new(transport_config);
  14. let mut server_config = quinn::ServerConfigBuilder::new(server_config);
  15. server_config.protocols(QUIC_HBB);
  16. // server_config.enable_keylog();
  17. // server_config.use_stateless_retry(true);
  18. let mut endpoint = quinn::Endpoint::builder();
  19. endpoint.listen(server_config.build());
  20. let (end, incoming) = endpoint.with_socket(socket)?;
  21. Ok((Server { incoming }, end.local_addr()?))
  22. }
  23. pub async fn new_client(local_addr: &SocketAddr, peer: &SocketAddr) -> ResultType<Connection> {
  24. let mut endpoint = quinn::Endpoint::builder();
  25. let mut client_config = quinn::ClientConfigBuilder::default();
  26. client_config.protocols(QUIC_HBB);
  27. //client_config.enable_keylog();
  28. endpoint.default_client_config(client_config.build());
  29. let (endpoint, _) = endpoint.bind(local_addr)?;
  30. let new_conn = endpoint.connect(peer, SERVER_NAME)?.await?;
  31. Connection::new_for_client(new_conn.connection).await
  32. }
  33. pub struct Server {
  34. incoming: quinn::Incoming,
  35. }
  36. impl Server {
  37. #[inline]
  38. pub async fn next(&mut self) -> ResultType<Option<Connection>> {
  39. Connection::new_for_server(&mut self.incoming).await
  40. }
  41. }
  42. pub struct Connection {
  43. conn: quinn::Connection,
  44. tx: quinn::SendStream,
  45. rx: Receiver,
  46. }
  47. type Value = ResultType<Vec<u8>>;
  48. impl Connection {
  49. async fn new_for_server(incoming: &mut quinn::Incoming) -> ResultType<Option<Self>> {
  50. if let Some(conn) = incoming.next().await {
  51. let quinn::NewConnection {
  52. connection: conn,
  53. // uni_streams,
  54. mut bi_streams,
  55. ..
  56. } = conn.await?;
  57. let (tx, rx) = mpsc::unbounded_channel::<Value>();
  58. tokio::spawn(async move {
  59. loop {
  60. let stream = bi_streams.next().await;
  61. if let Some(stream) = stream {
  62. let stream = match stream {
  63. Err(e) => {
  64. tx.send(Err(e.into())).ok();
  65. break;
  66. }
  67. Ok(s) => s,
  68. };
  69. let cloned = tx.clone();
  70. tokio::spawn(async move {
  71. allow_err!(handle_request(stream.1, cloned).await);
  72. });
  73. } else {
  74. tx.send(Err(anyhow!("Reset by the peer"))).ok();
  75. break;
  76. }
  77. }
  78. log::info!("Exit connection outer loop");
  79. });
  80. let tx = conn.open_uni().await?;
  81. Ok(Some(Self { conn, tx, rx }))
  82. } else {
  83. Ok(None)
  84. }
  85. }
  86. async fn new_for_client(conn: quinn::Connection) -> ResultType<Self> {
  87. let (tx, rx_quic) = conn.open_bi().await?;
  88. let (tx_mpsc, rx) = mpsc::unbounded_channel::<Value>();
  89. tokio::spawn(async move {
  90. allow_err!(handle_request(rx_quic, tx_mpsc).await);
  91. });
  92. Ok(Self { conn, tx, rx })
  93. }
  94. #[inline]
  95. pub async fn next(&mut self) -> Option<Value> {
  96. // None is returned when all Sender halves have dropped,
  97. // indicating that no further values can be sent on the channel.
  98. self.rx.recv().await
  99. }
  100. #[inline]
  101. pub fn remote_address(&self) -> SocketAddr {
  102. self.conn.remote_address()
  103. }
  104. #[inline]
  105. pub async fn send_raw(&mut self, bytes: &[u8]) -> ResultType<()> {
  106. self.tx.write_all(bytes).await?;
  107. Ok(())
  108. }
  109. #[inline]
  110. pub async fn send(&mut self, msg: &dyn Message) -> ResultType<()> {
  111. match msg.write_to_bytes() {
  112. Ok(bytes) => self.send_raw(&bytes).await?,
  113. err => allow_err!(err),
  114. }
  115. Ok(())
  116. }
  117. }
  118. async fn handle_request(rx: quinn::RecvStream, tx: Sender) -> ResultType<()> {
  119. Ok(())
  120. }