From acf0804b5f9bdc2796d847919a8ae20103be600a Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 13 Oct 2025 14:17:51 +0800 Subject: feat: implement asynchronous action call system - Add async callback support with proper argument passing - Implement remote action invocation via TCP connection - Add hello_world_action example demonstrating async communication - Improve ActionPool with type-safe async processing - Update client registry for remote action handling - Enhance ActionContext with better instance management - Support both local and remote action execution modes --- crates/system_action/src/action_pool.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'crates/system_action/src/action_pool.rs') diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs index 7e93fc4..f3e178a 100644 --- a/crates/system_action/src/action_pool.rs +++ b/crates/system_action/src/action_pool.rs @@ -8,7 +8,8 @@ use crate::action::{Action, ActionContext}; type ProcBeginCallback = for<'a> fn( - &'a mut ActionContext, + &'a ActionContext, + args: &'a (dyn std::any::Any + Send + Sync), ) -> Pin> + Send + 'a>>; type ProcEndCallback = fn() -> Pin> + Send>>; @@ -85,9 +86,9 @@ impl ActionPool { if let Some(action) = self.actions.get(action_name) { // Set action name and args in context for callbacks let context = context.set_action_name(action_name.to_string()); - let mut context = context.set_action_args_json(args_json.clone()); + let context = context.set_action_args(args_json.clone()); - let _ = self.exec_on_proc_begin(&mut context).await?; + let _ = self.exec_on_proc_begin(&context, &args_json).await?; let result = action.process_json_erased(context, args_json).await?; let _ = self.exec_on_proc_end().await?; Ok(result) @@ -109,11 +110,11 @@ impl ActionPool { args: Args, ) -> Result where - Args: serde::de::DeserializeOwned + Send + 'static, + Args: serde::de::DeserializeOwned + Send + Sync + 'static, Return: serde::Serialize + Send + 'static, { if let Some(action) = self.actions.get(action_name) { - let _ = self.exec_on_proc_begin(&mut context).await?; + let _ = self.exec_on_proc_begin(&context, &args).await?; let result = action.process_erased(context, Box::new(args)).await?; let result = *result .downcast::() @@ -126,9 +127,13 @@ impl ActionPool { } /// Executes the process begin callback if set - async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> { + async fn exec_on_proc_begin( + &self, + context: &ActionContext, + args: &(dyn std::any::Any + Send + Sync), + ) -> Result<(), TcpTargetError> { if let Some(callback) = &self.on_proc_begin { - callback(context).await + callback(context, args).await } else { Ok(()) } -- cgit