From d6fc36596716c83133d8a8887fbc725373e63352 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 12 Jan 2026 21:21:52 +0800 Subject: Rename track.rs to track_file.rs --- actions/src/remote_actions/content_manage.rs | 2 +- actions/src/remote_actions/content_manage/track.rs | 985 --------------------- .../remote_actions/content_manage/track_file.rs | 985 +++++++++++++++++++++ 3 files changed, 986 insertions(+), 986 deletions(-) delete mode 100644 actions/src/remote_actions/content_manage/track.rs create mode 100644 actions/src/remote_actions/content_manage/track_file.rs (limited to 'actions/src') diff --git a/actions/src/remote_actions/content_manage.rs b/actions/src/remote_actions/content_manage.rs index aa3cb07..1568085 100644 --- a/actions/src/remote_actions/content_manage.rs +++ b/actions/src/remote_actions/content_manage.rs @@ -1 +1 @@ -pub mod track; \ No newline at end of file +pub mod track_file; diff --git a/actions/src/remote_actions/content_manage/track.rs b/actions/src/remote_actions/content_manage/track.rs deleted file mode 100644 index a59ca76..0000000 --- a/actions/src/remote_actions/content_manage/track.rs +++ /dev/null @@ -1,985 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - path::PathBuf, - sync::Arc, - time::SystemTime, -}; - -use action_system::{action::ActionContext, macros::action_gen}; -use cfg_file::config::ConfigFile; -use serde::{Deserialize, Serialize}; -use sha1_hash::calc_sha1; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{fs, sync::Mutex}; -use vcs_data::{ - constants::{CLIENT_FILE_TEMP_FILE, KEY_TEMP_NAME}, - data::{ - local::{ - cached_sheet::CachedSheet, latest_file_data::LatestFileData, - local_sheet::LocalMappingMetadata, modified_status::sign_vault_modified, - workspace_analyzer::AnalyzeResult, - }, - member::MemberId, - sheet::SheetName, - vault::{ - vault_config::VaultUuid, - virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, - }, - }, -}; - -use crate::{ - local_println, - remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_local_output, - try_get_local_workspace, try_get_vault, - }, -}; - -pub type NextVersion = String; -pub type UpdateDescription = String; - -#[derive(Serialize, Deserialize)] -pub struct TrackFileActionArguments { - // Path need to track - pub relative_pathes: HashSet, - - // File update info - pub file_update_info: HashMap, - - // Print infos - pub print_infos: bool, - - // overwrite modified files - pub allow_overwrite_modified: bool, -} - -#[derive(Serialize, Deserialize)] -pub enum TrackFileActionResult { - Done { - created: Vec, - updated: Vec, - synced: Vec, - skipped: Vec, - }, - - // Fail - AuthorizeFailed(String), - - /// There are local move or missing items that have not been resolved, - /// this situation does not allow track - StructureChangesNotSolved, - - CreateTaskFailed(CreateTaskResult), - UpdateTaskFailed(UpdateTaskResult), - SyncTaskFailed(SyncTaskResult), -} - -#[derive(Serialize, Deserialize)] -pub enum CreateTaskResult { - Success(Vec), // Success(success_relative_pathes) - - /// Create file on existing path in the sheet - CreateFileOnExistPath(PathBuf), - - /// Sheet not found - SheetNotFound(SheetName), -} - -#[derive(Serialize, Deserialize)] -pub enum UpdateTaskResult { - Success(Vec), // Success(success_relative_pathes) - - VerifyFailed { - path: PathBuf, - reason: VerifyFailReason, - }, -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum VerifyFailReason { - SheetNotFound(SheetName), - MappingNotFound, - VirtualFileNotFound(VirtualFileId), - VirtualFileReadFailed(VirtualFileId), - NotHeld, - VersionDismatch(VirtualFileVersion, VirtualFileVersion), // (CurrentVersion, RemoteVersion) - UpdateButNoDescription, // File needs update, but no description exists - VersionAlreadyExist(VirtualFileVersion), // (RemoteVersion) -} - -#[derive(Serialize, Deserialize)] -pub enum SyncTaskResult { - Success(Vec), // Success(success_relative_pathes) -} -#[action_gen] -pub async fn track_file_action( - ctx: ActionContext, - arguments: TrackFileActionArguments, -) -> Result { - let relative_pathes = arguments.relative_pathes; - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string())), - }; - - // Check sheet - let (sheet_name, is_ref_sheet) = - get_current_sheet_name(&ctx, instance, &member_id, true).await?; - - // Can modify Sheet when not in reference sheet or in Host mode - let can_modify_sheet = !is_ref_sheet || is_host_mode; - - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let analyzed = AnalyzeResult::analyze_local_status(&workspace).await?; - let latest_file_data = - LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; - - if !analyzed.lost.is_empty() || !analyzed.moved.is_empty() { - return Ok(TrackFileActionResult::StructureChangesNotSolved); - } - - let Some(sheet_in_use) = workspace.config().lock().await.sheet_in_use().clone() else { - return Err(TcpTargetError::NotFound("Sheet not found!".to_string())); - }; - - // Read local sheet and member held - let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?; - let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?; - let member_held = LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; - - let modified = analyzed - .modified - .intersection(&relative_pathes) - .cloned() - .collect::>(); - - // Filter out created files - let created_task = analyzed - .created - .intersection(&relative_pathes) - .cloned() - .collect::>(); - - // Filter out modified files that need to be updated - let mut update_task: Vec = { - let result = modified.iter().filter_map(|p| { - if let Ok(local_data) = local_sheet.mapping_data(p) { - let id = local_data.mapping_vfid(); - let local_ver = local_data.version_when_updated(); - let Some(latest_ver) = latest_file_data.file_version(id) else { - return None; - }; - if let Some(held_member) = member_held.file_holder(id) { - // Check if holder and version match - if held_member == &member_id && local_ver == latest_ver { - return Some(p.clone()); - } - } - }; - None - }); - result.collect() - }; - - let mut skipped_task: Vec = Vec::new(); - - // Filter out files that do not exist locally or have version inconsistencies and need to be synchronized - let mut sync_task: Vec = { - let other: Vec = relative_pathes - .iter() - .filter(|p| !created_task.contains(p) && !update_task.contains(p)) - .cloned() - .collect(); - - let result = other.iter().filter_map(|p| { - // Not exists and not lost, first download - if !workspace.local_path().join(p).exists() && !analyzed.lost.contains(p) { - return Some(p.clone()); - } - - // In cached sheet - if !cached_sheet.mapping().contains_key(p) { - return None; - } - - // In local sheet - let local_sheet_mapping = local_sheet.mapping_data(p).ok()?; - let vfid = local_sheet_mapping.mapping_vfid(); - - if let Some(latest_version) = &latest_file_data.file_version(vfid) { - // Version does not match - if &local_sheet_mapping.version_when_updated() != latest_version { - let modified = modified.contains(p); - if modified && arguments.allow_overwrite_modified { - return Some(p.clone()); - } else if modified && !arguments.allow_overwrite_modified { - // If not allowed to overwrite, join skipped tasks - skipped_task.push(p.clone()); - return None; - } - return Some(p.clone()); - } - } - - // File not held and modified - let holder = latest_file_data.file_holder(vfid); - if (holder.is_none() || &member_id != holder.unwrap()) && modified.contains(p) { - // If allow overwrite modified is true, overwrite the file - if arguments.allow_overwrite_modified { - return Some(p.clone()); - } else { - // If not allowed to overwrite, join skipped tasks - skipped_task.push(p.clone()); - return None; - } - } - - None - }); - result.collect() - }; - - // If the sheet cannot be modified, - // the update_task here should be considered invalid and changed to sync rollback - if !can_modify_sheet { - if arguments.allow_overwrite_modified { - sync_task.append(&mut update_task); - update_task.clear(); - } else { - skipped_task.append(&mut update_task); - update_task.clear(); - } - } - - // Package tasks - let tasks: (Vec, Vec, Vec) = - (created_task, update_task, sync_task); - - // Send to remote - { - let mut mut_instance = instance.lock().await; - mut_instance - .write_large_msgpack(tasks.clone(), 1024u16) - .await?; - // Drop mutex here - } - - // Process create tasks - let mut success_create = Vec::::new(); - if can_modify_sheet { - success_create = match proc_create_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.0, - arguments.print_infos, - ) - .await - { - Ok(r) => match r { - CreateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::CreateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process update tasks - let mut success_update = Vec::::new(); - if can_modify_sheet { - success_update = match proc_update_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.1, - arguments.print_infos, - arguments.file_update_info, - ) - .await - { - Ok(r) => match r { - UpdateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::UpdateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process sync tasks - let success_sync = match proc_sync_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.2, - arguments.print_infos, - ) - .await - { - Ok(r) => match r { - SyncTaskResult::Success(relative_pathes) => relative_pathes, - }, - Err(e) => return Err(e), - }; - - if success_create.len() + success_update.len() > 0 { - sign_vault_modified(true).await; - } - - return Ok(TrackFileActionResult::Done { - created: success_create, - updated: success_update, - synced: success_sync, - skipped: skipped_task, - }); - } - - if ctx.is_proc_on_remote() { - // Read tasks - let (created_task, update_task, sync_task): (Vec, Vec, Vec) = { - let mut mut_instance = instance.lock().await; - mut_instance.read_large_msgpack(1024u16).await? - }; - - // Process create tasks - let mut success_create = Vec::::new(); - if can_modify_sheet { - success_create = match proc_create_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - created_task, - ) - .await - { - Ok(r) => match r { - CreateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::CreateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process update tasks - let mut success_update = Vec::::new(); - if can_modify_sheet { - success_update = match proc_update_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - update_task, - arguments.file_update_info, - ) - .await - { - Ok(r) => match r { - UpdateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::UpdateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process sync tasks - let success_sync = match proc_sync_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - sync_task, - ) - .await - { - Ok(r) => match r { - SyncTaskResult::Success(relative_pathes) => relative_pathes, - }, - Err(e) => return Err(e), - }; - - return Ok(TrackFileActionResult::Done { - created: success_create, - updated: success_update, - synced: success_sync, - skipped: Vec::new(), // The server doesn't know which files were skipped - }); - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} - -async fn proc_create_tasks_local( - ctx: &ActionContext, - instance: Arc>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, - print_infos: bool, -) -> Result { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Creating {} files...", relative_paths.len()); - } - - // Wait for remote detection of whether the sheet exists - let has_sheet = mut_instance.read_msgpack::().await?; - if !has_sheet { - return Ok(CreateTaskResult::SheetNotFound(sheet_name.clone())); - } - - // Wait for remote detection of whether the file exists - let (hasnt_duplicate, duplicate_path) = mut_instance.read_msgpack::<(bool, PathBuf)>().await?; - if !hasnt_duplicate { - return Ok(CreateTaskResult::CreateFileOnExistPath(duplicate_path)); - } - - let mut success_relative_pathes = Vec::new(); - - // Start sending files - for path in relative_paths { - let full_path = workspace.local_path().join(&path); - - // Send file - if mut_instance.write_file(&full_path).await.is_err() { - continue; - } - - // Read virtual file id and version - let (vfid, version, version_desc) = mut_instance - .read_msgpack::<( - VirtualFileId, - VirtualFileVersion, - VirtualFileVersionDescription, - )>() - .await?; - - // Add mapping to local sheet - let hash = sha1_hash::calc_sha1(&full_path, 2048).await.unwrap().hash; - let time = std::fs::metadata(&full_path)?.modified()?; - local_sheet.add_mapping( - &path.clone(), - LocalMappingMetadata::new( - hash, // hash_when_updated - time, // time_when_updated - std::fs::metadata(&full_path)?.len(), // size_when_updated - version_desc, // version_desc_when_updated - version, // version_when_updated - vfid, // mapping_vfid - time, // last_modifiy_check_itme - false, // last_modifiy_check_result - ), - )?; - - // Print success info - if print_infos { - local_println!(local_output, "+ {}", path.display()); - } - - success_relative_pathes.push(path); - } - - // Write local sheet - local_sheet.write().await?; - - Ok(CreateTaskResult::Success(success_relative_pathes)) -} - -async fn proc_create_tasks_remote( - ctx: &ActionContext, - instance: Arc>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, -) -> Result { - let vault = try_get_vault(ctx)?; - let mut mut_instance = instance.lock().await; - - // Sheet check - let Ok(mut sheet) = vault.sheet(sheet_name).await else { - // Sheet not found - mut_instance.write_msgpack(false).await?; - return Ok(CreateTaskResult::SheetNotFound(sheet_name.to_string())); - }; - mut_instance.write_msgpack(true).await?; - - // Duplicate create precheck - for path in relative_paths.iter() { - if sheet.mapping().contains_key(path) { - // Duplicate file - mut_instance.write_msgpack((false, path)).await?; - return Ok(CreateTaskResult::CreateFileOnExistPath(path.clone())); - } - } - mut_instance.write_msgpack((true, PathBuf::new())).await?; - - let mut success_relative_pathes = Vec::new(); - - // Start receiving files - for path in relative_paths { - // Read file and create virtual file - let Ok(vfid) = vault - .create_virtual_file_from_connection(&mut mut_instance, member_id) - .await - else { - continue; - }; - - // Record virtual file to sheet - let vf_meta = vault.virtual_file(&vfid)?.read_meta().await?; - sheet - .add_mapping(path.clone(), vfid.clone(), vf_meta.version_latest()) - .await?; - - // Tell client the virtual file id and version - mut_instance - .write_msgpack(( - vfid, - vf_meta.version_latest(), - vf_meta - .version_description(vf_meta.version_latest()) - .unwrap(), - )) - .await?; - - success_relative_pathes.push(path); - } - - sheet.persist().await?; - - Ok(CreateTaskResult::Success(success_relative_pathes)) -} - -async fn proc_update_tasks_local( - ctx: &ActionContext, - instance: Arc>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, - print_infos: bool, - file_update_info: HashMap, -) -> Result { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; - - let mut success = Vec::new(); - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Updating {} files...", relative_paths.len()); - } - - for path in relative_paths.iter() { - let Ok(mapping) = local_sheet.mapping_data(path) else { - // Is mapping not found, write empty - mut_instance.write_msgpack("".to_string()).await?; - continue; - }; - // Read and send file version - let Ok(_) = mut_instance - .write_msgpack(mapping.version_when_updated()) - .await - else { - continue; - }; - - // Read verify result - let verify_result: bool = mut_instance.read_msgpack().await?; - if !verify_result { - let reason = mut_instance.read_msgpack::().await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason: reason.clone(), - }); - } - - // Calc hash - let hash_result = match sha1_hash::calc_sha1(workspace.local_path().join(path), 2048).await - { - Ok(r) => r, - Err(_) => { - mut_instance.write_msgpack(false).await?; // Not Ready - continue; - } - }; - - // Get next version - let Some((next_version, description)) = file_update_info.get(path) else { - mut_instance.write_msgpack(false).await?; // Not Ready - continue; - }; - - // Write - mut_instance.write_msgpack(true).await?; // Ready - mut_instance.write_file(path).await?; - - // Read upload result - let upload_result: bool = mut_instance.read_msgpack().await?; - if upload_result { - // Success - let mapping_data_mut = local_sheet.mapping_data_mut(path).unwrap(); - let version = mapping_data_mut.version_when_updated().clone(); - mapping_data_mut.set_hash_when_updated(hash_result.hash); - mapping_data_mut.set_version_when_updated(next_version.clone()); - mapping_data_mut.set_version_desc_when_updated(VirtualFileVersionDescription { - creator: member_id.clone(), - description: description.clone(), - }); - mapping_data_mut.set_last_modifiy_check_result(false); // Mark file not modified - - // Write - local_sheet.write().await?; - - // Push path into success vec - success.push(path.clone()); - - // Print success info - if print_infos { - local_println!( - local_output, - "↑ {} ({} -> {})", - path.display(), - version, - next_version - ); - } - } - } - - Ok(UpdateTaskResult::Success(success)) -} - -async fn proc_update_tasks_remote( - ctx: &ActionContext, - instance: Arc>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, - file_update_info: HashMap, -) -> Result { - let vault = try_get_vault(ctx)?; - let mut mut_instance = instance.lock().await; - - let mut success = Vec::new(); - - for path in relative_paths.iter() { - // Read version - let Ok(version) = mut_instance.read_msgpack::().await else { - continue; - }; - if version.is_empty() { - continue; - } - - // Verify - let Some((next_version, description)) = file_update_info.get(path) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::UpdateButNoDescription; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Sheet not found - }; - let Ok(mut sheet) = vault.sheet(sheet_name).await else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::SheetNotFound(sheet_name.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Sheet not found - }; - let Some(mapping_data) = sheet.mapping_mut().get_mut(path) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::MappingNotFound; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Mapping not found - }; - let Ok(vf) = vault.virtual_file(&mapping_data.id) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VirtualFileNotFound(mapping_data.id.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Virtual file not found - }; - let Ok(vf_metadata) = vf.read_meta().await else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VirtualFileReadFailed(mapping_data.id.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Read virtual file metadata failed - }; - if vf_metadata.versions().contains(next_version) { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VersionAlreadyExist(version); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // VersionAlreadyExist - } - if vf_metadata.hold_member() != member_id { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::NotHeld; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Member not held it - }; - if vf_metadata.version_latest() != version { - mut_instance.write_msgpack(false).await?; - let reason = - VerifyFailReason::VersionDismatch(version.clone(), vf_metadata.version_latest()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Version does not match - }; - mut_instance.write_msgpack(true).await?; // Verified - - // Read if local ready - let ready: bool = mut_instance.read_msgpack().await?; - if !ready { - continue; - } - - // Read and update virtual file - match vault - .update_virtual_file_from_connection( - &mut mut_instance, - member_id, - &mapping_data.id, - next_version, - VirtualFileVersionDescription { - creator: member_id.clone(), - description: description.clone(), - }, - ) - .await - { - Ok(_) => { - // Update version to sheet - mapping_data.version = next_version.clone(); - - // Persist - sheet.persist().await?; - - success.push(path.clone()); - mut_instance.write_msgpack(true).await?; // Success - } - Err(e) => { - mut_instance.write_msgpack(false).await?; // Fail - return Err(e.into()); - } - } - } - - Ok(UpdateTaskResult::Success(success)) -} - -type SyncVersionInfo = Option<( - VirtualFileVersion, - VirtualFileVersionDescription, - VirtualFileId, -)>; - -async fn proc_sync_tasks_local( - ctx: &ActionContext, - instance: Arc>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, - print_infos: bool, -) -> Result { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut success: Vec = Vec::new(); - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Syncing {} files...", relative_paths.len()); - } - - for path in relative_paths { - let Some((version, description, vfid)) = - mut_instance.read_msgpack::().await? - else { - continue; - }; - - // Generate a temp path - let temp_path = workspace - .local_path() - .join(CLIENT_FILE_TEMP_FILE.replace(KEY_TEMP_NAME, &VaultUuid::new_v4().to_string())); - - let copy_to = workspace.local_path().join(&path); - - // Read file - match mut_instance.read_file(&temp_path).await { - Ok(_) => { - if !temp_path.exists() { - continue; - } - } - Err(_) => { - continue; - } - } - - // Calc hash - let new_hash = match calc_sha1(&temp_path, 2048).await { - Ok(hash) => hash, - Err(_) => { - continue; - } - }; - - // Calc size - let new_size = match fs::metadata(&temp_path).await.map(|meta| meta.len()) { - Ok(size) => size, - Err(_) => { - continue; - } - }; - - // Write file - if copy_to.exists() { - if let Err(_) = fs::remove_file(©_to).await { - continue; - } - } else { - // Not exist, create directory - if let Some(path) = copy_to.clone().parent() { - fs::create_dir_all(path).await?; - } - } - if let Err(_) = fs::rename(&temp_path, ©_to).await { - continue; - } - - // Modify local sheet - let mut local_sheet = match workspace.local_sheet(member_id, sheet_name).await { - Ok(sheet) => sheet, - Err(_) => { - continue; - } - }; - - // Get or create mapping - let mapping = match local_sheet.mapping_data_mut(&path) { - Ok(m) => m, - Err(_) => { - // First download - let mut data = LocalMappingMetadata::default(); - data.set_mapping_vfid(vfid); - if let Err(_) = local_sheet.add_mapping(&path, data) { - continue; - } - match local_sheet.mapping_data_mut(&path) { - Ok(m) => m, - Err(_) => { - continue; - } - } - } - }; - - let time = SystemTime::now(); - mapping.set_hash_when_updated(new_hash.hash); - mapping.set_last_modifiy_check_result(false); // Mark not modified - mapping.set_version_when_updated(version); - mapping.set_version_desc_when_updated(description); - mapping.set_size_when_updated(new_size); - mapping.set_time_when_updated(time); - mapping.set_last_modifiy_check_time(time); - if let Err(_) = local_sheet.write().await { - continue; - } - - success.push(path.clone()); - - // Print success info - if print_infos { - local_println!(local_output, "↓ {}", path.display()); - } - } - Ok(SyncTaskResult::Success(success)) -} - -async fn proc_sync_tasks_remote( - ctx: &ActionContext, - instance: Arc>, - _member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec, -) -> Result { - let vault = try_get_vault(ctx)?; - let sheet = vault.sheet(sheet_name).await?; - let mut mut_instance = instance.lock().await; - let mut success: Vec = Vec::new(); - - for path in relative_paths { - // Get mapping - let Some(mapping) = sheet.mapping().get(&path) else { - mut_instance.write_msgpack::(None).await?; // (ready) - continue; - }; - // Get virtual file - let Ok(vf) = vault.virtual_file(&mapping.id) else { - mut_instance.write_msgpack::(None).await?; // (ready) - continue; - }; - // Read metadata and get real path - let vf_meta = &vf.read_meta().await?; - let real_path = vault.virtual_file_real_path(&mapping.id, &vf_meta.version_latest()); - let version = vf_meta.version_latest(); - mut_instance - .write_msgpack::(Some(( - version.clone(), - vf_meta.version_description(version).cloned().unwrap_or( - VirtualFileVersionDescription { - creator: MemberId::default(), - description: "".to_string(), - }, - ), - vf.id(), - ))) - .await?; // (ready) - if mut_instance.write_file(real_path).await.is_err() { - continue; - } else { - success.push(path); - } - } - Ok(SyncTaskResult::Success(success)) -} diff --git a/actions/src/remote_actions/content_manage/track_file.rs b/actions/src/remote_actions/content_manage/track_file.rs new file mode 100644 index 0000000..a59ca76 --- /dev/null +++ b/actions/src/remote_actions/content_manage/track_file.rs @@ -0,0 +1,985 @@ +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::Arc, + time::SystemTime, +}; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use serde::{Deserialize, Serialize}; +use sha1_hash::calc_sha1; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::{fs, sync::Mutex}; +use vcs_data::{ + constants::{CLIENT_FILE_TEMP_FILE, KEY_TEMP_NAME}, + data::{ + local::{ + cached_sheet::CachedSheet, latest_file_data::LatestFileData, + local_sheet::LocalMappingMetadata, modified_status::sign_vault_modified, + workspace_analyzer::AnalyzeResult, + }, + member::MemberId, + sheet::SheetName, + vault::{ + vault_config::VaultUuid, + virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, + }, + }, +}; + +use crate::{ + local_println, + remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_local_output, + try_get_local_workspace, try_get_vault, + }, +}; + +pub type NextVersion = String; +pub type UpdateDescription = String; + +#[derive(Serialize, Deserialize)] +pub struct TrackFileActionArguments { + // Path need to track + pub relative_pathes: HashSet, + + // File update info + pub file_update_info: HashMap, + + // Print infos + pub print_infos: bool, + + // overwrite modified files + pub allow_overwrite_modified: bool, +} + +#[derive(Serialize, Deserialize)] +pub enum TrackFileActionResult { + Done { + created: Vec, + updated: Vec, + synced: Vec, + skipped: Vec, + }, + + // Fail + AuthorizeFailed(String), + + /// There are local move or missing items that have not been resolved, + /// this situation does not allow track + StructureChangesNotSolved, + + CreateTaskFailed(CreateTaskResult), + UpdateTaskFailed(UpdateTaskResult), + SyncTaskFailed(SyncTaskResult), +} + +#[derive(Serialize, Deserialize)] +pub enum CreateTaskResult { + Success(Vec), // Success(success_relative_pathes) + + /// Create file on existing path in the sheet + CreateFileOnExistPath(PathBuf), + + /// Sheet not found + SheetNotFound(SheetName), +} + +#[derive(Serialize, Deserialize)] +pub enum UpdateTaskResult { + Success(Vec), // Success(success_relative_pathes) + + VerifyFailed { + path: PathBuf, + reason: VerifyFailReason, + }, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum VerifyFailReason { + SheetNotFound(SheetName), + MappingNotFound, + VirtualFileNotFound(VirtualFileId), + VirtualFileReadFailed(VirtualFileId), + NotHeld, + VersionDismatch(VirtualFileVersion, VirtualFileVersion), // (CurrentVersion, RemoteVersion) + UpdateButNoDescription, // File needs update, but no description exists + VersionAlreadyExist(VirtualFileVersion), // (RemoteVersion) +} + +#[derive(Serialize, Deserialize)] +pub enum SyncTaskResult { + Success(Vec), // Success(success_relative_pathes) +} +#[action_gen] +pub async fn track_file_action( + ctx: ActionContext, + arguments: TrackFileActionArguments, +) -> Result { + let relative_pathes = arguments.relative_pathes; + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string())), + }; + + // Check sheet + let (sheet_name, is_ref_sheet) = + get_current_sheet_name(&ctx, instance, &member_id, true).await?; + + // Can modify Sheet when not in reference sheet or in Host mode + let can_modify_sheet = !is_ref_sheet || is_host_mode; + + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let analyzed = AnalyzeResult::analyze_local_status(&workspace).await?; + let latest_file_data = + LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; + + if !analyzed.lost.is_empty() || !analyzed.moved.is_empty() { + return Ok(TrackFileActionResult::StructureChangesNotSolved); + } + + let Some(sheet_in_use) = workspace.config().lock().await.sheet_in_use().clone() else { + return Err(TcpTargetError::NotFound("Sheet not found!".to_string())); + }; + + // Read local sheet and member held + let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?; + let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?; + let member_held = LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; + + let modified = analyzed + .modified + .intersection(&relative_pathes) + .cloned() + .collect::>(); + + // Filter out created files + let created_task = analyzed + .created + .intersection(&relative_pathes) + .cloned() + .collect::>(); + + // Filter out modified files that need to be updated + let mut update_task: Vec = { + let result = modified.iter().filter_map(|p| { + if let Ok(local_data) = local_sheet.mapping_data(p) { + let id = local_data.mapping_vfid(); + let local_ver = local_data.version_when_updated(); + let Some(latest_ver) = latest_file_data.file_version(id) else { + return None; + }; + if let Some(held_member) = member_held.file_holder(id) { + // Check if holder and version match + if held_member == &member_id && local_ver == latest_ver { + return Some(p.clone()); + } + } + }; + None + }); + result.collect() + }; + + let mut skipped_task: Vec = Vec::new(); + + // Filter out files that do not exist locally or have version inconsistencies and need to be synchronized + let mut sync_task: Vec = { + let other: Vec = relative_pathes + .iter() + .filter(|p| !created_task.contains(p) && !update_task.contains(p)) + .cloned() + .collect(); + + let result = other.iter().filter_map(|p| { + // Not exists and not lost, first download + if !workspace.local_path().join(p).exists() && !analyzed.lost.contains(p) { + return Some(p.clone()); + } + + // In cached sheet + if !cached_sheet.mapping().contains_key(p) { + return None; + } + + // In local sheet + let local_sheet_mapping = local_sheet.mapping_data(p).ok()?; + let vfid = local_sheet_mapping.mapping_vfid(); + + if let Some(latest_version) = &latest_file_data.file_version(vfid) { + // Version does not match + if &local_sheet_mapping.version_when_updated() != latest_version { + let modified = modified.contains(p); + if modified && arguments.allow_overwrite_modified { + return Some(p.clone()); + } else if modified && !arguments.allow_overwrite_modified { + // If not allowed to overwrite, join skipped tasks + skipped_task.push(p.clone()); + return None; + } + return Some(p.clone()); + } + } + + // File not held and modified + let holder = latest_file_data.file_holder(vfid); + if (holder.is_none() || &member_id != holder.unwrap()) && modified.contains(p) { + // If allow overwrite modified is true, overwrite the file + if arguments.allow_overwrite_modified { + return Some(p.clone()); + } else { + // If not allowed to overwrite, join skipped tasks + skipped_task.push(p.clone()); + return None; + } + } + + None + }); + result.collect() + }; + + // If the sheet cannot be modified, + // the update_task here should be considered invalid and changed to sync rollback + if !can_modify_sheet { + if arguments.allow_overwrite_modified { + sync_task.append(&mut update_task); + update_task.clear(); + } else { + skipped_task.append(&mut update_task); + update_task.clear(); + } + } + + // Package tasks + let tasks: (Vec, Vec, Vec) = + (created_task, update_task, sync_task); + + // Send to remote + { + let mut mut_instance = instance.lock().await; + mut_instance + .write_large_msgpack(tasks.clone(), 1024u16) + .await?; + // Drop mutex here + } + + // Process create tasks + let mut success_create = Vec::::new(); + if can_modify_sheet { + success_create = match proc_create_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.0, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + CreateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::CreateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process update tasks + let mut success_update = Vec::::new(); + if can_modify_sheet { + success_update = match proc_update_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.1, + arguments.print_infos, + arguments.file_update_info, + ) + .await + { + Ok(r) => match r { + UpdateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::UpdateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process sync tasks + let success_sync = match proc_sync_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.2, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + SyncTaskResult::Success(relative_pathes) => relative_pathes, + }, + Err(e) => return Err(e), + }; + + if success_create.len() + success_update.len() > 0 { + sign_vault_modified(true).await; + } + + return Ok(TrackFileActionResult::Done { + created: success_create, + updated: success_update, + synced: success_sync, + skipped: skipped_task, + }); + } + + if ctx.is_proc_on_remote() { + // Read tasks + let (created_task, update_task, sync_task): (Vec, Vec, Vec) = { + let mut mut_instance = instance.lock().await; + mut_instance.read_large_msgpack(1024u16).await? + }; + + // Process create tasks + let mut success_create = Vec::::new(); + if can_modify_sheet { + success_create = match proc_create_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + created_task, + ) + .await + { + Ok(r) => match r { + CreateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::CreateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process update tasks + let mut success_update = Vec::::new(); + if can_modify_sheet { + success_update = match proc_update_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + update_task, + arguments.file_update_info, + ) + .await + { + Ok(r) => match r { + UpdateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::UpdateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process sync tasks + let success_sync = match proc_sync_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + sync_task, + ) + .await + { + Ok(r) => match r { + SyncTaskResult::Success(relative_pathes) => relative_pathes, + }, + Err(e) => return Err(e), + }; + + return Ok(TrackFileActionResult::Done { + created: success_create, + updated: success_update, + synced: success_sync, + skipped: Vec::new(), // The server doesn't know which files were skipped + }); + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} + +async fn proc_create_tasks_local( + ctx: &ActionContext, + instance: Arc>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, + print_infos: bool, +) -> Result { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Creating {} files...", relative_paths.len()); + } + + // Wait for remote detection of whether the sheet exists + let has_sheet = mut_instance.read_msgpack::().await?; + if !has_sheet { + return Ok(CreateTaskResult::SheetNotFound(sheet_name.clone())); + } + + // Wait for remote detection of whether the file exists + let (hasnt_duplicate, duplicate_path) = mut_instance.read_msgpack::<(bool, PathBuf)>().await?; + if !hasnt_duplicate { + return Ok(CreateTaskResult::CreateFileOnExistPath(duplicate_path)); + } + + let mut success_relative_pathes = Vec::new(); + + // Start sending files + for path in relative_paths { + let full_path = workspace.local_path().join(&path); + + // Send file + if mut_instance.write_file(&full_path).await.is_err() { + continue; + } + + // Read virtual file id and version + let (vfid, version, version_desc) = mut_instance + .read_msgpack::<( + VirtualFileId, + VirtualFileVersion, + VirtualFileVersionDescription, + )>() + .await?; + + // Add mapping to local sheet + let hash = sha1_hash::calc_sha1(&full_path, 2048).await.unwrap().hash; + let time = std::fs::metadata(&full_path)?.modified()?; + local_sheet.add_mapping( + &path.clone(), + LocalMappingMetadata::new( + hash, // hash_when_updated + time, // time_when_updated + std::fs::metadata(&full_path)?.len(), // size_when_updated + version_desc, // version_desc_when_updated + version, // version_when_updated + vfid, // mapping_vfid + time, // last_modifiy_check_itme + false, // last_modifiy_check_result + ), + )?; + + // Print success info + if print_infos { + local_println!(local_output, "+ {}", path.display()); + } + + success_relative_pathes.push(path); + } + + // Write local sheet + local_sheet.write().await?; + + Ok(CreateTaskResult::Success(success_relative_pathes)) +} + +async fn proc_create_tasks_remote( + ctx: &ActionContext, + instance: Arc>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, +) -> Result { + let vault = try_get_vault(ctx)?; + let mut mut_instance = instance.lock().await; + + // Sheet check + let Ok(mut sheet) = vault.sheet(sheet_name).await else { + // Sheet not found + mut_instance.write_msgpack(false).await?; + return Ok(CreateTaskResult::SheetNotFound(sheet_name.to_string())); + }; + mut_instance.write_msgpack(true).await?; + + // Duplicate create precheck + for path in relative_paths.iter() { + if sheet.mapping().contains_key(path) { + // Duplicate file + mut_instance.write_msgpack((false, path)).await?; + return Ok(CreateTaskResult::CreateFileOnExistPath(path.clone())); + } + } + mut_instance.write_msgpack((true, PathBuf::new())).await?; + + let mut success_relative_pathes = Vec::new(); + + // Start receiving files + for path in relative_paths { + // Read file and create virtual file + let Ok(vfid) = vault + .create_virtual_file_from_connection(&mut mut_instance, member_id) + .await + else { + continue; + }; + + // Record virtual file to sheet + let vf_meta = vault.virtual_file(&vfid)?.read_meta().await?; + sheet + .add_mapping(path.clone(), vfid.clone(), vf_meta.version_latest()) + .await?; + + // Tell client the virtual file id and version + mut_instance + .write_msgpack(( + vfid, + vf_meta.version_latest(), + vf_meta + .version_description(vf_meta.version_latest()) + .unwrap(), + )) + .await?; + + success_relative_pathes.push(path); + } + + sheet.persist().await?; + + Ok(CreateTaskResult::Success(success_relative_pathes)) +} + +async fn proc_update_tasks_local( + ctx: &ActionContext, + instance: Arc>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, + print_infos: bool, + file_update_info: HashMap, +) -> Result { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; + + let mut success = Vec::new(); + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Updating {} files...", relative_paths.len()); + } + + for path in relative_paths.iter() { + let Ok(mapping) = local_sheet.mapping_data(path) else { + // Is mapping not found, write empty + mut_instance.write_msgpack("".to_string()).await?; + continue; + }; + // Read and send file version + let Ok(_) = mut_instance + .write_msgpack(mapping.version_when_updated()) + .await + else { + continue; + }; + + // Read verify result + let verify_result: bool = mut_instance.read_msgpack().await?; + if !verify_result { + let reason = mut_instance.read_msgpack::().await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason.clone(), + }); + } + + // Calc hash + let hash_result = match sha1_hash::calc_sha1(workspace.local_path().join(path), 2048).await + { + Ok(r) => r, + Err(_) => { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + } + }; + + // Get next version + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + }; + + // Write + mut_instance.write_msgpack(true).await?; // Ready + mut_instance.write_file(path).await?; + + // Read upload result + let upload_result: bool = mut_instance.read_msgpack().await?; + if upload_result { + // Success + let mapping_data_mut = local_sheet.mapping_data_mut(path).unwrap(); + let version = mapping_data_mut.version_when_updated().clone(); + mapping_data_mut.set_hash_when_updated(hash_result.hash); + mapping_data_mut.set_version_when_updated(next_version.clone()); + mapping_data_mut.set_version_desc_when_updated(VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }); + mapping_data_mut.set_last_modifiy_check_result(false); // Mark file not modified + + // Write + local_sheet.write().await?; + + // Push path into success vec + success.push(path.clone()); + + // Print success info + if print_infos { + local_println!( + local_output, + "↑ {} ({} -> {})", + path.display(), + version, + next_version + ); + } + } + } + + Ok(UpdateTaskResult::Success(success)) +} + +async fn proc_update_tasks_remote( + ctx: &ActionContext, + instance: Arc>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, + file_update_info: HashMap, +) -> Result { + let vault = try_get_vault(ctx)?; + let mut mut_instance = instance.lock().await; + + let mut success = Vec::new(); + + for path in relative_paths.iter() { + // Read version + let Ok(version) = mut_instance.read_msgpack::().await else { + continue; + }; + if version.is_empty() { + continue; + } + + // Verify + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::UpdateButNoDescription; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Sheet not found + }; + let Ok(mut sheet) = vault.sheet(sheet_name).await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::SheetNotFound(sheet_name.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Sheet not found + }; + let Some(mapping_data) = sheet.mapping_mut().get_mut(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::MappingNotFound; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Mapping not found + }; + let Ok(vf) = vault.virtual_file(&mapping_data.id) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileNotFound(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Virtual file not found + }; + let Ok(vf_metadata) = vf.read_meta().await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileReadFailed(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Read virtual file metadata failed + }; + if vf_metadata.versions().contains(next_version) { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VersionAlreadyExist(version); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // VersionAlreadyExist + } + if vf_metadata.hold_member() != member_id { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::NotHeld; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Member not held it + }; + if vf_metadata.version_latest() != version { + mut_instance.write_msgpack(false).await?; + let reason = + VerifyFailReason::VersionDismatch(version.clone(), vf_metadata.version_latest()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Version does not match + }; + mut_instance.write_msgpack(true).await?; // Verified + + // Read if local ready + let ready: bool = mut_instance.read_msgpack().await?; + if !ready { + continue; + } + + // Read and update virtual file + match vault + .update_virtual_file_from_connection( + &mut mut_instance, + member_id, + &mapping_data.id, + next_version, + VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }, + ) + .await + { + Ok(_) => { + // Update version to sheet + mapping_data.version = next_version.clone(); + + // Persist + sheet.persist().await?; + + success.push(path.clone()); + mut_instance.write_msgpack(true).await?; // Success + } + Err(e) => { + mut_instance.write_msgpack(false).await?; // Fail + return Err(e.into()); + } + } + } + + Ok(UpdateTaskResult::Success(success)) +} + +type SyncVersionInfo = Option<( + VirtualFileVersion, + VirtualFileVersionDescription, + VirtualFileId, +)>; + +async fn proc_sync_tasks_local( + ctx: &ActionContext, + instance: Arc>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, + print_infos: bool, +) -> Result { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut success: Vec = Vec::new(); + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Syncing {} files...", relative_paths.len()); + } + + for path in relative_paths { + let Some((version, description, vfid)) = + mut_instance.read_msgpack::().await? + else { + continue; + }; + + // Generate a temp path + let temp_path = workspace + .local_path() + .join(CLIENT_FILE_TEMP_FILE.replace(KEY_TEMP_NAME, &VaultUuid::new_v4().to_string())); + + let copy_to = workspace.local_path().join(&path); + + // Read file + match mut_instance.read_file(&temp_path).await { + Ok(_) => { + if !temp_path.exists() { + continue; + } + } + Err(_) => { + continue; + } + } + + // Calc hash + let new_hash = match calc_sha1(&temp_path, 2048).await { + Ok(hash) => hash, + Err(_) => { + continue; + } + }; + + // Calc size + let new_size = match fs::metadata(&temp_path).await.map(|meta| meta.len()) { + Ok(size) => size, + Err(_) => { + continue; + } + }; + + // Write file + if copy_to.exists() { + if let Err(_) = fs::remove_file(©_to).await { + continue; + } + } else { + // Not exist, create directory + if let Some(path) = copy_to.clone().parent() { + fs::create_dir_all(path).await?; + } + } + if let Err(_) = fs::rename(&temp_path, ©_to).await { + continue; + } + + // Modify local sheet + let mut local_sheet = match workspace.local_sheet(member_id, sheet_name).await { + Ok(sheet) => sheet, + Err(_) => { + continue; + } + }; + + // Get or create mapping + let mapping = match local_sheet.mapping_data_mut(&path) { + Ok(m) => m, + Err(_) => { + // First download + let mut data = LocalMappingMetadata::default(); + data.set_mapping_vfid(vfid); + if let Err(_) = local_sheet.add_mapping(&path, data) { + continue; + } + match local_sheet.mapping_data_mut(&path) { + Ok(m) => m, + Err(_) => { + continue; + } + } + } + }; + + let time = SystemTime::now(); + mapping.set_hash_when_updated(new_hash.hash); + mapping.set_last_modifiy_check_result(false); // Mark not modified + mapping.set_version_when_updated(version); + mapping.set_version_desc_when_updated(description); + mapping.set_size_when_updated(new_size); + mapping.set_time_when_updated(time); + mapping.set_last_modifiy_check_time(time); + if let Err(_) = local_sheet.write().await { + continue; + } + + success.push(path.clone()); + + // Print success info + if print_infos { + local_println!(local_output, "↓ {}", path.display()); + } + } + Ok(SyncTaskResult::Success(success)) +} + +async fn proc_sync_tasks_remote( + ctx: &ActionContext, + instance: Arc>, + _member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec, +) -> Result { + let vault = try_get_vault(ctx)?; + let sheet = vault.sheet(sheet_name).await?; + let mut mut_instance = instance.lock().await; + let mut success: Vec = Vec::new(); + + for path in relative_paths { + // Get mapping + let Some(mapping) = sheet.mapping().get(&path) else { + mut_instance.write_msgpack::(None).await?; // (ready) + continue; + }; + // Get virtual file + let Ok(vf) = vault.virtual_file(&mapping.id) else { + mut_instance.write_msgpack::(None).await?; // (ready) + continue; + }; + // Read metadata and get real path + let vf_meta = &vf.read_meta().await?; + let real_path = vault.virtual_file_real_path(&mapping.id, &vf_meta.version_latest()); + let version = vf_meta.version_latest(); + mut_instance + .write_msgpack::(Some(( + version.clone(), + vf_meta.version_description(version).cloned().unwrap_or( + VirtualFileVersionDescription { + creator: MemberId::default(), + description: "".to_string(), + }, + ), + vf.id(), + ))) + .await?; // (ready) + if mut_instance.write_file(real_path).await.is_err() { + continue; + } else { + success.push(path); + } + } + Ok(SyncTaskResult::Success(success)) +} -- cgit