diff options
Diffstat (limited to 'actions/src/remote_actions/workspace_manage')
| -rw-r--r-- | actions/src/remote_actions/workspace_manage/set_upstream_vault.rs | 101 | ||||
| -rw-r--r-- | actions/src/remote_actions/workspace_manage/update_to_latest_info.rs | 433 |
2 files changed, 534 insertions, 0 deletions
diff --git a/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs b/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs new file mode 100644 index 0000000..d3201b2 --- /dev/null +++ b/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs @@ -0,0 +1,101 @@ +use std::net::SocketAddr; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use log::info; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::{local::config::LocalConfig, vault::config::VaultUuid}; + +use crate::remote_actions::{ + auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, +}; + +#[derive(Serialize, Deserialize)] +pub enum SetUpstreamVaultActionResult { + // Success + DirectedAndStained, + Redirected, + + // Fail + AlreadyStained, + AuthorizeFailed(String), + RedirectFailed(String), + SameUpstream, + + Done, +} + +#[action_gen] +pub async fn set_upstream_vault_action( + ctx: ActionContext, + upstream: SocketAddr, +) -> Result<SetUpstreamVaultActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + if let Err(e) = auth_member(&ctx, instance).await { + return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string())); + } + + // Direct + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + instance + .lock() + .await + .write(*vault.config().vault_uuid()) + .await?; + return Ok(SetUpstreamVaultActionResult::Done); + } + + if ctx.is_proc_on_local() { + info!("Authorize successful. directing to upstream vault."); + + // Read the vault UUID from the instance + let vault_uuid = instance.lock().await.read::<VaultUuid>().await?; + + let local_workspace = try_get_local_workspace(&ctx)?; + let local_config = local_workspace.config(); + + let mut mut_local_config = local_config.lock().await; + if !mut_local_config.stained() { + // Stain the local workspace + mut_local_config.stain(vault_uuid); + + // Set the upstream address + mut_local_config.set_vault_addr(upstream); + + // Store the updated config + LocalConfig::write(&mut_local_config).await?; + + info!("Workspace stained!"); + return Ok(SetUpstreamVaultActionResult::DirectedAndStained); + } else { + // Local workspace is already stained, redirecting + let Some(stained_uuid) = mut_local_config.stained_uuid() else { + return Ok(SetUpstreamVaultActionResult::RedirectFailed( + "Stained uuid not found".to_string(), + )); + }; + let local_upstream = mut_local_config.upstream_addr(); + + // Address changed, but same UUID. + if vault_uuid == stained_uuid { + if local_upstream != upstream { + // Set the upstream address + mut_local_config.set_vault_addr(upstream); + + // Store the updated config + LocalConfig::write(&mut_local_config).await?; + return Ok(SetUpstreamVaultActionResult::Redirected); + } else { + return Ok(SetUpstreamVaultActionResult::SameUpstream); + } + } + return Ok(SetUpstreamVaultActionResult::AlreadyStained); + } + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} diff --git a/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs b/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs new file mode 100644 index 0000000..695d1f0 --- /dev/null +++ b/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs @@ -0,0 +1,433 @@ +use std::{ + collections::{HashMap, HashSet}, + io::ErrorKind, + path::PathBuf, + time::SystemTime, +}; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use log::info; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::{ + constants::{ + CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME, + SERVER_SUFFIX_SHEET_SHARE_FILE, VAULT_HOST_NAME, + }, + data::{ + local::{ + cached_sheet::CachedSheet, + latest_file_data::LatestFileData, + latest_info::{LatestInfo, SheetInfo}, + vault_modified::sign_vault_modified, + }, + member::MemberId, + sheet::{SheetData, SheetName, SheetPathBuf}, + vault::{ + sheet_share::{Share, SheetShareId}, + virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, + }, + }, +}; + +use crate::remote_actions::{ + auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, +}; + +#[derive(Serialize, Deserialize)] +pub enum UpdateToLatestInfoResult { + Success, + + // Fail + AuthorizeFailed(String), + SyncCachedSheetFail(SyncCachedSheetFailReason), +} + +#[derive(Serialize, Deserialize)] +pub enum SyncCachedSheetFailReason { + PathAlreadyExist(PathBuf), +} + +#[action_gen] +pub async fn update_to_latest_info_action( + ctx: ActionContext, + _unused: (), +) -> Result<UpdateToLatestInfoResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), + }; + + info!("Sending latest info to {}", member_id); + + // Sync Latest Info + { + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + + // Build latest info + let mut latest_info = LatestInfo::default(); + + // Sheet & Share + let mut shares_in_my_sheets: HashMap<SheetName, HashMap<SheetShareId, Share>> = + HashMap::new(); + let mut member_owned = Vec::new(); + let mut member_visible = Vec::new(); + let mut ref_sheets = HashSet::new(); + + for sheet in vault.sheets().await? { + // Build share parts + if let Some(holder) = sheet.holder() { + if holder == &member_id || holder == VAULT_HOST_NAME { + let mut sheet_shares: HashMap<SheetShareId, Share> = HashMap::new(); + for share in sheet.get_shares().await? { + // Get SharePath + let Some(share_path) = share.path.clone() else { + continue; + }; + // Get ShareId from SharePath + let Some(share_id) = share_path.file_name() else { + continue; + }; + let share_id = share_id.display().to_string(); + let share_id_trimed = + share_id.trim_end_matches(SERVER_SUFFIX_SHEET_SHARE_FILE); + sheet_shares.insert(share_id_trimed.to_string(), share); + } + shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares); + } + } + + // Build sheet parts + let holder_is_host = + sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME; + if sheet.holder().is_some() + && (sheet.holder().unwrap() == &member_id || holder_is_host) + { + member_owned.push(sheet.name().clone()); + if holder_is_host { + ref_sheets.insert(sheet.name().clone()); + } + } else { + member_visible.push(SheetInfo { + sheet_name: sheet.name().clone(), + holder_name: sheet.holder().cloned(), + }); + } + } + + // Record Share & Sheet + latest_info.visible_sheets = member_owned; + latest_info.invisible_sheets = member_visible; + latest_info.shares_in_my_sheets = shares_in_my_sheets; + + // RefSheet + let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data(); + latest_info.ref_sheet_content = ref_sheet_data.clone(); + latest_info.ref_sheet_vfs_mapping = ref_sheet_data + .mapping() + .into_iter() + .map(|(path, file)| (file.id.clone(), path.clone())) + .collect::<HashMap<VirtualFileId, SheetPathBuf>>(); + latest_info.reference_sheets = ref_sheets; + + // Members + let members = vault.members().await?; + latest_info.vault_members = members; + + // Send + instance + .lock() + .await + .write_large_msgpack(latest_info, 512_u16) + .await?; + } + + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let mut latest_info = instance + .lock() + .await + .read_large_msgpack::<LatestInfo>(512_u16) + .await?; + latest_info.update_instant = Some(SystemTime::now()); + LatestInfo::write_to( + &latest_info, + LatestInfo::latest_info_path(workspace.local_path(), &member_id), + ) + .await?; + } + } + + info!("Update sheets to {}", member_id); + + // Sync Remote Sheets + { + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( + workspace.local_path(), + &member_id, + )) + .await + else { + return Err(TcpTargetError::Io("Read latest info failed".to_string())); + }; + + // Collect all local versions + let mut local_versions = vec![]; + for request_sheet in latest_info.visible_sheets { + let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else { + // For newly created sheets, the version is 0. + // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information + local_versions.push((request_sheet, -1)); + continue; + }; + local_versions.push((request_sheet, data.write_count())); + } + + // Send the version list + let len = local_versions.len(); + instance.lock().await.write_msgpack(local_versions).await?; + + if len < 1 { + // Don't return here, continue to next section + // But we need to consume the false marker from the server + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + let _: bool = mut_instance.read_msgpack().await?; + } + } else { + // Receive data + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + loop { + let in_coming: bool = mut_instance.read_msgpack().await?; + if in_coming { + let (sheet_name, data): (SheetName, SheetData) = + mut_instance.read_large_msgpack(1024u16).await?; + + let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else { + return Err(TcpTargetError::NotFound( + "Workspace not found".to_string(), + )); + }; + + SheetData::write_to(&data, path).await?; + } else { + break; + } + } + } + } + } + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut mut_instance = instance.lock().await; + + let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?; + + for (sheet_name, version) in local_versions.iter() { + let sheet = vault.sheet(sheet_name).await?; + if let Some(holder) = sheet.holder() + && (holder == &member_id || holder == VAULT_HOST_NAME) + && &sheet.write_count() != version + { + mut_instance.write_msgpack(true).await?; + mut_instance + .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) + .await?; + } + } + mut_instance.write_msgpack(false).await?; + } + } + + info!("Fetch held status to {}", member_id); + + // Sync Held Info + { + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + + let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( + workspace.local_path(), + &member_id, + )) + .await + else { + return Err(TcpTargetError::Io("Read latest info failed".to_string())); + }; + + // Collect files that need to know the holder + let mut holder_wants_know = Vec::new(); + for sheet_name in &latest_info.visible_sheets { + if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await { + holder_wants_know + .extend(sheet_data.mapping().values().map(|value| value.id.clone())); + } + } + + // Send request + let mut mut_instance = instance.lock().await; + mut_instance + .write_large_msgpack(&holder_wants_know, 1024u16) + .await?; + + // Receive information and write to local + let result: HashMap< + VirtualFileId, + ( + Option<MemberId>, + VirtualFileVersion, + Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, + ), + > = mut_instance.read_large_msgpack(1024u16).await?; + + // Read configuration file + let path = LatestFileData::data_path(&member_id)?; + let mut latest_file_data: LatestFileData = + LatestFileData::read_from(&path).await.unwrap_or_default(); + + // Write the received information + latest_file_data.update_info(result); + + // Write + LatestFileData::write_to(&latest_file_data, &path).await?; + } + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut mut_instance = instance.lock().await; + + // Read the request + let holder_wants_know: Vec<VirtualFileId> = + mut_instance.read_large_msgpack(1024u16).await?; + + // Organize the information + let mut result: HashMap< + VirtualFileId, + ( + Option<MemberId>, + VirtualFileVersion, + Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, + ), + > = HashMap::new(); + for id in holder_wants_know { + let Ok(meta) = vault.virtual_file_meta(&id).await else { + continue; + }; + let holder = if meta.hold_member().is_empty() { + None + } else { + Some(meta.hold_member().clone()) + }; + let latest_version = meta.version_latest(); + + let all_versions = meta.versions(); + let all_descriptions = meta.version_descriptions(); + let histories = all_versions + .iter() + .filter_map(|v| { + let Some(desc) = all_descriptions.get(v) else { + return None; + }; + Some((v.clone(), desc.clone())) + }) + .collect::<Vec<(VirtualFileVersion, VirtualFileVersionDescription)>>(); + + result.insert(id, (holder, latest_version, histories)); + } + + // Send information + mut_instance.write_large_msgpack(&result, 1024u16).await?; + } + } + + // Sync cached sheet to local sheet + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET); + let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET); + if !local_sheet_path.exists() || !cached_sheet_path.exists() { + // No need to sync + if ctx.is_proc_on_local() { + sign_vault_modified(false).await; + } + return Ok(UpdateToLatestInfoResult::Success); + } + + let cached_sheet_paths = + extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?; + + // Match cached sheets and local sheets, and sync content + for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths { + // Read cached sheet and local sheet + let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?; + let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await + else { + continue; + }; + + // Read cached id mapping + let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else { + continue; + }; + + for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() { + let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() }; + + // Get local path + let Some(local_path) = path_by_id else { + continue; + }; + + if &local_path == cached_item_path { + continue; + } + + // If path not match, try to move + let move_result = local_sheet.move_mapping(&local_path, cached_item_path); + if let Err(e) = move_result { + match e.kind() { + ErrorKind::AlreadyExists => { + return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail( + SyncCachedSheetFailReason::PathAlreadyExist( + cached_item_path.clone(), + ), + )); + } + _ => return Err(e.into()), + } + } + local_sheet.write().await?; + } + } + } + + if ctx.is_proc_on_local() { + sign_vault_modified(false).await; + } + Ok(UpdateToLatestInfoResult::Success) +} + +/// Extract sheet names from file paths +fn extract_sheet_names_from_paths( + paths: Vec<PathBuf>, +) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> { + let mut result = HashMap::new(); + for p in paths { + let sheet_name = p + .file_stem() + .and_then(|s| s.to_str()) + .map(|s| s.to_string()) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name") + })?; + result.insert(sheet_name, p); + } + Ok(result) +} |
