summaryrefslogtreecommitdiff
path: root/actions/src/remote_actions/workspace_manage
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-01-12 05:12:10 +0800
committer魏曹先生 <1992414357@qq.com>2026-01-12 05:36:40 +0800
commite3f4b8188515eeec149eec0405b51e1b57f6e7bb (patch)
treedf80e7289621c39c9d32befbe9bf85a2e129eefb /actions/src/remote_actions/workspace_manage
parentdd8cbe7e863b3fec594696c15c5e009f172749f5 (diff)
Rename and reorganize remote actions modules
Diffstat (limited to 'actions/src/remote_actions/workspace_manage')
-rw-r--r--actions/src/remote_actions/workspace_manage/set_upstream_vault.rs101
-rw-r--r--actions/src/remote_actions/workspace_manage/update_to_latest_info.rs433
2 files changed, 534 insertions, 0 deletions
diff --git a/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs b/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs
new file mode 100644
index 0000000..d3201b2
--- /dev/null
+++ b/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs
@@ -0,0 +1,101 @@
+use std::net::SocketAddr;
+
+use action_system::{action::ActionContext, macros::action_gen};
+use cfg_file::config::ConfigFile;
+use log::info;
+use serde::{Deserialize, Serialize};
+use tcp_connection::error::TcpTargetError;
+use vcs_data::data::{local::config::LocalConfig, vault::config::VaultUuid};
+
+use crate::remote_actions::{
+ auth_member, check_connection_instance, try_get_local_workspace, try_get_vault,
+};
+
+#[derive(Serialize, Deserialize)]
+pub enum SetUpstreamVaultActionResult {
+ // Success
+ DirectedAndStained,
+ Redirected,
+
+ // Fail
+ AlreadyStained,
+ AuthorizeFailed(String),
+ RedirectFailed(String),
+ SameUpstream,
+
+ Done,
+}
+
+#[action_gen]
+pub async fn set_upstream_vault_action(
+ ctx: ActionContext,
+ upstream: SocketAddr,
+) -> Result<SetUpstreamVaultActionResult, TcpTargetError> {
+ let instance = check_connection_instance(&ctx)?;
+
+ // Auth Member
+ if let Err(e) = auth_member(&ctx, instance).await {
+ return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string()));
+ }
+
+ // Direct
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ instance
+ .lock()
+ .await
+ .write(*vault.config().vault_uuid())
+ .await?;
+ return Ok(SetUpstreamVaultActionResult::Done);
+ }
+
+ if ctx.is_proc_on_local() {
+ info!("Authorize successful. directing to upstream vault.");
+
+ // Read the vault UUID from the instance
+ let vault_uuid = instance.lock().await.read::<VaultUuid>().await?;
+
+ let local_workspace = try_get_local_workspace(&ctx)?;
+ let local_config = local_workspace.config();
+
+ let mut mut_local_config = local_config.lock().await;
+ if !mut_local_config.stained() {
+ // Stain the local workspace
+ mut_local_config.stain(vault_uuid);
+
+ // Set the upstream address
+ mut_local_config.set_vault_addr(upstream);
+
+ // Store the updated config
+ LocalConfig::write(&mut_local_config).await?;
+
+ info!("Workspace stained!");
+ return Ok(SetUpstreamVaultActionResult::DirectedAndStained);
+ } else {
+ // Local workspace is already stained, redirecting
+ let Some(stained_uuid) = mut_local_config.stained_uuid() else {
+ return Ok(SetUpstreamVaultActionResult::RedirectFailed(
+ "Stained uuid not found".to_string(),
+ ));
+ };
+ let local_upstream = mut_local_config.upstream_addr();
+
+ // Address changed, but same UUID.
+ if vault_uuid == stained_uuid {
+ if local_upstream != upstream {
+ // Set the upstream address
+ mut_local_config.set_vault_addr(upstream);
+
+ // Store the updated config
+ LocalConfig::write(&mut_local_config).await?;
+ return Ok(SetUpstreamVaultActionResult::Redirected);
+ } else {
+ return Ok(SetUpstreamVaultActionResult::SameUpstream);
+ }
+ }
+ return Ok(SetUpstreamVaultActionResult::AlreadyStained);
+ }
+ }
+
+ Err(TcpTargetError::NoResult("No result.".to_string()))
+}
diff --git a/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs b/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs
new file mode 100644
index 0000000..695d1f0
--- /dev/null
+++ b/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs
@@ -0,0 +1,433 @@
+use std::{
+ collections::{HashMap, HashSet},
+ io::ErrorKind,
+ path::PathBuf,
+ time::SystemTime,
+};
+
+use action_system::{action::ActionContext, macros::action_gen};
+use cfg_file::config::ConfigFile;
+use log::info;
+use serde::{Deserialize, Serialize};
+use tcp_connection::error::TcpTargetError;
+use vcs_data::{
+ constants::{
+ CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME,
+ SERVER_SUFFIX_SHEET_SHARE_FILE, VAULT_HOST_NAME,
+ },
+ data::{
+ local::{
+ cached_sheet::CachedSheet,
+ latest_file_data::LatestFileData,
+ latest_info::{LatestInfo, SheetInfo},
+ vault_modified::sign_vault_modified,
+ },
+ member::MemberId,
+ sheet::{SheetData, SheetName, SheetPathBuf},
+ vault::{
+ sheet_share::{Share, SheetShareId},
+ virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription},
+ },
+ },
+};
+
+use crate::remote_actions::{
+ auth_member, check_connection_instance, try_get_local_workspace, try_get_vault,
+};
+
+#[derive(Serialize, Deserialize)]
+pub enum UpdateToLatestInfoResult {
+ Success,
+
+ // Fail
+ AuthorizeFailed(String),
+ SyncCachedSheetFail(SyncCachedSheetFailReason),
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum SyncCachedSheetFailReason {
+ PathAlreadyExist(PathBuf),
+}
+
+#[action_gen]
+pub async fn update_to_latest_info_action(
+ ctx: ActionContext,
+ _unused: (),
+) -> Result<UpdateToLatestInfoResult, TcpTargetError> {
+ let instance = check_connection_instance(&ctx)?;
+
+ let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await {
+ Ok(id) => id,
+ Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())),
+ };
+
+ info!("Sending latest info to {}", member_id);
+
+ // Sync Latest Info
+ {
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+
+ // Build latest info
+ let mut latest_info = LatestInfo::default();
+
+ // Sheet & Share
+ let mut shares_in_my_sheets: HashMap<SheetName, HashMap<SheetShareId, Share>> =
+ HashMap::new();
+ let mut member_owned = Vec::new();
+ let mut member_visible = Vec::new();
+ let mut ref_sheets = HashSet::new();
+
+ for sheet in vault.sheets().await? {
+ // Build share parts
+ if let Some(holder) = sheet.holder() {
+ if holder == &member_id || holder == VAULT_HOST_NAME {
+ let mut sheet_shares: HashMap<SheetShareId, Share> = HashMap::new();
+ for share in sheet.get_shares().await? {
+ // Get SharePath
+ let Some(share_path) = share.path.clone() else {
+ continue;
+ };
+ // Get ShareId from SharePath
+ let Some(share_id) = share_path.file_name() else {
+ continue;
+ };
+ let share_id = share_id.display().to_string();
+ let share_id_trimed =
+ share_id.trim_end_matches(SERVER_SUFFIX_SHEET_SHARE_FILE);
+ sheet_shares.insert(share_id_trimed.to_string(), share);
+ }
+ shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares);
+ }
+ }
+
+ // Build sheet parts
+ let holder_is_host =
+ sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME;
+ if sheet.holder().is_some()
+ && (sheet.holder().unwrap() == &member_id || holder_is_host)
+ {
+ member_owned.push(sheet.name().clone());
+ if holder_is_host {
+ ref_sheets.insert(sheet.name().clone());
+ }
+ } else {
+ member_visible.push(SheetInfo {
+ sheet_name: sheet.name().clone(),
+ holder_name: sheet.holder().cloned(),
+ });
+ }
+ }
+
+ // Record Share & Sheet
+ latest_info.visible_sheets = member_owned;
+ latest_info.invisible_sheets = member_visible;
+ latest_info.shares_in_my_sheets = shares_in_my_sheets;
+
+ // RefSheet
+ let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data();
+ latest_info.ref_sheet_content = ref_sheet_data.clone();
+ latest_info.ref_sheet_vfs_mapping = ref_sheet_data
+ .mapping()
+ .into_iter()
+ .map(|(path, file)| (file.id.clone(), path.clone()))
+ .collect::<HashMap<VirtualFileId, SheetPathBuf>>();
+ latest_info.reference_sheets = ref_sheets;
+
+ // Members
+ let members = vault.members().await?;
+ latest_info.vault_members = members;
+
+ // Send
+ instance
+ .lock()
+ .await
+ .write_large_msgpack(latest_info, 512_u16)
+ .await?;
+ }
+
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let mut latest_info = instance
+ .lock()
+ .await
+ .read_large_msgpack::<LatestInfo>(512_u16)
+ .await?;
+ latest_info.update_instant = Some(SystemTime::now());
+ LatestInfo::write_to(
+ &latest_info,
+ LatestInfo::latest_info_path(workspace.local_path(), &member_id),
+ )
+ .await?;
+ }
+ }
+
+ info!("Update sheets to {}", member_id);
+
+ // Sync Remote Sheets
+ {
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path(
+ workspace.local_path(),
+ &member_id,
+ ))
+ .await
+ else {
+ return Err(TcpTargetError::Io("Read latest info failed".to_string()));
+ };
+
+ // Collect all local versions
+ let mut local_versions = vec![];
+ for request_sheet in latest_info.visible_sheets {
+ let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else {
+ // For newly created sheets, the version is 0.
+ // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information
+ local_versions.push((request_sheet, -1));
+ continue;
+ };
+ local_versions.push((request_sheet, data.write_count()));
+ }
+
+ // Send the version list
+ let len = local_versions.len();
+ instance.lock().await.write_msgpack(local_versions).await?;
+
+ if len < 1 {
+ // Don't return here, continue to next section
+ // But we need to consume the false marker from the server
+ if ctx.is_proc_on_local() {
+ let mut mut_instance = instance.lock().await;
+ let _: bool = mut_instance.read_msgpack().await?;
+ }
+ } else {
+ // Receive data
+ if ctx.is_proc_on_local() {
+ let mut mut_instance = instance.lock().await;
+ loop {
+ let in_coming: bool = mut_instance.read_msgpack().await?;
+ if in_coming {
+ let (sheet_name, data): (SheetName, SheetData) =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else {
+ return Err(TcpTargetError::NotFound(
+ "Workspace not found".to_string(),
+ ));
+ };
+
+ SheetData::write_to(&data, path).await?;
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ }
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?;
+
+ for (sheet_name, version) in local_versions.iter() {
+ let sheet = vault.sheet(sheet_name).await?;
+ if let Some(holder) = sheet.holder()
+ && (holder == &member_id || holder == VAULT_HOST_NAME)
+ && &sheet.write_count() != version
+ {
+ mut_instance.write_msgpack(true).await?;
+ mut_instance
+ .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16)
+ .await?;
+ }
+ }
+ mut_instance.write_msgpack(false).await?;
+ }
+ }
+
+ info!("Fetch held status to {}", member_id);
+
+ // Sync Held Info
+ {
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+
+ let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path(
+ workspace.local_path(),
+ &member_id,
+ ))
+ .await
+ else {
+ return Err(TcpTargetError::Io("Read latest info failed".to_string()));
+ };
+
+ // Collect files that need to know the holder
+ let mut holder_wants_know = Vec::new();
+ for sheet_name in &latest_info.visible_sheets {
+ if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await {
+ holder_wants_know
+ .extend(sheet_data.mapping().values().map(|value| value.id.clone()));
+ }
+ }
+
+ // Send request
+ let mut mut_instance = instance.lock().await;
+ mut_instance
+ .write_large_msgpack(&holder_wants_know, 1024u16)
+ .await?;
+
+ // Receive information and write to local
+ let result: HashMap<
+ VirtualFileId,
+ (
+ Option<MemberId>,
+ VirtualFileVersion,
+ Vec<(VirtualFileVersion, VirtualFileVersionDescription)>,
+ ),
+ > = mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Read configuration file
+ let path = LatestFileData::data_path(&member_id)?;
+ let mut latest_file_data: LatestFileData =
+ LatestFileData::read_from(&path).await.unwrap_or_default();
+
+ // Write the received information
+ latest_file_data.update_info(result);
+
+ // Write
+ LatestFileData::write_to(&latest_file_data, &path).await?;
+ }
+
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ // Read the request
+ let holder_wants_know: Vec<VirtualFileId> =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Organize the information
+ let mut result: HashMap<
+ VirtualFileId,
+ (
+ Option<MemberId>,
+ VirtualFileVersion,
+ Vec<(VirtualFileVersion, VirtualFileVersionDescription)>,
+ ),
+ > = HashMap::new();
+ for id in holder_wants_know {
+ let Ok(meta) = vault.virtual_file_meta(&id).await else {
+ continue;
+ };
+ let holder = if meta.hold_member().is_empty() {
+ None
+ } else {
+ Some(meta.hold_member().clone())
+ };
+ let latest_version = meta.version_latest();
+
+ let all_versions = meta.versions();
+ let all_descriptions = meta.version_descriptions();
+ let histories = all_versions
+ .iter()
+ .filter_map(|v| {
+ let Some(desc) = all_descriptions.get(v) else {
+ return None;
+ };
+ Some((v.clone(), desc.clone()))
+ })
+ .collect::<Vec<(VirtualFileVersion, VirtualFileVersionDescription)>>();
+
+ result.insert(id, (holder, latest_version, histories));
+ }
+
+ // Send information
+ mut_instance.write_large_msgpack(&result, 1024u16).await?;
+ }
+ }
+
+ // Sync cached sheet to local sheet
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET);
+ let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET);
+ if !local_sheet_path.exists() || !cached_sheet_path.exists() {
+ // No need to sync
+ if ctx.is_proc_on_local() {
+ sign_vault_modified(false).await;
+ }
+ return Ok(UpdateToLatestInfoResult::Success);
+ }
+
+ let cached_sheet_paths =
+ extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?;
+
+ // Match cached sheets and local sheets, and sync content
+ for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths {
+ // Read cached sheet and local sheet
+ let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?;
+ let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await
+ else {
+ continue;
+ };
+
+ // Read cached id mapping
+ let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else {
+ continue;
+ };
+
+ for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() {
+ let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() };
+
+ // Get local path
+ let Some(local_path) = path_by_id else {
+ continue;
+ };
+
+ if &local_path == cached_item_path {
+ continue;
+ }
+
+ // If path not match, try to move
+ let move_result = local_sheet.move_mapping(&local_path, cached_item_path);
+ if let Err(e) = move_result {
+ match e.kind() {
+ ErrorKind::AlreadyExists => {
+ return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail(
+ SyncCachedSheetFailReason::PathAlreadyExist(
+ cached_item_path.clone(),
+ ),
+ ));
+ }
+ _ => return Err(e.into()),
+ }
+ }
+ local_sheet.write().await?;
+ }
+ }
+ }
+
+ if ctx.is_proc_on_local() {
+ sign_vault_modified(false).await;
+ }
+ Ok(UpdateToLatestInfoResult::Success)
+}
+
+/// Extract sheet names from file paths
+fn extract_sheet_names_from_paths(
+ paths: Vec<PathBuf>,
+) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> {
+ let mut result = HashMap::new();
+ for p in paths {
+ let sheet_name = p
+ .file_stem()
+ .and_then(|s| s.to_str())
+ .map(|s| s.to_string())
+ .ok_or_else(|| {
+ std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name")
+ })?;
+ result.insert(sheet_name, p);
+ }
+ Ok(result)
+}