diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-02-05 22:35:05 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-02-05 22:35:05 +0800 |
| commit | 27f6414ad1ff451feb0044af62f37dc2a6255ffa (patch) | |
| tree | cb5693bc014cc8579dcf02a730fd4d2a5dfcf1a5 /legacy_actions | |
| parent | ade2fcb9302a4ab759795820dbde3b2b269490ee (diff) | |
Remove examples and legacy code, update .gitignore
- Delete examples directory and its example action system
- Rename actions/ to legacy_actions/ and data/ to legacy_data/
- Update Cargo.toml license file reference
- Move setup scripts to scripts/dev/ directory
- Add todo.txt patterns to .gitignore
Diffstat (limited to 'legacy_actions')
32 files changed, 3061 insertions, 0 deletions
diff --git a/legacy_actions/Cargo.toml b/legacy_actions/Cargo.toml new file mode 100644 index 0000000..a49e1b0 --- /dev/null +++ b/legacy_actions/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "vcs_actions" +edition = "2024" +version.workspace = true + +[dependencies] + +# Utils +tcp_connection = { path = "../utils/tcp_connection" } +cfg_file = { path = "../utils/cfg_file", features = ["default"] } +sha1_hash = { path = "../utils/sha1_hash" } +string_proc = { path = "../utils/string_proc" } + +# Core dependencies +action_system = { path = "../systems/action" } +vcs_data = { path = "../legacy_data" } + +# Error handling +thiserror = "2.0.17" + +# Serialization +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" + +# Async & Networking +tokio = { version = "1.48.0", features = ["full"] } + +# Logging +log = "0.4.28" diff --git a/legacy_actions/src/connection.rs b/legacy_actions/src/connection.rs new file mode 100644 index 0000000..918f93c --- /dev/null +++ b/legacy_actions/src/connection.rs @@ -0,0 +1,3 @@ +pub mod action_service; +pub mod error; +pub mod protocol; diff --git a/legacy_actions/src/connection/action_service.rs b/legacy_actions/src/connection/action_service.rs new file mode 100644 index 0000000..3e05a70 --- /dev/null +++ b/legacy_actions/src/connection/action_service.rs @@ -0,0 +1,221 @@ +use std::{ + env::set_current_dir, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; + +use action_system::{action::ActionContext, action_pool::ActionPool}; +use cfg_file::config::ConfigFile; +use log::{debug, error, info, warn}; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::{ + net::{TcpListener, TcpStream}, + select, signal, spawn, + sync::mpsc, +}; +use vcs_data::data::vault::{Vault, vault_config::VaultConfig}; + +use crate::{ + connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool, +}; + +// Start the server with a Vault using the specified directory +pub async fn server_entry( + vault_path: impl Into<PathBuf>, + port_override: u16, +) -> Result<(), TcpTargetError> { + let vault_path = vault_path.into(); + + // Set to vault path + set_current_dir(&vault_path).map_err(|e| TcpTargetError::Io(e.to_string()))?; + + // Read the vault cfg + let vault_cfg = VaultConfig::read().await?; + + // Create TCPListener + let listener = create_tcp_listener(&vault_cfg, port_override).await?; + + // Initialize the vault + let vault: Arc<Vault> = init_vault(vault_cfg, vault_path).await?; + + // Lock the vault + vault + .lock() + .map_err(|e| TcpTargetError::Locked(e.to_string()))?; + + // Create ActionPool + let action_pool: Arc<ActionPool> = Arc::new(server_action_pool()); + + // Start the server + let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener); + future.await?; // Start and block until shutdown + + // Unlock the vault + vault.unlock()?; + + Ok(()) +} + +async fn create_tcp_listener( + cfg: &VaultConfig, + port_override: u16, +) -> Result<TcpListener, TcpTargetError> { + let local_bind_addr = cfg.server_config().local_bind(); + let port = if port_override > 0 { + port_override // Override -> PORT > 0 + } else { + cfg.server_config().port() // Default -> Port = 0 + }; + let bind_port = port; + let sock_addr = SocketAddr::new(*local_bind_addr, bind_port); + let listener = TcpListener::bind(sock_addr).await?; + + Ok(listener) +} + +async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> { + // Init and create the vault + let Some(vault) = Vault::init(cfg, path) else { + return Err(TcpTargetError::NotFound("Vault not found".to_string())); + }; + let vault: Arc<Vault> = Arc::new(vault); + + Ok(vault) +} + +fn build_server_future( + vault: Arc<Vault>, + action_pool: Arc<ActionPool>, + listener: TcpListener, +) -> ( + mpsc::Sender<()>, + impl std::future::Future<Output = Result<(), TcpTargetError>>, +) { + let (tx, mut rx) = mpsc::channel::<i32>(100); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + let mut active_connections = 0; + let mut shutdown_requested = false; + + // Spawn task to handle Ctrl+C with rapid exit detection + let shutdown_tx_clone = shutdown_tx.clone(); + spawn(async move { + let mut ctrl_c_count = 0; + let mut last_ctrl_c_time = Instant::now(); + + while let Ok(()) = signal::ctrl_c().await { + let now = Instant::now(); + + // Reset counter if more than 5 seconds have passed + if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) { + ctrl_c_count = 0; + } + + ctrl_c_count += 1; + last_ctrl_c_time = now; + + let _ = shutdown_tx_clone.send(()).await; + + // If 3 Ctrl+C within 5 seconds, exit immediately + if ctrl_c_count >= 3 { + info!("Shutdown. (3/3)"); + std::process::exit(0); + } else { + info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count); + } + } + }); + + let future = async move { + loop { + select! { + // Accept new connections + accept_result = listener.accept(), if !shutdown_requested => { + match accept_result { + Ok((stream, _addr)) => { + debug!("New connection. (now {})", active_connections); + let _ = tx.send(1).await; + + let vault_clone = vault.clone(); + let action_pool_clone = action_pool.clone(); + let tx_clone = tx.clone(); + + spawn(async move { + process_connection(stream, vault_clone, action_pool_clone).await; + debug!("A connection closed. (now {})", active_connections); + let _ = tx_clone.send(-1).await; + }); + } + Err(_) => { + continue; + } + } + } + + // Handle connection count updates + Some(count_change) = rx.recv() => { + active_connections = (active_connections as i32 + count_change) as usize; + + // Check if we should shutdown after all connections are done + if shutdown_requested && active_connections == 0 { + break; + } + } + + // Handle shutdown signal + _ = shutdown_rx.recv() => { + shutdown_requested = true; + // If no active connections, break immediately + if active_connections == 0 { + info!("No active connections. Shutting down."); + break; + } else { + warn!("Cannot shutdown while active connections exist! ({} active)", active_connections); + } + } + } + } + + Ok(()) + }; + + (shutdown_tx, future) +} + +async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) { + // Setup connection instance + let mut instance = ConnectionInstance::from(stream); + + // Read action name and action arguments + let msg = match instance.read_msgpack::<RemoteActionInvoke>().await { + Ok(msg) => msg, + Err(e) => { + error!("Failed to read action message: {}", e); + return; + } + }; + + // Build context + let ctx: ActionContext = ActionContext::remote().insert_instance(instance); + + // Insert vault into context + let ctx = ctx.with_arc_data(vault); + + info!( + "Process action `{}` with argument `{}`", + msg.action_name, msg.action_args_json + ); + + // Process action + let result = action_pool + .process_json(&msg.action_name, ctx, msg.action_args_json) + .await; + + match result { + Ok(_result_json) => {} + Err(e) => { + warn!("Failed to process action `{}`: {}", msg.action_name, e); + } + } +} diff --git a/legacy_actions/src/connection/error.rs b/legacy_actions/src/connection/error.rs new file mode 100644 index 0000000..1a4e221 --- /dev/null +++ b/legacy_actions/src/connection/error.rs @@ -0,0 +1,27 @@ +use std::io; +use thiserror::Error; +use vcs_data::data::member::MemberId; + +#[derive(Error, Debug, Clone)] +pub enum ConnectionError { + #[error("I/O error: {0}")] + Io(String), +} + +#[derive(Error, Debug, Clone)] +pub enum ProcessActionError { + #[error("Action `{0}` not registered")] + ActionNotRegistered(String), + + #[error("Authorize `{0}` failed")] + AuthorizeFailed(MemberId), + + #[error("Authorize host `{0}` failed")] + AuthorizeHostFailed(MemberId), +} + +impl From<io::Error> for ConnectionError { + fn from(error: io::Error) -> Self { + ConnectionError::Io(error.to_string()) + } +} diff --git a/legacy_actions/src/connection/protocol.rs b/legacy_actions/src/connection/protocol.rs new file mode 100644 index 0000000..2cebe79 --- /dev/null +++ b/legacy_actions/src/connection/protocol.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct RemoteActionInvoke { + pub action_name: String, + pub action_args_json: String, +} diff --git a/legacy_actions/src/lib.rs b/legacy_actions/src/lib.rs new file mode 100644 index 0000000..c1dda86 --- /dev/null +++ b/legacy_actions/src/lib.rs @@ -0,0 +1,4 @@ +pub mod connection; +pub mod local_actions; +pub mod registry; +pub mod remote_actions; diff --git a/legacy_actions/src/local_actions.rs b/legacy_actions/src/local_actions.rs new file mode 100644 index 0000000..d6f47f9 --- /dev/null +++ b/legacy_actions/src/local_actions.rs @@ -0,0 +1,2 @@ +pub mod account_manage; +pub mod current_sheet; diff --git a/legacy_actions/src/local_actions/account_manage.rs b/legacy_actions/src/local_actions/account_manage.rs new file mode 100644 index 0000000..03a7851 --- /dev/null +++ b/legacy_actions/src/local_actions/account_manage.rs @@ -0,0 +1,3 @@ +pub mod register_account; +pub mod remove_account; +pub mod switch_account; diff --git a/legacy_actions/src/local_actions/account_manage/register_account.rs b/legacy_actions/src/local_actions/account_manage/register_account.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/legacy_actions/src/local_actions/account_manage/register_account.rs diff --git a/legacy_actions/src/local_actions/account_manage/remove_account.rs b/legacy_actions/src/local_actions/account_manage/remove_account.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/legacy_actions/src/local_actions/account_manage/remove_account.rs diff --git a/legacy_actions/src/local_actions/account_manage/switch_account.rs b/legacy_actions/src/local_actions/account_manage/switch_account.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/legacy_actions/src/local_actions/account_manage/switch_account.rs diff --git a/legacy_actions/src/local_actions/current_sheet.rs b/legacy_actions/src/local_actions/current_sheet.rs new file mode 100644 index 0000000..785d7ee --- /dev/null +++ b/legacy_actions/src/local_actions/current_sheet.rs @@ -0,0 +1,2 @@ +pub mod exit_sheet; +pub mod use_sheet; diff --git a/legacy_actions/src/local_actions/current_sheet/exit_sheet.rs b/legacy_actions/src/local_actions/current_sheet/exit_sheet.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/legacy_actions/src/local_actions/current_sheet/exit_sheet.rs diff --git a/legacy_actions/src/local_actions/current_sheet/use_sheet.rs b/legacy_actions/src/local_actions/current_sheet/use_sheet.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/legacy_actions/src/local_actions/current_sheet/use_sheet.rs diff --git a/legacy_actions/src/registry.rs b/legacy_actions/src/registry.rs new file mode 100644 index 0000000..ceec1a1 --- /dev/null +++ b/legacy_actions/src/registry.rs @@ -0,0 +1,2 @@ +pub mod client_registry; +pub mod server_registry; diff --git a/legacy_actions/src/registry/client_registry.rs b/legacy_actions/src/registry/client_registry.rs new file mode 100644 index 0000000..db69889 --- /dev/null +++ b/legacy_actions/src/registry/client_registry.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use action_system::{action::ActionContext, action_pool::ActionPool}; +use cfg_file::config::ConfigFile; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::{ + local::{LocalWorkspace, workspace_config::LocalConfig}, + user::UserDirectory, +}; + +use crate::{ + connection::protocol::RemoteActionInvoke, + remote_actions::{ + content_manage::track_file::register_track_file_action, + edit_right_manage::change_virtual_file_edit_right::register_change_virtual_file_edit_right_action, + mapping_manage::{ + edit_mapping::register_edit_mapping_action, + merge_share_mapping::register_merge_share_mapping_action, + share_mapping::register_share_mapping_action, + }, + sheet_manage::{ + drop_sheet::register_drop_sheet_action, make_sheet::register_make_sheet_action, + }, + workspace_manage::{ + set_upstream_vault::register_set_upstream_vault_action, + update_to_latest_info::register_update_to_latest_info_action, + }, + }, +}; + +fn register_actions(pool: &mut ActionPool) { + // Pool register here + + // Local Actions + register_set_upstream_vault_action(pool); + register_update_to_latest_info_action(pool); + + // Sheet Actions + register_make_sheet_action(pool); + register_drop_sheet_action(pool); + register_edit_mapping_action(pool); + + // Share / Merge Share Actions + register_share_mapping_action(pool); + register_merge_share_mapping_action(pool); + + // Track Action + register_track_file_action(pool); + + // User Actions + register_change_virtual_file_edit_right_action(pool); +} + +pub fn client_action_pool() -> ActionPool { + // Create pool + let mut pool = ActionPool::new(); + + // Register actions + register_actions(&mut pool); + + // Add process events + pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args))); + + // Return + pool +} + +async fn on_proc_begin( + ctx: &mut ActionContext, + _args: &(dyn std::any::Any + Send + Sync), +) -> Result<(), TcpTargetError> { + // Is ctx remote + let is_remote = ctx.is_remote_action(); + + // Action name and arguments + let action_name = ctx.action_name().to_string(); + let action_args_json = ctx.action_args_json().clone(); + + // Insert LocalWorkspace Arc + let Ok(local_config) = LocalConfig::read().await else { + return Err(TcpTargetError::NotFound( + "The current directory does not have a local workspace".to_string(), + )); + }; + let local_workspace = match LocalWorkspace::init_current_dir(local_config) { + Some(workspace) => workspace, + None => { + return Err(TcpTargetError::NotFound( + "Failed to initialize local workspace.".to_string(), + )); + } + }; + let local_workspace_arc = Arc::new(local_workspace); + ctx.insert_arc_data(local_workspace_arc); + + // Insert UserDirectory Arc + let Some(user_directory) = UserDirectory::current_cfg_dir() else { + return Err(TcpTargetError::NotFound( + "The user directory does not exist.".to_string(), + )); + }; + + let user_directory_arc = Arc::new(user_directory); + ctx.insert_arc_data(user_directory_arc); + + // Get instance + let Some(instance) = ctx.instance() else { + return Err(TcpTargetError::Unsupported( + "Missing ConnectionInstance in current context, this ActionPool does not support this call" + .to_string())); + }; + + // If it's remote, invoke action at server + if is_remote { + // Build protocol message + let msg = RemoteActionInvoke { + action_name, + action_args_json, + }; + + // Send + let mut instance = instance.lock().await; + instance.write_msgpack(&msg).await?; + } + + // Return OK, wait for client to execute Action locally + Ok(()) +} diff --git a/legacy_actions/src/registry/server_registry.rs b/legacy_actions/src/registry/server_registry.rs new file mode 100644 index 0000000..aee867c --- /dev/null +++ b/legacy_actions/src/registry/server_registry.rs @@ -0,0 +1,43 @@ +use action_system::action_pool::ActionPool; + +use crate::remote_actions::{ + content_manage::track_file::register_track_file_action, + edit_right_manage::change_virtual_file_edit_right::register_change_virtual_file_edit_right_action, + mapping_manage::{ + edit_mapping::register_edit_mapping_action, + merge_share_mapping::register_merge_share_mapping_action, + share_mapping::register_share_mapping_action, + }, + sheet_manage::{ + drop_sheet::register_drop_sheet_action, make_sheet::register_make_sheet_action, + }, + workspace_manage::{ + set_upstream_vault::register_set_upstream_vault_action, + update_to_latest_info::register_update_to_latest_info_action, + }, +}; + +pub fn server_action_pool() -> ActionPool { + let mut pool = ActionPool::new(); + + // Local Actions + register_set_upstream_vault_action(&mut pool); + register_update_to_latest_info_action(&mut pool); + + // Sheet Actions + register_make_sheet_action(&mut pool); + register_drop_sheet_action(&mut pool); + register_edit_mapping_action(&mut pool); + + // Share / Merge Share Actions + register_share_mapping_action(&mut pool); + register_merge_share_mapping_action(&mut pool); + + // Track Action + register_track_file_action(&mut pool); + + // User Actions + register_change_virtual_file_edit_right_action(&mut pool); + + pool +} diff --git a/legacy_actions/src/remote_actions.rs b/legacy_actions/src/remote_actions.rs new file mode 100644 index 0000000..d15edc9 --- /dev/null +++ b/legacy_actions/src/remote_actions.rs @@ -0,0 +1,288 @@ +use std::sync::Arc; + +use action_system::action::ActionContext; +use cfg_file::config::ConfigFile; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::sync::{Mutex, mpsc::Sender}; +use vcs_data::{ + constants::{SERVER_PATH_MEMBER_PUB, VAULT_HOST_NAME}, + data::{ + local::{LocalWorkspace, latest_info::LatestInfo, workspace_config::LocalConfig}, + member::MemberId, + sheet::SheetName, + user::UserDirectory, + vault::Vault, + }, +}; + +pub mod content_manage; +pub mod edit_right_manage; +pub mod mapping_manage; +pub mod sheet_manage; +pub mod workspace_manage; + +/// Check if the connection instance is valid in the given context. +/// This function is used to verify the connection instance in actions that require remote calls. +pub fn check_connection_instance( + ctx: &ActionContext, +) -> Result<&Arc<Mutex<ConnectionInstance>>, TcpTargetError> { + let Some(instance) = ctx.instance() else { + return Err(TcpTargetError::NotFound( + "Connection instance lost.".to_string(), + )); + }; + Ok(instance) +} + +/// Try to get the Vault instance from the context. +pub fn try_get_vault(ctx: &ActionContext) -> Result<Arc<Vault>, TcpTargetError> { + let Some(vault) = ctx.get_arc::<Vault>() else { + return Err(TcpTargetError::NotFound( + "Vault instance not found".to_string(), + )); + }; + Ok(vault) +} + +/// Try to get the LocalWorkspace instance from the context. +pub fn try_get_local_workspace(ctx: &ActionContext) -> Result<Arc<LocalWorkspace>, TcpTargetError> { + let Some(local_workspace) = ctx.get_arc::<LocalWorkspace>() else { + return Err(TcpTargetError::NotFound( + "LocalWorkspace instance not found".to_string(), + )); + }; + Ok(local_workspace) +} + +/// Try to get the UserDirectory instance from the context. +pub fn try_get_user_directory(ctx: &ActionContext) -> Result<Arc<UserDirectory>, TcpTargetError> { + let Some(user_directory) = ctx.get_arc::<UserDirectory>() else { + return Err(TcpTargetError::NotFound( + "UserDirectory instance not found".to_string(), + )); + }; + Ok(user_directory) +} + +/// Try to get the LocalWorkspace instance from the context. +pub fn try_get_local_output(ctx: &ActionContext) -> Result<Arc<Sender<String>>, TcpTargetError> { + let Some(output) = ctx.get_arc::<Sender<String>>() else { + return Err(TcpTargetError::NotFound( + "Client sender not found".to_string(), + )); + }; + Ok(output) +} + +/// Authenticate member based on context and return MemberId +pub async fn auth_member( + ctx: &ActionContext, + instance: &Arc<Mutex<ConnectionInstance>>, +) -> Result<(MemberId, bool), TcpTargetError> { + // Window开服Linux连接 -> 此函数内产生 early eof + // ~ WS # jv update + // 身份认证失败:I/O error: early eof! + + // 分析相应流程: + // 1. 服务端发起挑战,客户端接受 + // 2. 服务端发送结果,客户端接受 + // 3. 推测此时发生 early eof ---> 无 ack,导致客户端尝试拿到结果时,服务端已经结束 + // 这很有可能是 Windows 和 Linux 对于连接处理的方案差异导致的问题,需要进一步排查 + + // Start Challenge (Remote) + if ctx.is_proc_on_remote() { + let mut mut_instance = instance.lock().await; + let vault = try_get_vault(ctx)?; + + let using_host_mode = mut_instance.read_msgpack::<bool>().await?; + + let result = mut_instance + .challenge(vault.vault_path().join(SERVER_PATH_MEMBER_PUB)) + .await; + + return match result { + Ok((pass, member_id)) => { + if !pass { + // Send false to inform the client that authentication failed + mut_instance.write(false).await?; + Err(TcpTargetError::Authentication( + "Authenticate failed.".to_string(), + )) + } else { + if using_host_mode { + if vault.config().vault_host_list().contains(&member_id) { + // Using Host mode authentication, and is indeed an administrator + mut_instance.write(true).await?; + Ok((member_id, true)) + } else { + // Using Host mode authentication, but not an administrator + mut_instance.write(false).await?; + Err(TcpTargetError::Authentication( + "Authenticate failed.".to_string(), + )) + } + } else { + // Not using Host mode authentication + mut_instance.write(true).await?; + Ok((member_id, false)) + } + } + } + Err(e) => Err(e), + }; + } + + // Accept Challenge (Local) + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + let local_workspace = try_get_local_workspace(ctx)?; + let (is_host_mode, member_name) = { + let cfg = local_workspace.config().lock_owned().await; + (cfg.is_host_mode(), cfg.current_account()) + }; + let user_directory = try_get_user_directory(ctx)?; + + // Inform remote whether to authenticate in Host mode + mut_instance.write_msgpack(is_host_mode).await?; + + // Member name & Private key + let private_key = user_directory.account_private_key_path(&member_name); + let _ = mut_instance + .accept_challenge(private_key, &member_name) + .await?; + + // Read result + let challenge_result = mut_instance.read::<bool>().await?; + if challenge_result { + return Ok((member_name.clone(), is_host_mode)); + } else { + return Err(TcpTargetError::Authentication( + "Authenticate failed.".to_string(), + )); + } + } + + Err(TcpTargetError::NoResult("Auth failed.".to_string())) +} + +/// Get the current sheet name based on the context (local or remote). +/// This function handles the communication between local and remote instances +/// to verify and retrieve the current sheet name and whether it's a reference sheet. +/// +/// On local: +/// - Reads the current sheet from local configuration +/// - Sends the sheet name to remote for verification +/// - Returns the sheet name and whether it's a reference sheet if remote confirms it exists +/// +/// On remote: +/// - Receives sheet name from local +/// - Verifies the sheet exists in the vault +/// - Checks if the sheet is a reference sheet +/// - If allow_ref is true, reference sheets are allowed to pass verification +/// - Sends confirmation and reference status back to local +/// +/// Returns a tuple of (SheetName, bool) where the bool indicates if it's a reference sheet, +/// or an error if the sheet doesn't exist or doesn't meet the verification criteria. +pub async fn get_current_sheet_name( + ctx: &ActionContext, + instance: &Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + allow_ref: bool, +) -> Result<(SheetName, bool), TcpTargetError> { + let mut mut_instance = instance.lock().await; + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(ctx)?; + let config = LocalConfig::read().await?; + let latest = LatestInfo::read_from(LatestInfo::latest_info_path( + workspace.local_path(), + member_id, + )) + .await?; + if let Some(sheet_name) = config.sheet_in_use() { + // Send sheet name + mut_instance.write_msgpack(sheet_name).await?; + + // Read result + if mut_instance.read_msgpack::<bool>().await? { + // Check if sheet is a reference sheet + let is_ref_sheet = latest.reference_sheets.contains(sheet_name); + if allow_ref { + // Allow reference sheets, directly return the determination result + return Ok((sheet_name.clone(), is_ref_sheet)); + } else if is_ref_sheet { + // Not allowed but it's a reference sheet, return an error + return Err(TcpTargetError::ReferenceSheetNotAllowed( + "Reference sheet not allowed".to_string(), + )); + } else { + // Not allowed but not a reference sheet, return normally + return Ok((sheet_name.clone(), false)); + } + } else { + return Err(TcpTargetError::NotFound("Sheet not found".to_string())); + } + } + // Send empty sheet_name + mut_instance.write_msgpack("".to_string()).await?; + + // Read result, since we know it's impossible to pass here, we just consume this result + let _ = mut_instance.read_msgpack::<bool>().await?; + + return Err(TcpTargetError::NotFound("Sheet not found".to_string())); + } + if ctx.is_proc_on_remote() { + let vault = try_get_vault(ctx)?; + + // Read sheet name + let sheet_name: SheetName = mut_instance.read_msgpack().await?; + + // Check if sheet exists + if let Ok(sheet) = vault.sheet(&sheet_name).await + && let Some(holder) = sheet.holder() + { + let is_ref_sheet = holder == VAULT_HOST_NAME; + if allow_ref { + // Allow reference sheets, directly return the determination result + if holder == member_id || holder == VAULT_HOST_NAME { + mut_instance.write_msgpack(true).await?; + return Ok((sheet.name().clone(), is_ref_sheet)); + } + } else if is_ref_sheet { + // Not allowed but it's a reference sheet, return an error + mut_instance.write_msgpack(true).await?; + return Err(TcpTargetError::ReferenceSheetNotAllowed( + "Reference sheet not allowed".to_string(), + )); + } else { + // Not allowed but not a reference sheet, return normally + if holder == member_id { + mut_instance.write_msgpack(true).await?; + return Ok((sheet_name.clone(), false)); + } + } + } + // Tell local the check is not passed + mut_instance.write_msgpack(false).await?; + return Err(TcpTargetError::NotFound("Sheet not found".to_string())); + } + Err(TcpTargetError::NoResult("NoResult".to_string())) +} + +/// The macro to write and return a result. +#[macro_export] +macro_rules! write_and_return { + ($instance:expr, $result:expr) => {{ + $instance.lock().await.write($result).await?; + return Ok($result); + }}; +} + +/// The macro to send formatted string to output channel. +/// Usage: local_println!(output, "format string", arg1, arg2, ...) +#[macro_export] +macro_rules! local_println { + ($output:expr, $($arg:tt)*) => {{ + let formatted = format!($($arg)*); + let _ = $output.send(formatted).await; + }}; +} diff --git a/legacy_actions/src/remote_actions/content_manage.rs b/legacy_actions/src/remote_actions/content_manage.rs new file mode 100644 index 0000000..1568085 --- /dev/null +++ b/legacy_actions/src/remote_actions/content_manage.rs @@ -0,0 +1 @@ +pub mod track_file; diff --git a/legacy_actions/src/remote_actions/content_manage/track_file.rs b/legacy_actions/src/remote_actions/content_manage/track_file.rs new file mode 100644 index 0000000..a59ca76 --- /dev/null +++ b/legacy_actions/src/remote_actions/content_manage/track_file.rs @@ -0,0 +1,985 @@ +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::Arc, + time::SystemTime, +}; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use serde::{Deserialize, Serialize}; +use sha1_hash::calc_sha1; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::{fs, sync::Mutex}; +use vcs_data::{ + constants::{CLIENT_FILE_TEMP_FILE, KEY_TEMP_NAME}, + data::{ + local::{ + cached_sheet::CachedSheet, latest_file_data::LatestFileData, + local_sheet::LocalMappingMetadata, modified_status::sign_vault_modified, + workspace_analyzer::AnalyzeResult, + }, + member::MemberId, + sheet::SheetName, + vault::{ + vault_config::VaultUuid, + virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, + }, + }, +}; + +use crate::{ + local_println, + remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_local_output, + try_get_local_workspace, try_get_vault, + }, +}; + +pub type NextVersion = String; +pub type UpdateDescription = String; + +#[derive(Serialize, Deserialize)] +pub struct TrackFileActionArguments { + // Path need to track + pub relative_pathes: HashSet<PathBuf>, + + // File update info + pub file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, + + // Print infos + pub print_infos: bool, + + // overwrite modified files + pub allow_overwrite_modified: bool, +} + +#[derive(Serialize, Deserialize)] +pub enum TrackFileActionResult { + Done { + created: Vec<PathBuf>, + updated: Vec<PathBuf>, + synced: Vec<PathBuf>, + skipped: Vec<PathBuf>, + }, + + // Fail + AuthorizeFailed(String), + + /// There are local move or missing items that have not been resolved, + /// this situation does not allow track + StructureChangesNotSolved, + + CreateTaskFailed(CreateTaskResult), + UpdateTaskFailed(UpdateTaskResult), + SyncTaskFailed(SyncTaskResult), +} + +#[derive(Serialize, Deserialize)] +pub enum CreateTaskResult { + Success(Vec<PathBuf>), // Success(success_relative_pathes) + + /// Create file on existing path in the sheet + CreateFileOnExistPath(PathBuf), + + /// Sheet not found + SheetNotFound(SheetName), +} + +#[derive(Serialize, Deserialize)] +pub enum UpdateTaskResult { + Success(Vec<PathBuf>), // Success(success_relative_pathes) + + VerifyFailed { + path: PathBuf, + reason: VerifyFailReason, + }, +} + +#[derive(Serialize, Deserialize, Clone)] +pub enum VerifyFailReason { + SheetNotFound(SheetName), + MappingNotFound, + VirtualFileNotFound(VirtualFileId), + VirtualFileReadFailed(VirtualFileId), + NotHeld, + VersionDismatch(VirtualFileVersion, VirtualFileVersion), // (CurrentVersion, RemoteVersion) + UpdateButNoDescription, // File needs update, but no description exists + VersionAlreadyExist(VirtualFileVersion), // (RemoteVersion) +} + +#[derive(Serialize, Deserialize)] +pub enum SyncTaskResult { + Success(Vec<PathBuf>), // Success(success_relative_pathes) +} +#[action_gen] +pub async fn track_file_action( + ctx: ActionContext, + arguments: TrackFileActionArguments, +) -> Result<TrackFileActionResult, TcpTargetError> { + let relative_pathes = arguments.relative_pathes; + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string())), + }; + + // Check sheet + let (sheet_name, is_ref_sheet) = + get_current_sheet_name(&ctx, instance, &member_id, true).await?; + + // Can modify Sheet when not in reference sheet or in Host mode + let can_modify_sheet = !is_ref_sheet || is_host_mode; + + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let analyzed = AnalyzeResult::analyze_local_status(&workspace).await?; + let latest_file_data = + LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; + + if !analyzed.lost.is_empty() || !analyzed.moved.is_empty() { + return Ok(TrackFileActionResult::StructureChangesNotSolved); + } + + let Some(sheet_in_use) = workspace.config().lock().await.sheet_in_use().clone() else { + return Err(TcpTargetError::NotFound("Sheet not found!".to_string())); + }; + + // Read local sheet and member held + let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?; + let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?; + let member_held = LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; + + let modified = analyzed + .modified + .intersection(&relative_pathes) + .cloned() + .collect::<Vec<_>>(); + + // Filter out created files + let created_task = analyzed + .created + .intersection(&relative_pathes) + .cloned() + .collect::<Vec<_>>(); + + // Filter out modified files that need to be updated + let mut update_task: Vec<PathBuf> = { + let result = modified.iter().filter_map(|p| { + if let Ok(local_data) = local_sheet.mapping_data(p) { + let id = local_data.mapping_vfid(); + let local_ver = local_data.version_when_updated(); + let Some(latest_ver) = latest_file_data.file_version(id) else { + return None; + }; + if let Some(held_member) = member_held.file_holder(id) { + // Check if holder and version match + if held_member == &member_id && local_ver == latest_ver { + return Some(p.clone()); + } + } + }; + None + }); + result.collect() + }; + + let mut skipped_task: Vec<PathBuf> = Vec::new(); + + // Filter out files that do not exist locally or have version inconsistencies and need to be synchronized + let mut sync_task: Vec<PathBuf> = { + let other: Vec<PathBuf> = relative_pathes + .iter() + .filter(|p| !created_task.contains(p) && !update_task.contains(p)) + .cloned() + .collect(); + + let result = other.iter().filter_map(|p| { + // Not exists and not lost, first download + if !workspace.local_path().join(p).exists() && !analyzed.lost.contains(p) { + return Some(p.clone()); + } + + // In cached sheet + if !cached_sheet.mapping().contains_key(p) { + return None; + } + + // In local sheet + let local_sheet_mapping = local_sheet.mapping_data(p).ok()?; + let vfid = local_sheet_mapping.mapping_vfid(); + + if let Some(latest_version) = &latest_file_data.file_version(vfid) { + // Version does not match + if &local_sheet_mapping.version_when_updated() != latest_version { + let modified = modified.contains(p); + if modified && arguments.allow_overwrite_modified { + return Some(p.clone()); + } else if modified && !arguments.allow_overwrite_modified { + // If not allowed to overwrite, join skipped tasks + skipped_task.push(p.clone()); + return None; + } + return Some(p.clone()); + } + } + + // File not held and modified + let holder = latest_file_data.file_holder(vfid); + if (holder.is_none() || &member_id != holder.unwrap()) && modified.contains(p) { + // If allow overwrite modified is true, overwrite the file + if arguments.allow_overwrite_modified { + return Some(p.clone()); + } else { + // If not allowed to overwrite, join skipped tasks + skipped_task.push(p.clone()); + return None; + } + } + + None + }); + result.collect() + }; + + // If the sheet cannot be modified, + // the update_task here should be considered invalid and changed to sync rollback + if !can_modify_sheet { + if arguments.allow_overwrite_modified { + sync_task.append(&mut update_task); + update_task.clear(); + } else { + skipped_task.append(&mut update_task); + update_task.clear(); + } + } + + // Package tasks + let tasks: (Vec<PathBuf>, Vec<PathBuf>, Vec<PathBuf>) = + (created_task, update_task, sync_task); + + // Send to remote + { + let mut mut_instance = instance.lock().await; + mut_instance + .write_large_msgpack(tasks.clone(), 1024u16) + .await?; + // Drop mutex here + } + + // Process create tasks + let mut success_create = Vec::<PathBuf>::new(); + if can_modify_sheet { + success_create = match proc_create_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.0, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + CreateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::CreateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process update tasks + let mut success_update = Vec::<PathBuf>::new(); + if can_modify_sheet { + success_update = match proc_update_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.1, + arguments.print_infos, + arguments.file_update_info, + ) + .await + { + Ok(r) => match r { + UpdateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::UpdateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process sync tasks + let success_sync = match proc_sync_tasks_local( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + tasks.2, + arguments.print_infos, + ) + .await + { + Ok(r) => match r { + SyncTaskResult::Success(relative_pathes) => relative_pathes, + }, + Err(e) => return Err(e), + }; + + if success_create.len() + success_update.len() > 0 { + sign_vault_modified(true).await; + } + + return Ok(TrackFileActionResult::Done { + created: success_create, + updated: success_update, + synced: success_sync, + skipped: skipped_task, + }); + } + + if ctx.is_proc_on_remote() { + // Read tasks + let (created_task, update_task, sync_task): (Vec<PathBuf>, Vec<PathBuf>, Vec<PathBuf>) = { + let mut mut_instance = instance.lock().await; + mut_instance.read_large_msgpack(1024u16).await? + }; + + // Process create tasks + let mut success_create = Vec::<PathBuf>::new(); + if can_modify_sheet { + success_create = match proc_create_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + created_task, + ) + .await + { + Ok(r) => match r { + CreateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::CreateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process update tasks + let mut success_update = Vec::<PathBuf>::new(); + if can_modify_sheet { + success_update = match proc_update_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + update_task, + arguments.file_update_info, + ) + .await + { + Ok(r) => match r { + UpdateTaskResult::Success(relative_pathes) => relative_pathes, + _ => { + return Ok(TrackFileActionResult::UpdateTaskFailed(r)); + } + }, + Err(e) => return Err(e), + }; + } + + // Process sync tasks + let success_sync = match proc_sync_tasks_remote( + &ctx, + instance.clone(), + &member_id, + &sheet_name, + sync_task, + ) + .await + { + Ok(r) => match r { + SyncTaskResult::Success(relative_pathes) => relative_pathes, + }, + Err(e) => return Err(e), + }; + + return Ok(TrackFileActionResult::Done { + created: success_create, + updated: success_update, + synced: success_sync, + skipped: Vec::new(), // The server doesn't know which files were skipped + }); + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} + +async fn proc_create_tasks_local( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, + print_infos: bool, +) -> Result<CreateTaskResult, TcpTargetError> { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Creating {} files...", relative_paths.len()); + } + + // Wait for remote detection of whether the sheet exists + let has_sheet = mut_instance.read_msgpack::<bool>().await?; + if !has_sheet { + return Ok(CreateTaskResult::SheetNotFound(sheet_name.clone())); + } + + // Wait for remote detection of whether the file exists + let (hasnt_duplicate, duplicate_path) = mut_instance.read_msgpack::<(bool, PathBuf)>().await?; + if !hasnt_duplicate { + return Ok(CreateTaskResult::CreateFileOnExistPath(duplicate_path)); + } + + let mut success_relative_pathes = Vec::new(); + + // Start sending files + for path in relative_paths { + let full_path = workspace.local_path().join(&path); + + // Send file + if mut_instance.write_file(&full_path).await.is_err() { + continue; + } + + // Read virtual file id and version + let (vfid, version, version_desc) = mut_instance + .read_msgpack::<( + VirtualFileId, + VirtualFileVersion, + VirtualFileVersionDescription, + )>() + .await?; + + // Add mapping to local sheet + let hash = sha1_hash::calc_sha1(&full_path, 2048).await.unwrap().hash; + let time = std::fs::metadata(&full_path)?.modified()?; + local_sheet.add_mapping( + &path.clone(), + LocalMappingMetadata::new( + hash, // hash_when_updated + time, // time_when_updated + std::fs::metadata(&full_path)?.len(), // size_when_updated + version_desc, // version_desc_when_updated + version, // version_when_updated + vfid, // mapping_vfid + time, // last_modifiy_check_itme + false, // last_modifiy_check_result + ), + )?; + + // Print success info + if print_infos { + local_println!(local_output, "+ {}", path.display()); + } + + success_relative_pathes.push(path); + } + + // Write local sheet + local_sheet.write().await?; + + Ok(CreateTaskResult::Success(success_relative_pathes)) +} + +async fn proc_create_tasks_remote( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, +) -> Result<CreateTaskResult, TcpTargetError> { + let vault = try_get_vault(ctx)?; + let mut mut_instance = instance.lock().await; + + // Sheet check + let Ok(mut sheet) = vault.sheet(sheet_name).await else { + // Sheet not found + mut_instance.write_msgpack(false).await?; + return Ok(CreateTaskResult::SheetNotFound(sheet_name.to_string())); + }; + mut_instance.write_msgpack(true).await?; + + // Duplicate create precheck + for path in relative_paths.iter() { + if sheet.mapping().contains_key(path) { + // Duplicate file + mut_instance.write_msgpack((false, path)).await?; + return Ok(CreateTaskResult::CreateFileOnExistPath(path.clone())); + } + } + mut_instance.write_msgpack((true, PathBuf::new())).await?; + + let mut success_relative_pathes = Vec::new(); + + // Start receiving files + for path in relative_paths { + // Read file and create virtual file + let Ok(vfid) = vault + .create_virtual_file_from_connection(&mut mut_instance, member_id) + .await + else { + continue; + }; + + // Record virtual file to sheet + let vf_meta = vault.virtual_file(&vfid)?.read_meta().await?; + sheet + .add_mapping(path.clone(), vfid.clone(), vf_meta.version_latest()) + .await?; + + // Tell client the virtual file id and version + mut_instance + .write_msgpack(( + vfid, + vf_meta.version_latest(), + vf_meta + .version_description(vf_meta.version_latest()) + .unwrap(), + )) + .await?; + + success_relative_pathes.push(path); + } + + sheet.persist().await?; + + Ok(CreateTaskResult::Success(success_relative_pathes)) +} + +async fn proc_update_tasks_local( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, + print_infos: bool, + file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, +) -> Result<UpdateTaskResult, TcpTargetError> { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; + + let mut success = Vec::new(); + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Updating {} files...", relative_paths.len()); + } + + for path in relative_paths.iter() { + let Ok(mapping) = local_sheet.mapping_data(path) else { + // Is mapping not found, write empty + mut_instance.write_msgpack("".to_string()).await?; + continue; + }; + // Read and send file version + let Ok(_) = mut_instance + .write_msgpack(mapping.version_when_updated()) + .await + else { + continue; + }; + + // Read verify result + let verify_result: bool = mut_instance.read_msgpack().await?; + if !verify_result { + let reason = mut_instance.read_msgpack::<VerifyFailReason>().await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason: reason.clone(), + }); + } + + // Calc hash + let hash_result = match sha1_hash::calc_sha1(workspace.local_path().join(path), 2048).await + { + Ok(r) => r, + Err(_) => { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + } + }; + + // Get next version + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; // Not Ready + continue; + }; + + // Write + mut_instance.write_msgpack(true).await?; // Ready + mut_instance.write_file(path).await?; + + // Read upload result + let upload_result: bool = mut_instance.read_msgpack().await?; + if upload_result { + // Success + let mapping_data_mut = local_sheet.mapping_data_mut(path).unwrap(); + let version = mapping_data_mut.version_when_updated().clone(); + mapping_data_mut.set_hash_when_updated(hash_result.hash); + mapping_data_mut.set_version_when_updated(next_version.clone()); + mapping_data_mut.set_version_desc_when_updated(VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }); + mapping_data_mut.set_last_modifiy_check_result(false); // Mark file not modified + + // Write + local_sheet.write().await?; + + // Push path into success vec + success.push(path.clone()); + + // Print success info + if print_infos { + local_println!( + local_output, + "↑ {} ({} -> {})", + path.display(), + version, + next_version + ); + } + } + } + + Ok(UpdateTaskResult::Success(success)) +} + +async fn proc_update_tasks_remote( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, + file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, +) -> Result<UpdateTaskResult, TcpTargetError> { + let vault = try_get_vault(ctx)?; + let mut mut_instance = instance.lock().await; + + let mut success = Vec::new(); + + for path in relative_paths.iter() { + // Read version + let Ok(version) = mut_instance.read_msgpack::<VirtualFileVersion>().await else { + continue; + }; + if version.is_empty() { + continue; + } + + // Verify + let Some((next_version, description)) = file_update_info.get(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::UpdateButNoDescription; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Sheet not found + }; + let Ok(mut sheet) = vault.sheet(sheet_name).await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::SheetNotFound(sheet_name.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Sheet not found + }; + let Some(mapping_data) = sheet.mapping_mut().get_mut(path) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::MappingNotFound; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Mapping not found + }; + let Ok(vf) = vault.virtual_file(&mapping_data.id) else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileNotFound(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Virtual file not found + }; + let Ok(vf_metadata) = vf.read_meta().await else { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VirtualFileReadFailed(mapping_data.id.clone()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Read virtual file metadata failed + }; + if vf_metadata.versions().contains(next_version) { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::VersionAlreadyExist(version); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // VersionAlreadyExist + } + if vf_metadata.hold_member() != member_id { + mut_instance.write_msgpack(false).await?; + let reason = VerifyFailReason::NotHeld; + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Member not held it + }; + if vf_metadata.version_latest() != version { + mut_instance.write_msgpack(false).await?; + let reason = + VerifyFailReason::VersionDismatch(version.clone(), vf_metadata.version_latest()); + mut_instance.write_msgpack(reason.clone()).await?; + return Ok(UpdateTaskResult::VerifyFailed { + path: path.clone(), + reason, + }); // Version does not match + }; + mut_instance.write_msgpack(true).await?; // Verified + + // Read if local ready + let ready: bool = mut_instance.read_msgpack().await?; + if !ready { + continue; + } + + // Read and update virtual file + match vault + .update_virtual_file_from_connection( + &mut mut_instance, + member_id, + &mapping_data.id, + next_version, + VirtualFileVersionDescription { + creator: member_id.clone(), + description: description.clone(), + }, + ) + .await + { + Ok(_) => { + // Update version to sheet + mapping_data.version = next_version.clone(); + + // Persist + sheet.persist().await?; + + success.push(path.clone()); + mut_instance.write_msgpack(true).await?; // Success + } + Err(e) => { + mut_instance.write_msgpack(false).await?; // Fail + return Err(e.into()); + } + } + } + + Ok(UpdateTaskResult::Success(success)) +} + +type SyncVersionInfo = Option<( + VirtualFileVersion, + VirtualFileVersionDescription, + VirtualFileId, +)>; + +async fn proc_sync_tasks_local( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, + print_infos: bool, +) -> Result<SyncTaskResult, TcpTargetError> { + let workspace = try_get_local_workspace(ctx)?; + let local_output = try_get_local_output(ctx)?; + let mut mut_instance = instance.lock().await; + let mut success: Vec<PathBuf> = Vec::new(); + + if print_infos && relative_paths.len() > 0 { + local_println!(local_output, "Syncing {} files...", relative_paths.len()); + } + + for path in relative_paths { + let Some((version, description, vfid)) = + mut_instance.read_msgpack::<SyncVersionInfo>().await? + else { + continue; + }; + + // Generate a temp path + let temp_path = workspace + .local_path() + .join(CLIENT_FILE_TEMP_FILE.replace(KEY_TEMP_NAME, &VaultUuid::new_v4().to_string())); + + let copy_to = workspace.local_path().join(&path); + + // Read file + match mut_instance.read_file(&temp_path).await { + Ok(_) => { + if !temp_path.exists() { + continue; + } + } + Err(_) => { + continue; + } + } + + // Calc hash + let new_hash = match calc_sha1(&temp_path, 2048).await { + Ok(hash) => hash, + Err(_) => { + continue; + } + }; + + // Calc size + let new_size = match fs::metadata(&temp_path).await.map(|meta| meta.len()) { + Ok(size) => size, + Err(_) => { + continue; + } + }; + + // Write file + if copy_to.exists() { + if let Err(_) = fs::remove_file(©_to).await { + continue; + } + } else { + // Not exist, create directory + if let Some(path) = copy_to.clone().parent() { + fs::create_dir_all(path).await?; + } + } + if let Err(_) = fs::rename(&temp_path, ©_to).await { + continue; + } + + // Modify local sheet + let mut local_sheet = match workspace.local_sheet(member_id, sheet_name).await { + Ok(sheet) => sheet, + Err(_) => { + continue; + } + }; + + // Get or create mapping + let mapping = match local_sheet.mapping_data_mut(&path) { + Ok(m) => m, + Err(_) => { + // First download + let mut data = LocalMappingMetadata::default(); + data.set_mapping_vfid(vfid); + if let Err(_) = local_sheet.add_mapping(&path, data) { + continue; + } + match local_sheet.mapping_data_mut(&path) { + Ok(m) => m, + Err(_) => { + continue; + } + } + } + }; + + let time = SystemTime::now(); + mapping.set_hash_when_updated(new_hash.hash); + mapping.set_last_modifiy_check_result(false); // Mark not modified + mapping.set_version_when_updated(version); + mapping.set_version_desc_when_updated(description); + mapping.set_size_when_updated(new_size); + mapping.set_time_when_updated(time); + mapping.set_last_modifiy_check_time(time); + if let Err(_) = local_sheet.write().await { + continue; + } + + success.push(path.clone()); + + // Print success info + if print_infos { + local_println!(local_output, "↓ {}", path.display()); + } + } + Ok(SyncTaskResult::Success(success)) +} + +async fn proc_sync_tasks_remote( + ctx: &ActionContext, + instance: Arc<Mutex<ConnectionInstance>>, + _member_id: &MemberId, + sheet_name: &SheetName, + relative_paths: Vec<PathBuf>, +) -> Result<SyncTaskResult, TcpTargetError> { + let vault = try_get_vault(ctx)?; + let sheet = vault.sheet(sheet_name).await?; + let mut mut_instance = instance.lock().await; + let mut success: Vec<PathBuf> = Vec::new(); + + for path in relative_paths { + // Get mapping + let Some(mapping) = sheet.mapping().get(&path) else { + mut_instance.write_msgpack::<SyncVersionInfo>(None).await?; // (ready) + continue; + }; + // Get virtual file + let Ok(vf) = vault.virtual_file(&mapping.id) else { + mut_instance.write_msgpack::<SyncVersionInfo>(None).await?; // (ready) + continue; + }; + // Read metadata and get real path + let vf_meta = &vf.read_meta().await?; + let real_path = vault.virtual_file_real_path(&mapping.id, &vf_meta.version_latest()); + let version = vf_meta.version_latest(); + mut_instance + .write_msgpack::<SyncVersionInfo>(Some(( + version.clone(), + vf_meta.version_description(version).cloned().unwrap_or( + VirtualFileVersionDescription { + creator: MemberId::default(), + description: "".to_string(), + }, + ), + vf.id(), + ))) + .await?; // (ready) + if mut_instance.write_file(real_path).await.is_err() { + continue; + } else { + success.push(path); + } + } + Ok(SyncTaskResult::Success(success)) +} diff --git a/legacy_actions/src/remote_actions/edit_right_manage.rs b/legacy_actions/src/remote_actions/edit_right_manage.rs new file mode 100644 index 0000000..52305f5 --- /dev/null +++ b/legacy_actions/src/remote_actions/edit_right_manage.rs @@ -0,0 +1 @@ +pub mod change_virtual_file_edit_right; diff --git a/legacy_actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs b/legacy_actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs new file mode 100644 index 0000000..1045bce --- /dev/null +++ b/legacy_actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs @@ -0,0 +1,144 @@ +use std::path::PathBuf; + +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::local::modified_status::sign_vault_modified; + +use crate::remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, +}; + +#[derive(Serialize, Deserialize)] +pub enum ChangeVirtualFileEditRightResult { + // Success + Success { + success_hold: Vec<PathBuf>, + success_throw: Vec<PathBuf>, + }, + + // Fail + AuthorizeFailed(String), + DoNothing, +} + +#[derive(Serialize, Deserialize, PartialEq, Clone)] +pub enum EditRightChangeBehaviour { + Hold, + Throw, +} + +/// The server part only checks: +/// 1. Whether the file exists +/// 2. Whether the file has no holder +/// If both conditions are met, send success information to the local client +/// +/// All version checks are handled locally +#[action_gen] +pub async fn change_virtual_file_edit_right_action( + ctx: ActionContext, + arguments: (Vec<(PathBuf, EditRightChangeBehaviour)>, bool), +) -> Result<ChangeVirtualFileEditRightResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + let (relative_paths, print_info) = arguments; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => { + return Ok(ChangeVirtualFileEditRightResult::AuthorizeFailed( + e.to_string(), + )); + } + }; + + // Check sheet + let (sheet_name, _is_ref_sheet) = + get_current_sheet_name(&ctx, instance, &member_id, true).await?; + + if ctx.is_proc_on_remote() { + let mut mut_instance = instance.lock().await; + let mut success_hold: Vec<PathBuf> = Vec::new(); + let mut success_throw: Vec<PathBuf> = Vec::new(); + let vault = try_get_vault(&ctx)?; + for (path, behaviour) in relative_paths { + let Ok(sheet) = vault.sheet(&sheet_name).await else { + continue; + }; + let Some(mapping) = sheet.mapping().get(&path) else { + continue; + }; + let Ok(has_edit_right) = vault + .has_virtual_file_edit_right(&member_id, &mapping.id) + .await + else { + continue; + }; + + // Hold file + if !has_edit_right && behaviour == EditRightChangeBehaviour::Hold { + match vault + .grant_virtual_file_edit_right(&member_id, &mapping.id) + .await + { + Ok(_) => { + success_hold.push(path.clone()); + } + Err(_) => continue, + } + } else + // Throw file + if (has_edit_right || is_host_mode) + && behaviour == EditRightChangeBehaviour::Throw + { + match vault.revoke_virtual_file_edit_right(&mapping.id).await { + Ok(_) => { + success_throw.push(path.clone()); + } + Err(_) => continue, + } + } + } + + // Write success list + mut_instance + .write_large_msgpack::<(Vec<PathBuf>, Vec<PathBuf>)>( + (success_hold.clone(), success_throw.clone()), + 4096u16, + ) + .await?; + return Ok(ChangeVirtualFileEditRightResult::Success { + success_hold, + success_throw, + }); + } + + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + let (success_hold, success_throw) = mut_instance + .read_large_msgpack::<(Vec<PathBuf>, Vec<PathBuf>)>(4096u16) + .await?; + + // If there are any successful items, mark as modified + if success_hold.len() + success_throw.len() > 0 { + sign_vault_modified(true).await; + } + + // Print info + if print_info { + success_hold + .iter() + .for_each(|s| println!("--> {}", s.display())); + success_throw + .iter() + .for_each(|s| println!("<-- {}", s.display())); + } + + return Ok(ChangeVirtualFileEditRightResult::Success { + success_hold, + success_throw, + }); + } + + Ok(ChangeVirtualFileEditRightResult::DoNothing) +} diff --git a/legacy_actions/src/remote_actions/mapping_manage.rs b/legacy_actions/src/remote_actions/mapping_manage.rs new file mode 100644 index 0000000..624b4ab --- /dev/null +++ b/legacy_actions/src/remote_actions/mapping_manage.rs @@ -0,0 +1,3 @@ +pub mod edit_mapping; +pub mod merge_share_mapping; +pub mod share_mapping; diff --git a/legacy_actions/src/remote_actions/mapping_manage/edit_mapping.rs b/legacy_actions/src/remote_actions/mapping_manage/edit_mapping.rs new file mode 100644 index 0000000..3c39c5d --- /dev/null +++ b/legacy_actions/src/remote_actions/mapping_manage/edit_mapping.rs @@ -0,0 +1,157 @@ +use std::collections::HashMap; + +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::local::{ + modified_status::sign_vault_modified, + workspace_analyzer::{FromRelativePathBuf, ToRelativePathBuf}, +}; + +use crate::{ + remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, + }, + write_and_return, +}; + +pub type OperationArgument = (EditMappingOperations, Option<ToRelativePathBuf>); + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] +pub enum EditMappingOperations { + Move, + Erase, +} + +#[derive(Serialize, Deserialize, Default)] +pub enum EditMappingActionResult { + Success, + + // Fail + AuthorizeFailed(String), + EditNotAllowed, + MappingNotFound(FromRelativePathBuf), + InvalidMove(InvalidMoveReason), + + #[default] + Unknown, +} + +#[derive(Serialize, Deserialize)] +pub enum InvalidMoveReason { + MoveOperationButNoTarget(FromRelativePathBuf), + ContainsDuplicateMapping(ToRelativePathBuf), +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct EditMappingActionArguments { + pub operations: HashMap<FromRelativePathBuf, OperationArgument>, +} + +/// This Action only modifies Sheet Mapping and +/// does not interfere with the actual location of local files or Local Mapping +#[action_gen] +pub async fn edit_mapping_action( + ctx: ActionContext, + args: EditMappingActionArguments, +) -> Result<EditMappingActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => { + return Ok(EditMappingActionResult::AuthorizeFailed(e.to_string())); + } + }; + + // Check sheet + let (sheet_name, is_ref_sheet) = + get_current_sheet_name(&ctx, instance, &member_id, true).await?; + + // Can modify Sheet when not in reference sheet or in Host mode + let can_modify_sheet = !is_ref_sheet || is_host_mode; + + if !can_modify_sheet { + return Ok(EditMappingActionResult::EditNotAllowed); + } + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut sheet = vault.sheet(&sheet_name).await?; + + // Precheck + for (from_path, (operation, to_path)) in args.operations.iter() { + // Check mapping exists + if !sheet.mapping().contains_key(from_path) { + write_and_return!( + instance, + EditMappingActionResult::MappingNotFound(from_path.clone()) + ); + } + + // Move check + if operation == &EditMappingOperations::Move { + // Check if target exists + if let Some(to_path) = to_path { + // Check if target is duplicate + if sheet.mapping().contains_key(to_path) { + write_and_return!( + instance, + EditMappingActionResult::InvalidMove( + InvalidMoveReason::ContainsDuplicateMapping(to_path.clone()) + ) + ); + } + } else { + write_and_return!( + instance, + EditMappingActionResult::InvalidMove( + InvalidMoveReason::MoveOperationButNoTarget(from_path.clone()) + ) + ); + } + } + } + + // Process + for (from_path, (operation, to_path)) in args.operations { + match operation { + // During the Precheck phase, it has been ensured that: + // 1. The mapping to be edited for the From path indeed exists + // 2. The location of the To path is indeed empty + // 3. In Move mode, To path can be safely unwrapped + // Therefore, the following unwrap() calls are safe to execute + EditMappingOperations::Move => { + let mapping = sheet.mapping_mut().remove(&from_path).unwrap(); + let to_path = to_path.unwrap(); + sheet + .add_mapping(to_path, mapping.id, mapping.version) + .await?; + } + EditMappingOperations::Erase => { + sheet.mapping_mut().remove(&from_path).unwrap(); + } + } + } + + // Write + sheet.persist().await?; + + write_and_return!(instance, EditMappingActionResult::Success); + } + + if ctx.is_proc_on_local() { + let result = instance + .lock() + .await + .read::<EditMappingActionResult>() + .await?; + if matches!(result, EditMappingActionResult::Success) { + sign_vault_modified(true).await; + } + return Ok(result); + } + + Ok(EditMappingActionResult::Success) +} diff --git a/legacy_actions/src/remote_actions/mapping_manage/merge_share_mapping.rs b/legacy_actions/src/remote_actions/mapping_manage/merge_share_mapping.rs new file mode 100644 index 0000000..df889a1 --- /dev/null +++ b/legacy_actions/src/remote_actions/mapping_manage/merge_share_mapping.rs @@ -0,0 +1,117 @@ +use std::io::ErrorKind; + +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::{ + local::modified_status::sign_vault_modified, + vault::mapping_share::{ShareMergeMode, SheetShareId}, +}; + +use crate::{ + remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, + }, + write_and_return, +}; + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct MergeShareMappingArguments { + pub share_id: SheetShareId, + pub share_merge_mode: ShareMergeMode, +} + +#[derive(Serialize, Deserialize, Default)] +pub enum MergeShareMappingActionResult { + Success, + + // Fail + HasConflicts, + AuthorizeFailed(String), + EditNotAllowed, + ShareIdNotFound(SheetShareId), + MergeFails(String), + + #[default] + Unknown, +} + +#[action_gen] +pub async fn merge_share_mapping_action( + ctx: ActionContext, + args: MergeShareMappingArguments, +) -> Result<MergeShareMappingActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => { + return Ok(MergeShareMappingActionResult::AuthorizeFailed( + e.to_string(), + )); + } + }; + + // Check sheet + let (sheet_name, is_ref_sheet) = + get_current_sheet_name(&ctx, instance, &member_id, true).await?; + + // Can modify Sheet when not in reference sheet or in Host mode + let can_modify_sheet = !is_ref_sheet || is_host_mode; + + if !can_modify_sheet { + return Ok(MergeShareMappingActionResult::EditNotAllowed); + } + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let share_id = args.share_id; + + // Get the share and sheet + let (sheet, share) = if vault.share_file_path(&sheet_name, &share_id).exists() { + let sheet = vault.sheet(&sheet_name).await?; + let share = sheet.get_share(&share_id).await?; + (sheet, share) + } else { + // Share does not exist + write_and_return!( + instance, + MergeShareMappingActionResult::ShareIdNotFound(share_id.clone()) + ); + }; + + // Perform the merge + match sheet.merge_share(share, args.share_merge_mode).await { + Ok(_) => write_and_return!(instance, MergeShareMappingActionResult::Success), + Err(e) => match e.kind() { + ErrorKind::AlreadyExists => { + write_and_return!(instance, MergeShareMappingActionResult::HasConflicts); + } + _ => { + write_and_return!( + instance, + MergeShareMappingActionResult::MergeFails(e.to_string()) + ); + } + }, + } + } + + if ctx.is_proc_on_local() { + let result = instance + .lock() + .await + .read::<MergeShareMappingActionResult>() + .await?; + match result { + MergeShareMappingActionResult::Success => { + sign_vault_modified(true).await; + } + _ => {} + } + return Ok(result); + } + + Ok(MergeShareMappingActionResult::Success) +} diff --git a/legacy_actions/src/remote_actions/mapping_manage/share_mapping.rs b/legacy_actions/src/remote_actions/mapping_manage/share_mapping.rs new file mode 100644 index 0000000..5c77e53 --- /dev/null +++ b/legacy_actions/src/remote_actions/mapping_manage/share_mapping.rs @@ -0,0 +1,135 @@ +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::{ + constants::VAULT_HOST_NAME, + data::{local::workspace_analyzer::FromRelativePathBuf, sheet::SheetName}, +}; + +use crate::{ + remote_actions::{ + auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, + }, + write_and_return, +}; + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct ShareMappingArguments { + pub mappings: Vec<FromRelativePathBuf>, + pub description: String, + // None = current sheet, + // Some(sheet_name) = other ref(public) sheet + pub from_sheet: Option<SheetName>, + pub to_sheet: SheetName, +} + +#[derive(Serialize, Deserialize, Default)] +pub enum ShareMappingActionResult { + Success, + + // Fail + AuthorizeFailed(String), + TargetSheetNotFound(SheetName), + TargetIsSelf, + MappingNotFound(FromRelativePathBuf), + + #[default] + Unknown, +} + +#[action_gen] +pub async fn share_mapping_action( + ctx: ActionContext, + args: ShareMappingArguments, +) -> Result<ShareMappingActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => { + return Ok(ShareMappingActionResult::AuthorizeFailed(e.to_string())); + } + }; + + // Check sheet + let sheet_name = args.from_sheet.unwrap_or( + get_current_sheet_name(&ctx, instance, &member_id, true) + .await? + .0, + ); + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let sheet = vault.sheet(&sheet_name).await?; + + // Tip: Because sheet_name may specify a sheet that does not belong to the user, + // a secondary verification is required. + + // Check if the sheet holder is Some and matches the member_id or is the host + let Some(holder) = sheet.holder() else { + // If there's no holder, the sheet cannot be shared from + write_and_return!( + instance, + ShareMappingActionResult::AuthorizeFailed("Sheet has no holder".to_string()) + ); + }; + + // Verify the holder is either the current member or the host + if holder != &member_id && holder != VAULT_HOST_NAME { + write_and_return!( + instance, + ShareMappingActionResult::AuthorizeFailed( + "Not sheet holder or ref sheet".to_string() + ) + ); + } + + let to_sheet_name = args.to_sheet; + + // Verify target sheet exists + if !vault.sheet_names()?.contains(&to_sheet_name) { + // Does not exist + write_and_return!( + instance, + ShareMappingActionResult::TargetSheetNotFound(to_sheet_name.clone()) + ); + } + + // Verify sheet is not self + if sheet_name == to_sheet_name { + // Is self + write_and_return!(instance, ShareMappingActionResult::TargetIsSelf); + } + + // Verify all mappings are correct + for mapping in args.mappings.iter() { + if !sheet.mapping().contains_key(mapping) { + // If any mapping is invalid, indicate failure + write_and_return!( + instance, + ShareMappingActionResult::MappingNotFound(mapping.clone()) + ); + } + } + + // Execute sharing logic + sheet + .share_mappings(&to_sheet_name, args.mappings, &member_id, args.description) + .await?; + + // Sharing successful + write_and_return!(instance, ShareMappingActionResult::Success); + } + + if ctx.is_proc_on_local() { + let result = instance + .lock() + .await + .read::<ShareMappingActionResult>() + .await?; + return Ok(result); + } + + Ok(ShareMappingActionResult::Success) +} diff --git a/legacy_actions/src/remote_actions/sheet_manage.rs b/legacy_actions/src/remote_actions/sheet_manage.rs new file mode 100644 index 0000000..a3a91a2 --- /dev/null +++ b/legacy_actions/src/remote_actions/sheet_manage.rs @@ -0,0 +1,2 @@ +pub mod drop_sheet; +pub mod make_sheet; diff --git a/legacy_actions/src/remote_actions/sheet_manage/drop_sheet.rs b/legacy_actions/src/remote_actions/sheet_manage/drop_sheet.rs new file mode 100644 index 0000000..e21f3dd --- /dev/null +++ b/legacy_actions/src/remote_actions/sheet_manage/drop_sheet.rs @@ -0,0 +1,123 @@ +use std::io::ErrorKind; + +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::{local::modified_status::sign_vault_modified, sheet::SheetName}; + +use crate::{ + remote_actions::{ + auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, + }, + write_and_return, +}; + +#[derive(Default, Serialize, Deserialize)] +pub enum DropSheetActionResult { + Success, + + // Fail + SheetInUse, + AuthorizeFailed(String), + SheetNotExists, + SheetDropFailed(String), + NoHolder, + NotOwner, + + #[default] + Unknown, +} + +#[action_gen] +pub async fn drop_sheet_action( + ctx: ActionContext, + sheet_name: SheetName, +) -> Result<DropSheetActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => { + return Ok(DropSheetActionResult::AuthorizeFailed(e.to_string())); + } + }; + + // Check sheet in use on local + if ctx.is_proc_on_local() { + let local_workspace = try_get_local_workspace(&ctx)?; + if let Some(sheet) = local_workspace.config().lock().await.sheet_in_use() { + if sheet == &sheet_name { + instance.lock().await.write(false).await?; + return Ok(DropSheetActionResult::SheetInUse); + } + instance.lock().await.write(true).await?; + } else { + instance.lock().await.write(true).await?; + } + } + + if ctx.is_proc_on_remote() { + // Check if client sheet is in use + let sheet_in_use = instance.lock().await.read::<bool>().await?; + if !sheet_in_use { + return Ok(DropSheetActionResult::SheetInUse); + } + + let vault = try_get_vault(&ctx)?; + + // Check if the sheet exists + let mut sheet = match vault.sheet(&sheet_name).await { + Ok(sheet) => sheet, + Err(e) => { + if e.kind() == ErrorKind::NotFound { + write_and_return!(instance, DropSheetActionResult::SheetNotExists); + } else { + write_and_return!( + instance, + DropSheetActionResult::SheetDropFailed(e.to_string()) + ); + } + } + }; + + // Get the sheet's holder + let Some(holder) = sheet.holder() else { + write_and_return!(instance, DropSheetActionResult::NoHolder); + }; + + // Verify that the sheet holder is either the current user or the host + // All sheets belong to the host + if holder != &member_id && !is_host_mode { + write_and_return!(instance, DropSheetActionResult::NotOwner); + } + + // Drop the sheet + sheet.forget_holder(); + match sheet.persist().await { + Ok(_) => { + write_and_return!(instance, DropSheetActionResult::Success); + } + Err(e) => { + write_and_return!( + instance, + DropSheetActionResult::SheetDropFailed(e.to_string()) + ); + } + } + } + + if ctx.is_proc_on_local() { + let result = instance + .lock() + .await + .read::<DropSheetActionResult>() + .await?; + if matches!(result, DropSheetActionResult::Success) { + sign_vault_modified(true).await; + } + return Ok(result); + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} diff --git a/legacy_actions/src/remote_actions/sheet_manage/make_sheet.rs b/legacy_actions/src/remote_actions/sheet_manage/make_sheet.rs new file mode 100644 index 0000000..a323413 --- /dev/null +++ b/legacy_actions/src/remote_actions/sheet_manage/make_sheet.rs @@ -0,0 +1,98 @@ +use action_system::{action::ActionContext, macros::action_gen}; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::{ + constants::VAULT_HOST_NAME, + data::{local::modified_status::sign_vault_modified, sheet::SheetName}, +}; + +use crate::{ + remote_actions::{auth_member, check_connection_instance, try_get_vault}, + write_and_return, +}; + +#[derive(Default, Serialize, Deserialize)] +pub enum MakeSheetActionResult { + Success, + SuccessRestore, + + // Fail + AuthorizeFailed(String), + SheetAlreadyExists, + SheetCreationFailed(String), + + #[default] + Unknown, +} + +/// Build a sheet with context +#[action_gen] +pub async fn make_sheet_action( + ctx: ActionContext, + sheet_name: SheetName, +) -> Result<MakeSheetActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => return Ok(MakeSheetActionResult::AuthorizeFailed(e.to_string())), + }; + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let holder = if is_host_mode { + VAULT_HOST_NAME.to_string() + } else { + member_id + }; + + // Check if the sheet already exists + if let Ok(mut sheet) = vault.sheet(&sheet_name).await { + // If the sheet has no holder, assign it to the current member (restore operation) + if sheet.holder().is_none() { + sheet.set_holder(holder.clone()); + match sheet.persist().await { + Ok(_) => { + write_and_return!(instance, MakeSheetActionResult::SuccessRestore); + } + Err(e) => { + write_and_return!( + instance, + MakeSheetActionResult::SheetCreationFailed(e.to_string()) + ); + } + } + } else { + write_and_return!(instance, MakeSheetActionResult::SheetAlreadyExists); + } + } else { + // Create the sheet + match vault.create_sheet(&sheet_name, &holder).await { + Ok(_) => { + write_and_return!(instance, MakeSheetActionResult::Success); + } + Err(e) => { + write_and_return!( + instance, + MakeSheetActionResult::SheetCreationFailed(e.to_string()) + ); + } + } + } + } + + if ctx.is_proc_on_local() { + let result = instance + .lock() + .await + .read::<MakeSheetActionResult>() + .await?; + if matches!(result, MakeSheetActionResult::Success) { + sign_vault_modified(true).await; + } + return Ok(result); + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} diff --git a/legacy_actions/src/remote_actions/workspace_manage.rs b/legacy_actions/src/remote_actions/workspace_manage.rs new file mode 100644 index 0000000..15f70e8 --- /dev/null +++ b/legacy_actions/src/remote_actions/workspace_manage.rs @@ -0,0 +1,2 @@ +pub mod set_upstream_vault; +pub mod update_to_latest_info; diff --git a/legacy_actions/src/remote_actions/workspace_manage/set_upstream_vault.rs b/legacy_actions/src/remote_actions/workspace_manage/set_upstream_vault.rs new file mode 100644 index 0000000..ba45214 --- /dev/null +++ b/legacy_actions/src/remote_actions/workspace_manage/set_upstream_vault.rs @@ -0,0 +1,101 @@ +use std::net::SocketAddr; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use log::info; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::data::{local::workspace_config::LocalConfig, vault::vault_config::VaultUuid}; + +use crate::remote_actions::{ + auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, +}; + +#[derive(Serialize, Deserialize)] +pub enum SetUpstreamVaultActionResult { + // Success + DirectedAndStained, + Redirected, + + // Fail + AlreadyStained, + AuthorizeFailed(String), + RedirectFailed(String), + SameUpstream, + + Done, +} + +#[action_gen] +pub async fn set_upstream_vault_action( + ctx: ActionContext, + upstream: SocketAddr, +) -> Result<SetUpstreamVaultActionResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + // Auth Member + if let Err(e) = auth_member(&ctx, instance).await { + return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string())); + } + + // Direct + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + instance + .lock() + .await + .write(*vault.config().vault_uuid()) + .await?; + return Ok(SetUpstreamVaultActionResult::Done); + } + + if ctx.is_proc_on_local() { + info!("Authorize successful. directing to upstream vault."); + + // Read the vault UUID from the instance + let vault_uuid = instance.lock().await.read::<VaultUuid>().await?; + + let local_workspace = try_get_local_workspace(&ctx)?; + let local_config = local_workspace.config(); + + let mut mut_local_config = local_config.lock().await; + if !mut_local_config.stained() { + // Stain the local workspace + mut_local_config.stain(vault_uuid); + + // Set the upstream address + mut_local_config.set_vault_addr(upstream); + + // Store the updated config + LocalConfig::write(&mut_local_config).await?; + + info!("Workspace stained!"); + return Ok(SetUpstreamVaultActionResult::DirectedAndStained); + } else { + // Local workspace is already stained, redirecting + let Some(stained_uuid) = mut_local_config.stained_uuid() else { + return Ok(SetUpstreamVaultActionResult::RedirectFailed( + "Stained uuid not found".to_string(), + )); + }; + let local_upstream = mut_local_config.upstream_addr(); + + // Address changed, but same UUID. + if vault_uuid == stained_uuid { + if local_upstream != upstream { + // Set the upstream address + mut_local_config.set_vault_addr(upstream); + + // Store the updated config + LocalConfig::write(&mut_local_config).await?; + return Ok(SetUpstreamVaultActionResult::Redirected); + } else { + return Ok(SetUpstreamVaultActionResult::SameUpstream); + } + } + return Ok(SetUpstreamVaultActionResult::AlreadyStained); + } + } + + Err(TcpTargetError::NoResult("No result.".to_string())) +} diff --git a/legacy_actions/src/remote_actions/workspace_manage/update_to_latest_info.rs b/legacy_actions/src/remote_actions/workspace_manage/update_to_latest_info.rs new file mode 100644 index 0000000..cd17c32 --- /dev/null +++ b/legacy_actions/src/remote_actions/workspace_manage/update_to_latest_info.rs @@ -0,0 +1,433 @@ +use std::{ + collections::{HashMap, HashSet}, + io::ErrorKind, + path::PathBuf, + time::SystemTime, +}; + +use action_system::{action::ActionContext, macros::action_gen}; +use cfg_file::config::ConfigFile; +use log::info; +use serde::{Deserialize, Serialize}; +use tcp_connection::error::TcpTargetError; +use vcs_data::{ + constants::{ + CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME, + SERVER_SUFFIX_SHEET_SHARE_FILE, VAULT_HOST_NAME, + }, + data::{ + local::{ + cached_sheet::CachedSheet, + latest_file_data::LatestFileData, + latest_info::{LatestInfo, SheetInfo}, + modified_status::sign_vault_modified, + }, + member::MemberId, + sheet::{SheetData, SheetName, SheetPathBuf}, + vault::{ + mapping_share::{Share, SheetShareId}, + virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, + }, + }, +}; + +use crate::remote_actions::{ + auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, +}; + +#[derive(Serialize, Deserialize)] +pub enum UpdateToLatestInfoResult { + Success, + + // Fail + AuthorizeFailed(String), + SyncCachedSheetFail(SyncCachedSheetFailReason), +} + +#[derive(Serialize, Deserialize)] +pub enum SyncCachedSheetFailReason { + PathAlreadyExist(PathBuf), +} + +#[action_gen] +pub async fn update_to_latest_info_action( + ctx: ActionContext, + _unused: (), +) -> Result<UpdateToLatestInfoResult, TcpTargetError> { + let instance = check_connection_instance(&ctx)?; + + let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { + Ok(id) => id, + Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), + }; + + info!("Sending latest info to {}", member_id); + + // Sync Latest Info + { + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + + // Build latest info + let mut latest_info = LatestInfo::default(); + + // Sheet & Share + let mut shares_in_my_sheets: HashMap<SheetName, HashMap<SheetShareId, Share>> = + HashMap::new(); + let mut member_owned = Vec::new(); + let mut member_visible = Vec::new(); + let mut ref_sheets = HashSet::new(); + + for sheet in vault.sheets().await? { + // Build share parts + if let Some(holder) = sheet.holder() { + if holder == &member_id || holder == VAULT_HOST_NAME { + let mut sheet_shares: HashMap<SheetShareId, Share> = HashMap::new(); + for share in sheet.get_shares().await? { + // Get SharePath + let Some(share_path) = share.path.clone() else { + continue; + }; + // Get ShareId from SharePath + let Some(share_id) = share_path.file_name() else { + continue; + }; + let share_id = share_id.display().to_string(); + let share_id_trimed = + share_id.trim_end_matches(SERVER_SUFFIX_SHEET_SHARE_FILE); + sheet_shares.insert(share_id_trimed.to_string(), share); + } + shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares); + } + } + + // Build sheet parts + let holder_is_host = + sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME; + if sheet.holder().is_some() + && (sheet.holder().unwrap() == &member_id || holder_is_host) + { + member_owned.push(sheet.name().clone()); + if holder_is_host { + ref_sheets.insert(sheet.name().clone()); + } + } else { + member_visible.push(SheetInfo { + sheet_name: sheet.name().clone(), + holder_name: sheet.holder().cloned(), + }); + } + } + + // Record Share & Sheet + latest_info.visible_sheets = member_owned; + latest_info.invisible_sheets = member_visible; + latest_info.shares_in_my_sheets = shares_in_my_sheets; + + // RefSheet + let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data(); + latest_info.ref_sheet_content = ref_sheet_data.clone(); + latest_info.ref_sheet_vfs_mapping = ref_sheet_data + .mapping() + .into_iter() + .map(|(path, file)| (file.id.clone(), path.clone())) + .collect::<HashMap<VirtualFileId, SheetPathBuf>>(); + latest_info.reference_sheets = ref_sheets; + + // Members + let members = vault.members().await?; + latest_info.vault_members = members; + + // Send + instance + .lock() + .await + .write_large_msgpack(latest_info, 512_u16) + .await?; + } + + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let mut latest_info = instance + .lock() + .await + .read_large_msgpack::<LatestInfo>(512_u16) + .await?; + latest_info.update_instant = Some(SystemTime::now()); + LatestInfo::write_to( + &latest_info, + LatestInfo::latest_info_path(workspace.local_path(), &member_id), + ) + .await?; + } + } + + info!("Update sheets to {}", member_id); + + // Sync Remote Sheets + { + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( + workspace.local_path(), + &member_id, + )) + .await + else { + return Err(TcpTargetError::Io("Read latest info failed".to_string())); + }; + + // Collect all local versions + let mut local_versions = vec![]; + for request_sheet in latest_info.visible_sheets { + let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else { + // For newly created sheets, the version is 0. + // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information + local_versions.push((request_sheet, -1)); + continue; + }; + local_versions.push((request_sheet, data.write_count())); + } + + // Send the version list + let len = local_versions.len(); + instance.lock().await.write_msgpack(local_versions).await?; + + if len < 1 { + // Don't return here, continue to next section + // But we need to consume the false marker from the server + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + let _: bool = mut_instance.read_msgpack().await?; + } + } else { + // Receive data + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + loop { + let in_coming: bool = mut_instance.read_msgpack().await?; + if in_coming { + let (sheet_name, data): (SheetName, SheetData) = + mut_instance.read_large_msgpack(1024u16).await?; + + let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else { + return Err(TcpTargetError::NotFound( + "Workspace not found".to_string(), + )); + }; + + SheetData::write_to(&data, path).await?; + } else { + break; + } + } + } + } + } + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut mut_instance = instance.lock().await; + + let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?; + + for (sheet_name, version) in local_versions.iter() { + let sheet = vault.sheet(sheet_name).await?; + if let Some(holder) = sheet.holder() + && (holder == &member_id || holder == VAULT_HOST_NAME) + && &sheet.write_count() != version + { + mut_instance.write_msgpack(true).await?; + mut_instance + .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) + .await?; + } + } + mut_instance.write_msgpack(false).await?; + } + } + + info!("Fetch held status to {}", member_id); + + // Sync Held Info + { + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + + let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( + workspace.local_path(), + &member_id, + )) + .await + else { + return Err(TcpTargetError::Io("Read latest info failed".to_string())); + }; + + // Collect files that need to know the holder + let mut holder_wants_know = Vec::new(); + for sheet_name in &latest_info.visible_sheets { + if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await { + holder_wants_know + .extend(sheet_data.mapping().values().map(|value| value.id.clone())); + } + } + + // Send request + let mut mut_instance = instance.lock().await; + mut_instance + .write_large_msgpack(&holder_wants_know, 1024u16) + .await?; + + // Receive information and write to local + let result: HashMap< + VirtualFileId, + ( + Option<MemberId>, + VirtualFileVersion, + Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, + ), + > = mut_instance.read_large_msgpack(1024u16).await?; + + // Read configuration file + let path = LatestFileData::data_path(&member_id)?; + let mut latest_file_data: LatestFileData = + LatestFileData::read_from(&path).await.unwrap_or_default(); + + // Write the received information + latest_file_data.update_info(result); + + // Write + LatestFileData::write_to(&latest_file_data, &path).await?; + } + + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut mut_instance = instance.lock().await; + + // Read the request + let holder_wants_know: Vec<VirtualFileId> = + mut_instance.read_large_msgpack(1024u16).await?; + + // Organize the information + let mut result: HashMap< + VirtualFileId, + ( + Option<MemberId>, + VirtualFileVersion, + Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, + ), + > = HashMap::new(); + for id in holder_wants_know { + let Ok(meta) = vault.virtual_file_meta(&id).await else { + continue; + }; + let holder = if meta.hold_member().is_empty() { + None + } else { + Some(meta.hold_member().clone()) + }; + let latest_version = meta.version_latest(); + + let all_versions = meta.versions(); + let all_descriptions = meta.version_descriptions(); + let histories = all_versions + .iter() + .filter_map(|v| { + let Some(desc) = all_descriptions.get(v) else { + return None; + }; + Some((v.clone(), desc.clone())) + }) + .collect::<Vec<(VirtualFileVersion, VirtualFileVersionDescription)>>(); + + result.insert(id, (holder, latest_version, histories)); + } + + // Send information + mut_instance.write_large_msgpack(&result, 1024u16).await?; + } + } + + // Sync cached sheet to local sheet + if ctx.is_proc_on_local() { + let workspace = try_get_local_workspace(&ctx)?; + let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET); + let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET); + if !local_sheet_path.exists() || !cached_sheet_path.exists() { + // No need to sync + if ctx.is_proc_on_local() { + sign_vault_modified(false).await; + } + return Ok(UpdateToLatestInfoResult::Success); + } + + let cached_sheet_paths = + extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?; + + // Match cached sheets and local sheets, and sync content + for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths { + // Read cached sheet and local sheet + let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?; + let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await + else { + continue; + }; + + // Read cached id mapping + let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else { + continue; + }; + + for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() { + let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() }; + + // Get local path + let Some(local_path) = path_by_id else { + continue; + }; + + if &local_path == cached_item_path { + continue; + } + + // If path not match, try to move + let move_result = local_sheet.move_mapping(&local_path, cached_item_path); + if let Err(e) = move_result { + match e.kind() { + ErrorKind::AlreadyExists => { + return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail( + SyncCachedSheetFailReason::PathAlreadyExist( + cached_item_path.clone(), + ), + )); + } + _ => return Err(e.into()), + } + } + local_sheet.write().await?; + } + } + } + + if ctx.is_proc_on_local() { + sign_vault_modified(false).await; + } + Ok(UpdateToLatestInfoResult::Success) +} + +/// Extract sheet names from file paths +fn extract_sheet_names_from_paths( + paths: Vec<PathBuf>, +) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> { + let mut result = HashMap::new(); + for p in paths { + let sheet_name = p + .file_stem() + .and_then(|s| s.to_str()) + .map(|s| s.to_string()) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name") + })?; + result.insert(sheet_name, p); + } + Ok(result) +} |
