diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-03-08 22:48:54 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-03-08 22:48:54 +0800 |
| commit | 47e0ffd50427440696c245814517e4f5fa94ed83 (patch) | |
| tree | 777b1107af04f6b5bcc79673064b1821e1b7f59f /systems | |
| parent | 90ed18a41fef137ed0637cf9fc6aa667de2c905f (diff) | |
Move action system to legacy and remove storage system
Diffstat (limited to 'systems')
| -rw-r--r-- | systems/action/Cargo.toml | 15 | ||||
| -rw-r--r-- | systems/action/action_macros/Cargo.toml | 20 | ||||
| -rw-r--r-- | systems/action/action_macros/src/lib.rs | 248 | ||||
| -rw-r--r-- | systems/action/src/action.rs | 244 | ||||
| -rw-r--r-- | systems/action/src/action_pool.rs | 247 | ||||
| -rw-r--r-- | systems/action/src/lib.rs | 6 | ||||
| -rw-r--r-- | systems/storage/Cargo.toml | 13 | ||||
| -rw-r--r-- | systems/storage/src/error.rs | 8 | ||||
| -rw-r--r-- | systems/storage/src/lib.rs | 2 | ||||
| -rw-r--r-- | systems/storage/src/store.rs | 493 | ||||
| -rw-r--r-- | systems/storage/src/store/cdc.rs | 307 | ||||
| -rw-r--r-- | systems/storage/src/store/fixed.rs | 417 | ||||
| -rw-r--r-- | systems/storage/src/store/line.rs | 393 |
13 files changed, 0 insertions, 2413 deletions
diff --git a/systems/action/Cargo.toml b/systems/action/Cargo.toml deleted file mode 100644 index 5317975..0000000 --- a/systems/action/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "action_system" -edition = "2024" -version.workspace = true - -[dependencies] -tcp_connection = { path = "../../utils/tcp_connection" } -action_system_macros = { path = "action_macros" } - -# Serialization -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.145" - -# Async & Networking -tokio = "1.48.0" diff --git a/systems/action/action_macros/Cargo.toml b/systems/action/action_macros/Cargo.toml deleted file mode 100644 index 0f209e2..0000000 --- a/systems/action/action_macros/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "action_system_macros" -edition = "2024" -version.workspace = true - -[lib] -proc-macro = true - -[dependencies] -tcp_connection = { path = "../../../utils/tcp_connection" } - -just_fmt = "0.1.2" - -syn = { version = "2.0", features = ["full", "extra-traits"] } -quote = "1.0" -proc-macro2 = "1.0" - -# Serialization -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.145" diff --git a/systems/action/action_macros/src/lib.rs b/systems/action/action_macros/src/lib.rs deleted file mode 100644 index 6da0339..0000000 --- a/systems/action/action_macros/src/lib.rs +++ /dev/null @@ -1,248 +0,0 @@ -use proc_macro::TokenStream; -use quote::quote; -use syn::{ItemFn, parse_macro_input}; - -/// # Macro - Generate Action -/// -/// When annotating a function with the `#[action_gen]` macro in the following format, it generates boilerplate code for client-server interaction -/// -/// ```ignore -/// #[action_gen] -/// async fn action_name(ctx: ActionContext, argument: YourArgument) -> Result<YourResult, TcpTargetError> { -/// // Write your client and server logic here -/// if ctx.is_proc_on_remote() { -/// // Server logic -/// } -/// if ctx.is_proc_on_local() { -/// // Client logic -/// } -/// } -/// ``` -/// -/// > WARNING: -/// > For Argument and Result types, the `action_gen` macro only supports types that derive serde's Serialize and Deserialize -/// -/// ## Generated Code -/// -/// `action_gen` will generate the following: -/// -/// 1. Complete implementation of Action<Args, Return> -/// 2. Process / Register method -/// -/// ## How to use -/// -/// You can use your generated method as follows -/// -/// ```ignore -/// async fn main() -> Result<(), TcpTargetError> { -/// -/// // Prepare your argument -/// let args = YourArgument::default(); -/// -/// // Create a pool and context -/// let mut pool = ActionPool::new(); -/// let ctx = ActionContext::local(); -/// -/// // Register your action -/// register_your_action(&mut pool); -/// -/// // Process your action -/// proc_your_action(&pool, ctx, args).await?; -/// -/// Ok(()) -/// } -/// ``` -#[proc_macro_attribute] -pub fn action_gen(attr: TokenStream, item: TokenStream) -> TokenStream { - let input_fn = parse_macro_input!(item as ItemFn); - let is_local = if attr.is_empty() { - false - } else { - let attr_str = attr.to_string(); - attr_str == "local" || attr_str.contains("local") - }; - - generate_action_struct(input_fn, is_local).into() -} - -fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::TokenStream { - let fn_vis = &input_fn.vis; - let fn_sig = &input_fn.sig; - let fn_name = &fn_sig.ident; - let fn_block = &input_fn.block; - - validate_function_signature(fn_sig); - - let (context_param_name, arg_param_name, arg_type, return_type) = - extract_parameters_and_types(fn_sig); - - let struct_name = quote::format_ident!("{}", convert_to_pascal_case(&fn_name.to_string())); - - let action_name_ident = &fn_name; - - let register_this_action = quote::format_ident!("register_{}", action_name_ident); - let proc_this_action = quote::format_ident!("proc_{}", action_name_ident); - - quote! { - #[derive(Debug, Clone, Default)] - #fn_vis struct #struct_name; - - impl action_system::action::Action<#arg_type, #return_type> for #struct_name { - fn action_name() -> &'static str { - Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()) - } - - fn is_remote_action() -> bool { - !#_is_local - } - - async fn process(#context_param_name: action_system::action::ActionContext, #arg_param_name: #arg_type) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - #fn_block - } - } - - #fn_vis fn #register_this_action(pool: &mut action_system::action_pool::ActionPool) { - pool.register::<#struct_name, #arg_type, #return_type>(); - } - - #fn_vis async fn #proc_this_action( - pool: &action_system::action_pool::ActionPool, - mut ctx: action_system::action::ActionContext, - #arg_param_name: #arg_type - ) -> Result<#return_type, tcp_connection::error::TcpTargetError> { - ctx.set_is_remote_action(!#_is_local); - let args_json = serde_json::to_string(&#arg_param_name) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - })?; - let result_json = pool.process_json( - Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), // LEAK??? OH SHIT - ctx, - args_json, - ).await?; - serde_json::from_str(&result_json) - .map_err(|e| { - tcp_connection::error::TcpTargetError::Serialization(e.to_string()) - }) - } - - #[allow(dead_code)] - #[deprecated = "This function is used by #[action_gen] as a template."] - #[doc = "Template function for #[[action_gen]] - do not call directly."] - #[doc = "Use the generated struct instead."] - #[doc = ""] - #[doc = "Register the action to the pool."] - #[doc = "```ignore"] - #[doc = "register_your_func(&mut pool);"] - #[doc = "```"] - #[doc = ""] - #[doc = "Process the action at the pool."] - #[doc = "```ignore"] - #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"] - #[doc = "```"] - #fn_vis #fn_sig #fn_block - } -} - -fn validate_function_signature(fn_sig: &syn::Signature) { - if fn_sig.asyncness.is_none() { - panic!("Expected async function for Action, but found synchronous function"); - } - - if fn_sig.inputs.len() != 2 { - panic!( - "Expected exactly 2 arguments for Action function: ctx: ActionContext and arg: T, but found {} arguments", - fn_sig.inputs.len() - ); - } - - let return_type = match &fn_sig.output { - syn::ReturnType::Type(_, ty) => ty, - _ => panic!( - "Expected Action function to return Result<T, TcpTargetError>, but found no return type" - ), - }; - - if let syn::Type::Path(type_path) = return_type.as_ref() { - if let Some(segment) = type_path.path.segments.last() - && segment.ident != "Result" - { - panic!( - "Expected Action function to return Result<T, TcpTargetError>, but found different return type" - ); - } - } else { - panic!( - "Expected Action function to return Result<T, TcpTargetError>, but found no return type" - ); - } -} - -fn convert_to_pascal_case(s: &str) -> String { - s.split('_') - .map(|word| { - let mut chars = word.chars(); - match chars.next() { - None => String::new(), - Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), - } - }) - .collect() -} - -fn extract_parameters_and_types( - fn_sig: &syn::Signature, -) -> ( - proc_macro2::TokenStream, - proc_macro2::TokenStream, - proc_macro2::TokenStream, - proc_macro2::TokenStream, -) { - let mut inputs = fn_sig.inputs.iter(); - - let context_param = match inputs.next() { - Some(syn::FnArg::Typed(pat_type)) => { - let pat = &pat_type.pat; - quote::quote!(#pat) - } - _ => { - panic!("Expected the first argument to be a typed parameter, but found something else") - } - }; - - let arg_param = match inputs.next() { - Some(syn::FnArg::Typed(pat_type)) => { - let pat = &pat_type.pat; - let ty = &pat_type.ty; - (quote::quote!(#pat), quote::quote!(#ty)) - } - _ => { - panic!("Expected the second argument to be a typed parameter, but found something else") - } - }; - - let (arg_param_name, arg_type) = arg_param; - - let return_type = match &fn_sig.output { - syn::ReturnType::Type(_, ty) => { - if let syn::Type::Path(type_path) = ty.as_ref() { - if let syn::PathArguments::AngleBracketed(args) = - &type_path.path.segments.last().unwrap().arguments - { - if let Some(syn::GenericArgument::Type(ty)) = args.args.first() { - quote::quote!(#ty) - } else { - panic!("Expected to extract the success type of Result, but failed"); - } - } else { - panic!("Expected Result type to have generic parameters, but found none"); - } - } else { - panic!("Expected return type to be Result, but found different type"); - } - } - _ => panic!("Expected function to have return type, but found none"), - }; - - (context_param, arg_param_name, arg_type, return_type) -} diff --git a/systems/action/src/action.rs b/systems/action/src/action.rs deleted file mode 100644 index 62425ff..0000000 --- a/systems/action/src/action.rs +++ /dev/null @@ -1,244 +0,0 @@ -use serde::{Serialize, de::DeserializeOwned}; -use std::any::{Any, TypeId}; -use std::collections::HashMap; -use std::sync::Arc; -use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance}; -use tokio::{net::TcpStream, sync::Mutex}; - -/// # Trait - Action<Args, Return> -/// -/// A trait used to describe the interaction pattern between client and server -/// -/// ## Generics -/// -/// Args: Represents the parameter type required for this action -/// -/// Return: Represents the return type of this action -/// -/// The above generics must implement serde's Serialize and DeserializeOwned traits, -/// and must be sendable between threads -/// -/// ## Implementation -/// -/// ```ignore -/// pub trait Action<Args, Return> -/// where -/// Args: Serialize + DeserializeOwned + Send, -/// Return: Serialize + DeserializeOwned + Send, -/// { -/// /// Name, used to inform the server which action to execute -/// fn action_name() -> &'static str; -/// -/// /// Whether it's a local Action, used to inform the system if it only runs locally -/// fn is_remote_action() -> bool; -/// -/// /// Action processing logic -/// fn process( -/// context: ActionContext, -/// args: Args, -/// ) -> impl std::future::Future<Output = Result<Return, TcpTargetError>> + Send; -/// } -/// ``` -pub trait Action<Args, Return> -where - Args: Serialize + DeserializeOwned + Send, - Return: Serialize + DeserializeOwned + Send, -{ - fn action_name() -> &'static str; - - fn is_remote_action() -> bool; - - fn process( - context: ActionContext, - args: Args, - ) -> impl std::future::Future<Output = Result<Return, TcpTargetError>> + Send; -} - -/// # Struct - ActionContext -/// -/// Used to inform the Action about the current execution environment -/// -/// ## Creation -/// -/// Create ActionContext using the following methods: -/// -/// ```ignore -/// -/// // The instance here is the connection instance passed from external sources for communicating with the server -/// // For specific usage, please refer to the `/crates/utils/tcp_connection` section -/// -/// fn init_local_action_ctx(instance: ConnectionInstance) { -/// // Create context and specify execution on local -/// let mut ctx = ActionContext::local(); -/// } -/// -/// fn init_remote_action_ctx(instance: ConnectionInstance) { -/// // Create context and specify execution on remote -/// let mut ctx = ActionContext::remote(); -/// } -#[derive(Default)] -pub struct ActionContext { - /// Whether the action is executed locally or remotely - proc_on_local: bool, - - /// Whether the action being executed in the current context is a remote action - is_remote_action: bool, - - /// The name of the action being executed - action_name: String, - - /// The JSON-serialized arguments for the action - action_args_json: String, - - /// The connection instance in the current context, - instance: Option<Arc<Mutex<ConnectionInstance>>>, - - /// Generic data storage for arbitrary types - data: HashMap<TypeId, Arc<dyn Any + Send + Sync>>, -} - -impl ActionContext { - /// Generate local context - pub fn local() -> Self { - ActionContext { - proc_on_local: true, - ..Default::default() - } - } - - /// Generate remote context - pub fn remote() -> Self { - ActionContext { - proc_on_local: false, - ..Default::default() - } - } - - /// Build connection instance from TcpStream - pub fn build_instance(mut self, stream: TcpStream) -> Self { - self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream)))); - self - } - - /// Insert connection instance into context - pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self { - self.instance = Some(Arc::new(Mutex::new(instance))); - self - } - - /// Pop connection instance from context - pub fn pop_instance(&mut self) -> Option<Arc<Mutex<ConnectionInstance>>> { - self.instance.take() - } -} - -impl ActionContext { - /// Whether the action is executed locally - pub fn is_proc_on_local(&self) -> bool { - self.proc_on_local - } - - /// Whether the action is executed remotely - pub fn is_proc_on_remote(&self) -> bool { - !self.proc_on_local - } - - /// Whether the action being executed in the current context is a remote action - pub fn is_remote_action(&self) -> bool { - self.is_remote_action - } - - /// Set whether the action being executed in the current context is a remote action - pub fn set_is_remote_action(&mut self, is_remote_action: bool) { - self.is_remote_action = is_remote_action; - } - - /// Get the connection instance in the current context - pub fn instance(&self) -> &Option<Arc<Mutex<ConnectionInstance>>> { - &self.instance - } - - /// Get a mutable reference to the connection instance in the current context - pub fn instance_mut(&mut self) -> &mut Option<Arc<Mutex<ConnectionInstance>>> { - &mut self.instance - } - - /// Get the action name from the context - pub fn action_name(&self) -> &str { - &self.action_name - } - - /// Get the action arguments from the context - pub fn action_args_json(&self) -> &String { - &self.action_args_json - } - - /// Set the action name in the context - pub fn set_action_name(mut self, action_name: String) -> Self { - self.action_name = action_name; - self - } - - /// Set the action arguments in the context - pub fn set_action_args(mut self, action_args: String) -> Self { - self.action_args_json = action_args; - self - } - - /// Insert arbitrary data in the context - pub fn with_data<T: Any + Send + Sync>(mut self, value: T) -> Self { - self.data.insert(TypeId::of::<T>(), Arc::new(value)); - self - } - - /// Insert arbitrary data as Arc in the context - pub fn with_arc_data<T: Any + Send + Sync>(mut self, value: Arc<T>) -> Self { - self.data.insert(TypeId::of::<T>(), value); - self - } - - /// Insert arbitrary data in the context - pub fn insert_data<T: Any + Send + Sync>(&mut self, value: T) { - self.data.insert(TypeId::of::<T>(), Arc::new(value)); - } - - /// Insert arbitrary data as Arc in the context - pub fn insert_arc_data<T: Any + Send + Sync>(&mut self, value: Arc<T>) { - self.data.insert(TypeId::of::<T>(), value); - } - - /// Get arbitrary data from the context - pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> { - self.data - .get(&TypeId::of::<T>()) - .and_then(|arc| arc.downcast_ref::<T>()) - } - - /// Get arbitrary data as Arc from the context - pub fn get_arc<T: Any + Send + Sync>(&self) -> Option<Arc<T>> { - self.data - .get(&TypeId::of::<T>()) - .and_then(|arc| Arc::clone(arc).downcast::<T>().ok()) - } - - /// Remove and return arbitrary data from the context - pub fn remove<T: Any + Send + Sync>(&mut self) -> Option<Arc<T>> { - self.data - .remove(&TypeId::of::<T>()) - .and_then(|arc| arc.downcast::<T>().ok()) - } - - /// Check if the context contains data of a specific type - pub fn contains<T: Any + Send + Sync>(&self) -> bool { - self.data.contains_key(&TypeId::of::<T>()) - } - - /// Take ownership of the context and extract data of a specific type - pub fn take<T: Any + Send + Sync>(mut self) -> (Self, Option<Arc<T>>) { - let value = self - .data - .remove(&TypeId::of::<T>()) - .and_then(|arc| arc.downcast::<T>().ok()); - (self, value) - } -} diff --git a/systems/action/src/action_pool.rs b/systems/action/src/action_pool.rs deleted file mode 100644 index 019fa6d..0000000 --- a/systems/action/src/action_pool.rs +++ /dev/null @@ -1,247 +0,0 @@ -use std::pin::Pin; - -use serde::{Serialize, de::DeserializeOwned}; -use serde_json; -use tcp_connection::error::TcpTargetError; - -use crate::action::{Action, ActionContext}; - -type ProcBeginCallback = for<'a> fn( - &'a mut ActionContext, - args: &'a (dyn std::any::Any + Send + Sync), -) -> ProcBeginFuture<'a>; -type ProcEndCallback = fn() -> ProcEndFuture; - -type ProcBeginFuture<'a> = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>; -type ProcEndFuture = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>; - -/// # Struct - ActionPool -/// -/// This struct is used to register and record all accessible and executable actions -/// -/// It also registers `on_proc_begin` and `on_proc_end` callback functions -/// used for action initialization -/// -/// ## Creating and registering actions -/// ```ignore -/// fn init_action_pool() { -/// let mut pool = Action::new(); -/// -/// // Register action -/// pool.register<YourAction, ActionArgument, ActionReturn>(); -/// -/// // If the action is implemented with `#[action_gen]`, you can also do -/// register_your_action(&mut pool); -/// } -/// ``` -pub struct ActionPool { - /// HashMap storing action name to action implementation mapping - actions: std::collections::HashMap<&'static str, Box<dyn ActionErased>>, - - /// Callback to execute when process begins - on_proc_begin: Option<ProcBeginCallback>, - - /// Callback to execute when process ends - on_proc_end: Option<ProcEndCallback>, -} - -impl Default for ActionPool { - fn default() -> Self { - Self::new() - } -} - -impl ActionPool { - /// Creates a new empty ActionPool - pub fn new() -> Self { - Self { - actions: std::collections::HashMap::new(), - on_proc_begin: None, - on_proc_end: None, - } - } - - /// Sets a callback to be executed when process begins - pub fn set_on_proc_begin(&mut self, callback: ProcBeginCallback) { - self.on_proc_begin = Some(callback); - } - - /// Sets a callback to be executed when process ends - pub fn set_on_proc_end(&mut self, callback: ProcEndCallback) { - self.on_proc_end = Some(callback); - } - - /// Registers an action type with the pool - /// - /// Usage: - /// ```ignore - /// action_pool.register::<MyAction, MyArgs, MyReturn>(); - /// ``` - pub fn register<A, Args, Return>(&mut self) - where - A: Action<Args, Return> + Send + Sync + 'static, - Args: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - Return: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - { - let action_name = A::action_name(); - self.actions.insert( - action_name, - Box::new(ActionWrapper::<A, Args, Return>(std::marker::PhantomData)), - ); - } - - /// Processes an action by name with given context and arguments - /// - /// Usage: - /// ```ignore - /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?; - /// ``` - /// Processes an action by name with JSON-serialized arguments - /// - /// Usage: - /// ```ignore - /// let result_json = action_pool.process_json("my_action", context, args_json).await?; - /// let result: MyReturn = serde_json::from_str(&result_json)?; - /// ``` - pub async fn process_json<'a>( - &'a self, - action_name: &'a str, - context: ActionContext, - args_json: String, - ) -> Result<String, TcpTargetError> { - if let Some(action) = self.actions.get(action_name) { - // Set action name and args in context for callbacks - let context = context.set_action_name(action_name.to_string()); - let mut context = context.set_action_args(args_json.clone()); - - self.exec_on_proc_begin(&mut context, &args_json).await?; - let result = action.process_json_erased(context, args_json).await?; - self.exec_on_proc_end().await?; - Ok(result) - } else { - Err(TcpTargetError::Unsupported("InvalidAction".to_string())) - } - } - - /// Processes an action by name with given context and arguments - /// - /// Usage: - /// ```ignore - /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?; - /// ``` - pub async fn process<'a, Args, Return>( - &'a self, - action_name: &'a str, - mut context: ActionContext, - args: Args, - ) -> Result<Return, TcpTargetError> - where - Args: serde::de::DeserializeOwned + Send + Sync + 'static, - Return: serde::Serialize + Send + 'static, - { - if let Some(action) = self.actions.get(action_name) { - self.exec_on_proc_begin(&mut context, &args).await?; - let result = action.process_erased(context, Box::new(args)).await?; - let result = *result - .downcast::<Return>() - .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; - self.exec_on_proc_end().await?; - Ok(result) - } else { - Err(TcpTargetError::Unsupported("InvalidAction".to_string())) - } - } - - /// Executes the process begin callback if set - async fn exec_on_proc_begin( - &self, - context: &mut ActionContext, - args: &(dyn std::any::Any + Send + Sync), - ) -> Result<(), TcpTargetError> { - if let Some(callback) = &self.on_proc_begin { - callback(context, args).await - } else { - Ok(()) - } - } - - /// Executes the process end callback if set - async fn exec_on_proc_end(&self) -> Result<(), TcpTargetError> { - if let Some(callback) = &self.on_proc_end { - callback().await - } else { - Ok(()) - } - } -} - -/// Trait for type-erased actions that can be stored in ActionPool -type ProcessErasedFuture = std::pin::Pin< - Box< - dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>> - + Send, - >, ->; -type ProcessJsonErasedFuture = - std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>; - -trait ActionErased: Send + Sync { - /// Processes the action with type-erased arguments and returns type-erased result - fn process_erased( - &self, - context: ActionContext, - args: Box<dyn std::any::Any + Send>, - ) -> ProcessErasedFuture; - - /// Processes the action with JSON-serialized arguments and returns JSON-serialized result - fn process_json_erased( - &self, - context: ActionContext, - args_json: String, - ) -> ProcessJsonErasedFuture; -} - -/// Wrapper struct that implements ActionErased for concrete Action types -struct ActionWrapper<A, Args, Return>(std::marker::PhantomData<(A, Args, Return)>); - -impl<A, Args, Return> ActionErased for ActionWrapper<A, Args, Return> -where - A: Action<Args, Return> + Send + Sync, - Args: Serialize + DeserializeOwned + Send + Sync + 'static, - Return: Serialize + DeserializeOwned + Send + Sync + 'static, -{ - fn process_erased( - &self, - context: ActionContext, - args: Box<dyn std::any::Any + Send>, - ) -> std::pin::Pin< - Box< - dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>> - + Send, - >, - > { - Box::pin(async move { - let args = *args - .downcast::<Args>() - .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?; - let result = A::process(context, args).await?; - Ok(Box::new(result) as Box<dyn std::any::Any + Send>) - }) - } - - fn process_json_erased( - &self, - context: ActionContext, - args_json: String, - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>> - { - Box::pin(async move { - let args: Args = serde_json::from_str(&args_json) - .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?; - let result = A::process(context, args).await?; - let result_json = serde_json::to_string(&result) - .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?; - Ok(result_json) - }) - } -} diff --git a/systems/action/src/lib.rs b/systems/action/src/lib.rs deleted file mode 100644 index 12ae999..0000000 --- a/systems/action/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod macros { - pub use action_system_macros::*; -} - -pub mod action; -pub mod action_pool; diff --git a/systems/storage/Cargo.toml b/systems/storage/Cargo.toml deleted file mode 100644 index 5afb780..0000000 --- a/systems/storage/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "storage_system" -edition = "2024" -version.workspace = true - -[dependencies] -thiserror = "1.0.69" -tokio = { version = "1.48", features = ["full"] } - -blake3 = "1.5.0" -hex = "0.4.3" -memmap2 = "0.9.4" -log = "0.4.25" diff --git a/systems/storage/src/error.rs b/systems/storage/src/error.rs deleted file mode 100644 index 4b3ad7e..0000000 --- a/systems/storage/src/error.rs +++ /dev/null @@ -1,8 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub enum StorageIOError { - #[error("IO error: {0}")] - IOErr(#[from] std::io::Error), - - #[error("Hash too short")] - HashTooShort, -} diff --git a/systems/storage/src/lib.rs b/systems/storage/src/lib.rs deleted file mode 100644 index 4f89971..0000000 --- a/systems/storage/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod error; -pub mod store; diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs deleted file mode 100644 index 6492449..0000000 --- a/systems/storage/src/store.rs +++ /dev/null @@ -1,493 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::time::Instant; - -use blake3; -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; - -use crate::error::StorageIOError; - -pub mod cdc; -pub mod fixed; -pub mod line; - -#[derive(Default, Debug)] -pub struct StorageConfig { - /// Chunking policy for splitting files - pub chunking_policy: ChunkingPolicy, -} - -impl StorageConfig { - /// Create a new StorageConfig with CDC chunking policy - pub fn cdc(avg_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::Cdc(avg_size), - } - } - - /// Create a new StorageConfig with fixed-size chunking policy - pub fn fixed_size(chunk_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::FixedSize(chunk_size), - } - } - - /// Create a new StorageConfig with line-based chunking policy - pub fn line() -> Self { - Self { - chunking_policy: ChunkingPolicy::Line, - } - } -} - -#[derive(Debug)] -pub enum ChunkingPolicy { - /// Content-Defined Chunking using Rabin fingerprinting - /// The `u32` value represents the desired average chunk size in bytes - Cdc(u32), - - /// Fixed-size chunking - /// The `u32` value represents the exact size of each chunk in bytes - FixedSize(u32), - - /// Line-based chunking - /// Each line becomes a separate chunk - Line, -} - -impl Default for ChunkingPolicy { - fn default() -> Self { - ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC - } -} - -/// Represents a chunk of data with its hash and content -#[derive(Debug, Clone)] -pub struct Chunk { - /// Blake3 hash of the chunk content - pub hash: [u8; 32], - /// Raw chunk data - pub data: Vec<u8>, -} - -/// Represents an index entry pointing to a chunk -#[derive(Debug, Clone)] -pub struct IndexEntry { - /// Blake3 hash of the chunk - pub hash: [u8; 32], - /// Size of the chunk in bytes - pub size: u32, -} - -/// Result of chunking a file -#[derive(Debug)] -pub struct ChunkingResult { - /// List of chunks extracted from the file - pub chunks: Vec<Chunk>, - /// Total size of the original file - pub total_size: u64, -} - -/// Split a file into chunks and store them in the repository, then output the index file to the specified directory -pub async fn write_file( - file_to_write: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_index_file: impl Into<PathBuf>, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?; - - // Store chunks and create index - let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy -pub fn split_into_chunks( - data: &[u8], - policy: &ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - match policy { - ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size), - ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size), - ChunkingPolicy::Line => split_by_lines(data), - } -} - -/// Split data using Content-Defined Chunking -fn split_cdc(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { - use crate::store::cdc::split_cdc_impl; - split_cdc_impl(data, avg_size) -} - -/// Split data using fixed-size chunking -fn split_fixed(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { - use crate::store::fixed::split_fixed_impl; - split_fixed_impl(data, chunk_size) -} - -/// Split data by lines -fn split_by_lines(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - use crate::store::line::split_by_lines_impl; - split_by_lines_impl(data) -} - -/// Store chunks in the storage directory and return index entries -async fn store_chunks( - chunks: &[Chunk], - storage_dir: &Path, -) -> Result<Vec<IndexEntry>, StorageIOError> { - let mut index_entries = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - // Add to index - index_entries.push(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Build a file from the repository to the specified directory using an index file -pub async fn build_file( - index_to_build: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_file: impl Into<PathBuf>, -) -> Result<(), StorageIOError> { - let (index_to_build, storage_dir, output_file) = - precheck(index_to_build, storage_dir, output_file).await?; - - info!( - "Starting file build from index: {}", - index_to_build.display() - ); - let start_time = Instant::now(); - - // Read index file - let index_entries = read_index_file(&index_to_build).await?; - - // Reconstruct file from chunks - let reconstructed_data = reconstruct_from_chunks(&index_entries, &storage_dir).await?; - - // Write output file - fs::write(&output_file, &reconstructed_data).await?; - - let duration = start_time.elapsed(); - info!( - "File build completed in {:?}: {} -> {}", - duration, - index_to_build.display(), - output_file.display() - ); - - Ok(()) -} - -/// Read index file and parse entries -async fn read_index_file(index_path: &Path) -> Result<Vec<IndexEntry>, StorageIOError> { - // Open file and memory map it - let file = std::fs::File::open(index_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Each entry is 36 bytes (32 bytes hash + 4 bytes size) - if mmap.len() % 36 != 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid index file format", - ) - .into()); - } - - let num_entries = mmap.len() / 36; - let mut entries = Vec::with_capacity(num_entries); - - for i in 0..num_entries { - let start = i * 36; - let hash_start = start; - let size_start = start + 32; - - let mut hash = [0u8; 32]; - hash.copy_from_slice(&mmap[hash_start..hash_start + 32]); - - let size = u32::from_le_bytes([ - mmap[size_start], - mmap[size_start + 1], - mmap[size_start + 2], - mmap[size_start + 3], - ]); - - entries.push(IndexEntry { hash, size }); - } - - Ok(entries) -} - -/// Reconstruct file data from chunks using index entries -async fn reconstruct_from_chunks( - entries: &[IndexEntry], - storage_dir: &Path, -) -> Result<Vec<u8>, StorageIOError> { - let mut reconstructed_data = Vec::new(); - - for entry in entries { - // Get chunk path from hash - let hash_str = hex::encode(entry.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - let chunk_path = chunk_dir.with_extension("chunk"); - - trace!("R: {}", hash_str); - - // Memory map the chunk file - let file = std::fs::File::open(&chunk_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Verify chunk size matches index - if mmap.len() != entry.size as usize { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk size mismatch: expected {}, got {}", - entry.size, - mmap.len() - ), - ) - .into()); - } - - // Verify chunk hash - let actual_hash = blake3_digest(&mmap); - let expected_hash = entry.hash; - if actual_hash != expected_hash { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk hash mismatch: expected {}, got {}", - hex::encode(expected_hash), - hex::encode(actual_hash) - ), - ) - .into()); - } - - // Append to reconstructed data - reconstructed_data.extend_from_slice(&mmap); - } - - Ok(reconstructed_data) -} - -/// Calculate Blake3 hash of data -fn blake3_digest(data: &[u8]) -> [u8; 32] { - let hash = blake3::hash(data); - *hash.as_bytes() -} - -/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string -pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result<PathBuf, StorageIOError> { - if hash_str.len() < 4 { - return Err(StorageIOError::HashTooShort); - } - let dir = storage_dir - .join(&hash_str[0..2]) - .join(&hash_str[2..4]) - .join(&hash_str); - Ok(dir) -} - -/// Calculate Blake3 hash of data and create a Chunk -pub fn create_chunk(data: Vec<u8>) -> Chunk { - let hash = blake3_digest(&data); - Chunk { hash, data } -} - -/// Read a file and split it into chunks using the specified policy -pub async fn chunk_file( - file_path: impl Into<PathBuf>, - policy: &ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - let file_path = file_path.into(); - info!("Starting chunking: {}", file_path.display()); - let start_time = Instant::now(); - - // Memory map the file - let file = std::fs::File::open(&file_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - let result = split_into_chunks(data, policy); - - let duration = start_time.elapsed(); - info!( - "Chunking completed in {:?}: {}", - duration, - file_path.display() - ); - - result -} - -/// Get chunk statistics -pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats { - let total_chunks = chunks.len(); - let total_size: usize = chunks.iter().map(|c| c.data.len()).sum(); - let avg_size = if total_chunks > 0 { - total_size / total_chunks - } else { - 0 - }; - let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0); - let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0); - - ChunkStats { - total_chunks, - total_size: total_size as u64, - avg_size, - min_size, - max_size, - } -} - -/// Statistics about chunks -#[derive(Debug, Clone)] -pub struct ChunkStats { - pub total_chunks: usize, - pub total_size: u64, - pub avg_size: usize, - pub min_size: usize, - pub max_size: usize, -} - -/// Pre-check whether the input file path, directory path, and output path are valid -pub async fn precheck( - input_file: impl Into<PathBuf>, - dir: impl Into<PathBuf>, - output_file: impl Into<PathBuf>, -) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { - let (input, dir, output) = (input_file.into(), dir.into(), output_file.into()); - - // Perform all checks in parallel - let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!( - fs::metadata(&input), - fs::metadata(&dir), - fs::metadata(&output) - ); - - // Check if input file exists - let input_metadata = match input_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Input file not found: {}", input.display()), - )); - } - }; - - // Check if directory exists - let dir_metadata = match dir_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Directory not found: {}", dir.display()), - )); - } - }; - - // Check if output file already exists - if output_metadata_result.is_ok() { - return Err(std::io::Error::new( - std::io::ErrorKind::AlreadyExists, - format!("Output file already exist: {}", output.display()), - )); - } - - // Check if input is a file - if !input_metadata.is_file() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Input path is not a file: {}", input.display()), - )); - } - - // Check if dir is a directory - if !dir_metadata.is_dir() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotADirectory, - format!("Input path is not a directory: {}", dir.display()), - )); - } - - Ok((input, dir, output)) -} diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs deleted file mode 100644 index cc16202..0000000 --- a/systems/storage/src/store/cdc.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Rabin fingerprint parameters for CDC -const WINDOW_SIZE: usize = 48; -const MIN_CHUNK_RATIO: f64 = 0.75; -const MAX_CHUNK_RATIO: f64 = 2.0; -const BASE_TARGET_MASK: u64 = 0xFFFF; - -/// Rabin fingerprint polynomial (irreducible polynomial) -const POLYNOMIAL: u64 = 0x3DA3358B4DC173; - -/// Precomputed table for Rabin fingerprint sliding window -static FINGERPRINT_TABLE: [u64; 256] = { - let mut table = [0u64; 256]; - let mut i = 0; - while i < 256 { - let mut fingerprint = i as u64; - let mut j = 0; - while j < 8 { - if fingerprint & 1 != 0 { - fingerprint = (fingerprint >> 1) ^ POLYNOMIAL; - } else { - fingerprint >>= 1; - } - j += 1; - } - table[i] = fingerprint; - i += 1; - } - table -}; - -/// Calculate Rabin fingerprint for a byte -#[inline] -fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 { - let old_byte_index = old_byte as usize; - - let fingerprint = - (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index]; - fingerprint -} - -/// Split data using Content-Defined Chunking with Rabin fingerprinting -pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { - let avg_size = avg_size as usize; - - if avg_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Average chunk size must be greater than 0", - ) - .into()); - } - - // Calculate min and max chunk sizes based on average size - let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize; - let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize; - - // Ensure reasonable bounds - let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes - let max_chunk_size = max_chunk_size.max(avg_size * 2); - - let target_mask = { - let scale_factor = avg_size as f64 / (64.0 * 1024.0); - let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64; - mask_value.max(0xC000) - }; - - let mut chunks = Vec::new(); - let mut start = 0; - let data_len = data.len(); - - while start < data_len { - let chunk_start = start; - - let search_end = (start + max_chunk_size).min(data_len); - let mut boundary_found = false; - let mut boundary_pos = search_end; - - let mut window = Vec::with_capacity(WINDOW_SIZE); - let mut fingerprint: u64 = 0; - - for i in start..search_end { - let byte = data[i]; - - // Update sliding window - if window.len() >= WINDOW_SIZE { - let old_byte = window.remove(0); - fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte); - } else { - fingerprint = (fingerprint << 8) ^ (byte as u64); - } - window.push(byte); - - if i - chunk_start >= min_chunk_size { - if (fingerprint & target_mask) == 0 { - boundary_found = true; - boundary_pos = i + 1; - break; - } - } - } - - let chunk_end = if boundary_found { - boundary_pos - } else { - search_end - }; - - let chunk_data = data[chunk_start..chunk_end].to_vec(); - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = chunk_end; - } - - if chunks.len() > 1 { - let mut merged_chunks = Vec::new(); - let mut i = 0; - - while i < chunks.len() { - let current_chunk = &chunks[i]; - - if i < chunks.len() - 1 { - let next_chunk = &chunks[i + 1]; - if current_chunk.data.len() < min_chunk_size - && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size - { - // Merge chunks - let mut merged_data = current_chunk.data.clone(); - merged_data.extend_from_slice(&next_chunk.data); - let merged_chunk = crate::store::create_chunk(merged_data); - merged_chunks.push(merged_chunk); - i += 2; - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } - - chunks = merged_chunks; - } - - Ok(ChunkingResult { - chunks, - total_size: data_len as u64, - }) -} - -/// Split file using CDC with specified average chunk size -pub async fn write_file_cdc<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - avg_size: u32, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::cdc(avg_size); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Test function for CDC chunking -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_cdc_basic() { - let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect(); - let mut data = Vec::new(); - for _ in 0..10 { - data.extend_from_slice(&pattern); - } - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have at least one chunk - assert!(!result.chunks.is_empty()); - - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 10KB data with 4KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds (more relaxed for CDC) - let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - let chunk_size = chunk.data.len(); - // CDC can produce chunks smaller than min_size due to content patterns - // But they should not exceed max_size - assert!( - chunk_size <= max_size, - "Chunk size {} exceeds maximum {}", - chunk_size, - max_size - ); - } - } - - #[test] - fn test_cdc_small_file() { - // Small file should result in single chunk - let data = vec![1, 2, 3, 4, 5]; - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_cdc_large_avg_size() { - // Test with larger average size (256KB) - let mut data = Vec::new(); - for i in 0..(256 * 1024 * 3) { - // 768KB of data - data.push((i % 256) as u8); - } - - let result = split_cdc_impl(&data, 256 * 1024).unwrap(); - - // With 768KB data and 256KB average, should have reasonable number of chunks - // CDC is content-defined, so exact count varies - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 768KB data with 256KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds - let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - // CDC can produce chunks smaller than min_size - // But they should not exceed max_size - assert!( - chunk.data.len() <= max_size, - "Chunk size {} exceeds maximum {}", - chunk.data.len(), - max_size - ); - } - } - - #[test] - fn test_cdc_zero_avg_size() { - let data = vec![1, 2, 3]; - - let result = split_cdc_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_rabin_fingerprint() { - // Test that rabin_fingerprint_byte function is deterministic - let test_bytes = [0x01, 0x02, 0x03, 0x04]; - - // Calculate fingerprint with specific sequence - let mut fingerprint1 = 0; - for &byte in &test_bytes { - fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte); - } - - // Calculate again with same inputs - let mut fingerprint2 = 0; - for &byte in &test_bytes { - fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte); - } - - // Same input should produce same output - assert_eq!(fingerprint1, fingerprint2); - - // Test with different old_byte produces different result - let mut fingerprint3 = 0; - for &byte in &test_bytes { - fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte); - } - - // Different old_byte should produce different fingerprint - assert_ne!(fingerprint1, fingerprint3); - } -} diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs deleted file mode 100644 index 044cc1c..0000000 --- a/systems/storage/src/store/fixed.rs +++ /dev/null @@ -1,417 +0,0 @@ -use std::path::PathBuf; -use std::time::Instant; - -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; -use tokio::task; - -use crate::{ - error::StorageIOError, - store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, -}; - -/// Split data using fixed-size chunking -pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - while start < total_size { - let end = (start + chunk_size).min(total_size); - let chunk_data = data[start..end].to_vec(); - - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = end; - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file using fixed-size chunking -pub async fn write_file_fixed<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - fixed_size: u32, -) -> Result<(), StorageIOError> { - let config = StorageConfig::fixed_size(fixed_size); - write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Split file using fixed-size chunking with parallel processing -async fn write_file_parallel( - file_to_write: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_index_file: impl Into<PathBuf>, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; - - // Store chunks in parallel and create index - let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy with parallel processing -async fn split_into_chunks_parallel( - data: &[u8], - policy: &crate::store::ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - match policy { - crate::store::ChunkingPolicy::FixedSize(chunk_size) => { - split_fixed_parallel(data, *chunk_size).await - } - _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking - } -} - -/// Split data using fixed-size chunking with parallel processing -async fn split_fixed_parallel( - data: &[u8], - chunk_size: u32, -) -> Result<ChunkingResult, StorageIOError> { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let total_size = data.len(); - let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division - - // Create a vector to hold chunk boundaries - let mut chunk_boundaries = Vec::with_capacity(num_chunks); - let mut start = 0; - - while start < total_size { - let end = (start + chunk_size).min(total_size); - chunk_boundaries.push((start, end)); - start = end; - } - - // Process chunks in parallel using Tokio tasks - let chunks: Vec<crate::store::Chunk> = if chunk_boundaries.len() > 1 { - // Use parallel processing for multiple chunks - let mut tasks = Vec::with_capacity(chunk_boundaries.len()); - - for (start, end) in chunk_boundaries { - let chunk_data = data[start..end].to_vec(); - - // Spawn a blocking task for each chunk - tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); - } - - // Wait for all tasks to complete - let mut chunks = Vec::with_capacity(tasks.len()); - for task in tasks { - match task.await { - Ok(chunk) => chunks.push(chunk), - Err(e) => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Task join error: {}", e), - ) - .into()); - } - } - } - - chunks - } else { - // Single chunk, no need for parallel processing - chunk_boundaries - .into_iter() - .map(|(start, end)| { - let chunk_data = data[start..end].to_vec(); - create_chunk(chunk_data) - }) - .collect() - }; - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Store chunks in the storage directory in parallel and return index entries -async fn store_chunks_parallel( - chunks: &[crate::store::Chunk], - storage_dir: &std::path::Path, -) -> Result<Vec<IndexEntry>, StorageIOError> { - let mut tasks = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - let chunk = chunk.clone(); - let storage_dir = storage_dir.to_path_buf(); - let writed_counter = writed_counter.clone(); - - // Spawn async task for each chunk storage operation - tasks.push(task::spawn(async move { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir, hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - Ok::<IndexEntry, StorageIOError>(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }) - })); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - // Wait for all tasks to complete - let mut index_entries = Vec::with_capacity(chunks.len()); - for task in tasks { - let entry = task.await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) - })??; - index_entries.push(entry); - } - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &std::path::Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Utility function to calculate optimal fixed chunk size based on file size -pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { - if target_chunks == 0 || file_size == 0 { - return 64 * 1024; // Default 64KB - } - - let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; - - // Round to nearest power of 2 for better performance - let rounded_size = if chunk_size <= 1024 { - // Small chunks: use exact size - chunk_size - } else { - // Larger chunks: round to nearest power of 2 - let mut size = chunk_size; - size -= 1; - size |= size >> 1; - size |= size >> 2; - size |= size >> 4; - size |= size >> 8; - size |= size >> 16; - size += 1; - size - }; - - // Ensure minimum and maximum bounds - rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max -} - -/// Split file with automatic chunk size calculation -pub async fn write_file_fixed_auto<I: Into<PathBuf>, J: Into<PathBuf>, K: Into<PathBuf>>( - file_to_write: I, - storage_dir: J, - output_index_file: K, - target_chunks: usize, -) -> Result<(), StorageIOError> { - let file_path = file_to_write.into(); - let storage_dir = storage_dir.into(); - let output_index_file = output_index_file.into(); - - let file_size = fs::metadata(&file_path).await?.len(); - - let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); - write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_fixed_chunking_basic() { - // Create 10KB of test data - let data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); - - // Split into 1KB chunks - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have 10 chunks - assert_eq!(result.chunks.len(), 10); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk sizes (last chunk may be smaller) - for (i, chunk) in result.chunks.iter().enumerate() { - if i < 9 { - assert_eq!(chunk.data.len(), 1024); - } else { - assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly - } - } - } - - #[test] - fn test_fixed_chunking_uneven() { - // Create 5.5KB of test data - let data: Vec<u8> = (0..5632).map(|i| (i % 256) as u8).collect(); - - // Split into 2KB chunks - let result = split_fixed_impl(&data, 2048).unwrap(); - - // Should have 3 chunks (2048 + 2048 + 1536) - assert_eq!(result.chunks.len(), 3); - - // Verify chunk sizes - assert_eq!(result.chunks[0].data.len(), 2048); - assert_eq!(result.chunks[1].data.len(), 2048); - assert_eq!(result.chunks[2].data.len(), 1536); - - // Verify data integrity - let mut reconstructed = Vec::new(); - for chunk in &result.chunks { - reconstructed.extend_from_slice(&chunk.data); - } - assert_eq!(reconstructed, data); - } - - #[test] - fn test_fixed_chunking_small_file() { - // Small file smaller than chunk size - let data = vec![1, 2, 3, 4, 5]; - - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_fixed_chunking_zero_size() { - let data = vec![1, 2, 3]; - - let result = split_fixed_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_calculate_optimal_chunk_size() { - // Test basic calculation - assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB - - // Test rounding to power of 2 - assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB - - // Test minimum bound - assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB - - // Test edge cases - assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default - assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default - - // Test large file - assert_eq!( - calculate_optimal_chunk_size(100 * 1024 * 1024, 10), - 16 * 1024 * 1024 - ); // 100MB / 10 = 10MB, max is 16MB - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different data produces different hashes - let data1 = vec![1, 2, 3, 4, 5]; - let data2 = vec![1, 2, 3, 4, 6]; - - let result1 = split_fixed_impl(&data1, 1024).unwrap(); - let result2 = split_fixed_impl(&data2, 1024).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs deleted file mode 100644 index 971018b..0000000 --- a/systems/storage/src/store/line.rs +++ /dev/null @@ -1,393 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Split data by lines (newline characters) -pub fn split_by_lines_impl(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - // Iterate through data to find line boundaries - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - // Unix line ending - let line_end = i + 1; // Include \n - // Extract line data (include newline character) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' { - // Windows line ending - let line_end = i + 2; // Include both \r and \n - // Extract line data (include newline characters) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data (last line without newline) - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file (no lines) - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file by lines -pub async fn write_file_line<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::line(); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Utility function to split data by lines with custom line ending detection -pub fn split_by_lines_custom<E: LineEnding>(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - let mut i = 0; - while i < total_size { - if E::is_line_ending(data, i) { - let line_end = i + E::ending_length(data, i); - let line_data = data[start..line_end].to_vec(); - - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Trait for different line ending types -pub trait LineEnding { - /// Check if position i is the start of a line ending - fn is_line_ending(data: &[u8], i: usize) -> bool; - - /// Get the length of the line ending at position i - fn ending_length(data: &[u8], i: usize) -> usize; -} - -/// Unix line endings (\n) -pub struct UnixLineEnding; - -impl LineEnding for UnixLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i < data.len() && data[i] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 1 - } -} - -/// Windows line endings (\r\n) -pub struct WindowsLineEnding; - -impl LineEnding for WindowsLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 2 - } -} - -/// Mixed line endings (detects both Unix and Windows) -pub struct MixedLineEnding; - -impl LineEnding for MixedLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - if i < data.len() && data[i] == b'\n' { - true - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - true - } else { - false - } - } - - fn ending_length(data: &[u8], i: usize) -> usize { - if i < data.len() && data[i] == b'\n' { - 1 - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - 2 - } else { - 1 // Default to 1 if somehow called incorrectly - } - } -} - -/// Detect line ending type from data -pub fn detect_line_ending(data: &[u8]) -> LineEndingType { - let mut unix_count = 0; - let mut windows_count = 0; - - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - unix_count += 1; - i += 1; - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - windows_count += 1; - i += 2; - } else { - i += 1; - } - } - - if unix_count > windows_count { - LineEndingType::Unix - } else if windows_count > unix_count { - LineEndingType::Windows - } else { - LineEndingType::Mixed - } -} - -/// Line ending type enum -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum LineEndingType { - Unix, - Windows, - Mixed, -} - -impl LineEndingType { - /// Split data using the detected line ending type - pub fn split_by_lines(&self, data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - match self { - LineEndingType::Unix => split_by_lines_custom::<UnixLineEnding>(data), - LineEndingType::Windows => split_by_lines_custom::<WindowsLineEnding>(data), - LineEndingType::Mixed => split_by_lines_custom::<MixedLineEnding>(data), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_line_chunking_unix() { - let data = b"Hello\nWorld\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - } - - #[test] - fn test_line_chunking_windows() { - let data = b"Hello\r\nWorld\r\nTest\r\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents (should include \r\n) - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\r\n"); - } - - #[test] - fn test_line_chunking_mixed() { - let data = b"Hello\nWorld\r\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - } - - #[test] - fn test_line_chunking_no_trailing_newline() { - let data = b"Hello\nWorld\nTest"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test"); - } - - #[test] - fn test_line_chunking_empty_lines() { - let data = b"Hello\n\nWorld\n\n\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 5 chunks (including empty lines) - // "Hello\n", "\n", "World\n", "\n", "\n" - assert_eq!(result.chunks.len(), 5); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"\n"); - assert_eq!(result.chunks[2].data, b"World\n"); - assert_eq!(result.chunks[3].data, b"\n"); - assert_eq!(result.chunks[4].data, b"\n"); - } - - #[test] - fn test_line_chunking_empty_file() { - let data = b""; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 1 empty chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data, b""); - } - - #[test] - fn test_detect_line_ending() { - // Test Unix detection - let unix_data = b"Hello\nWorld\n"; - assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix); - - // Test Windows detection - let windows_data = b"Hello\r\nWorld\r\n"; - assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows); - - // Test mixed detection - let mixed_data = b"Hello\nWorld\r\n"; - assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed); - - // Test no newlines - let no_newlines = b"Hello World"; - assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed); - } - - #[test] - fn test_custom_line_ending_unix() { - let data = b"Hello\nWorld\n"; - - let result = split_by_lines_custom::<UnixLineEnding>(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - } - - #[test] - fn test_custom_line_ending_windows() { - let data = b"Hello\r\nWorld\r\n"; - - let result = split_by_lines_custom::<WindowsLineEnding>(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - } - - #[test] - fn test_line_ending_type_split() { - let unix_data = b"Hello\nWorld\n"; - let windows_data = b"Hello\r\nWorld\r\n"; - let mixed_data = b"Hello\nWorld\r\n"; - - // Test Unix - let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap(); - assert_eq!(unix_result.chunks.len(), 2); - - // Test Windows - let windows_result = LineEndingType::Windows - .split_by_lines(windows_data) - .unwrap(); - assert_eq!(windows_result.chunks.len(), 2); - - // Test Mixed - let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap(); - assert_eq!(mixed_result.chunks.len(), 2); - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different lines produce different hashes - let data1 = b"Hello\n"; - let data2 = b"World\n"; - - let result1 = split_by_lines_impl(data1).unwrap(); - let result2 = split_by_lines_impl(data2).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} |
