summaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2025-10-13 14:17:51 +0800
committer魏曹先生 <1992414357@qq.com>2025-10-13 14:17:51 +0800
commitacf0804b5f9bdc2796d847919a8ae20103be600a (patch)
tree96eb75fad0d12e4a6c0c8e2148b555899602b540 /crates
parent67fb8ec01b351c6c9fd2af321166bb92250b1218 (diff)
feat: implement asynchronous action call system
- Add async callback support with proper argument passing - Implement remote action invocation via TCP connection - Add hello_world_action example demonstrating async communication - Improve ActionPool with type-safe async processing - Update client registry for remote action handling - Enhance ActionContext with better instance management - Support both local and remote action execution modes
Diffstat (limited to 'crates')
-rw-r--r--crates/system_action/action_macros/src/lib.rs4
-rw-r--r--crates/system_action/src/action.rs70
-rw-r--r--crates/system_action/src/action_pool.rs19
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs28
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs5
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs17
-rw-r--r--crates/vcs_actions/src/registry/server_registry.rs4
7 files changed, 109 insertions, 38 deletions
diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs
index aa1c696..d1a47ee 100644
--- a/crates/system_action/action_macros/src/lib.rs
+++ b/crates/system_action/action_macros/src/lib.rs
@@ -89,12 +89,12 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
#[doc = ""]
#[doc = "Register the action to the pool."]
#[doc = "```ignore"]
- #[doc = "YourAction::register_to_pool(&mut pool);"]
+ #[doc = "register_your_func(&mut pool);"]
#[doc = "```"]
#[doc = ""]
#[doc = "Process the action at the pool."]
#[doc = "```ignore"]
- #[doc = "let result = YourAction::process_at_pool(&pool, ctx, arg).await?;"]
+ #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"]
#[doc = "```"]
#fn_vis #fn_sig #fn_block
}
diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs
index e7d2d8c..3ae5711 100644
--- a/crates/system_action/src/action.rs
+++ b/crates/system_action/src/action.rs
@@ -1,6 +1,9 @@
use serde::{Serialize, de::DeserializeOwned};
+use std::any::{Any, TypeId};
+use std::collections::HashMap;
+use std::sync::Arc;
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
-use tokio::net::TcpStream;
+use tokio::{net::TcpStream, sync::Mutex};
pub trait Action<Args, Return>
where
@@ -29,8 +32,10 @@ pub struct ActionContext {
action_args_json: String,
/// The connection instance in the current context,
- /// used to interact with the machine on the other end
- instance: Option<ConnectionInstance>,
+ instance: Option<Arc<Mutex<ConnectionInstance>>>,
+
+ /// Generic data storage for arbitrary types
+ data: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}
impl ActionContext {
@@ -50,18 +55,18 @@ impl ActionContext {
/// Build connection instance from TcpStream
pub fn build_instance(mut self, stream: TcpStream) -> Self {
- self.instance = Some(ConnectionInstance::from(stream));
+ self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream))));
self
}
/// Insert connection instance into context
pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self {
- self.instance = Some(instance);
+ self.instance = Some(Arc::new(Mutex::new(instance)));
self
}
/// Pop connection instance from context
- pub fn pop_instance(&mut self) -> Option<ConnectionInstance> {
+ pub fn pop_instance(&mut self) -> Option<Arc<Mutex<ConnectionInstance>>> {
self.instance.take()
}
}
@@ -78,12 +83,12 @@ impl ActionContext {
}
/// Get the connection instance in the current context
- pub fn instance(&self) -> &Option<ConnectionInstance> {
+ pub fn instance(&self) -> &Option<Arc<Mutex<ConnectionInstance>>> {
&self.instance
}
/// Get a mutable reference to the connection instance in the current context
- pub fn instance_mut(&mut self) -> &mut Option<ConnectionInstance> {
+ pub fn instance_mut(&mut self) -> &mut Option<Arc<Mutex<ConnectionInstance>>> {
&mut self.instance
}
@@ -104,8 +109,55 @@ impl ActionContext {
}
/// Set the action arguments in the context
- pub fn set_action_args_json(mut self, action_args: String) -> Self {
+ pub fn set_action_args(mut self, action_args: String) -> Self {
self.action_args_json = action_args;
self
}
+
+ /// Insert arbitrary data in the context
+ pub fn insert<T: Any + Send + Sync>(mut self, value: T) -> Self {
+ self.data.insert(TypeId::of::<T>(), Arc::new(value));
+ self
+ }
+
+ /// Insert arbitrary data as Arc in the context
+ pub fn insert_arc<T: Any + Send + Sync>(mut self, value: Arc<T>) -> Self {
+ self.data.insert(TypeId::of::<T>(), value);
+ self
+ }
+
+ /// Get arbitrary data from the context
+ pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
+ self.data
+ .get(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast_ref::<T>())
+ }
+
+ /// Get arbitrary data as Arc from the context
+ pub fn get_arc<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
+ self.data
+ .get(&TypeId::of::<T>())
+ .and_then(|arc| Arc::clone(arc).downcast::<T>().ok())
+ }
+
+ /// Remove and return arbitrary data from the context
+ pub fn remove<T: Any + Send + Sync>(&mut self) -> Option<Arc<T>> {
+ self.data
+ .remove(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast::<T>().ok())
+ }
+
+ /// Check if the context contains data of a specific type
+ pub fn contains<T: Any + Send + Sync>(&self) -> bool {
+ self.data.contains_key(&TypeId::of::<T>())
+ }
+
+ /// Take ownership of the context and extract data of a specific type
+ pub fn take<T: Any + Send + Sync>(mut self) -> (Self, Option<Arc<T>>) {
+ let value = self
+ .data
+ .remove(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast::<T>().ok());
+ (self, value)
+ }
}
diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs
index 7e93fc4..f3e178a 100644
--- a/crates/system_action/src/action_pool.rs
+++ b/crates/system_action/src/action_pool.rs
@@ -8,7 +8,8 @@ use crate::action::{Action, ActionContext};
type ProcBeginCallback =
for<'a> fn(
- &'a mut ActionContext,
+ &'a ActionContext,
+ args: &'a (dyn std::any::Any + Send + Sync),
) -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>;
type ProcEndCallback = fn() -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>;
@@ -85,9 +86,9 @@ impl ActionPool {
if let Some(action) = self.actions.get(action_name) {
// Set action name and args in context for callbacks
let context = context.set_action_name(action_name.to_string());
- let mut context = context.set_action_args_json(args_json.clone());
+ let context = context.set_action_args(args_json.clone());
- let _ = self.exec_on_proc_begin(&mut context).await?;
+ let _ = self.exec_on_proc_begin(&context, &args_json).await?;
let result = action.process_json_erased(context, args_json).await?;
let _ = self.exec_on_proc_end().await?;
Ok(result)
@@ -109,11 +110,11 @@ impl ActionPool {
args: Args,
) -> Result<Return, TcpTargetError>
where
- Args: serde::de::DeserializeOwned + Send + 'static,
+ Args: serde::de::DeserializeOwned + Send + Sync + 'static,
Return: serde::Serialize + Send + 'static,
{
if let Some(action) = self.actions.get(action_name) {
- let _ = self.exec_on_proc_begin(&mut context).await?;
+ let _ = self.exec_on_proc_begin(&context, &args).await?;
let result = action.process_erased(context, Box::new(args)).await?;
let result = *result
.downcast::<Return>()
@@ -126,9 +127,13 @@ impl ActionPool {
}
/// Executes the process begin callback if set
- async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> {
+ async fn exec_on_proc_begin(
+ &self,
+ context: &ActionContext,
+ args: &(dyn std::any::Any + Send + Sync),
+ ) -> Result<(), TcpTargetError> {
if let Some(callback) = &self.on_proc_begin {
- callback(context).await
+ callback(context, args).await
} else {
Ok(())
}
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index 0e210a7..b11a934 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -1,17 +1,25 @@
-use std::net::SocketAddr;
-
use action_system::{action::ActionContext, action_gen};
+use log::info;
use tcp_connection::error::TcpTargetError;
-#[action_gen(local)]
-pub async fn set_upstream_vault_action(
- ctx: ActionContext,
- _upstream: SocketAddr,
-) -> Result<(), TcpTargetError> {
- if ctx.is_remote() {
- return Err(TcpTargetError::NotLocal(
- "Action was not invoked on the local machine".to_string(),
+#[action_gen]
+pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> {
+ // Ensure the instance is available
+ let Some(instance) = ctx.instance() else {
+ return Err(TcpTargetError::NotFound(
+ "Connection Instance Lost.".to_string(),
));
+ };
+
+ if ctx.is_local() {
+ // Invoke on local
+ // Send the message to the server
+ let _ = instance.lock().await.write_text("Hello World!").await;
+ } else if ctx.is_remote() {
+ // Read the message from the client
+ let read = instance.lock().await.read_text().await?;
+ info!("{}", read)
}
+
Ok(())
}
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index 9ea5957..0a49953 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -136,7 +136,10 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
};
// Build context
- let ctx = ActionContext::remote().insert_instance(instance);
+ let ctx: ActionContext = ActionContext::remote().insert_instance(instance);
+
+ // Insert vault into context
+ let ctx = ctx.insert_arc(vault);
// Process action
let Ok(_result_json) = action_pool
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index 56acdad..47fd7ee 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -2,13 +2,12 @@ use action_system::{action::ActionContext, action_pool::ActionPool};
use tcp_connection::error::TcpTargetError;
use crate::{
- actions::local_actions::register_set_upstream_vault_action,
- connection::protocol::RemoteActionInvoke,
+ actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke,
};
fn register_actions(pool: &mut ActionPool) {
// Pool register here
- register_set_upstream_vault_action(pool);
+ register_hello_world_action(pool);
}
pub fn client_action_pool() -> ActionPool {
@@ -19,13 +18,16 @@ pub fn client_action_pool() -> ActionPool {
register_actions(&mut pool);
// Add process events
- pool.set_on_proc_begin(|ctx| Box::pin(on_proc_begin(ctx)));
+ pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args)));
// Return
pool
}
-async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> {
+async fn on_proc_begin(
+ ctx: &ActionContext,
+ _args: &(dyn std::any::Any + Send + Sync),
+) -> Result<(), TcpTargetError> {
// Is ctx remote
let is_remote = ctx.is_remote();
@@ -34,7 +36,7 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> {
let action_args_json = ctx.action_args_json().clone();
// Get instance
- let Some(instance) = ctx.instance_mut() else {
+ let Some(instance) = ctx.instance() else {
return Err(TcpTargetError::Unsupported(
"Missing ConnectionInstance in current context, this ActionPool does not support this call"
.to_string()));
@@ -49,7 +51,8 @@ async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> {
};
// Send
- instance.write_msgpack(msg).await?;
+ let mut instance = instance.lock().await;
+ instance.write_msgpack(&msg).await?;
}
// Return OK, wait for client to execute Action locally
diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs
index 3ecc103..b449b68 100644
--- a/crates/vcs_actions/src/registry/server_registry.rs
+++ b/crates/vcs_actions/src/registry/server_registry.rs
@@ -1,9 +1,9 @@
use action_system::action_pool::ActionPool;
-use crate::actions::local_actions::register_set_upstream_vault_action;
+use crate::actions::local_actions::register_hello_world_action;
pub fn server_action_pool() -> ActionPool {
let mut pool = ActionPool::new();
- register_set_upstream_vault_action(&mut pool);
+ register_hello_world_action(&mut pool);
pool
}