summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--crates/system_action/action_macros/src/lib.rs17
-rw-r--r--crates/system_action/src/action.rs38
-rw-r--r--crates/system_action/src/action_pool.rs67
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs2
-rw-r--r--crates/vcs_actions/src/connection.rs1
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs29
-rw-r--r--crates/vcs_actions/src/connection/protocol.rs7
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs28
8 files changed, 166 insertions, 23 deletions
diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs
index 683efcb..aa1c696 100644
--- a/crates/system_action/action_macros/src/lib.rs
+++ b/crates/system_action/action_macros/src/lib.rs
@@ -67,14 +67,19 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
ctx: action_system::action::ActionContext,
#arg_param_name: #arg_type
) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
- pool.process::<#arg_type, #return_type>(
+ let args_json = serde_json::to_string(&#arg_param_name)
+ .map_err(|e| {
+ tcp_connection::error::TcpTargetError::Serialization(e.to_string())
+ })?;
+ let result_json = pool.process_json(
Box::leak(string_proc::snake_case!(stringify!(#action_name_ident)).into_boxed_str()),
ctx,
- serde_json::to_string(&#arg_param_name)
- .map_err(|e| {
- tcp_connection::error::TcpTargetError::Serialization(e.to_string())
- })?
- ).await
+ args_json,
+ ).await?;
+ serde_json::from_str(&result_json)
+ .map_err(|e| {
+ tcp_connection::error::TcpTargetError::Serialization(e.to_string())
+ })
}
#[allow(dead_code)]
diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs
index c8d7a18..e7d2d8c 100644
--- a/crates/system_action/src/action.rs
+++ b/crates/system_action/src/action.rs
@@ -22,6 +22,12 @@ pub struct ActionContext {
/// Whether the action is executed locally or remotely
local: bool,
+ /// The name of the action being executed
+ action_name: String,
+
+ /// The JSON-serialized arguments for the action
+ action_args_json: String,
+
/// The connection instance in the current context,
/// used to interact with the machine on the other end
instance: Option<ConnectionInstance>,
@@ -53,6 +59,11 @@ impl ActionContext {
self.instance = Some(instance);
self
}
+
+ /// Pop connection instance from context
+ pub fn pop_instance(&mut self) -> Option<ConnectionInstance> {
+ self.instance.take()
+ }
}
impl ActionContext {
@@ -70,4 +81,31 @@ impl ActionContext {
pub fn instance(&self) -> &Option<ConnectionInstance> {
&self.instance
}
+
+ /// Get a mutable reference to the connection instance in the current context
+ pub fn instance_mut(&mut self) -> &mut Option<ConnectionInstance> {
+ &mut self.instance
+ }
+
+ /// Get the action name from the context
+ pub fn action_name(&self) -> &str {
+ &self.action_name
+ }
+
+ /// Get the action arguments from the context
+ pub fn action_args_json(&self) -> &String {
+ &self.action_args_json
+ }
+
+ /// Set the action name in the context
+ pub fn set_action_name(mut self, action_name: String) -> Self {
+ self.action_name = action_name;
+ self
+ }
+
+ /// Set the action arguments in the context
+ pub fn set_action_args_json(mut self, action_args: String) -> Self {
+ self.action_args_json = action_args;
+ self
+ }
}
diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs
index a3e82d6..7e93fc4 100644
--- a/crates/system_action/src/action_pool.rs
+++ b/crates/system_action/src/action_pool.rs
@@ -1,13 +1,14 @@
use std::pin::Pin;
use serde::{Serialize, de::DeserializeOwned};
+use serde_json;
use tcp_connection::error::TcpTargetError;
use crate::action::{Action, ActionContext};
type ProcBeginCallback =
for<'a> fn(
- &'a ActionContext,
+ &'a mut ActionContext,
) -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>;
type ProcEndCallback = fn() -> Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>;
@@ -68,20 +69,51 @@ impl ActionPool {
/// ```ignore
/// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?;
/// ```
- pub async fn process<'a, Args, Return>(
+ /// Processes an action by name with JSON-serialized arguments
+ ///
+ /// Usage:
+ /// ```ignore
+ /// let result_json = action_pool.process_json("my_action", context, args_json).await?;
+ /// let result: MyReturn = serde_json::from_str(&result_json)?;
+ /// ```
+ pub async fn process_json<'a>(
&'a self,
action_name: &'a str,
context: ActionContext,
args_json: String,
+ ) -> Result<String, TcpTargetError> {
+ if let Some(action) = self.actions.get(action_name) {
+ // Set action name and args in context for callbacks
+ let context = context.set_action_name(action_name.to_string());
+ let mut context = context.set_action_args_json(args_json.clone());
+
+ let _ = self.exec_on_proc_begin(&mut context).await?;
+ let result = action.process_json_erased(context, args_json).await?;
+ let _ = self.exec_on_proc_end().await?;
+ Ok(result)
+ } else {
+ Err(TcpTargetError::Unsupported("InvalidAction".to_string()))
+ }
+ }
+
+ /// Processes an action by name with given context and arguments
+ ///
+ /// Usage:
+ /// ```ignore
+ /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?;
+ /// ```
+ pub async fn process<'a, Args, Return>(
+ &'a self,
+ action_name: &'a str,
+ mut context: ActionContext,
+ args: Args,
) -> Result<Return, TcpTargetError>
where
Args: serde::de::DeserializeOwned + Send + 'static,
Return: serde::Serialize + Send + 'static,
{
if let Some(action) = self.actions.get(action_name) {
- let _ = self.exec_on_proc_begin(&context).await?;
- let args: Args = serde_json::from_str(&args_json)
- .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?;
+ let _ = self.exec_on_proc_begin(&mut context).await?;
let result = action.process_erased(context, Box::new(args)).await?;
let result = *result
.downcast::<Return>()
@@ -94,7 +126,7 @@ impl ActionPool {
}
/// Executes the process begin callback if set
- async fn exec_on_proc_begin(&self, context: &ActionContext) -> Result<(), TcpTargetError> {
+ async fn exec_on_proc_begin(&self, context: &mut ActionContext) -> Result<(), TcpTargetError> {
if let Some(callback) = &self.on_proc_begin {
callback(context).await
} else {
@@ -125,6 +157,13 @@ trait ActionErased: Send + Sync {
+ Send,
>,
>;
+
+ /// Processes the action with JSON-serialized arguments and returns JSON-serialized result
+ fn process_json_erased(
+ &self,
+ context: ActionContext,
+ args_json: String,
+ ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>;
}
/// Wrapper struct that implements ActionErased for concrete Action types
@@ -154,4 +193,20 @@ where
Ok(Box::new(result) as Box<dyn std::any::Any + Send>)
})
}
+
+ fn process_json_erased(
+ &self,
+ context: ActionContext,
+ args_json: String,
+ ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>
+ {
+ Box::pin(async move {
+ let args: Args = serde_json::from_str(&args_json)
+ .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?;
+ let result = A::process(context, args).await?;
+ let result_json = serde_json::to_string(&result)
+ .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?;
+ Ok(result_json)
+ })
+ }
}
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index b230c6f..0e210a7 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -6,7 +6,7 @@ use tcp_connection::error::TcpTargetError;
#[action_gen(local)]
pub async fn set_upstream_vault_action(
ctx: ActionContext,
- upstream: SocketAddr,
+ _upstream: SocketAddr,
) -> Result<(), TcpTargetError> {
if ctx.is_remote() {
return Err(TcpTargetError::NotLocal(
diff --git a/crates/vcs_actions/src/connection.rs b/crates/vcs_actions/src/connection.rs
index dabbd44..918f93c 100644
--- a/crates/vcs_actions/src/connection.rs
+++ b/crates/vcs_actions/src/connection.rs
@@ -1,2 +1,3 @@
pub mod action_service;
pub mod error;
+pub mod protocol;
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index 8d3a03d..9ea5957 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -1,6 +1,6 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
-use action_system::action_pool::ActionPool;
+use action_system::{action::ActionContext, action_pool::ActionPool};
use cfg_file::config::ConfigFile;
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::{
@@ -10,10 +10,12 @@ use tokio::{
};
use vcs_data::data::vault::{Vault, config::VaultConfig};
-use crate::registry::server_registry::server_action_pool;
+use crate::{
+ connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool,
+};
// Start the server with a Vault using the specified directory
-pub async fn server_entry(path: impl Into<PathBuf>) -> Result<(), TcpTargetError> {
+pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTargetError> {
// Read the vault cfg
let vault_cfg = VaultConfig::read().await?;
@@ -21,7 +23,7 @@ pub async fn server_entry(path: impl Into<PathBuf>) -> Result<(), TcpTargetError
let listener = create_tcp_listener(&vault_cfg).await?;
// Initialize the vault
- let vault: Arc<Vault> = init_vault(vault_cfg, path.into()).await?;
+ let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
// Create ActionPool
let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
@@ -125,5 +127,22 @@ fn build_server_future(
}
async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) {
- let instance = ConnectionInstance::from(stream);
+ // Setup connection instance
+ let mut instance = ConnectionInstance::from(stream);
+
+ // Read action name and action arguments
+ let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
+ return;
+ };
+
+ // Build context
+ let ctx = ActionContext::remote().insert_instance(instance);
+
+ // Process action
+ let Ok(_result_json) = action_pool
+ .process_json(&msg.action_name, ctx, msg.action_args_json)
+ .await
+ else {
+ return;
+ };
}
diff --git a/crates/vcs_actions/src/connection/protocol.rs b/crates/vcs_actions/src/connection/protocol.rs
new file mode 100644
index 0000000..2cebe79
--- /dev/null
+++ b/crates/vcs_actions/src/connection/protocol.rs
@@ -0,0 +1,7 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Default, Clone, Serialize, Deserialize)]
+pub struct RemoteActionInvoke {
+ pub action_name: String,
+ pub action_args_json: String,
+}
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index d298099..56acdad 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -1,7 +1,10 @@
use action_system::{action::ActionContext, action_pool::ActionPool};
use tcp_connection::error::TcpTargetError;
-use crate::actions::local_actions::register_set_upstream_vault_action;
+use crate::{
+ actions::local_actions::register_set_upstream_vault_action,
+ connection::protocol::RemoteActionInvoke,
+};
fn register_actions(pool: &mut ActionPool) {
// Pool register here
@@ -22,18 +25,33 @@ pub fn client_action_pool() -> ActionPool {
pool
}
-async fn on_proc_begin(ctx: &ActionContext) -> Result<(), TcpTargetError> {
+async fn on_proc_begin(ctx: &mut ActionContext) -> Result<(), TcpTargetError> {
+ // Is ctx remote
+ let is_remote = ctx.is_remote();
+
+ // Action name and arguments
+ let action_name = ctx.action_name().to_string();
+ let action_args_json = ctx.action_args_json().clone();
+
// Get instance
- let Some(_instance) = ctx.instance() else {
+ let Some(instance) = ctx.instance_mut() else {
return Err(TcpTargetError::Unsupported(
"Missing ConnectionInstance in current context, this ActionPool does not support this call"
.to_string()));
};
// If it's remote, invoke action at server
- if ctx.is_remote() {
- // instance.write_text(text)
+ if is_remote {
+ // Build protocol message
+ let msg = RemoteActionInvoke {
+ action_name: action_name,
+ action_args_json: action_args_json,
+ };
+
+ // Send
+ instance.write_msgpack(msg).await?;
}
+ // Return OK, wait for client to execute Action locally
Ok(())
}