peer.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. use crate::common::*;
  2. use crate::database;
  3. use hbb_common::{
  4. bytes::Bytes,
  5. log,
  6. rendezvous_proto::*,
  7. tokio::sync::{Mutex, RwLock},
  8. ResultType,
  9. };
  10. use serde_derive::{Deserialize, Serialize};
  11. use std::{collections::HashMap, collections::HashSet, net::SocketAddr, sync::Arc, time::Instant};
  12. type IpBlockMap = HashMap<String, ((u32, Instant), (HashSet<String>, Instant))>;
  13. type UserStatusMap = HashMap<Vec<u8>, Arc<(Option<Vec<u8>>, bool)>>;
  14. type IpChangesMap = HashMap<String, (Instant, HashMap<String, i32>)>;
  15. lazy_static::lazy_static! {
  16. pub(crate) static ref IP_BLOCKER: Mutex<IpBlockMap> = Default::default();
  17. pub(crate) static ref USER_STATUS: RwLock<UserStatusMap> = Default::default();
  18. pub(crate) static ref IP_CHANGES: Mutex<IpChangesMap> = Default::default();
  19. }
  20. pub static IP_CHANGE_DUR: u64 = 180;
  21. pub static IP_CHANGE_DUR_X2: u64 = IP_CHANGE_DUR * 2;
  22. pub static DAY_SECONDS: u64 = 3600 * 24;
  23. pub static IP_BLOCK_DUR: u64 = 60;
  24. #[derive(Debug, Default, Serialize, Deserialize, Clone)]
  25. pub(crate) struct PeerInfo {
  26. #[serde(default)]
  27. pub(crate) ip: String,
  28. }
  29. pub(crate) struct Peer {
  30. pub(crate) socket_addr: SocketAddr,
  31. pub(crate) last_reg_time: Instant,
  32. pub(crate) guid: Vec<u8>,
  33. pub(crate) uuid: Bytes,
  34. pub(crate) pk: Bytes,
  35. // pub(crate) user: Option<Vec<u8>>,
  36. pub(crate) info: PeerInfo,
  37. // pub(crate) disabled: bool,
  38. pub(crate) reg_pk: (u32, Instant), // how often register_pk
  39. }
  40. impl Default for Peer {
  41. fn default() -> Self {
  42. Self {
  43. socket_addr: "0.0.0.0:0".parse().unwrap(),
  44. last_reg_time: get_expired_time(),
  45. guid: Vec::new(),
  46. uuid: Bytes::new(),
  47. pk: Bytes::new(),
  48. info: Default::default(),
  49. // user: None,
  50. // disabled: false,
  51. reg_pk: (0, get_expired_time()),
  52. }
  53. }
  54. }
  55. pub(crate) type LockPeer = Arc<RwLock<Peer>>;
  56. #[derive(Clone)]
  57. pub(crate) struct PeerMap {
  58. map: Arc<RwLock<HashMap<String, LockPeer>>>,
  59. pub(crate) db: database::Database,
  60. }
  61. impl PeerMap {
  62. pub(crate) async fn new() -> ResultType<Self> {
  63. let db = std::env::var("DB_URL").unwrap_or({
  64. let mut db = "db_v2.sqlite3".to_owned();
  65. #[cfg(all(windows, not(debug_assertions)))]
  66. {
  67. if let Some(path) = hbb_common::config::Config::icon_path().parent() {
  68. db = format!("{}\\{}", path.to_str().unwrap_or("."), db);
  69. }
  70. }
  71. #[cfg(not(windows))]
  72. {
  73. db = format!("./{db}");
  74. }
  75. db
  76. });
  77. log::info!("DB_URL={}", db);
  78. let pm = Self {
  79. map: Default::default(),
  80. db: database::Database::new(&db).await?,
  81. };
  82. Ok(pm)
  83. }
  84. #[inline]
  85. pub(crate) async fn update_pk(
  86. &mut self,
  87. id: String,
  88. peer: LockPeer,
  89. addr: SocketAddr,
  90. uuid: Bytes,
  91. pk: Bytes,
  92. ip: String,
  93. ) -> register_pk_response::Result {
  94. log::info!("update_pk {} {:?} {:?} {:?}", id, addr, uuid, pk);
  95. let (info_str, guid) = {
  96. let mut w = peer.write().await;
  97. w.socket_addr = addr;
  98. w.uuid = uuid.clone();
  99. w.pk = pk.clone();
  100. w.last_reg_time = Instant::now();
  101. w.info.ip = ip;
  102. (
  103. serde_json::to_string(&w.info).unwrap_or_default(),
  104. w.guid.clone(),
  105. )
  106. };
  107. if guid.is_empty() {
  108. match self.db.insert_peer(&id, &uuid, &pk, &info_str).await {
  109. Err(err) => {
  110. log::error!("db.insert_peer failed: {}", err);
  111. return register_pk_response::Result::SERVER_ERROR;
  112. }
  113. Ok(guid) => {
  114. peer.write().await.guid = guid;
  115. }
  116. }
  117. } else {
  118. if let Err(err) = self.db.update_pk(&guid, &id, &pk, &info_str).await {
  119. log::error!("db.update_pk failed: {}", err);
  120. return register_pk_response::Result::SERVER_ERROR;
  121. }
  122. log::info!("pk updated instead of insert");
  123. }
  124. register_pk_response::Result::OK
  125. }
  126. #[inline]
  127. pub(crate) async fn get(&self, id: &str) -> Option<LockPeer> {
  128. let p = self.map.read().await.get(id).cloned();
  129. if p.is_some() {
  130. return p;
  131. } else if let Ok(Some(v)) = self.db.get_peer(id).await {
  132. let peer = Peer {
  133. guid: v.guid,
  134. uuid: v.uuid.into(),
  135. pk: v.pk.into(),
  136. // user: v.user,
  137. info: serde_json::from_str::<PeerInfo>(&v.info).unwrap_or_default(),
  138. // disabled: v.status == Some(0),
  139. ..Default::default()
  140. };
  141. let peer = Arc::new(RwLock::new(peer));
  142. self.map.write().await.insert(id.to_owned(), peer.clone());
  143. return Some(peer);
  144. }
  145. None
  146. }
  147. #[inline]
  148. pub(crate) async fn get_or(&self, id: &str) -> LockPeer {
  149. if let Some(p) = self.get(id).await {
  150. return p;
  151. }
  152. let mut w = self.map.write().await;
  153. if let Some(p) = w.get(id) {
  154. return p.clone();
  155. }
  156. let tmp = LockPeer::default();
  157. w.insert(id.to_owned(), tmp.clone());
  158. tmp
  159. }
  160. #[inline]
  161. pub(crate) async fn get_in_memory(&self, id: &str) -> Option<LockPeer> {
  162. self.map.read().await.get(id).cloned()
  163. }
  164. #[inline]
  165. pub(crate) async fn is_in_memory(&self, id: &str) -> bool {
  166. self.map.read().await.contains_key(id)
  167. }
  168. }