From acf0804b5f9bdc2796d847919a8ae20103be600a Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Mon, 13 Oct 2025 14:17:51 +0800 Subject: feat: implement asynchronous action call system - Add async callback support with proper argument passing - Implement remote action invocation via TCP connection - Add hello_world_action example demonstrating async communication - Improve ActionPool with type-safe async processing - Update client registry for remote action handling - Enhance ActionContext with better instance management - Support both local and remote action execution modes --- crates/vcs_actions/src/actions/local_actions.rs | 28 ++++++++++++++-------- .../vcs_actions/src/connection/action_service.rs | 5 +++- crates/vcs_actions/src/registry/client_registry.rs | 17 +++++++------ crates/vcs_actions/src/registry/server_registry.rs | 4 ++-- 4 files changed, 34 insertions(+), 20 deletions(-) (limited to 'crates/vcs_actions') diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs index 0e210a7..b11a934 100644 --- a/crates/vcs_actions/src/actions/local_actions.rs +++ b/crates/vcs_actions/src/actions/local_actions.rs @@ -1,17 +1,25 @@ -use std::net::SocketAddr; - use action_system::{action::ActionContext, action_gen}; +use log::info; use tcp_connection::error::TcpTargetError; -#[action_gen(local)] -pub async fn set_upstream_vault_action( - ctx: ActionContext, - _upstream: SocketAddr, -) -> Result<(), TcpTargetError> { - if ctx.is_remote() { - return Err(TcpTargetError::NotLocal( - "Action was not invoked on the local machine".to_string(), +#[action_gen] +pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> { + // Ensure the instance is available + let Some(instance) = ctx.instance() else { + return Err(TcpTargetError::NotFound( + "Connection Instance Lost.".to_string(), )); + }; + + if ctx.is_local() { + // Invoke on local + // Send the message to the server + let _ = instance.lock().await.write_text("Hello World!").await; + } else if ctx.is_remote() { + // Read the message from the client + let read = instance.lock().await.read_text().await?; + info!("{}", read) } + Ok(()) } diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs index 9ea5957..0a49953 100644 --- a/crates/vcs_actions/src/connection/action_service.rs +++ b/crates/vcs_actions/src/connection/action_service.rs @@ -136,7 +136,10 @@ async fn process_connection(stream: TcpStream, vault: Arc, action_pool: A }; // Build context - let ctx = ActionContext::remote().insert_instance(instance); + let ctx: ActionContext = ActionContext::remote().insert_instance(instance); + + // Insert vault into context + let ctx = ctx.insert_arc(vault); // Process action let Ok(_result_json) = action_pool diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs index 56acdad..47fd7ee 100644 --- a/crates/vcs_actions/src/registry/client_registry.rs +++ b/crates/vcs_actions/src/registry/client_registry.rs @@ -2,13 +2,12 @@ use action_system::{action::ActionContext, action_pool::ActionPool}; use tcp_connection::error::TcpTargetError; use crate::{ - actions::local_actions::register_set_upstream_vault_action, - connection::protocol::RemoteActionInvoke, + actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke, }; fn register_actions(pool: &mut ActionPool) { // Pool register here - register_set_upstream_vault_action(pool); + register_hello_world_action(pool); } pub fn client_action_pool() -> ActionPool { @@ -19,13 +18,16 @@ pub fn client_action_pool() -> ActionPool { register_actions(&mut pool); // Add process events - pool.set_on_proc_begin(|ctx| Box::pin(on_proc_begin(ctx))); + pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args))); // Return pool } -async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { +async fn on_proc_begin( + ctx: &ActionContext, + _args: &(dyn std::any::Any + Send + Sync), +) -> Result<(), TcpTargetError> { // Is ctx remote let is_remote = ctx.is_remote(); @@ -34,7 +36,7 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { let action_args_json = ctx.action_args_json().clone(); // Get instance - let Some(instance) = ctx.instance_mut() else { + let Some(instance) = ctx.instance() else { return Err(TcpTargetError::Unsupported( "Missing ConnectionInstance in current context, this ActionPool does not support this call" .to_string())); @@ -49,7 +51,8 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> { }; // Send - instance.write_msgpack(msg).await?; + let mut instance = instance.lock().await; + instance.write_msgpack(&msg).await?; } // Return OK, wait for client to execute Action locally diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs index 3ecc103..b449b68 100644 --- a/crates/vcs_actions/src/registry/server_registry.rs +++ b/crates/vcs_actions/src/registry/server_registry.rs @@ -1,9 +1,9 @@ use action_system::action_pool::ActionPool; -use crate::actions::local_actions::register_set_upstream_vault_action; +use crate::actions::local_actions::register_hello_world_action; pub fn server_action_pool() -> ActionPool { let mut pool = ActionPool::new(); - register_set_upstream_vault_action(&mut pool); + register_hello_world_action(&mut pool); pool } -- cgit