diff options
| author | 魏曹先生 <1992414357@qq.com> | 2025-11-20 17:40:14 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2025-11-20 17:40:14 +0800 |
| commit | 03335c9816085cde77e0ffbae35e7f85623b7293 (patch) | |
| tree | 7e129a0f2f32c27dcc0e03b6703975dd4d8fc1f8 /crates/vcs_actions | |
| parent | 8b70533434d86f72a9a62d79f0f447a619e25040 (diff) | |
feat: Add file update verification system
Add comprehensive file update verification with detailed failure reasons including version mismatch, file not held, and missing descriptions.
Diffstat (limited to 'crates/vcs_actions')
| -rw-r--r-- | crates/vcs_actions/src/actions/virtual_file_actions.rs | 350 |
1 files changed, 306 insertions, 44 deletions
diff --git a/crates/vcs_actions/src/actions/virtual_file_actions.rs b/crates/vcs_actions/src/actions/virtual_file_actions.rs index 967f573..dff31fb 100644 --- a/crates/vcs_actions/src/actions/virtual_file_actions.rs +++ b/crates/vcs_actions/src/actions/virtual_file_actions.rs @@ -1,14 +1,19 @@ -use std::{collections::HashSet, path::PathBuf, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::Arc, +}; use action_system::{action::ActionContext, macros::action_gen}; use cfg_file::config::ConfigFile; +use log::info; use serde::{Deserialize, Serialize}; use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; use tokio::sync::Mutex; use vcs_data::data::{ local::{ - cached_sheet::CachedSheet, file_status::AnalyzeResult, local_sheet::LocalMappingMetadata, - member_held::MemberHeld, + cached_sheet::CachedSheet, file_status::AnalyzeResult, latest_file_data::LatestFileData, + local_sheet::LocalMappingMetadata, }, member::MemberId, sheet::SheetName, @@ -20,13 +25,19 @@ use crate::actions::{ try_get_vault, }; +pub type NextVersion = String; +pub type UpdateDescription = String; + #[derive(Serialize, Deserialize)] pub struct TrackFileActionArguments { // Path need to track pub relative_pathes: HashSet<PathBuf>, - // Display progress bar - pub display_progressbar: bool, + // File update info + pub file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, + + // Print infos + pub print_infos: bool, } #[derive(Serialize, Deserialize)] @@ -63,6 +74,23 @@ pub enum CreateTaskResult { #[derive(Serialize, Deserialize)] pub enum UpdateTaskResult { Success(Vec<PathBuf>), // Success(success_relative_pathes) + + VerifyFailed { + path: PathBuf, + reason: VerifyFailReason, + }, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum VerifyFailReason { + SheetNotFound(SheetName), + MappingNotFound, + VirtualFileNotFound(VirtualFileId), + VirtualFileReadFailed(VirtualFileId), + NotHeld, + VersionDismatch(VirtualFileVersion, VirtualFileVersion), // (CurrentVersion, RemoteVersion) + UpdateButNoDescription, // File needs update, but no description exists + VersionAlreadyExist(VirtualFileVersion), // (RemoteVersion) } #[derive(Serialize, Deserialize)] @@ -101,7 +129,7 @@ pub async fn track_file_action( // Read local sheet and member held let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?; let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?; - let member_held = MemberHeld::read_from(MemberHeld::held_file_path(&member_id)?).await?; + let member_held = LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; let modified = analyzed .modified @@ -182,46 +210,65 @@ pub async fn track_file_action( } // Process create tasks - let success_create = - match proc_create_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.0) - .await - { - Ok(r) => match r { - CreateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::CreateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; + let success_create = match proc_create_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.0, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + CreateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::CreateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; // Process update tasks - let success_update = - match proc_update_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.1) - .await - { - Ok(r) => match r { - UpdateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::UpdateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; + let success_update = match proc_update_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.1, + arguments.print_infos, + arguments.file_update_info, + ) + .await + { + Ok(r) => match r { + UpdateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::UpdateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; // Process sync tasks - let success_sync = - match proc_sync_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.2) - .await - { - Ok(r) => match r { - SyncTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::SyncTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; + let success_sync = match proc_sync_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.2, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + SyncTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::SyncTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; return Ok(TrackFileActionResult::Done { created: success_create, @@ -263,6 +310,7 @@ pub async fn track_file_action( &member_id, &sheet_name, update_task, + arguments.file_update_info, ) .await { @@ -310,6 +358,7 @@ async fn proc_create_tasks_local( member_id: &MemberId, sheet_name: &SheetName, relative_paths: Vec<PathBuf>, + print_infos: bool, ) -> Result<CreateTaskResult, TcpTargetError> { let workspace = try_get_local_workspace(&ctx)?; let mut mut_instance = instance.lock().await; @@ -364,6 +413,11 @@ async fn proc_create_tasks_local( ), )?; + // Print success info + if print_infos { + println!("+ {}", path.display()); + } + success_relative_pathes.push(path); } @@ -444,8 +498,87 @@ async fn proc_update_tasks_local( member_id: &MemberId, sheet_name: &SheetName, relative_paths: Vec<PathBuf>, + print_infos: bool, + file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, ) -> Result<UpdateTaskResult, TcpTargetError> { - Ok(UpdateTaskResult::Success(Vec::new())) + let workspace = try_get_local_workspace(&ctx)?; + let mut mut_instance = instance.lock().await; + let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; + + let mut success = Vec::new(); + + for path in relative_paths.iter() { + let Ok(mapping) = local_sheet.mapping_data(&path) else { + // Is mapping not found, write empty + mut_instance.write_msgpack("".to_string()).await?; + continue; + }; + // Read and send file version + let Ok(_) = mut_instance + .write_msgpack(mapping.version_when_updated()) + .await + else { + continue; + }; + + // Read verify result + let verify_result: bool = mut_instance.read_msgpack().await?; + if !verify_result { + let reason = mut_instance.read_msgpack::<VerifyFailReason>().await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason.clone(), + }); + } + + // Calc hash + let hash_result = match sha1_hash::calc_sha1(workspace.local_path().join(path), 2048).await + { + Ok(r) => r, + Err(_) => { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + } + }; + + // Get next version + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + }; + + // Write + mut_instance.write_msgpack(true).await?; // Ready + mut_instance.write_file(path).await?; + + // Read upload result + let upload_result: bool = mut_instance.read_msgpack().await?; + if upload_result { + // Success + let mapping_data_mut = local_sheet.mapping_data_mut(path).unwrap(); + let version = mapping_data_mut.version_when_updated().clone(); + mapping_data_mut.set_hash_when_updated(hash_result.hash); + mapping_data_mut.set_version_when_updated(next_version.clone()); + mapping_data_mut.set_version_desc_when_updated(VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }); + mapping_data_mut.set_last_modifiy_check_result(false); // Mark file not modified + + // Write + local_sheet.write().await?; + + // Push path into success vec + success.push(path.clone()); + + // Print success info + if print_infos { + println!("* {} ({} -> {})", path.display(), version, next_version); + } + } + } + + Ok(UpdateTaskResult::Success(success)) } async fn proc_update_tasks_remote( @@ -454,8 +587,136 @@ async fn proc_update_tasks_remote( member_id: &MemberId, sheet_name: &SheetName, relative_paths: Vec<PathBuf>, + file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, ) -> Result<UpdateTaskResult, TcpTargetError> { - Ok(UpdateTaskResult::Success(Vec::new())) + let vault = try_get_vault(ctx)?; + let mut mut_instance = instance.lock().await; + + let mut success = Vec::new(); + + for path in relative_paths.iter() { + // Read version + let Ok(version) = mut_instance.read_msgpack::<VirtualFileVersion>().await else { + continue; + }; + if version.is_empty() { + continue; + } + + // Verify + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::UpdateButNoDescription; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Sheet not found + }; + let Ok(mut sheet) = vault.sheet(sheet_name).await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::SheetNotFound(sheet_name.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Sheet not found + }; + let Some(mapping_data) = sheet.mapping_mut().get_mut(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::MappingNotFound; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Mapping not found + }; + let Ok(vf) = vault.virtual_file(&mapping_data.id) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileNotFound(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Virtual file not found + }; + let Ok(vf_metadata) = vf.read_meta().await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileReadFailed(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Read virtual file metadata failed + }; + if vf_metadata.versions().contains(next_version) { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VersionAlreadyExist(version); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // VersionAlreadyExist + } + if vf_metadata.hold_member() != member_id { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::NotHeld; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Member not held it + }; + if mapping_data.version != version { + mut_instance.write_msgpack(false).await?; + let reason = + VerifyFailReason::VersionDismatch(version.clone(), mapping_data.version.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason, + }); // Version does not match + }; + mut_instance.write_msgpack(true).await?; // Verified + + // Read if local ready + let ready: bool = mut_instance.read_msgpack().await?; + if !ready { + continue; + } + + // Read and update virtual file + match vault + .update_virtual_file_from_connection( + &mut mut_instance, + member_id, + &mapping_data.id, + &next_version, + VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }, + ) + .await + { + Ok(_) => { + // Update version to sheet + mapping_data.version = next_version.clone(); + + // Persist + sheet.persist().await?; + + success.push(path.clone()); + mut_instance.write_msgpack(true).await?; // Success + } + Err(e) => { + mut_instance.write_msgpack(false).await?; // Fail + return Err(e.into()); + } + } + } + + Ok(UpdateTaskResult::Success(success)) } async fn proc_sync_tasks_local( @@ -464,6 +725,7 @@ async fn proc_sync_tasks_local( member_id: &MemberId, sheet_name: &SheetName, relative_paths: Vec<PathBuf>, + print_infos: bool, ) -> Result<SyncTaskResult, TcpTargetError> { Ok(SyncTaskResult::Success(Vec::new())) } |
