summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/vcs_actions/src')
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs25
-rw-r--r--crates/vcs_actions/src/connection.rs3
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs151
-rw-r--r--crates/vcs_actions/src/connection/error.rs14
-rw-r--r--crates/vcs_actions/src/connection/protocol.rs7
-rw-r--r--crates/vcs_actions/src/lib.rs1
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs60
-rw-r--r--crates/vcs_actions/src/registry/server_registry.rs9
8 files changed, 270 insertions, 0 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index e69de29..b11a934 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -0,0 +1,25 @@
+use action_system::{action::ActionContext, action_gen};
+use log::info;
+use tcp_connection::error::TcpTargetError;
+
+#[action_gen]
+pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> {
+ // Ensure the instance is available
+ let Some(instance) = ctx.instance() else {
+ return Err(TcpTargetError::NotFound(
+ "Connection Instance Lost.".to_string(),
+ ));
+ };
+
+ if ctx.is_local() {
+ // Invoke on local
+ // Send the message to the server
+ let _ = instance.lock().await.write_text("Hello World!").await;
+ } else if ctx.is_remote() {
+ // Read the message from the client
+ let read = instance.lock().await.read_text().await?;
+ info!("{}", read)
+ }
+
+ Ok(())
+}
diff --git a/crates/vcs_actions/src/connection.rs b/crates/vcs_actions/src/connection.rs
new file mode 100644
index 0000000..918f93c
--- /dev/null
+++ b/crates/vcs_actions/src/connection.rs
@@ -0,0 +1,3 @@
+pub mod action_service;
+pub mod error;
+pub mod protocol;
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
new file mode 100644
index 0000000..c302fd4
--- /dev/null
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -0,0 +1,151 @@
+use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+
+use action_system::{action::ActionContext, action_pool::ActionPool};
+use cfg_file::config::ConfigFile;
+use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
+use tokio::{
+ net::{TcpListener, TcpStream},
+ select, signal, spawn,
+ sync::mpsc,
+};
+use vcs_data::data::vault::{Vault, config::VaultConfig};
+
+use crate::{
+ connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool,
+};
+
+// Start the server with a Vault using the specified directory
+pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTargetError> {
+ // Read the vault cfg
+ let vault_cfg = VaultConfig::read().await?;
+
+ // Create TCPListener
+ let listener = create_tcp_listener(&vault_cfg).await?;
+
+ // Initialize the vault
+ let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
+
+ // Create ActionPool
+ let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
+
+ // Start the server
+ let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
+ future.await?; // Start and block until shutdown
+
+ Ok(())
+}
+
+async fn create_tcp_listener(cfg: &VaultConfig) -> Result<TcpListener, TcpTargetError> {
+ let local_bind_addr = cfg.server_config().local_bind();
+ let bind_port = cfg.server_config().port();
+ let sock_addr = SocketAddr::new(*local_bind_addr, bind_port);
+ let listener = TcpListener::bind(sock_addr).await?;
+
+ Ok(listener)
+}
+
+async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> {
+ // Init and create the vault
+ let Some(vault) = Vault::init(cfg, path) else {
+ return Err(TcpTargetError::NotFound("Vault not found".to_string()));
+ };
+ let vault: Arc<Vault> = Arc::new(vault);
+
+ Ok(vault)
+}
+
+fn build_server_future(
+ vault: Arc<Vault>,
+ action_pool: Arc<ActionPool>,
+ listener: TcpListener,
+) -> (
+ mpsc::Sender<()>,
+ impl std::future::Future<Output = Result<(), TcpTargetError>>,
+) {
+ let (tx, mut rx) = mpsc::channel::<i32>(100);
+ let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
+ let mut active_connections = 0;
+ let mut shutdown_requested = false;
+
+ // Spawn task to handle Ctrl+C
+ let shutdown_tx_clone = shutdown_tx.clone();
+ spawn(async move {
+ if let Ok(()) = signal::ctrl_c().await {
+ let _ = shutdown_tx_clone.send(()).await;
+ }
+ });
+
+ let future = async move {
+ loop {
+ select! {
+ // Accept new connections
+ accept_result = listener.accept(), if !shutdown_requested => {
+ match accept_result {
+ Ok((stream, _addr)) => {
+ active_connections += 1;
+ let _ = tx.send(1).await;
+
+ let vault_clone = vault.clone();
+ let action_pool_clone = action_pool.clone();
+ let tx_clone = tx.clone();
+ spawn(async move {
+ process_connection(stream, vault_clone, action_pool_clone).await;
+ let _ = tx_clone.send(-1).await;
+ });
+ }
+ Err(_) => {
+ continue;
+ }
+ }
+ }
+
+ // Handle connection count updates
+ Some(count_change) = rx.recv() => {
+ active_connections = (active_connections as i32 + count_change) as usize;
+
+ // Check if we should shutdown after all connections are done
+ if shutdown_requested && active_connections == 0 {
+ break;
+ }
+ }
+
+ // Handle shutdown signal
+ _ = shutdown_rx.recv() => {
+ shutdown_requested = true;
+ // If no active connections, break immediately
+ if active_connections == 0 {
+ break;
+ }
+ }
+ }
+ }
+
+ Ok(())
+ };
+
+ (shutdown_tx, future)
+}
+
+async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) {
+ // Setup connection instance
+ let mut instance = ConnectionInstance::from(stream);
+
+ // Read action name and action arguments
+ let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
+ return;
+ };
+
+ // Build context
+ let ctx: ActionContext = ActionContext::remote().insert_instance(instance);
+
+ // Insert vault into context
+ let ctx = ctx.insert_arc(vault);
+
+ // Process action
+ let Ok(_result_json) = action_pool
+ .process_json(&msg.action_name, ctx, msg.action_args_json)
+ .await
+ else {
+ return;
+ };
+}
diff --git a/crates/vcs_actions/src/connection/error.rs b/crates/vcs_actions/src/connection/error.rs
new file mode 100644
index 0000000..241c16e
--- /dev/null
+++ b/crates/vcs_actions/src/connection/error.rs
@@ -0,0 +1,14 @@
+use std::io;
+use thiserror::Error;
+
+#[derive(Error, Debug, Clone)]
+pub enum ConnectionError {
+ #[error("I/O error: {0}")]
+ Io(String),
+}
+
+impl From<io::Error> for ConnectionError {
+ fn from(error: io::Error) -> Self {
+ ConnectionError::Io(error.to_string())
+ }
+}
diff --git a/crates/vcs_actions/src/connection/protocol.rs b/crates/vcs_actions/src/connection/protocol.rs
new file mode 100644
index 0000000..2cebe79
--- /dev/null
+++ b/crates/vcs_actions/src/connection/protocol.rs
@@ -0,0 +1,7 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Default, Clone, Serialize, Deserialize)]
+pub struct RemoteActionInvoke {
+ pub action_name: String,
+ pub action_args_json: String,
+}
diff --git a/crates/vcs_actions/src/lib.rs b/crates/vcs_actions/src/lib.rs
index 92de35f..2f7cbe4 100644
--- a/crates/vcs_actions/src/lib.rs
+++ b/crates/vcs_actions/src/lib.rs
@@ -1,2 +1,3 @@
pub mod actions;
+pub mod connection;
pub mod registry;
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index e69de29..5939bed 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -0,0 +1,60 @@
+use action_system::{action::ActionContext, action_pool::ActionPool};
+use tcp_connection::error::TcpTargetError;
+
+use crate::{
+ actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke,
+};
+
+fn register_actions(pool: &mut ActionPool) {
+ // Pool register here
+ register_hello_world_action(pool);
+}
+
+pub fn client_action_pool() -> ActionPool {
+ // Create pool
+ let mut pool = ActionPool::new();
+
+ // Register actions
+ register_actions(&mut pool);
+
+ // Add process events
+ pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args)));
+
+ // Return
+ pool
+}
+
+async fn on_proc_begin(
+ ctx: &ActionContext,
+ _args: &(dyn std::any::Any + Send + Sync),
+) -> Result<(), TcpTargetError> {
+ // Is ctx remote
+ let is_remote = ctx.is_remote();
+
+ // Action name and arguments
+ let action_name = ctx.action_name().to_string();
+ let action_args_json = ctx.action_args_json().clone();
+
+ // Get instance
+ let Some(instance) = ctx.instance() else {
+ return Err(TcpTargetError::Unsupported(
+ "Missing ConnectionInstance in current context, this ActionPool does not support this call"
+ .to_string()));
+ };
+
+ // If it's remote, invoke action at server
+ if is_remote {
+ // Build protocol message
+ let msg = RemoteActionInvoke {
+ action_name,
+ action_args_json,
+ };
+
+ // Send
+ let mut instance = instance.lock().await;
+ instance.write_msgpack(&msg).await?;
+ }
+
+ // Return OK, wait for client to execute Action locally
+ Ok(())
+}
diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs
index e69de29..b449b68 100644
--- a/crates/vcs_actions/src/registry/server_registry.rs
+++ b/crates/vcs_actions/src/registry/server_registry.rs
@@ -0,0 +1,9 @@
+use action_system::action_pool::ActionPool;
+
+use crate::actions::local_actions::register_hello_world_action;
+
+pub fn server_action_pool() -> ActionPool {
+ let mut pool = ActionPool::new();
+ register_hello_world_action(&mut pool);
+ pool
+}