diff options
Diffstat (limited to 'crates')
| -rw-r--r-- | crates/system_action/action_macros/src/lib.rs | 17 | ||||
| -rw-r--r-- | crates/system_action/src/action.rs | 38 | ||||
| -rw-r--r-- | crates/system_action/src/action_pool.rs | 67 | ||||
| -rw-r--r-- | crates/vcs_actions/src/actions/local_actions.rs | 2 | ||||
| -rw-r--r-- | crates/vcs_actions/src/connection.rs | 1 | ||||
| -rw-r--r-- | crates/vcs_actions/src/connection/action_service.rs | 29 | ||||
| -rw-r--r-- | crates/vcs_actions/src/connection/protocol.rs | 7 | ||||
| -rw-r--r-- | crates/vcs_actions/src/registry/client_registry.rs | 28 |
8 files changed, 166 insertions, 23 deletions
diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs index 683efcb..aa1c696 100644 --- a/crates/system_action/action_macros/src/lib.rs +++ b/crates/system_action/action_macros/src/lib.rs @@ -67,14 +67,19 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok ctx: action_system::action::ActionContext, #arg_param_name: #arg_type ) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - pool.process::<#arg_type, #return_type>( + let args_json = serde_json::to_string(&#arg_param_name) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + })?; + let result_json = pool.process_json( Box::leak(string_proc::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), ctx, - serde_json::to_string(&#arg_param_name) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - })? - ).await + args_json, + ).await?; + serde_json::from_str(&result_json) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + }) } #[allow(dead_code)] diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs index c8d7a18..e7d2d8c 100644 --- a/crates/system_action/src/action.rs +++ b/crates/system_action/src/action.rs @@ -22,6 +22,12 @@ pub struct ActionContext { /// Whether the action is executed locally or remotely local: bool, + /// The name of the action being executed + action_name: String, + + /// The JSON-serialized arguments for the action + action_args_json: String, + /// The connection instance in the current context, /// used to interact with the machine on the other end instance: Option<ConnectionInstance>, @@ -53,6 +59,11 @@ impl ActionContext { self.instance = Some(instance); self } + + /// Pop connection instance from context + pub fn pop_instance(&mut self) -> Option<ConnectionInstance> { + self.instance.take() + } } impl ActionContext { @@ -70,4 +81,31 @@ impl ActionContext { pub fn instance(&self) -> &Option<ConnectionInstance> { &self.instance } + + /// Get a mutable reference to the connection instance in the current context + pub fn instance_mut(&mut self) -> &mut Option<ConnectionInstance> { + &mut self.instance + } + + /// Get the action name from the context + pub fn action_name(&self) -> &str { + &self.action_name + } + + /// Get the action arguments from the context + pub fn action_args_json(&self) -> &String { + &self.action_args_json + } + + /// Set the action name in the context + pub fn set_action_name(mut self, action_name: String) -> Self { + self.action_name = action_name; + self + } + + /// Set the action arguments in the context + pub fn set_action_args_json(mut self, action_args: String) -> Self { + self.action_args_json = action_args; + self + } } diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs index a3e82d6..7e93fc4 100644 --- a/crates/system_action/src/action_pool.rs +++ b/crates/system_action/src/action_pool.rs @@ -1,13 +1,14 @@ use std::pin::Pin; use serde::{Serialize, de::DeserializeOwned}; +use serde_json; use tcp_connection::error::TcpTargetError; use crate::action::{Action, ActionContext}; type ProcBeginCallback = for<'a> fn( - &'a ActionContext, + &'a mut ActionContext, ) -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>; type ProcEndCallback = fn() -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>; @@ -68,20 +69,51 @@ impl ActionPool { /// ```ignore /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?; /// ``` - pub async fn process<'a, Args, Return>( + /// Processes an action by name with JSON-serialized arguments + /// + /// Usage: + /// ```ignore + /// let result_json = action_pool.process_json("my_action", context, args_json).await?; + /// let result: MyReturn = serde_json::from_str(&result_json)?; + /// ``` + pub async fn process_json<'a>( &'a self, action_name: &'a str, context: ActionContext, args_json: String, + ) -> Result<String, TcpTargetError> { + if let Some(action) = self.actions.get(action_name) { + // Set action name and args in context for callbacks + let context = context.set_action_name(action_name.to_string()); + let mut context = context.set_action_args_json(args_json.clone()); + + let _ = self.exec_on_proc_begin(&mut context).await?; + let result = action.process_json_erased(context, args_json).await?; + let _ = self.exec_on_proc_end().await?; + Ok(result) + } else { + Err(TcpTargetError::Unsupported("InvalidAction".to_string())) + } + } + + /// Processes an action by name with given context and arguments + /// + /// Usage: + /// ```ignore + /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?; + /// ``` + pub async fn process<'a, Args, Return>( + &'a self, + action_name: &'a str, + mut context: ActionContext, + args: Args, ) -> Result<Return, TcpTargetError> where Args: serde::de::DeserializeOwned + Send + 'static, Return: serde::Serialize + Send + 'static, { if let Some(action) = self.actions.get(action_name) { - let _ = self.exec_on_proc_begin(&context).await?; - let args: Args = serde_json::from_str(&args_json) - .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; + let _ = self.exec_on_proc_begin(&mut context).await?; let result = action.process_erased(context, Box::new(args)).await?; let result = *result .downcast::<Return>() @@ -94,7 +126,7 @@ impl ActionPool { } /// Executes the process begin callback if set - async fn exec_on_proc_begin(&self, context: &ActionContext) -> Result<(), TcpTargetError> { + async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> { if let Some(callback) = &self.on_proc_begin { callback(context).await } else { @@ -125,6 +157,13 @@ trait ActionErased: Send + Sync { + Send, >, >; + + /// Processes the action with JSON-serialized arguments and returns JSON-serialized result + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>; } /// Wrapper struct that implements ActionErased for concrete Action types @@ -154,4 +193,20 @@ where Ok(Box::new(result) as Box<dyn std::any::Any + Send>) }) } + + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>> + { + Box::pin(async move { + let args: Args = serde_json::from_str(&args_json) + .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; + let result = A::process(context, args).await?; + let result_json = serde_json::to_string(&result) + .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?; + Ok(result_json) + }) + } } diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs index b230c6f..0e210a7 100644 --- a/crates/vcs_actions/src/actions/local_actions.rs +++ b/crates/vcs_actions/src/actions/local_actions.rs @@ -6,7 +6,7 @@ use tcp_connection::error::TcpTargetError; #[action_gen(local)] pub async fn set_upstream_vault_action( ctx: ActionContext, - upstream: SocketAddr, + _upstream: SocketAddr, ) -> Result<(), TcpTargetError> { if ctx.is_remote() { return Err(TcpTargetError::NotLocal( diff --git a/crates/vcs_actions/src/connection.rs b/crates/vcs_actions/src/connection.rs index dabbd44..918f93c 100644 --- a/crates/vcs_actions/src/connection.rs +++ b/crates/vcs_actions/src/connection.rs @@ -1,2 +1,3 @@ pub mod action_service; pub mod error; +pub mod protocol; diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs index 8d3a03d..9ea5957 100644 --- a/crates/vcs_actions/src/connection/action_service.rs +++ b/crates/vcs_actions/src/connection/action_service.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc}; -use action_system::action_pool::ActionPool; +use action_system::{action::ActionContext, action_pool::ActionPool}; use cfg_file::config::ConfigFile; use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; use tokio::{ @@ -10,10 +10,12 @@ use tokio::{ }; use vcs_data::data::vault::{Vault, config::VaultConfig}; -use crate::registry::server_registry::server_action_pool; +use crate::{ + connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool, +}; // Start the server with a Vault using the specified directory -pub async fn server_entry(path: impl Into<PathBuf>) -> Result<(), TcpTargetError> { +pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTargetError> { // Read the vault cfg let vault_cfg = VaultConfig::read().await?; @@ -21,7 +23,7 @@ pub async fn server_entry(path: impl Into<PathBuf>) -> Result<(), TcpTargetError let listener = create_tcp_listener(&vault_cfg).await?; // Initialize the vault - let vault: Arc<Vault> = init_vault(vault_cfg, path.into()).await?; + let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?; // Create ActionPool let action_pool: Arc<ActionPool> = Arc::new(server_action_pool()); @@ -125,5 +127,22 @@ fn build_server_future( } async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) { - let instance = ConnectionInstance::from(stream); + // Setup connection instance + let mut instance = ConnectionInstance::from(stream); + + // Read action name and action arguments + let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else { + return; + }; + + // Build context + let ctx = ActionContext::remote().insert_instance(instance); + + // Process action + let Ok(_result_json) = action_pool + .process_json(&msg.action_name, ctx, msg.action_args_json) + .await + else { + return; + }; } diff --git a/crates/vcs_actions/src/connection/protocol.rs b/crates/vcs_actions/src/connection/protocol.rs new file mode 100644 index 0000000..2cebe79 --- /dev/null +++ b/crates/vcs_actions/src/connection/protocol.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct RemoteActionInvoke { + pub action_name: String, + pub action_args_json: String, +} diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs index d298099..56acdad 100644 --- a/crates/vcs_actions/src/registry/client_registry.rs +++ b/crates/vcs_actions/src/registry/client_registry.rs @@ -1,7 +1,10 @@ use action_system::{action::ActionContext, action_pool::ActionPool}; use tcp_connection::error::TcpTargetError; -use crate::actions::local_actions::register_set_upstream_vault_action; +use crate::{ + actions::local_actions::register_set_upstream_vault_action, + connection::protocol::RemoteActionInvoke, +}; fn register_actions(pool: &mut ActionPool) { // Pool register here @@ -22,18 +25,33 @@ pub fn client_action_pool() -> ActionPool { pool } -async fn on_proc_begin(ctx: &ActionContext) -> Result<(), TcpTargetError> { +async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { + // Is ctx remote + let is_remote = ctx.is_remote(); + + // Action name and arguments + let action_name = ctx.action_name().to_string(); + let action_args_json = ctx.action_args_json().clone(); + // Get instance - let Some(_instance) = ctx.instance() else { + let Some(instance) = ctx.instance_mut() else { return Err(TcpTargetError::Unsupported( "Missing ConnectionInstance in current context, this ActionPool does not support this call" .to_string())); }; // If it's remote, invoke action at server - if ctx.is_remote() { - // instance.write_text(text) + if is_remote { + // Build protocol message + let msg = RemoteActionInvoke { + action_name: action_name, + action_args_json: action_args_json, + }; + + // Send + instance.write_msgpack(msg).await?; } + // Return OK, wait for client to execute Action locally Ok(()) } |
