summaryrefslogtreecommitdiff
path: root/crates/vcs_actions
diff options
context:
space:
mode:
Diffstat (limited to 'crates/vcs_actions')
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs17
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs75
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs9
-rw-r--r--crates/vcs_actions/src/registry/server_registry.rs4
4 files changed, 83 insertions, 22 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index b11a934..f705692 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -1,9 +1,14 @@
-use action_system::{action::ActionContext, action_gen};
+use std::net::SocketAddr;
+
+use action_system::{action::ActionContext, macros::action_gen};
use log::info;
use tcp_connection::error::TcpTargetError;
#[action_gen]
-pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> {
+pub async fn set_upstream_vault_action(
+ ctx: ActionContext,
+ _upstream: SocketAddr,
+) -> Result<(), TcpTargetError> {
// Ensure the instance is available
let Some(instance) = ctx.instance() else {
return Err(TcpTargetError::NotFound(
@@ -11,14 +16,14 @@ pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTar
));
};
- if ctx.is_local() {
+ if ctx.is_proc_on_local() {
// Invoke on local
// Send the message to the server
let _ = instance.lock().await.write_text("Hello World!").await;
- } else if ctx.is_remote() {
- // Read the message from the client
+ } else if ctx.is_proc_on_remote() {
+ // Remote execution - read the message from the client
let read = instance.lock().await.read_text().await?;
- info!("{}", read)
+ info!("Received: {}", read)
}
Ok(())
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index c302fd4..f787cae 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -1,7 +1,13 @@
-use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+use std::{
+ net::SocketAddr,
+ path::PathBuf,
+ sync::Arc,
+ time::{Duration, Instant},
+};
use action_system::{action::ActionContext, action_pool::ActionPool};
use cfg_file::config::ConfigFile;
+use log::{error, info, warn};
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::{
net::{TcpListener, TcpStream},
@@ -25,6 +31,12 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
// Initialize the vault
let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
+ // Lock the vault
+ vault.lock().map_err(|e| {
+ error!("{}", e);
+ TcpTargetError::Locked(e.to_string())
+ })?;
+
// Create ActionPool
let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
@@ -32,6 +44,9 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
future.await?; // Start and block until shutdown
+ // Unlock the vault
+ vault.unlock()?;
+
Ok(())
}
@@ -67,11 +82,32 @@ fn build_server_future(
let mut active_connections = 0;
let mut shutdown_requested = false;
- // Spawn task to handle Ctrl+C
+ // Spawn task to handle Ctrl+C with rapid exit detection
let shutdown_tx_clone = shutdown_tx.clone();
spawn(async move {
- if let Ok(()) = signal::ctrl_c().await {
+ let mut ctrl_c_count = 0;
+ let mut last_ctrl_c_time = Instant::now();
+
+ while let Ok(()) = signal::ctrl_c().await {
+ let now = Instant::now();
+
+ // Reset counter if more than 5 seconds have passed
+ if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) {
+ ctrl_c_count = 0;
+ }
+
+ ctrl_c_count += 1;
+ last_ctrl_c_time = now;
+
let _ = shutdown_tx_clone.send(()).await;
+
+ // If 3 Ctrl+C within 5 seconds, exit immediately
+ if ctrl_c_count >= 3 {
+ info!("Shutdown. (3/3)");
+ std::process::exit(0);
+ } else {
+ info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count);
+ }
}
});
@@ -82,14 +118,16 @@ fn build_server_future(
accept_result = listener.accept(), if !shutdown_requested => {
match accept_result {
Ok((stream, _addr)) => {
- active_connections += 1;
+ info!("New connection. (now {})", active_connections);
let _ = tx.send(1).await;
let vault_clone = vault.clone();
let action_pool_clone = action_pool.clone();
let tx_clone = tx.clone();
+
spawn(async move {
process_connection(stream, vault_clone, action_pool_clone).await;
+ info!("A connection closed. (now {})", active_connections);
let _ = tx_clone.send(-1).await;
});
}
@@ -114,7 +152,10 @@ fn build_server_future(
shutdown_requested = true;
// If no active connections, break immediately
if active_connections == 0 {
+ info!("No active connections. Shutting down.");
break;
+ } else {
+ warn!("Cannot shutdown while active connections exist! ({} active)", active_connections);
}
}
}
@@ -131,8 +172,12 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
let mut instance = ConnectionInstance::from(stream);
// Read action name and action arguments
- let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
- return;
+ let msg = match instance.read_msgpack::<RemoteActionInvoke>().await {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("Failed to read action message: {}", e);
+ return;
+ }
};
// Build context
@@ -141,11 +186,21 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
// Insert vault into context
let ctx = ctx.insert_arc(vault);
+ info!(
+ "Process action `{}` with argument `{}`",
+ msg.action_name, msg.action_args_json
+ );
+
// Process action
- let Ok(_result_json) = action_pool
+ let result = action_pool
.process_json(&msg.action_name, ctx, msg.action_args_json)
- .await
- else {
- return;
+ .await;
+
+ match result {
+ Ok(_result_json) => {}
+ Err(e) => {
+ warn!("Failed to process action `{}`: {}", msg.action_name, e);
+ return;
+ }
};
}
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index 5939bed..9769750 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -2,12 +2,13 @@ use action_system::{action::ActionContext, action_pool::ActionPool};
use tcp_connection::error::TcpTargetError;
use crate::{
- actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke,
+ actions::local_actions::register_set_upstream_vault_action,
+ connection::protocol::RemoteActionInvoke,
};
fn register_actions(pool: &mut ActionPool) {
// Pool register here
- register_hello_world_action(pool);
+ register_set_upstream_vault_action(pool);
}
pub fn client_action_pool() -> ActionPool {
@@ -25,11 +26,11 @@ pub fn client_action_pool() -> ActionPool {
}
async fn on_proc_begin(
- ctx: &ActionContext,
+ ctx: &mut ActionContext,
_args: &(dyn std::any::Any + Send + Sync),
) -> Result<(), TcpTargetError> {
// Is ctx remote
- let is_remote = ctx.is_remote();
+ let is_remote = ctx.is_remote_action();
// Action name and arguments
let action_name = ctx.action_name().to_string();
diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs
index b449b68..3ecc103 100644
--- a/crates/vcs_actions/src/registry/server_registry.rs
+++ b/crates/vcs_actions/src/registry/server_registry.rs
@@ -1,9 +1,9 @@
use action_system::action_pool::ActionPool;
-use crate::actions::local_actions::register_hello_world_action;
+use crate::actions::local_actions::register_set_upstream_vault_action;
pub fn server_action_pool() -> ActionPool {
let mut pool = ActionPool::new();
- register_hello_world_action(&mut pool);
+ register_set_upstream_vault_action(&mut pool);
pool
}