diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-01-12 04:28:28 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-01-12 04:51:34 +0800 |
| commit | c5fb22694e95f12c24b8d8af76999be7aea3fcec (patch) | |
| tree | 399d8a24ce491fb635f3d09f2123290fe784059e /actions/src/connection/action_service.rs | |
| parent | 444754489aca0454eb54e15a49fb8a6db0b68a07 (diff) | |
Reorganize crate structure and move documentation files
Diffstat (limited to 'actions/src/connection/action_service.rs')
| -rw-r--r-- | actions/src/connection/action_service.rs | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/actions/src/connection/action_service.rs b/actions/src/connection/action_service.rs new file mode 100644 index 0000000..f137126 --- /dev/null +++ b/actions/src/connection/action_service.rs @@ -0,0 +1,221 @@ +use std::{ + env::set_current_dir, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; + +use action_system::{action::ActionContext, action_pool::ActionPool}; +use cfg_file::config::ConfigFile; +use log::{debug, error, info, warn}; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::{ + net::{TcpListener, TcpStream}, + select, signal, spawn, + sync::mpsc, +}; +use vcs_data::data::vault::{Vault, config::VaultConfig}; + +use crate::{ + connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool, +}; + +// Start the server with a Vault using the specified directory +pub async fn server_entry( + vault_path: impl Into<PathBuf>, + port_override: u16, +) -> Result<(), TcpTargetError> { + let vault_path = vault_path.into(); + + // Set to vault path + set_current_dir(&vault_path).map_err(|e| TcpTargetError::Io(e.to_string()))?; + + // Read the vault cfg + let vault_cfg = VaultConfig::read().await?; + + // Create TCPListener + let listener = create_tcp_listener(&vault_cfg, port_override).await?; + + // Initialize the vault + let vault: Arc<Vault> = init_vault(vault_cfg, vault_path).await?; + + // Lock the vault + vault + .lock() + .map_err(|e| TcpTargetError::Locked(e.to_string()))?; + + // Create ActionPool + let action_pool: Arc<ActionPool> = Arc::new(server_action_pool()); + + // Start the server + let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener); + future.await?; // Start and block until shutdown + + // Unlock the vault + vault.unlock()?; + + Ok(()) +} + +async fn create_tcp_listener( + cfg: &VaultConfig, + port_override: u16, +) -> Result<TcpListener, TcpTargetError> { + let local_bind_addr = cfg.server_config().local_bind(); + let port = if port_override > 0 { + port_override // Override -> PORT > 0 + } else { + cfg.server_config().port() // Default -> Port = 0 + }; + let bind_port = port; + let sock_addr = SocketAddr::new(*local_bind_addr, bind_port); + let listener = TcpListener::bind(sock_addr).await?; + + Ok(listener) +} + +async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> { + // Init and create the vault + let Some(vault) = Vault::init(cfg, path) else { + return Err(TcpTargetError::NotFound("Vault not found".to_string())); + }; + let vault: Arc<Vault> = Arc::new(vault); + + Ok(vault) +} + +fn build_server_future( + vault: Arc<Vault>, + action_pool: Arc<ActionPool>, + listener: TcpListener, +) -> ( + mpsc::Sender<()>, + impl std::future::Future<Output = Result<(), TcpTargetError>>, +) { + let (tx, mut rx) = mpsc::channel::<i32>(100); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + let mut active_connections = 0; + let mut shutdown_requested = false; + + // Spawn task to handle Ctrl+C with rapid exit detection + let shutdown_tx_clone = shutdown_tx.clone(); + spawn(async move { + let mut ctrl_c_count = 0; + let mut last_ctrl_c_time = Instant::now(); + + while let Ok(()) = signal::ctrl_c().await { + let now = Instant::now(); + + // Reset counter if more than 5 seconds have passed + if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) { + ctrl_c_count = 0; + } + + ctrl_c_count += 1; + last_ctrl_c_time = now; + + let _ = shutdown_tx_clone.send(()).await; + + // If 3 Ctrl+C within 5 seconds, exit immediately + if ctrl_c_count >= 3 { + info!("Shutdown. (3/3)"); + std::process::exit(0); + } else { + info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count); + } + } + }); + + let future = async move { + loop { + select! { + // Accept new connections + accept_result = listener.accept(), if !shutdown_requested => { + match accept_result { + Ok((stream, _addr)) => { + debug!("New connection. (now {})", active_connections); + let _ = tx.send(1).await; + + let vault_clone = vault.clone(); + let action_pool_clone = action_pool.clone(); + let tx_clone = tx.clone(); + + spawn(async move { + process_connection(stream, vault_clone, action_pool_clone).await; + debug!("A connection closed. (now {})", active_connections); + let _ = tx_clone.send(-1).await; + }); + } + Err(_) => { + continue; + } + } + } + + // Handle connection count updates + Some(count_change) = rx.recv() => { + active_connections = (active_connections as i32 + count_change) as usize; + + // Check if we should shutdown after all connections are done + if shutdown_requested && active_connections == 0 { + break; + } + } + + // Handle shutdown signal + _ = shutdown_rx.recv() => { + shutdown_requested = true; + // If no active connections, break immediately + if active_connections == 0 { + info!("No active connections. Shutting down."); + break; + } else { + warn!("Cannot shutdown while active connections exist! ({} active)", active_connections); + } + } + } + } + + Ok(()) + }; + + (shutdown_tx, future) +} + +async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) { + // Setup connection instance + let mut instance = ConnectionInstance::from(stream); + + // Read action name and action arguments + let msg = match instance.read_msgpack::<RemoteActionInvoke>().await { + Ok(msg) => msg, + Err(e) => { + error!("Failed to read action message: {}", e); + return; + } + }; + + // Build context + let ctx: ActionContext = ActionContext::remote().insert_instance(instance); + + // Insert vault into context + let ctx = ctx.with_arc_data(vault); + + info!( + "Process action `{}` with argument `{}`", + msg.action_name, msg.action_args_json + ); + + // Process action + let result = action_pool + .process_json(&msg.action_name, ctx, msg.action_args_json) + .await; + + match result { + Ok(_result_json) => {} + Err(e) => { + warn!("Failed to process action `{}`: {}", msg.action_name, e); + } + } +} |
