database.rs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. use async_trait::async_trait;
  2. use hbb_common::{log, ResultType};
  3. use serde_json::value::Value;
  4. use sqlx::{
  5. sqlite::SqliteConnectOptions, ConnectOptions, Connection, Error as SqlxError, SqliteConnection,
  6. };
  7. use std::{ops::DerefMut, str::FromStr};
  8. //use sqlx::postgres::PgPoolOptions;
  9. //use sqlx::mysql::MySqlPoolOptions;
  10. pub(crate) type DB = sqlx::Sqlite;
  11. pub(crate) type MapValue = serde_json::map::Map<String, Value>;
  12. pub(crate) type MapStr = std::collections::HashMap<String, String>;
  13. type Pool = deadpool::managed::Pool<DbPool>;
  14. pub struct DbPool {
  15. url: String,
  16. }
  17. #[async_trait]
  18. impl deadpool::managed::Manager for DbPool {
  19. type Type = SqliteConnection;
  20. type Error = SqlxError;
  21. async fn create(&self) -> Result<SqliteConnection, SqlxError> {
  22. let mut opt = SqliteConnectOptions::from_str(&self.url).unwrap();
  23. opt.log_statements(log::LevelFilter::Debug);
  24. SqliteConnection::connect_with(&opt).await
  25. }
  26. async fn recycle(
  27. &self,
  28. obj: &mut SqliteConnection,
  29. ) -> deadpool::managed::RecycleResult<SqlxError> {
  30. Ok(obj.ping().await?)
  31. }
  32. }
  33. #[derive(Clone)]
  34. pub struct Database {
  35. pool: Pool,
  36. }
  37. #[derive(Default)]
  38. pub struct Peer {
  39. pub guid: Vec<u8>,
  40. pub id: String,
  41. pub uuid: Vec<u8>,
  42. pub pk: Vec<u8>,
  43. pub user: Option<Vec<u8>>,
  44. pub info: String,
  45. pub status: Option<i64>,
  46. }
  47. impl Database {
  48. pub async fn new(url: &str) -> ResultType<Database> {
  49. if !std::path::Path::new(url).exists() {
  50. std::fs::File::create(url).ok();
  51. }
  52. let n: usize = std::env::var("MAX_DATABASE_CONNECTIONS")
  53. .unwrap_or("1".to_owned())
  54. .parse()
  55. .unwrap_or(1);
  56. log::debug!("MAX_DATABASE_CONNECTIONS={}", n);
  57. let pool = Pool::new(
  58. DbPool {
  59. url: url.to_owned(),
  60. },
  61. n,
  62. );
  63. let _ = pool.get().await?; // test
  64. let db = Database { pool };
  65. db.create_tables().await?;
  66. Ok(db)
  67. }
  68. async fn create_tables(&self) -> ResultType<()> {
  69. sqlx::query!(
  70. "
  71. create table if not exists peer (
  72. guid blob primary key not null,
  73. id varchar(100) not null,
  74. uuid blob not null,
  75. pk blob not null,
  76. created_at datetime not null default(current_timestamp),
  77. user blob,
  78. status tinyint,
  79. note varchar(300),
  80. info text not null
  81. ) without rowid;
  82. create unique index if not exists index_peer_id on peer (id);
  83. create index if not exists index_peer_user on peer (user);
  84. create index if not exists index_peer_created_at on peer (created_at);
  85. create index if not exists index_peer_status on peer (status);
  86. "
  87. )
  88. .execute(self.pool.get().await?.deref_mut())
  89. .await?;
  90. Ok(())
  91. }
  92. pub async fn get_peer(&self, id: &str) -> ResultType<Option<Peer>> {
  93. Ok(sqlx::query_as!(
  94. Peer,
  95. "select guid, id, uuid, pk, user, status, info from peer where id = ?",
  96. id
  97. )
  98. .fetch_optional(self.pool.get().await?.deref_mut())
  99. .await?)
  100. }
  101. pub async fn get_peer_id(&self, guid: &[u8]) -> ResultType<Option<String>> {
  102. Ok(sqlx::query!("select id from peer where guid = ?", guid)
  103. .fetch_optional(self.pool.get().await?.deref_mut())
  104. .await?
  105. .map(|x| x.id))
  106. }
  107. #[inline]
  108. pub async fn get_conn(&self) -> ResultType<deadpool::managed::Object<DbPool>> {
  109. Ok(self.pool.get().await?)
  110. }
  111. pub async fn update_peer(&self, payload: MapValue, guid: &[u8]) -> ResultType<()> {
  112. let mut conn = self.get_conn().await?;
  113. let mut tx = conn.begin().await?;
  114. if let Some(v) = payload.get("note") {
  115. let v = get_str(v);
  116. sqlx::query!("update peer set note = ? where guid = ?", v, guid)
  117. .execute(&mut tx)
  118. .await?;
  119. }
  120. tx.commit().await?;
  121. Ok(())
  122. }
  123. pub async fn insert_peer(
  124. &self,
  125. id: &str,
  126. uuid: &Vec<u8>,
  127. pk: &Vec<u8>,
  128. info: &str,
  129. ) -> ResultType<Vec<u8>> {
  130. let guid = uuid::Uuid::new_v4().as_bytes().to_vec();
  131. sqlx::query!(
  132. "insert into peer(guid, id, uuid, pk, info) values(?, ?, ?, ?, ?)",
  133. guid,
  134. id,
  135. uuid,
  136. pk,
  137. info
  138. )
  139. .execute(self.pool.get().await?.deref_mut())
  140. .await?;
  141. Ok(guid)
  142. }
  143. pub async fn update_pk(
  144. &self,
  145. guid: &Vec<u8>,
  146. id: &str,
  147. pk: &Vec<u8>,
  148. info: &str,
  149. ) -> ResultType<()> {
  150. sqlx::query!(
  151. "update peer set id=?, pk=?, info=? where guid=?",
  152. id,
  153. pk,
  154. info,
  155. guid
  156. )
  157. .execute(self.pool.get().await?.deref_mut())
  158. .await?;
  159. Ok(())
  160. }
  161. }
  162. #[cfg(test)]
  163. mod tests {
  164. use hbb_common::tokio;
  165. #[test]
  166. fn test_insert() {
  167. insert();
  168. }
  169. #[tokio::main(flavor = "multi_thread")]
  170. async fn insert() {
  171. let db = super::Database::new("test.sqlite3").await.unwrap();
  172. let mut jobs = vec![];
  173. for i in 0..10000 {
  174. let cloned = db.clone();
  175. let id = i.to_string();
  176. let a = tokio::spawn(async move {
  177. let empty_vec = Vec::new();
  178. cloned
  179. .insert_peer(&id, &empty_vec, &empty_vec, "")
  180. .await
  181. .unwrap();
  182. });
  183. jobs.push(a);
  184. }
  185. for i in 0..10000 {
  186. let cloned = db.clone();
  187. let id = i.to_string();
  188. let a = tokio::spawn(async move {
  189. cloned.get_peer(&id).await.unwrap();
  190. });
  191. jobs.push(a);
  192. }
  193. hbb_common::futures::future::join_all(jobs).await;
  194. }
  195. }
  196. #[inline]
  197. pub fn guid2str(guid: &Vec<u8>) -> String {
  198. let mut bytes = [0u8; 16];
  199. bytes[..].copy_from_slice(&guid);
  200. uuid::Uuid::from_bytes(bytes).to_string()
  201. }
  202. pub(crate) fn get_str(v: &Value) -> Option<&str> {
  203. match v {
  204. Value::String(v) => {
  205. let v = v.trim();
  206. if v.is_empty() {
  207. None
  208. } else {
  209. Some(v)
  210. }
  211. }
  212. _ => None,
  213. }
  214. }