diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-02-05 22:35:05 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-02-05 22:35:05 +0800 |
| commit | 27f6414ad1ff451feb0044af62f37dc2a6255ffa (patch) | |
| tree | cb5693bc014cc8579dcf02a730fd4d2a5dfcf1a5 /actions | |
| parent | ade2fcb9302a4ab759795820dbde3b2b269490ee (diff) | |
Remove examples and legacy code, update .gitignore
- Delete examples directory and its example action system
- Rename actions/ to legacy_actions/ and data/ to legacy_data/
- Update Cargo.toml license file reference
- Move setup scripts to scripts/dev/ directory
- Add todo.txt patterns to .gitignore
Diffstat (limited to 'actions')
32 files changed, 0 insertions, 3061 deletions
diff --git a/actions/Cargo.toml b/actions/Cargo.toml deleted file mode 100644 index 615d3af..0000000 --- a/actions/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "vcs_actions" -edition = "2024" -version.workspace = true - -[dependencies] - -# Utils -tcp_connection = { path = "../utils/tcp_connection" } -cfg_file = { path = "../utils/cfg_file", features = ["default"] } -sha1_hash = { path = "../utils/sha1_hash" } -string_proc = { path = "../utils/string_proc" } - -# Core dependencies -action_system = { path = "../systems/action" } -vcs_data = { path = "../data" } - -# Error handling -thiserror = "2.0.17" - -# Serialization -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.145" - -# Async & Networking -tokio = { version = "1.48.0", features = ["full"] } - -# Logging -log = "0.4.28" diff --git a/actions/src/connection.rs b/actions/src/connection.rs deleted file mode 100644 index 918f93c..0000000 --- a/actions/src/connection.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod action_service; -pub mod error; -pub mod protocol; diff --git a/actions/src/connection/action_service.rs b/actions/src/connection/action_service.rs deleted file mode 100644 index 3e05a70..0000000 --- a/actions/src/connection/action_service.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::{ - env::set_current_dir, - net::SocketAddr, - path::PathBuf, - sync::Arc, - time::{Duration, Instant}, -}; - -use action_system::{action::ActionContext, action_pool::ActionPool}; -use cfg_file::config::ConfigFile; -use log::{debug, error, info, warn}; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{ - net::{TcpListener, TcpStream}, - select, signal, spawn, - sync::mpsc, -}; -use vcs_data::data::vault::{Vault, vault_config::VaultConfig}; - -use crate::{ - connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool, -}; - -// Start the server with a Vault using the specified directory -pub async fn server_entry( - vault_path: impl Into<PathBuf>, - port_override: u16, -) -> Result<(), TcpTargetError> { - let vault_path = vault_path.into(); - - // Set to vault path - set_current_dir(&vault_path).map_err(|e| TcpTargetError::Io(e.to_string()))?; - - // Read the vault cfg - let vault_cfg = VaultConfig::read().await?; - - // Create TCPListener - let listener = create_tcp_listener(&vault_cfg, port_override).await?; - - // Initialize the vault - let vault: Arc<Vault> = init_vault(vault_cfg, vault_path).await?; - - // Lock the vault - vault - .lock() - .map_err(|e| TcpTargetError::Locked(e.to_string()))?; - - // Create ActionPool - let action_pool: Arc<ActionPool> = Arc::new(server_action_pool()); - - // Start the server - let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener); - future.await?; // Start and block until shutdown - - // Unlock the vault - vault.unlock()?; - - Ok(()) -} - -async fn create_tcp_listener( - cfg: &VaultConfig, - port_override: u16, -) -> Result<TcpListener, TcpTargetError> { - let local_bind_addr = cfg.server_config().local_bind(); - let port = if port_override > 0 { - port_override // Override -> PORT > 0 - } else { - cfg.server_config().port() // Default -> Port = 0 - }; - let bind_port = port; - let sock_addr = SocketAddr::new(*local_bind_addr, bind_port); - let listener = TcpListener::bind(sock_addr).await?; - - Ok(listener) -} - -async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> { - // Init and create the vault - let Some(vault) = Vault::init(cfg, path) else { - return Err(TcpTargetError::NotFound("Vault not found".to_string())); - }; - let vault: Arc<Vault> = Arc::new(vault); - - Ok(vault) -} - -fn build_server_future( - vault: Arc<Vault>, - action_pool: Arc<ActionPool>, - listener: TcpListener, -) -> ( - mpsc::Sender<()>, - impl std::future::Future<Output = Result<(), TcpTargetError>>, -) { - let (tx, mut rx) = mpsc::channel::<i32>(100); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); - let mut active_connections = 0; - let mut shutdown_requested = false; - - // Spawn task to handle Ctrl+C with rapid exit detection - let shutdown_tx_clone = shutdown_tx.clone(); - spawn(async move { - let mut ctrl_c_count = 0; - let mut last_ctrl_c_time = Instant::now(); - - while let Ok(()) = signal::ctrl_c().await { - let now = Instant::now(); - - // Reset counter if more than 5 seconds have passed - if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) { - ctrl_c_count = 0; - } - - ctrl_c_count += 1; - last_ctrl_c_time = now; - - let _ = shutdown_tx_clone.send(()).await; - - // If 3 Ctrl+C within 5 seconds, exit immediately - if ctrl_c_count >= 3 { - info!("Shutdown. (3/3)"); - std::process::exit(0); - } else { - info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count); - } - } - }); - - let future = async move { - loop { - select! { - // Accept new connections - accept_result = listener.accept(), if !shutdown_requested => { - match accept_result { - Ok((stream, _addr)) => { - debug!("New connection. (now {})", active_connections); - let _ = tx.send(1).await; - - let vault_clone = vault.clone(); - let action_pool_clone = action_pool.clone(); - let tx_clone = tx.clone(); - - spawn(async move { - process_connection(stream, vault_clone, action_pool_clone).await; - debug!("A connection closed. (now {})", active_connections); - let _ = tx_clone.send(-1).await; - }); - } - Err(_) => { - continue; - } - } - } - - // Handle connection count updates - Some(count_change) = rx.recv() => { - active_connections = (active_connections as i32 + count_change) as usize; - - // Check if we should shutdown after all connections are done - if shutdown_requested && active_connections == 0 { - break; - } - } - - // Handle shutdown signal - _ = shutdown_rx.recv() => { - shutdown_requested = true; - // If no active connections, break immediately - if active_connections == 0 { - info!("No active connections. Shutting down."); - break; - } else { - warn!("Cannot shutdown while active connections exist! ({} active)", active_connections); - } - } - } - } - - Ok(()) - }; - - (shutdown_tx, future) -} - -async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) { - // Setup connection instance - let mut instance = ConnectionInstance::from(stream); - - // Read action name and action arguments - let msg = match instance.read_msgpack::<RemoteActionInvoke>().await { - Ok(msg) => msg, - Err(e) => { - error!("Failed to read action message: {}", e); - return; - } - }; - - // Build context - let ctx: ActionContext = ActionContext::remote().insert_instance(instance); - - // Insert vault into context - let ctx = ctx.with_arc_data(vault); - - info!( - "Process action `{}` with argument `{}`", - msg.action_name, msg.action_args_json - ); - - // Process action - let result = action_pool - .process_json(&msg.action_name, ctx, msg.action_args_json) - .await; - - match result { - Ok(_result_json) => {} - Err(e) => { - warn!("Failed to process action `{}`: {}", msg.action_name, e); - } - } -} diff --git a/actions/src/connection/error.rs b/actions/src/connection/error.rs deleted file mode 100644 index 1a4e221..0000000 --- a/actions/src/connection/error.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::io; -use thiserror::Error; -use vcs_data::data::member::MemberId; - -#[derive(Error, Debug, Clone)] -pub enum ConnectionError { - #[error("I/O error: {0}")] - Io(String), -} - -#[derive(Error, Debug, Clone)] -pub enum ProcessActionError { - #[error("Action `{0}` not registered")] - ActionNotRegistered(String), - - #[error("Authorize `{0}` failed")] - AuthorizeFailed(MemberId), - - #[error("Authorize host `{0}` failed")] - AuthorizeHostFailed(MemberId), -} - -impl From<io::Error> for ConnectionError { - fn from(error: io::Error) -> Self { - ConnectionError::Io(error.to_string()) - } -} diff --git a/actions/src/connection/protocol.rs b/actions/src/connection/protocol.rs deleted file mode 100644 index 2cebe79..0000000 --- a/actions/src/connection/protocol.rs +++ /dev/null @@ -1,7 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Default, Clone, Serialize, Deserialize)] -pub struct RemoteActionInvoke { - pub action_name: String, - pub action_args_json: String, -} diff --git a/actions/src/lib.rs b/actions/src/lib.rs deleted file mode 100644 index c1dda86..0000000 --- a/actions/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod connection; -pub mod local_actions; -pub mod registry; -pub mod remote_actions; diff --git a/actions/src/local_actions.rs b/actions/src/local_actions.rs deleted file mode 100644 index d6f47f9..0000000 --- a/actions/src/local_actions.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod account_manage; -pub mod current_sheet; diff --git a/actions/src/local_actions/account_manage.rs b/actions/src/local_actions/account_manage.rs deleted file mode 100644 index 03a7851..0000000 --- a/actions/src/local_actions/account_manage.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod register_account; -pub mod remove_account; -pub mod switch_account; diff --git a/actions/src/local_actions/account_manage/register_account.rs b/actions/src/local_actions/account_manage/register_account.rs deleted file mode 100644 index e69de29..0000000 --- a/actions/src/local_actions/account_manage/register_account.rs +++ /dev/null diff --git a/actions/src/local_actions/account_manage/remove_account.rs b/actions/src/local_actions/account_manage/remove_account.rs deleted file mode 100644 index e69de29..0000000 --- a/actions/src/local_actions/account_manage/remove_account.rs +++ /dev/null diff --git a/actions/src/local_actions/account_manage/switch_account.rs b/actions/src/local_actions/account_manage/switch_account.rs deleted file mode 100644 index e69de29..0000000 --- a/actions/src/local_actions/account_manage/switch_account.rs +++ /dev/null diff --git a/actions/src/local_actions/current_sheet.rs b/actions/src/local_actions/current_sheet.rs deleted file mode 100644 index 785d7ee..0000000 --- a/actions/src/local_actions/current_sheet.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod exit_sheet; -pub mod use_sheet; diff --git a/actions/src/local_actions/current_sheet/exit_sheet.rs b/actions/src/local_actions/current_sheet/exit_sheet.rs deleted file mode 100644 index e69de29..0000000 --- a/actions/src/local_actions/current_sheet/exit_sheet.rs +++ /dev/null diff --git a/actions/src/local_actions/current_sheet/use_sheet.rs b/actions/src/local_actions/current_sheet/use_sheet.rs deleted file mode 100644 index e69de29..0000000 --- a/actions/src/local_actions/current_sheet/use_sheet.rs +++ /dev/null diff --git a/actions/src/registry.rs b/actions/src/registry.rs deleted file mode 100644 index ceec1a1..0000000 --- a/actions/src/registry.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod client_registry; -pub mod server_registry; diff --git a/actions/src/registry/client_registry.rs b/actions/src/registry/client_registry.rs deleted file mode 100644 index db69889..0000000 --- a/actions/src/registry/client_registry.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::sync::Arc; - -use action_system::{action::ActionContext, action_pool::ActionPool}; -use cfg_file::config::ConfigFile; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::{ - local::{LocalWorkspace, workspace_config::LocalConfig}, - user::UserDirectory, -}; - -use crate::{ - connection::protocol::RemoteActionInvoke, - remote_actions::{ - content_manage::track_file::register_track_file_action, - edit_right_manage::change_virtual_file_edit_right::register_change_virtual_file_edit_right_action, - mapping_manage::{ - edit_mapping::register_edit_mapping_action, - merge_share_mapping::register_merge_share_mapping_action, - share_mapping::register_share_mapping_action, - }, - sheet_manage::{ - drop_sheet::register_drop_sheet_action, make_sheet::register_make_sheet_action, - }, - workspace_manage::{ - set_upstream_vault::register_set_upstream_vault_action, - update_to_latest_info::register_update_to_latest_info_action, - }, - }, -}; - -fn register_actions(pool: &mut ActionPool) { - // Pool register here - - // Local Actions - register_set_upstream_vault_action(pool); - register_update_to_latest_info_action(pool); - - // Sheet Actions - register_make_sheet_action(pool); - register_drop_sheet_action(pool); - register_edit_mapping_action(pool); - - // Share / Merge Share Actions - register_share_mapping_action(pool); - register_merge_share_mapping_action(pool); - - // Track Action - register_track_file_action(pool); - - // User Actions - register_change_virtual_file_edit_right_action(pool); -} - -pub fn client_action_pool() -> ActionPool { - // Create pool - let mut pool = ActionPool::new(); - - // Register actions - register_actions(&mut pool); - - // Add process events - pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args))); - - // Return - pool -} - -async fn on_proc_begin( - ctx: &mut ActionContext, - _args: &(dyn std::any::Any + Send + Sync), -) -> Result<(), TcpTargetError> { - // Is ctx remote - let is_remote = ctx.is_remote_action(); - - // Action name and arguments - let action_name = ctx.action_name().to_string(); - let action_args_json = ctx.action_args_json().clone(); - - // Insert LocalWorkspace Arc - let Ok(local_config) = LocalConfig::read().await else { - return Err(TcpTargetError::NotFound( - "The current directory does not have a local workspace".to_string(), - )); - }; - let local_workspace = match LocalWorkspace::init_current_dir(local_config) { - Some(workspace) => workspace, - None => { - return Err(TcpTargetError::NotFound( - "Failed to initialize local workspace.".to_string(), - )); - } - }; - let local_workspace_arc = Arc::new(local_workspace); - ctx.insert_arc_data(local_workspace_arc); - - // Insert UserDirectory Arc - let Some(user_directory) = UserDirectory::current_cfg_dir() else { - return Err(TcpTargetError::NotFound( - "The user directory does not exist.".to_string(), - )); - }; - - let user_directory_arc = Arc::new(user_directory); - ctx.insert_arc_data(user_directory_arc); - - // Get instance - let Some(instance) = ctx.instance() else { - return Err(TcpTargetError::Unsupported( - "Missing ConnectionInstance in current context, this ActionPool does not support this call" - .to_string())); - }; - - // If it's remote, invoke action at server - if is_remote { - // Build protocol message - let msg = RemoteActionInvoke { - action_name, - action_args_json, - }; - - // Send - let mut instance = instance.lock().await; - instance.write_msgpack(&msg).await?; - } - - // Return OK, wait for client to execute Action locally - Ok(()) -} diff --git a/actions/src/registry/server_registry.rs b/actions/src/registry/server_registry.rs deleted file mode 100644 index aee867c..0000000 --- a/actions/src/registry/server_registry.rs +++ /dev/null @@ -1,43 +0,0 @@ -use action_system::action_pool::ActionPool; - -use crate::remote_actions::{ - content_manage::track_file::register_track_file_action, - edit_right_manage::change_virtual_file_edit_right::register_change_virtual_file_edit_right_action, - mapping_manage::{ - edit_mapping::register_edit_mapping_action, - merge_share_mapping::register_merge_share_mapping_action, - share_mapping::register_share_mapping_action, - }, - sheet_manage::{ - drop_sheet::register_drop_sheet_action, make_sheet::register_make_sheet_action, - }, - workspace_manage::{ - set_upstream_vault::register_set_upstream_vault_action, - update_to_latest_info::register_update_to_latest_info_action, - }, -}; - -pub fn server_action_pool() -> ActionPool { - let mut pool = ActionPool::new(); - - // Local Actions - register_set_upstream_vault_action(&mut pool); - register_update_to_latest_info_action(&mut pool); - - // Sheet Actions - register_make_sheet_action(&mut pool); - register_drop_sheet_action(&mut pool); - register_edit_mapping_action(&mut pool); - - // Share / Merge Share Actions - register_share_mapping_action(&mut pool); - register_merge_share_mapping_action(&mut pool); - - // Track Action - register_track_file_action(&mut pool); - - // User Actions - register_change_virtual_file_edit_right_action(&mut pool); - - pool -} diff --git a/actions/src/remote_actions.rs b/actions/src/remote_actions.rs deleted file mode 100644 index d15edc9..0000000 --- a/actions/src/remote_actions.rs +++ /dev/null @@ -1,288 +0,0 @@ -use std::sync::Arc; - -use action_system::action::ActionContext; -use cfg_file::config::ConfigFile; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::sync::{Mutex, mpsc::Sender}; -use vcs_data::{ - constants::{SERVER_PATH_MEMBER_PUB, VAULT_HOST_NAME}, - data::{ - local::{LocalWorkspace, latest_info::LatestInfo, workspace_config::LocalConfig}, - member::MemberId, - sheet::SheetName, - user::UserDirectory, - vault::Vault, - }, -}; - -pub mod content_manage; -pub mod edit_right_manage; -pub mod mapping_manage; -pub mod sheet_manage; -pub mod workspace_manage; - -/// Check if the connection instance is valid in the given context. -/// This function is used to verify the connection instance in actions that require remote calls. -pub fn check_connection_instance( - ctx: &ActionContext, -) -> Result<&Arc<Mutex<ConnectionInstance>>, TcpTargetError> { - let Some(instance) = ctx.instance() else { - return Err(TcpTargetError::NotFound( - "Connection instance lost.".to_string(), - )); - }; - Ok(instance) -} - -/// Try to get the Vault instance from the context. -pub fn try_get_vault(ctx: &ActionContext) -> Result<Arc<Vault>, TcpTargetError> { - let Some(vault) = ctx.get_arc::<Vault>() else { - return Err(TcpTargetError::NotFound( - "Vault instance not found".to_string(), - )); - }; - Ok(vault) -} - -/// Try to get the LocalWorkspace instance from the context. -pub fn try_get_local_workspace(ctx: &ActionContext) -> Result<Arc<LocalWorkspace>, TcpTargetError> { - let Some(local_workspace) = ctx.get_arc::<LocalWorkspace>() else { - return Err(TcpTargetError::NotFound( - "LocalWorkspace instance not found".to_string(), - )); - }; - Ok(local_workspace) -} - -/// Try to get the UserDirectory instance from the context. -pub fn try_get_user_directory(ctx: &ActionContext) -> Result<Arc<UserDirectory>, TcpTargetError> { - let Some(user_directory) = ctx.get_arc::<UserDirectory>() else { - return Err(TcpTargetError::NotFound( - "UserDirectory instance not found".to_string(), - )); - }; - Ok(user_directory) -} - -/// Try to get the LocalWorkspace instance from the context. -pub fn try_get_local_output(ctx: &ActionContext) -> Result<Arc<Sender<String>>, TcpTargetError> { - let Some(output) = ctx.get_arc::<Sender<String>>() else { - return Err(TcpTargetError::NotFound( - "Client sender not found".to_string(), - )); - }; - Ok(output) -} - -/// Authenticate member based on context and return MemberId -pub async fn auth_member( - ctx: &ActionContext, - instance: &Arc<Mutex<ConnectionInstance>>, -) -> Result<(MemberId, bool), TcpTargetError> { - // Window开服Linux连接 -> 此函数内产生 early eof - // ~ WS # jv update - // 身份认证失败:I/O error: early eof! - - // 分析相应流程: - // 1. 服务端发起挑战,客户端接受 - // 2. 服务端发送结果,客户端接受 - // 3. 推测此时发生 early eof ---> 无 ack,导致客户端尝试拿到结果时,服务端已经结束 - // 这很有可能是 Windows 和 Linux 对于连接处理的方案差异导致的问题,需要进一步排查 - - // Start Challenge (Remote) - if ctx.is_proc_on_remote() { - let mut mut_instance = instance.lock().await; - let vault = try_get_vault(ctx)?; - - let using_host_mode = mut_instance.read_msgpack::<bool>().await?; - - let result = mut_instance - .challenge(vault.vault_path().join(SERVER_PATH_MEMBER_PUB)) - .await; - - return match result { - Ok((pass, member_id)) => { - if !pass { - // Send false to inform the client that authentication failed - mut_instance.write(false).await?; - Err(TcpTargetError::Authentication( - "Authenticate failed.".to_string(), - )) - } else { - if using_host_mode { - if vault.config().vault_host_list().contains(&member_id) { - // Using Host mode authentication, and is indeed an administrator - mut_instance.write(true).await?; - Ok((member_id, true)) - } else { - // Using Host mode authentication, but not an administrator - mut_instance.write(false).await?; - Err(TcpTargetError::Authentication( - "Authenticate failed.".to_string(), - )) - } - } else { - // Not using Host mode authentication - mut_instance.write(true).await?; - Ok((member_id, false)) - } - } - } - Err(e) => Err(e), - }; - } - - // Accept Challenge (Local) - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - let local_workspace = try_get_local_workspace(ctx)?; - let (is_host_mode, member_name) = { - let cfg = local_workspace.config().lock_owned().await; - (cfg.is_host_mode(), cfg.current_account()) - }; - let user_directory = try_get_user_directory(ctx)?; - - // Inform remote whether to authenticate in Host mode - mut_instance.write_msgpack(is_host_mode).await?; - - // Member name & Private key - let private_key = user_directory.account_private_key_path(&member_name); - let _ = mut_instance - .accept_challenge(private_key, &member_name) - .await?; - - // Read result - let challenge_result = mut_instance.read::<bool>().await?; - if challenge_result { - return Ok((member_name.clone(), is_host_mode)); - } else { - return Err(TcpTargetError::Authentication( - "Authenticate failed.".to_string(), - )); - } - } - - Err(TcpTargetError::NoResult("Auth failed.".to_string())) -} - -/// Get the current sheet name based on the context (local or remote). -/// This function handles the communication between local and remote instances -/// to verify and retrieve the current sheet name and whether it's a reference sheet. -/// -/// On local: -/// - Reads the current sheet from local configuration -/// - Sends the sheet name to remote for verification -/// - Returns the sheet name and whether it's a reference sheet if remote confirms it exists -/// -/// On remote: -/// - Receives sheet name from local -/// - Verifies the sheet exists in the vault -/// - Checks if the sheet is a reference sheet -/// - If allow_ref is true, reference sheets are allowed to pass verification -/// - Sends confirmation and reference status back to local -/// -/// Returns a tuple of (SheetName, bool) where the bool indicates if it's a reference sheet, -/// or an error if the sheet doesn't exist or doesn't meet the verification criteria. -pub async fn get_current_sheet_name( - ctx: &ActionContext, - instance: &Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - allow_ref: bool, -) -> Result<(SheetName, bool), TcpTargetError> { - let mut mut_instance = instance.lock().await; - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(ctx)?; - let config = LocalConfig::read().await?; - let latest = LatestInfo::read_from(LatestInfo::latest_info_path( - workspace.local_path(), - member_id, - )) - .await?; - if let Some(sheet_name) = config.sheet_in_use() { - // Send sheet name - mut_instance.write_msgpack(sheet_name).await?; - - // Read result - if mut_instance.read_msgpack::<bool>().await? { - // Check if sheet is a reference sheet - let is_ref_sheet = latest.reference_sheets.contains(sheet_name); - if allow_ref { - // Allow reference sheets, directly return the determination result - return Ok((sheet_name.clone(), is_ref_sheet)); - } else if is_ref_sheet { - // Not allowed but it's a reference sheet, return an error - return Err(TcpTargetError::ReferenceSheetNotAllowed( - "Reference sheet not allowed".to_string(), - )); - } else { - // Not allowed but not a reference sheet, return normally - return Ok((sheet_name.clone(), false)); - } - } else { - return Err(TcpTargetError::NotFound("Sheet not found".to_string())); - } - } - // Send empty sheet_name - mut_instance.write_msgpack("".to_string()).await?; - - // Read result, since we know it's impossible to pass here, we just consume this result - let _ = mut_instance.read_msgpack::<bool>().await?; - - return Err(TcpTargetError::NotFound("Sheet not found".to_string())); - } - if ctx.is_proc_on_remote() { - let vault = try_get_vault(ctx)?; - - // Read sheet name - let sheet_name: SheetName = mut_instance.read_msgpack().await?; - - // Check if sheet exists - if let Ok(sheet) = vault.sheet(&sheet_name).await - && let Some(holder) = sheet.holder() - { - let is_ref_sheet = holder == VAULT_HOST_NAME; - if allow_ref { - // Allow reference sheets, directly return the determination result - if holder == member_id || holder == VAULT_HOST_NAME { - mut_instance.write_msgpack(true).await?; - return Ok((sheet.name().clone(), is_ref_sheet)); - } - } else if is_ref_sheet { - // Not allowed but it's a reference sheet, return an error - mut_instance.write_msgpack(true).await?; - return Err(TcpTargetError::ReferenceSheetNotAllowed( - "Reference sheet not allowed".to_string(), - )); - } else { - // Not allowed but not a reference sheet, return normally - if holder == member_id { - mut_instance.write_msgpack(true).await?; - return Ok((sheet_name.clone(), false)); - } - } - } - // Tell local the check is not passed - mut_instance.write_msgpack(false).await?; - return Err(TcpTargetError::NotFound("Sheet not found".to_string())); - } - Err(TcpTargetError::NoResult("NoResult".to_string())) -} - -/// The macro to write and return a result. -#[macro_export] -macro_rules! write_and_return { - ($instance:expr, $result:expr) => {{ - $instance.lock().await.write($result).await?; - return Ok($result); - }}; -} - -/// The macro to send formatted string to output channel. -/// Usage: local_println!(output, "format string", arg1, arg2, ...) -#[macro_export] -macro_rules! local_println { - ($output:expr, $($arg:tt)*) => {{ - let formatted = format!($($arg)*); - let _ = $output.send(formatted).await; - }}; -} diff --git a/actions/src/remote_actions/content_manage.rs b/actions/src/remote_actions/content_manage.rs deleted file mode 100644 index 1568085..0000000 --- a/actions/src/remote_actions/content_manage.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod track_file; diff --git a/actions/src/remote_actions/content_manage/track_file.rs b/actions/src/remote_actions/content_manage/track_file.rs deleted file mode 100644 index a59ca76..0000000 --- a/actions/src/remote_actions/content_manage/track_file.rs +++ /dev/null @@ -1,985 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - path::PathBuf, - sync::Arc, - time::SystemTime, -}; - -use action_system::{action::ActionContext, macros::action_gen}; -use cfg_file::config::ConfigFile; -use serde::{Deserialize, Serialize}; -use sha1_hash::calc_sha1; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{fs, sync::Mutex}; -use vcs_data::{ - constants::{CLIENT_FILE_TEMP_FILE, KEY_TEMP_NAME}, - data::{ - local::{ - cached_sheet::CachedSheet, latest_file_data::LatestFileData, - local_sheet::LocalMappingMetadata, modified_status::sign_vault_modified, - workspace_analyzer::AnalyzeResult, - }, - member::MemberId, - sheet::SheetName, - vault::{ - vault_config::VaultUuid, - virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, - }, - }, -}; - -use crate::{ - local_println, - remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_local_output, - try_get_local_workspace, try_get_vault, - }, -}; - -pub type NextVersion = String; -pub type UpdateDescription = String; - -#[derive(Serialize, Deserialize)] -pub struct TrackFileActionArguments { - // Path need to track - pub relative_pathes: HashSet<PathBuf>, - - // File update info - pub file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, - - // Print infos - pub print_infos: bool, - - // overwrite modified files - pub allow_overwrite_modified: bool, -} - -#[derive(Serialize, Deserialize)] -pub enum TrackFileActionResult { - Done { - created: Vec<PathBuf>, - updated: Vec<PathBuf>, - synced: Vec<PathBuf>, - skipped: Vec<PathBuf>, - }, - - // Fail - AuthorizeFailed(String), - - /// There are local move or missing items that have not been resolved, - /// this situation does not allow track - StructureChangesNotSolved, - - CreateTaskFailed(CreateTaskResult), - UpdateTaskFailed(UpdateTaskResult), - SyncTaskFailed(SyncTaskResult), -} - -#[derive(Serialize, Deserialize)] -pub enum CreateTaskResult { - Success(Vec<PathBuf>), // Success(success_relative_pathes) - - /// Create file on existing path in the sheet - CreateFileOnExistPath(PathBuf), - - /// Sheet not found - SheetNotFound(SheetName), -} - -#[derive(Serialize, Deserialize)] -pub enum UpdateTaskResult { - Success(Vec<PathBuf>), // Success(success_relative_pathes) - - VerifyFailed { - path: PathBuf, - reason: VerifyFailReason, - }, -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum VerifyFailReason { - SheetNotFound(SheetName), - MappingNotFound, - VirtualFileNotFound(VirtualFileId), - VirtualFileReadFailed(VirtualFileId), - NotHeld, - VersionDismatch(VirtualFileVersion, VirtualFileVersion), // (CurrentVersion, RemoteVersion) - UpdateButNoDescription, // File needs update, but no description exists - VersionAlreadyExist(VirtualFileVersion), // (RemoteVersion) -} - -#[derive(Serialize, Deserialize)] -pub enum SyncTaskResult { - Success(Vec<PathBuf>), // Success(success_relative_pathes) -} -#[action_gen] -pub async fn track_file_action( - ctx: ActionContext, - arguments: TrackFileActionArguments, -) -> Result<TrackFileActionResult, TcpTargetError> { - let relative_pathes = arguments.relative_pathes; - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string())), - }; - - // Check sheet - let (sheet_name, is_ref_sheet) = - get_current_sheet_name(&ctx, instance, &member_id, true).await?; - - // Can modify Sheet when not in reference sheet or in Host mode - let can_modify_sheet = !is_ref_sheet || is_host_mode; - - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let analyzed = AnalyzeResult::analyze_local_status(&workspace).await?; - let latest_file_data = - LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; - - if !analyzed.lost.is_empty() || !analyzed.moved.is_empty() { - return Ok(TrackFileActionResult::StructureChangesNotSolved); - } - - let Some(sheet_in_use) = workspace.config().lock().await.sheet_in_use().clone() else { - return Err(TcpTargetError::NotFound("Sheet not found!".to_string())); - }; - - // Read local sheet and member held - let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?; - let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?; - let member_held = LatestFileData::read_from(LatestFileData::data_path(&member_id)?).await?; - - let modified = analyzed - .modified - .intersection(&relative_pathes) - .cloned() - .collect::<Vec<_>>(); - - // Filter out created files - let created_task = analyzed - .created - .intersection(&relative_pathes) - .cloned() - .collect::<Vec<_>>(); - - // Filter out modified files that need to be updated - let mut update_task: Vec<PathBuf> = { - let result = modified.iter().filter_map(|p| { - if let Ok(local_data) = local_sheet.mapping_data(p) { - let id = local_data.mapping_vfid(); - let local_ver = local_data.version_when_updated(); - let Some(latest_ver) = latest_file_data.file_version(id) else { - return None; - }; - if let Some(held_member) = member_held.file_holder(id) { - // Check if holder and version match - if held_member == &member_id && local_ver == latest_ver { - return Some(p.clone()); - } - } - }; - None - }); - result.collect() - }; - - let mut skipped_task: Vec<PathBuf> = Vec::new(); - - // Filter out files that do not exist locally or have version inconsistencies and need to be synchronized - let mut sync_task: Vec<PathBuf> = { - let other: Vec<PathBuf> = relative_pathes - .iter() - .filter(|p| !created_task.contains(p) && !update_task.contains(p)) - .cloned() - .collect(); - - let result = other.iter().filter_map(|p| { - // Not exists and not lost, first download - if !workspace.local_path().join(p).exists() && !analyzed.lost.contains(p) { - return Some(p.clone()); - } - - // In cached sheet - if !cached_sheet.mapping().contains_key(p) { - return None; - } - - // In local sheet - let local_sheet_mapping = local_sheet.mapping_data(p).ok()?; - let vfid = local_sheet_mapping.mapping_vfid(); - - if let Some(latest_version) = &latest_file_data.file_version(vfid) { - // Version does not match - if &local_sheet_mapping.version_when_updated() != latest_version { - let modified = modified.contains(p); - if modified && arguments.allow_overwrite_modified { - return Some(p.clone()); - } else if modified && !arguments.allow_overwrite_modified { - // If not allowed to overwrite, join skipped tasks - skipped_task.push(p.clone()); - return None; - } - return Some(p.clone()); - } - } - - // File not held and modified - let holder = latest_file_data.file_holder(vfid); - if (holder.is_none() || &member_id != holder.unwrap()) && modified.contains(p) { - // If allow overwrite modified is true, overwrite the file - if arguments.allow_overwrite_modified { - return Some(p.clone()); - } else { - // If not allowed to overwrite, join skipped tasks - skipped_task.push(p.clone()); - return None; - } - } - - None - }); - result.collect() - }; - - // If the sheet cannot be modified, - // the update_task here should be considered invalid and changed to sync rollback - if !can_modify_sheet { - if arguments.allow_overwrite_modified { - sync_task.append(&mut update_task); - update_task.clear(); - } else { - skipped_task.append(&mut update_task); - update_task.clear(); - } - } - - // Package tasks - let tasks: (Vec<PathBuf>, Vec<PathBuf>, Vec<PathBuf>) = - (created_task, update_task, sync_task); - - // Send to remote - { - let mut mut_instance = instance.lock().await; - mut_instance - .write_large_msgpack(tasks.clone(), 1024u16) - .await?; - // Drop mutex here - } - - // Process create tasks - let mut success_create = Vec::<PathBuf>::new(); - if can_modify_sheet { - success_create = match proc_create_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.0, - arguments.print_infos, - ) - .await - { - Ok(r) => match r { - CreateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::CreateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process update tasks - let mut success_update = Vec::<PathBuf>::new(); - if can_modify_sheet { - success_update = match proc_update_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.1, - arguments.print_infos, - arguments.file_update_info, - ) - .await - { - Ok(r) => match r { - UpdateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::UpdateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process sync tasks - let success_sync = match proc_sync_tasks_local( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - tasks.2, - arguments.print_infos, - ) - .await - { - Ok(r) => match r { - SyncTaskResult::Success(relative_pathes) => relative_pathes, - }, - Err(e) => return Err(e), - }; - - if success_create.len() + success_update.len() > 0 { - sign_vault_modified(true).await; - } - - return Ok(TrackFileActionResult::Done { - created: success_create, - updated: success_update, - synced: success_sync, - skipped: skipped_task, - }); - } - - if ctx.is_proc_on_remote() { - // Read tasks - let (created_task, update_task, sync_task): (Vec<PathBuf>, Vec<PathBuf>, Vec<PathBuf>) = { - let mut mut_instance = instance.lock().await; - mut_instance.read_large_msgpack(1024u16).await? - }; - - // Process create tasks - let mut success_create = Vec::<PathBuf>::new(); - if can_modify_sheet { - success_create = match proc_create_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - created_task, - ) - .await - { - Ok(r) => match r { - CreateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::CreateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process update tasks - let mut success_update = Vec::<PathBuf>::new(); - if can_modify_sheet { - success_update = match proc_update_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - update_task, - arguments.file_update_info, - ) - .await - { - Ok(r) => match r { - UpdateTaskResult::Success(relative_pathes) => relative_pathes, - _ => { - return Ok(TrackFileActionResult::UpdateTaskFailed(r)); - } - }, - Err(e) => return Err(e), - }; - } - - // Process sync tasks - let success_sync = match proc_sync_tasks_remote( - &ctx, - instance.clone(), - &member_id, - &sheet_name, - sync_task, - ) - .await - { - Ok(r) => match r { - SyncTaskResult::Success(relative_pathes) => relative_pathes, - }, - Err(e) => return Err(e), - }; - - return Ok(TrackFileActionResult::Done { - created: success_create, - updated: success_update, - synced: success_sync, - skipped: Vec::new(), // The server doesn't know which files were skipped - }); - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} - -async fn proc_create_tasks_local( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, - print_infos: bool, -) -> Result<CreateTaskResult, TcpTargetError> { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Creating {} files...", relative_paths.len()); - } - - // Wait for remote detection of whether the sheet exists - let has_sheet = mut_instance.read_msgpack::<bool>().await?; - if !has_sheet { - return Ok(CreateTaskResult::SheetNotFound(sheet_name.clone())); - } - - // Wait for remote detection of whether the file exists - let (hasnt_duplicate, duplicate_path) = mut_instance.read_msgpack::<(bool, PathBuf)>().await?; - if !hasnt_duplicate { - return Ok(CreateTaskResult::CreateFileOnExistPath(duplicate_path)); - } - - let mut success_relative_pathes = Vec::new(); - - // Start sending files - for path in relative_paths { - let full_path = workspace.local_path().join(&path); - - // Send file - if mut_instance.write_file(&full_path).await.is_err() { - continue; - } - - // Read virtual file id and version - let (vfid, version, version_desc) = mut_instance - .read_msgpack::<( - VirtualFileId, - VirtualFileVersion, - VirtualFileVersionDescription, - )>() - .await?; - - // Add mapping to local sheet - let hash = sha1_hash::calc_sha1(&full_path, 2048).await.unwrap().hash; - let time = std::fs::metadata(&full_path)?.modified()?; - local_sheet.add_mapping( - &path.clone(), - LocalMappingMetadata::new( - hash, // hash_when_updated - time, // time_when_updated - std::fs::metadata(&full_path)?.len(), // size_when_updated - version_desc, // version_desc_when_updated - version, // version_when_updated - vfid, // mapping_vfid - time, // last_modifiy_check_itme - false, // last_modifiy_check_result - ), - )?; - - // Print success info - if print_infos { - local_println!(local_output, "+ {}", path.display()); - } - - success_relative_pathes.push(path); - } - - // Write local sheet - local_sheet.write().await?; - - Ok(CreateTaskResult::Success(success_relative_pathes)) -} - -async fn proc_create_tasks_remote( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, -) -> Result<CreateTaskResult, TcpTargetError> { - let vault = try_get_vault(ctx)?; - let mut mut_instance = instance.lock().await; - - // Sheet check - let Ok(mut sheet) = vault.sheet(sheet_name).await else { - // Sheet not found - mut_instance.write_msgpack(false).await?; - return Ok(CreateTaskResult::SheetNotFound(sheet_name.to_string())); - }; - mut_instance.write_msgpack(true).await?; - - // Duplicate create precheck - for path in relative_paths.iter() { - if sheet.mapping().contains_key(path) { - // Duplicate file - mut_instance.write_msgpack((false, path)).await?; - return Ok(CreateTaskResult::CreateFileOnExistPath(path.clone())); - } - } - mut_instance.write_msgpack((true, PathBuf::new())).await?; - - let mut success_relative_pathes = Vec::new(); - - // Start receiving files - for path in relative_paths { - // Read file and create virtual file - let Ok(vfid) = vault - .create_virtual_file_from_connection(&mut mut_instance, member_id) - .await - else { - continue; - }; - - // Record virtual file to sheet - let vf_meta = vault.virtual_file(&vfid)?.read_meta().await?; - sheet - .add_mapping(path.clone(), vfid.clone(), vf_meta.version_latest()) - .await?; - - // Tell client the virtual file id and version - mut_instance - .write_msgpack(( - vfid, - vf_meta.version_latest(), - vf_meta - .version_description(vf_meta.version_latest()) - .unwrap(), - )) - .await?; - - success_relative_pathes.push(path); - } - - sheet.persist().await?; - - Ok(CreateTaskResult::Success(success_relative_pathes)) -} - -async fn proc_update_tasks_local( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, - print_infos: bool, - file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, -) -> Result<UpdateTaskResult, TcpTargetError> { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?; - - let mut success = Vec::new(); - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Updating {} files...", relative_paths.len()); - } - - for path in relative_paths.iter() { - let Ok(mapping) = local_sheet.mapping_data(path) else { - // Is mapping not found, write empty - mut_instance.write_msgpack("".to_string()).await?; - continue; - }; - // Read and send file version - let Ok(_) = mut_instance - .write_msgpack(mapping.version_when_updated()) - .await - else { - continue; - }; - - // Read verify result - let verify_result: bool = mut_instance.read_msgpack().await?; - if !verify_result { - let reason = mut_instance.read_msgpack::<VerifyFailReason>().await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason: reason.clone(), - }); - } - - // Calc hash - let hash_result = match sha1_hash::calc_sha1(workspace.local_path().join(path), 2048).await - { - Ok(r) => r, - Err(_) => { - mut_instance.write_msgpack(false).await?; // Not Ready - continue; - } - }; - - // Get next version - let Some((next_version, description)) = file_update_info.get(path) else { - mut_instance.write_msgpack(false).await?; // Not Ready - continue; - }; - - // Write - mut_instance.write_msgpack(true).await?; // Ready - mut_instance.write_file(path).await?; - - // Read upload result - let upload_result: bool = mut_instance.read_msgpack().await?; - if upload_result { - // Success - let mapping_data_mut = local_sheet.mapping_data_mut(path).unwrap(); - let version = mapping_data_mut.version_when_updated().clone(); - mapping_data_mut.set_hash_when_updated(hash_result.hash); - mapping_data_mut.set_version_when_updated(next_version.clone()); - mapping_data_mut.set_version_desc_when_updated(VirtualFileVersionDescription { - creator: member_id.clone(), - description: description.clone(), - }); - mapping_data_mut.set_last_modifiy_check_result(false); // Mark file not modified - - // Write - local_sheet.write().await?; - - // Push path into success vec - success.push(path.clone()); - - // Print success info - if print_infos { - local_println!( - local_output, - "↑ {} ({} -> {})", - path.display(), - version, - next_version - ); - } - } - } - - Ok(UpdateTaskResult::Success(success)) -} - -async fn proc_update_tasks_remote( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, - file_update_info: HashMap<PathBuf, (NextVersion, UpdateDescription)>, -) -> Result<UpdateTaskResult, TcpTargetError> { - let vault = try_get_vault(ctx)?; - let mut mut_instance = instance.lock().await; - - let mut success = Vec::new(); - - for path in relative_paths.iter() { - // Read version - let Ok(version) = mut_instance.read_msgpack::<VirtualFileVersion>().await else { - continue; - }; - if version.is_empty() { - continue; - } - - // Verify - let Some((next_version, description)) = file_update_info.get(path) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::UpdateButNoDescription; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Sheet not found - }; - let Ok(mut sheet) = vault.sheet(sheet_name).await else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::SheetNotFound(sheet_name.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Sheet not found - }; - let Some(mapping_data) = sheet.mapping_mut().get_mut(path) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::MappingNotFound; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Mapping not found - }; - let Ok(vf) = vault.virtual_file(&mapping_data.id) else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VirtualFileNotFound(mapping_data.id.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Virtual file not found - }; - let Ok(vf_metadata) = vf.read_meta().await else { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VirtualFileReadFailed(mapping_data.id.clone()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Read virtual file metadata failed - }; - if vf_metadata.versions().contains(next_version) { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::VersionAlreadyExist(version); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // VersionAlreadyExist - } - if vf_metadata.hold_member() != member_id { - mut_instance.write_msgpack(false).await?; - let reason = VerifyFailReason::NotHeld; - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Member not held it - }; - if vf_metadata.version_latest() != version { - mut_instance.write_msgpack(false).await?; - let reason = - VerifyFailReason::VersionDismatch(version.clone(), vf_metadata.version_latest()); - mut_instance.write_msgpack(reason.clone()).await?; - return Ok(UpdateTaskResult::VerifyFailed { - path: path.clone(), - reason, - }); // Version does not match - }; - mut_instance.write_msgpack(true).await?; // Verified - - // Read if local ready - let ready: bool = mut_instance.read_msgpack().await?; - if !ready { - continue; - } - - // Read and update virtual file - match vault - .update_virtual_file_from_connection( - &mut mut_instance, - member_id, - &mapping_data.id, - next_version, - VirtualFileVersionDescription { - creator: member_id.clone(), - description: description.clone(), - }, - ) - .await - { - Ok(_) => { - // Update version to sheet - mapping_data.version = next_version.clone(); - - // Persist - sheet.persist().await?; - - success.push(path.clone()); - mut_instance.write_msgpack(true).await?; // Success - } - Err(e) => { - mut_instance.write_msgpack(false).await?; // Fail - return Err(e.into()); - } - } - } - - Ok(UpdateTaskResult::Success(success)) -} - -type SyncVersionInfo = Option<( - VirtualFileVersion, - VirtualFileVersionDescription, - VirtualFileId, -)>; - -async fn proc_sync_tasks_local( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, - print_infos: bool, -) -> Result<SyncTaskResult, TcpTargetError> { - let workspace = try_get_local_workspace(ctx)?; - let local_output = try_get_local_output(ctx)?; - let mut mut_instance = instance.lock().await; - let mut success: Vec<PathBuf> = Vec::new(); - - if print_infos && relative_paths.len() > 0 { - local_println!(local_output, "Syncing {} files...", relative_paths.len()); - } - - for path in relative_paths { - let Some((version, description, vfid)) = - mut_instance.read_msgpack::<SyncVersionInfo>().await? - else { - continue; - }; - - // Generate a temp path - let temp_path = workspace - .local_path() - .join(CLIENT_FILE_TEMP_FILE.replace(KEY_TEMP_NAME, &VaultUuid::new_v4().to_string())); - - let copy_to = workspace.local_path().join(&path); - - // Read file - match mut_instance.read_file(&temp_path).await { - Ok(_) => { - if !temp_path.exists() { - continue; - } - } - Err(_) => { - continue; - } - } - - // Calc hash - let new_hash = match calc_sha1(&temp_path, 2048).await { - Ok(hash) => hash, - Err(_) => { - continue; - } - }; - - // Calc size - let new_size = match fs::metadata(&temp_path).await.map(|meta| meta.len()) { - Ok(size) => size, - Err(_) => { - continue; - } - }; - - // Write file - if copy_to.exists() { - if let Err(_) = fs::remove_file(©_to).await { - continue; - } - } else { - // Not exist, create directory - if let Some(path) = copy_to.clone().parent() { - fs::create_dir_all(path).await?; - } - } - if let Err(_) = fs::rename(&temp_path, ©_to).await { - continue; - } - - // Modify local sheet - let mut local_sheet = match workspace.local_sheet(member_id, sheet_name).await { - Ok(sheet) => sheet, - Err(_) => { - continue; - } - }; - - // Get or create mapping - let mapping = match local_sheet.mapping_data_mut(&path) { - Ok(m) => m, - Err(_) => { - // First download - let mut data = LocalMappingMetadata::default(); - data.set_mapping_vfid(vfid); - if let Err(_) = local_sheet.add_mapping(&path, data) { - continue; - } - match local_sheet.mapping_data_mut(&path) { - Ok(m) => m, - Err(_) => { - continue; - } - } - } - }; - - let time = SystemTime::now(); - mapping.set_hash_when_updated(new_hash.hash); - mapping.set_last_modifiy_check_result(false); // Mark not modified - mapping.set_version_when_updated(version); - mapping.set_version_desc_when_updated(description); - mapping.set_size_when_updated(new_size); - mapping.set_time_when_updated(time); - mapping.set_last_modifiy_check_time(time); - if let Err(_) = local_sheet.write().await { - continue; - } - - success.push(path.clone()); - - // Print success info - if print_infos { - local_println!(local_output, "↓ {}", path.display()); - } - } - Ok(SyncTaskResult::Success(success)) -} - -async fn proc_sync_tasks_remote( - ctx: &ActionContext, - instance: Arc<Mutex<ConnectionInstance>>, - _member_id: &MemberId, - sheet_name: &SheetName, - relative_paths: Vec<PathBuf>, -) -> Result<SyncTaskResult, TcpTargetError> { - let vault = try_get_vault(ctx)?; - let sheet = vault.sheet(sheet_name).await?; - let mut mut_instance = instance.lock().await; - let mut success: Vec<PathBuf> = Vec::new(); - - for path in relative_paths { - // Get mapping - let Some(mapping) = sheet.mapping().get(&path) else { - mut_instance.write_msgpack::<SyncVersionInfo>(None).await?; // (ready) - continue; - }; - // Get virtual file - let Ok(vf) = vault.virtual_file(&mapping.id) else { - mut_instance.write_msgpack::<SyncVersionInfo>(None).await?; // (ready) - continue; - }; - // Read metadata and get real path - let vf_meta = &vf.read_meta().await?; - let real_path = vault.virtual_file_real_path(&mapping.id, &vf_meta.version_latest()); - let version = vf_meta.version_latest(); - mut_instance - .write_msgpack::<SyncVersionInfo>(Some(( - version.clone(), - vf_meta.version_description(version).cloned().unwrap_or( - VirtualFileVersionDescription { - creator: MemberId::default(), - description: "".to_string(), - }, - ), - vf.id(), - ))) - .await?; // (ready) - if mut_instance.write_file(real_path).await.is_err() { - continue; - } else { - success.push(path); - } - } - Ok(SyncTaskResult::Success(success)) -} diff --git a/actions/src/remote_actions/edit_right_manage.rs b/actions/src/remote_actions/edit_right_manage.rs deleted file mode 100644 index 52305f5..0000000 --- a/actions/src/remote_actions/edit_right_manage.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod change_virtual_file_edit_right; diff --git a/actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs b/actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs deleted file mode 100644 index 1045bce..0000000 --- a/actions/src/remote_actions/edit_right_manage/change_virtual_file_edit_right.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::path::PathBuf; - -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::local::modified_status::sign_vault_modified; - -use crate::remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, -}; - -#[derive(Serialize, Deserialize)] -pub enum ChangeVirtualFileEditRightResult { - // Success - Success { - success_hold: Vec<PathBuf>, - success_throw: Vec<PathBuf>, - }, - - // Fail - AuthorizeFailed(String), - DoNothing, -} - -#[derive(Serialize, Deserialize, PartialEq, Clone)] -pub enum EditRightChangeBehaviour { - Hold, - Throw, -} - -/// The server part only checks: -/// 1. Whether the file exists -/// 2. Whether the file has no holder -/// If both conditions are met, send success information to the local client -/// -/// All version checks are handled locally -#[action_gen] -pub async fn change_virtual_file_edit_right_action( - ctx: ActionContext, - arguments: (Vec<(PathBuf, EditRightChangeBehaviour)>, bool), -) -> Result<ChangeVirtualFileEditRightResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - let (relative_paths, print_info) = arguments; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => { - return Ok(ChangeVirtualFileEditRightResult::AuthorizeFailed( - e.to_string(), - )); - } - }; - - // Check sheet - let (sheet_name, _is_ref_sheet) = - get_current_sheet_name(&ctx, instance, &member_id, true).await?; - - if ctx.is_proc_on_remote() { - let mut mut_instance = instance.lock().await; - let mut success_hold: Vec<PathBuf> = Vec::new(); - let mut success_throw: Vec<PathBuf> = Vec::new(); - let vault = try_get_vault(&ctx)?; - for (path, behaviour) in relative_paths { - let Ok(sheet) = vault.sheet(&sheet_name).await else { - continue; - }; - let Some(mapping) = sheet.mapping().get(&path) else { - continue; - }; - let Ok(has_edit_right) = vault - .has_virtual_file_edit_right(&member_id, &mapping.id) - .await - else { - continue; - }; - - // Hold file - if !has_edit_right && behaviour == EditRightChangeBehaviour::Hold { - match vault - .grant_virtual_file_edit_right(&member_id, &mapping.id) - .await - { - Ok(_) => { - success_hold.push(path.clone()); - } - Err(_) => continue, - } - } else - // Throw file - if (has_edit_right || is_host_mode) - && behaviour == EditRightChangeBehaviour::Throw - { - match vault.revoke_virtual_file_edit_right(&mapping.id).await { - Ok(_) => { - success_throw.push(path.clone()); - } - Err(_) => continue, - } - } - } - - // Write success list - mut_instance - .write_large_msgpack::<(Vec<PathBuf>, Vec<PathBuf>)>( - (success_hold.clone(), success_throw.clone()), - 4096u16, - ) - .await?; - return Ok(ChangeVirtualFileEditRightResult::Success { - success_hold, - success_throw, - }); - } - - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - let (success_hold, success_throw) = mut_instance - .read_large_msgpack::<(Vec<PathBuf>, Vec<PathBuf>)>(4096u16) - .await?; - - // If there are any successful items, mark as modified - if success_hold.len() + success_throw.len() > 0 { - sign_vault_modified(true).await; - } - - // Print info - if print_info { - success_hold - .iter() - .for_each(|s| println!("--> {}", s.display())); - success_throw - .iter() - .for_each(|s| println!("<-- {}", s.display())); - } - - return Ok(ChangeVirtualFileEditRightResult::Success { - success_hold, - success_throw, - }); - } - - Ok(ChangeVirtualFileEditRightResult::DoNothing) -} diff --git a/actions/src/remote_actions/mapping_manage.rs b/actions/src/remote_actions/mapping_manage.rs deleted file mode 100644 index 624b4ab..0000000 --- a/actions/src/remote_actions/mapping_manage.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod edit_mapping; -pub mod merge_share_mapping; -pub mod share_mapping; diff --git a/actions/src/remote_actions/mapping_manage/edit_mapping.rs b/actions/src/remote_actions/mapping_manage/edit_mapping.rs deleted file mode 100644 index 3c39c5d..0000000 --- a/actions/src/remote_actions/mapping_manage/edit_mapping.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::collections::HashMap; - -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::local::{ - modified_status::sign_vault_modified, - workspace_analyzer::{FromRelativePathBuf, ToRelativePathBuf}, -}; - -use crate::{ - remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, - }, - write_and_return, -}; - -pub type OperationArgument = (EditMappingOperations, Option<ToRelativePathBuf>); - -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] -pub enum EditMappingOperations { - Move, - Erase, -} - -#[derive(Serialize, Deserialize, Default)] -pub enum EditMappingActionResult { - Success, - - // Fail - AuthorizeFailed(String), - EditNotAllowed, - MappingNotFound(FromRelativePathBuf), - InvalidMove(InvalidMoveReason), - - #[default] - Unknown, -} - -#[derive(Serialize, Deserialize)] -pub enum InvalidMoveReason { - MoveOperationButNoTarget(FromRelativePathBuf), - ContainsDuplicateMapping(ToRelativePathBuf), -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct EditMappingActionArguments { - pub operations: HashMap<FromRelativePathBuf, OperationArgument>, -} - -/// This Action only modifies Sheet Mapping and -/// does not interfere with the actual location of local files or Local Mapping -#[action_gen] -pub async fn edit_mapping_action( - ctx: ActionContext, - args: EditMappingActionArguments, -) -> Result<EditMappingActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => { - return Ok(EditMappingActionResult::AuthorizeFailed(e.to_string())); - } - }; - - // Check sheet - let (sheet_name, is_ref_sheet) = - get_current_sheet_name(&ctx, instance, &member_id, true).await?; - - // Can modify Sheet when not in reference sheet or in Host mode - let can_modify_sheet = !is_ref_sheet || is_host_mode; - - if !can_modify_sheet { - return Ok(EditMappingActionResult::EditNotAllowed); - } - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let mut sheet = vault.sheet(&sheet_name).await?; - - // Precheck - for (from_path, (operation, to_path)) in args.operations.iter() { - // Check mapping exists - if !sheet.mapping().contains_key(from_path) { - write_and_return!( - instance, - EditMappingActionResult::MappingNotFound(from_path.clone()) - ); - } - - // Move check - if operation == &EditMappingOperations::Move { - // Check if target exists - if let Some(to_path) = to_path { - // Check if target is duplicate - if sheet.mapping().contains_key(to_path) { - write_and_return!( - instance, - EditMappingActionResult::InvalidMove( - InvalidMoveReason::ContainsDuplicateMapping(to_path.clone()) - ) - ); - } - } else { - write_and_return!( - instance, - EditMappingActionResult::InvalidMove( - InvalidMoveReason::MoveOperationButNoTarget(from_path.clone()) - ) - ); - } - } - } - - // Process - for (from_path, (operation, to_path)) in args.operations { - match operation { - // During the Precheck phase, it has been ensured that: - // 1. The mapping to be edited for the From path indeed exists - // 2. The location of the To path is indeed empty - // 3. In Move mode, To path can be safely unwrapped - // Therefore, the following unwrap() calls are safe to execute - EditMappingOperations::Move => { - let mapping = sheet.mapping_mut().remove(&from_path).unwrap(); - let to_path = to_path.unwrap(); - sheet - .add_mapping(to_path, mapping.id, mapping.version) - .await?; - } - EditMappingOperations::Erase => { - sheet.mapping_mut().remove(&from_path).unwrap(); - } - } - } - - // Write - sheet.persist().await?; - - write_and_return!(instance, EditMappingActionResult::Success); - } - - if ctx.is_proc_on_local() { - let result = instance - .lock() - .await - .read::<EditMappingActionResult>() - .await?; - if matches!(result, EditMappingActionResult::Success) { - sign_vault_modified(true).await; - } - return Ok(result); - } - - Ok(EditMappingActionResult::Success) -} diff --git a/actions/src/remote_actions/mapping_manage/merge_share_mapping.rs b/actions/src/remote_actions/mapping_manage/merge_share_mapping.rs deleted file mode 100644 index df889a1..0000000 --- a/actions/src/remote_actions/mapping_manage/merge_share_mapping.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::io::ErrorKind; - -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::{ - local::modified_status::sign_vault_modified, - vault::mapping_share::{ShareMergeMode, SheetShareId}, -}; - -use crate::{ - remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, - }, - write_and_return, -}; - -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] -pub struct MergeShareMappingArguments { - pub share_id: SheetShareId, - pub share_merge_mode: ShareMergeMode, -} - -#[derive(Serialize, Deserialize, Default)] -pub enum MergeShareMappingActionResult { - Success, - - // Fail - HasConflicts, - AuthorizeFailed(String), - EditNotAllowed, - ShareIdNotFound(SheetShareId), - MergeFails(String), - - #[default] - Unknown, -} - -#[action_gen] -pub async fn merge_share_mapping_action( - ctx: ActionContext, - args: MergeShareMappingArguments, -) -> Result<MergeShareMappingActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => { - return Ok(MergeShareMappingActionResult::AuthorizeFailed( - e.to_string(), - )); - } - }; - - // Check sheet - let (sheet_name, is_ref_sheet) = - get_current_sheet_name(&ctx, instance, &member_id, true).await?; - - // Can modify Sheet when not in reference sheet or in Host mode - let can_modify_sheet = !is_ref_sheet || is_host_mode; - - if !can_modify_sheet { - return Ok(MergeShareMappingActionResult::EditNotAllowed); - } - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let share_id = args.share_id; - - // Get the share and sheet - let (sheet, share) = if vault.share_file_path(&sheet_name, &share_id).exists() { - let sheet = vault.sheet(&sheet_name).await?; - let share = sheet.get_share(&share_id).await?; - (sheet, share) - } else { - // Share does not exist - write_and_return!( - instance, - MergeShareMappingActionResult::ShareIdNotFound(share_id.clone()) - ); - }; - - // Perform the merge - match sheet.merge_share(share, args.share_merge_mode).await { - Ok(_) => write_and_return!(instance, MergeShareMappingActionResult::Success), - Err(e) => match e.kind() { - ErrorKind::AlreadyExists => { - write_and_return!(instance, MergeShareMappingActionResult::HasConflicts); - } - _ => { - write_and_return!( - instance, - MergeShareMappingActionResult::MergeFails(e.to_string()) - ); - } - }, - } - } - - if ctx.is_proc_on_local() { - let result = instance - .lock() - .await - .read::<MergeShareMappingActionResult>() - .await?; - match result { - MergeShareMappingActionResult::Success => { - sign_vault_modified(true).await; - } - _ => {} - } - return Ok(result); - } - - Ok(MergeShareMappingActionResult::Success) -} diff --git a/actions/src/remote_actions/mapping_manage/share_mapping.rs b/actions/src/remote_actions/mapping_manage/share_mapping.rs deleted file mode 100644 index 5c77e53..0000000 --- a/actions/src/remote_actions/mapping_manage/share_mapping.rs +++ /dev/null @@ -1,135 +0,0 @@ -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::{ - constants::VAULT_HOST_NAME, - data::{local::workspace_analyzer::FromRelativePathBuf, sheet::SheetName}, -}; - -use crate::{ - remote_actions::{ - auth_member, check_connection_instance, get_current_sheet_name, try_get_vault, - }, - write_and_return, -}; - -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] -pub struct ShareMappingArguments { - pub mappings: Vec<FromRelativePathBuf>, - pub description: String, - // None = current sheet, - // Some(sheet_name) = other ref(public) sheet - pub from_sheet: Option<SheetName>, - pub to_sheet: SheetName, -} - -#[derive(Serialize, Deserialize, Default)] -pub enum ShareMappingActionResult { - Success, - - // Fail - AuthorizeFailed(String), - TargetSheetNotFound(SheetName), - TargetIsSelf, - MappingNotFound(FromRelativePathBuf), - - #[default] - Unknown, -} - -#[action_gen] -pub async fn share_mapping_action( - ctx: ActionContext, - args: ShareMappingArguments, -) -> Result<ShareMappingActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => { - return Ok(ShareMappingActionResult::AuthorizeFailed(e.to_string())); - } - }; - - // Check sheet - let sheet_name = args.from_sheet.unwrap_or( - get_current_sheet_name(&ctx, instance, &member_id, true) - .await? - .0, - ); - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let sheet = vault.sheet(&sheet_name).await?; - - // Tip: Because sheet_name may specify a sheet that does not belong to the user, - // a secondary verification is required. - - // Check if the sheet holder is Some and matches the member_id or is the host - let Some(holder) = sheet.holder() else { - // If there's no holder, the sheet cannot be shared from - write_and_return!( - instance, - ShareMappingActionResult::AuthorizeFailed("Sheet has no holder".to_string()) - ); - }; - - // Verify the holder is either the current member or the host - if holder != &member_id && holder != VAULT_HOST_NAME { - write_and_return!( - instance, - ShareMappingActionResult::AuthorizeFailed( - "Not sheet holder or ref sheet".to_string() - ) - ); - } - - let to_sheet_name = args.to_sheet; - - // Verify target sheet exists - if !vault.sheet_names()?.contains(&to_sheet_name) { - // Does not exist - write_and_return!( - instance, - ShareMappingActionResult::TargetSheetNotFound(to_sheet_name.clone()) - ); - } - - // Verify sheet is not self - if sheet_name == to_sheet_name { - // Is self - write_and_return!(instance, ShareMappingActionResult::TargetIsSelf); - } - - // Verify all mappings are correct - for mapping in args.mappings.iter() { - if !sheet.mapping().contains_key(mapping) { - // If any mapping is invalid, indicate failure - write_and_return!( - instance, - ShareMappingActionResult::MappingNotFound(mapping.clone()) - ); - } - } - - // Execute sharing logic - sheet - .share_mappings(&to_sheet_name, args.mappings, &member_id, args.description) - .await?; - - // Sharing successful - write_and_return!(instance, ShareMappingActionResult::Success); - } - - if ctx.is_proc_on_local() { - let result = instance - .lock() - .await - .read::<ShareMappingActionResult>() - .await?; - return Ok(result); - } - - Ok(ShareMappingActionResult::Success) -} diff --git a/actions/src/remote_actions/sheet_manage.rs b/actions/src/remote_actions/sheet_manage.rs deleted file mode 100644 index a3a91a2..0000000 --- a/actions/src/remote_actions/sheet_manage.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod drop_sheet; -pub mod make_sheet; diff --git a/actions/src/remote_actions/sheet_manage/drop_sheet.rs b/actions/src/remote_actions/sheet_manage/drop_sheet.rs deleted file mode 100644 index e21f3dd..0000000 --- a/actions/src/remote_actions/sheet_manage/drop_sheet.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::io::ErrorKind; - -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::{local::modified_status::sign_vault_modified, sheet::SheetName}; - -use crate::{ - remote_actions::{ - auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, - }, - write_and_return, -}; - -#[derive(Default, Serialize, Deserialize)] -pub enum DropSheetActionResult { - Success, - - // Fail - SheetInUse, - AuthorizeFailed(String), - SheetNotExists, - SheetDropFailed(String), - NoHolder, - NotOwner, - - #[default] - Unknown, -} - -#[action_gen] -pub async fn drop_sheet_action( - ctx: ActionContext, - sheet_name: SheetName, -) -> Result<DropSheetActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => { - return Ok(DropSheetActionResult::AuthorizeFailed(e.to_string())); - } - }; - - // Check sheet in use on local - if ctx.is_proc_on_local() { - let local_workspace = try_get_local_workspace(&ctx)?; - if let Some(sheet) = local_workspace.config().lock().await.sheet_in_use() { - if sheet == &sheet_name { - instance.lock().await.write(false).await?; - return Ok(DropSheetActionResult::SheetInUse); - } - instance.lock().await.write(true).await?; - } else { - instance.lock().await.write(true).await?; - } - } - - if ctx.is_proc_on_remote() { - // Check if client sheet is in use - let sheet_in_use = instance.lock().await.read::<bool>().await?; - if !sheet_in_use { - return Ok(DropSheetActionResult::SheetInUse); - } - - let vault = try_get_vault(&ctx)?; - - // Check if the sheet exists - let mut sheet = match vault.sheet(&sheet_name).await { - Ok(sheet) => sheet, - Err(e) => { - if e.kind() == ErrorKind::NotFound { - write_and_return!(instance, DropSheetActionResult::SheetNotExists); - } else { - write_and_return!( - instance, - DropSheetActionResult::SheetDropFailed(e.to_string()) - ); - } - } - }; - - // Get the sheet's holder - let Some(holder) = sheet.holder() else { - write_and_return!(instance, DropSheetActionResult::NoHolder); - }; - - // Verify that the sheet holder is either the current user or the host - // All sheets belong to the host - if holder != &member_id && !is_host_mode { - write_and_return!(instance, DropSheetActionResult::NotOwner); - } - - // Drop the sheet - sheet.forget_holder(); - match sheet.persist().await { - Ok(_) => { - write_and_return!(instance, DropSheetActionResult::Success); - } - Err(e) => { - write_and_return!( - instance, - DropSheetActionResult::SheetDropFailed(e.to_string()) - ); - } - } - } - - if ctx.is_proc_on_local() { - let result = instance - .lock() - .await - .read::<DropSheetActionResult>() - .await?; - if matches!(result, DropSheetActionResult::Success) { - sign_vault_modified(true).await; - } - return Ok(result); - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} diff --git a/actions/src/remote_actions/sheet_manage/make_sheet.rs b/actions/src/remote_actions/sheet_manage/make_sheet.rs deleted file mode 100644 index a323413..0000000 --- a/actions/src/remote_actions/sheet_manage/make_sheet.rs +++ /dev/null @@ -1,98 +0,0 @@ -use action_system::{action::ActionContext, macros::action_gen}; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::{ - constants::VAULT_HOST_NAME, - data::{local::modified_status::sign_vault_modified, sheet::SheetName}, -}; - -use crate::{ - remote_actions::{auth_member, check_connection_instance, try_get_vault}, - write_and_return, -}; - -#[derive(Default, Serialize, Deserialize)] -pub enum MakeSheetActionResult { - Success, - SuccessRestore, - - // Fail - AuthorizeFailed(String), - SheetAlreadyExists, - SheetCreationFailed(String), - - #[default] - Unknown, -} - -/// Build a sheet with context -#[action_gen] -pub async fn make_sheet_action( - ctx: ActionContext, - sheet_name: SheetName, -) -> Result<MakeSheetActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - let (member_id, is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => return Ok(MakeSheetActionResult::AuthorizeFailed(e.to_string())), - }; - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let holder = if is_host_mode { - VAULT_HOST_NAME.to_string() - } else { - member_id - }; - - // Check if the sheet already exists - if let Ok(mut sheet) = vault.sheet(&sheet_name).await { - // If the sheet has no holder, assign it to the current member (restore operation) - if sheet.holder().is_none() { - sheet.set_holder(holder.clone()); - match sheet.persist().await { - Ok(_) => { - write_and_return!(instance, MakeSheetActionResult::SuccessRestore); - } - Err(e) => { - write_and_return!( - instance, - MakeSheetActionResult::SheetCreationFailed(e.to_string()) - ); - } - } - } else { - write_and_return!(instance, MakeSheetActionResult::SheetAlreadyExists); - } - } else { - // Create the sheet - match vault.create_sheet(&sheet_name, &holder).await { - Ok(_) => { - write_and_return!(instance, MakeSheetActionResult::Success); - } - Err(e) => { - write_and_return!( - instance, - MakeSheetActionResult::SheetCreationFailed(e.to_string()) - ); - } - } - } - } - - if ctx.is_proc_on_local() { - let result = instance - .lock() - .await - .read::<MakeSheetActionResult>() - .await?; - if matches!(result, MakeSheetActionResult::Success) { - sign_vault_modified(true).await; - } - return Ok(result); - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} diff --git a/actions/src/remote_actions/workspace_manage.rs b/actions/src/remote_actions/workspace_manage.rs deleted file mode 100644 index 15f70e8..0000000 --- a/actions/src/remote_actions/workspace_manage.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod set_upstream_vault; -pub mod update_to_latest_info; diff --git a/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs b/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs deleted file mode 100644 index ba45214..0000000 --- a/actions/src/remote_actions/workspace_manage/set_upstream_vault.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::net::SocketAddr; - -use action_system::{action::ActionContext, macros::action_gen}; -use cfg_file::config::ConfigFile; -use log::info; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::data::{local::workspace_config::LocalConfig, vault::vault_config::VaultUuid}; - -use crate::remote_actions::{ - auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, -}; - -#[derive(Serialize, Deserialize)] -pub enum SetUpstreamVaultActionResult { - // Success - DirectedAndStained, - Redirected, - - // Fail - AlreadyStained, - AuthorizeFailed(String), - RedirectFailed(String), - SameUpstream, - - Done, -} - -#[action_gen] -pub async fn set_upstream_vault_action( - ctx: ActionContext, - upstream: SocketAddr, -) -> Result<SetUpstreamVaultActionResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - // Auth Member - if let Err(e) = auth_member(&ctx, instance).await { - return Ok(SetUpstreamVaultActionResult::AuthorizeFailed(e.to_string())); - } - - // Direct - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - instance - .lock() - .await - .write(*vault.config().vault_uuid()) - .await?; - return Ok(SetUpstreamVaultActionResult::Done); - } - - if ctx.is_proc_on_local() { - info!("Authorize successful. directing to upstream vault."); - - // Read the vault UUID from the instance - let vault_uuid = instance.lock().await.read::<VaultUuid>().await?; - - let local_workspace = try_get_local_workspace(&ctx)?; - let local_config = local_workspace.config(); - - let mut mut_local_config = local_config.lock().await; - if !mut_local_config.stained() { - // Stain the local workspace - mut_local_config.stain(vault_uuid); - - // Set the upstream address - mut_local_config.set_vault_addr(upstream); - - // Store the updated config - LocalConfig::write(&mut_local_config).await?; - - info!("Workspace stained!"); - return Ok(SetUpstreamVaultActionResult::DirectedAndStained); - } else { - // Local workspace is already stained, redirecting - let Some(stained_uuid) = mut_local_config.stained_uuid() else { - return Ok(SetUpstreamVaultActionResult::RedirectFailed( - "Stained uuid not found".to_string(), - )); - }; - let local_upstream = mut_local_config.upstream_addr(); - - // Address changed, but same UUID. - if vault_uuid == stained_uuid { - if local_upstream != upstream { - // Set the upstream address - mut_local_config.set_vault_addr(upstream); - - // Store the updated config - LocalConfig::write(&mut_local_config).await?; - return Ok(SetUpstreamVaultActionResult::Redirected); - } else { - return Ok(SetUpstreamVaultActionResult::SameUpstream); - } - } - return Ok(SetUpstreamVaultActionResult::AlreadyStained); - } - } - - Err(TcpTargetError::NoResult("No result.".to_string())) -} diff --git a/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs b/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs deleted file mode 100644 index cd17c32..0000000 --- a/actions/src/remote_actions/workspace_manage/update_to_latest_info.rs +++ /dev/null @@ -1,433 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - io::ErrorKind, - path::PathBuf, - time::SystemTime, -}; - -use action_system::{action::ActionContext, macros::action_gen}; -use cfg_file::config::ConfigFile; -use log::info; -use serde::{Deserialize, Serialize}; -use tcp_connection::error::TcpTargetError; -use vcs_data::{ - constants::{ - CLIENT_PATH_CACHED_SHEET, CLIENT_PATH_LOCAL_SHEET, REF_SHEET_NAME, - SERVER_SUFFIX_SHEET_SHARE_FILE, VAULT_HOST_NAME, - }, - data::{ - local::{ - cached_sheet::CachedSheet, - latest_file_data::LatestFileData, - latest_info::{LatestInfo, SheetInfo}, - modified_status::sign_vault_modified, - }, - member::MemberId, - sheet::{SheetData, SheetName, SheetPathBuf}, - vault::{ - mapping_share::{Share, SheetShareId}, - virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription}, - }, - }, -}; - -use crate::remote_actions::{ - auth_member, check_connection_instance, try_get_local_workspace, try_get_vault, -}; - -#[derive(Serialize, Deserialize)] -pub enum UpdateToLatestInfoResult { - Success, - - // Fail - AuthorizeFailed(String), - SyncCachedSheetFail(SyncCachedSheetFailReason), -} - -#[derive(Serialize, Deserialize)] -pub enum SyncCachedSheetFailReason { - PathAlreadyExist(PathBuf), -} - -#[action_gen] -pub async fn update_to_latest_info_action( - ctx: ActionContext, - _unused: (), -) -> Result<UpdateToLatestInfoResult, TcpTargetError> { - let instance = check_connection_instance(&ctx)?; - - let (member_id, _is_host_mode) = match auth_member(&ctx, instance).await { - Ok(id) => id, - Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), - }; - - info!("Sending latest info to {}", member_id); - - // Sync Latest Info - { - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - - // Build latest info - let mut latest_info = LatestInfo::default(); - - // Sheet & Share - let mut shares_in_my_sheets: HashMap<SheetName, HashMap<SheetShareId, Share>> = - HashMap::new(); - let mut member_owned = Vec::new(); - let mut member_visible = Vec::new(); - let mut ref_sheets = HashSet::new(); - - for sheet in vault.sheets().await? { - // Build share parts - if let Some(holder) = sheet.holder() { - if holder == &member_id || holder == VAULT_HOST_NAME { - let mut sheet_shares: HashMap<SheetShareId, Share> = HashMap::new(); - for share in sheet.get_shares().await? { - // Get SharePath - let Some(share_path) = share.path.clone() else { - continue; - }; - // Get ShareId from SharePath - let Some(share_id) = share_path.file_name() else { - continue; - }; - let share_id = share_id.display().to_string(); - let share_id_trimed = - share_id.trim_end_matches(SERVER_SUFFIX_SHEET_SHARE_FILE); - sheet_shares.insert(share_id_trimed.to_string(), share); - } - shares_in_my_sheets.insert(sheet.name().clone(), sheet_shares); - } - } - - // Build sheet parts - let holder_is_host = - sheet.holder().unwrap_or(&String::default()) == &VAULT_HOST_NAME; - if sheet.holder().is_some() - && (sheet.holder().unwrap() == &member_id || holder_is_host) - { - member_owned.push(sheet.name().clone()); - if holder_is_host { - ref_sheets.insert(sheet.name().clone()); - } - } else { - member_visible.push(SheetInfo { - sheet_name: sheet.name().clone(), - holder_name: sheet.holder().cloned(), - }); - } - } - - // Record Share & Sheet - latest_info.visible_sheets = member_owned; - latest_info.invisible_sheets = member_visible; - latest_info.shares_in_my_sheets = shares_in_my_sheets; - - // RefSheet - let ref_sheet_data = vault.sheet(&REF_SHEET_NAME.to_string()).await?.to_data(); - latest_info.ref_sheet_content = ref_sheet_data.clone(); - latest_info.ref_sheet_vfs_mapping = ref_sheet_data - .mapping() - .into_iter() - .map(|(path, file)| (file.id.clone(), path.clone())) - .collect::<HashMap<VirtualFileId, SheetPathBuf>>(); - latest_info.reference_sheets = ref_sheets; - - // Members - let members = vault.members().await?; - latest_info.vault_members = members; - - // Send - instance - .lock() - .await - .write_large_msgpack(latest_info, 512_u16) - .await?; - } - - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let mut latest_info = instance - .lock() - .await - .read_large_msgpack::<LatestInfo>(512_u16) - .await?; - latest_info.update_instant = Some(SystemTime::now()); - LatestInfo::write_to( - &latest_info, - LatestInfo::latest_info_path(workspace.local_path(), &member_id), - ) - .await?; - } - } - - info!("Update sheets to {}", member_id); - - // Sync Remote Sheets - { - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( - workspace.local_path(), - &member_id, - )) - .await - else { - return Err(TcpTargetError::Io("Read latest info failed".to_string())); - }; - - // Collect all local versions - let mut local_versions = vec![]; - for request_sheet in latest_info.visible_sheets { - let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else { - // For newly created sheets, the version is 0. - // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information - local_versions.push((request_sheet, -1)); - continue; - }; - local_versions.push((request_sheet, data.write_count())); - } - - // Send the version list - let len = local_versions.len(); - instance.lock().await.write_msgpack(local_versions).await?; - - if len < 1 { - // Don't return here, continue to next section - // But we need to consume the false marker from the server - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - let _: bool = mut_instance.read_msgpack().await?; - } - } else { - // Receive data - if ctx.is_proc_on_local() { - let mut mut_instance = instance.lock().await; - loop { - let in_coming: bool = mut_instance.read_msgpack().await?; - if in_coming { - let (sheet_name, data): (SheetName, SheetData) = - mut_instance.read_large_msgpack(1024u16).await?; - - let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else { - return Err(TcpTargetError::NotFound( - "Workspace not found".to_string(), - )); - }; - - SheetData::write_to(&data, path).await?; - } else { - break; - } - } - } - } - } - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let mut mut_instance = instance.lock().await; - - let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?; - - for (sheet_name, version) in local_versions.iter() { - let sheet = vault.sheet(sheet_name).await?; - if let Some(holder) = sheet.holder() - && (holder == &member_id || holder == VAULT_HOST_NAME) - && &sheet.write_count() != version - { - mut_instance.write_msgpack(true).await?; - mut_instance - .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) - .await?; - } - } - mut_instance.write_msgpack(false).await?; - } - } - - info!("Fetch held status to {}", member_id); - - // Sync Held Info - { - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - - let Ok(latest_info) = LatestInfo::read_from(LatestInfo::latest_info_path( - workspace.local_path(), - &member_id, - )) - .await - else { - return Err(TcpTargetError::Io("Read latest info failed".to_string())); - }; - - // Collect files that need to know the holder - let mut holder_wants_know = Vec::new(); - for sheet_name in &latest_info.visible_sheets { - if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await { - holder_wants_know - .extend(sheet_data.mapping().values().map(|value| value.id.clone())); - } - } - - // Send request - let mut mut_instance = instance.lock().await; - mut_instance - .write_large_msgpack(&holder_wants_know, 1024u16) - .await?; - - // Receive information and write to local - let result: HashMap< - VirtualFileId, - ( - Option<MemberId>, - VirtualFileVersion, - Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, - ), - > = mut_instance.read_large_msgpack(1024u16).await?; - - // Read configuration file - let path = LatestFileData::data_path(&member_id)?; - let mut latest_file_data: LatestFileData = - LatestFileData::read_from(&path).await.unwrap_or_default(); - - // Write the received information - latest_file_data.update_info(result); - - // Write - LatestFileData::write_to(&latest_file_data, &path).await?; - } - - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; - let mut mut_instance = instance.lock().await; - - // Read the request - let holder_wants_know: Vec<VirtualFileId> = - mut_instance.read_large_msgpack(1024u16).await?; - - // Organize the information - let mut result: HashMap< - VirtualFileId, - ( - Option<MemberId>, - VirtualFileVersion, - Vec<(VirtualFileVersion, VirtualFileVersionDescription)>, - ), - > = HashMap::new(); - for id in holder_wants_know { - let Ok(meta) = vault.virtual_file_meta(&id).await else { - continue; - }; - let holder = if meta.hold_member().is_empty() { - None - } else { - Some(meta.hold_member().clone()) - }; - let latest_version = meta.version_latest(); - - let all_versions = meta.versions(); - let all_descriptions = meta.version_descriptions(); - let histories = all_versions - .iter() - .filter_map(|v| { - let Some(desc) = all_descriptions.get(v) else { - return None; - }; - Some((v.clone(), desc.clone())) - }) - .collect::<Vec<(VirtualFileVersion, VirtualFileVersionDescription)>>(); - - result.insert(id, (holder, latest_version, histories)); - } - - // Send information - mut_instance.write_large_msgpack(&result, 1024u16).await?; - } - } - - // Sync cached sheet to local sheet - if ctx.is_proc_on_local() { - let workspace = try_get_local_workspace(&ctx)?; - let cached_sheet_path = workspace.local_path().join(CLIENT_PATH_CACHED_SHEET); - let local_sheet_path = workspace.local_path().join(CLIENT_PATH_LOCAL_SHEET); - if !local_sheet_path.exists() || !cached_sheet_path.exists() { - // No need to sync - if ctx.is_proc_on_local() { - sign_vault_modified(false).await; - } - return Ok(UpdateToLatestInfoResult::Success); - } - - let cached_sheet_paths = - extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?; - - // Match cached sheets and local sheets, and sync content - for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths { - // Read cached sheet and local sheet - let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?; - let Ok(mut local_sheet) = workspace.local_sheet(&member_id, &cached_sheet_name).await - else { - continue; - }; - - // Read cached id mapping - let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else { - continue; - }; - - for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() { - let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() }; - - // Get local path - let Some(local_path) = path_by_id else { - continue; - }; - - if &local_path == cached_item_path { - continue; - } - - // If path not match, try to move - let move_result = local_sheet.move_mapping(&local_path, cached_item_path); - if let Err(e) = move_result { - match e.kind() { - ErrorKind::AlreadyExists => { - return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail( - SyncCachedSheetFailReason::PathAlreadyExist( - cached_item_path.clone(), - ), - )); - } - _ => return Err(e.into()), - } - } - local_sheet.write().await?; - } - } - } - - if ctx.is_proc_on_local() { - sign_vault_modified(false).await; - } - Ok(UpdateToLatestInfoResult::Success) -} - -/// Extract sheet names from file paths -fn extract_sheet_names_from_paths( - paths: Vec<PathBuf>, -) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> { - let mut result = HashMap::new(); - for p in paths { - let sheet_name = p - .file_stem() - .and_then(|s| s.to_str()) - .map(|s| s.to_string()) - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name") - })?; - result.insert(sheet_name, p); - } - Ok(result) -} |
