From acf0804b5f9bdc2796d847919a8ae20103be600a Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 13 Oct 2025 14:17:51 +0800 Subject: feat: implement asynchronous action call system - Add async callback support with proper argument passing - Implement remote action invocation via TCP connection - Add hello_world_action example demonstrating async communication - Improve ActionPool with type-safe async processing - Update client registry for remote action handling - Enhance ActionContext with better instance management - Support both local and remote action execution modes --- crates/system_action/action_macros/src/lib.rs | 4 +- crates/system_action/src/action.rs | 70 +++++++++++++++++++--- crates/system_action/src/action_pool.rs | 19 +++--- crates/vcs_actions/src/actions/local_actions.rs | 28 +++++---- .../vcs_actions/src/connection/action_service.rs | 5 +- crates/vcs_actions/src/registry/client_registry.rs | 17 +++--- crates/vcs_actions/src/registry/server_registry.rs | 4 +- 7 files changed, 109 insertions(+), 38 deletions(-) diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs index aa1c696..d1a47ee 100644 --- a/crates/system_action/action_macros/src/lib.rs +++ b/crates/system_action/action_macros/src/lib.rs @@ -89,12 +89,12 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok #[doc = ""] #[doc = "Register the action to the pool."] #[doc = "```ignore"] - #[doc = "YourAction::register_to_pool(&mut pool);"] + #[doc = "register_your_func(&mut pool);"] #[doc = "```"] #[doc = ""] #[doc = "Process the action at the pool."] #[doc = "```ignore"] - #[doc = "let result = YourAction::process_at_pool(&pool, ctx, arg).await?;"] + #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"] #[doc = "```"] #fn_vis #fn_sig #fn_block } diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs index e7d2d8c..3ae5711 100644 --- a/crates/system_action/src/action.rs +++ b/crates/system_action/src/action.rs @@ -1,6 +1,9 @@ use serde::{Serialize, de::DeserializeOwned}; +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::sync::Arc; use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::net::TcpStream; +use tokio::{net::TcpStream, sync::Mutex}; pub trait Action where @@ -29,8 +32,10 @@ pub struct ActionContext { action_args_json: String, /// The connection instance in the current context, - /// used to interact with the machine on the other end - instance: Option, + instance: Option>>, + + /// Generic data storage for arbitrary types + data: HashMap>, } impl ActionContext { @@ -50,18 +55,18 @@ impl ActionContext { /// Build connection instance from TcpStream pub fn build_instance(mut self, stream: TcpStream) -> Self { - self.instance = Some(ConnectionInstance::from(stream)); + self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream)))); self } /// Insert connection instance into context pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self { - self.instance = Some(instance); + self.instance = Some(Arc::new(Mutex::new(instance))); self } /// Pop connection instance from context - pub fn pop_instance(&mut self) -> Option { + pub fn pop_instance(&mut self) -> Option>> { self.instance.take() } } @@ -78,12 +83,12 @@ impl ActionContext { } /// Get the connection instance in the current context - pub fn instance(&self) -> &Option { + pub fn instance(&self) -> &Option>> { &self.instance } /// Get a mutable reference to the connection instance in the current context - pub fn instance_mut(&mut self) -> &mut Option { + pub fn instance_mut(&mut self) -> &mut Option>> { &mut self.instance } @@ -104,8 +109,55 @@ impl ActionContext { } /// Set the action arguments in the context - pub fn set_action_args_json(mut self, action_args: String) -> Self { + pub fn set_action_args(mut self, action_args: String) -> Self { self.action_args_json = action_args; self } + + /// Insert arbitrary data in the context + pub fn insert(mut self, value: T) -> Self { + self.data.insert(TypeId::of::(), Arc::new(value)); + self + } + + /// Insert arbitrary data as Arc in the context + pub fn insert_arc(mut self, value: Arc) -> Self { + self.data.insert(TypeId::of::(), value); + self + } + + /// Get arbitrary data from the context + pub fn get(&self) -> Option<&T> { + self.data + .get(&TypeId::of::()) + .and_then(|arc| arc.downcast_ref::()) + } + + /// Get arbitrary data as Arc from the context + pub fn get_arc(&self) -> Option> { + self.data + .get(&TypeId::of::()) + .and_then(|arc| Arc::clone(arc).downcast::().ok()) + } + + /// Remove and return arbitrary data from the context + pub fn remove(&mut self) -> Option> { + self.data + .remove(&TypeId::of::()) + .and_then(|arc| arc.downcast::().ok()) + } + + /// Check if the context contains data of a specific type + pub fn contains(&self) -> bool { + self.data.contains_key(&TypeId::of::()) + } + + /// Take ownership of the context and extract data of a specific type + pub fn take(mut self) -> (Self, Option>) { + let value = self + .data + .remove(&TypeId::of::()) + .and_then(|arc| arc.downcast::().ok()); + (self, value) + } } diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs index 7e93fc4..f3e178a 100644 --- a/crates/system_action/src/action_pool.rs +++ b/crates/system_action/src/action_pool.rs @@ -8,7 +8,8 @@ use crate::action::{Action, ActionContext}; type ProcBeginCallback = for<'a> fn( - &'a mut ActionContext, + &'a ActionContext, + args: &'a (dyn std::any::Any + Send + Sync), ) -> Pin> + Send + 'a>>; type ProcEndCallback = fn() -> Pin> + Send>>; @@ -85,9 +86,9 @@ impl ActionPool { if let Some(action) = self.actions.get(action_name) { // Set action name and args in context for callbacks let context = context.set_action_name(action_name.to_string()); - let mut context = context.set_action_args_json(args_json.clone()); + let context = context.set_action_args(args_json.clone()); - let _ = self.exec_on_proc_begin(&mut context).await?; + let _ = self.exec_on_proc_begin(&context, &args_json).await?; let result = action.process_json_erased(context, args_json).await?; let _ = self.exec_on_proc_end().await?; Ok(result) @@ -109,11 +110,11 @@ impl ActionPool { args: Args, ) -> Result where - Args: serde::de::DeserializeOwned + Send + 'static, + Args: serde::de::DeserializeOwned + Send + Sync + 'static, Return: serde::Serialize + Send + 'static, { if let Some(action) = self.actions.get(action_name) { - let _ = self.exec_on_proc_begin(&mut context).await?; + let _ = self.exec_on_proc_begin(&context, &args).await?; let result = action.process_erased(context, Box::new(args)).await?; let result = *result .downcast::() @@ -126,9 +127,13 @@ impl ActionPool { } /// Executes the process begin callback if set - async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> { + async fn exec_on_proc_begin( + &self, + context: &ActionContext, + args: &(dyn std::any::Any + Send + Sync), + ) -> Result<(), TcpTargetError> { if let Some(callback) = &self.on_proc_begin { - callback(context).await + callback(context, args).await } else { Ok(()) } diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs index 0e210a7..b11a934 100644 --- a/crates/vcs_actions/src/actions/local_actions.rs +++ b/crates/vcs_actions/src/actions/local_actions.rs @@ -1,17 +1,25 @@ -use std::net::SocketAddr; - use action_system::{action::ActionContext, action_gen}; +use log::info; use tcp_connection::error::TcpTargetError; -#[action_gen(local)] -pub async fn set_upstream_vault_action( - ctx: ActionContext, - _upstream: SocketAddr, -) -> Result<(), TcpTargetError> { - if ctx.is_remote() { - return Err(TcpTargetError::NotLocal( - "Action was not invoked on the local machine".to_string(), +#[action_gen] +pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> { + // Ensure the instance is available + let Some(instance) = ctx.instance() else { + return Err(TcpTargetError::NotFound( + "Connection Instance Lost.".to_string(), )); + }; + + if ctx.is_local() { + // Invoke on local + // Send the message to the server + let _ = instance.lock().await.write_text("Hello World!").await; + } else if ctx.is_remote() { + // Read the message from the client + let read = instance.lock().await.read_text().await?; + info!("{}", read) } + Ok(()) } diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs index 9ea5957..0a49953 100644 --- a/crates/vcs_actions/src/connection/action_service.rs +++ b/crates/vcs_actions/src/connection/action_service.rs @@ -136,7 +136,10 @@ async fn process_connection(stream: TcpStream, vault: Arc, action_pool: A }; // Build context - let ctx = ActionContext::remote().insert_instance(instance); + let ctx: ActionContext = ActionContext::remote().insert_instance(instance); + + // Insert vault into context + let ctx = ctx.insert_arc(vault); // Process action let Ok(_result_json) = action_pool diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs index 56acdad..47fd7ee 100644 --- a/crates/vcs_actions/src/registry/client_registry.rs +++ b/crates/vcs_actions/src/registry/client_registry.rs @@ -2,13 +2,12 @@ use action_system::{action::ActionContext, action_pool::ActionPool}; use tcp_connection::error::TcpTargetError; use crate::{ - actions::local_actions::register_set_upstream_vault_action, - connection::protocol::RemoteActionInvoke, + actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke, }; fn register_actions(pool: &mut ActionPool) { // Pool register here - register_set_upstream_vault_action(pool); + register_hello_world_action(pool); } pub fn client_action_pool() -> ActionPool { @@ -19,13 +18,16 @@ pub fn client_action_pool() -> ActionPool { register_actions(&mut pool); // Add process events - pool.set_on_proc_begin(|ctx| Box::pin(on_proc_begin(ctx))); + pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args))); // Return pool } -async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { +async fn on_proc_begin( + ctx: &ActionContext, + _args: &(dyn std::any::Any + Send + Sync), +) -> Result<(), TcpTargetError> { // Is ctx remote let is_remote = ctx.is_remote(); @@ -34,7 +36,7 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { let action_args_json = ctx.action_args_json().clone(); // Get instance - let Some(instance) = ctx.instance_mut() else { + let Some(instance) = ctx.instance() else { return Err(TcpTargetError::Unsupported( "Missing ConnectionInstance in current context, this ActionPool does not support this call" .to_string())); @@ -49,7 +51,8 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { }; // Send - instance.write_msgpack(msg).await?; + let mut instance = instance.lock().await; + instance.write_msgpack(&msg).await?; } // Return OK, wait for client to execute Action locally diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs index 3ecc103..b449b68 100644 --- a/crates/vcs_actions/src/registry/server_registry.rs +++ b/crates/vcs_actions/src/registry/server_registry.rs @@ -1,9 +1,9 @@ use action_system::action_pool::ActionPool; -use crate::actions::local_actions::register_set_upstream_vault_action; +use crate::actions::local_actions::register_hello_world_action; pub fn server_action_pool() -> ActionPool { let mut pool = ActionPool::new(); - register_set_upstream_vault_action(&mut pool); + register_hello_world_action(&mut pool); pool } -- cgit