summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2025-10-27 17:59:19 +0800
committerGitHub <noreply@github.com>2025-10-27 17:59:19 +0800
commit5e150adf0e3d8b3843779eddd83469d1b1ba84bc (patch)
treeaedbd6cd10757da9c9d401a818ed1471f377d54b
parent49ad7a152cf849c8d91ee6b686da31f9c252f77c (diff)
parent368687c943a13427b5338a30fb7b55558420f4de (diff)
Merge pull request #26 from JustEnoughVCS/jvcs_dev
Jvcs dev
-rw-r--r--.gitignore3
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml2
-rw-r--r--crates/system_action/action_macros/src/lib.rs14
-rw-r--r--crates/system_action/src/action.rs27
-rw-r--r--crates/system_action/src/action_pool.rs12
-rw-r--r--crates/system_action/src/lib.rs5
-rw-r--r--crates/utils/tcp_connection/src/error.rs3
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs17
-rw-r--r--crates/vcs_actions/src/connection/action_service.rs75
-rw-r--r--crates/vcs_actions/src/registry/client_registry.rs9
-rw-r--r--crates/vcs_actions/src/registry/server_registry.rs4
-rw-r--r--crates/vcs_data/src/constants.rs13
-rw-r--r--crates/vcs_data/src/data/local.rs2
-rw-r--r--crates/vcs_data/src/data/vault.rs1
-rw-r--r--crates/vcs_data/src/data/vault/service.rs43
-rw-r--r--examples/src/bin/example_action_system.rs2
-rw-r--r--src/lib.rs10
18 files changed, 191 insertions, 52 deletions
diff --git a/.gitignore b/.gitignore
index 21d9745..ab890bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,6 @@
# Obsidian directory
/.obsidian/
+
+# Target directory
+/target/
diff --git a/Cargo.lock b/Cargo.lock
index b7911b5..daf478d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -555,6 +555,7 @@ dependencies = [
name = "just_enough_vcs"
version = "0.0.0"
dependencies = [
+ "action_system",
"cfg_file",
"string_proc",
"tcp_connection",
diff --git a/Cargo.toml b/Cargo.toml
index 881fbbe..fa5292c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -60,5 +60,7 @@ cfg_file = { path = "crates/utils/cfg_file" }
tcp_connection = { path = "crates/utils/tcp_connection" }
string_proc = { path = "crates/utils/string_proc" }
+action_system = { path = "crates/system_action" }
+
vcs_data = { path = "crates/vcs_data" }
vcs_actions = { path = "crates/vcs_actions" }
diff --git a/crates/system_action/action_macros/src/lib.rs b/crates/system_action/action_macros/src/lib.rs
index ce50073..4c03b63 100644
--- a/crates/system_action/action_macros/src/lib.rs
+++ b/crates/system_action/action_macros/src/lib.rs
@@ -64,9 +64,10 @@ fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::Tok
#fn_vis async fn #proc_this_action(
pool: &action_system::action_pool::ActionPool,
- ctx: action_system::action::ActionContext,
+ mut ctx: action_system::action::ActionContext,
#arg_param_name: #arg_type
) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
+ ctx.set_is_remote_action(!#_is_local);
let args_json = serde_json::to_string(&#arg_param_name)
.map_err(|e| {
tcp_connection::error::TcpTargetError::Serialization(e.to_string())
@@ -121,11 +122,12 @@ fn validate_function_signature(fn_sig: &syn::Signature) {
if let syn::Type::Path(type_path) = return_type.as_ref() {
if let Some(segment) = type_path.path.segments.last()
- && segment.ident != "Result" {
- panic!(
- "Expected Action function to return Result<T, TcpTargetError>, but found different return type"
- );
- }
+ && segment.ident != "Result"
+ {
+ panic!(
+ "Expected Action function to return Result<T, TcpTargetError>, but found different return type"
+ );
+ }
} else {
panic!(
"Expected Action function to return Result<T, TcpTargetError>, but found no return type"
diff --git a/crates/system_action/src/action.rs b/crates/system_action/src/action.rs
index 8a6180a..9eef1db 100644
--- a/crates/system_action/src/action.rs
+++ b/crates/system_action/src/action.rs
@@ -23,7 +23,10 @@ where
#[derive(Default)]
pub struct ActionContext {
/// Whether the action is executed locally or remotely
- local: bool,
+ proc_on_local: bool,
+
+ /// Whether the action being executed in the current context is a remote action
+ is_remote_action: bool,
/// The name of the action being executed
action_name: String,
@@ -42,7 +45,7 @@ impl ActionContext {
/// Generate local context
pub fn local() -> Self {
ActionContext {
- local: true,
+ proc_on_local: true,
..Default::default()
}
}
@@ -50,7 +53,7 @@ impl ActionContext {
/// Generate remote context
pub fn remote() -> Self {
ActionContext {
- local: false,
+ proc_on_local: false,
..Default::default()
}
}
@@ -75,13 +78,23 @@ impl ActionContext {
impl ActionContext {
/// Whether the action is executed locally
- pub fn is_local(&self) -> bool {
- self.local
+ pub fn is_proc_on_local(&self) -> bool {
+ self.proc_on_local
}
/// Whether the action is executed remotely
- pub fn is_remote(&self) -> bool {
- !self.local
+ pub fn is_proc_on_remote(&self) -> bool {
+ !self.proc_on_local
+ }
+
+ /// Whether the action being executed in the current context is a remote action
+ pub fn is_remote_action(&self) -> bool {
+ self.is_remote_action
+ }
+
+ /// Set whether the action being executed in the current context is a remote action
+ pub fn set_is_remote_action(&mut self, is_remote_action: bool) {
+ self.is_remote_action = is_remote_action;
}
/// Get the connection instance in the current context
diff --git a/crates/system_action/src/action_pool.rs b/crates/system_action/src/action_pool.rs
index c28de1e..c3ad4a1 100644
--- a/crates/system_action/src/action_pool.rs
+++ b/crates/system_action/src/action_pool.rs
@@ -7,7 +7,7 @@ use tcp_connection::error::TcpTargetError;
use crate::action::{Action, ActionContext};
type ProcBeginCallback = for<'a> fn(
- &'a ActionContext,
+ &'a mut ActionContext,
args: &'a (dyn std::any::Any + Send + Sync),
) -> ProcBeginFuture<'a>;
type ProcEndCallback = fn() -> ProcEndFuture;
@@ -94,9 +94,9 @@ impl ActionPool {
if let Some(action) = self.actions.get(action_name) {
// Set action name and args in context for callbacks
let context = context.set_action_name(action_name.to_string());
- let context = context.set_action_args(args_json.clone());
+ let mut context = context.set_action_args(args_json.clone());
- self.exec_on_proc_begin(&context, &args_json).await?;
+ self.exec_on_proc_begin(&mut context, &args_json).await?;
let result = action.process_json_erased(context, args_json).await?;
self.exec_on_proc_end().await?;
Ok(result)
@@ -114,7 +114,7 @@ impl ActionPool {
pub async fn process<'a, Args, Return>(
&'a self,
action_name: &'a str,
- context: ActionContext,
+ mut context: ActionContext,
args: Args,
) -> Result<Return, TcpTargetError>
where
@@ -122,7 +122,7 @@ impl ActionPool {
Return: serde::Serialize + Send + 'static,
{
if let Some(action) = self.actions.get(action_name) {
- self.exec_on_proc_begin(&context, &args).await?;
+ self.exec_on_proc_begin(&mut context, &args).await?;
let result = action.process_erased(context, Box::new(args)).await?;
let result = *result
.downcast::<Return>()
@@ -137,7 +137,7 @@ impl ActionPool {
/// Executes the process begin callback if set
async fn exec_on_proc_begin(
&self,
- context: &ActionContext,
+ context: &mut ActionContext,
args: &(dyn std::any::Any + Send + Sync),
) -> Result<(), TcpTargetError> {
if let Some(callback) = &self.on_proc_begin {
diff --git a/crates/system_action/src/lib.rs b/crates/system_action/src/lib.rs
index 07be1bb..12ae999 100644
--- a/crates/system_action/src/lib.rs
+++ b/crates/system_action/src/lib.rs
@@ -1,5 +1,6 @@
-pub use action_system_macros::*;
-pub extern crate action_system_macros as macros;
+pub mod macros {
+ pub use action_system_macros::*;
+}
pub mod action;
pub mod action_pool;
diff --git a/crates/utils/tcp_connection/src/error.rs b/crates/utils/tcp_connection/src/error.rs
index 28e33d3..cfea060 100644
--- a/crates/utils/tcp_connection/src/error.rs
+++ b/crates/utils/tcp_connection/src/error.rs
@@ -44,6 +44,9 @@ pub enum TcpTargetError {
#[error("Not found: {0}")]
NotFound(String),
+
+ #[error("Locked: {0}")]
+ Locked(String),
}
impl From<io::Error> for TcpTargetError {
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index b11a934..f705692 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -1,9 +1,14 @@
-use action_system::{action::ActionContext, action_gen};
+use std::net::SocketAddr;
+
+use action_system::{action::ActionContext, macros::action_gen};
use log::info;
use tcp_connection::error::TcpTargetError;
#[action_gen]
-pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTargetError> {
+pub async fn set_upstream_vault_action(
+ ctx: ActionContext,
+ _upstream: SocketAddr,
+) -> Result<(), TcpTargetError> {
// Ensure the instance is available
let Some(instance) = ctx.instance() else {
return Err(TcpTargetError::NotFound(
@@ -11,14 +16,14 @@ pub async fn hello_world_action(ctx: ActionContext, _n: ()) -> Result<(), TcpTar
));
};
- if ctx.is_local() {
+ if ctx.is_proc_on_local() {
// Invoke on local
// Send the message to the server
let _ = instance.lock().await.write_text("Hello World!").await;
- } else if ctx.is_remote() {
- // Read the message from the client
+ } else if ctx.is_proc_on_remote() {
+ // Remote execution - read the message from the client
let read = instance.lock().await.read_text().await?;
- info!("{}", read)
+ info!("Received: {}", read)
}
Ok(())
diff --git a/crates/vcs_actions/src/connection/action_service.rs b/crates/vcs_actions/src/connection/action_service.rs
index c302fd4..f787cae 100644
--- a/crates/vcs_actions/src/connection/action_service.rs
+++ b/crates/vcs_actions/src/connection/action_service.rs
@@ -1,7 +1,13 @@
-use std::{net::SocketAddr, path::PathBuf, sync::Arc};
+use std::{
+ net::SocketAddr,
+ path::PathBuf,
+ sync::Arc,
+ time::{Duration, Instant},
+};
use action_system::{action::ActionContext, action_pool::ActionPool};
use cfg_file::config::ConfigFile;
+use log::{error, info, warn};
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::{
net::{TcpListener, TcpStream},
@@ -25,6 +31,12 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
// Initialize the vault
let vault: Arc<Vault> = init_vault(vault_cfg, vault_path.into()).await?;
+ // Lock the vault
+ vault.lock().map_err(|e| {
+ error!("{}", e);
+ TcpTargetError::Locked(e.to_string())
+ })?;
+
// Create ActionPool
let action_pool: Arc<ActionPool> = Arc::new(server_action_pool());
@@ -32,6 +44,9 @@ pub async fn server_entry(vault_path: impl Into<PathBuf>) -> Result<(), TcpTarge
let (_shutdown_rx, future) = build_server_future(vault.clone(), action_pool.clone(), listener);
future.await?; // Start and block until shutdown
+ // Unlock the vault
+ vault.unlock()?;
+
Ok(())
}
@@ -67,11 +82,32 @@ fn build_server_future(
let mut active_connections = 0;
let mut shutdown_requested = false;
- // Spawn task to handle Ctrl+C
+ // Spawn task to handle Ctrl+C with rapid exit detection
let shutdown_tx_clone = shutdown_tx.clone();
spawn(async move {
- if let Ok(()) = signal::ctrl_c().await {
+ let mut ctrl_c_count = 0;
+ let mut last_ctrl_c_time = Instant::now();
+
+ while let Ok(()) = signal::ctrl_c().await {
+ let now = Instant::now();
+
+ // Reset counter if more than 5 seconds have passed
+ if now.duration_since(last_ctrl_c_time) > Duration::from_secs(5) {
+ ctrl_c_count = 0;
+ }
+
+ ctrl_c_count += 1;
+ last_ctrl_c_time = now;
+
let _ = shutdown_tx_clone.send(()).await;
+
+ // If 3 Ctrl+C within 5 seconds, exit immediately
+ if ctrl_c_count >= 3 {
+ info!("Shutdown. (3/3)");
+ std::process::exit(0);
+ } else {
+ info!("Ctrl + C to force shutdown. ({} / 3)", ctrl_c_count);
+ }
}
});
@@ -82,14 +118,16 @@ fn build_server_future(
accept_result = listener.accept(), if !shutdown_requested => {
match accept_result {
Ok((stream, _addr)) => {
- active_connections += 1;
+ info!("New connection. (now {})", active_connections);
let _ = tx.send(1).await;
let vault_clone = vault.clone();
let action_pool_clone = action_pool.clone();
let tx_clone = tx.clone();
+
spawn(async move {
process_connection(stream, vault_clone, action_pool_clone).await;
+ info!("A connection closed. (now {})", active_connections);
let _ = tx_clone.send(-1).await;
});
}
@@ -114,7 +152,10 @@ fn build_server_future(
shutdown_requested = true;
// If no active connections, break immediately
if active_connections == 0 {
+ info!("No active connections. Shutting down.");
break;
+ } else {
+ warn!("Cannot shutdown while active connections exist! ({} active)", active_connections);
}
}
}
@@ -131,8 +172,12 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
let mut instance = ConnectionInstance::from(stream);
// Read action name and action arguments
- let Ok(msg) = instance.read_msgpack::<RemoteActionInvoke>().await else {
- return;
+ let msg = match instance.read_msgpack::<RemoteActionInvoke>().await {
+ Ok(msg) => msg,
+ Err(e) => {
+ error!("Failed to read action message: {}", e);
+ return;
+ }
};
// Build context
@@ -141,11 +186,21 @@ async fn process_connection(stream: TcpStream, vault: Arc<Vault>, action_pool: A
// Insert vault into context
let ctx = ctx.insert_arc(vault);
+ info!(
+ "Process action `{}` with argument `{}`",
+ msg.action_name, msg.action_args_json
+ );
+
// Process action
- let Ok(_result_json) = action_pool
+ let result = action_pool
.process_json(&msg.action_name, ctx, msg.action_args_json)
- .await
- else {
- return;
+ .await;
+
+ match result {
+ Ok(_result_json) => {}
+ Err(e) => {
+ warn!("Failed to process action `{}`: {}", msg.action_name, e);
+ return;
+ }
};
}
diff --git a/crates/vcs_actions/src/registry/client_registry.rs b/crates/vcs_actions/src/registry/client_registry.rs
index 5939bed..9769750 100644
--- a/crates/vcs_actions/src/registry/client_registry.rs
+++ b/crates/vcs_actions/src/registry/client_registry.rs
@@ -2,12 +2,13 @@ use action_system::{action::ActionContext, action_pool::ActionPool};
use tcp_connection::error::TcpTargetError;
use crate::{
- actions::local_actions::register_hello_world_action, connection::protocol::RemoteActionInvoke,
+ actions::local_actions::register_set_upstream_vault_action,
+ connection::protocol::RemoteActionInvoke,
};
fn register_actions(pool: &mut ActionPool) {
// Pool register here
- register_hello_world_action(pool);
+ register_set_upstream_vault_action(pool);
}
pub fn client_action_pool() -> ActionPool {
@@ -25,11 +26,11 @@ pub fn client_action_pool() -> ActionPool {
}
async fn on_proc_begin(
- ctx: &ActionContext,
+ ctx: &mut ActionContext,
_args: &(dyn std::any::Any + Send + Sync),
) -> Result<(), TcpTargetError> {
// Is ctx remote
- let is_remote = ctx.is_remote();
+ let is_remote = ctx.is_remote_action();
// Action name and arguments
let action_name = ctx.action_name().to_string();
diff --git a/crates/vcs_actions/src/registry/server_registry.rs b/crates/vcs_actions/src/registry/server_registry.rs
index b449b68..3ecc103 100644
--- a/crates/vcs_actions/src/registry/server_registry.rs
+++ b/crates/vcs_actions/src/registry/server_registry.rs
@@ -1,9 +1,9 @@
use action_system::action_pool::ActionPool;
-use crate::actions::local_actions::register_hello_world_action;
+use crate::actions::local_actions::register_set_upstream_vault_action;
pub fn server_action_pool() -> ActionPool {
let mut pool = ActionPool::new();
- register_hello_world_action(&mut pool);
+ register_set_upstream_vault_action(&mut pool);
pool
}
diff --git a/crates/vcs_data/src/constants.rs b/crates/vcs_data/src/constants.rs
index 5e147c4..7514fe2 100644
--- a/crates/vcs_data/src/constants.rs
+++ b/crates/vcs_data/src/constants.rs
@@ -12,7 +12,7 @@ pub const VAULT_HOST_NAME: &str = "host";
// Server
// Server - Vault (Main)
-pub const SERVER_FILE_VAULT: &str = "./vault.toml"; // crates::env::vault::vault_config
+pub const SERVER_FILE_VAULT: &str = "./vault.toml";
// Server - Sheets
pub const REF_SHEET_NAME: &str = "ref";
@@ -22,8 +22,8 @@ pub const SERVER_FILE_SHEET: &str = "./sheets/{sheet-name}.yaml";
// Server - Members
pub const SERVER_PATH_MEMBERS: &str = "./members/";
pub const SERVER_PATH_MEMBER_PUB: &str = "./key/";
-pub const SERVER_FILE_MEMBER_INFO: &str = "./members/{member_id}.toml"; // crates::env::member::manager
-pub const SERVER_FILE_MEMBER_PUB: &str = "./key/{member_id}.pem"; // crates::utils::tcp_connection::instance
+pub const SERVER_FILE_MEMBER_INFO: &str = "./members/{member_id}.toml";
+pub const SERVER_FILE_MEMBER_PUB: &str = "./key/{member_id}.pem";
// Server - Virtual File Storage
pub const SERVER_PATH_VF_TEMP: &str = "./.temp/{temp_name}";
@@ -32,6 +32,9 @@ pub const SERVER_PATH_VF_STORAGE: &str = "./storage/{vf_index}/{vf_id}/";
pub const SERVER_FILE_VF_VERSION_INSTANCE: &str = "./storage/{vf_index}/{vf_id}/{vf_version}.rf";
pub const SERVER_FILE_VF_META: &str = "./storage/{vf_index}/{vf_id}/meta.yaml";
+// Server - Service
+pub const SERVER_FILE_LOCKFILE: &str = "./.lock";
+
pub const SERVER_FILE_README: &str = "./README.md";
// -------------------------------------------------------------------------------------
@@ -40,10 +43,10 @@ pub const SERVER_FILE_README: &str = "./README.md";
pub const CLIENT_PATH_WORKSPACE_ROOT: &str = "./.jv/";
// Client - Workspace (Main)
-pub const CLIENT_FILE_WORKSPACE: &str = "./.jv/workspace.toml"; // crates::env::local::local_config
+pub const CLIENT_FILE_WORKSPACE: &str = "./.jv/workspace.toml";
// Client - Other
-pub const CLIENT_FILE_IGNOREFILES: &str = ".jgnore .gitignore"; // Support gitignore file.
+pub const CLIENT_FILE_IGNOREFILES: &str = "IGNORE_RULES.toml";
pub const CLIENT_FILE_README: &str = "./README.md";
// -------------------------------------------------------------------------------------
diff --git a/crates/vcs_data/src/data/local.rs b/crates/vcs_data/src/data/local.rs
index 1c99832..c93bd2b 100644
--- a/crates/vcs_data/src/data/local.rs
+++ b/crates/vcs_data/src/data/local.rs
@@ -93,7 +93,7 @@ Without these credentials, the server will reject all access requests.
}
/// Setup local workspace in current directory
- pub async fn setup_local_workspacecurrent_dir() -> Result<(), std::io::Error> {
+ pub async fn setup_local_workspace_current_dir() -> Result<(), std::io::Error> {
Self::setup_local_workspace(current_dir()?).await?;
Ok(())
}
diff --git a/crates/vcs_data/src/data/vault.rs b/crates/vcs_data/src/data/vault.rs
index 5d17a81..80ebe1d 100644
--- a/crates/vcs_data/src/data/vault.rs
+++ b/crates/vcs_data/src/data/vault.rs
@@ -17,6 +17,7 @@ use crate::{
pub mod config;
pub mod member;
+pub mod service;
pub mod sheets;
pub mod virtual_file;
diff --git a/crates/vcs_data/src/data/vault/service.rs b/crates/vcs_data/src/data/vault/service.rs
new file mode 100644
index 0000000..9fdce85
--- /dev/null
+++ b/crates/vcs_data/src/data/vault/service.rs
@@ -0,0 +1,43 @@
+use std::path::PathBuf;
+
+use crate::{constants::SERVER_FILE_LOCKFILE, data::vault::Vault};
+
+impl Vault {
+ /// Get the path of the lock file for the current Vault
+ pub fn lock_file_path(&self) -> PathBuf {
+ self.vault_path().join(SERVER_FILE_LOCKFILE)
+ }
+
+ /// Check if the current Vault is locked
+ pub fn is_locked(&self) -> bool {
+ self.lock_file_path().exists()
+ }
+
+ /// Lock the current Vault
+ pub fn lock(&self) -> Result<(), std::io::Error> {
+ if self.is_locked() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::AlreadyExists,
+ format!(
+ "Vault is already locked at {}. \
+ To unlock, please stop any running services. \
+ If you are certain no services are running, \
+ please delete this file",
+ self.lock_file_path().display()
+ ),
+ ));
+ }
+ std::fs::File::create(self.lock_file_path())?;
+ Ok(())
+ }
+
+ /// Unlock the current Vault
+ pub fn unlock(&self) -> Result<(), std::io::Error> {
+ if let Err(e) = std::fs::remove_file(self.lock_file_path()) {
+ if e.kind() != std::io::ErrorKind::NotFound {
+ return Err(e);
+ }
+ }
+ Ok(())
+ }
+}
diff --git a/examples/src/bin/example_action_system.rs b/examples/src/bin/example_action_system.rs
index c873a2e..776c982 100644
--- a/examples/src/bin/example_action_system.rs
+++ b/examples/src/bin/example_action_system.rs
@@ -1,4 +1,4 @@
-use action_system::{action::ActionContext, action_gen, action_pool::ActionPool};
+use action_system::{action::ActionContext, action_pool::ActionPool, macros::action_gen};
use tcp_connection::error::TcpTargetError;
#[tokio::main]
diff --git a/src/lib.rs b/src/lib.rs
index 29855e3..4771dcd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,13 +1,19 @@
// Feature `vcs`
#[cfg(feature = "vcs")]
pub mod vcs {
- extern crate vcs_data;
+ pub extern crate vcs_data;
pub use vcs_data::*;
- extern crate vcs_actions;
+ pub extern crate vcs_actions;
pub use vcs_actions::*;
}
+pub mod system {
+ pub mod action_system {
+ pub use action_system::*;
+ }
+}
+
pub mod utils {
// Feature `tcp_connection`
#[cfg(feature = "tcp_connection")]