summaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2025-10-13 14:27:59 +0800
committerGitHub <noreply@github.com>2025-10-13 14:27:59 +0800
commitb9bbfb31bee88f6b10a9cc5b49e7618bef9d0be5 (patch)
treeef02095c73635b5ace574c26dfcb999017e34897 /crates
parentc1d862d6df58173c24604e4dda33db8ce3be3ad7 (diff)
parent4810f56e6a49b60923eb850d5944457650c81c75 (diff)
Merge pull request #21 from JustEnoughVCS/jvcs_dev
Jvcs dev
Diffstat (limited to 'crates')
-rw-r--r--crates/system_action/Cargo.toml4
-rw-r--r--crates/system_action/action_macros/Cargo.toml4
-rw-r--r--crates/system_action/action_macros/src/lib.rs54
-rw-r--r--crates/system_action/src/action.rs132
-rw-r--r--crates/system_action/src/action_pool.rs145
-rw-r--r--crates/utils/tcp_connection/src/error.rs3
-rw-r--r--crates/utils/tcp_connection/tcp_connection_test/src/test_msgpack.rs11
-rw-r--r--crates/vcs_actions/Cargo.toml13
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs25
-rw-r--r--crates/vcs_actions/src/connection.rs3
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs151
-rw-r--r--crates/vcs_actions/src/connection/error.rs14
-rw-r--r--crates/vcs_actions/src/connection/protocol.rs7
-rw-r--r--crates/vcs_actions/src/lib.rs1
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs60
-rw-r--r--crates/vcs_actions/src/registry/server_registry.rs9
-rw-r--r--crates/vcs_data/src/data/local/config.rs25
-rw-r--r--crates/vcs_data/src/data/sheet.rs8
-rw-r--r--crates/vcs_data/src/data/vault/config.rs78
-rw-r--r--crates/vcs_data/todo.md31
-rw-r--r--crates/vcs_data/todo.txt36
-rw-r--r--crates/vcs_data/vcs_data_test/src/test_sheet_creation_management_and_persistence.rs2
22 files changed, 715 insertions, 101 deletions
diff --git a/crates/system_action/Cargo.toml b/crates/system_action/Cargo.toml
index ee4f774..54ae454 100644
--- a/crates/system_action/Cargo.toml
+++ b/crates/system_action/Cargo.toml
@@ -9,3 +9,7 @@ action_system_macros = { path = "action_macros" }
# Serialization
serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
+
+# Async & Networking
+tokio = { version = "1.46.1", features = ["full"] }
diff --git a/crates/system_action/action_macros/Cargo.toml b/crates/system_action/action_macros/Cargo.toml
index 5ae14fa..869dcde 100644
--- a/crates/system_action/action_macros/Cargo.toml
+++ b/crates/system_action/action_macros/Cargo.toml
@@ -13,3 +13,7 @@ string_proc = { path = "../../utils/string_proc" }
syn = { version = "2.0", features = ["full", "extra-traits"] }
quote = "1.0"
proc-macro2 = "1.0"
+
+# Serialization
+serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs
index a7de9b6..ce50073 100644
--- a/crates/system_action/action_macros/src/lib.rs
+++ b/crates/system_action/action_macros/src/lib.rs
@@ -37,6 +37,9 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
let action_name_ident = &fn_name;
+ let register_this_action = quote::format_ident!("register_{}", action_name_ident);
+ let proc_this_action = quote::format_ident!("proc_{}", action_name_ident);
+
quote! {
#[derive(Debug, Clone, Default)]
#fn_vis struct #struct_name;
@@ -55,22 +58,28 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
}
}
- impl #struct_name {
- #fn_vis fn register_to_pool(pool: &mut action_system::action_pool::ActionPool) {
- pool.register::<#struct_name, #arg_type, #return_type>();
- }
+ #fn_vis fn #register_this_action(pool: &mut action_system::action_pool::ActionPool) {
+ pool.register::<#struct_name, #arg_type, #return_type>();
+ }
- #fn_vis async fn process_at_pool<'a>(
- pool: &'a action_system::action_pool::ActionPool,
- ctx: action_system::action::ActionContext,
- #arg_param_name: #arg_type
- ) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
- pool.process::<#arg_type, #return_type>(
- Box::leak(string_proc::snake_case!(stringify!(#action_name_ident)).into_boxed_str()),
- ctx,
- #arg_param_name
- ).await
- }
+ #fn_vis async fn #proc_this_action(
+ pool: &action_system::action_pool::ActionPool,
+ ctx: action_system::action::ActionContext,
+ #arg_param_name: #arg_type
+ ) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
+ let args_json = serde_json::to_string(&#arg_param_name)
+ .map_err(|e| {
+ tcp_connection::error::TcpTargetError::Serialization(e.to_string())
+ })?;
+ let result_json = pool.process_json(
+ Box::leak(string_proc::snake_case!(stringify!(#action_name_ident)).into_boxed_str()),
+ ctx,
+ args_json,
+ ).await?;
+ serde_json::from_str(&result_json)
+ .map_err(|e| {
+ tcp_connection::error::TcpTargetError::Serialization(e.to_string())
+ })
}
#[allow(dead_code)]
@@ -79,20 +88,20 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
#[doc = "Use the generated struct instead."]
#[doc = ""]
#[doc = "Register the action to the pool."]
- #[doc = "```rust"]
- #[doc = "YourActionPascalName::register_to_pool(&mut pool);"]
+ #[doc = "```ignore"]
+ #[doc = "register_your_func(&mut pool);"]
#[doc = "```"]
#[doc = ""]
#[doc = "Process the action at the pool."]
- #[doc = "```rust"]
- #[doc = "let result = YourActionPascalName::process_at_pool(&pool, ctx, arg).await?;"]
+ #[doc = "```ignore"]
+ #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"]
#[doc = "```"]
#fn_vis #fn_sig #fn_block
}
}
fn validate_function_signature(fn_sig: &syn::Signature) {
- if !fn_sig.asyncness.is_some() {
+ if fn_sig.asyncness.is_none() {
panic!("Expected async function for Action, but found synchronous function");
}
@@ -111,13 +120,12 @@ fn validate_function_signature(fn_sig: &syn::Signature) {
};
if let syn::Type::Path(type_path) = return_type.as_ref() {
- if let Some(segment) = type_path.path.segments.last() {
- if segment.ident != "Result" {
+ if let Some(segment) = type_path.path.segments.last()
+ && segment.ident != "Result" {
panic!(
"Expected Action function to return Result<T, TcpTargetError>, but found different return type"
);
}
- }
} else {
panic!(
"Expected Action function to return Result<T, TcpTargetError>, but found no return type"
diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs
index 562a142..8a6180a 100644
--- a/crates/system_action/src/action.rs
+++ b/crates/system_action/src/action.rs
@@ -1,6 +1,15 @@
+use serde::{Serialize, de::DeserializeOwned};
+use std::any::{Any, TypeId};
+use std::collections::HashMap;
+use std::sync::Arc;
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
+use tokio::{net::TcpStream, sync::Mutex};
-pub trait Action<Args, Return> {
+pub trait Action<Args, Return>
+where
+ Args: Serialize + DeserializeOwned + Send,
+ Return: Serialize + DeserializeOwned + Send,
+{
fn action_name() -> &'static str;
fn is_remote_action() -> bool;
@@ -13,27 +22,54 @@ pub trait Action<Args, Return> {
#[derive(Default)]
pub struct ActionContext {
- // Whether the action is executed locally or remotely
+ /// Whether the action is executed locally or remotely
local: bool,
+ /// The name of the action being executed
+ action_name: String,
+
+ /// The JSON-serialized arguments for the action
+ action_args_json: String,
+
/// The connection instance in the current context,
- /// used to interact with the machine on the other end
- instance: Option<ConnectionInstance>,
+ instance: Option<Arc<Mutex<ConnectionInstance>>>,
+
+ /// Generic data storage for arbitrary types
+ data: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}
impl ActionContext {
/// Generate local context
pub fn local() -> Self {
- let mut ctx = ActionContext::default();
- ctx.local = true;
- ctx
+ ActionContext {
+ local: true,
+ ..Default::default()
+ }
}
/// Generate remote context
pub fn remote() -> Self {
- let mut ctx = ActionContext::default();
- ctx.local = false;
- ctx
+ ActionContext {
+ local: false,
+ ..Default::default()
+ }
+ }
+
+ /// Build connection instance from TcpStream
+ pub fn build_instance(mut self, stream: TcpStream) -> Self {
+ self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream))));
+ self
+ }
+
+ /// Insert connection instance into context
+ pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self {
+ self.instance = Some(Arc::new(Mutex::new(instance)));
+ self
+ }
+
+ /// Pop connection instance from context
+ pub fn pop_instance(&mut self) -> Option<Arc<Mutex<ConnectionInstance>>> {
+ self.instance.take()
}
}
@@ -49,7 +85,81 @@ impl ActionContext {
}
/// Get the connection instance in the current context
- pub fn instance(&self) -> &Option<ConnectionInstance> {
+ pub fn instance(&self) -> &Option<Arc<Mutex<ConnectionInstance>>> {
&self.instance
}
+
+ /// Get a mutable reference to the connection instance in the current context
+ pub fn instance_mut(&mut self) -> &mut Option<Arc<Mutex<ConnectionInstance>>> {
+ &mut self.instance
+ }
+
+ /// Get the action name from the context
+ pub fn action_name(&self) -> &str {
+ &self.action_name
+ }
+
+ /// Get the action arguments from the context
+ pub fn action_args_json(&self) -> &String {
+ &self.action_args_json
+ }
+
+ /// Set the action name in the context
+ pub fn set_action_name(mut self, action_name: String) -> Self {
+ self.action_name = action_name;
+ self
+ }
+
+ /// Set the action arguments in the context
+ pub fn set_action_args(mut self, action_args: String) -> Self {
+ self.action_args_json = action_args;
+ self
+ }
+
+ /// Insert arbitrary data in the context
+ pub fn insert<T: Any + Send + Sync>(mut self, value: T) -> Self {
+ self.data.insert(TypeId::of::<T>(), Arc::new(value));
+ self
+ }
+
+ /// Insert arbitrary data as Arc in the context
+ pub fn insert_arc<T: Any + Send + Sync>(mut self, value: Arc<T>) -> Self {
+ self.data.insert(TypeId::of::<T>(), value);
+ self
+ }
+
+ /// Get arbitrary data from the context
+ pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
+ self.data
+ .get(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast_ref::<T>())
+ }
+
+ /// Get arbitrary data as Arc from the context
+ pub fn get_arc<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
+ self.data
+ .get(&TypeId::of::<T>())
+ .and_then(|arc| Arc::clone(arc).downcast::<T>().ok())
+ }
+
+ /// Remove and return arbitrary data from the context
+ pub fn remove<T: Any + Send + Sync>(&mut self) -> Option<Arc<T>> {
+ self.data
+ .remove(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast::<T>().ok())
+ }
+
+ /// Check if the context contains data of a specific type
+ pub fn contains<T: Any + Send + Sync>(&self) -> bool {
+ self.data.contains_key(&TypeId::of::<T>())
+ }
+
+ /// Take ownership of the context and extract data of a specific type
+ pub fn take<T: Any + Send + Sync>(mut self) -> (Self, Option<Arc<T>>) {
+ let value = self
+ .data
+ .remove(&TypeId::of::<T>())
+ .and_then(|arc| arc.downcast::<T>().ok());
+ (self, value)
+ }
}
diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs
index 0a1a6c7..c28de1e 100644
--- a/crates/system_action/src/action_pool.rs
+++ b/crates/system_action/src/action_pool.rs
@@ -1,11 +1,36 @@
+use std::pin::Pin;
+
+use serde::{Serialize, de::DeserializeOwned};
+use serde_json;
use tcp_connection::error::TcpTargetError;
use crate::action::{Action, ActionContext};
+type ProcBeginCallback = for<'a> fn(
+ &'a ActionContext,
+ args: &'a (dyn std::any::Any + Send + Sync),
+) -> ProcBeginFuture<'a>;
+type ProcEndCallback = fn() -> ProcEndFuture;
+
+type ProcBeginFuture<'a> = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>;
+type ProcEndFuture = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>;
+
/// A pool of registered actions that can be processed by name
pub struct ActionPool {
/// HashMap storing action name to action implementation mapping
actions: std::collections::HashMap<&'static str, Box<dyn ActionErased>>,
+
+ /// Callback to execute when process begins
+ on_proc_begin: Option<ProcBeginCallback>,
+
+ /// Callback to execute when process ends
+ on_proc_end: Option<ProcEndCallback>,
+}
+
+impl Default for ActionPool {
+ fn default() -> Self {
+ Self::new()
+ }
}
impl ActionPool {
@@ -13,20 +38,32 @@ impl ActionPool {
pub fn new() -> Self {
Self {
actions: std::collections::HashMap::new(),
+ on_proc_begin: None,
+ on_proc_end: None,
}
}
+ /// Sets a callback to be executed when process begins
+ pub fn set_on_proc_begin(&mut self, callback: ProcBeginCallback) {
+ self.on_proc_begin = Some(callback);
+ }
+
+ /// Sets a callback to be executed when process ends
+ pub fn set_on_proc_end(&mut self, callback: ProcEndCallback) {
+ self.on_proc_end = Some(callback);
+ }
+
/// Registers an action type with the pool
///
/// Usage:
- /// ```
+ /// ```ignore
/// action_pool.register::<MyAction, MyArgs, MyReturn>();
/// ```
pub fn register<A, Args, Return>(&mut self)
where
A: Action<Args, Return> + Send + Sync + 'static,
- Args: serde::de::DeserializeOwned + Send + Sync + 'static,
- Return: serde::Serialize + Send + Sync + 'static,
+ Args: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
+ Return: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
{
let action_name = A::action_name();
self.actions.insert(
@@ -38,7 +75,40 @@ impl ActionPool {
/// Processes an action by name with given context and arguments
///
/// Usage:
+ /// ```ignore
+ /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?;
+ /// ```
+ /// Processes an action by name with JSON-serialized arguments
+ ///
+ /// Usage:
+ /// ```ignore
+ /// let result_json = action_pool.process_json("my_action", context, args_json).await?;
+ /// let result: MyReturn = serde_json::from_str(&result_json)?;
/// ```
+ pub async fn process_json<'a>(
+ &'a self,
+ action_name: &'a str,
+ context: ActionContext,
+ args_json: String,
+ ) -> Result<String, TcpTargetError> {
+ if let Some(action) = self.actions.get(action_name) {
+ // Set action name and args in context for callbacks
+ let context = context.set_action_name(action_name.to_string());
+ let context = context.set_action_args(args_json.clone());
+
+ self.exec_on_proc_begin(&context, &args_json).await?;
+ let result = action.process_json_erased(context, args_json).await?;
+ self.exec_on_proc_end().await?;
+ Ok(result)
+ } else {
+ Err(TcpTargetError::Unsupported("InvalidAction".to_string()))
+ }
+ }
+
+ /// Processes an action by name with given context and arguments
+ ///
+ /// Usage:
+ /// ```ignore
/// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?;
/// ```
pub async fn process<'a, Args, Return>(
@@ -48,34 +118,69 @@ impl ActionPool {
args: Args,
) -> Result<Return, TcpTargetError>
where
- Args: serde::de::DeserializeOwned + Send + 'static,
+ Args: serde::de::DeserializeOwned + Send + Sync + 'static,
Return: serde::Serialize + Send + 'static,
{
if let Some(action) = self.actions.get(action_name) {
+ self.exec_on_proc_begin(&context, &args).await?;
let result = action.process_erased(context, Box::new(args)).await?;
let result = *result
.downcast::<Return>()
.map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?;
+ self.exec_on_proc_end().await?;
Ok(result)
} else {
Err(TcpTargetError::Unsupported("InvalidAction".to_string()))
}
}
+
+ /// Executes the process begin callback if set
+ async fn exec_on_proc_begin(
+ &self,
+ context: &ActionContext,
+ args: &(dyn std::any::Any + Send + Sync),
+ ) -> Result<(), TcpTargetError> {
+ if let Some(callback) = &self.on_proc_begin {
+ callback(context, args).await
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Executes the process end callback if set
+ async fn exec_on_proc_end(&self) -> Result<(), TcpTargetError> {
+ if let Some(callback) = &self.on_proc_end {
+ callback().await
+ } else {
+ Ok(())
+ }
+ }
}
/// Trait for type-erased actions that can be stored in ActionPool
+type ProcessErasedFuture = std::pin::Pin<
+ Box<
+ dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>>
+ + Send,
+ >,
+>;
+type ProcessJsonErasedFuture =
+ std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>;
+
trait ActionErased: Send + Sync {
/// Processes the action with type-erased arguments and returns type-erased result
fn process_erased(
&self,
context: ActionContext,
args: Box<dyn std::any::Any + Send>,
- ) -> std::pin::Pin<
- Box<
- dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>>
- + Send,
- >,
- >;
+ ) -> ProcessErasedFuture;
+
+ /// Processes the action with JSON-serialized arguments and returns JSON-serialized result
+ fn process_json_erased(
+ &self,
+ context: ActionContext,
+ args_json: String,
+ ) -> ProcessJsonErasedFuture;
}
/// Wrapper struct that implements ActionErased for concrete Action types
@@ -84,8 +189,8 @@ struct ActionWrapper<A, Args, Return>(std::marker::PhantomData<(A, Args, Return)
impl<A, Args, Return> ActionErased for ActionWrapper<A, Args, Return>
where
A: Action<Args, Return> + Send + Sync,
- Args: serde::de::DeserializeOwned + Send + Sync + 'static,
- Return: serde::Serialize + Send + Sync + 'static,
+ Args: Serialize + DeserializeOwned + Send + Sync + 'static,
+ Return: Serialize + DeserializeOwned + Send + Sync + 'static,
{
fn process_erased(
&self,
@@ -105,4 +210,20 @@ where
Ok(Box::new(result) as Box<dyn std::any::Any + Send>)
})
}
+
+ fn process_json_erased(
+ &self,
+ context: ActionContext,
+ args_json: String,
+ ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>
+ {
+ Box::pin(async move {
+ let args: Args = serde_json::from_str(&args_json)
+ .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?;
+ let result = A::process(context, args).await?;
+ let result_json = serde_json::to_string(&result)
+ .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?;
+ Ok(result_json)
+ })
+ }
}
diff --git a/crates/utils/tcp_connection/src/error.rs b/crates/utils/tcp_connection/src/error.rs
index 691e5ee..28e33d3 100644
--- a/crates/utils/tcp_connection/src/error.rs
+++ b/crates/utils/tcp_connection/src/error.rs
@@ -41,6 +41,9 @@ pub enum TcpTargetError {
#[error("Not remote machine: {0}")]
NotRemote(String),
+
+ #[error("Not found: {0}")]
+ NotFound(String),
}
impl From<io::Error> for TcpTargetError {
diff --git a/crates/utils/tcp_connection/tcp_connection_test/src/test_msgpack.rs b/crates/utils/tcp_connection/tcp_connection_test/src/test_msgpack.rs
index 7a7dc1f..4c9c870 100644
--- a/crates/utils/tcp_connection/tcp_connection_test/src/test_msgpack.rs
+++ b/crates/utils/tcp_connection/tcp_connection_test/src/test_msgpack.rs
@@ -9,21 +9,12 @@ use crate::test_utils::{
target_configure::ServerTargetConfig,
};
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
struct TestData {
id: u32,
name: String,
}
-impl Default for TestData {
- fn default() -> Self {
- Self {
- id: 0,
- name: String::new(),
- }
- }
-}
-
pub(crate) struct MsgPackClientHandle;
impl ClientHandle<MsgPackServerHandle> for MsgPackClientHandle {
diff --git a/crates/vcs_actions/Cargo.toml b/crates/vcs_actions/Cargo.toml
index e5a07f6..6735d43 100644
--- a/crates/vcs_actions/Cargo.toml
+++ b/crates/vcs_actions/Cargo.toml
@@ -13,3 +13,16 @@ string_proc = { path = "../utils/string_proc" }
# Core dependencies
action_system = { path = "../system_action" }
vcs_data = { path = "../vcs_data" }
+
+# Error handling
+thiserror = "1.0.69"
+
+# Serialization
+serde = { version = "1.0.219", features = ["derive"] }
+serde_json = "1.0.140"
+
+# Async & Networking
+tokio = { version = "1.46.1", features = ["full"] }
+
+# Logging
+log = "0.4.28"
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index e69de29..b11a934 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -0,0 +1,25 @@
+use action_system::{action::ActionContext, action_gen};
+use log::info;
+use tcp_connection::error::TcpTargetError;
+
+#[action_gen]
+pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> {
+ // Ensure the instance is available
+ let Some(instance) = ctx.instance() else {
+ return Err(TcpTargetError::NotFound(
+ "Connection Instance Lost.".to_string(),
+ ));
+ };
+
+ if ctx.is_local() {
+ // Invoke on local
+ // Send the message to the server
+ let _ = instance.lock().await.write_text("Hello World!").await;
+ } else if ctx.is_remote() {
+ // Read the message from the client
+ let read = instance.lock().await.read_text().await?;
+ info!("{}", read)
+ }
+
+ Ok(())
+}
diff --git a/crates/vcs_actions/src/connection.rs b/crates/vcs_actions/src/connection.rs
new file mode 100644
index 0000000..918f93c
--- /dev/null
+++ b/crates/vcs_actions/src/connection.rs
@@ -0,0 +1,3 @@
+pub mod action_service;
+pub mod error;
+pub mod protocol;
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
new file mode 100644
index 0000000..c302fd4
--- /dev/null
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -0,0 +1,151 @@
+use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+
+use action_system::{action::ActionContext, action_pool::ActionPool};
+use cfg_file::config::ConfigFile;
+use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
+use tokio::{
+ net::{TcpListener, TcpStream},
+ select, signal, spawn,
+ sync::mpsc,
+};
+use vcs_data::data::vault::{Vault, config::VaultConfig};
+
+use crate::{
+ connection::protocol::RemoteActionInvoke, registry::server_registry::server_action_pool,
+};
+
+// Start the server with a Vault using the specified directory
+pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTargetError> {
+ // Read the vault cfg
+ let vault_cfg = VaultConfig::read().await?;
+
+ // Create TCPListener
+ let listener = create_tcp_listener(&vault_cfg).await?;
+
+ // Initialize the vault
+ let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
+
+ // Create ActionPool
+ let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
+
+ // Start the server
+ let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
+ future.await?; // Start and block until shutdown
+
+ Ok(())
+}
+
+async fn create_tcp_listener(cfg: &VaultConfig) -> Result<TcpListener, TcpTargetError> {
+ let local_bind_addr = cfg.server_config().local_bind();
+ let bind_port = cfg.server_config().port();
+ let sock_addr = SocketAddr::new(*local_bind_addr, bind_port);
+ let listener = TcpListener::bind(sock_addr).await?;
+
+ Ok(listener)
+}
+
+async fn init_vault(cfg: VaultConfig, path: PathBuf) -> Result<Arc<Vault>, TcpTargetError> {
+ // Init and create the vault
+ let Some(vault) = Vault::init(cfg, path) else {
+ return Err(TcpTargetError::NotFound("Vault not found".to_string()));
+ };
+ let vault: Arc<Vault> = Arc::new(vault);
+
+ Ok(vault)
+}
+
+fn build_server_future(
+ vault: Arc<Vault>,
+ action_pool: Arc<ActionPool>,
+ listener: TcpListener,
+) -> (
+ mpsc::Sender<()>,
+ impl std::future::Future<Output = Result<(), TcpTargetError>>,
+) {
+ let (tx, mut rx) = mpsc::channel::<i32>(100);
+ let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
+ let mut active_connections = 0;
+ let mut shutdown_requested = false;
+
+ // Spawn task to handle Ctrl+C
+ let shutdown_tx_clone = shutdown_tx.clone();
+ spawn(async move {
+ if let Ok(()) = signal::ctrl_c().await {
+ let _ = shutdown_tx_clone.send(()).await;
+ }
+ });
+
+ let future = async move {
+ loop {
+ select! {
+ // Accept new connections
+ accept_result = listener.accept(), if !shutdown_requested => {
+ match accept_result {
+ Ok((stream, _addr)) => {
+ active_connections += 1;
+ let _ = tx.send(1).await;
+
+ let vault_clone = vault.clone();
+ let action_pool_clone = action_pool.clone();
+ let tx_clone = tx.clone();
+ spawn(async move {
+ process_connection(stream, vault_clone, action_pool_clone).await;
+ let _ = tx_clone.send(-1).await;
+ });
+ }
+ Err(_) => {
+ continue;
+ }
+ }
+ }
+
+ // Handle connection count updates
+ Some(count_change) = rx.recv() => {
+ active_connections = (active_connections as i32 + count_change) as usize;
+
+ // Check if we should shutdown after all connections are done
+ if shutdown_requested && active_connections == 0 {
+ break;
+ }
+ }
+
+ // Handle shutdown signal
+ _ = shutdown_rx.recv() => {
+ shutdown_requested = true;
+ // If no active connections, break immediately
+ if active_connections == 0 {
+ break;
+ }
+ }
+ }
+ }
+
+ Ok(())
+ };
+
+ (shutdown_tx, future)
+}
+
+async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: Arc<ActionPool>) {
+ // Setup connection instance
+ let mut instance = ConnectionInstance::from(stream);
+
+ // Read action name and action arguments
+ let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
+ return;
+ };
+
+ // Build context
+ let ctx: ActionContext = ActionContext::remote().insert_instance(instance);
+
+ // Insert vault into context
+ let ctx = ctx.insert_arc(vault);
+
+ // Process action
+ let Ok(_result_json) = action_pool
+ .process_json(&msg.action_name, ctx, msg.action_args_json)
+ .await
+ else {
+ return;
+ };
+}
diff --git a/crates/vcs_actions/src/connection/error.rs b/crates/vcs_actions/src/connection/error.rs
new file mode 100644
index 0000000..241c16e
--- /dev/null
+++ b/crates/vcs_actions/src/connection/error.rs
@@ -0,0 +1,14 @@
+use std::io;
+use thiserror::Error;
+
+#[derive(Error, Debug, Clone)]
+pub enum ConnectionError {
+ #[error("I/O error: {0}")]
+ Io(String),
+}
+
+impl From<io::Error> for ConnectionError {
+ fn from(error: io::Error) -> Self {
+ ConnectionError::Io(error.to_string())
+ }
+}
diff --git a/crates/vcs_actions/src/connection/protocol.rs b/crates/vcs_actions/src/connection/protocol.rs
new file mode 100644
index 0000000..2cebe79
--- /dev/null
+++ b/crates/vcs_actions/src/connection/protocol.rs
@@ -0,0 +1,7 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Default, Clone, Serialize, Deserialize)]
+pub struct RemoteActionInvoke {
+ pub action_name: String,
+ pub action_args_json: String,
+}
diff --git a/crates/vcs_actions/src/lib.rs b/crates/vcs_actions/src/lib.rs
index 92de35f..2f7cbe4 100644
--- a/crates/vcs_actions/src/lib.rs
+++ b/crates/vcs_actions/src/lib.rs
@@ -1,2 +1,3 @@
pub mod actions;
+pub mod connection;
pub mod registry;
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index e69de29..5939bed 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -0,0 +1,60 @@
+use action_system::{action::ActionContext, action_pool::ActionPool};
+use tcp_connection::error::TcpTargetError;
+
+use crate::{
+ actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke,
+};
+
+fn register_actions(pool: &mut ActionPool) {
+ // Pool register here
+ register_hello_world_action(pool);
+}
+
+pub fn client_action_pool() -> ActionPool {
+ // Create pool
+ let mut pool = ActionPool::new();
+
+ // Register actions
+ register_actions(&mut pool);
+
+ // Add process events
+ pool.set_on_proc_begin(|ctx, args| Box::pin(on_proc_begin(ctx, args)));
+
+ // Return
+ pool
+}
+
+async fn on_proc_begin(
+ ctx: &ActionContext,
+ _args: &(dyn std::any::Any + Send + Sync),
+) -> Result<(), TcpTargetError> {
+ // Is ctx remote
+ let is_remote = ctx.is_remote();
+
+ // Action name and arguments
+ let action_name = ctx.action_name().to_string();
+ let action_args_json = ctx.action_args_json().clone();
+
+ // Get instance
+ let Some(instance) = ctx.instance() else {
+ return Err(TcpTargetError::Unsupported(
+ "Missing ConnectionInstance in current context, this ActionPool does not support this call"
+ .to_string()));
+ };
+
+ // If it's remote, invoke action at server
+ if is_remote {
+ // Build protocol message
+ let msg = RemoteActionInvoke {
+ action_name,
+ action_args_json,
+ };
+
+ // Send
+ let mut instance = instance.lock().await;
+ instance.write_msgpack(&msg).await?;
+ }
+
+ // Return OK, wait for client to execute Action locally
+ Ok(())
+}
diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs
index e69de29..b449b68 100644
--- a/crates/vcs_actions/src/registry/server_registry.rs
+++ b/crates/vcs_actions/src/registry/server_registry.rs
@@ -0,0 +1,9 @@
+use action_system::action_pool::ActionPool;
+
+use crate::actions::local_actions::register_hello_world_action;
+
+pub fn server_action_pool() -> ActionPool {
+ let mut pool = ActionPool::new();
+ register_hello_world_action(&mut pool);
+ pool
+}
diff --git a/crates/vcs_data/src/data/local/config.rs b/crates/vcs_data/src/data/local/config.rs
index 5444047..338d01b 100644
--- a/crates/vcs_data/src/data/local/config.rs
+++ b/crates/vcs_data/src/data/local/config.rs
@@ -5,6 +5,7 @@ use std::net::SocketAddr;
use crate::constants::CLIENT_FILE_WORKSPACE;
use crate::constants::PORT;
use crate::data::member::MemberId;
+use crate::data::vault::config::VaultUuid;
#[derive(Serialize, Deserialize, ConfigFile)]
#[cfg_file(path = CLIENT_FILE_WORKSPACE)]
@@ -16,6 +17,14 @@ pub struct LocalConfig {
/// The member ID used by the current local workspace.
/// This ID will be used to verify access permissions when connecting to the upstream server.
using_account: MemberId,
+
+ /// Whether the local workspace is stained.
+ ///
+ /// If stained, it can only set an upstream server with the same identifier.
+ ///
+ /// If the value is None, it means not stained;
+ /// otherwise, it contains the stain identifier (i.e., the upstream vault's unique ID)
+ stained_uuid: Option<VaultUuid>,
}
impl Default for LocalConfig {
@@ -26,6 +35,7 @@ impl Default for LocalConfig {
PORT,
)),
using_account: "unknown".to_string(),
+ stained_uuid: None,
}
}
}
@@ -50,4 +60,19 @@ impl LocalConfig {
pub fn current_account(&self) -> MemberId {
self.using_account.clone()
}
+
+ /// Check if the local workspace is stained.
+ pub fn stained(&self) -> bool {
+ self.stained_uuid.is_some()
+ }
+
+ /// Stain the local workspace with the given UUID.
+ pub fn stain(&mut self, uuid: VaultUuid) {
+ self.stained_uuid = Some(uuid);
+ }
+
+ /// Unstain the local workspace.
+ pub fn unstain(&mut self) {
+ self.stained_uuid = None;
+ }
}
diff --git a/crates/vcs_data/src/data/sheet.rs b/crates/vcs_data/src/data/sheet.rs
index a6220c9..f1cf67c 100644
--- a/crates/vcs_data/src/data/sheet.rs
+++ b/crates/vcs_data/src/data/sheet.rs
@@ -107,7 +107,7 @@ impl<'a> Sheet<'a> {
}
/// Accept an input package and insert to the sheet
- pub fn accept_import(
+ pub async fn accept_import(
&mut self,
input_name: &InputName,
insert_to: &SheetPathBuf,
@@ -129,7 +129,8 @@ impl<'a> Sheet<'a> {
// Insert to sheet
for (relative_path, virtual_file_id) in input.files {
- let _ = self.add_mapping(insert_to.join(relative_path), virtual_file_id);
+ self.add_mapping(insert_to.join(relative_path), virtual_file_id)
+ .await?;
}
Ok(())
@@ -176,8 +177,7 @@ impl<'a> Sheet<'a> {
}
Err(_) => {
// Error checking rights, don't allow modifying the mapping
- Err(std::io::Error::new(
- std::io::ErrorKind::Other,
+ Err(std::io::Error::other(
"Failed to check virtual file edit rights",
))
}
diff --git a/crates/vcs_data/src/data/vault/config.rs b/crates/vcs_data/src/data/vault/config.rs
index 6eea25a..5586e1e 100644
--- a/crates/vcs_data/src/data/vault/config.rs
+++ b/crates/vcs_data/src/data/vault/config.rs
@@ -2,15 +2,22 @@ use std::net::{IpAddr, Ipv4Addr};
use cfg_file::ConfigFile;
use serde::{Deserialize, Serialize};
+use uuid::Uuid;
use crate::constants::{PORT, SERVER_FILE_VAULT};
use crate::data::member::{Member, MemberId};
+pub type VaultName = String;
+pub type VaultUuid = Uuid;
+
#[derive(Serialize, Deserialize, ConfigFile)]
#[cfg_file(path = SERVER_FILE_VAULT)]
pub struct VaultConfig {
+ /// Vault uuid, unique identifier for the vault
+ vault_uuid: VaultUuid,
+
/// Vault name, which can be used as the project name and generally serves as a hint
- vault_name: String,
+ vault_name: VaultName,
/// Vault admin id, a list of member id representing administrator identities
vault_admin_list: Vec<MemberId>,
@@ -42,6 +49,7 @@ pub struct VaultServerConfig {
impl Default for VaultConfig {
fn default() -> Self {
Self {
+ vault_uuid: Uuid::new_v4(),
vault_name: "JustEnoughVault".to_string(),
vault_admin_list: Vec::new(),
server_config: VaultServerConfig {
@@ -56,12 +64,12 @@ impl Default for VaultConfig {
/// Vault Management
impl VaultConfig {
- // Change name of the vault.
+ /// Change name of the vault.
pub fn change_name(&mut self, name: impl Into<String>) {
self.vault_name = name.into()
}
- // Add admin
+ /// Add admin
pub fn add_admin(&mut self, member: &Member) {
let uuid = member.id();
if !self.vault_admin_list.contains(&uuid) {
@@ -69,9 +77,71 @@ impl VaultConfig {
}
}
- // Remove admin
+ /// Remove admin
pub fn remove_admin(&mut self, member: &Member) {
let id = member.id();
self.vault_admin_list.retain(|x| x != &id);
}
+
+ /// Get vault UUID
+ pub fn vault_uuid(&self) -> &VaultUuid {
+ &self.vault_uuid
+ }
+
+ /// Set vault UUID
+ pub fn set_vault_uuid(&mut self, vault_uuid: VaultUuid) {
+ self.vault_uuid = vault_uuid;
+ }
+
+ /// Get vault name
+ pub fn vault_name(&self) -> &VaultName {
+ &self.vault_name
+ }
+
+ /// Set vault name
+ pub fn set_vault_name(&mut self, vault_name: VaultName) {
+ self.vault_name = vault_name;
+ }
+
+ /// Get vault admin list
+ pub fn vault_admin_list(&self) -> &Vec<MemberId> {
+ &self.vault_admin_list
+ }
+
+ /// Set vault admin list
+ pub fn set_vault_admin_list(&mut self, vault_admin_list: Vec<MemberId>) {
+ self.vault_admin_list = vault_admin_list;
+ }
+
+ /// Get server config
+ pub fn server_config(&self) -> &VaultServerConfig {
+ &self.server_config
+ }
+
+ /// Set server config
+ pub fn set_server_config(&mut self, server_config: VaultServerConfig) {
+ self.server_config = server_config;
+ }
+}
+
+impl VaultServerConfig {
+ /// Get local bind IP address
+ pub fn local_bind(&self) -> &IpAddr {
+ &self.local_bind
+ }
+
+ /// Set local bind IP address
+ pub fn set_local_bind(&mut self, local_bind: IpAddr) {
+ self.local_bind = local_bind;
+ }
+
+ /// Get port
+ pub fn port(&self) -> u16 {
+ self.port
+ }
+
+ /// Set port
+ pub fn set_port(&mut self, port: u16) {
+ self.port = port;
+ }
}
diff --git a/crates/vcs_data/todo.md b/crates/vcs_data/todo.md
new file mode 100644
index 0000000..3c7e0c0
--- /dev/null
+++ b/crates/vcs_data/todo.md
@@ -0,0 +1,31 @@
+| 类别 | 项 | 可完成性 | 已完成 |
+|----------|----|----------|--------|
+| 本地文件 | 设置上游服务器(仅设置,不会连接和修改染色标识) | y | |
+| 本地文件 | 验证连接、权限,并为当前工作区染色(若已染色,则无法连接不同标识的服务器) | y | |
+| 本地文件 | 进入表 (否则无法做任何操作) | | |
+| 本地文件 | 退出表 (文件将会从当前目录移出,等待下次进入时还原) | | |
+| 本地文件 | 去色 - 断开与上游服务器的关联 | y | |
+| 本地文件 | 跟踪本地文件的移动、重命名,立刻同步至表 | | |
+| 本地文件 | 扫描本地文件结构,标记变化 | | |
+| 本地文件 | 通过本地暂存的表索引搜索文件 | | |
+| 本地文件 | 查询本地某个文件的状态 | | |
+| 本地文件 | 查询当前目录的状态 | | |
+| 本地文件 | 查询工作区状态 | | |
+| 本地文件 | 将本地所有文件更新到最新状态 | | |
+| 本地文件 | 提交所有产生变化的自身所属文件 | | |
+| 表 | 表查看 - 指定表并查看结构 | | |
+| 表 | 从参照表拉入文件项目 | | |
+| 表 | 将文件项目(或多个)导出到指定表 | | |
+| 表 | 查看导入请求 | | |
+| 表 | 在某个本地地址同意并导入文件 | | |
+| 表 | 拒绝某个、某些或所有导入请求 | | |
+| 表 | 删除表中的映射,但要确保实际文件已被移除 (忽略文件) | | |
+| 表 | 放弃表,所有者消失,下一个切换至表的人获得(放弃需要确保表中没有任何文件是所有者持有的)(替代目前的安全删除) | | |
+| 虚拟文件 | 跟踪本地某些文件,并将其创建为虚拟文件,然后添加到自己的表 | | |
+| 虚拟文件 | 根据本地文件的目录查找虚拟文件,并为自己获得所有权(需要确保版本和上游同步才可) | | |
+| 虚拟文件 | 根据本地文件的目录查找虚拟文件,并放弃所有权(需要确保和上游同步才可) | | |
+| 虚拟文件 | 根据本地文件的目录查找虚拟文件,并定向到指定的存在的老版本 | | |
+
+
+?为什么虚拟文件不能删除:虚拟文件的唯一删除方式就是,没有人再用他
+?为什么没有删除表:同理,表权限可以转移,但是删除只能等待定期清除无主人的表
diff --git a/crates/vcs_data/todo.txt b/crates/vcs_data/todo.txt
deleted file mode 100644
index 65c94ef..0000000
--- a/crates/vcs_data/todo.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-本地文件操作
-设置上游服务器(仅设置,不会连接和修改染色标识)
-验证连接、权限,并为当前工作区染色(若已染色,则无法连接不同标识的服务器)
-进入表 (否则无法做任何操作)
-退出表 (文件将会从当前目录移出,等待下次进入时还原)
-去色 - 断开与上游服务器的关联
-跟踪本地文件的移动、重命名,立刻同步至表
-扫描本地文件结构,标记变化
-通过本地暂存的表索引搜索文件
-查询本地某个文件的状态
-查询当前目录的状态
-查询工作区状态
-将本地所有文件更新到最新状态
-提交所有产生变化的自身所属文件
-
-
-表操作(必须指定成员和表)
-表查看 - 指定表并查看结构
-从参照表拉入文件项目
-将文件项目(或多个)导出到指定表
-查看导入请求
-在某个本地地址同意并导入文件
-拒绝某个、某些或所有导入请求
-删除表中的映射,但要确保实际文件已被移除 (忽略文件)
-放弃表,所有者消失,下一个切换至表的人获得(放弃需要确保表中没有任何文件是所有者持有的)(替代目前的安全删除)
-
-
-虚拟文件操作
-跟踪本地某些文件,并将其创建为虚拟文件,然后添加到自己的表
-根据本地文件的目录查找虚拟文件,并为自己获得所有权(需要确保版本和上游同步才可)
-根据本地文件的目录查找虚拟文件,并放弃所有权(需要确保和上游同步才可)
-根据本地文件的目录查找虚拟文件,并定向到指定的存在的老版本
-
-
-?为什么虚拟文件不能删除:虚拟文件的唯一删除方式就是,没有人再用他
-?为什么没有删除表:同理,表权限可以转移,但是删除只能等待定期清除无主人的表
diff --git a/crates/vcs_data/vcs_data_test/src/test_sheet_creation_management_and_persistence.rs b/crates/vcs_data/vcs_data_test/src/test_sheet_creation_management_and_persistence.rs
index 461d465..a8dfb89 100644
--- a/crates/vcs_data/vcs_data_test/src/test_sheet_creation_management_and_persistence.rs
+++ b/crates/vcs_data/vcs_data_test/src/test_sheet_creation_management_and_persistence.rs
@@ -254,7 +254,7 @@ async fn test_sheet_data_serialization() -> Result<(), std::io::Error> {
// Add some inputs
let input_name = "source_files".to_string();
- let _files = vec![
+ let _files = [
(
InputRelativePathBuf::from("src/main.rs"),
VirtualFileId::new(),