From 27f6414ad1ff451feb0044af62f37dc2a6255ffa Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Thu, 5 Feb 2026 22:35:05 +0800 Subject: Remove examples and legacy code, update .gitignore - Delete examples directory and its example action system - Rename actions/ to legacy_actions/ and data/ to legacy_data/ - Update Cargo.toml license file reference - Move setup scripts to scripts/dev/ directory - Add todo.txt patterns to .gitignore --- actions/src/connection/action_service.rs | 221 ------------------------------- actions/src/connection/error.rs | 27 ---- actions/src/connection/protocol.rs | 7 - 3 files changed, 255 deletions(-) delete mode 100644 actions/src/connection/action_service.rs delete mode 100644 actions/src/connection/error.rs delete mode 100644 actions/src/connection/protocol.rs (limited to 'actions/src/connection') diff --git a/actions/src/connection/action_service.rs b/actions/src/connection/action_service.rs deleted file mode 100644 index 3e05a70..0000000 --- a/actions/src/connection/action_service.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::{ - env::set_current_dir, - net::SocketAddr, - path::PathBuf, - sync::Arc, - time::{Duration, Instant}, -}; - -use action_system::{action::ActionContext, action_pool::ActionPool}; -use cfg_file::config::ConfigFile; -use log::{debug, error, info, warn}; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{ - net::{TcpListener, TcpStream}, - select, signal, spawn, - sync::mpsc, -}; -use vcs_data::data::vault::{Vault, vault_config::VaultConfig}; - -use crate::{ - connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool, -}; - -// Start the server with a Vault using the specified directory -pub async fn server_entry( - vault_path: impl Into, - port_override: u16, -) -> Result<(), TcpTargetError> { - let vault_path = vault_path.into(); - - // Set to vault path - set_current_dir(&vault_path).map_err(|e| TcpTargetError::Io(e.to_string()))?; - - // Read the vault cfg - let vault_cfg = VaultConfig::read().await?; - - // Create TCPListener - let listener = create_tcp_listener(&vault_cfg, port_override).await?; - - // Initialize the vault - let vault: Arc = init_vault(vault_cfg, vault_path).await?; - - // Lock the vault - vault - .lock() - .map_err(|e| TcpTargetError::Locked(e.to_string()))?; - - // Create ActionPool - let action_pool: Arc = Arc::new(server_action_pool()); - - // Start the server - let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener); - future.await?; // Start and block until shutdown - - // Unlock the vault - vault.unlock()?; - - Ok(()) -} - -async fn create_tcp_listener( - cfg: &VaultConfig, - port_override: u16, -) -> Result { - let local_bind_addr = cfg.server_config().local_bind(); - let port = if port_override > 0 { - port_override // Override -> PORT > 0 - } else { - cfg.server_config().port() // Default -> Port = 0 - }; - let bind_port = port; - let sock_addr = SocketAddr::new(*local_bind_addr, bind_port); - let listener = TcpListener::bind(sock_addr).await?; - - Ok(listener) -} - -async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result, TcpTargetError> { - // Init and create the vault - let Some(vault) = Vault::init(cfg, path) else { - return Err(TcpTargetError::NotFound("Vault not found".to_string())); - }; - let vault: Arc = Arc::new(vault); - - Ok(vault) -} - -fn build_server_future( - vault: Arc, - action_pool: Arc, - listener: TcpListener, -) -> ( - mpsc::Sender<()>, - impl std::future::Future>, -) { - let (tx, mut rx) = mpsc::channel::(100); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); - let mut active_connections = 0; - let mut shutdown_requested = false; - - // Spawn task to handle Ctrl+C with rapid exit detection - let shutdown_tx_clone = shutdown_tx.clone(); - spawn(async move { - let mut ctrl_c_count = 0; - let mut last_ctrl_c_time = Instant::now(); - - while let Ok(()) = signal::ctrl_c().await { - let now = Instant::now(); - - // Reset counter if more than 5 seconds have passed - if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) { - ctrl_c_count = 0; - } - - ctrl_c_count += 1; - last_ctrl_c_time = now; - - let _ = shutdown_tx_clone.send(()).await; - - // If 3 Ctrl+C within 5 seconds, exit immediately - if ctrl_c_count >= 3 { - info!("Shutdown. (3/3)"); - std::process::exit(0); - } else { - info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count); - } - } - }); - - let future = async move { - loop { - select! { - // Accept new connections - accept_result = listener.accept(), if !shutdown_requested => { - match accept_result { - Ok((stream, _addr)) => { - debug!("New connection. (now {})", active_connections); - let _ = tx.send(1).await; - - let vault_clone = vault.clone(); - let action_pool_clone = action_pool.clone(); - let tx_clone = tx.clone(); - - spawn(async move { - process_connection(stream, vault_clone, action_pool_clone).await; - debug!("A connection closed. (now {})", active_connections); - let _ = tx_clone.send(-1).await; - }); - } - Err(_) => { - continue; - } - } - } - - // Handle connection count updates - Some(count_change) = rx.recv() => { - active_connections = (active_connections as i32 + count_change) as usize; - - // Check if we should shutdown after all connections are done - if shutdown_requested && active_connections == 0 { - break; - } - } - - // Handle shutdown signal - _ = shutdown_rx.recv() => { - shutdown_requested = true; - // If no active connections, break immediately - if active_connections == 0 { - info!("No active connections. Shutting down."); - break; - } else { - warn!("Cannot shutdown while active connections exist! ({} active)", active_connections); - } - } - } - } - - Ok(()) - }; - - (shutdown_tx, future) -} - -async fn process_connection(stream: TcpStream, vault: Arc, action_pool: Arc) { - // Setup connection instance - let mut instance = ConnectionInstance::from(stream); - - // Read action name and action arguments - let msg = match instance.read_msgpack::().await { - Ok(msg) => msg, - Err(e) => { - error!("Failed to read action message: {}", e); - return; - } - }; - - // Build context - let ctx: ActionContext = ActionContext::remote().insert_instance(instance); - - // Insert vault into context - let ctx = ctx.with_arc_data(vault); - - info!( - "Process action `{}` with argument `{}`", - msg.action_name, msg.action_args_json - ); - - // Process action - let result = action_pool - .process_json(&msg.action_name, ctx, msg.action_args_json) - .await; - - match result { - Ok(_result_json) => {} - Err(e) => { - warn!("Failed to process action `{}`: {}", msg.action_name, e); - } - } -} diff --git a/actions/src/connection/error.rs b/actions/src/connection/error.rs deleted file mode 100644 index 1a4e221..0000000 --- a/actions/src/connection/error.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::io; -use thiserror::Error; -use vcs_data::data::member::MemberId; - -#[derive(Error, Debug, Clone)] -pub enum ConnectionError { - #[error("I/O error: {0}")] - Io(String), -} - -#[derive(Error, Debug, Clone)] -pub enum ProcessActionError { - #[error("Action `{0}` not registered")] - ActionNotRegistered(String), - - #[error("Authorize `{0}` failed")] - AuthorizeFailed(MemberId), - - #[error("Authorize host `{0}` failed")] - AuthorizeHostFailed(MemberId), -} - -impl From for ConnectionError { - fn from(error: io::Error) -> Self { - ConnectionError::Io(error.to_string()) - } -} diff --git a/actions/src/connection/protocol.rs b/actions/src/connection/protocol.rs deleted file mode 100644 index 2cebe79..0000000 --- a/actions/src/connection/protocol.rs +++ /dev/null @@ -1,7 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Default, Clone, Serialize, Deserialize)] -pub struct RemoteActionInvoke { - pub action_name: String, - pub action_args_json: String, -} -- cgit