summaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs11
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs66
2 files changed, 61 insertions, 16 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index c19b8f0..c2d9f4f 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -14,11 +14,10 @@ pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTar
};
if ctx.is_local() {
- // Invoke on local
- // Send the message to the server
- let _ = instance.lock().await.write_text("Hello World!").await;
+ // Local execution - communication is handled by on_proc_begin
+ info!("Hello World action executed locally");
} else if ctx.is_remote() {
- // Read the message from the client
+ // Remote execution - read the message from the client
let read = instance.lock().await.read_text().await?;
info!("{}", read)
}
@@ -43,9 +42,9 @@ pub async fn set_upstream_vault_action(
// Send the message to the server
let _ = instance.lock().await.write_text("Hello World!").await;
} else if ctx.is_remote() {
- // Read the message from the client
+ // Remote execution - read the message from the client
let read = instance.lock().await.read_text().await?;
- info!("{}", read)
+ info!("Received: {}", read)
}
Ok(())
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index c302fd4..f76921e 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -1,7 +1,13 @@
-use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+use std::{
+ net::SocketAddr,
+ path::PathBuf,
+ sync::Arc,
+ time::{Duration, Instant},
+};
use action_system::{action::ActionContext, action_pool::ActionPool};
use cfg_file::config::ConfigFile;
+use log::{error, info, warn};
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::{
net::{TcpListener, TcpStream},
@@ -67,11 +73,32 @@ fn build_server_future(
let mut active_connections = 0;
let mut shutdown_requested = false;
- // Spawn task to handle Ctrl+C
+ // Spawn task to handle Ctrl+C with rapid exit detection
let shutdown_tx_clone = shutdown_tx.clone();
spawn(async move {
- if let Ok(()) = signal::ctrl_c().await {
+ let mut ctrl_c_count = 0;
+ let mut last_ctrl_c_time = Instant::now();
+
+ while let Ok(()) = signal::ctrl_c().await {
+ let now = Instant::now();
+
+ // Reset counter if more than 5 seconds have passed
+ if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) {
+ ctrl_c_count = 0;
+ }
+
+ ctrl_c_count += 1;
+ last_ctrl_c_time = now;
+
let _ = shutdown_tx_clone.send(()).await;
+
+ // If 3 Ctrl+C within 5 seconds, exit immediately
+ if ctrl_c_count >= 3 {
+ info!("Shutdown. (3/3)");
+ std::process::exit(0);
+ } else {
+ info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count);
+ }
}
});
@@ -82,14 +109,16 @@ fn build_server_future(
accept_result = listener.accept(), if !shutdown_requested => {
match accept_result {
Ok((stream, _addr)) => {
- active_connections += 1;
+ info!("New connection accepted.");
let _ = tx.send(1).await;
let vault_clone = vault.clone();
let action_pool_clone = action_pool.clone();
let tx_clone = tx.clone();
+
spawn(async move {
process_connection(stream, vault_clone, action_pool_clone).await;
+ info!("A connection closed.");
let _ = tx_clone.send(-1).await;
});
}
@@ -114,7 +143,10 @@ fn build_server_future(
shutdown_requested = true;
// If no active connections, break immediately
if active_connections == 0 {
+ info!("No active connections. Shutting down.");
break;
+ } else {
+ warn!("Cannot shutdown while active connections exist! ({} active)", active_connections);
}
}
}
@@ -131,8 +163,12 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
let mut instance = ConnectionInstance::from(stream);
// Read action name and action arguments
- let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
- return;
+ let msg = match instance.read_msgpack::<RemoteActionInvoke>().await {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("Failed to read action message: {}", e);
+ return;
+ }
};
// Build context
@@ -141,11 +177,21 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
// Insert vault into context
let ctx = ctx.insert_arc(vault);
+ info!(
+ "Process action `{}` with argument `{}`",
+ msg.action_name, msg.action_args_json
+ );
+
// Process action
- let Ok(_result_json) = action_pool
+ let result = action_pool
.process_json(&msg.action_name, ctx, msg.action_args_json)
- .await
- else {
- return;
+ .await;
+
+ match result {
+ Ok(_result_json) => {}
+ Err(e) => {
+ warn!("Failed to process action `{}`: {}", msg.action_name, e);
+ return;
+ }
};
}