use std::{ collections::{HashMap, HashSet}, io::ErrorKind, net::SocketAddr, path::PathBuf, time::SystemTime, }; use action_system::{action::ActionContext, macros::action_gen}; use cfg_file::config::ConfigFile; use log::info; use serde::{Deserialize, Serialize}; use tcp_connection::error::TcpTargetError; use vcs_data::{ constants::{ CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME, VAULT_HOST_NAME, }, data::{ local::{ cached_sheet::CachedSheet, config::LocalConfig, latest_file_data::LatestFileData, latest_info::{LatestInfo, SheetInfo}, vault_modified::sign_vault_modified, }, member::MemberId, sheet::{SheetData, SheetName, SheetPathBuf}, vault::{ config::VaultUuid, sheet_share::{Share, SheetShareId}, virtual_file::{VirtualFileId, VirtualFileVersion}, }, }, }; use crate::actions::{ auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, }; #[derive(Serialize, Deserialize)] pub enum SetUpstreamVaultActionResult { // Success DirectedAndStained, Redirected, // Fail AlreadyStained, AuthorizeFailed(String), RedirectFailed(String), SameUpstream, Done, } #[action_gen] pub async fn set_upstream_vault_action( ctx: ActionContext, upstream: SocketAddr, ) -> Result { let instance = check_connection_instance(&ctx)?; // Auth Member if let Err(e) = auth_member(&ctx, instance).await { return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string())); } // Direct if ctx.is_proc_on_remote() { let vault = try_get_vault(&ctx)?; instance .lock() .await .write(*vault.config().vault_uuid()) .await?; return Ok(SetUpstreamVaultActionResult::Done); } if ctx.is_proc_on_local() { info!("Authorize successful. directing to upstream vault."); // Read the vault UUID from the instance let vault_uuid = instance.lock().await.read::().await?; let local_workspace = try_get_local_workspace(&ctx)?; let local_config = local_workspace.config(); let mut mut_local_config = local_config.lock().await; if !mut_local_config.stained() { // Stain the local workspace mut_local_config.stain(vault_uuid); // Set the upstream address mut_local_config.set_vault_addr(upstream); // Store the updated config LocalConfig::write(&mut_local_config).await?; info!("Workspace stained!"); return Ok(SetUpstreamVaultActionResult::DirectedAndStained); } else { // Local workspace is already stained, redirecting let Some(stained_uuid) = mut_local_config.stained_uuid() else { return Ok(SetUpstreamVaultActionResult::RedirectFailed( "Stained uuid not found".to_string(), )); }; let local_upstream = mut_local_config.upstream_addr(); // Address changed, but same UUID. if vault_uuid == stained_uuid { if local_upstream != upstream { // Set the upstream address mut_local_config.set_vault_addr(upstream); // Store the updated config LocalConfig::write(&mut_local_config).await?; return Ok(SetUpstreamVaultActionResult::Redirected); } else { return Ok(SetUpstreamVaultActionResult::SameUpstream); } } return Ok(SetUpstreamVaultActionResult::AlreadyStained); } } Err(TcpTargetError::NoResult("No result.".to_string())) } #[derive(Serialize, Deserialize)] pub enum UpdateToLatestInfoResult { Success, // Fail AuthorizeFailed(String), SyncCachedSheetFail(SyncCachedSheetFailReason), } #[derive(Serialize, Deserialize)] pub enum SyncCachedSheetFailReason { PathAlreadyExist(PathBuf), } #[action_gen] pub async fn update_to_latest_info_action( ctx: ActionContext, _unused: (), ) -> Result { let instance = check_connection_instance(&ctx)?; let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { Ok(id) => id, Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), }; info!("Sending latest info to {}", member_id); // Sync Latest Info { if ctx.is_proc_on_remote() { let vault = try_get_vault(&ctx)?; // Build latest info let mut latest_info = LatestInfo::default(); // Sheet & Share let mut shares_in_my_sheets: HashMap> = HashMap::new(); let mut member_owned = Vec::new(); let mut member_visible = Vec::new(); let mut ref_sheets = HashSet::new(); for sheet in vault.sheets().await? { // Build share parts if let Some(holder) = sheet.holder() { if holder == &member_id { let mut sheet_shares: HashMap = HashMap::new(); for share in sheet.get_shares().await? { // Get SharePath let Some(share_path) = share.path.clone() else { continue; }; // Get ShareId from SharePath let Some(share_id) = share_path.file_name() else { continue; }; sheet_shares.insert(share_id.display().to_string(), share); } shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares); } } // Build sheet parts let holder_is_host = sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME; if sheet.holder().is_some() && (sheet.holder().unwrap() == &member_id || holder_is_host) { member_owned.push(sheet.name().clone()); if holder_is_host { ref_sheets.insert(sheet.name().clone()); } } else { member_visible.push(SheetInfo { sheet_name: sheet.name().clone(), holder_name: sheet.holder().cloned(), }); } } // Record Share & Sheet latest_info.visible_sheets = member_owned; latest_info.invisible_sheets = member_visible; latest_info.shares_in_my_sheets = shares_in_my_sheets; // RefSheet let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data(); latest_info.ref_sheet_content = ref_sheet_data.clone(); latest_info.ref_sheet_vfs_mapping = ref_sheet_data .mapping() .into_iter() .map(|(path, file)| (file.id.clone(), path.clone())) .collect::>(); latest_info.reference_sheets = ref_sheets; // Members let members = vault.members().await?; latest_info.vault_members = members; // Send instance .lock() .await .write_large_msgpack(latest_info, 512_u16) .await?; } if ctx.is_proc_on_local() { let workspace = try_get_local_workspace(&ctx)?; let mut latest_info = instance .lock() .await .read_large_msgpack::(512_u16) .await?; latest_info.update_instant = Some(SystemTime::now()); LatestInfo::write_to( &latest_info, LatestInfo::latest_info_path(workspace.local_path(), &member_id), ) .await?; } } info!("Update sheets to {}", member_id); // Sync Remote Sheets { if ctx.is_proc_on_local() { let workspace = try_get_local_workspace(&ctx)?; let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( workspace.local_path(), &member_id, )) .await else { return Err(TcpTargetError::Io("Read latest info failed".to_string())); }; // Collect all local versions let mut local_versions = vec![]; for request_sheet in latest_info.visible_sheets { let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else { // For newly created sheets, the version is 0. // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information local_versions.push((request_sheet, -1)); continue; }; local_versions.push((request_sheet, data.write_count())); } // Send the version list let len = local_versions.len(); instance.lock().await.write_msgpack(local_versions).await?; if len < 1 { // Don't return here, continue to next section // But we need to consume the false marker from the server if ctx.is_proc_on_local() { let mut mut_instance = instance.lock().await; let _: bool = mut_instance.read_msgpack().await?; } } else { // Receive data if ctx.is_proc_on_local() { let mut mut_instance = instance.lock().await; loop { let in_coming: bool = mut_instance.read_msgpack().await?; if in_coming { let (sheet_name, data): (SheetName, SheetData) = mut_instance.read_large_msgpack(1024u16).await?; let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else { return Err(TcpTargetError::NotFound( "Workspace not found".to_string(), )); }; SheetData::write_to(&data, path).await?; } else { break; } } } } } if ctx.is_proc_on_remote() { let vault = try_get_vault(&ctx)?; let mut mut_instance = instance.lock().await; let local_versions = mut_instance.read_msgpack::>().await?; for (sheet_name, version) in local_versions.iter() { let sheet = vault.sheet(sheet_name).await?; if let Some(holder) = sheet.holder() && (holder == &member_id || holder == VAULT_HOST_NAME) && &sheet.write_count() != version { mut_instance.write_msgpack(true).await?; mut_instance .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) .await?; } } mut_instance.write_msgpack(false).await?; } } info!("Fetch held status to {}", member_id); // Sync Held Info { if ctx.is_proc_on_local() { let workspace = try_get_local_workspace(&ctx)?; let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( workspace.local_path(), &member_id, )) .await else { return Err(TcpTargetError::Io("Read latest info failed".to_string())); }; // Collect files that need to know the holder let mut holder_wants_know = Vec::new(); for sheet_name in &latest_info.visible_sheets { if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await { holder_wants_know .extend(sheet_data.mapping().values().map(|value| value.id.clone())); } } // Send request let mut mut_instance = instance.lock().await; mut_instance .write_large_msgpack(&holder_wants_know, 1024u16) .await?; // Receive information and write to local let result: HashMap, VirtualFileVersion)> = mut_instance.read_large_msgpack(1024u16).await?; // Read configuration file let path = LatestFileData::data_path(&member_id)?; let mut latest_file_data: LatestFileData = LatestFileData::read_from(&path).await.unwrap_or_default(); // Write the received information latest_file_data.update_info(result); // Write LatestFileData::write_to(&latest_file_data, &path).await?; } if ctx.is_proc_on_remote() { let vault = try_get_vault(&ctx)?; let mut mut_instance = instance.lock().await; // Read the request let holder_wants_know: Vec = mut_instance.read_large_msgpack(1024u16).await?; // Organize the information let mut result: HashMap, VirtualFileVersion)> = HashMap::new(); for id in holder_wants_know { let Ok(meta) = vault.virtual_file_meta(&id).await else { continue; }; let holder = if meta.hold_member().is_empty() { None } else { Some(meta.hold_member().clone()) }; let version = meta.version_latest(); result.insert(id, (holder, version)); } // Send information mut_instance.write_large_msgpack(&result, 1024u16).await?; } } // Sync cached sheet to local sheet if ctx.is_proc_on_local() { let workspace = try_get_local_workspace(&ctx)?; let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET); let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET); if !local_sheet_path.exists() || !cached_sheet_path.exists() { // No need to sync if ctx.is_proc_on_local() { sign_vault_modified(false).await; } return Ok(UpdateToLatestInfoResult::Success); } let cached_sheet_paths = extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?; // Match cached sheets and local sheets, and sync content for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths { // Read cached sheet and local sheet let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?; let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await else { continue; }; // Read cached id mapping let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else { continue; }; for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() { let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() }; // Get local path let Some(local_path) = path_by_id else { continue; }; if &local_path == cached_item_path { continue; } // If path not match, try to move let move_result = local_sheet.move_mapping(&local_path, cached_item_path); if let Err(e) = move_result { match e.kind() { ErrorKind::AlreadyExists => { return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail( SyncCachedSheetFailReason::PathAlreadyExist( cached_item_path.clone(), ), )); } _ => return Err(e.into()), } } local_sheet.write().await?; } } } if ctx.is_proc_on_local() { sign_vault_modified(false).await; } Ok(UpdateToLatestInfoResult::Success) } /// Extract sheet names from file paths fn extract_sheet_names_from_paths( paths: Vec, ) -> Result, std::io::Error> { let mut result = HashMap::new(); for p in paths { let sheet_name = p .file_stem() .and_then(|s| s.to_str()) .map(|s| s.to_string()) .ok_or_else(|| { std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name") })?; result.insert(sheet_name, p); } Ok(result) }