diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-01-12 04:28:28 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-01-12 04:51:34 +0800 |
| commit | c5fb22694e95f12c24b8d8af76999be7aea3fcec (patch) | |
| tree | 399d8a24ce491fb635f3d09f2123290fe784059e /crates/vcs_actions/src/actions/local_actions.rs | |
| parent | 444754489aca0454eb54e15a49fb8a6db0b68a07 (diff) | |
Reorganize crate structure and move documentation files
Diffstat (limited to 'crates/vcs_actions/src/actions/local_actions.rs')
| -rw-r--r-- | crates/vcs_actions/src/actions/local_actions.rs | 525 |
1 files changed, 0 insertions, 525 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs deleted file mode 100644 index 53a1ff8..0000000 --- a/crates/vcs_actions/src/actions/local_actions.rs +++ /dev/null @@ -1,525 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - io::ErrorKind, - net::SocketAddr, - path::PathBuf, - time::SystemTime, -}; - -use action_system::{action::ActionContext, macros::action_gen}; -use cfg_file::config::ConfigFile; -use log::info; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::{ - constants::{ - CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME, - SERVER_SUFFIX_SHEET_SHARE_FILE, VAULT_HOST_NAME, - }, - data::{ - local::{ - cached_sheet::CachedSheet, - config::LocalConfig, - latest_file_data::LatestFileData, - latest_info::{LatestInfo, SheetInfo}, - vault_modified::sign_vault_modified, - }, - member::MemberId, - sheet::{SheetData, SheetName, SheetPathBuf}, - vault::{ - config::VaultUuid, - sheet_share::{Share, SheetShareId}, - virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, - }, - }, -}; - -use crate::actions::{ - auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, -}; - -#[derive(Serialize, Deserialize)] -pub enum SetUpstreamVaultActionResult { - // Success - DirectedAndStained, - Redirected, - - // Fail - AlreadyStained, - AuthorizeFailed(String), - RedirectFailed(String), - SameUpstream, - - Done, -} - -#[action_gen] -pub async fn set_upstream_vault_action( - ctx: ActionContext, - upstream: SocketAddr, -) -> Result<SetUpstreamVaultActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - if let Err(e) = auth_member(&ctx, instance).await { - return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string())); - } - - // Direct - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - instance - .lock() - .await - .write(*vault.config().vault_uuid()) - .await?; - return Ok(SetUpstreamVaultActionResult::Done); - } - - if ctx.is_proc_on_local() { - info!("Authorize successful. directing to upstream vault."); - - // Read the vault UUID from the instance - let vault_uuid = instance.lock().await.read::<VaultUuid>().await?; - - let local_workspace = try_get_local_workspace(&ctx)?; - let local_config = local_workspace.config(); - - let mut mut_local_config = local_config.lock().await; - if !mut_local_config.stained() { - // Stain the local workspace - mut_local_config.stain(vault_uuid); - - // Set the upstream address - mut_local_config.set_vault_addr(upstream); - - // Store the updated config - LocalConfig::write(&mut_local_config).await?; - - info!("Workspace stained!"); - return Ok(SetUpstreamVaultActionResult::DirectedAndStained); - } else { - // Local workspace is already stained, redirecting - let Some(stained_uuid) = mut_local_config.stained_uuid() else { - return Ok(SetUpstreamVaultActionResult::RedirectFailed( - "Stained uuid not found".to_string(), - )); - }; - let local_upstream = mut_local_config.upstream_addr(); - - // Address changed, but same UUID. - if vault_uuid == stained_uuid { - if local_upstream != upstream { - // Set the upstream address - mut_local_config.set_vault_addr(upstream); - - // Store the updated config - LocalConfig::write(&mut_local_config).await?; - return Ok(SetUpstreamVaultActionResult::Redirected); - } else { - return Ok(SetUpstreamVaultActionResult::SameUpstream); - } - } - return Ok(SetUpstreamVaultActionResult::AlreadyStained); - } - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} - -#[derive(Serialize, Deserialize)] -pub enum UpdateToLatestInfoResult { - Success, - - // Fail - AuthorizeFailed(String), - SyncCachedSheetFail(SyncCachedSheetFailReason), -} - -#[derive(Serialize, Deserialize)] -pub enum SyncCachedSheetFailReason { - PathAlreadyExist(PathBuf), -} - -#[action_gen] -pub async fn update_to_latest_info_action( - ctx: ActionContext, - _unused: (), -) -> Result<UpdateToLatestInfoResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), - }; - - info!("Sending latest info to {}", member_id); - - // Sync Latest Info - { - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - - // Build latest info - let mut latest_info = LatestInfo::default(); - - // Sheet & Share - let mut shares_in_my_sheets: HashMap<SheetName, HashMap<SheetShareId, Share>> = - HashMap::new(); - let mut member_owned = Vec::new(); - let mut member_visible = Vec::new(); - let mut ref_sheets = HashSet::new(); - - for sheet in vault.sheets().await? { - // Build share parts - if let Some(holder) = sheet.holder() { - if holder == &member_id || holder == VAULT_HOST_NAME { - let mut sheet_shares: HashMap<SheetShareId, Share> = HashMap::new(); - for share in sheet.get_shares().await? { - // Get SharePath - let Some(share_path) = share.path.clone() else { - continue; - }; - // Get ShareId from SharePath - let Some(share_id) = share_path.file_name() else { - continue; - }; - let share_id = share_id.display().to_string(); - let share_id_trimed = - share_id.trim_end_matches(SERVER_SUFFIX_SHEET_SHARE_FILE); - sheet_shares.insert(share_id_trimed.to_string(), share); - } - shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares); - } - } - - // Build sheet parts - let holder_is_host = - sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME; - if sheet.holder().is_some() - && (sheet.holder().unwrap() == &member_id || holder_is_host) - { - member_owned.push(sheet.name().clone()); - if holder_is_host { - ref_sheets.insert(sheet.name().clone()); - } - } else { - member_visible.push(SheetInfo { - sheet_name: sheet.name().clone(), - holder_name: sheet.holder().cloned(), - }); - } - } - - // Record Share & Sheet - latest_info.visible_sheets = member_owned; - latest_info.invisible_sheets = member_visible; - latest_info.shares_in_my_sheets = shares_in_my_sheets; - - // RefSheet - let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data(); - latest_info.ref_sheet_content = ref_sheet_data.clone(); - latest_info.ref_sheet_vfs_mapping = ref_sheet_data - .mapping() - .into_iter() - .map(|(path, file)| (file.id.clone(), path.clone())) - .collect::<HashMap<VirtualFileId, SheetPathBuf>>(); - latest_info.reference_sheets = ref_sheets; - - // Members - let members = vault.members().await?; - latest_info.vault_members = members; - - // Send - instance - .lock() - .await - .write_large_msgpack(latest_info, 512_u16) - .await?; - } - - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let mut latest_info = instance - .lock() - .await - .read_large_msgpack::<LatestInfo>(512_u16) - .await?; - latest_info.update_instant = Some(SystemTime::now()); - LatestInfo::write_to( - &latest_info, - LatestInfo::latest_info_path(workspace.local_path(), &member_id), - ) - .await?; - } - } - - info!("Update sheets to {}", member_id); - - // Sync Remote Sheets - { - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( - workspace.local_path(), - &member_id, - )) - .await - else { - return Err(TcpTargetError::Io("Read latest info failed".to_string())); - }; - - // Collect all local versions - let mut local_versions = vec![]; - for request_sheet in latest_info.visible_sheets { - let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else { - // For newly created sheets, the version is 0. - // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information - local_versions.push((request_sheet, -1)); - continue; - }; - local_versions.push((request_sheet, data.write_count())); - } - - // Send the version list - let len = local_versions.len(); - instance.lock().await.write_msgpack(local_versions).await?; - - if len < 1 { - // Don't return here, continue to next section - // But we need to consume the false marker from the server - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - let _: bool = mut_instance.read_msgpack().await?; - } - } else { - // Receive data - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - loop { - let in_coming: bool = mut_instance.read_msgpack().await?; - if in_coming { - let (sheet_name, data): (SheetName, SheetData) = - mut_instance.read_large_msgpack(1024u16).await?; - - let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else { - return Err(TcpTargetError::NotFound( - "Workspace not found".to_string(), - )); - }; - - SheetData::write_to(&data, path).await?; - } else { - break; - } - } - } - } - } - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let mut mut_instance = instance.lock().await; - - let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?; - - for (sheet_name, version) in local_versions.iter() { - let sheet = vault.sheet(sheet_name).await?; - if let Some(holder) = sheet.holder() - && (holder == &member_id || holder == VAULT_HOST_NAME) - && &sheet.write_count() != version - { - mut_instance.write_msgpack(true).await?; - mut_instance - .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) - .await?; - } - } - mut_instance.write_msgpack(false).await?; - } - } - - info!("Fetch held status to {}", member_id); - - // Sync Held Info - { - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - - let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( - workspace.local_path(), - &member_id, - )) - .await - else { - return Err(TcpTargetError::Io("Read latest info failed".to_string())); - }; - - // Collect files that need to know the holder - let mut holder_wants_know = Vec::new(); - for sheet_name in &latest_info.visible_sheets { - if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await { - holder_wants_know - .extend(sheet_data.mapping().values().map(|value| value.id.clone())); - } - } - - // Send request - let mut mut_instance = instance.lock().await; - mut_instance - .write_large_msgpack(&holder_wants_know, 1024u16) - .await?; - - // Receive information and write to local - let result: HashMap< - VirtualFileId, - ( - Option<MemberId>, - VirtualFileVersion, - Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, - ), - > = mut_instance.read_large_msgpack(1024u16).await?; - - // Read configuration file - let path = LatestFileData::data_path(&member_id)?; - let mut latest_file_data: LatestFileData = - LatestFileData::read_from(&path).await.unwrap_or_default(); - - // Write the received information - latest_file_data.update_info(result); - - // Write - LatestFileData::write_to(&latest_file_data, &path).await?; - } - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let mut mut_instance = instance.lock().await; - - // Read the request - let holder_wants_know: Vec<VirtualFileId> = - mut_instance.read_large_msgpack(1024u16).await?; - - // Organize the information - let mut result: HashMap< - VirtualFileId, - ( - Option<MemberId>, - VirtualFileVersion, - Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, - ), - > = HashMap::new(); - for id in holder_wants_know { - let Ok(meta) = vault.virtual_file_meta(&id).await else { - continue; - }; - let holder = if meta.hold_member().is_empty() { - None - } else { - Some(meta.hold_member().clone()) - }; - let latest_version = meta.version_latest(); - - let all_versions = meta.versions(); - let all_descriptions = meta.version_descriptions(); - let histories = all_versions - .iter() - .filter_map(|v| { - let Some(desc) = all_descriptions.get(v) else { - return None; - }; - Some((v.clone(), desc.clone())) - }) - .collect::<Vec<(VirtualFileVersion, VirtualFileVersionDescription)>>(); - - result.insert(id, (holder, latest_version, histories)); - } - - // Send information - mut_instance.write_large_msgpack(&result, 1024u16).await?; - } - } - - // Sync cached sheet to local sheet - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET); - let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET); - if !local_sheet_path.exists() || !cached_sheet_path.exists() { - // No need to sync - if ctx.is_proc_on_local() { - sign_vault_modified(false).await; - } - return Ok(UpdateToLatestInfoResult::Success); - } - - let cached_sheet_paths = - extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?; - - // Match cached sheets and local sheets, and sync content - for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths { - // Read cached sheet and local sheet - let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?; - let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await - else { - continue; - }; - - // Read cached id mapping - let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else { - continue; - }; - - for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() { - let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() }; - - // Get local path - let Some(local_path) = path_by_id else { - continue; - }; - - if &local_path == cached_item_path { - continue; - } - - // If path not match, try to move - let move_result = local_sheet.move_mapping(&local_path, cached_item_path); - if let Err(e) = move_result { - match e.kind() { - ErrorKind::AlreadyExists => { - return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail( - SyncCachedSheetFailReason::PathAlreadyExist( - cached_item_path.clone(), - ), - )); - } - _ => return Err(e.into()), - } - } - local_sheet.write().await?; - } - } - } - - if ctx.is_proc_on_local() { - sign_vault_modified(false).await; - } - Ok(UpdateToLatestInfoResult::Success) -} - -/// Extract sheet names from file paths -fn extract_sheet_names_from_paths( - paths: Vec<PathBuf>, -) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> { - let mut result = HashMap::new(); - for p in paths { - let sheet_name = p - .file_stem() - .and_then(|s| s.to_str()) - .map(|s| s.to_string()) - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name") - })?; - result.insert(sheet_name, p); - } - Ok(result) -} |
