fs.rs 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. #[cfg(windows)]
  2. use std::os::windows::prelude::*;
  3. use std::path::{Path, PathBuf};
  4. use std::time::{Duration, SystemTime, UNIX_EPOCH};
  5. use serde_derive::{Deserialize, Serialize};
  6. use tokio::{fs::File, io::*};
  7. use crate::{bail, get_version_number, message_proto::*, ResultType, Stream};
  8. // https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
  9. use crate::{
  10. compress::{compress, decompress},
  11. config::{Config, COMPRESS_LEVEL},
  12. };
  13. pub fn read_dir(path: &Path, include_hidden: bool) -> ResultType<FileDirectory> {
  14. let mut dir = FileDirectory {
  15. path: get_string(path),
  16. ..Default::default()
  17. };
  18. #[cfg(windows)]
  19. if "/" == &get_string(path) {
  20. let drives = unsafe { winapi::um::fileapi::GetLogicalDrives() };
  21. for i in 0..32 {
  22. if drives & (1 << i) != 0 {
  23. let name = format!(
  24. "{}:",
  25. std::char::from_u32('A' as u32 + i as u32).unwrap_or('A')
  26. );
  27. dir.entries.push(FileEntry {
  28. name,
  29. entry_type: FileType::DirDrive.into(),
  30. ..Default::default()
  31. });
  32. }
  33. }
  34. return Ok(dir);
  35. }
  36. for entry in path.read_dir()?.flatten() {
  37. let p = entry.path();
  38. let name = p
  39. .file_name()
  40. .map(|p| p.to_str().unwrap_or(""))
  41. .unwrap_or("")
  42. .to_owned();
  43. if name.is_empty() {
  44. continue;
  45. }
  46. let mut is_hidden = false;
  47. let meta;
  48. if let Ok(tmp) = std::fs::symlink_metadata(&p) {
  49. meta = tmp;
  50. } else {
  51. continue;
  52. }
  53. // docs.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants
  54. #[cfg(windows)]
  55. if meta.file_attributes() & 0x2 != 0 {
  56. is_hidden = true;
  57. }
  58. #[cfg(not(windows))]
  59. if name.find('.').unwrap_or(usize::MAX) == 0 {
  60. is_hidden = true;
  61. }
  62. if is_hidden && !include_hidden {
  63. continue;
  64. }
  65. let (entry_type, size) = {
  66. if p.is_dir() {
  67. if meta.file_type().is_symlink() {
  68. (FileType::DirLink.into(), 0)
  69. } else {
  70. (FileType::Dir.into(), 0)
  71. }
  72. } else if meta.file_type().is_symlink() {
  73. (FileType::FileLink.into(), 0)
  74. } else {
  75. (FileType::File.into(), meta.len())
  76. }
  77. };
  78. let modified_time = meta
  79. .modified()
  80. .map(|x| {
  81. x.duration_since(std::time::SystemTime::UNIX_EPOCH)
  82. .map(|x| x.as_secs())
  83. .unwrap_or(0)
  84. })
  85. .unwrap_or(0);
  86. dir.entries.push(FileEntry {
  87. name: get_file_name(&p),
  88. entry_type,
  89. is_hidden,
  90. size,
  91. modified_time,
  92. ..Default::default()
  93. });
  94. }
  95. Ok(dir)
  96. }
  /// Returns the final component of `p` as an owned string.
  ///
  /// Yields an empty string when the path has no final component (e.g. `/` or `a/..`)
  /// or when that component is not valid UTF-8.
  #[inline]
  pub fn get_file_name(p: &Path) -> String {
      p.file_name()
          .and_then(|component| component.to_str())
          .unwrap_or("")
          .to_owned()
  }
  /// Converts `path` to an owned string, or to an empty string when it is not valid UTF-8.
  #[inline]
  pub fn get_string(path: &Path) -> String {
      path.to_str().map(str::to_owned).unwrap_or_default()
  }
  /// Builds an owned `PathBuf` from a string path.
  #[inline]
  pub fn get_path(path: &str) -> PathBuf {
      PathBuf::from(path)
  }
  112. #[inline]
  113. pub fn get_home_as_string() -> String {
  114. get_string(&Config::get_home())
  115. }
  116. fn read_dir_recursive(
  117. path: &PathBuf,
  118. prefix: &Path,
  119. include_hidden: bool,
  120. ) -> ResultType<Vec<FileEntry>> {
  121. let mut files = Vec::new();
  122. if path.is_dir() {
  123. // to-do: symbol link handling, cp the link rather than the content
  124. // to-do: file mode, for unix
  125. let fd = read_dir(path, include_hidden)?;
  126. for entry in fd.entries.iter() {
  127. match entry.entry_type.enum_value() {
  128. Ok(FileType::File) => {
  129. let mut entry = entry.clone();
  130. entry.name = get_string(&prefix.join(entry.name));
  131. files.push(entry);
  132. }
  133. Ok(FileType::Dir) => {
  134. if let Ok(mut tmp) = read_dir_recursive(
  135. &path.join(&entry.name),
  136. &prefix.join(&entry.name),
  137. include_hidden,
  138. ) {
  139. for entry in tmp.drain(0..) {
  140. files.push(entry);
  141. }
  142. }
  143. }
  144. _ => {}
  145. }
  146. }
  147. Ok(files)
  148. } else if path.is_file() {
  149. let (size, modified_time) = if let Ok(meta) = std::fs::metadata(path) {
  150. (
  151. meta.len(),
  152. meta.modified()
  153. .map(|x| {
  154. x.duration_since(std::time::SystemTime::UNIX_EPOCH)
  155. .map(|x| x.as_secs())
  156. .unwrap_or(0)
  157. })
  158. .unwrap_or(0),
  159. )
  160. } else {
  161. (0, 0)
  162. };
  163. files.push(FileEntry {
  164. entry_type: FileType::File.into(),
  165. size,
  166. modified_time,
  167. ..Default::default()
  168. });
  169. Ok(files)
  170. } else {
  171. bail!("Not exists");
  172. }
  173. }
  174. pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<FileEntry>> {
  175. read_dir_recursive(&get_path(path), &get_path(""), include_hidden)
  176. }
  /// Reports whether anything (file or directory) exists at `file_path`.
  #[inline]
  pub fn is_file_exists(file_path: &str) -> bool {
      let candidate = Path::new(file_path);
      candidate.exists()
  }
  181. #[inline]
  182. pub fn can_enable_overwrite_detection(version: i64) -> bool {
  183. version >= get_version_number("1.1.10")
  184. }
  185. #[derive(Default)]
  186. pub struct TransferJob {
  187. pub id: i32,
  188. pub remote: String,
  189. pub path: PathBuf,
  190. pub show_hidden: bool,
  191. pub is_remote: bool,
  192. pub is_last_job: bool,
  193. pub file_num: i32,
  194. pub files: Vec<FileEntry>,
  195. file: Option<File>,
  196. total_size: u64,
  197. finished_size: u64,
  198. transferred: u64,
  199. enable_overwrite_detection: bool,
  200. file_confirmed: bool,
  201. // indicating the last file is skipped
  202. file_skipped: bool,
  203. file_is_waiting: bool,
  204. default_overwrite_strategy: Option<bool>,
  205. }
  206. #[derive(Debug, Default, Serialize, Deserialize, Clone)]
  207. pub struct TransferJobMeta {
  208. #[serde(default)]
  209. pub id: i32,
  210. #[serde(default)]
  211. pub remote: String,
  212. #[serde(default)]
  213. pub to: String,
  214. #[serde(default)]
  215. pub show_hidden: bool,
  216. #[serde(default)]
  217. pub file_num: i32,
  218. #[serde(default)]
  219. pub is_remote: bool,
  220. }
  221. #[derive(Debug, Default, Serialize, Deserialize, Clone)]
  222. pub struct RemoveJobMeta {
  223. #[serde(default)]
  224. pub path: String,
  225. #[serde(default)]
  226. pub is_remote: bool,
  227. #[serde(default)]
  228. pub no_confirm: bool,
  229. }
  /// Returns the text after the last `.` in `name`, or `""` when there is no `.`.
  #[inline]
  fn get_ext(name: &str) -> &str {
      name.rsplit_once('.').map_or("", |(_, ext)| ext)
  }
  237. #[inline]
  238. fn is_compressed_file(name: &str) -> bool {
  239. let ext = get_ext(name);
  240. ext == "xz"
  241. || ext == "gz"
  242. || ext == "zip"
  243. || ext == "7z"
  244. || ext == "rar"
  245. || ext == "bz2"
  246. || ext == "tgz"
  247. || ext == "png"
  248. || ext == "jpg"
  249. }
  250. impl TransferJob {
  251. #[allow(clippy::too_many_arguments)]
  252. pub fn new_write(
  253. id: i32,
  254. remote: String,
  255. path: String,
  256. file_num: i32,
  257. show_hidden: bool,
  258. is_remote: bool,
  259. files: Vec<FileEntry>,
  260. enable_overwrite_detection: bool,
  261. ) -> Self {
  262. log::info!("new write {}", path);
  263. let total_size = files.iter().map(|x| x.size).sum();
  264. Self {
  265. id,
  266. remote,
  267. path: get_path(&path),
  268. file_num,
  269. show_hidden,
  270. is_remote,
  271. files,
  272. total_size,
  273. enable_overwrite_detection,
  274. ..Default::default()
  275. }
  276. }
  277. pub fn new_read(
  278. id: i32,
  279. remote: String,
  280. path: String,
  281. file_num: i32,
  282. show_hidden: bool,
  283. is_remote: bool,
  284. enable_overwrite_detection: bool,
  285. ) -> ResultType<Self> {
  286. log::info!("new read {}", path);
  287. let files = get_recursive_files(&path, show_hidden)?;
  288. let total_size = files.iter().map(|x| x.size).sum();
  289. Ok(Self {
  290. id,
  291. remote,
  292. path: get_path(&path),
  293. file_num,
  294. show_hidden,
  295. is_remote,
  296. files,
  297. total_size,
  298. enable_overwrite_detection,
  299. ..Default::default()
  300. })
  301. }
  302. #[inline]
  303. pub fn files(&self) -> &Vec<FileEntry> {
  304. &self.files
  305. }
  306. #[inline]
  307. pub fn set_files(&mut self, files: Vec<FileEntry>) {
  308. self.files = files;
  309. }
  310. #[inline]
  311. pub fn id(&self) -> i32 {
  312. self.id
  313. }
  314. #[inline]
  315. pub fn total_size(&self) -> u64 {
  316. self.total_size
  317. }
  318. #[inline]
  319. pub fn finished_size(&self) -> u64 {
  320. self.finished_size
  321. }
  322. #[inline]
  323. pub fn transferred(&self) -> u64 {
  324. self.transferred
  325. }
  326. #[inline]
  327. pub fn file_num(&self) -> i32 {
  328. self.file_num
  329. }
  330. pub fn modify_time(&self) {
  331. let file_num = self.file_num as usize;
  332. if file_num < self.files.len() {
  333. let entry = &self.files[file_num];
  334. let path = self.join(&entry.name);
  335. let download_path = format!("{}.download", get_string(&path));
  336. std::fs::rename(download_path, &path).ok();
  337. filetime::set_file_mtime(
  338. &path,
  339. filetime::FileTime::from_unix_time(entry.modified_time as _, 0),
  340. )
  341. .ok();
  342. }
  343. }
  344. pub fn remove_download_file(&self) {
  345. let file_num = self.file_num as usize;
  346. if file_num < self.files.len() {
  347. let entry = &self.files[file_num];
  348. let path = self.join(&entry.name);
  349. let download_path = format!("{}.download", get_string(&path));
  350. std::fs::remove_file(download_path).ok();
  351. }
  352. }
  353. pub async fn write(&mut self, block: FileTransferBlock) -> ResultType<()> {
  354. if block.id != self.id {
  355. bail!("Wrong id");
  356. }
  357. let file_num = block.file_num as usize;
  358. if file_num >= self.files.len() {
  359. bail!("Wrong file number");
  360. }
  361. if file_num != self.file_num as usize || self.file.is_none() {
  362. self.modify_time();
  363. if let Some(file) = self.file.as_mut() {
  364. file.sync_all().await?;
  365. }
  366. self.file_num = block.file_num;
  367. let entry = &self.files[file_num];
  368. let path = self.join(&entry.name);
  369. if let Some(p) = path.parent() {
  370. std::fs::create_dir_all(p).ok();
  371. }
  372. let path = format!("{}.download", get_string(&path));
  373. self.file = Some(File::create(&path).await?);
  374. }
  375. if block.compressed {
  376. let tmp = decompress(&block.data);
  377. self.file.as_mut().unwrap().write_all(&tmp).await?;
  378. self.finished_size += tmp.len() as u64;
  379. } else {
  380. self.file.as_mut().unwrap().write_all(&block.data).await?;
  381. self.finished_size += block.data.len() as u64;
  382. }
  383. self.transferred += block.data.len() as u64;
  384. Ok(())
  385. }
  386. #[inline]
  387. pub fn join(&self, name: &str) -> PathBuf {
  388. if name.is_empty() {
  389. self.path.clone()
  390. } else {
  391. self.path.join(name)
  392. }
  393. }
  394. pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
  395. let file_num = self.file_num as usize;
  396. if file_num >= self.files.len() {
  397. self.file.take();
  398. return Ok(None);
  399. }
  400. let name = &self.files[file_num].name;
  401. if self.file.is_none() {
  402. match File::open(self.join(name)).await {
  403. Ok(file) => {
  404. self.file = Some(file);
  405. self.file_confirmed = false;
  406. self.file_is_waiting = false;
  407. }
  408. Err(err) => {
  409. self.file_num += 1;
  410. self.file_confirmed = false;
  411. self.file_is_waiting = false;
  412. return Err(err.into());
  413. }
  414. }
  415. }
  416. if self.enable_overwrite_detection && !self.file_confirmed() {
  417. if !self.file_is_waiting() {
  418. self.send_current_digest(stream).await?;
  419. self.set_file_is_waiting(true);
  420. }
  421. return Ok(None);
  422. }
  423. const BUF_SIZE: usize = 128 * 1024;
  424. let mut buf: Vec<u8> = vec![0; BUF_SIZE];
  425. let mut compressed = false;
  426. let mut offset: usize = 0;
  427. loop {
  428. match self.file.as_mut().unwrap().read(&mut buf[offset..]).await {
  429. Err(err) => {
  430. self.file_num += 1;
  431. self.file = None;
  432. self.file_confirmed = false;
  433. self.file_is_waiting = false;
  434. return Err(err.into());
  435. }
  436. Ok(n) => {
  437. offset += n;
  438. if n == 0 || offset == BUF_SIZE {
  439. break;
  440. }
  441. }
  442. }
  443. }
  444. unsafe { buf.set_len(offset) };
  445. if offset == 0 {
  446. self.file_num += 1;
  447. self.file = None;
  448. self.file_confirmed = false;
  449. self.file_is_waiting = false;
  450. } else {
  451. self.finished_size += offset as u64;
  452. if !is_compressed_file(name) {
  453. let tmp = compress(&buf, COMPRESS_LEVEL);
  454. if tmp.len() < buf.len() {
  455. buf = tmp;
  456. compressed = true;
  457. }
  458. }
  459. self.transferred += buf.len() as u64;
  460. }
  461. Ok(Some(FileTransferBlock {
  462. id: self.id,
  463. file_num: file_num as _,
  464. data: buf.into(),
  465. compressed,
  466. ..Default::default()
  467. }))
  468. }
  469. async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
  470. let mut msg = Message::new();
  471. let mut resp = FileResponse::new();
  472. let meta = self.file.as_ref().unwrap().metadata().await?;
  473. let last_modified = meta
  474. .modified()?
  475. .duration_since(SystemTime::UNIX_EPOCH)?
  476. .as_secs();
  477. resp.set_digest(FileTransferDigest {
  478. id: self.id,
  479. file_num: self.file_num,
  480. last_modified,
  481. file_size: meta.len(),
  482. ..Default::default()
  483. });
  484. msg.set_file_response(resp);
  485. stream.send(&msg).await?;
  486. log::info!(
  487. "id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}",
  488. self.id,
  489. self.file_num,
  490. msg
  491. );
  492. Ok(())
  493. }
  494. pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) {
  495. self.default_overwrite_strategy = overwrite_strategy;
  496. }
  497. pub fn default_overwrite_strategy(&self) -> Option<bool> {
  498. self.default_overwrite_strategy
  499. }
  500. pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
  501. log::info!("id: {}, file_confirmed: {}", self.id, file_confirmed);
  502. self.file_confirmed = file_confirmed;
  503. self.file_skipped = false;
  504. }
  505. pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
  506. self.file_is_waiting = file_is_waiting;
  507. }
  508. #[inline]
  509. pub fn file_is_waiting(&self) -> bool {
  510. self.file_is_waiting
  511. }
  512. #[inline]
  513. pub fn file_confirmed(&self) -> bool {
  514. self.file_confirmed
  515. }
  516. /// Indicating whether the last file is skipped
  517. #[inline]
  518. pub fn file_skipped(&self) -> bool {
  519. self.file_skipped
  520. }
  521. /// Indicating whether the whole task is skipped
  522. #[inline]
  523. pub fn job_skipped(&self) -> bool {
  524. self.file_skipped() && self.files.len() == 1
  525. }
  526. /// Check whether the job is completed after `read` returns `None`
  527. /// This is a helper function which gives additional lifecycle when the job reads `None`.
  528. /// If returns `true`, it means we can delete the job automatically. `False` otherwise.
  529. ///
  530. /// [`Note`]
  531. /// Conditions:
  532. /// 1. Files are not waiting for confirmation by peers.
  533. #[inline]
  534. pub fn job_completed(&self) -> bool {
  535. // has no error, Condition 2
  536. !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting)
  537. }
  538. /// Get job error message, useful for getting status when job had finished
  539. pub fn job_error(&self) -> Option<String> {
  540. if self.job_skipped() {
  541. return Some("skipped".to_string());
  542. }
  543. None
  544. }
  545. pub fn set_file_skipped(&mut self) -> bool {
  546. log::debug!("skip file {} in job {}", self.file_num, self.id);
  547. self.file.take();
  548. self.set_file_confirmed(false);
  549. self.set_file_is_waiting(false);
  550. self.file_num += 1;
  551. self.file_skipped = true;
  552. true
  553. }
  554. pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
  555. if self.file_num() != r.file_num {
  556. log::info!("file num truncated, ignoring");
  557. } else {
  558. match r.union {
  559. Some(file_transfer_send_confirm_request::Union::Skip(s)) => {
  560. if s {
  561. self.set_file_skipped();
  562. } else {
  563. self.set_file_confirmed(true);
  564. }
  565. }
  566. Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => {
  567. self.set_file_confirmed(true);
  568. }
  569. _ => {}
  570. }
  571. }
  572. true
  573. }
  574. #[inline]
  575. pub fn gen_meta(&self) -> TransferJobMeta {
  576. TransferJobMeta {
  577. id: self.id,
  578. remote: self.remote.to_string(),
  579. to: self.path.to_string_lossy().to_string(),
  580. file_num: self.file_num,
  581. show_hidden: self.show_hidden,
  582. is_remote: self.is_remote,
  583. }
  584. }
  585. }
  586. #[inline]
  587. pub fn new_error<T: std::string::ToString>(id: i32, err: T, file_num: i32) -> Message {
  588. let mut resp = FileResponse::new();
  589. resp.set_error(FileTransferError {
  590. id,
  591. error: err.to_string(),
  592. file_num,
  593. ..Default::default()
  594. });
  595. let mut msg_out = Message::new();
  596. msg_out.set_file_response(resp);
  597. msg_out
  598. }
  599. #[inline]
  600. pub fn new_dir(id: i32, path: String, files: Vec<FileEntry>) -> Message {
  601. let mut resp = FileResponse::new();
  602. resp.set_dir(FileDirectory {
  603. id,
  604. path,
  605. entries: files,
  606. ..Default::default()
  607. });
  608. let mut msg_out = Message::new();
  609. msg_out.set_file_response(resp);
  610. msg_out
  611. }
  612. #[inline]
  613. pub fn new_block(block: FileTransferBlock) -> Message {
  614. let mut resp = FileResponse::new();
  615. resp.set_block(block);
  616. let mut msg_out = Message::new();
  617. msg_out.set_file_response(resp);
  618. msg_out
  619. }
  620. #[inline]
  621. pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message {
  622. let mut msg_out = Message::new();
  623. let mut action = FileAction::new();
  624. action.set_send_confirm(r);
  625. msg_out.set_file_action(action);
  626. msg_out
  627. }
  628. #[inline]
  629. pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec<FileEntry>) -> Message {
  630. let mut action = FileAction::new();
  631. action.set_receive(FileTransferReceiveRequest {
  632. id,
  633. path,
  634. files,
  635. file_num,
  636. ..Default::default()
  637. });
  638. let mut msg_out = Message::new();
  639. msg_out.set_file_action(action);
  640. msg_out
  641. }
  642. #[inline]
  643. pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message {
  644. log::info!("new send: {},id : {}", path, id);
  645. let mut action = FileAction::new();
  646. action.set_send(FileTransferSendRequest {
  647. id,
  648. path,
  649. include_hidden,
  650. file_num,
  651. ..Default::default()
  652. });
  653. let mut msg_out = Message::new();
  654. msg_out.set_file_action(action);
  655. msg_out
  656. }
  657. #[inline]
  658. pub fn new_done(id: i32, file_num: i32) -> Message {
  659. let mut resp = FileResponse::new();
  660. resp.set_done(FileTransferDone {
  661. id,
  662. file_num,
  663. ..Default::default()
  664. });
  665. let mut msg_out = Message::new();
  666. msg_out.set_file_response(resp);
  667. msg_out
  668. }
  669. #[inline]
  670. pub fn remove_job(id: i32, jobs: &mut Vec<TransferJob>) {
  671. *jobs = jobs.drain(0..).filter(|x| x.id() != id).collect();
  672. }
  673. #[inline]
  674. pub fn get_job(id: i32, jobs: &mut [TransferJob]) -> Option<&mut TransferJob> {
  675. jobs.iter_mut().find(|x| x.id() == id)
  676. }
  677. pub async fn handle_read_jobs(
  678. jobs: &mut Vec<TransferJob>,
  679. stream: &mut crate::Stream,
  680. ) -> ResultType<()> {
  681. let mut finished = Vec::new();
  682. for job in jobs.iter_mut() {
  683. if job.is_last_job {
  684. continue;
  685. }
  686. match job.read(stream).await {
  687. Err(err) => {
  688. stream
  689. .send(&new_error(job.id(), err, job.file_num()))
  690. .await?;
  691. }
  692. Ok(Some(block)) => {
  693. stream.send(&new_block(block)).await?;
  694. }
  695. Ok(None) => {
  696. if job.job_completed() {
  697. finished.push(job.id());
  698. let err = job.job_error();
  699. if err.is_some() {
  700. stream
  701. .send(&new_error(job.id(), err.unwrap(), job.file_num()))
  702. .await?;
  703. } else {
  704. stream.send(&new_done(job.id(), job.file_num())).await?;
  705. }
  706. } else {
  707. // waiting confirmation.
  708. }
  709. }
  710. }
  711. }
  712. for id in finished {
  713. remove_job(id, jobs);
  714. }
  715. Ok(())
  716. }
  717. pub fn remove_all_empty_dir(path: &PathBuf) -> ResultType<()> {
  718. let fd = read_dir(path, true)?;
  719. for entry in fd.entries.iter() {
  720. match entry.entry_type.enum_value() {
  721. Ok(FileType::Dir) => {
  722. remove_all_empty_dir(&path.join(&entry.name)).ok();
  723. }
  724. Ok(FileType::DirLink) | Ok(FileType::FileLink) => {
  725. std::fs::remove_file(path.join(&entry.name)).ok();
  726. }
  727. _ => {}
  728. }
  729. }
  730. std::fs::remove_dir(path).ok();
  731. Ok(())
  732. }
  733. #[inline]
  734. pub fn remove_file(file: &str) -> ResultType<()> {
  735. std::fs::remove_file(get_path(file))?;
  736. Ok(())
  737. }
  738. #[inline]
  739. pub fn create_dir(dir: &str) -> ResultType<()> {
  740. std::fs::create_dir_all(get_path(dir))?;
  741. Ok(())
  742. }
  743. #[inline]
  744. pub fn transform_windows_path(entries: &mut Vec<FileEntry>) {
  745. for entry in entries {
  746. entry.name = entry.name.replace('\\', "/");
  747. }
  748. }
  749. pub enum DigestCheckResult {
  750. IsSame,
  751. NeedConfirm(FileTransferDigest),
  752. NoSuchFile,
  753. }
  754. #[inline]
  755. pub fn is_write_need_confirmation(
  756. file_path: &str,
  757. digest: &FileTransferDigest,
  758. ) -> ResultType<DigestCheckResult> {
  759. let path = Path::new(file_path);
  760. if path.exists() && path.is_file() {
  761. let metadata = std::fs::metadata(path)?;
  762. let modified_time = metadata.modified()?;
  763. let remote_mt = Duration::from_secs(digest.last_modified);
  764. let local_mt = modified_time.duration_since(UNIX_EPOCH)?;
  765. if remote_mt == local_mt && digest.file_size == metadata.len() {
  766. return Ok(DigestCheckResult::IsSame);
  767. }
  768. Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
  769. id: digest.id,
  770. file_num: digest.file_num,
  771. last_modified: local_mt.as_secs(),
  772. file_size: metadata.len(),
  773. ..Default::default()
  774. }))
  775. } else {
  776. Ok(DigestCheckResult::NoSuchFile)
  777. }
  778. }