From 47e0ffd50427440696c245814517e4f5fa94ed83 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Sun, 8 Mar 2026 22:48:54 +0800 Subject: Move action system to legacy and remove storage system --- Cargo.lock | 19 - Cargo.toml | 8 +- legacy_actions/Cargo.toml | 2 +- legacy_data/Cargo.toml | 2 +- legacy_systems/action/Cargo.toml | 15 + legacy_systems/action/action_macros/Cargo.toml | 20 + legacy_systems/action/action_macros/src/lib.rs | 248 +++++++++++++ legacy_systems/action/src/action.rs | 244 ++++++++++++ legacy_systems/action/src/action_pool.rs | 247 +++++++++++++ legacy_systems/action/src/lib.rs | 6 + systems/action/Cargo.toml | 15 - systems/action/action_macros/Cargo.toml | 20 - systems/action/action_macros/src/lib.rs | 248 ------------- systems/action/src/action.rs | 244 ------------ systems/action/src/action_pool.rs | 247 ------------- systems/action/src/lib.rs | 6 - systems/storage/Cargo.toml | 13 - systems/storage/src/error.rs | 8 - systems/storage/src/lib.rs | 2 - systems/storage/src/store.rs | 493 ------------------------- systems/storage/src/store/cdc.rs | 307 --------------- systems/storage/src/store/fixed.rs | 417 --------------------- systems/storage/src/store/line.rs | 393 -------------------- 23 files changed, 785 insertions(+), 2439 deletions(-) create mode 100644 legacy_systems/action/Cargo.toml create mode 100644 legacy_systems/action/action_macros/Cargo.toml create mode 100644 legacy_systems/action/action_macros/src/lib.rs create mode 100644 legacy_systems/action/src/action.rs create mode 100644 legacy_systems/action/src/action_pool.rs create mode 100644 legacy_systems/action/src/lib.rs delete mode 100644 systems/action/Cargo.toml delete mode 100644 systems/action/action_macros/Cargo.toml delete mode 100644 systems/action/action_macros/src/lib.rs delete mode 100644 systems/action/src/action.rs delete mode 100644 systems/action/src/action_pool.rs delete mode 100644 systems/action/src/lib.rs delete mode 100644 systems/storage/Cargo.toml delete mode 100644 systems/storage/src/error.rs delete mode 100644 systems/storage/src/lib.rs delete mode 100644 systems/storage/src/store.rs delete mode 100644 systems/storage/src/store/cdc.rs delete mode 100644 systems/storage/src/store/fixed.rs delete mode 100644 systems/storage/src/store/line.rs diff --git a/Cargo.lock b/Cargo.lock index e6f0315..672700a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -856,12 +856,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "hex_display" version = "0.1.0" @@ -976,7 +970,6 @@ dependencies = [ "jvlib", "sha1_hash", "sheet_system", - "storage_system", "tcp_connection", "tokio", "toml", @@ -1809,18 +1802,6 @@ dependencies = [ "der", ] -[[package]] -name = "storage_system" -version = "0.1.0" -dependencies = [ - "blake3", - "hex", - "log", - "memmap2", - "thiserror 1.0.69", - "tokio", -] - [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 0c67d06..f49d3be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,11 +30,8 @@ members = [ "systems/_asset/test", "systems/_constants", "systems/_constants/macros", - "systems/action", - "systems/action/action_macros", "systems/sheet", "systems/sheet/macros", - "systems/storage", "utils/cfg_file", "utils/cfg_file/cfg_file_derive", "utils/cfg_file/cfg_file_test", @@ -48,6 +45,8 @@ members = [ "legacy_data", "legacy_data/tests", "legacy_actions", + "legacy_systems/action", + "legacy_systems/action/action_macros", # LEGACY AREA ] @@ -102,10 +101,9 @@ tcp_connection = { path = "utils/tcp_connection" } # Systems asset_system = { path = "systems/_asset" } constants = { path = "systems/_constants" } -action_system = { path = "systems/action" } sheet_system = { path = "systems/sheet" } -storage_system = { path = "systems/storage" } # Legacy vcs_data = { path = "legacy_data" } vcs_actions = { path = "legacy_actions" } +action_system = { path = "legacy_systems/action" } diff --git a/legacy_actions/Cargo.toml b/legacy_actions/Cargo.toml index 7c9e078..0ee8068 100644 --- a/legacy_actions/Cargo.toml +++ b/legacy_actions/Cargo.toml @@ -13,7 +13,7 @@ sha1_hash = { path = "../utils/sha1_hash" } just_fmt = "0.1.2" # Core dependencies -action_system = { path = "../systems/action" } +action_system = { path = "../legacy_systems/action" } vcs_data = { path = "../legacy_data" } # Error handling diff --git a/legacy_data/Cargo.toml b/legacy_data/Cargo.toml index 27fe6ad..e2f4dc6 100644 --- a/legacy_data/Cargo.toml +++ b/legacy_data/Cargo.toml @@ -12,7 +12,7 @@ sha1_hash = { path = "../utils/sha1_hash" } tcp_connection = { path = "../utils/tcp_connection" } # Core -action_system = { path = "../systems/action" } +action_system = { path = "../legacy_systems/action" } vcs_docs = { path = "../docs" } # Format diff --git a/legacy_systems/action/Cargo.toml b/legacy_systems/action/Cargo.toml new file mode 100644 index 0000000..5317975 --- /dev/null +++ b/legacy_systems/action/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "action_system" +edition = "2024" +version.workspace = true + +[dependencies] +tcp_connection = { path = "../../utils/tcp_connection" } +action_system_macros = { path = "action_macros" } + +# Serialization +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" + +# Async & Networking +tokio = "1.48.0" diff --git a/legacy_systems/action/action_macros/Cargo.toml b/legacy_systems/action/action_macros/Cargo.toml new file mode 100644 index 0000000..0f209e2 --- /dev/null +++ b/legacy_systems/action/action_macros/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "action_system_macros" +edition = "2024" +version.workspace = true + +[lib] +proc-macro = true + +[dependencies] +tcp_connection = { path = "../../../utils/tcp_connection" } + +just_fmt = "0.1.2" + +syn = { version = "2.0", features = ["full", "extra-traits"] } +quote = "1.0" +proc-macro2 = "1.0" + +# Serialization +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" diff --git a/legacy_systems/action/action_macros/src/lib.rs b/legacy_systems/action/action_macros/src/lib.rs new file mode 100644 index 0000000..6da0339 --- /dev/null +++ b/legacy_systems/action/action_macros/src/lib.rs @@ -0,0 +1,248 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{ItemFn, parse_macro_input}; + +/// # Macro - Generate Action +/// +/// When annotating a function with the `#[action_gen]` macro in the following format, it generates boilerplate code for client-server interaction +/// +/// ```ignore +/// #[action_gen] +/// async fn action_name(ctx: ActionContext, argument: YourArgument) -> Result { +/// // Write your client and server logic here +/// if ctx.is_proc_on_remote() { +/// // Server logic +/// } +/// if ctx.is_proc_on_local() { +/// // Client logic +/// } +/// } +/// ``` +/// +/// > WARNING: +/// > For Argument and Result types, the `action_gen` macro only supports types that derive serde's Serialize and Deserialize +/// +/// ## Generated Code +/// +/// `action_gen` will generate the following: +/// +/// 1. Complete implementation of Action +/// 2. Process / Register method +/// +/// ## How to use +/// +/// You can use your generated method as follows +/// +/// ```ignore +/// async fn main() -> Result<(), TcpTargetError> { +/// +/// // Prepare your argument +/// let args = YourArgument::default(); +/// +/// // Create a pool and context +/// let mut pool = ActionPool::new(); +/// let ctx = ActionContext::local(); +/// +/// // Register your action +/// register_your_action(&mut pool); +/// +/// // Process your action +/// proc_your_action(&pool, ctx, args).await?; +/// +/// Ok(()) +/// } +/// ``` +#[proc_macro_attribute] +pub fn action_gen(attr: TokenStream, item: TokenStream) -> TokenStream { + let input_fn = parse_macro_input!(item as ItemFn); + let is_local = if attr.is_empty() { + false + } else { + let attr_str = attr.to_string(); + attr_str == "local" || attr_str.contains("local") + }; + + generate_action_struct(input_fn, is_local).into() +} + +fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::TokenStream { + let fn_vis = &input_fn.vis; + let fn_sig = &input_fn.sig; + let fn_name = &fn_sig.ident; + let fn_block = &input_fn.block; + + validate_function_signature(fn_sig); + + let (context_param_name, arg_param_name, arg_type, return_type) = + extract_parameters_and_types(fn_sig); + + let struct_name = quote::format_ident!("{}", convert_to_pascal_case(&fn_name.to_string())); + + let action_name_ident = &fn_name; + + let register_this_action = quote::format_ident!("register_{}", action_name_ident); + let proc_this_action = quote::format_ident!("proc_{}", action_name_ident); + + quote! { + #[derive(Debug, Clone, Default)] + #fn_vis struct #struct_name; + + impl action_system::action::Action<#arg_type, #return_type> for #struct_name { + fn action_name() -> &'static str { + Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()) + } + + fn is_remote_action() -> bool { + !#_is_local + } + + async fn process(#context_param_name: action_system::action::ActionContext, #arg_param_name: #arg_type) -> Result<#return_type, tcp_connection::error::TcpTargetError> { + #fn_block + } + } + + #fn_vis fn #register_this_action(pool: &mut action_system::action_pool::ActionPool) { + pool.register::<#struct_name, #arg_type, #return_type>(); + } + + #fn_vis async fn #proc_this_action( + pool: &action_system::action_pool::ActionPool, + mut ctx: action_system::action::ActionContext, + #arg_param_name: #arg_type + ) -> Result<#return_type, tcp_connection::error::TcpTargetError> { + ctx.set_is_remote_action(!#_is_local); + let args_json = serde_json::to_string(&#arg_param_name) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + })?; + let result_json = pool.process_json( + Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), // LEAK??? OH SHIT + ctx, + args_json, + ).await?; + serde_json::from_str(&result_json) + .map_err(|e| { + tcp_connection::error::TcpTargetError::Serialization(e.to_string()) + }) + } + + #[allow(dead_code)] + #[deprecated = "This function is used by #[action_gen] as a template."] + #[doc = "Template function for #[[action_gen]] - do not call directly."] + #[doc = "Use the generated struct instead."] + #[doc = ""] + #[doc = "Register the action to the pool."] + #[doc = "```ignore"] + #[doc = "register_your_func(&mut pool);"] + #[doc = "```"] + #[doc = ""] + #[doc = "Process the action at the pool."] + #[doc = "```ignore"] + #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"] + #[doc = "```"] + #fn_vis #fn_sig #fn_block + } +} + +fn validate_function_signature(fn_sig: &syn::Signature) { + if fn_sig.asyncness.is_none() { + panic!("Expected async function for Action, but found synchronous function"); + } + + if fn_sig.inputs.len() != 2 { + panic!( + "Expected exactly 2 arguments for Action function: ctx: ActionContext and arg: T, but found {} arguments", + fn_sig.inputs.len() + ); + } + + let return_type = match &fn_sig.output { + syn::ReturnType::Type(_, ty) => ty, + _ => panic!( + "Expected Action function to return Result, but found no return type" + ), + }; + + if let syn::Type::Path(type_path) = return_type.as_ref() { + if let Some(segment) = type_path.path.segments.last() + && segment.ident != "Result" + { + panic!( + "Expected Action function to return Result, but found different return type" + ); + } + } else { + panic!( + "Expected Action function to return Result, but found no return type" + ); + } +} + +fn convert_to_pascal_case(s: &str) -> String { + s.split('_') + .map(|word| { + let mut chars = word.chars(); + match chars.next() { + None => String::new(), + Some(first) => first.to_uppercase().collect::() + chars.as_str(), + } + }) + .collect() +} + +fn extract_parameters_and_types( + fn_sig: &syn::Signature, +) -> ( + proc_macro2::TokenStream, + proc_macro2::TokenStream, + proc_macro2::TokenStream, + proc_macro2::TokenStream, +) { + let mut inputs = fn_sig.inputs.iter(); + + let context_param = match inputs.next() { + Some(syn::FnArg::Typed(pat_type)) => { + let pat = &pat_type.pat; + quote::quote!(#pat) + } + _ => { + panic!("Expected the first argument to be a typed parameter, but found something else") + } + }; + + let arg_param = match inputs.next() { + Some(syn::FnArg::Typed(pat_type)) => { + let pat = &pat_type.pat; + let ty = &pat_type.ty; + (quote::quote!(#pat), quote::quote!(#ty)) + } + _ => { + panic!("Expected the second argument to be a typed parameter, but found something else") + } + }; + + let (arg_param_name, arg_type) = arg_param; + + let return_type = match &fn_sig.output { + syn::ReturnType::Type(_, ty) => { + if let syn::Type::Path(type_path) = ty.as_ref() { + if let syn::PathArguments::AngleBracketed(args) = + &type_path.path.segments.last().unwrap().arguments + { + if let Some(syn::GenericArgument::Type(ty)) = args.args.first() { + quote::quote!(#ty) + } else { + panic!("Expected to extract the success type of Result, but failed"); + } + } else { + panic!("Expected Result type to have generic parameters, but found none"); + } + } else { + panic!("Expected return type to be Result, but found different type"); + } + } + _ => panic!("Expected function to have return type, but found none"), + }; + + (context_param, arg_param_name, arg_type, return_type) +} diff --git a/legacy_systems/action/src/action.rs b/legacy_systems/action/src/action.rs new file mode 100644 index 0000000..62425ff --- /dev/null +++ b/legacy_systems/action/src/action.rs @@ -0,0 +1,244 @@ +use serde::{Serialize, de::DeserializeOwned}; +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::sync::Arc; +use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; +use tokio::{net::TcpStream, sync::Mutex}; + +/// # Trait - Action +/// +/// A trait used to describe the interaction pattern between client and server +/// +/// ## Generics +/// +/// Args: Represents the parameter type required for this action +/// +/// Return: Represents the return type of this action +/// +/// The above generics must implement serde's Serialize and DeserializeOwned traits, +/// and must be sendable between threads +/// +/// ## Implementation +/// +/// ```ignore +/// pub trait Action +/// where +/// Args: Serialize + DeserializeOwned + Send, +/// Return: Serialize + DeserializeOwned + Send, +/// { +/// /// Name, used to inform the server which action to execute +/// fn action_name() -> &'static str; +/// +/// /// Whether it's a local Action, used to inform the system if it only runs locally +/// fn is_remote_action() -> bool; +/// +/// /// Action processing logic +/// fn process( +/// context: ActionContext, +/// args: Args, +/// ) -> impl std::future::Future> + Send; +/// } +/// ``` +pub trait Action +where + Args: Serialize + DeserializeOwned + Send, + Return: Serialize + DeserializeOwned + Send, +{ + fn action_name() -> &'static str; + + fn is_remote_action() -> bool; + + fn process( + context: ActionContext, + args: Args, + ) -> impl std::future::Future> + Send; +} + +/// # Struct - ActionContext +/// +/// Used to inform the Action about the current execution environment +/// +/// ## Creation +/// +/// Create ActionContext using the following methods: +/// +/// ```ignore +/// +/// // The instance here is the connection instance passed from external sources for communicating with the server +/// // For specific usage, please refer to the `/crates/utils/tcp_connection` section +/// +/// fn init_local_action_ctx(instance: ConnectionInstance) { +/// // Create context and specify execution on local +/// let mut ctx = ActionContext::local(); +/// } +/// +/// fn init_remote_action_ctx(instance: ConnectionInstance) { +/// // Create context and specify execution on remote +/// let mut ctx = ActionContext::remote(); +/// } +#[derive(Default)] +pub struct ActionContext { + /// Whether the action is executed locally or remotely + proc_on_local: bool, + + /// Whether the action being executed in the current context is a remote action + is_remote_action: bool, + + /// The name of the action being executed + action_name: String, + + /// The JSON-serialized arguments for the action + action_args_json: String, + + /// The connection instance in the current context, + instance: Option>>, + + /// Generic data storage for arbitrary types + data: HashMap>, +} + +impl ActionContext { + /// Generate local context + pub fn local() -> Self { + ActionContext { + proc_on_local: true, + ..Default::default() + } + } + + /// Generate remote context + pub fn remote() -> Self { + ActionContext { + proc_on_local: false, + ..Default::default() + } + } + + /// Build connection instance from TcpStream + pub fn build_instance(mut self, stream: TcpStream) -> Self { + self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream)))); + self + } + + /// Insert connection instance into context + pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self { + self.instance = Some(Arc::new(Mutex::new(instance))); + self + } + + /// Pop connection instance from context + pub fn pop_instance(&mut self) -> Option>> { + self.instance.take() + } +} + +impl ActionContext { + /// Whether the action is executed locally + pub fn is_proc_on_local(&self) -> bool { + self.proc_on_local + } + + /// Whether the action is executed remotely + pub fn is_proc_on_remote(&self) -> bool { + !self.proc_on_local + } + + /// Whether the action being executed in the current context is a remote action + pub fn is_remote_action(&self) -> bool { + self.is_remote_action + } + + /// Set whether the action being executed in the current context is a remote action + pub fn set_is_remote_action(&mut self, is_remote_action: bool) { + self.is_remote_action = is_remote_action; + } + + /// Get the connection instance in the current context + pub fn instance(&self) -> &Option>> { + &self.instance + } + + /// Get a mutable reference to the connection instance in the current context + pub fn instance_mut(&mut self) -> &mut Option>> { + &mut self.instance + } + + /// Get the action name from the context + pub fn action_name(&self) -> &str { + &self.action_name + } + + /// Get the action arguments from the context + pub fn action_args_json(&self) -> &String { + &self.action_args_json + } + + /// Set the action name in the context + pub fn set_action_name(mut self, action_name: String) -> Self { + self.action_name = action_name; + self + } + + /// Set the action arguments in the context + pub fn set_action_args(mut self, action_args: String) -> Self { + self.action_args_json = action_args; + self + } + + /// Insert arbitrary data in the context + pub fn with_data(mut self, value: T) -> Self { + self.data.insert(TypeId::of::(), Arc::new(value)); + self + } + + /// Insert arbitrary data as Arc in the context + pub fn with_arc_data(mut self, value: Arc) -> Self { + self.data.insert(TypeId::of::(), value); + self + } + + /// Insert arbitrary data in the context + pub fn insert_data(&mut self, value: T) { + self.data.insert(TypeId::of::(), Arc::new(value)); + } + + /// Insert arbitrary data as Arc in the context + pub fn insert_arc_data(&mut self, value: Arc) { + self.data.insert(TypeId::of::(), value); + } + + /// Get arbitrary data from the context + pub fn get(&self) -> Option<&T> { + self.data + .get(&TypeId::of::()) + .and_then(|arc| arc.downcast_ref::()) + } + + /// Get arbitrary data as Arc from the context + pub fn get_arc(&self) -> Option> { + self.data + .get(&TypeId::of::()) + .and_then(|arc| Arc::clone(arc).downcast::().ok()) + } + + /// Remove and return arbitrary data from the context + pub fn remove(&mut self) -> Option> { + self.data + .remove(&TypeId::of::()) + .and_then(|arc| arc.downcast::().ok()) + } + + /// Check if the context contains data of a specific type + pub fn contains(&self) -> bool { + self.data.contains_key(&TypeId::of::()) + } + + /// Take ownership of the context and extract data of a specific type + pub fn take(mut self) -> (Self, Option>) { + let value = self + .data + .remove(&TypeId::of::()) + .and_then(|arc| arc.downcast::().ok()); + (self, value) + } +} diff --git a/legacy_systems/action/src/action_pool.rs b/legacy_systems/action/src/action_pool.rs new file mode 100644 index 0000000..019fa6d --- /dev/null +++ b/legacy_systems/action/src/action_pool.rs @@ -0,0 +1,247 @@ +use std::pin::Pin; + +use serde::{Serialize, de::DeserializeOwned}; +use serde_json; +use tcp_connection::error::TcpTargetError; + +use crate::action::{Action, ActionContext}; + +type ProcBeginCallback = for<'a> fn( + &'a mut ActionContext, + args: &'a (dyn std::any::Any + Send + Sync), +) -> ProcBeginFuture<'a>; +type ProcEndCallback = fn() -> ProcEndFuture; + +type ProcBeginFuture<'a> = Pin> + Send + 'a>>; +type ProcEndFuture = Pin> + Send>>; + +/// # Struct - ActionPool +/// +/// This struct is used to register and record all accessible and executable actions +/// +/// It also registers `on_proc_begin` and `on_proc_end` callback functions +/// used for action initialization +/// +/// ## Creating and registering actions +/// ```ignore +/// fn init_action_pool() { +/// let mut pool = Action::new(); +/// +/// // Register action +/// pool.register(); +/// +/// // If the action is implemented with `#[action_gen]`, you can also do +/// register_your_action(&mut pool); +/// } +/// ``` +pub struct ActionPool { + /// HashMap storing action name to action implementation mapping + actions: std::collections::HashMap<&'static str, Box>, + + /// Callback to execute when process begins + on_proc_begin: Option, + + /// Callback to execute when process ends + on_proc_end: Option, +} + +impl Default for ActionPool { + fn default() -> Self { + Self::new() + } +} + +impl ActionPool { + /// Creates a new empty ActionPool + pub fn new() -> Self { + Self { + actions: std::collections::HashMap::new(), + on_proc_begin: None, + on_proc_end: None, + } + } + + /// Sets a callback to be executed when process begins + pub fn set_on_proc_begin(&mut self, callback: ProcBeginCallback) { + self.on_proc_begin = Some(callback); + } + + /// Sets a callback to be executed when process ends + pub fn set_on_proc_end(&mut self, callback: ProcEndCallback) { + self.on_proc_end = Some(callback); + } + + /// Registers an action type with the pool + /// + /// Usage: + /// ```ignore + /// action_pool.register::(); + /// ``` + pub fn register(&mut self) + where + A: Action + Send + Sync + 'static, + Args: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, + Return: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, + { + let action_name = A::action_name(); + self.actions.insert( + action_name, + Box::new(ActionWrapper::(std::marker::PhantomData)), + ); + } + + /// Processes an action by name with given context and arguments + /// + /// Usage: + /// ```ignore + /// let result = action_pool.process::("my_action", context, args).await?; + /// ``` + /// Processes an action by name with JSON-serialized arguments + /// + /// Usage: + /// ```ignore + /// let result_json = action_pool.process_json("my_action", context, args_json).await?; + /// let result: MyReturn = serde_json::from_str(&result_json)?; + /// ``` + pub async fn process_json<'a>( + &'a self, + action_name: &'a str, + context: ActionContext, + args_json: String, + ) -> Result { + if let Some(action) = self.actions.get(action_name) { + // Set action name and args in context for callbacks + let context = context.set_action_name(action_name.to_string()); + let mut context = context.set_action_args(args_json.clone()); + + self.exec_on_proc_begin(&mut context, &args_json).await?; + let result = action.process_json_erased(context, args_json).await?; + self.exec_on_proc_end().await?; + Ok(result) + } else { + Err(TcpTargetError::Unsupported("InvalidAction".to_string())) + } + } + + /// Processes an action by name with given context and arguments + /// + /// Usage: + /// ```ignore + /// let result = action_pool.process::("my_action", context, args).await?; + /// ``` + pub async fn process<'a, Args, Return>( + &'a self, + action_name: &'a str, + mut context: ActionContext, + args: Args, + ) -> Result + where + Args: serde::de::DeserializeOwned + Send + Sync + 'static, + Return: serde::Serialize + Send + 'static, + { + if let Some(action) = self.actions.get(action_name) { + self.exec_on_proc_begin(&mut context, &args).await?; + let result = action.process_erased(context, Box::new(args)).await?; + let result = *result + .downcast::() + .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; + self.exec_on_proc_end().await?; + Ok(result) + } else { + Err(TcpTargetError::Unsupported("InvalidAction".to_string())) + } + } + + /// Executes the process begin callback if set + async fn exec_on_proc_begin( + &self, + context: &mut ActionContext, + args: &(dyn std::any::Any + Send + Sync), + ) -> Result<(), TcpTargetError> { + if let Some(callback) = &self.on_proc_begin { + callback(context, args).await + } else { + Ok(()) + } + } + + /// Executes the process end callback if set + async fn exec_on_proc_end(&self) -> Result<(), TcpTargetError> { + if let Some(callback) = &self.on_proc_end { + callback().await + } else { + Ok(()) + } + } +} + +/// Trait for type-erased actions that can be stored in ActionPool +type ProcessErasedFuture = std::pin::Pin< + Box< + dyn std::future::Future, TcpTargetError>> + + Send, + >, +>; +type ProcessJsonErasedFuture = + std::pin::Pin> + Send>>; + +trait ActionErased: Send + Sync { + /// Processes the action with type-erased arguments and returns type-erased result + fn process_erased( + &self, + context: ActionContext, + args: Box, + ) -> ProcessErasedFuture; + + /// Processes the action with JSON-serialized arguments and returns JSON-serialized result + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> ProcessJsonErasedFuture; +} + +/// Wrapper struct that implements ActionErased for concrete Action types +struct ActionWrapper(std::marker::PhantomData<(A, Args, Return)>); + +impl ActionErased for ActionWrapper +where + A: Action + Send + Sync, + Args: Serialize + DeserializeOwned + Send + Sync + 'static, + Return: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + fn process_erased( + &self, + context: ActionContext, + args: Box, + ) -> std::pin::Pin< + Box< + dyn std::future::Future, TcpTargetError>> + + Send, + >, + > { + Box::pin(async move { + let args = *args + .downcast::() + .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; + let result = A::process(context, args).await?; + Ok(Box::new(result) as Box) + }) + } + + fn process_json_erased( + &self, + context: ActionContext, + args_json: String, + ) -> std::pin::Pin> + Send>> + { + Box::pin(async move { + let args: Args = serde_json::from_str(&args_json) + .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; + let result = A::process(context, args).await?; + let result_json = serde_json::to_string(&result) + .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?; + Ok(result_json) + }) + } +} diff --git a/legacy_systems/action/src/lib.rs b/legacy_systems/action/src/lib.rs new file mode 100644 index 0000000..12ae999 --- /dev/null +++ b/legacy_systems/action/src/lib.rs @@ -0,0 +1,6 @@ +pub mod macros { + pub use action_system_macros::*; +} + +pub mod action; +pub mod action_pool; diff --git a/systems/action/Cargo.toml b/systems/action/Cargo.toml deleted file mode 100644 index 5317975..0000000 --- a/systems/action/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "action_system" -edition = "2024" -version.workspace = true - -[dependencies] -tcp_connection = { path = "../../utils/tcp_connection" } -action_system_macros = { path = "action_macros" } - -# Serialization -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.145" - -# Async & Networking -tokio = "1.48.0" diff --git a/systems/action/action_macros/Cargo.toml b/systems/action/action_macros/Cargo.toml deleted file mode 100644 index 0f209e2..0000000 --- a/systems/action/action_macros/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "action_system_macros" -edition = "2024" -version.workspace = true - -[lib] -proc-macro = true - -[dependencies] -tcp_connection = { path = "../../../utils/tcp_connection" } - -just_fmt = "0.1.2" - -syn = { version = "2.0", features = ["full", "extra-traits"] } -quote = "1.0" -proc-macro2 = "1.0" - -# Serialization -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.145" diff --git a/systems/action/action_macros/src/lib.rs b/systems/action/action_macros/src/lib.rs deleted file mode 100644 index 6da0339..0000000 --- a/systems/action/action_macros/src/lib.rs +++ /dev/null @@ -1,248 +0,0 @@ -use proc_macro::TokenStream; -use quote::quote; -use syn::{ItemFn, parse_macro_input}; - -/// # Macro - Generate Action -/// -/// When annotating a function with the `#[action_gen]` macro in the following format, it generates boilerplate code for client-server interaction -/// -/// ```ignore -/// #[action_gen] -/// async fn action_name(ctx: ActionContext, argument: YourArgument) -> Result { -/// // Write your client and server logic here -/// if ctx.is_proc_on_remote() { -/// // Server logic -/// } -/// if ctx.is_proc_on_local() { -/// // Client logic -/// } -/// } -/// ``` -/// -/// > WARNING: -/// > For Argument and Result types, the `action_gen` macro only supports types that derive serde's Serialize and Deserialize -/// -/// ## Generated Code -/// -/// `action_gen` will generate the following: -/// -/// 1. Complete implementation of Action -/// 2. Process / Register method -/// -/// ## How to use -/// -/// You can use your generated method as follows -/// -/// ```ignore -/// async fn main() -> Result<(), TcpTargetError> { -/// -/// // Prepare your argument -/// let args = YourArgument::default(); -/// -/// // Create a pool and context -/// let mut pool = ActionPool::new(); -/// let ctx = ActionContext::local(); -/// -/// // Register your action -/// register_your_action(&mut pool); -/// -/// // Process your action -/// proc_your_action(&pool, ctx, args).await?; -/// -/// Ok(()) -/// } -/// ``` -#[proc_macro_attribute] -pub fn action_gen(attr: TokenStream, item: TokenStream) -> TokenStream { - let input_fn = parse_macro_input!(item as ItemFn); - let is_local = if attr.is_empty() { - false - } else { - let attr_str = attr.to_string(); - attr_str == "local" || attr_str.contains("local") - }; - - generate_action_struct(input_fn, is_local).into() -} - -fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::TokenStream { - let fn_vis = &input_fn.vis; - let fn_sig = &input_fn.sig; - let fn_name = &fn_sig.ident; - let fn_block = &input_fn.block; - - validate_function_signature(fn_sig); - - let (context_param_name, arg_param_name, arg_type, return_type) = - extract_parameters_and_types(fn_sig); - - let struct_name = quote::format_ident!("{}", convert_to_pascal_case(&fn_name.to_string())); - - let action_name_ident = &fn_name; - - let register_this_action = quote::format_ident!("register_{}", action_name_ident); - let proc_this_action = quote::format_ident!("proc_{}", action_name_ident); - - quote! { - #[derive(Debug, Clone, Default)] - #fn_vis struct #struct_name; - - impl action_system::action::Action<#arg_type, #return_type> for #struct_name { - fn action_name() -> &'static str { - Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()) - } - - fn is_remote_action() -> bool { - !#_is_local - } - - async fn process(#context_param_name: action_system::action::ActionContext, #arg_param_name: #arg_type) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - #fn_block - } - } - - #fn_vis fn #register_this_action(pool: &mut action_system::action_pool::ActionPool) { - pool.register::<#struct_name, #arg_type, #return_type>(); - } - - #fn_vis async fn #proc_this_action( - pool: &action_system::action_pool::ActionPool, - mut ctx: action_system::action::ActionContext, - #arg_param_name: #arg_type - ) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - ctx.set_is_remote_action(!#_is_local); - let args_json = serde_json::to_string(&#arg_param_name) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - })?; - let result_json = pool.process_json( - Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), // LEAK??? OH SHIT - ctx, - args_json, - ).await?; - serde_json::from_str(&result_json) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - }) - } - - #[allow(dead_code)] - #[deprecated = "This function is used by #[action_gen] as a template."] - #[doc = "Template function for #[[action_gen]] - do not call directly."] - #[doc = "Use the generated struct instead."] - #[doc = ""] - #[doc = "Register the action to the pool."] - #[doc = "```ignore"] - #[doc = "register_your_func(&mut pool);"] - #[doc = "```"] - #[doc = ""] - #[doc = "Process the action at the pool."] - #[doc = "```ignore"] - #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"] - #[doc = "```"] - #fn_vis #fn_sig #fn_block - } -} - -fn validate_function_signature(fn_sig: &syn::Signature) { - if fn_sig.asyncness.is_none() { - panic!("Expected async function for Action, but found synchronous function"); - } - - if fn_sig.inputs.len() != 2 { - panic!( - "Expected exactly 2 arguments for Action function: ctx: ActionContext and arg: T, but found {} arguments", - fn_sig.inputs.len() - ); - } - - let return_type = match &fn_sig.output { - syn::ReturnType::Type(_, ty) => ty, - _ => panic!( - "Expected Action function to return Result, but found no return type" - ), - }; - - if let syn::Type::Path(type_path) = return_type.as_ref() { - if let Some(segment) = type_path.path.segments.last() - && segment.ident != "Result" - { - panic!( - "Expected Action function to return Result, but found different return type" - ); - } - } else { - panic!( - "Expected Action function to return Result, but found no return type" - ); - } -} - -fn convert_to_pascal_case(s: &str) -> String { - s.split('_') - .map(|word| { - let mut chars = word.chars(); - match chars.next() { - None => String::new(), - Some(first) => first.to_uppercase().collect::() + chars.as_str(), - } - }) - .collect() -} - -fn extract_parameters_and_types( - fn_sig: &syn::Signature, -) -> ( - proc_macro2::TokenStream, - proc_macro2::TokenStream, - proc_macro2::TokenStream, - proc_macro2::TokenStream, -) { - let mut inputs = fn_sig.inputs.iter(); - - let context_param = match inputs.next() { - Some(syn::FnArg::Typed(pat_type)) => { - let pat = &pat_type.pat; - quote::quote!(#pat) - } - _ => { - panic!("Expected the first argument to be a typed parameter, but found something else") - } - }; - - let arg_param = match inputs.next() { - Some(syn::FnArg::Typed(pat_type)) => { - let pat = &pat_type.pat; - let ty = &pat_type.ty; - (quote::quote!(#pat), quote::quote!(#ty)) - } - _ => { - panic!("Expected the second argument to be a typed parameter, but found something else") - } - }; - - let (arg_param_name, arg_type) = arg_param; - - let return_type = match &fn_sig.output { - syn::ReturnType::Type(_, ty) => { - if let syn::Type::Path(type_path) = ty.as_ref() { - if let syn::PathArguments::AngleBracketed(args) = - &type_path.path.segments.last().unwrap().arguments - { - if let Some(syn::GenericArgument::Type(ty)) = args.args.first() { - quote::quote!(#ty) - } else { - panic!("Expected to extract the success type of Result, but failed"); - } - } else { - panic!("Expected Result type to have generic parameters, but found none"); - } - } else { - panic!("Expected return type to be Result, but found different type"); - } - } - _ => panic!("Expected function to have return type, but found none"), - }; - - (context_param, arg_param_name, arg_type, return_type) -} diff --git a/systems/action/src/action.rs b/systems/action/src/action.rs deleted file mode 100644 index 62425ff..0000000 --- a/systems/action/src/action.rs +++ /dev/null @@ -1,244 +0,0 @@ -use serde::{Serialize, de::DeserializeOwned}; -use std::any::{Any, TypeId}; -use std::collections::HashMap; -use std::sync::Arc; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{net::TcpStream, sync::Mutex}; - -/// # Trait - Action -/// -/// A trait used to describe the interaction pattern between client and server -/// -/// ## Generics -/// -/// Args: Represents the parameter type required for this action -/// -/// Return: Represents the return type of this action -/// -/// The above generics must implement serde's Serialize and DeserializeOwned traits, -/// and must be sendable between threads -/// -/// ## Implementation -/// -/// ```ignore -/// pub trait Action -/// where -/// Args: Serialize + DeserializeOwned + Send, -/// Return: Serialize + DeserializeOwned + Send, -/// { -/// /// Name, used to inform the server which action to execute -/// fn action_name() -> &'static str; -/// -/// /// Whether it's a local Action, used to inform the system if it only runs locally -/// fn is_remote_action() -> bool; -/// -/// /// Action processing logic -/// fn process( -/// context: ActionContext, -/// args: Args, -/// ) -> impl std::future::Future> + Send; -/// } -/// ``` -pub trait Action -where - Args: Serialize + DeserializeOwned + Send, - Return: Serialize + DeserializeOwned + Send, -{ - fn action_name() -> &'static str; - - fn is_remote_action() -> bool; - - fn process( - context: ActionContext, - args: Args, - ) -> impl std::future::Future> + Send; -} - -/// # Struct - ActionContext -/// -/// Used to inform the Action about the current execution environment -/// -/// ## Creation -/// -/// Create ActionContext using the following methods: -/// -/// ```ignore -/// -/// // The instance here is the connection instance passed from external sources for communicating with the server -/// // For specific usage, please refer to the `/crates/utils/tcp_connection` section -/// -/// fn init_local_action_ctx(instance: ConnectionInstance) { -/// // Create context and specify execution on local -/// let mut ctx = ActionContext::local(); -/// } -/// -/// fn init_remote_action_ctx(instance: ConnectionInstance) { -/// // Create context and specify execution on remote -/// let mut ctx = ActionContext::remote(); -/// } -#[derive(Default)] -pub struct ActionContext { - /// Whether the action is executed locally or remotely - proc_on_local: bool, - - /// Whether the action being executed in the current context is a remote action - is_remote_action: bool, - - /// The name of the action being executed - action_name: String, - - /// The JSON-serialized arguments for the action - action_args_json: String, - - /// The connection instance in the current context, - instance: Option>>, - - /// Generic data storage for arbitrary types - data: HashMap>, -} - -impl ActionContext { - /// Generate local context - pub fn local() -> Self { - ActionContext { - proc_on_local: true, - ..Default::default() - } - } - - /// Generate remote context - pub fn remote() -> Self { - ActionContext { - proc_on_local: false, - ..Default::default() - } - } - - /// Build connection instance from TcpStream - pub fn build_instance(mut self, stream: TcpStream) -> Self { - self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream)))); - self - } - - /// Insert connection instance into context - pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self { - self.instance = Some(Arc::new(Mutex::new(instance))); - self - } - - /// Pop connection instance from context - pub fn pop_instance(&mut self) -> Option>> { - self.instance.take() - } -} - -impl ActionContext { - /// Whether the action is executed locally - pub fn is_proc_on_local(&self) -> bool { - self.proc_on_local - } - - /// Whether the action is executed remotely - pub fn is_proc_on_remote(&self) -> bool { - !self.proc_on_local - } - - /// Whether the action being executed in the current context is a remote action - pub fn is_remote_action(&self) -> bool { - self.is_remote_action - } - - /// Set whether the action being executed in the current context is a remote action - pub fn set_is_remote_action(&mut self, is_remote_action: bool) { - self.is_remote_action = is_remote_action; - } - - /// Get the connection instance in the current context - pub fn instance(&self) -> &Option>> { - &self.instance - } - - /// Get a mutable reference to the connection instance in the current context - pub fn instance_mut(&mut self) -> &mut Option>> { - &mut self.instance - } - - /// Get the action name from the context - pub fn action_name(&self) -> &str { - &self.action_name - } - - /// Get the action arguments from the context - pub fn action_args_json(&self) -> &String { - &self.action_args_json - } - - /// Set the action name in the context - pub fn set_action_name(mut self, action_name: String) -> Self { - self.action_name = action_name; - self - } - - /// Set the action arguments in the context - pub fn set_action_args(mut self, action_args: String) -> Self { - self.action_args_json = action_args; - self - } - - /// Insert arbitrary data in the context - pub fn with_data(mut self, value: T) -> Self { - self.data.insert(TypeId::of::(), Arc::new(value)); - self - } - - /// Insert arbitrary data as Arc in the context - pub fn with_arc_data(mut self, value: Arc) -> Self { - self.data.insert(TypeId::of::(), value); - self - } - - /// Insert arbitrary data in the context - pub fn insert_data(&mut self, value: T) { - self.data.insert(TypeId::of::(), Arc::new(value)); - } - - /// Insert arbitrary data as Arc in the context - pub fn insert_arc_data(&mut self, value: Arc) { - self.data.insert(TypeId::of::(), value); - } - - /// Get arbitrary data from the context - pub fn get(&self) -> Option<&T> { - self.data - .get(&TypeId::of::()) - .and_then(|arc| arc.downcast_ref::()) - } - - /// Get arbitrary data as Arc from the context - pub fn get_arc(&self) -> Option> { - self.data - .get(&TypeId::of::()) - .and_then(|arc| Arc::clone(arc).downcast::().ok()) - } - - /// Remove and return arbitrary data from the context - pub fn remove(&mut self) -> Option> { - self.data - .remove(&TypeId::of::()) - .and_then(|arc| arc.downcast::().ok()) - } - - /// Check if the context contains data of a specific type - pub fn contains(&self) -> bool { - self.data.contains_key(&TypeId::of::()) - } - - /// Take ownership of the context and extract data of a specific type - pub fn take(mut self) -> (Self, Option>) { - let value = self - .data - .remove(&TypeId::of::()) - .and_then(|arc| arc.downcast::().ok()); - (self, value) - } -} diff --git a/systems/action/src/action_pool.rs b/systems/action/src/action_pool.rs deleted file mode 100644 index 019fa6d..0000000 --- a/systems/action/src/action_pool.rs +++ /dev/null @@ -1,247 +0,0 @@ -use std::pin::Pin; - -use serde::{Serialize, de::DeserializeOwned}; -use serde_json; -use tcp_connection::error::TcpTargetError; - -use crate::action::{Action, ActionContext}; - -type ProcBeginCallback = for<'a> fn( - &'a mut ActionContext, - args: &'a (dyn std::any::Any + Send + Sync), -) -> ProcBeginFuture<'a>; -type ProcEndCallback = fn() -> ProcEndFuture; - -type ProcBeginFuture<'a> = Pin> + Send + 'a>>; -type ProcEndFuture = Pin> + Send>>; - -/// # Struct - ActionPool -/// -/// This struct is used to register and record all accessible and executable actions -/// -/// It also registers `on_proc_begin` and `on_proc_end` callback functions -/// used for action initialization -/// -/// ## Creating and registering actions -/// ```ignore -/// fn init_action_pool() { -/// let mut pool = Action::new(); -/// -/// // Register action -/// pool.register(); -/// -/// // If the action is implemented with `#[action_gen]`, you can also do -/// register_your_action(&mut pool); -/// } -/// ``` -pub struct ActionPool { - /// HashMap storing action name to action implementation mapping - actions: std::collections::HashMap<&'static str, Box>, - - /// Callback to execute when process begins - on_proc_begin: Option, - - /// Callback to execute when process ends - on_proc_end: Option, -} - -impl Default for ActionPool { - fn default() -> Self { - Self::new() - } -} - -impl ActionPool { - /// Creates a new empty ActionPool - pub fn new() -> Self { - Self { - actions: std::collections::HashMap::new(), - on_proc_begin: None, - on_proc_end: None, - } - } - - /// Sets a callback to be executed when process begins - pub fn set_on_proc_begin(&mut self, callback: ProcBeginCallback) { - self.on_proc_begin = Some(callback); - } - - /// Sets a callback to be executed when process ends - pub fn set_on_proc_end(&mut self, callback: ProcEndCallback) { - self.on_proc_end = Some(callback); - } - - /// Registers an action type with the pool - /// - /// Usage: - /// ```ignore - /// action_pool.register::(); - /// ``` - pub fn register(&mut self) - where - A: Action + Send + Sync + 'static, - Args: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - Return: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - { - let action_name = A::action_name(); - self.actions.insert( - action_name, - Box::new(ActionWrapper::(std::marker::PhantomData)), - ); - } - - /// Processes an action by name with given context and arguments - /// - /// Usage: - /// ```ignore - /// let result = action_pool.process::("my_action", context, args).await?; - /// ``` - /// Processes an action by name with JSON-serialized arguments - /// - /// Usage: - /// ```ignore - /// let result_json = action_pool.process_json("my_action", context, args_json).await?; - /// let result: MyReturn = serde_json::from_str(&result_json)?; - /// ``` - pub async fn process_json<'a>( - &'a self, - action_name: &'a str, - context: ActionContext, - args_json: String, - ) -> Result { - if let Some(action) = self.actions.get(action_name) { - // Set action name and args in context for callbacks - let context = context.set_action_name(action_name.to_string()); - let mut context = context.set_action_args(args_json.clone()); - - self.exec_on_proc_begin(&mut context, &args_json).await?; - let result = action.process_json_erased(context, args_json).await?; - self.exec_on_proc_end().await?; - Ok(result) - } else { - Err(TcpTargetError::Unsupported("InvalidAction".to_string())) - } - } - - /// Processes an action by name with given context and arguments - /// - /// Usage: - /// ```ignore - /// let result = action_pool.process::("my_action", context, args).await?; - /// ``` - pub async fn process<'a, Args, Return>( - &'a self, - action_name: &'a str, - mut context: ActionContext, - args: Args, - ) -> Result - where - Args: serde::de::DeserializeOwned + Send + Sync + 'static, - Return: serde::Serialize + Send + 'static, - { - if let Some(action) = self.actions.get(action_name) { - self.exec_on_proc_begin(&mut context, &args).await?; - let result = action.process_erased(context, Box::new(args)).await?; - let result = *result - .downcast::() - .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; - self.exec_on_proc_end().await?; - Ok(result) - } else { - Err(TcpTargetError::Unsupported("InvalidAction".to_string())) - } - } - - /// Executes the process begin callback if set - async fn exec_on_proc_begin( - &self, - context: &mut ActionContext, - args: &(dyn std::any::Any + Send + Sync), - ) -> Result<(), TcpTargetError> { - if let Some(callback) = &self.on_proc_begin { - callback(context, args).await - } else { - Ok(()) - } - } - - /// Executes the process end callback if set - async fn exec_on_proc_end(&self) -> Result<(), TcpTargetError> { - if let Some(callback) = &self.on_proc_end { - callback().await - } else { - Ok(()) - } - } -} - -/// Trait for type-erased actions that can be stored in ActionPool -type ProcessErasedFuture = std::pin::Pin< - Box< - dyn std::future::Future, TcpTargetError>> - + Send, - >, ->; -type ProcessJsonErasedFuture = - std::pin::Pin> + Send>>; - -trait ActionErased: Send + Sync { - /// Processes the action with type-erased arguments and returns type-erased result - fn process_erased( - &self, - context: ActionContext, - args: Box, - ) -> ProcessErasedFuture; - - /// Processes the action with JSON-serialized arguments and returns JSON-serialized result - fn process_json_erased( - &self, - context: ActionContext, - args_json: String, - ) -> ProcessJsonErasedFuture; -} - -/// Wrapper struct that implements ActionErased for concrete Action types -struct ActionWrapper(std::marker::PhantomData<(A, Args, Return)>); - -impl ActionErased for ActionWrapper -where - A: Action + Send + Sync, - Args: Serialize + DeserializeOwned + Send + Sync + 'static, - Return: Serialize + DeserializeOwned + Send + Sync + 'static, -{ - fn process_erased( - &self, - context: ActionContext, - args: Box, - ) -> std::pin::Pin< - Box< - dyn std::future::Future, TcpTargetError>> - + Send, - >, - > { - Box::pin(async move { - let args = *args - .downcast::() - .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; - let result = A::process(context, args).await?; - Ok(Box::new(result) as Box) - }) - } - - fn process_json_erased( - &self, - context: ActionContext, - args_json: String, - ) -> std::pin::Pin> + Send>> - { - Box::pin(async move { - let args: Args = serde_json::from_str(&args_json) - .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; - let result = A::process(context, args).await?; - let result_json = serde_json::to_string(&result) - .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?; - Ok(result_json) - }) - } -} diff --git a/systems/action/src/lib.rs b/systems/action/src/lib.rs deleted file mode 100644 index 12ae999..0000000 --- a/systems/action/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod macros { - pub use action_system_macros::*; -} - -pub mod action; -pub mod action_pool; diff --git a/systems/storage/Cargo.toml b/systems/storage/Cargo.toml deleted file mode 100644 index 5afb780..0000000 --- a/systems/storage/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "storage_system" -edition = "2024" -version.workspace = true - -[dependencies] -thiserror = "1.0.69" -tokio = { version = "1.48", features = ["full"] } - -blake3 = "1.5.0" -hex = "0.4.3" -memmap2 = "0.9.4" -log = "0.4.25" diff --git a/systems/storage/src/error.rs b/systems/storage/src/error.rs deleted file mode 100644 index 4b3ad7e..0000000 --- a/systems/storage/src/error.rs +++ /dev/null @@ -1,8 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub enum StorageIOError { - #[error("IO error: {0}")] - IOErr(#[from] std::io::Error), - - #[error("Hash too short")] - HashTooShort, -} diff --git a/systems/storage/src/lib.rs b/systems/storage/src/lib.rs deleted file mode 100644 index 4f89971..0000000 --- a/systems/storage/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod error; -pub mod store; diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs deleted file mode 100644 index 6492449..0000000 --- a/systems/storage/src/store.rs +++ /dev/null @@ -1,493 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::time::Instant; - -use blake3; -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; - -use crate::error::StorageIOError; - -pub mod cdc; -pub mod fixed; -pub mod line; - -#[derive(Default, Debug)] -pub struct StorageConfig { - /// Chunking policy for splitting files - pub chunking_policy: ChunkingPolicy, -} - -impl StorageConfig { - /// Create a new StorageConfig with CDC chunking policy - pub fn cdc(avg_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::Cdc(avg_size), - } - } - - /// Create a new StorageConfig with fixed-size chunking policy - pub fn fixed_size(chunk_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::FixedSize(chunk_size), - } - } - - /// Create a new StorageConfig with line-based chunking policy - pub fn line() -> Self { - Self { - chunking_policy: ChunkingPolicy::Line, - } - } -} - -#[derive(Debug)] -pub enum ChunkingPolicy { - /// Content-Defined Chunking using Rabin fingerprinting - /// The `u32` value represents the desired average chunk size in bytes - Cdc(u32), - - /// Fixed-size chunking - /// The `u32` value represents the exact size of each chunk in bytes - FixedSize(u32), - - /// Line-based chunking - /// Each line becomes a separate chunk - Line, -} - -impl Default for ChunkingPolicy { - fn default() -> Self { - ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC - } -} - -/// Represents a chunk of data with its hash and content -#[derive(Debug, Clone)] -pub struct Chunk { - /// Blake3 hash of the chunk content - pub hash: [u8; 32], - /// Raw chunk data - pub data: Vec, -} - -/// Represents an index entry pointing to a chunk -#[derive(Debug, Clone)] -pub struct IndexEntry { - /// Blake3 hash of the chunk - pub hash: [u8; 32], - /// Size of the chunk in bytes - pub size: u32, -} - -/// Result of chunking a file -#[derive(Debug)] -pub struct ChunkingResult { - /// List of chunks extracted from the file - pub chunks: Vec, - /// Total size of the original file - pub total_size: u64, -} - -/// Split a file into chunks and store them in the repository, then output the index file to the specified directory -pub async fn write_file( - file_to_write: impl Into, - storage_dir: impl Into, - output_index_file: impl Into, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?; - - // Store chunks and create index - let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy -pub fn split_into_chunks( - data: &[u8], - policy: &ChunkingPolicy, -) -> Result { - match policy { - ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size), - ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size), - ChunkingPolicy::Line => split_by_lines(data), - } -} - -/// Split data using Content-Defined Chunking -fn split_cdc(data: &[u8], avg_size: u32) -> Result { - use crate::store::cdc::split_cdc_impl; - split_cdc_impl(data, avg_size) -} - -/// Split data using fixed-size chunking -fn split_fixed(data: &[u8], chunk_size: u32) -> Result { - use crate::store::fixed::split_fixed_impl; - split_fixed_impl(data, chunk_size) -} - -/// Split data by lines -fn split_by_lines(data: &[u8]) -> Result { - use crate::store::line::split_by_lines_impl; - split_by_lines_impl(data) -} - -/// Store chunks in the storage directory and return index entries -async fn store_chunks( - chunks: &[Chunk], - storage_dir: &Path, -) -> Result, StorageIOError> { - let mut index_entries = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - // Add to index - index_entries.push(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Build a file from the repository to the specified directory using an index file -pub async fn build_file( - index_to_build: impl Into, - storage_dir: impl Into, - output_file: impl Into, -) -> Result<(), StorageIOError> { - let (index_to_build, storage_dir, output_file) = - precheck(index_to_build, storage_dir, output_file).await?; - - info!( - "Starting file build from index: {}", - index_to_build.display() - ); - let start_time = Instant::now(); - - // Read index file - let index_entries = read_index_file(&index_to_build).await?; - - // Reconstruct file from chunks - let reconstructed_data = reconstruct_from_chunks(&index_entries, &storage_dir).await?; - - // Write output file - fs::write(&output_file, &reconstructed_data).await?; - - let duration = start_time.elapsed(); - info!( - "File build completed in {:?}: {} -> {}", - duration, - index_to_build.display(), - output_file.display() - ); - - Ok(()) -} - -/// Read index file and parse entries -async fn read_index_file(index_path: &Path) -> Result, StorageIOError> { - // Open file and memory map it - let file = std::fs::File::open(index_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Each entry is 36 bytes (32 bytes hash + 4 bytes size) - if mmap.len() % 36 != 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid index file format", - ) - .into()); - } - - let num_entries = mmap.len() / 36; - let mut entries = Vec::with_capacity(num_entries); - - for i in 0..num_entries { - let start = i * 36; - let hash_start = start; - let size_start = start + 32; - - let mut hash = [0u8; 32]; - hash.copy_from_slice(&mmap[hash_start..hash_start + 32]); - - let size = u32::from_le_bytes([ - mmap[size_start], - mmap[size_start + 1], - mmap[size_start + 2], - mmap[size_start + 3], - ]); - - entries.push(IndexEntry { hash, size }); - } - - Ok(entries) -} - -/// Reconstruct file data from chunks using index entries -async fn reconstruct_from_chunks( - entries: &[IndexEntry], - storage_dir: &Path, -) -> Result, StorageIOError> { - let mut reconstructed_data = Vec::new(); - - for entry in entries { - // Get chunk path from hash - let hash_str = hex::encode(entry.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - let chunk_path = chunk_dir.with_extension("chunk"); - - trace!("R: {}", hash_str); - - // Memory map the chunk file - let file = std::fs::File::open(&chunk_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Verify chunk size matches index - if mmap.len() != entry.size as usize { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk size mismatch: expected {}, got {}", - entry.size, - mmap.len() - ), - ) - .into()); - } - - // Verify chunk hash - let actual_hash = blake3_digest(&mmap); - let expected_hash = entry.hash; - if actual_hash != expected_hash { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk hash mismatch: expected {}, got {}", - hex::encode(expected_hash), - hex::encode(actual_hash) - ), - ) - .into()); - } - - // Append to reconstructed data - reconstructed_data.extend_from_slice(&mmap); - } - - Ok(reconstructed_data) -} - -/// Calculate Blake3 hash of data -fn blake3_digest(data: &[u8]) -> [u8; 32] { - let hash = blake3::hash(data); - *hash.as_bytes() -} - -/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string -pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result { - if hash_str.len() < 4 { - return Err(StorageIOError::HashTooShort); - } - let dir = storage_dir - .join(&hash_str[0..2]) - .join(&hash_str[2..4]) - .join(&hash_str); - Ok(dir) -} - -/// Calculate Blake3 hash of data and create a Chunk -pub fn create_chunk(data: Vec) -> Chunk { - let hash = blake3_digest(&data); - Chunk { hash, data } -} - -/// Read a file and split it into chunks using the specified policy -pub async fn chunk_file( - file_path: impl Into, - policy: &ChunkingPolicy, -) -> Result { - let file_path = file_path.into(); - info!("Starting chunking: {}", file_path.display()); - let start_time = Instant::now(); - - // Memory map the file - let file = std::fs::File::open(&file_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - let result = split_into_chunks(data, policy); - - let duration = start_time.elapsed(); - info!( - "Chunking completed in {:?}: {}", - duration, - file_path.display() - ); - - result -} - -/// Get chunk statistics -pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats { - let total_chunks = chunks.len(); - let total_size: usize = chunks.iter().map(|c| c.data.len()).sum(); - let avg_size = if total_chunks > 0 { - total_size / total_chunks - } else { - 0 - }; - let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0); - let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0); - - ChunkStats { - total_chunks, - total_size: total_size as u64, - avg_size, - min_size, - max_size, - } -} - -/// Statistics about chunks -#[derive(Debug, Clone)] -pub struct ChunkStats { - pub total_chunks: usize, - pub total_size: u64, - pub avg_size: usize, - pub min_size: usize, - pub max_size: usize, -} - -/// Pre-check whether the input file path, directory path, and output path are valid -pub async fn precheck( - input_file: impl Into, - dir: impl Into, - output_file: impl Into, -) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { - let (input, dir, output) = (input_file.into(), dir.into(), output_file.into()); - - // Perform all checks in parallel - let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!( - fs::metadata(&input), - fs::metadata(&dir), - fs::metadata(&output) - ); - - // Check if input file exists - let input_metadata = match input_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Input file not found: {}", input.display()), - )); - } - }; - - // Check if directory exists - let dir_metadata = match dir_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Directory not found: {}", dir.display()), - )); - } - }; - - // Check if output file already exists - if output_metadata_result.is_ok() { - return Err(std::io::Error::new( - std::io::ErrorKind::AlreadyExists, - format!("Output file already exist: {}", output.display()), - )); - } - - // Check if input is a file - if !input_metadata.is_file() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Input path is not a file: {}", input.display()), - )); - } - - // Check if dir is a directory - if !dir_metadata.is_dir() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotADirectory, - format!("Input path is not a directory: {}", dir.display()), - )); - } - - Ok((input, dir, output)) -} diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs deleted file mode 100644 index cc16202..0000000 --- a/systems/storage/src/store/cdc.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Rabin fingerprint parameters for CDC -const WINDOW_SIZE: usize = 48; -const MIN_CHUNK_RATIO: f64 = 0.75; -const MAX_CHUNK_RATIO: f64 = 2.0; -const BASE_TARGET_MASK: u64 = 0xFFFF; - -/// Rabin fingerprint polynomial (irreducible polynomial) -const POLYNOMIAL: u64 = 0x3DA3358B4DC173; - -/// Precomputed table for Rabin fingerprint sliding window -static FINGERPRINT_TABLE: [u64; 256] = { - let mut table = [0u64; 256]; - let mut i = 0; - while i < 256 { - let mut fingerprint = i as u64; - let mut j = 0; - while j < 8 { - if fingerprint & 1 != 0 { - fingerprint = (fingerprint >> 1) ^ POLYNOMIAL; - } else { - fingerprint >>= 1; - } - j += 1; - } - table[i] = fingerprint; - i += 1; - } - table -}; - -/// Calculate Rabin fingerprint for a byte -#[inline] -fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 { - let old_byte_index = old_byte as usize; - - let fingerprint = - (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index]; - fingerprint -} - -/// Split data using Content-Defined Chunking with Rabin fingerprinting -pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result { - let avg_size = avg_size as usize; - - if avg_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Average chunk size must be greater than 0", - ) - .into()); - } - - // Calculate min and max chunk sizes based on average size - let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize; - let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize; - - // Ensure reasonable bounds - let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes - let max_chunk_size = max_chunk_size.max(avg_size * 2); - - let target_mask = { - let scale_factor = avg_size as f64 / (64.0 * 1024.0); - let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64; - mask_value.max(0xC000) - }; - - let mut chunks = Vec::new(); - let mut start = 0; - let data_len = data.len(); - - while start < data_len { - let chunk_start = start; - - let search_end = (start + max_chunk_size).min(data_len); - let mut boundary_found = false; - let mut boundary_pos = search_end; - - let mut window = Vec::with_capacity(WINDOW_SIZE); - let mut fingerprint: u64 = 0; - - for i in start..search_end { - let byte = data[i]; - - // Update sliding window - if window.len() >= WINDOW_SIZE { - let old_byte = window.remove(0); - fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte); - } else { - fingerprint = (fingerprint << 8) ^ (byte as u64); - } - window.push(byte); - - if i - chunk_start >= min_chunk_size { - if (fingerprint & target_mask) == 0 { - boundary_found = true; - boundary_pos = i + 1; - break; - } - } - } - - let chunk_end = if boundary_found { - boundary_pos - } else { - search_end - }; - - let chunk_data = data[chunk_start..chunk_end].to_vec(); - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = chunk_end; - } - - if chunks.len() > 1 { - let mut merged_chunks = Vec::new(); - let mut i = 0; - - while i < chunks.len() { - let current_chunk = &chunks[i]; - - if i < chunks.len() - 1 { - let next_chunk = &chunks[i + 1]; - if current_chunk.data.len() < min_chunk_size - && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size - { - // Merge chunks - let mut merged_data = current_chunk.data.clone(); - merged_data.extend_from_slice(&next_chunk.data); - let merged_chunk = crate::store::create_chunk(merged_data); - merged_chunks.push(merged_chunk); - i += 2; - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } - - chunks = merged_chunks; - } - - Ok(ChunkingResult { - chunks, - total_size: data_len as u64, - }) -} - -/// Split file using CDC with specified average chunk size -pub async fn write_file_cdc>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - avg_size: u32, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::cdc(avg_size); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Test function for CDC chunking -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_cdc_basic() { - let pattern: Vec = (0..1024).map(|i| (i % 256) as u8).collect(); - let mut data = Vec::new(); - for _ in 0..10 { - data.extend_from_slice(&pattern); - } - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have at least one chunk - assert!(!result.chunks.is_empty()); - - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 10KB data with 4KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds (more relaxed for CDC) - let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - let chunk_size = chunk.data.len(); - // CDC can produce chunks smaller than min_size due to content patterns - // But they should not exceed max_size - assert!( - chunk_size <= max_size, - "Chunk size {} exceeds maximum {}", - chunk_size, - max_size - ); - } - } - - #[test] - fn test_cdc_small_file() { - // Small file should result in single chunk - let data = vec![1, 2, 3, 4, 5]; - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_cdc_large_avg_size() { - // Test with larger average size (256KB) - let mut data = Vec::new(); - for i in 0..(256 * 1024 * 3) { - // 768KB of data - data.push((i % 256) as u8); - } - - let result = split_cdc_impl(&data, 256 * 1024).unwrap(); - - // With 768KB data and 256KB average, should have reasonable number of chunks - // CDC is content-defined, so exact count varies - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 768KB data with 256KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds - let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - // CDC can produce chunks smaller than min_size - // But they should not exceed max_size - assert!( - chunk.data.len() <= max_size, - "Chunk size {} exceeds maximum {}", - chunk.data.len(), - max_size - ); - } - } - - #[test] - fn test_cdc_zero_avg_size() { - let data = vec![1, 2, 3]; - - let result = split_cdc_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_rabin_fingerprint() { - // Test that rabin_fingerprint_byte function is deterministic - let test_bytes = [0x01, 0x02, 0x03, 0x04]; - - // Calculate fingerprint with specific sequence - let mut fingerprint1 = 0; - for &byte in &test_bytes { - fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte); - } - - // Calculate again with same inputs - let mut fingerprint2 = 0; - for &byte in &test_bytes { - fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte); - } - - // Same input should produce same output - assert_eq!(fingerprint1, fingerprint2); - - // Test with different old_byte produces different result - let mut fingerprint3 = 0; - for &byte in &test_bytes { - fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte); - } - - // Different old_byte should produce different fingerprint - assert_ne!(fingerprint1, fingerprint3); - } -} diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs deleted file mode 100644 index 044cc1c..0000000 --- a/systems/storage/src/store/fixed.rs +++ /dev/null @@ -1,417 +0,0 @@ -use std::path::PathBuf; -use std::time::Instant; - -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; -use tokio::task; - -use crate::{ - error::StorageIOError, - store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, -}; - -/// Split data using fixed-size chunking -pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - while start < total_size { - let end = (start + chunk_size).min(total_size); - let chunk_data = data[start..end].to_vec(); - - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = end; - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file using fixed-size chunking -pub async fn write_file_fixed>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - fixed_size: u32, -) -> Result<(), StorageIOError> { - let config = StorageConfig::fixed_size(fixed_size); - write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Split file using fixed-size chunking with parallel processing -async fn write_file_parallel( - file_to_write: impl Into, - storage_dir: impl Into, - output_index_file: impl Into, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; - - // Store chunks in parallel and create index - let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy with parallel processing -async fn split_into_chunks_parallel( - data: &[u8], - policy: &crate::store::ChunkingPolicy, -) -> Result { - match policy { - crate::store::ChunkingPolicy::FixedSize(chunk_size) => { - split_fixed_parallel(data, *chunk_size).await - } - _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking - } -} - -/// Split data using fixed-size chunking with parallel processing -async fn split_fixed_parallel( - data: &[u8], - chunk_size: u32, -) -> Result { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let total_size = data.len(); - let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division - - // Create a vector to hold chunk boundaries - let mut chunk_boundaries = Vec::with_capacity(num_chunks); - let mut start = 0; - - while start < total_size { - let end = (start + chunk_size).min(total_size); - chunk_boundaries.push((start, end)); - start = end; - } - - // Process chunks in parallel using Tokio tasks - let chunks: Vec = if chunk_boundaries.len() > 1 { - // Use parallel processing for multiple chunks - let mut tasks = Vec::with_capacity(chunk_boundaries.len()); - - for (start, end) in chunk_boundaries { - let chunk_data = data[start..end].to_vec(); - - // Spawn a blocking task for each chunk - tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); - } - - // Wait for all tasks to complete - let mut chunks = Vec::with_capacity(tasks.len()); - for task in tasks { - match task.await { - Ok(chunk) => chunks.push(chunk), - Err(e) => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Task join error: {}", e), - ) - .into()); - } - } - } - - chunks - } else { - // Single chunk, no need for parallel processing - chunk_boundaries - .into_iter() - .map(|(start, end)| { - let chunk_data = data[start..end].to_vec(); - create_chunk(chunk_data) - }) - .collect() - }; - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Store chunks in the storage directory in parallel and return index entries -async fn store_chunks_parallel( - chunks: &[crate::store::Chunk], - storage_dir: &std::path::Path, -) -> Result, StorageIOError> { - let mut tasks = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - let chunk = chunk.clone(); - let storage_dir = storage_dir.to_path_buf(); - let writed_counter = writed_counter.clone(); - - // Spawn async task for each chunk storage operation - tasks.push(task::spawn(async move { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir, hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - Ok::(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }) - })); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - // Wait for all tasks to complete - let mut index_entries = Vec::with_capacity(chunks.len()); - for task in tasks { - let entry = task.await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) - })??; - index_entries.push(entry); - } - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &std::path::Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Utility function to calculate optimal fixed chunk size based on file size -pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { - if target_chunks == 0 || file_size == 0 { - return 64 * 1024; // Default 64KB - } - - let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; - - // Round to nearest power of 2 for better performance - let rounded_size = if chunk_size <= 1024 { - // Small chunks: use exact size - chunk_size - } else { - // Larger chunks: round to nearest power of 2 - let mut size = chunk_size; - size -= 1; - size |= size >> 1; - size |= size >> 2; - size |= size >> 4; - size |= size >> 8; - size |= size >> 16; - size += 1; - size - }; - - // Ensure minimum and maximum bounds - rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max -} - -/// Split file with automatic chunk size calculation -pub async fn write_file_fixed_auto, J: Into, K: Into>( - file_to_write: I, - storage_dir: J, - output_index_file: K, - target_chunks: usize, -) -> Result<(), StorageIOError> { - let file_path = file_to_write.into(); - let storage_dir = storage_dir.into(); - let output_index_file = output_index_file.into(); - - let file_size = fs::metadata(&file_path).await?.len(); - - let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); - write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_fixed_chunking_basic() { - // Create 10KB of test data - let data: Vec = (0..10240).map(|i| (i % 256) as u8).collect(); - - // Split into 1KB chunks - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have 10 chunks - assert_eq!(result.chunks.len(), 10); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk sizes (last chunk may be smaller) - for (i, chunk) in result.chunks.iter().enumerate() { - if i < 9 { - assert_eq!(chunk.data.len(), 1024); - } else { - assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly - } - } - } - - #[test] - fn test_fixed_chunking_uneven() { - // Create 5.5KB of test data - let data: Vec = (0..5632).map(|i| (i % 256) as u8).collect(); - - // Split into 2KB chunks - let result = split_fixed_impl(&data, 2048).unwrap(); - - // Should have 3 chunks (2048 + 2048 + 1536) - assert_eq!(result.chunks.len(), 3); - - // Verify chunk sizes - assert_eq!(result.chunks[0].data.len(), 2048); - assert_eq!(result.chunks[1].data.len(), 2048); - assert_eq!(result.chunks[2].data.len(), 1536); - - // Verify data integrity - let mut reconstructed = Vec::new(); - for chunk in &result.chunks { - reconstructed.extend_from_slice(&chunk.data); - } - assert_eq!(reconstructed, data); - } - - #[test] - fn test_fixed_chunking_small_file() { - // Small file smaller than chunk size - let data = vec![1, 2, 3, 4, 5]; - - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_fixed_chunking_zero_size() { - let data = vec![1, 2, 3]; - - let result = split_fixed_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_calculate_optimal_chunk_size() { - // Test basic calculation - assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB - - // Test rounding to power of 2 - assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB - - // Test minimum bound - assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB - - // Test edge cases - assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default - assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default - - // Test large file - assert_eq!( - calculate_optimal_chunk_size(100 * 1024 * 1024, 10), - 16 * 1024 * 1024 - ); // 100MB / 10 = 10MB, max is 16MB - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different data produces different hashes - let data1 = vec![1, 2, 3, 4, 5]; - let data2 = vec![1, 2, 3, 4, 6]; - - let result1 = split_fixed_impl(&data1, 1024).unwrap(); - let result2 = split_fixed_impl(&data2, 1024).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs deleted file mode 100644 index 971018b..0000000 --- a/systems/storage/src/store/line.rs +++ /dev/null @@ -1,393 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Split data by lines (newline characters) -pub fn split_by_lines_impl(data: &[u8]) -> Result { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - // Iterate through data to find line boundaries - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - // Unix line ending - let line_end = i + 1; // Include \n - // Extract line data (include newline character) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' { - // Windows line ending - let line_end = i + 2; // Include both \r and \n - // Extract line data (include newline characters) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data (last line without newline) - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file (no lines) - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file by lines -pub async fn write_file_line>( - file_to_write: I, - storage_dir: I, - output_index_file: I, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::line(); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Utility function to split data by lines with custom line ending detection -pub fn split_by_lines_custom(data: &[u8]) -> Result { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - let mut i = 0; - while i < total_size { - if E::is_line_ending(data, i) { - let line_end = i + E::ending_length(data, i); - let line_data = data[start..line_end].to_vec(); - - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Trait for different line ending types -pub trait LineEnding { - /// Check if position i is the start of a line ending - fn is_line_ending(data: &[u8], i: usize) -> bool; - - /// Get the length of the line ending at position i - fn ending_length(data: &[u8], i: usize) -> usize; -} - -/// Unix line endings (\n) -pub struct UnixLineEnding; - -impl LineEnding for UnixLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i < data.len() && data[i] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 1 - } -} - -/// Windows line endings (\r\n) -pub struct WindowsLineEnding; - -impl LineEnding for WindowsLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 2 - } -} - -/// Mixed line endings (detects both Unix and Windows) -pub struct MixedLineEnding; - -impl LineEnding for MixedLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - if i < data.len() && data[i] == b'\n' { - true - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - true - } else { - false - } - } - - fn ending_length(data: &[u8], i: usize) -> usize { - if i < data.len() && data[i] == b'\n' { - 1 - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - 2 - } else { - 1 // Default to 1 if somehow called incorrectly - } - } -} - -/// Detect line ending type from data -pub fn detect_line_ending(data: &[u8]) -> LineEndingType { - let mut unix_count = 0; - let mut windows_count = 0; - - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - unix_count += 1; - i += 1; - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - windows_count += 1; - i += 2; - } else { - i += 1; - } - } - - if unix_count > windows_count { - LineEndingType::Unix - } else if windows_count > unix_count { - LineEndingType::Windows - } else { - LineEndingType::Mixed - } -} - -/// Line ending type enum -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum LineEndingType { - Unix, - Windows, - Mixed, -} - -impl LineEndingType { - /// Split data using the detected line ending type - pub fn split_by_lines(&self, data: &[u8]) -> Result { - match self { - LineEndingType::Unix => split_by_lines_custom::(data), - LineEndingType::Windows => split_by_lines_custom::(data), - LineEndingType::Mixed => split_by_lines_custom::(data), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_line_chunking_unix() { - let data = b"Hello\nWorld\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - } - - #[test] - fn test_line_chunking_windows() { - let data = b"Hello\r\nWorld\r\nTest\r\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents (should include \r\n) - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\r\n"); - } - - #[test] - fn test_line_chunking_mixed() { - let data = b"Hello\nWorld\r\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - } - - #[test] - fn test_line_chunking_no_trailing_newline() { - let data = b"Hello\nWorld\nTest"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test"); - } - - #[test] - fn test_line_chunking_empty_lines() { - let data = b"Hello\n\nWorld\n\n\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 5 chunks (including empty lines) - // "Hello\n", "\n", "World\n", "\n", "\n" - assert_eq!(result.chunks.len(), 5); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"\n"); - assert_eq!(result.chunks[2].data, b"World\n"); - assert_eq!(result.chunks[3].data, b"\n"); - assert_eq!(result.chunks[4].data, b"\n"); - } - - #[test] - fn test_line_chunking_empty_file() { - let data = b""; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 1 empty chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data, b""); - } - - #[test] - fn test_detect_line_ending() { - // Test Unix detection - let unix_data = b"Hello\nWorld\n"; - assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix); - - // Test Windows detection - let windows_data = b"Hello\r\nWorld\r\n"; - assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows); - - // Test mixed detection - let mixed_data = b"Hello\nWorld\r\n"; - assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed); - - // Test no newlines - let no_newlines = b"Hello World"; - assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed); - } - - #[test] - fn test_custom_line_ending_unix() { - let data = b"Hello\nWorld\n"; - - let result = split_by_lines_custom::(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - } - - #[test] - fn test_custom_line_ending_windows() { - let data = b"Hello\r\nWorld\r\n"; - - let result = split_by_lines_custom::(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - } - - #[test] - fn test_line_ending_type_split() { - let unix_data = b"Hello\nWorld\n"; - let windows_data = b"Hello\r\nWorld\r\n"; - let mixed_data = b"Hello\nWorld\r\n"; - - // Test Unix - let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap(); - assert_eq!(unix_result.chunks.len(), 2); - - // Test Windows - let windows_result = LineEndingType::Windows - .split_by_lines(windows_data) - .unwrap(); - assert_eq!(windows_result.chunks.len(), 2); - - // Test Mixed - let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap(); - assert_eq!(mixed_result.chunks.len(), 2); - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different lines produce different hashes - let data1 = b"Hello\n"; - let data2 = b"World\n"; - - let result1 = split_by_lines_impl(data1).unwrap(); - let result2 = split_by_lines_impl(data2).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} -- cgit