summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src/connection/action_service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/vcs_actions/src/connection/action_service.rs')
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs75
1 files changed, 65 insertions, 10 deletions
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index c302fd4..f787cae 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -1,7 +1,13 @@
-use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+use std::{
+ net::SocketAddr,
+ path::PathBuf,
+ sync::Arc,
+ time::{Duration, Instant},
+};
use action_system::{action::ActionContext, action_pool::ActionPool};
use cfg_file::config::ConfigFile;
+use log::{error, info, warn};
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::{
net::{TcpListener, TcpStream},
@@ -25,6 +31,12 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
// Initialize the vault
let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
+ // Lock the vault
+ vault.lock().map_err(|e| {
+ error!("{}", e);
+ TcpTargetError::Locked(e.to_string())
+ })?;
+
// Create ActionPool
let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
@@ -32,6 +44,9 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
future.await?; // Start and block until shutdown
+ // Unlock the vault
+ vault.unlock()?;
+
Ok(())
}
@@ -67,11 +82,32 @@ fn build_server_future(
let mut active_connections = 0;
let mut shutdown_requested = false;
- // Spawn task to handle Ctrl+C
+ // Spawn task to handle Ctrl+C with rapid exit detection
let shutdown_tx_clone = shutdown_tx.clone();
spawn(async move {
- if let Ok(()) = signal::ctrl_c().await {
+ let mut ctrl_c_count = 0;
+ let mut last_ctrl_c_time = Instant::now();
+
+ while let Ok(()) = signal::ctrl_c().await {
+ let now = Instant::now();
+
+ // Reset counter if more than 5 seconds have passed
+ if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) {
+ ctrl_c_count = 0;
+ }
+
+ ctrl_c_count += 1;
+ last_ctrl_c_time = now;
+
let _ = shutdown_tx_clone.send(()).await;
+
+ // If 3 Ctrl+C within 5 seconds, exit immediately
+ if ctrl_c_count >= 3 {
+ info!("Shutdown. (3/3)");
+ std::process::exit(0);
+ } else {
+ info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count);
+ }
}
});
@@ -82,14 +118,16 @@ fn build_server_future(
accept_result = listener.accept(), if !shutdown_requested => {
match accept_result {
Ok((stream, _addr)) => {
- active_connections += 1;
+ info!("New connection. (now {})", active_connections);
let _ = tx.send(1).await;
let vault_clone = vault.clone();
let action_pool_clone = action_pool.clone();
let tx_clone = tx.clone();
+
spawn(async move {
process_connection(stream, vault_clone, action_pool_clone).await;
+ info!("A connection closed. (now {})", active_connections);
let _ = tx_clone.send(-1).await;
});
}
@@ -114,7 +152,10 @@ fn build_server_future(
shutdown_requested = true;
// If no active connections, break immediately
if active_connections == 0 {
+ info!("No active connections. Shutting down.");
break;
+ } else {
+ warn!("Cannot shutdown while active connections exist! ({} active)", active_connections);
}
}
}
@@ -131,8 +172,12 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
let mut instance = ConnectionInstance::from(stream);
// Read action name and action arguments
- let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
- return;
+ let msg = match instance.read_msgpack::<RemoteActionInvoke>().await {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("Failed to read action message: {}", e);
+ return;
+ }
};
// Build context
@@ -141,11 +186,21 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
// Insert vault into context
let ctx = ctx.insert_arc(vault);
+ info!(
+ "Process action `{}` with argument `{}`",
+ msg.action_name, msg.action_args_json
+ );
+
// Process action
- let Ok(_result_json) = action_pool
+ let result = action_pool
.process_json(&msg.action_name, ctx, msg.action_args_json)
- .await
- else {
- return;
+ .await;
+
+ match result {
+ Ok(_result_json) => {}
+ Err(e) => {
+ warn!("Failed to process action `{}`: {}", msg.action_name, e);
+ return;
+ }
};
}