summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src/connection
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-01-12 04:28:28 +0800
committer魏曹先生 <1992414357@qq.com>2026-01-12 04:51:34 +0800
commitc5fb22694e95f12c24b8d8af76999be7aea3fcec (patch)
tree399d8a24ce491fb635f3d09f2123290fe784059e /crates/vcs_actions/src/connection
parent444754489aca0454eb54e15a49fb8a6db0b68a07 (diff)
Reorganize crate structure and move documentation files
Diffstat (limited to 'crates/vcs_actions/src/connection')
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs221
-rw-r--r--crates/vcs_actions/src/connection/error.rs14
-rw-r--r--crates/vcs_actions/src/connection/protocol.rs7
3 files changed, 0 insertions, 242 deletions
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
deleted file mode 100644
index f137126..0000000
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ /dev/null
@@ -1,221 +0,0 @@
-use std::{
- env::set_current_dir,
- net::SocketAddr,
- path::PathBuf,
- sync::Arc,
- time::{Duration, Instant},
-};
-
-use action_system::{action::ActionContext, action_pool::ActionPool};
-use cfg_file::config::ConfigFile;
-use log::{debug, error, info, warn};
-use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
-use tokio::{
- net::{TcpListener, TcpStream},
- select, signal, spawn,
- sync::mpsc,
-};
-use vcs_data::data::vault::{Vault, config::VaultConfig};
-
-use crate::{
- connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool,
-};
-
-// Start the server with a Vault using the specified directory
-pub async fn server_entry(
- vault_path: impl Into<PathBuf>,
- port_override: u16,
-) -> Result<(), TcpTargetError> {
- let vault_path = vault_path.into();
-
- // Set to vault path
- set_current_dir(&vault_path).map_err(|e| TcpTargetError::Io(e.to_string()))?;
-
- // Read the vault cfg
- let vault_cfg = VaultConfig::read().await?;
-
- // Create TCPListener
- let listener = create_tcp_listener(&vault_cfg, port_override).await?;
-
- // Initialize the vault
- let vault: Arc<Vault> = init_vault(vault_cfg, vault_path).await?;
-
- // Lock the vault
- vault
- .lock()
- .map_err(|e| TcpTargetError::Locked(e.to_string()))?;
-
- // Create ActionPool
- let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
-
- // Start the server
- let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
- future.await?; // Start and block until shutdown
-
- // Unlock the vault
- vault.unlock()?;
-
- Ok(())
-}
-
-async fn create_tcp_listener(
- cfg: &VaultConfig,
- port_override: u16,
-) -> Result<TcpListener, TcpTargetError> {
- let local_bind_addr = cfg.server_config().local_bind();
- let port = if port_override > 0 {
- port_override // Override -> PORT > 0
- } else {
- cfg.server_config().port() // Default -> Port = 0
- };
- let bind_port = port;
- let sock_addr = SocketAddr::new(*local_bind_addr, bind_port);
- let listener = TcpListener::bind(sock_addr).await?;
-
- Ok(listener)
-}
-
-async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> {
- // Init and create the vault
- let Some(vault) = Vault::init(cfg, path) else {
- return Err(TcpTargetError::NotFound("Vault not found".to_string()));
- };
- let vault: Arc<Vault> = Arc::new(vault);
-
- Ok(vault)
-}
-
-fn build_server_future(
- vault: Arc<Vault>,
- action_pool: Arc<ActionPool>,
- listener: TcpListener,
-) -> (
- mpsc::Sender<()>,
- impl std::future::Future<Output = Result<(), TcpTargetError>>,
-) {
- let (tx, mut rx) = mpsc::channel::<i32>(100);
- let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
- let mut active_connections = 0;
- let mut shutdown_requested = false;
-
- // Spawn task to handle Ctrl+C with rapid exit detection
- let shutdown_tx_clone = shutdown_tx.clone();
- spawn(async move {
- let mut ctrl_c_count = 0;
- let mut last_ctrl_c_time = Instant::now();
-
- while let Ok(()) = signal::ctrl_c().await {
- let now = Instant::now();
-
- // Reset counter if more than 5 seconds have passed
- if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) {
- ctrl_c_count = 0;
- }
-
- ctrl_c_count += 1;
- last_ctrl_c_time = now;
-
- let _ = shutdown_tx_clone.send(()).await;
-
- // If 3 Ctrl+C within 5 seconds, exit immediately
- if ctrl_c_count >= 3 {
- info!("Shutdown. (3/3)");
- std::process::exit(0);
- } else {
- info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count);
- }
- }
- });
-
- let future = async move {
- loop {
- select! {
- // Accept new connections
- accept_result = listener.accept(), if !shutdown_requested => {
- match accept_result {
- Ok((stream, _addr)) => {
- debug!("New connection. (now {})", active_connections);
- let _ = tx.send(1).await;
-
- let vault_clone = vault.clone();
- let action_pool_clone = action_pool.clone();
- let tx_clone = tx.clone();
-
- spawn(async move {
- process_connection(stream, vault_clone, action_pool_clone).await;
- debug!("A connection closed. (now {})", active_connections);
- let _ = tx_clone.send(-1).await;
- });
- }
- Err(_) => {
- continue;
- }
- }
- }
-
- // Handle connection count updates
- Some(count_change) = rx.recv() => {
- active_connections = (active_connections as i32 + count_change) as usize;
-
- // Check if we should shutdown after all connections are done
- if shutdown_requested && active_connections == 0 {
- break;
- }
- }
-
- // Handle shutdown signal
- _ = shutdown_rx.recv() => {
- shutdown_requested = true;
- // If no active connections, break immediately
- if active_connections == 0 {
- info!("No active connections. Shutting down.");
- break;
- } else {
- warn!("Cannot shutdown while active connections exist! ({} active)", active_connections);
- }
- }
- }
- }
-
- Ok(())
- };
-
- (shutdown_tx, future)
-}
-
-async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) {
- // Setup connection instance
- let mut instance = ConnectionInstance::from(stream);
-
- // Read action name and action arguments
- let msg = match instance.read_msgpack::<RemoteActionInvoke>().await {
- Ok(msg) => msg,
- Err(e) => {
- error!("Failed to read action message: {}", e);
- return;
- }
- };
-
- // Build context
- let ctx: ActionContext = ActionContext::remote().insert_instance(instance);
-
- // Insert vault into context
- let ctx = ctx.with_arc_data(vault);
-
- info!(
- "Process action `{}` with argument `{}`",
- msg.action_name, msg.action_args_json
- );
-
- // Process action
- let result = action_pool
- .process_json(&msg.action_name, ctx, msg.action_args_json)
- .await;
-
- match result {
- Ok(_result_json) => {}
- Err(e) => {
- warn!("Failed to process action `{}`: {}", msg.action_name, e);
- }
- }
-}
diff --git a/crates/vcs_actions/src/connection/error.rs b/crates/vcs_actions/src/connection/error.rs
deleted file mode 100644
index 241c16e..0000000
--- a/crates/vcs_actions/src/connection/error.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use std::io;
-use thiserror::Error;
-
-#[derive(Error, Debug, Clone)]
-pub enum ConnectionError {
- #[error("I/O error: {0}")]
- Io(String),
-}
-
-impl From<io::Error> for ConnectionError {
- fn from(error: io::Error) -> Self {
- ConnectionError::Io(error.to_string())
- }
-}
diff --git a/crates/vcs_actions/src/connection/protocol.rs b/crates/vcs_actions/src/connection/protocol.rs
deleted file mode 100644
index 2cebe79..0000000
--- a/crates/vcs_actions/src/connection/protocol.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-#[derive(Default, Clone, Serialize, Deserialize)]
-pub struct RemoteActionInvoke {
- pub action_name: String,
- pub action_args_json: String,
-}