sled_async.rs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. use hbb_common::{
  2. allow_err, log,
  3. tokio::{self, sync::mpsc},
  4. ResultType,
  5. };
  6. use rocksdb::DB;
  7. #[derive(Debug)]
  8. enum Action {
  9. Insert((String, Vec<u8>)),
  10. Get((String, mpsc::Sender<Option<Vec<u8>>>)),
  11. _Close,
  12. }
  13. #[derive(Clone)]
  14. pub struct SledAsync {
  15. tx: Option<mpsc::UnboundedSender<Action>>,
  16. path: String,
  17. }
  18. impl SledAsync {
  19. pub fn new(path: &str, run: bool) -> ResultType<Self> {
  20. let mut res = Self {
  21. tx: None,
  22. path: path.to_owned(),
  23. };
  24. if run {
  25. res.run()?;
  26. }
  27. Ok(res)
  28. }
  29. pub fn run(&mut self) -> ResultType<std::thread::JoinHandle<()>> {
  30. let (tx, rx) = mpsc::unbounded_channel::<Action>();
  31. self.tx = Some(tx);
  32. let db = DB::open_default(&self.path)?;
  33. Ok(std::thread::spawn(move || {
  34. Self::io_loop(db, rx);
  35. log::debug!("Exit SledAsync loop");
  36. }))
  37. }
  38. #[tokio::main(basic_scheduler)]
  39. async fn io_loop(db: DB, rx: mpsc::UnboundedReceiver<Action>) {
  40. let mut rx = rx;
  41. while let Some(x) = rx.recv().await {
  42. match x {
  43. Action::Insert((key, value)) => {
  44. allow_err!(db.put(&key, &value));
  45. }
  46. Action::Get((key, sender)) => {
  47. let mut sender = sender;
  48. allow_err!(
  49. sender
  50. .send(if let Ok(v) = db.get(key) { v } else { None })
  51. .await
  52. );
  53. }
  54. Action::_Close => break,
  55. }
  56. }
  57. }
  58. pub fn _close(self, j: std::thread::JoinHandle<()>) {
  59. if let Some(tx) = &self.tx {
  60. allow_err!(tx.send(Action::_Close));
  61. }
  62. allow_err!(j.join());
  63. }
  64. pub async fn get(&mut self, key: String) -> Option<Vec<u8>> {
  65. if let Some(tx) = &self.tx {
  66. let (tx_once, mut rx) = mpsc::channel::<Option<Vec<u8>>>(1);
  67. allow_err!(tx.send(Action::Get((key, tx_once))));
  68. if let Some(v) = rx.recv().await {
  69. return v;
  70. }
  71. }
  72. None
  73. }
  74. #[inline]
  75. pub fn deserialize<'a, T: serde::Deserialize<'a>>(v: &'a Option<Vec<u8>>) -> Option<T> {
  76. if let Some(v) = v {
  77. if let Ok(v) = std::str::from_utf8(v) {
  78. if let Ok(v) = serde_json::from_str::<T>(&v) {
  79. return Some(v);
  80. }
  81. }
  82. }
  83. None
  84. }
  85. pub fn insert<T: serde::Serialize>(&mut self, key: String, v: T) {
  86. if let Some(tx) = &self.tx {
  87. if let Ok(v) = serde_json::to_vec(&v) {
  88. allow_err!(tx.send(Action::Insert((key, v))));
  89. }
  90. }
  91. }
  92. }