From 67fb8ec01b351c6c9fd2af321166bb92250b1218 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 13 Oct 2025 13:34:39 +0800 Subject: feat: Implement JSON-based type-erased action invocation - Add process_json method to ActionPool for type-agnostic calls using JSON serialization - Extend ActionContext with action_name and action_args fields and setter methods - Update action_gen macro to use process_json instead of typed process method - Implement remote action invocation framework in client_registry and action_service - Add protocol definitions for remote action communication - Enable flexible action execution without explicit type specifications --- crates/system_action/action_macros/src/lib.rs | 17 ++++--- crates/system_action/src/action.rs | 38 +++++++++++++++ crates/system_action/src/action_pool.rs | 67 ++++++++++++++++++++++++--- 3 files changed, 110 insertions(+), 12 deletions(-) (limited to 'crates/system_action') diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs index 683efcb..aa1c696 100644 --- a/crates/system_action/action_macros/src/lib.rs +++ b/crates/system_action/action_macros/src/lib.rs @@ -67,14 +67,19 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok ctx: action_system::action::ActionContext, #arg_param_name: #arg_type ) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - pool.process::<#arg_type, #return_type>( + let args_json = serde_json::to_string(&#arg_param_name) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + })?; + let result_json = pool.process_json( Box::leak(string_proc::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), ctx, - serde_json::to_string(&#arg_param_name) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - })? - ).await + args_json, + ).await?; + serde_json::from_str(&result_json) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + }) } #[allow(dead_code)] diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs index c8d7a18..e7d2d8c 100644 --- a/crates/system_action/src/action.rs +++ b/crates/system_action/src/action.rs @@ -22,6 +22,12 @@ pub struct ActionContext { /// Whether the action is executed locally or remotely local: bool, + /// The name of the action being executed + action_name: String, + + /// The JSON-serialized arguments for the action + action_args_json: String, + /// The connection instance in the current context, /// used to interact with the machine on the other end instance: Option, @@ -53,6 +59,11 @@ impl ActionContext { self.instance = Some(instance); self } + + /// Pop connection instance from context + pub fn pop_instance(&mut self) -> Option { + self.instance.take() + } } impl ActionContext { @@ -70,4 +81,31 @@ impl ActionContext { pub fn instance(&self) -> &Option { &self.instance } + + /// Get a mutable reference to the connection instance in the current context + pub fn instance_mut(&mut self) -> &mut Option { + &mut self.instance + } + + /// Get the action name from the context + pub fn action_name(&self) -> &str { + &self.action_name + } + + /// Get the action arguments from the context + pub fn action_args_json(&self) -> &String { + &self.action_args_json + } + + /// Set the action name in the context + pub fn set_action_name(mut self, action_name: String) -> Self { + self.action_name = action_name; + self + } + + /// Set the action arguments in the context + pub fn set_action_args_json(mut self, action_args: String) -> Self { + self.action_args_json = action_args; + self + } } diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs index a3e82d6..7e93fc4 100644 --- a/crates/system_action/src/action_pool.rs +++ b/crates/system_action/src/action_pool.rs @@ -1,13 +1,14 @@ use std::pin::Pin; use serde::{Serialize, de::DeserializeOwned}; +use serde_json; use tcp_connection::error::TcpTargetError; use crate::action::{Action, ActionContext}; type ProcBeginCallback = for<'a> fn( - &'a ActionContext, + &'a mut ActionContext, ) -> Pin> + Send + 'a>>; type ProcEndCallback = fn() -> Pin> + Send>>; @@ -68,20 +69,51 @@ impl ActionPool { /// ```ignore /// let result = action_pool.process::("my_action", context, args).await?; /// ``` - pub async fn process<'a, Args, Return>( + /// Processes an action by name with JSON-serialized arguments + /// + /// Usage: + /// ```ignore + /// let result_json = action_pool.process_json("my_action", context, args_json).await?; + /// let result: MyReturn = serde_json::from_str(&result_json)?; + /// ``` + pub async fn process_json<'a>( &'a self, action_name: &'a str, context: ActionContext, args_json: String, + ) -> Result { + if let Some(action) = self.actions.get(action_name) { + // Set action name and args in context for callbacks + let context = context.set_action_name(action_name.to_string()); + let mut context = context.set_action_args_json(args_json.clone()); + + let _ = self.exec_on_proc_begin(&mut context).await?; + let result = action.process_json_erased(context, args_json).await?; + let _ = self.exec_on_proc_end().await?; + Ok(result) + } else { + Err(TcpTargetError::Unsupported("InvalidAction".to_string())) + } + } + + /// Processes an action by name with given context and arguments + /// + /// Usage: + /// ```ignore + /// let result = action_pool.process::("my_action", context, args).await?; + /// ``` + pub async fn process<'a, Args, Return>( + &'a self, + action_name: &'a str, + mut context: ActionContext, + args: Args, ) -> Result where Args: serde::de::DeserializeOwned + Send + 'static, Return: serde::Serialize + Send + 'static, { if let Some(action) = self.actions.get(action_name) { - let _ = self.exec_on_proc_begin(&context).await?; - let args: Args = serde_json::from_str(&args_json) - .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; + let _ = self.exec_on_proc_begin(&mut context).await?; let result = action.process_erased(context, Box::new(args)).await?; let result = *result .downcast::() @@ -94,7 +126,7 @@ impl ActionPool { } /// Executes the process begin callback if set - async fn exec_on_proc_begin(&self, context: &ActionContext) -> Result<(), TcpTargetError> { + async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> { if let Some(callback) = &self.on_proc_begin { callback(context).await } else { @@ -125,6 +157,13 @@ trait ActionErased: Send + Sync { + Send, >, >; + + /// Processes the action with JSON-serialized arguments and returns JSON-serialized result + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> std::pin::Pin> + Send>>; } /// Wrapper struct that implements ActionErased for concrete Action types @@ -154,4 +193,20 @@ where Ok(Box::new(result) as Box) }) } + + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> std::pin::Pin> + Send>> + { + Box::pin(async move { + let args: Args = serde_json::from_str(&args_json) + .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; + let result = A::process(context, args).await?; + let result_json = serde_json::to_string(&result) + .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?; + Ok(result_json) + }) + } } -- cgit