summaryrefslogtreecommitdiff
path: root/systems
diff options
context:
space:
mode:
author: 魏曹先生 <1992414357@qq.com> 2026-03-08 22:48:54 +0800
committer: 魏曹先生 <1992414357@qq.com> 2026-03-08 22:48:54 +0800
commit: 47e0ffd50427440696c245814517e4f5fa94ed83 (patch)
tree: 777b1107af04f6b5bcc79673064b1821e1b7f59f /systems
parent: 90ed18a41fef137ed0637cf9fc6aa667de2c905f (diff)
Move action system to legacy and remove storage system
Diffstat (limited to 'systems')
-rw-r--r-- systems/action/Cargo.toml | 15
-rw-r--r-- systems/action/action_macros/Cargo.toml | 20
-rw-r--r-- systems/action/action_macros/src/lib.rs | 248
-rw-r--r-- systems/action/src/action.rs | 244
-rw-r--r-- systems/action/src/action_pool.rs | 247
-rw-r--r-- systems/action/src/lib.rs | 6
-rw-r--r-- systems/storage/Cargo.toml | 13
-rw-r--r-- systems/storage/src/error.rs | 8
-rw-r--r-- systems/storage/src/lib.rs | 2
-rw-r--r-- systems/storage/src/store.rs | 493
-rw-r--r-- systems/storage/src/store/cdc.rs | 307
-rw-r--r-- systems/storage/src/store/fixed.rs | 417
-rw-r--r-- systems/storage/src/store/line.rs | 393
13 files changed, 0 insertions, 2413 deletions
diff --git a/systems/action/Cargo.toml b/systems/action/Cargo.toml
deleted file mode 100644
index 5317975..0000000
--- a/systems/action/Cargo.toml
+++ /dev/null
@@ -1,15 +0,0 @@
-[package]
-name = "action_system"
-edition = "2024"
-version.workspace = true
-
-[dependencies]
-tcp_connection = { path = "../../utils/tcp_connection" }
-action_system_macros = { path = "action_macros" }
-
-# Serialization
-serde = { version = "1.0.228", features = ["derive"] }
-serde_json = "1.0.145"
-
-# Async & Networking
-tokio = "1.48.0"
diff --git a/systems/action/action_macros/Cargo.toml b/systems/action/action_macros/Cargo.toml
deleted file mode 100644
index 0f209e2..0000000
--- a/systems/action/action_macros/Cargo.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-[package]
-name = "action_system_macros"
-edition = "2024"
-version.workspace = true
-
-[lib]
-proc-macro = true
-
-[dependencies]
-tcp_connection = { path = "../../../utils/tcp_connection" }
-
-just_fmt = "0.1.2"
-
-syn = { version = "2.0", features = ["full", "extra-traits"] }
-quote = "1.0"
-proc-macro2 = "1.0"
-
-# Serialization
-serde = { version = "1.0.228", features = ["derive"] }
-serde_json = "1.0.145"
diff --git a/systems/action/action_macros/src/lib.rs b/systems/action/action_macros/src/lib.rs
deleted file mode 100644
index 6da0339..0000000
--- a/systems/action/action_macros/src/lib.rs
+++ /dev/null
@@ -1,248 +0,0 @@
-use proc_macro::TokenStream;
-use quote::quote;
-use syn::{ItemFn, parse_macro_input};
-
-/// # Macro - Generate Action
-///
-/// When annotating a function with the `#[action_gen]` macro in the following format, it generates boilerplate code for client-server interaction.
-///
-/// ```ignore
-/// #[action_gen]
-/// async fn action_name(ctx: ActionContext, argument: YourArgument) -> Result<YourResult, TcpTargetError> {
-/// // Write your client and server logic here
-/// if ctx.is_proc_on_remote() {
-/// // Server logic
-/// }
-/// if ctx.is_proc_on_local() {
-/// // Client logic
-/// }
-/// }
-/// ```
-///
-/// > WARNING:
-/// > For Argument and Result types, the `action_gen` macro only supports types that derive serde's Serialize and Deserialize
-///
-/// ## Generated Code
-///
-/// `action_gen` will generate the following:
-///
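-/// 1. A unit struct named after the function in PascalCase, with a complete implementation of `Action<Args, Return>`
-/// 2. The `register_<fn_name>` and `proc_<fn_name>` helper functions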
-///
-/// ## How to use
-///
-/// You can use your generated method as follows
-///
-/// ```ignore
-/// async fn main() -> Result<(), TcpTargetError> {
-///
-/// // Prepare your argument
-/// let args = YourArgument::default();
-///
-/// // Create a pool and context
-/// let mut pool = ActionPool::new();
-/// let ctx = ActionContext::local();
-///
-/// // Register your action
-/// register_your_action(&mut pool);
-///
-/// // Process your action
-/// proc_your_action(&pool, ctx, args).await?;
-///
-/// Ok(())
-/// }
-/// ```
-#[proc_macro_attribute]
-pub fn action_gen(attr: TokenStream, item: TokenStream) -> TokenStream {
- let input_fn = parse_macro_input!(item as ItemFn);
- let is_local = if attr.is_empty() {
- false
- } else {
- let attr_str = attr.to_string();
- attr_str == "local" || attr_str.contains("local")
- };
-
- generate_action_struct(input_fn, is_local).into()
-}
-
-fn generate_action_struct(input_fn: ItemFn, _is_local: bool) -> proc_macro2::TokenStream {
- let fn_vis = &input_fn.vis;
- let fn_sig = &input_fn.sig;
- let fn_name = &fn_sig.ident;
- let fn_block = &input_fn.block;
-
- validate_function_signature(fn_sig);
-
- let (context_param_name, arg_param_name, arg_type, return_type) =
- extract_parameters_and_types(fn_sig);
-
- let struct_name = quote::format_ident!("{}", convert_to_pascal_case(&fn_name.to_string()));
-
- let action_name_ident = &fn_name;
-
- let register_this_action = quote::format_ident!("register_{}", action_name_ident);
- let proc_this_action = quote::format_ident!("proc_{}", action_name_ident);
-
- quote! {
- #[derive(Debug, Clone, Default)]
- #fn_vis struct #struct_name;
-
- impl action_system::action::Action<#arg_type, #return_type> for #struct_name {
- fn action_name() -> &'static str {
- Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str())
- }
-
- fn is_remote_action() -> bool {
- !#_is_local
- }
-
- async fn process(#context_param_name: action_system::action::ActionContext, #arg_param_name: #arg_type) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
- #fn_block
- }
- }
-
- #fn_vis fn #register_this_action(pool: &mut action_system::action_pool::ActionPool) {
- pool.register::<#struct_name, #arg_type, #return_type>();
- }
-
- #fn_vis async fn #proc_this_action(
- pool: &action_system::action_pool::ActionPool,
- mut ctx: action_system::action::ActionContext,
- #arg_param_name: #arg_type
- ) -> Result<#return_type, tcp_connection::error::TcpTargetError> {
- ctx.set_is_remote_action(!#_is_local);
- let args_json = serde_json::to_string(&#arg_param_name)
- .map_err(|e| {
- tcp_connection::error::TcpTargetError::Serialization(e.to_string())
- })?;
- let result_json = pool.process_json(
- Box::leak(just_fmt::snake_case!(stringify!(#action_name_ident)).into_boxed_str()), // FIXME: leaks a newly boxed action name on every call of the generated proc function
- ctx,
- args_json,
- ).await?;
- serde_json::from_str(&result_json)
- .map_err(|e| {
- tcp_connection::error::TcpTargetError::Serialization(e.to_string())
- })
- }
-
- #[allow(dead_code)]
- #[deprecated = "This function is used by #[action_gen] as a template."]
- #[doc = "Template function for #[[action_gen]] - do not call directly."]
- #[doc = "Use the generated struct instead."]
- #[doc = ""]
- #[doc = "Register the action to the pool."]
- #[doc = "```ignore"]
- #[doc = "register_your_func(&mut pool);"]
- #[doc = "```"]
- #[doc = ""]
- #[doc = "Process the action at the pool."]
- #[doc = "```ignore"]
- #[doc = "let result = proc_your_func(&pool, ctx, arg).await?;"]
- #[doc = "```"]
- #fn_vis #fn_sig #fn_block
- }
-}
-
-fn validate_function_signature(fn_sig: &syn::Signature) {
- if fn_sig.asyncness.is_none() {
- panic!("Expected async function for Action, but found synchronous function");
- }
-
- if fn_sig.inputs.len() != 2 {
- panic!(
- "Expected exactly 2 arguments for Action function: ctx: ActionContext and arg: T, but found {} arguments",
- fn_sig.inputs.len()
- );
- }
-
- let return_type = match &fn_sig.output {
- syn::ReturnType::Type(_, ty) => ty,
- _ => panic!(
- "Expected Action function to return Result<T, TcpTargetError>, but found no return type"
- ),
- };
-
- if let syn::Type::Path(type_path) = return_type.as_ref() {
- if let Some(segment) = type_path.path.segments.last()
- && segment.ident != "Result"
- {
- panic!(
- "Expected Action function to return Result<T, TcpTargetError>, but found different return type"
- );
- }
- } else {
- panic!(
- "Expected Action function to return Result<T, TcpTargetError>, but found no return type"
- );
- }
-}
-
-fn convert_to_pascal_case(s: &str) -> String {
- s.split('_')
- .map(|word| {
- let mut chars = word.chars();
- match chars.next() {
- None => String::new(),
- Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
- }
- })
- .collect()
-}
-
-fn extract_parameters_and_types(
- fn_sig: &syn::Signature,
-) -> (
- proc_macro2::TokenStream,
- proc_macro2::TokenStream,
- proc_macro2::TokenStream,
- proc_macro2::TokenStream,
-) {
- let mut inputs = fn_sig.inputs.iter();
-
- let context_param = match inputs.next() {
- Some(syn::FnArg::Typed(pat_type)) => {
- let pat = &pat_type.pat;
- quote::quote!(#pat)
- }
- _ => {
- panic!("Expected the first argument to be a typed parameter, but found something else")
- }
- };
-
- let arg_param = match inputs.next() {
- Some(syn::FnArg::Typed(pat_type)) => {
- let pat = &pat_type.pat;
- let ty = &pat_type.ty;
- (quote::quote!(#pat), quote::quote!(#ty))
- }
- _ => {
- panic!("Expected the second argument to be a typed parameter, but found something else")
- }
- };
-
- let (arg_param_name, arg_type) = arg_param;
-
- let return_type = match &fn_sig.output {
- syn::ReturnType::Type(_, ty) => {
- if let syn::Type::Path(type_path) = ty.as_ref() {
- if let syn::PathArguments::AngleBracketed(args) =
- &type_path.path.segments.last().unwrap().arguments
- {
- if let Some(syn::GenericArgument::Type(ty)) = args.args.first() {
- quote::quote!(#ty)
- } else {
- panic!("Expected to extract the success type of Result, but failed");
- }
- } else {
- panic!("Expected Result type to have generic parameters, but found none");
- }
- } else {
- panic!("Expected return type to be Result, but found different type");
- }
- }
- _ => panic!("Expected function to have return type, but found none"),
- };
-
- (context_param, arg_param_name, arg_type, return_type)
-}
diff --git a/systems/action/src/action.rs b/systems/action/src/action.rs
deleted file mode 100644
index 62425ff..0000000
--- a/systems/action/src/action.rs
+++ /dev/null
@@ -1,244 +0,0 @@
-use serde::{Serialize, de::DeserializeOwned};
-use std::any::{Any, TypeId};
-use std::collections::HashMap;
-use std::sync::Arc;
-use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
-use tokio::{net::TcpStream, sync::Mutex};
-
-/// # Trait - Action<Args, Return>
-///
-/// A trait used to describe the interaction pattern between client and server
-///
-/// ## Generics
-///
-/// Args: Represents the parameter type required for this action
-///
-/// Return: Represents the return type of this action
-///
-/// The above generics must implement serde's Serialize and DeserializeOwned traits,
-/// and must be sendable between threads
-///
-/// ## Implementation
-///
-/// ```ignore
-/// pub trait Action<Args, Return>
-/// where
-/// Args: Serialize + DeserializeOwned + Send,
-/// Return: Serialize + DeserializeOwned + Send,
-/// {
-/// /// Name, used to inform the server which action to execute
-/// fn action_name() -> &'static str;
-///
-/// /// Whether it's a remote Action; `false` informs the system that it only runs locally
-/// fn is_remote_action() -> bool;
-///
-/// /// Action processing logic
-/// fn process(
-/// context: ActionContext,
-/// args: Args,
-/// ) -> impl std::future::Future<Output = Result<Return, TcpTargetError>> + Send;
-/// }
-/// ```
-pub trait Action<Args, Return>
-where
- Args: Serialize + DeserializeOwned + Send,
- Return: Serialize + DeserializeOwned + Send,
-{
- fn action_name() -> &'static str;
-
- fn is_remote_action() -> bool;
-
- fn process(
- context: ActionContext,
- args: Args,
- ) -> impl std::future::Future<Output = Result<Return, TcpTargetError>> + Send;
-}
-
-/// # Struct - ActionContext
-///
-/// Used to inform the Action about the current execution environment
-///
-/// ## Creation
-///
-/// Create ActionContext using the following methods:
-///
-/// ```ignore
-/// // The instance here is the connection instance passed from external sources for communicating with the server
-/// // For specific usage, please refer to the `/crates/utils/tcp_connection` section
-///
-/// fn init_local_action_ctx(instance: ConnectionInstance) {
-/// // Create context and specify execution on local
-/// let mut ctx = ActionContext::local().insert_instance(instance);
-/// }
-///
-/// fn init_remote_action_ctx(instance: ConnectionInstance) {
-/// // Create context and specify execution on remote
-/// let mut ctx = ActionContext::remote().insert_instance(instance);
-/// }
-/// ```
-#[derive(Default)]
-pub struct ActionContext {
- /// Whether the action is executed locally or remotely
- proc_on_local: bool,
-
- /// Whether the action being executed in the current context is a remote action
- is_remote_action: bool,
-
- /// The name of the action being executed
- action_name: String,
-
- /// The JSON-serialized arguments for the action
- action_args_json: String,
-
- /// The connection instance in the current context, if one has been attached
- instance: Option<Arc<Mutex<ConnectionInstance>>>,
-
- /// Generic data storage for arbitrary types
- data: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
-}
-
-impl ActionContext {
- /// Generate local context
- pub fn local() -> Self {
- ActionContext {
- proc_on_local: true,
- ..Default::default()
- }
- }
-
- /// Generate remote context
- pub fn remote() -> Self {
- ActionContext {
- proc_on_local: false,
- ..Default::default()
- }
- }
-
- /// Build connection instance from TcpStream
- pub fn build_instance(mut self, stream: TcpStream) -> Self {
- self.instance = Some(Arc::new(Mutex::new(ConnectionInstance::from(stream))));
- self
- }
-
- /// Insert connection instance into context
- pub fn insert_instance(mut self, instance: ConnectionInstance) -> Self {
- self.instance = Some(Arc::new(Mutex::new(instance)));
- self
- }
-
- /// Pop connection instance from context
- pub fn pop_instance(&mut self) -> Option<Arc<Mutex<ConnectionInstance>>> {
- self.instance.take()
- }
-}
-
-impl ActionContext {
- /// Whether the action is executed locally
- pub fn is_proc_on_local(&self) -> bool {
- self.proc_on_local
- }
-
- /// Whether the action is executed remotely
- pub fn is_proc_on_remote(&self) -> bool {
- !self.proc_on_local
- }
-
- /// Whether the action being executed in the current context is a remote action
- pub fn is_remote_action(&self) -> bool {
- self.is_remote_action
- }
-
- /// Set whether the action being executed in the current context is a remote action
- pub fn set_is_remote_action(&mut self, is_remote_action: bool) {
- self.is_remote_action = is_remote_action;
- }
-
- /// Get the connection instance in the current context
- pub fn instance(&self) -> &Option<Arc<Mutex<ConnectionInstance>>> {
- &self.instance
- }
-
- /// Get a mutable reference to the connection instance in the current context
- pub fn instance_mut(&mut self) -> &mut Option<Arc<Mutex<ConnectionInstance>>> {
- &mut self.instance
- }
-
- /// Get the action name from the context
- pub fn action_name(&self) -> &str {
- &self.action_name
- }
-
- /// Get the action arguments from the context
- pub fn action_args_json(&self) -> &String {
- &self.action_args_json
- }
-
- /// Set the action name in the context
- pub fn set_action_name(mut self, action_name: String) -> Self {
- self.action_name = action_name;
- self
- }
-
- /// Set the action arguments in the context
- pub fn set_action_args(mut self, action_args: String) -> Self {
- self.action_args_json = action_args;
- self
- }
-
- /// Insert arbitrary data in the context
- pub fn with_data<T: Any + Send + Sync>(mut self, value: T) -> Self {
- self.data.insert(TypeId::of::<T>(), Arc::new(value));
- self
- }
-
- /// Insert arbitrary data as Arc in the context
- pub fn with_arc_data<T: Any + Send + Sync>(mut self, value: Arc<T>) -> Self {
- self.data.insert(TypeId::of::<T>(), value);
- self
- }
-
- /// Insert arbitrary data in the context
- pub fn insert_data<T: Any + Send + Sync>(&mut self, value: T) {
- self.data.insert(TypeId::of::<T>(), Arc::new(value));
- }
-
- /// Insert arbitrary data as Arc in the context
- pub fn insert_arc_data<T: Any + Send + Sync>(&mut self, value: Arc<T>) {
- self.data.insert(TypeId::of::<T>(), value);
- }
-
- /// Get arbitrary data from the context
- pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
- self.data
- .get(&TypeId::of::<T>())
- .and_then(|arc| arc.downcast_ref::<T>())
- }
-
- /// Get arbitrary data as Arc from the context
- pub fn get_arc<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
- self.data
- .get(&TypeId::of::<T>())
- .and_then(|arc| Arc::clone(arc).downcast::<T>().ok())
- }
-
- /// Remove and return arbitrary data from the context
- pub fn remove<T: Any + Send + Sync>(&mut self) -> Option<Arc<T>> {
- self.data
- .remove(&TypeId::of::<T>())
- .and_then(|arc| arc.downcast::<T>().ok())
- }
-
- /// Check if the context contains data of a specific type
- pub fn contains<T: Any + Send + Sync>(&self) -> bool {
- self.data.contains_key(&TypeId::of::<T>())
- }
-
- /// Take ownership of the context and extract data of a specific type
- pub fn take<T: Any + Send + Sync>(mut self) -> (Self, Option<Arc<T>>) {
- let value = self
- .data
- .remove(&TypeId::of::<T>())
- .and_then(|arc| arc.downcast::<T>().ok());
- (self, value)
- }
-}
diff --git a/systems/action/src/action_pool.rs b/systems/action/src/action_pool.rs
deleted file mode 100644
index 019fa6d..0000000
--- a/systems/action/src/action_pool.rs
+++ /dev/null
@@ -1,247 +0,0 @@
-use std::pin::Pin;
-
-use serde::{Serialize, de::DeserializeOwned};
-use serde_json;
-use tcp_connection::error::TcpTargetError;
-
-use crate::action::{Action, ActionContext};
-
-type ProcBeginCallback = for<'a> fn(
- &'a mut ActionContext,
- args: &'a (dyn std::any::Any + Send + Sync),
-) -> ProcBeginFuture<'a>;
-type ProcEndCallback = fn() -> ProcEndFuture;
-
-type ProcBeginFuture<'a> = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send + 'a>>;
-type ProcEndFuture = Pin<Box<dyn Future<Output = Result<(), TcpTargetError>> + Send>>;
-
-/// # Struct - ActionPool
-///
-/// This struct is used to register and record all accessible and executable actions
-///
-/// It also registers `on_proc_begin` and `on_proc_end` callback functions
-/// used for action initialization
-///
-/// ## Creating and registering actions
-/// ```ignore
-/// fn init_action_pool() {
-/// let mut pool = ActionPool::new();
-///
-/// // Register action
-/// pool.register::<YourAction, ActionArgument, ActionReturn>();
-///
-/// // If the action is implemented with `#[action_gen]`, you can also do
-/// register_your_action(&mut pool);
-/// }
-/// ```
-pub struct ActionPool {
- /// HashMap storing action name to action implementation mapping
- actions: std::collections::HashMap<&'static str, Box<dyn ActionErased>>,
-
- /// Callback to execute when process begins
- on_proc_begin: Option<ProcBeginCallback>,
-
- /// Callback to execute when process ends
- on_proc_end: Option<ProcEndCallback>,
-}
-
-impl Default for ActionPool {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ActionPool {
- /// Creates a new empty ActionPool
- pub fn new() -> Self {
- Self {
- actions: std::collections::HashMap::new(),
- on_proc_begin: None,
- on_proc_end: None,
- }
- }
-
- /// Sets a callback to be executed when process begins
- pub fn set_on_proc_begin(&mut self, callback: ProcBeginCallback) {
- self.on_proc_begin = Some(callback);
- }
-
- /// Sets a callback to be executed when process ends
- pub fn set_on_proc_end(&mut self, callback: ProcEndCallback) {
- self.on_proc_end = Some(callback);
- }
-
- /// Registers an action type with the pool
- ///
- /// Usage:
- /// ```ignore
- /// action_pool.register::<MyAction, MyArgs, MyReturn>();
- /// ```
- pub fn register<A, Args, Return>(&mut self)
- where
- A: Action<Args, Return> + Send + Sync + 'static,
- Args: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
- Return: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
- {
- let action_name = A::action_name();
- self.actions.insert(
- action_name,
- Box::new(ActionWrapper::<A, Args, Return>(std::marker::PhantomData)),
- );
- }
-
- /// Processes an action by name with JSON-serialized arguments
- ///
- /// The action name and the JSON arguments are stored in the context before the
- /// `on_proc_begin` callback runs; `on_proc_end` runs only after the action succeeds.
- ///
- /// Returns `TcpTargetError::Unsupported("InvalidAction")` when no action is registered
- /// under `action_name`.
- ///
- /// Usage:
- /// ```ignore
- /// let result_json = action_pool.process_json("my_action", context, args_json).await?;
- /// let result: MyReturn = serde_json::from_str(&result_json)?;
- /// ```
- pub async fn process_json<'a>(
- &'a self,
- action_name: &'a str,
- context: ActionContext,
- args_json: String,
- ) -> Result<String, TcpTargetError> {
- if let Some(action) = self.actions.get(action_name) {
- // Set action name and args in context for callbacks
- let context = context.set_action_name(action_name.to_string());
- let mut context = context.set_action_args(args_json.clone());
-
- self.exec_on_proc_begin(&mut context, &args_json).await?;
- let result = action.process_json_erased(context, args_json).await?;
- self.exec_on_proc_end().await?;
- Ok(result)
- } else {
- Err(TcpTargetError::Unsupported("InvalidAction".to_string()))
- }
- }
-
- /// Processes an action by name with given context and arguments
- ///
- /// Usage:
- /// ```ignore
- /// let result = action_pool.process::<MyArgs, MyReturn>("my_action", context, args).await?;
- /// ```
- pub async fn process<'a, Args, Return>(
- &'a self,
- action_name: &'a str,
- mut context: ActionContext,
- args: Args,
- ) -> Result<Return, TcpTargetError>
- where
- Args: serde::de::DeserializeOwned + Send + Sync + 'static,
- Return: serde::Serialize + Send + 'static,
- {
- if let Some(action) = self.actions.get(action_name) {
- self.exec_on_proc_begin(&mut context, &args).await?;
- let result = action.process_erased(context, Box::new(args)).await?;
- let result = *result
- .downcast::<Return>()
- .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?;
- self.exec_on_proc_end().await?;
- Ok(result)
- } else {
- Err(TcpTargetError::Unsupported("InvalidAction".to_string()))
- }
- }
-
- /// Executes the process begin callback if set
- async fn exec_on_proc_begin(
- &self,
- context: &mut ActionContext,
- args: &(dyn std::any::Any + Send + Sync),
- ) -> Result<(), TcpTargetError> {
- if let Some(callback) = &self.on_proc_begin {
- callback(context, args).await
- } else {
- Ok(())
- }
- }
-
- /// Executes the process end callback if set
- async fn exec_on_proc_end(&self) -> Result<(), TcpTargetError> {
- if let Some(callback) = &self.on_proc_end {
- callback().await
- } else {
- Ok(())
- }
- }
-}
-
-/// Boxed future types returned by `ActionErased`, the trait for type-erased actions that can be stored in ActionPool
-type ProcessErasedFuture = std::pin::Pin<
- Box<
- dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>>
- + Send,
- >,
->;
-type ProcessJsonErasedFuture =
- std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>;
-
-trait ActionErased: Send + Sync {
- /// Processes the action with type-erased arguments and returns type-erased result
- fn process_erased(
- &self,
- context: ActionContext,
- args: Box<dyn std::any::Any + Send>,
- ) -> ProcessErasedFuture;
-
- /// Processes the action with JSON-serialized arguments and returns JSON-serialized result
- fn process_json_erased(
- &self,
- context: ActionContext,
- args_json: String,
- ) -> ProcessJsonErasedFuture;
-}
-
-/// Wrapper struct that implements ActionErased for concrete Action types
-struct ActionWrapper<A, Args, Return>(std::marker::PhantomData<(A, Args, Return)>);
-
-impl<A, Args, Return> ActionErased for ActionWrapper<A, Args, Return>
-where
- A: Action<Args, Return> + Send + Sync,
- Args: Serialize + DeserializeOwned + Send + Sync + 'static,
- Return: Serialize + DeserializeOwned + Send + Sync + 'static,
-{
- fn process_erased(
- &self,
- context: ActionContext,
- args: Box<dyn std::any::Any + Send>,
- ) -> std::pin::Pin<
- Box<
- dyn std::future::Future<Output = Result<Box<dyn std::any::Any + Send>, TcpTargetError>>
- + Send,
- >,
- > {
- Box::pin(async move {
- let args = *args
- .downcast::<Args>()
- .map_err(|_| TcpTargetError::Unsupported("InvalidArguments".to_string()))?;
- let result = A::process(context, args).await?;
- Ok(Box::new(result) as Box<dyn std::any::Any + Send>)
- })
- }
-
- fn process_json_erased(
- &self,
- context: ActionContext,
- args_json: String,
- ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, TcpTargetError>> + Send>>
- {
- Box::pin(async move {
- let args: Args = serde_json::from_str(&args_json)
- .map_err(|e| TcpTargetError::Serialization(format!("Deserialize failed: {}", e)))?;
- let result = A::process(context, args).await?;
- let result_json = serde_json::to_string(&result)
- .map_err(|e| TcpTargetError::Serialization(format!("Serialize failed: {}", e)))?;
- Ok(result_json)
- })
- }
-}
diff --git a/systems/action/src/lib.rs b/systems/action/src/lib.rs
deleted file mode 100644
index 12ae999..0000000
--- a/systems/action/src/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-pub mod macros {
- pub use action_system_macros::*;
-}
-
-pub mod action;
-pub mod action_pool;
diff --git a/systems/storage/Cargo.toml b/systems/storage/Cargo.toml
deleted file mode 100644
index 5afb780..0000000
--- a/systems/storage/Cargo.toml
+++ /dev/null
@@ -1,13 +0,0 @@
-[package]
-name = "storage_system"
-edition = "2024"
-version.workspace = true
-
-[dependencies]
-thiserror = "1.0.69"
-tokio = { version = "1.48", features = ["full"] }
-
-blake3 = "1.5.0"
-hex = "0.4.3"
-memmap2 = "0.9.4"
-log = "0.4.25"
diff --git a/systems/storage/src/error.rs b/systems/storage/src/error.rs
deleted file mode 100644
index 4b3ad7e..0000000
--- a/systems/storage/src/error.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-#[derive(Debug, thiserror::Error)]
-pub enum StorageIOError {
- #[error("IO error: {0}")]
- IOErr(#[from] std::io::Error),
-
- #[error("Hash too short")]
- HashTooShort,
-}
diff --git a/systems/storage/src/lib.rs b/systems/storage/src/lib.rs
deleted file mode 100644
index 4f89971..0000000
--- a/systems/storage/src/lib.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod error;
-pub mod store;
diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs
deleted file mode 100644
index 6492449..0000000
--- a/systems/storage/src/store.rs
+++ /dev/null
@@ -1,493 +0,0 @@
-use std::path::{Path, PathBuf};
-use std::time::Instant;
-
-use blake3;
-use log::{info, trace};
-use memmap2::Mmap;
-use tokio::fs;
-
-use crate::error::StorageIOError;
-
-pub mod cdc;
-pub mod fixed;
-pub mod line;
-
-#[derive(Default, Debug)]
-pub struct StorageConfig {
- /// Chunking policy for splitting files
- pub chunking_policy: ChunkingPolicy,
-}
-
-impl StorageConfig {
- /// Create a new StorageConfig with CDC chunking policy
- pub fn cdc(avg_size: u32) -> Self {
- Self {
- chunking_policy: ChunkingPolicy::Cdc(avg_size),
- }
- }
-
- /// Create a new StorageConfig with fixed-size chunking policy
- pub fn fixed_size(chunk_size: u32) -> Self {
- Self {
- chunking_policy: ChunkingPolicy::FixedSize(chunk_size),
- }
- }
-
- /// Create a new StorageConfig with line-based chunking policy
- pub fn line() -> Self {
- Self {
- chunking_policy: ChunkingPolicy::Line,
- }
- }
-}
-
-#[derive(Debug)]
-pub enum ChunkingPolicy {
- /// Content-Defined Chunking using Rabin fingerprinting
- /// The `u32` value represents the desired average chunk size in bytes
- Cdc(u32),
-
- /// Fixed-size chunking
- /// The `u32` value represents the exact size of each chunk in bytes
- FixedSize(u32),
-
- /// Line-based chunking
- /// Each line becomes a separate chunk
- Line,
-}
-
-impl Default for ChunkingPolicy {
- fn default() -> Self {
- ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC
- }
-}
-
-/// Represents a chunk of data with its hash and content
-#[derive(Debug, Clone)]
-pub struct Chunk {
- /// Blake3 hash of the chunk content
- pub hash: [u8; 32],
- /// Raw chunk data
- pub data: Vec<u8>,
-}
-
-/// Represents an index entry pointing to a chunk
-#[derive(Debug, Clone)]
-pub struct IndexEntry {
- /// Blake3 hash of the chunk
- pub hash: [u8; 32],
- /// Size of the chunk in bytes
- pub size: u32,
-}
-
-/// Result of chunking a file
-#[derive(Debug)]
-pub struct ChunkingResult {
- /// List of chunks extracted from the file
- pub chunks: Vec<Chunk>,
- /// Total size of the original file
- pub total_size: u64,
-}
-
-/// Split a file into chunks and store them in the repository, then output the index file to the specified directory
-pub async fn write_file(
- file_to_write: impl Into<PathBuf>,
- storage_dir: impl Into<PathBuf>,
- output_index_file: impl Into<PathBuf>,
- cfg: &StorageConfig,
-) -> Result<(), StorageIOError> {
- let (file_to_write, storage_dir, output_index_file) =
- precheck(file_to_write, storage_dir, output_index_file).await?;
-
- info!("Starting file write: {}", file_to_write.display());
- let start_time = Instant::now();
-
- // Memory map the entire file
- let file = std::fs::File::open(&file_to_write)?;
- let mmap = unsafe { Mmap::map(&file)? };
- let data = &mmap[..];
-
- // Split into chunks based on policy
- let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?;
-
- // Store chunks and create index
- let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?;
-
- // Write index file
- write_index_file(&index_entries, &output_index_file).await?;
-
- let duration = start_time.elapsed();
- info!(
- "File write completed in {:?}: {}",
- duration,
- file_to_write.display()
- );
-
- Ok(())
-}
-
-/// Split data into chunks based on the specified policy
-pub fn split_into_chunks(
- data: &[u8],
- policy: &ChunkingPolicy,
-) -> Result<ChunkingResult, StorageIOError> {
- match policy {
- ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size),
- ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size),
- ChunkingPolicy::Line => split_by_lines(data),
- }
-}
-
-/// Split data using Content-Defined Chunking
-fn split_cdc(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> {
- use crate::store::cdc::split_cdc_impl;
- split_cdc_impl(data, avg_size)
-}
-
-/// Split data using fixed-size chunking
-fn split_fixed(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> {
- use crate::store::fixed::split_fixed_impl;
- split_fixed_impl(data, chunk_size)
-}
-
-/// Split data by lines
-fn split_by_lines(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
- use crate::store::line::split_by_lines_impl;
- split_by_lines_impl(data)
-}
-
-/// Store chunks in the storage directory and return index entries
-async fn store_chunks(
- chunks: &[Chunk],
- storage_dir: &Path,
-) -> Result<Vec<IndexEntry>, StorageIOError> {
- let mut index_entries = Vec::with_capacity(chunks.len());
-
- let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
- let total_chunks = chunks.len();
-
- for chunk in chunks {
- // Create storage directory structure based on hash
- let hash_str = hex::encode(chunk.hash);
- let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?;
-
- // Create directory if it doesn't exist
- if let Some(parent) = chunk_dir.parent() {
- fs::create_dir_all(parent).await?;
- }
-
- // Write chunk data
- let chunk_path = chunk_dir.with_extension("chunk");
- if !chunk_path.exists() {
- trace!("W: {}", hash_str);
- fs::write(&chunk_path, &chunk.data).await?;
- writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- }
-
- // Add to index
- index_entries.push(IndexEntry {
- hash: chunk.hash,
- size: chunk.data.len() as u32,
- });
- }
-
- let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed);
- info!(
- "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)",
- writed_count,
- total_chunks,
- (writed_count as f32 / total_chunks as f32) * 100 as f32,
- total_chunks - writed_count
- );
-
- Ok(index_entries)
-}
-
-/// Write index file containing chunk hashes and sizes
-async fn write_index_file(
- entries: &[IndexEntry],
- output_path: &Path,
-) -> Result<(), StorageIOError> {
- let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size
-
- for entry in entries {
- index_data.extend_from_slice(&entry.hash);
- index_data.extend_from_slice(&entry.size.to_le_bytes());
- }
-
- fs::write(output_path, &index_data).await?;
- Ok(())
-}
-
-/// Build a file from the repository to the specified directory using an index file
-pub async fn build_file(
- index_to_build: impl Into<PathBuf>,
- storage_dir: impl Into<PathBuf>,
- output_file: impl Into<PathBuf>,
-) -> Result<(), StorageIOError> {
- let (index_to_build, storage_dir, output_file) =
- precheck(index_to_build, storage_dir, output_file).await?;
-
- info!(
- "Starting file build from index: {}",
- index_to_build.display()
- );
- let start_time = Instant::now();
-
- // Read index file
- let index_entries = read_index_file(&index_to_build).await?;
-
- // Reconstruct file from chunks
- let reconstructed_data = reconstruct_from_chunks(&index_entries, &storage_dir).await?;
-
- // Write output file
- fs::write(&output_file, &reconstructed_data).await?;
-
- let duration = start_time.elapsed();
- info!(
- "File build completed in {:?}: {} -> {}",
- duration,
- index_to_build.display(),
- output_file.display()
- );
-
- Ok(())
-}
-
-/// Read index file and parse entries
-async fn read_index_file(index_path: &Path) -> Result<Vec<IndexEntry>, StorageIOError> {
- // Open file and memory map it
- let file = std::fs::File::open(index_path)?;
- let mmap = unsafe { Mmap::map(&file)? };
-
- // Each entry is 36 bytes (32 bytes hash + 4 bytes size)
- if mmap.len() % 36 != 0 {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "Invalid index file format",
- )
- .into());
- }
-
- let num_entries = mmap.len() / 36;
- let mut entries = Vec::with_capacity(num_entries);
-
- for i in 0..num_entries {
- let start = i * 36;
- let hash_start = start;
- let size_start = start + 32;
-
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&mmap[hash_start..hash_start + 32]);
-
- let size = u32::from_le_bytes([
- mmap[size_start],
- mmap[size_start + 1],
- mmap[size_start + 2],
- mmap[size_start + 3],
- ]);
-
- entries.push(IndexEntry { hash, size });
- }
-
- Ok(entries)
-}
-
-/// Reconstruct file data from chunks using index entries
-async fn reconstruct_from_chunks(
- entries: &[IndexEntry],
- storage_dir: &Path,
-) -> Result<Vec<u8>, StorageIOError> {
- let mut reconstructed_data = Vec::new();
-
- for entry in entries {
- // Get chunk path from hash
- let hash_str = hex::encode(entry.hash);
- let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?;
- let chunk_path = chunk_dir.with_extension("chunk");
-
- trace!("R: {}", hash_str);
-
- // Memory map the chunk file
- let file = std::fs::File::open(&chunk_path)?;
- let mmap = unsafe { Mmap::map(&file)? };
-
- // Verify chunk size matches index
- if mmap.len() != entry.size as usize {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Chunk size mismatch: expected {}, got {}",
- entry.size,
- mmap.len()
- ),
- )
- .into());
- }
-
- // Verify chunk hash
- let actual_hash = blake3_digest(&mmap);
- let expected_hash = entry.hash;
- if actual_hash != expected_hash {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!(
- "Chunk hash mismatch: expected {}, got {}",
- hex::encode(expected_hash),
- hex::encode(actual_hash)
- ),
- )
- .into());
- }
-
- // Append to reconstructed data
- reconstructed_data.extend_from_slice(&mmap);
- }
-
- Ok(reconstructed_data)
-}
-
-/// Calculate Blake3 hash of data
-fn blake3_digest(data: &[u8]) -> [u8; 32] {
- let hash = blake3::hash(data);
- *hash.as_bytes()
-}
-
-/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string
-pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result<PathBuf, StorageIOError> {
- if hash_str.len() < 4 {
- return Err(StorageIOError::HashTooShort);
- }
- let dir = storage_dir
- .join(&hash_str[0..2])
- .join(&hash_str[2..4])
- .join(&hash_str);
- Ok(dir)
-}
-
-/// Calculate Blake3 hash of data and create a Chunk
-pub fn create_chunk(data: Vec<u8>) -> Chunk {
- let hash = blake3_digest(&data);
- Chunk { hash, data }
-}
-
-/// Read a file and split it into chunks using the specified policy
-pub async fn chunk_file(
- file_path: impl Into<PathBuf>,
- policy: &ChunkingPolicy,
-) -> Result<ChunkingResult, StorageIOError> {
- let file_path = file_path.into();
- info!("Starting chunking: {}", file_path.display());
- let start_time = Instant::now();
-
- // Memory map the file
- let file = std::fs::File::open(&file_path)?;
- let mmap = unsafe { Mmap::map(&file)? };
- let data = &mmap[..];
-
- let result = split_into_chunks(data, policy);
-
- let duration = start_time.elapsed();
- info!(
- "Chunking completed in {:?}: {}",
- duration,
- file_path.display()
- );
-
- result
-}
-
-/// Get chunk statistics
-pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats {
- let total_chunks = chunks.len();
- let total_size: usize = chunks.iter().map(|c| c.data.len()).sum();
- let avg_size = if total_chunks > 0 {
- total_size / total_chunks
- } else {
- 0
- };
- let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0);
- let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0);
-
- ChunkStats {
- total_chunks,
- total_size: total_size as u64,
- avg_size,
- min_size,
- max_size,
- }
-}
-
-/// Statistics about chunks
-#[derive(Debug, Clone)]
-pub struct ChunkStats {
- pub total_chunks: usize,
- pub total_size: u64,
- pub avg_size: usize,
- pub min_size: usize,
- pub max_size: usize,
-}
-
-/// Pre-check whether the input file path, directory path, and output path are valid
-pub async fn precheck(
- input_file: impl Into<PathBuf>,
- dir: impl Into<PathBuf>,
- output_file: impl Into<PathBuf>,
-) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> {
- let (input, dir, output) = (input_file.into(), dir.into(), output_file.into());
-
- // Perform all checks in parallel
- let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!(
- fs::metadata(&input),
- fs::metadata(&dir),
- fs::metadata(&output)
- );
-
- // Check if input file exists
- let input_metadata = match input_metadata_result {
- Ok(metadata) => metadata,
- Err(_) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::NotFound,
- format!("Input file not found: {}", input.display()),
- ));
- }
- };
-
- // Check if directory exists
- let dir_metadata = match dir_metadata_result {
- Ok(metadata) => metadata,
- Err(_) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::NotFound,
- format!("Directory not found: {}", dir.display()),
- ));
- }
- };
-
- // Check if output file already exists
- if output_metadata_result.is_ok() {
- return Err(std::io::Error::new(
- std::io::ErrorKind::AlreadyExists,
- format!("Output file already exist: {}", output.display()),
- ));
- }
-
- // Check if input is a file
- if !input_metadata.is_file() {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- format!("Input path is not a file: {}", input.display()),
- ));
- }
-
- // Check if dir is a directory
- if !dir_metadata.is_dir() {
- return Err(std::io::Error::new(
- std::io::ErrorKind::NotADirectory,
- format!("Input path is not a directory: {}", dir.display()),
- ));
- }
-
- Ok((input, dir, output))
-}
diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs
deleted file mode 100644
index cc16202..0000000
--- a/systems/storage/src/store/cdc.rs
+++ /dev/null
@@ -1,307 +0,0 @@
-use std::path::PathBuf;
-
-use crate::{error::StorageIOError, store::ChunkingResult};
-
-/// Rabin fingerprint parameters for CDC
-const WINDOW_SIZE: usize = 48;
-const MIN_CHUNK_RATIO: f64 = 0.75;
-const MAX_CHUNK_RATIO: f64 = 2.0;
-const BASE_TARGET_MASK: u64 = 0xFFFF;
-
-/// Rabin fingerprint polynomial (irreducible polynomial)
-const POLYNOMIAL: u64 = 0x3DA3358B4DC173;
-
-/// Precomputed table for Rabin fingerprint sliding window
-static FINGERPRINT_TABLE: [u64; 256] = {
- let mut table = [0u64; 256];
- let mut i = 0;
- while i < 256 {
- let mut fingerprint = i as u64;
- let mut j = 0;
- while j < 8 {
- if fingerprint & 1 != 0 {
- fingerprint = (fingerprint >> 1) ^ POLYNOMIAL;
- } else {
- fingerprint >>= 1;
- }
- j += 1;
- }
- table[i] = fingerprint;
- i += 1;
- }
- table
-};
-
-/// Calculate Rabin fingerprint for a byte
-#[inline]
-fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 {
- let old_byte_index = old_byte as usize;
-
- let fingerprint =
- (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index];
- fingerprint
-}
-
-/// Split data using Content-Defined Chunking with Rabin fingerprinting
-pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> {
- let avg_size = avg_size as usize;
-
- if avg_size == 0 {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- "Average chunk size must be greater than 0",
- )
- .into());
- }
-
- // Calculate min and max chunk sizes based on average size
- let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize;
- let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize;
-
- // Ensure reasonable bounds
- let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes
- let max_chunk_size = max_chunk_size.max(avg_size * 2);
-
- let target_mask = {
- let scale_factor = avg_size as f64 / (64.0 * 1024.0);
- let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64;
- mask_value.max(0xC000)
- };
-
- let mut chunks = Vec::new();
- let mut start = 0;
- let data_len = data.len();
-
- while start < data_len {
- let chunk_start = start;
-
- let search_end = (start + max_chunk_size).min(data_len);
- let mut boundary_found = false;
- let mut boundary_pos = search_end;
-
- let mut window = Vec::with_capacity(WINDOW_SIZE);
- let mut fingerprint: u64 = 0;
-
- for i in start..search_end {
- let byte = data[i];
-
- // Update sliding window
- if window.len() >= WINDOW_SIZE {
- let old_byte = window.remove(0);
- fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte);
- } else {
- fingerprint = (fingerprint << 8) ^ (byte as u64);
- }
- window.push(byte);
-
- if i - chunk_start >= min_chunk_size {
- if (fingerprint & target_mask) == 0 {
- boundary_found = true;
- boundary_pos = i + 1;
- break;
- }
- }
- }
-
- let chunk_end = if boundary_found {
- boundary_pos
- } else {
- search_end
- };
-
- let chunk_data = data[chunk_start..chunk_end].to_vec();
- let chunk = crate::store::create_chunk(chunk_data);
- chunks.push(chunk);
-
- start = chunk_end;
- }
-
- if chunks.len() > 1 {
- let mut merged_chunks = Vec::new();
- let mut i = 0;
-
- while i < chunks.len() {
- let current_chunk = &chunks[i];
-
- if i < chunks.len() - 1 {
- let next_chunk = &chunks[i + 1];
- if current_chunk.data.len() < min_chunk_size
- && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size
- {
- // Merge chunks
- let mut merged_data = current_chunk.data.clone();
- merged_data.extend_from_slice(&next_chunk.data);
- let merged_chunk = crate::store::create_chunk(merged_data);
- merged_chunks.push(merged_chunk);
- i += 2;
- } else {
- merged_chunks.push(current_chunk.clone());
- i += 1;
- }
- } else {
- merged_chunks.push(current_chunk.clone());
- i += 1;
- }
- }
-
- chunks = merged_chunks;
- }
-
- Ok(ChunkingResult {
- chunks,
- total_size: data_len as u64,
- })
-}
-
-/// Split file using CDC with specified average chunk size
-pub async fn write_file_cdc<I: Into<PathBuf>>(
- file_to_write: I,
- storage_dir: I,
- output_index_file: I,
- avg_size: u32,
-) -> Result<(), StorageIOError> {
- use crate::store::{StorageConfig, write_file};
-
- let config = StorageConfig::cdc(avg_size);
- write_file(file_to_write, storage_dir, output_index_file, &config).await
-}
-
-/// Test function for CDC chunking
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_cdc_basic() {
- let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();
- let mut data = Vec::new();
- for _ in 0..10 {
- data.extend_from_slice(&pattern);
- }
-
- let result = split_cdc_impl(&data, 4096).unwrap();
-
- // Should have at least one chunk
- assert!(!result.chunks.is_empty());
-
- assert!(
- result.chunks.len() >= 1 && result.chunks.len() <= 10,
- "Expected 1-10 chunks for 10KB data with 4KB average, got {}",
- result.chunks.len()
- );
-
- // Verify total size
- let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
- assert_eq!(total_chunk_size, data.len());
-
- // Verify chunk size bounds (more relaxed for CDC)
- let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize;
-
- for chunk in &result.chunks {
- let chunk_size = chunk.data.len();
- // CDC can produce chunks smaller than min_size due to content patterns
- // But they should not exceed max_size
- assert!(
- chunk_size <= max_size,
- "Chunk size {} exceeds maximum {}",
- chunk_size,
- max_size
- );
- }
- }
-
- #[test]
- fn test_cdc_small_file() {
- // Small file should result in single chunk
- let data = vec![1, 2, 3, 4, 5];
-
- let result = split_cdc_impl(&data, 4096).unwrap();
-
- // Should have exactly one chunk
- assert_eq!(result.chunks.len(), 1);
- assert_eq!(result.chunks[0].data.len(), data.len());
- }
-
- #[test]
- fn test_cdc_large_avg_size() {
- // Test with larger average size (256KB)
- let mut data = Vec::new();
- for i in 0..(256 * 1024 * 3) {
- // 768KB of data
- data.push((i % 256) as u8);
- }
-
- let result = split_cdc_impl(&data, 256 * 1024).unwrap();
-
- // With 768KB data and 256KB average, should have reasonable number of chunks
- // CDC is content-defined, so exact count varies
- assert!(
- result.chunks.len() >= 1 && result.chunks.len() <= 10,
- "Expected 1-10 chunks for 768KB data with 256KB average, got {}",
- result.chunks.len()
- );
-
- // Verify total size
- let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
- assert_eq!(total_chunk_size, data.len());
-
- // Verify chunk size bounds
- let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize;
-
- for chunk in &result.chunks {
- // CDC can produce chunks smaller than min_size
- // But they should not exceed max_size
- assert!(
- chunk.data.len() <= max_size,
- "Chunk size {} exceeds maximum {}",
- chunk.data.len(),
- max_size
- );
- }
- }
-
- #[test]
- fn test_cdc_zero_avg_size() {
- let data = vec![1, 2, 3];
-
- let result = split_cdc_impl(&data, 0);
- assert!(result.is_err());
-
- match result {
- Err(StorageIOError::IOErr(e)) => {
- assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
- }
- _ => panic!("Expected IOErr with InvalidInput"),
- }
- }
-
- #[test]
- fn test_rabin_fingerprint() {
- // Test that rabin_fingerprint_byte function is deterministic
- let test_bytes = [0x01, 0x02, 0x03, 0x04];
-
- // Calculate fingerprint with specific sequence
- let mut fingerprint1 = 0;
- for &byte in &test_bytes {
- fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte);
- }
-
- // Calculate again with same inputs
- let mut fingerprint2 = 0;
- for &byte in &test_bytes {
- fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte);
- }
-
- // Same input should produce same output
- assert_eq!(fingerprint1, fingerprint2);
-
- // Test with different old_byte produces different result
- let mut fingerprint3 = 0;
- for &byte in &test_bytes {
- fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte);
- }
-
- // Different old_byte should produce different fingerprint
- assert_ne!(fingerprint1, fingerprint3);
- }
-}
diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs
deleted file mode 100644
index 044cc1c..0000000
--- a/systems/storage/src/store/fixed.rs
+++ /dev/null
@@ -1,417 +0,0 @@
-use std::path::PathBuf;
-use std::time::Instant;
-
-use log::{info, trace};
-use memmap2::Mmap;
-use tokio::fs;
-use tokio::task;
-
-use crate::{
- error::StorageIOError,
- store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck},
-};
-
-/// Split data using fixed-size chunking
-pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> {
- let chunk_size = chunk_size as usize;
-
- if chunk_size == 0 {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- "Chunk size must be greater than 0",
- )
- .into());
- }
-
- let mut chunks = Vec::new();
- let mut start = 0;
- let total_size = data.len();
-
- while start < total_size {
- let end = (start + chunk_size).min(total_size);
- let chunk_data = data[start..end].to_vec();
-
- let chunk = crate::store::create_chunk(chunk_data);
- chunks.push(chunk);
-
- start = end;
- }
-
- Ok(ChunkingResult {
- chunks,
- total_size: total_size as u64,
- })
-}
-
-/// Split file using fixed-size chunking
-pub async fn write_file_fixed<I: Into<PathBuf>>(
- file_to_write: I,
- storage_dir: I,
- output_index_file: I,
- fixed_size: u32,
-) -> Result<(), StorageIOError> {
- let config = StorageConfig::fixed_size(fixed_size);
- write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await
-}
-
-/// Split file using fixed-size chunking with parallel processing
-async fn write_file_parallel(
- file_to_write: impl Into<PathBuf>,
- storage_dir: impl Into<PathBuf>,
- output_index_file: impl Into<PathBuf>,
- cfg: &StorageConfig,
-) -> Result<(), StorageIOError> {
- let (file_to_write, storage_dir, output_index_file) =
- precheck(file_to_write, storage_dir, output_index_file).await?;
-
- info!("Starting file write: {}", file_to_write.display());
- let start_time = Instant::now();
-
- // Memory map the entire file
- let file = std::fs::File::open(&file_to_write)?;
- let mmap = unsafe { Mmap::map(&file)? };
- let data = &mmap[..];
-
- // Split into chunks based on policy
- let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?;
-
- // Store chunks in parallel and create index
- let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?;
-
- // Write index file
- write_index_file(&index_entries, &output_index_file).await?;
-
- let duration = start_time.elapsed();
- info!(
- "File write completed in {:?}: {}",
- duration,
- file_to_write.display()
- );
-
- Ok(())
-}
-
-/// Split data into chunks based on the specified policy with parallel processing
-async fn split_into_chunks_parallel(
- data: &[u8],
- policy: &crate::store::ChunkingPolicy,
-) -> Result<ChunkingResult, StorageIOError> {
- match policy {
- crate::store::ChunkingPolicy::FixedSize(chunk_size) => {
- split_fixed_parallel(data, *chunk_size).await
- }
- _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking
- }
-}
-
-/// Split data using fixed-size chunking with parallel processing
-async fn split_fixed_parallel(
- data: &[u8],
- chunk_size: u32,
-) -> Result<ChunkingResult, StorageIOError> {
- let chunk_size = chunk_size as usize;
-
- if chunk_size == 0 {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- "Chunk size must be greater than 0",
- )
- .into());
- }
-
- let total_size = data.len();
- let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division
-
- // Create a vector to hold chunk boundaries
- let mut chunk_boundaries = Vec::with_capacity(num_chunks);
- let mut start = 0;
-
- while start < total_size {
- let end = (start + chunk_size).min(total_size);
- chunk_boundaries.push((start, end));
- start = end;
- }
-
- // Process chunks in parallel using Tokio tasks
- let chunks: Vec<crate::store::Chunk> = if chunk_boundaries.len() > 1 {
- // Use parallel processing for multiple chunks
- let mut tasks = Vec::with_capacity(chunk_boundaries.len());
-
- for (start, end) in chunk_boundaries {
- let chunk_data = data[start..end].to_vec();
-
- // Spawn a blocking task for each chunk
- tasks.push(task::spawn_blocking(move || create_chunk(chunk_data)));
- }
-
- // Wait for all tasks to complete
- let mut chunks = Vec::with_capacity(tasks.len());
- for task in tasks {
- match task.await {
- Ok(chunk) => chunks.push(chunk),
- Err(e) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::Other,
- format!("Task join error: {}", e),
- )
- .into());
- }
- }
- }
-
- chunks
- } else {
- // Single chunk, no need for parallel processing
- chunk_boundaries
- .into_iter()
- .map(|(start, end)| {
- let chunk_data = data[start..end].to_vec();
- create_chunk(chunk_data)
- })
- .collect()
- };
-
- Ok(ChunkingResult {
- chunks,
- total_size: total_size as u64,
- })
-}
-
-/// Store chunks in the storage directory in parallel and return index entries
-async fn store_chunks_parallel(
- chunks: &[crate::store::Chunk],
- storage_dir: &std::path::Path,
-) -> Result<Vec<IndexEntry>, StorageIOError> {
- let mut tasks = Vec::with_capacity(chunks.len());
-
- let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
- let total_chunks = chunks.len();
-
- for chunk in chunks {
- let chunk = chunk.clone();
- let storage_dir = storage_dir.to_path_buf();
- let writed_counter = writed_counter.clone();
-
- // Spawn async task for each chunk storage operation
- tasks.push(task::spawn(async move {
- // Create storage directory structure based on hash
- let hash_str = hex::encode(chunk.hash);
- let chunk_dir = get_dir(storage_dir, hash_str.clone())?;
-
- // Create directory if it doesn't exist
- if let Some(parent) = chunk_dir.parent() {
- fs::create_dir_all(parent).await?;
- }
-
- // Write chunk data
- let chunk_path = chunk_dir.with_extension("chunk");
- if !chunk_path.exists() {
- trace!("W: {}", hash_str);
- fs::write(&chunk_path, &chunk.data).await?;
- writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
- }
-
- Ok::<IndexEntry, StorageIOError>(IndexEntry {
- hash: chunk.hash,
- size: chunk.data.len() as u32,
- })
- }));
- }
-
- let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed);
- info!(
- "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)",
- writed_count,
- total_chunks,
- (writed_count as f32 / total_chunks as f32) * 100 as f32,
- total_chunks - writed_count
- );
-
- // Wait for all tasks to complete
- let mut index_entries = Vec::with_capacity(chunks.len());
- for task in tasks {
- let entry = task.await.map_err(|e| {
- std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e))
- })??;
- index_entries.push(entry);
- }
-
- Ok(index_entries)
-}
-
-/// Write index file containing chunk hashes and sizes
-async fn write_index_file(
- entries: &[IndexEntry],
- output_path: &std::path::Path,
-) -> Result<(), StorageIOError> {
- let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size
-
- for entry in entries {
- index_data.extend_from_slice(&entry.hash);
- index_data.extend_from_slice(&entry.size.to_le_bytes());
- }
-
- fs::write(output_path, &index_data).await?;
- Ok(())
-}
-
-/// Utility function to calculate optimal fixed chunk size based on file size
-pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 {
- if target_chunks == 0 || file_size == 0 {
- return 64 * 1024; // Default 64KB
- }
-
- let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32;
-
- // Round to nearest power of 2 for better performance
- let rounded_size = if chunk_size <= 1024 {
- // Small chunks: use exact size
- chunk_size
- } else {
- // Larger chunks: round to nearest power of 2
- let mut size = chunk_size;
- size -= 1;
- size |= size >> 1;
- size |= size >> 2;
- size |= size >> 4;
- size |= size >> 8;
- size |= size >> 16;
- size += 1;
- size
- };
-
- // Ensure minimum and maximum bounds
- rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max
-}
-
-/// Split file with automatic chunk size calculation
-pub async fn write_file_fixed_auto<I: Into<PathBuf>, J: Into<PathBuf>, K: Into<PathBuf>>(
- file_to_write: I,
- storage_dir: J,
- output_index_file: K,
- target_chunks: usize,
-) -> Result<(), StorageIOError> {
- let file_path = file_to_write.into();
- let storage_dir = storage_dir.into();
- let output_index_file = output_index_file.into();
-
- let file_size = fs::metadata(&file_path).await?.len();
-
- let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks);
- write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_fixed_chunking_basic() {
- // Create 10KB of test data
- let data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
-
- // Split into 1KB chunks
- let result = split_fixed_impl(&data, 1024).unwrap();
-
- // Should have 10 chunks
- assert_eq!(result.chunks.len(), 10);
-
- // Verify total size
- let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
- assert_eq!(total_chunk_size, data.len());
-
- // Verify chunk sizes (last chunk may be smaller)
- for (i, chunk) in result.chunks.iter().enumerate() {
- if i < 9 {
- assert_eq!(chunk.data.len(), 1024);
- } else {
- assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly
- }
- }
- }
-
- #[test]
- fn test_fixed_chunking_uneven() {
- // Create 5.5KB of test data
- let data: Vec<u8> = (0..5632).map(|i| (i % 256) as u8).collect();
-
- // Split into 2KB chunks
- let result = split_fixed_impl(&data, 2048).unwrap();
-
- // Should have 3 chunks (2048 + 2048 + 1536)
- assert_eq!(result.chunks.len(), 3);
-
- // Verify chunk sizes
- assert_eq!(result.chunks[0].data.len(), 2048);
- assert_eq!(result.chunks[1].data.len(), 2048);
- assert_eq!(result.chunks[2].data.len(), 1536);
-
- // Verify data integrity
- let mut reconstructed = Vec::new();
- for chunk in &result.chunks {
- reconstructed.extend_from_slice(&chunk.data);
- }
- assert_eq!(reconstructed, data);
- }
-
- #[test]
- fn test_fixed_chunking_small_file() {
- // Small file smaller than chunk size
- let data = vec![1, 2, 3, 4, 5];
-
- let result = split_fixed_impl(&data, 1024).unwrap();
-
- // Should have exactly one chunk
- assert_eq!(result.chunks.len(), 1);
- assert_eq!(result.chunks[0].data.len(), data.len());
- }
-
- #[test]
- fn test_fixed_chunking_zero_size() {
- let data = vec![1, 2, 3];
-
- let result = split_fixed_impl(&data, 0);
- assert!(result.is_err());
-
- match result {
- Err(StorageIOError::IOErr(e)) => {
- assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
- }
- _ => panic!("Expected IOErr with InvalidInput"),
- }
- }
-
- #[test]
- fn test_calculate_optimal_chunk_size() {
- // Test basic calculation
- assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB
-
- // Test rounding to power of 2
- assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB
-
- // Test minimum bound
- assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB
-
- // Test edge cases
- assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default
- assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default
-
- // Test large file
- assert_eq!(
- calculate_optimal_chunk_size(100 * 1024 * 1024, 10),
- 16 * 1024 * 1024
- ); // 100MB / 10 = 10MB, max is 16MB
- }
-
- #[test]
- fn test_chunk_hash_uniqueness() {
- // Test that different data produces different hashes
- let data1 = vec![1, 2, 3, 4, 5];
- let data2 = vec![1, 2, 3, 4, 6];
-
- let result1 = split_fixed_impl(&data1, 1024).unwrap();
- let result2 = split_fixed_impl(&data2, 1024).unwrap();
-
- assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash);
- }
-}
diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs
deleted file mode 100644
index 971018b..0000000
--- a/systems/storage/src/store/line.rs
+++ /dev/null
@@ -1,393 +0,0 @@
-use std::path::PathBuf;
-
-use crate::{error::StorageIOError, store::ChunkingResult};
-
-/// Split data by lines (newline characters)
-pub fn split_by_lines_impl(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
- let mut chunks = Vec::new();
- let mut start = 0;
- let total_size = data.len();
-
- // Iterate through data to find line boundaries
- let mut i = 0;
- while i < data.len() {
- if data[i] == b'\n' {
- // Unix line ending
- let line_end = i + 1; // Include \n
- // Extract line data (include newline character)
- let line_data = data[start..line_end].to_vec();
-
- // Create chunk for this line
- let chunk = crate::store::create_chunk(line_data);
- chunks.push(chunk);
-
- // Move start to next line
- start = line_end;
- i = line_end;
- } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' {
- // Windows line ending
- let line_end = i + 2; // Include both \r and \n
- // Extract line data (include newline characters)
- let line_data = data[start..line_end].to_vec();
-
- // Create chunk for this line
- let chunk = crate::store::create_chunk(line_data);
- chunks.push(chunk);
-
- // Move start to next line
- start = line_end;
- i = line_end;
- } else {
- i += 1;
- }
- }
-
- // Handle remaining data (last line without newline)
- if start < total_size {
- let line_data = data[start..].to_vec();
- let chunk = crate::store::create_chunk(line_data);
- chunks.push(chunk);
- }
-
- // Handle empty file (no lines)
- if chunks.is_empty() && total_size == 0 {
- let chunk = crate::store::create_chunk(Vec::new());
- chunks.push(chunk);
- }
-
- Ok(ChunkingResult {
- chunks,
- total_size: total_size as u64,
- })
-}
-
-/// Split file by lines
-pub async fn write_file_line<I: Into<PathBuf>>(
- file_to_write: I,
- storage_dir: I,
- output_index_file: I,
-) -> Result<(), StorageIOError> {
- use crate::store::{StorageConfig, write_file};
-
- let config = StorageConfig::line();
- write_file(file_to_write, storage_dir, output_index_file, &config).await
-}
-
-/// Utility function to split data by lines with custom line ending detection
-pub fn split_by_lines_custom<E: LineEnding>(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
- let mut chunks = Vec::new();
- let mut start = 0;
- let total_size = data.len();
-
- let mut i = 0;
- while i < total_size {
- if E::is_line_ending(data, i) {
- let line_end = i + E::ending_length(data, i);
- let line_data = data[start..line_end].to_vec();
-
- let chunk = crate::store::create_chunk(line_data);
- chunks.push(chunk);
-
- start = line_end;
- i = line_end;
- } else {
- i += 1;
- }
- }
-
- // Handle remaining data
- if start < total_size {
- let line_data = data[start..].to_vec();
- let chunk = crate::store::create_chunk(line_data);
- chunks.push(chunk);
- }
-
- // Handle empty file
- if chunks.is_empty() && total_size == 0 {
- let chunk = crate::store::create_chunk(Vec::new());
- chunks.push(chunk);
- }
-
- Ok(ChunkingResult {
- chunks,
- total_size: total_size as u64,
- })
-}
-
-/// Trait for different line ending types
-pub trait LineEnding {
- /// Check if position i is the start of a line ending
- fn is_line_ending(data: &[u8], i: usize) -> bool;
-
- /// Get the length of the line ending at position i
- fn ending_length(data: &[u8], i: usize) -> usize;
-}
-
-/// Unix line endings (\n)
-pub struct UnixLineEnding;
-
-impl LineEnding for UnixLineEnding {
- fn is_line_ending(data: &[u8], i: usize) -> bool {
- i < data.len() && data[i] == b'\n'
- }
-
- fn ending_length(_data: &[u8], _i: usize) -> usize {
- 1
- }
-}
-
-/// Windows line endings (\r\n)
-pub struct WindowsLineEnding;
-
-impl LineEnding for WindowsLineEnding {
- fn is_line_ending(data: &[u8], i: usize) -> bool {
- i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n'
- }
-
- fn ending_length(_data: &[u8], _i: usize) -> usize {
- 2
- }
-}
-
-/// Mixed line endings (detects both Unix and Windows)
-pub struct MixedLineEnding;
-
-impl LineEnding for MixedLineEnding {
- fn is_line_ending(data: &[u8], i: usize) -> bool {
- if i < data.len() && data[i] == b'\n' {
- true
- } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' {
- true
- } else {
- false
- }
- }
-
- fn ending_length(data: &[u8], i: usize) -> usize {
- if i < data.len() && data[i] == b'\n' {
- 1
- } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' {
- 2
- } else {
- 1 // Default to 1 if somehow called incorrectly
- }
- }
-}
-
-/// Detect line ending type from data
-pub fn detect_line_ending(data: &[u8]) -> LineEndingType {
- let mut unix_count = 0;
- let mut windows_count = 0;
-
- let mut i = 0;
- while i < data.len() {
- if data[i] == b'\n' {
- unix_count += 1;
- i += 1;
- } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' {
- windows_count += 1;
- i += 2;
- } else {
- i += 1;
- }
- }
-
- if unix_count > windows_count {
- LineEndingType::Unix
- } else if windows_count > unix_count {
- LineEndingType::Windows
- } else {
- LineEndingType::Mixed
- }
-}
-
-/// Line ending type enum
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum LineEndingType {
- Unix,
- Windows,
- Mixed,
-}
-
-impl LineEndingType {
- /// Split data using the detected line ending type
- pub fn split_by_lines(&self, data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
- match self {
- LineEndingType::Unix => split_by_lines_custom::<UnixLineEnding>(data),
- LineEndingType::Windows => split_by_lines_custom::<WindowsLineEnding>(data),
- LineEndingType::Mixed => split_by_lines_custom::<MixedLineEnding>(data),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_line_chunking_unix() {
- let data = b"Hello\nWorld\nTest\n";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 3 chunks
- assert_eq!(result.chunks.len(), 3);
-
- // Verify chunk contents
- assert_eq!(result.chunks[0].data, b"Hello\n");
- assert_eq!(result.chunks[1].data, b"World\n");
- assert_eq!(result.chunks[2].data, b"Test\n");
-
- // Verify total size
- let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
- assert_eq!(total_chunk_size, data.len());
- }
-
- #[test]
- fn test_line_chunking_windows() {
- let data = b"Hello\r\nWorld\r\nTest\r\n";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 3 chunks
- assert_eq!(result.chunks.len(), 3);
-
- // Verify chunk contents (should include \r\n)
- assert_eq!(result.chunks[0].data, b"Hello\r\n");
- assert_eq!(result.chunks[1].data, b"World\r\n");
- assert_eq!(result.chunks[2].data, b"Test\r\n");
- }
-
- #[test]
- fn test_line_chunking_mixed() {
- let data = b"Hello\nWorld\r\nTest\n";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 3 chunks
- assert_eq!(result.chunks.len(), 3);
-
- // Verify chunk contents
- assert_eq!(result.chunks[0].data, b"Hello\n");
- assert_eq!(result.chunks[1].data, b"World\r\n");
- assert_eq!(result.chunks[2].data, b"Test\n");
- }
-
- #[test]
- fn test_line_chunking_no_trailing_newline() {
- let data = b"Hello\nWorld\nTest";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 3 chunks
- assert_eq!(result.chunks.len(), 3);
-
- // Verify chunk contents
- assert_eq!(result.chunks[0].data, b"Hello\n");
- assert_eq!(result.chunks[1].data, b"World\n");
- assert_eq!(result.chunks[2].data, b"Test");
- }
-
- #[test]
- fn test_line_chunking_empty_lines() {
- let data = b"Hello\n\nWorld\n\n\n";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 5 chunks (including empty lines)
- // "Hello\n", "\n", "World\n", "\n", "\n"
- assert_eq!(result.chunks.len(), 5);
-
- // Verify chunk contents
- assert_eq!(result.chunks[0].data, b"Hello\n");
- assert_eq!(result.chunks[1].data, b"\n");
- assert_eq!(result.chunks[2].data, b"World\n");
- assert_eq!(result.chunks[3].data, b"\n");
- assert_eq!(result.chunks[4].data, b"\n");
- }
-
- #[test]
- fn test_line_chunking_empty_file() {
- let data = b"";
-
- let result = split_by_lines_impl(data).unwrap();
-
- // Should have 1 empty chunk
- assert_eq!(result.chunks.len(), 1);
- assert_eq!(result.chunks[0].data, b"");
- }
-
- #[test]
- fn test_detect_line_ending() {
- // Test Unix detection
- let unix_data = b"Hello\nWorld\n";
- assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix);
-
- // Test Windows detection
- let windows_data = b"Hello\r\nWorld\r\n";
- assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows);
-
- // Test mixed detection
- let mixed_data = b"Hello\nWorld\r\n";
- assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed);
-
- // Test no newlines
- let no_newlines = b"Hello World";
- assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed);
- }
-
- #[test]
- fn test_custom_line_ending_unix() {
- let data = b"Hello\nWorld\n";
-
- let result = split_by_lines_custom::<UnixLineEnding>(data).unwrap();
-
- assert_eq!(result.chunks.len(), 2);
- assert_eq!(result.chunks[0].data, b"Hello\n");
- assert_eq!(result.chunks[1].data, b"World\n");
- }
-
- #[test]
- fn test_custom_line_ending_windows() {
- let data = b"Hello\r\nWorld\r\n";
-
- let result = split_by_lines_custom::<WindowsLineEnding>(data).unwrap();
-
- assert_eq!(result.chunks.len(), 2);
- assert_eq!(result.chunks[0].data, b"Hello\r\n");
- assert_eq!(result.chunks[1].data, b"World\r\n");
- }
-
- #[test]
- fn test_line_ending_type_split() {
- let unix_data = b"Hello\nWorld\n";
- let windows_data = b"Hello\r\nWorld\r\n";
- let mixed_data = b"Hello\nWorld\r\n";
-
- // Test Unix
- let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap();
- assert_eq!(unix_result.chunks.len(), 2);
-
- // Test Windows
- let windows_result = LineEndingType::Windows
- .split_by_lines(windows_data)
- .unwrap();
- assert_eq!(windows_result.chunks.len(), 2);
-
- // Test Mixed
- let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap();
- assert_eq!(mixed_result.chunks.len(), 2);
- }
-
- #[test]
- fn test_chunk_hash_uniqueness() {
- // Test that different lines produce different hashes
- let data1 = b"Hello\n";
- let data2 = b"World\n";
-
- let result1 = split_by_lines_impl(data1).unwrap();
- let result2 = split_by_lines_impl(data2).unwrap();
-
- assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash);
- }
-}