diff options
| author | 魏曹先生 <1992414357@qq.com> | 2026-03-08 22:48:54 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2026-03-08 22:48:54 +0800 |
| commit | 47e0ffd50427440696c245814517e4f5fa94ed83 (patch) | |
| tree | 777b1107af04f6b5bcc79673064b1821e1b7f59f | |
| parent | 90ed18a41fef137ed0637cf9fc6aa667de2c905f (diff) | |
Move action system to legacy and remove storage system
17 files changed, 5 insertions, 1659 deletions
@@ -857,12 +857,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - -[[package]] name = "hex_display" version = "0.1.0" @@ -976,7 +970,6 @@ dependencies = [ "jvlib", "sha1_hash", "sheet_system", - "storage_system", "tcp_connection", "tokio", "toml", @@ -1810,18 +1803,6 @@ dependencies = [ ] [[package]] -name = "storage_system" -version = "0.1.0" -dependencies = [ - "blake3", - "hex", - "log", - "memmap2", - "thiserror 1.0.69", - "tokio", -] - -[[package]] name = "subtle" version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -30,11 +30,8 @@ members = [ "systems/_asset/test", "systems/_constants", "systems/_constants/macros", - "systems/action", - "systems/action/action_macros", "systems/sheet", "systems/sheet/macros", - "systems/storage", "utils/cfg_file", "utils/cfg_file/cfg_file_derive", "utils/cfg_file/cfg_file_test", @@ -48,6 +45,8 @@ members = [ "legacy_data", "legacy_data/tests", "legacy_actions", + "legacy_systems/action", + "legacy_systems/action/action_macros", # LEGACY AREA ] @@ -102,10 +101,9 @@ tcp_connection = { path = "utils/tcp_connection" } # Systems asset_system = { path = "systems/_asset" } constants = { path = "systems/_constants" } -action_system = { path = "systems/action" } sheet_system = { path = "systems/sheet" } -storage_system = { path = "systems/storage" } # Legacy vcs_data = { path = "legacy_data" } vcs_actions = { path = "legacy_actions" } +action_system = { path = "legacy_systems/action" } diff --git a/legacy_actions/Cargo.toml b/legacy_actions/Cargo.toml index 7c9e078..0ee8068 100644 --- a/legacy_actions/Cargo.toml +++ b/legacy_actions/Cargo.toml @@ -13,7 +13,7 @@ sha1_hash = { path = "../utils/sha1_hash" } just_fmt = "0.1.2" # Core dependencies -action_system = { path = "../systems/action" } +action_system = { path = "../legacy_systems/action" } vcs_data = { path = "../legacy_data" } # Error handling diff --git a/legacy_data/Cargo.toml b/legacy_data/Cargo.toml index 27fe6ad..e2f4dc6 100644 --- a/legacy_data/Cargo.toml +++ b/legacy_data/Cargo.toml @@ -12,7 +12,7 @@ sha1_hash = { path = "../utils/sha1_hash" } tcp_connection = { path = "../utils/tcp_connection" } # Core -action_system = { path = "../systems/action" } +action_system = { path = "../legacy_systems/action" } vcs_docs = { path = "../docs" } # Format diff --git a/systems/action/Cargo.toml b/legacy_systems/action/Cargo.toml index 5317975..5317975 100644 --- a/systems/action/Cargo.toml +++ b/legacy_systems/action/Cargo.toml diff --git a/systems/action/action_macros/Cargo.toml b/legacy_systems/action/action_macros/Cargo.toml index 0f209e2..0f209e2 100644 --- a/systems/action/action_macros/Cargo.toml +++ b/legacy_systems/action/action_macros/Cargo.toml diff --git a/systems/action/action_macros/src/lib.rs b/legacy_systems/action/action_macros/src/lib.rs index 6da0339..6da0339 100644 --- a/systems/action/action_macros/src/lib.rs +++ b/legacy_systems/action/action_macros/src/lib.rs diff --git a/systems/action/src/action.rs b/legacy_systems/action/src/action.rs index 62425ff..62425ff 100644 --- a/systems/action/src/action.rs +++ b/legacy_systems/action/src/action.rs diff --git a/systems/action/src/action_pool.rs b/legacy_systems/action/src/action_pool.rs index 019fa6d..019fa6d 100644 --- a/systems/action/src/action_pool.rs +++ b/legacy_systems/action/src/action_pool.rs diff --git a/systems/action/src/lib.rs b/legacy_systems/action/src/lib.rs index 12ae999..12ae999 100644 --- a/systems/action/src/lib.rs +++ b/legacy_systems/action/src/lib.rs diff --git a/systems/storage/Cargo.toml b/systems/storage/Cargo.toml deleted file mode 100644 index 5afb780..0000000 --- a/systems/storage/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "storage_system" -edition = "2024" -version.workspace = true - -[dependencies] -thiserror = "1.0.69" -tokio = { version = "1.48", features = ["full"] } - -blake3 = "1.5.0" -hex = "0.4.3" -memmap2 = "0.9.4" -log = "0.4.25" diff --git a/systems/storage/src/error.rs b/systems/storage/src/error.rs deleted file mode 100644 index 4b3ad7e..0000000 --- a/systems/storage/src/error.rs +++ /dev/null @@ -1,8 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub enum StorageIOError { - #[error("IO error: {0}")] - IOErr(#[from] std::io::Error), - - #[error("Hash too short")] - HashTooShort, -} diff --git a/systems/storage/src/lib.rs b/systems/storage/src/lib.rs deleted file mode 100644 index 4f89971..0000000 --- a/systems/storage/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod error; -pub mod store; diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs deleted file mode 100644 index 6492449..0000000 --- a/systems/storage/src/store.rs +++ /dev/null @@ -1,493 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::time::Instant; - -use blake3; -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; - -use crate::error::StorageIOError; - -pub mod cdc; -pub mod fixed; -pub mod line; - -#[derive(Default, Debug)] -pub struct StorageConfig { - /// Chunking policy for splitting files - pub chunking_policy: ChunkingPolicy, -} - -impl StorageConfig { - /// Create a new StorageConfig with CDC chunking policy - pub fn cdc(avg_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::Cdc(avg_size), - } - } - - /// Create a new StorageConfig with fixed-size chunking policy - pub fn fixed_size(chunk_size: u32) -> Self { - Self { - chunking_policy: ChunkingPolicy::FixedSize(chunk_size), - } - } - - /// Create a new StorageConfig with line-based chunking policy - pub fn line() -> Self { - Self { - chunking_policy: ChunkingPolicy::Line, - } - } -} - -#[derive(Debug)] -pub enum ChunkingPolicy { - /// Content-Defined Chunking using Rabin fingerprinting - /// The `u32` value represents the desired average chunk size in bytes - Cdc(u32), - - /// Fixed-size chunking - /// The `u32` value represents the exact size of each chunk in bytes - FixedSize(u32), - - /// Line-based chunking - /// Each line becomes a separate chunk - Line, -} - -impl Default for ChunkingPolicy { - fn default() -> Self { - ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC - } -} - -/// Represents a chunk of data with its hash and content -#[derive(Debug, Clone)] -pub struct Chunk { - /// Blake3 hash of the chunk content - pub hash: [u8; 32], - /// Raw chunk data - pub data: Vec<u8>, -} - -/// Represents an index entry pointing to a chunk -#[derive(Debug, Clone)] -pub struct IndexEntry { - /// Blake3 hash of the chunk - pub hash: [u8; 32], - /// Size of the chunk in bytes - pub size: u32, -} - -/// Result of chunking a file -#[derive(Debug)] -pub struct ChunkingResult { - /// List of chunks extracted from the file - pub chunks: Vec<Chunk>, - /// Total size of the original file - pub total_size: u64, -} - -/// Split a file into chunks and store them in the repository, then output the index file to the specified directory -pub async fn write_file( - file_to_write: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_index_file: impl Into<PathBuf>, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?; - - // Store chunks and create index - let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy -pub fn split_into_chunks( - data: &[u8], - policy: &ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - match policy { - ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size), - ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size), - ChunkingPolicy::Line => split_by_lines(data), - } -} - -/// Split data using Content-Defined Chunking -fn split_cdc(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { - use crate::store::cdc::split_cdc_impl; - split_cdc_impl(data, avg_size) -} - -/// Split data using fixed-size chunking -fn split_fixed(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { - use crate::store::fixed::split_fixed_impl; - split_fixed_impl(data, chunk_size) -} - -/// Split data by lines -fn split_by_lines(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - use crate::store::line::split_by_lines_impl; - split_by_lines_impl(data) -} - -/// Store chunks in the storage directory and return index entries -async fn store_chunks( - chunks: &[Chunk], - storage_dir: &Path, -) -> Result<Vec<IndexEntry>, StorageIOError> { - let mut index_entries = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - // Add to index - index_entries.push(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Build a file from the repository to the specified directory using an index file -pub async fn build_file( - index_to_build: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_file: impl Into<PathBuf>, -) -> Result<(), StorageIOError> { - let (index_to_build, storage_dir, output_file) = - precheck(index_to_build, storage_dir, output_file).await?; - - info!( - "Starting file build from index: {}", - index_to_build.display() - ); - let start_time = Instant::now(); - - // Read index file - let index_entries = read_index_file(&index_to_build).await?; - - // Reconstruct file from chunks - let reconstructed_data = reconstruct_from_chunks(&index_entries, &storage_dir).await?; - - // Write output file - fs::write(&output_file, &reconstructed_data).await?; - - let duration = start_time.elapsed(); - info!( - "File build completed in {:?}: {} -> {}", - duration, - index_to_build.display(), - output_file.display() - ); - - Ok(()) -} - -/// Read index file and parse entries -async fn read_index_file(index_path: &Path) -> Result<Vec<IndexEntry>, StorageIOError> { - // Open file and memory map it - let file = std::fs::File::open(index_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Each entry is 36 bytes (32 bytes hash + 4 bytes size) - if mmap.len() % 36 != 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Invalid index file format", - ) - .into()); - } - - let num_entries = mmap.len() / 36; - let mut entries = Vec::with_capacity(num_entries); - - for i in 0..num_entries { - let start = i * 36; - let hash_start = start; - let size_start = start + 32; - - let mut hash = [0u8; 32]; - hash.copy_from_slice(&mmap[hash_start..hash_start + 32]); - - let size = u32::from_le_bytes([ - mmap[size_start], - mmap[size_start + 1], - mmap[size_start + 2], - mmap[size_start + 3], - ]); - - entries.push(IndexEntry { hash, size }); - } - - Ok(entries) -} - -/// Reconstruct file data from chunks using index entries -async fn reconstruct_from_chunks( - entries: &[IndexEntry], - storage_dir: &Path, -) -> Result<Vec<u8>, StorageIOError> { - let mut reconstructed_data = Vec::new(); - - for entry in entries { - // Get chunk path from hash - let hash_str = hex::encode(entry.hash); - let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; - let chunk_path = chunk_dir.with_extension("chunk"); - - trace!("R: {}", hash_str); - - // Memory map the chunk file - let file = std::fs::File::open(&chunk_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - - // Verify chunk size matches index - if mmap.len() != entry.size as usize { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk size mismatch: expected {}, got {}", - entry.size, - mmap.len() - ), - ) - .into()); - } - - // Verify chunk hash - let actual_hash = blake3_digest(&mmap); - let expected_hash = entry.hash; - if actual_hash != expected_hash { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Chunk hash mismatch: expected {}, got {}", - hex::encode(expected_hash), - hex::encode(actual_hash) - ), - ) - .into()); - } - - // Append to reconstructed data - reconstructed_data.extend_from_slice(&mmap); - } - - Ok(reconstructed_data) -} - -/// Calculate Blake3 hash of data -fn blake3_digest(data: &[u8]) -> [u8; 32] { - let hash = blake3::hash(data); - *hash.as_bytes() -} - -/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string -pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result<PathBuf, StorageIOError> { - if hash_str.len() < 4 { - return Err(StorageIOError::HashTooShort); - } - let dir = storage_dir - .join(&hash_str[0..2]) - .join(&hash_str[2..4]) - .join(&hash_str); - Ok(dir) -} - -/// Calculate Blake3 hash of data and create a Chunk -pub fn create_chunk(data: Vec<u8>) -> Chunk { - let hash = blake3_digest(&data); - Chunk { hash, data } -} - -/// Read a file and split it into chunks using the specified policy -pub async fn chunk_file( - file_path: impl Into<PathBuf>, - policy: &ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - let file_path = file_path.into(); - info!("Starting chunking: {}", file_path.display()); - let start_time = Instant::now(); - - // Memory map the file - let file = std::fs::File::open(&file_path)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - let result = split_into_chunks(data, policy); - - let duration = start_time.elapsed(); - info!( - "Chunking completed in {:?}: {}", - duration, - file_path.display() - ); - - result -} - -/// Get chunk statistics -pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats { - let total_chunks = chunks.len(); - let total_size: usize = chunks.iter().map(|c| c.data.len()).sum(); - let avg_size = if total_chunks > 0 { - total_size / total_chunks - } else { - 0 - }; - let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0); - let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0); - - ChunkStats { - total_chunks, - total_size: total_size as u64, - avg_size, - min_size, - max_size, - } -} - -/// Statistics about chunks -#[derive(Debug, Clone)] -pub struct ChunkStats { - pub total_chunks: usize, - pub total_size: u64, - pub avg_size: usize, - pub min_size: usize, - pub max_size: usize, -} - -/// Pre-check whether the input file path, directory path, and output path are valid -pub async fn precheck( - input_file: impl Into<PathBuf>, - dir: impl Into<PathBuf>, - output_file: impl Into<PathBuf>, -) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { - let (input, dir, output) = (input_file.into(), dir.into(), output_file.into()); - - // Perform all checks in parallel - let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!( - fs::metadata(&input), - fs::metadata(&dir), - fs::metadata(&output) - ); - - // Check if input file exists - let input_metadata = match input_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Input file not found: {}", input.display()), - )); - } - }; - - // Check if directory exists - let dir_metadata = match dir_metadata_result { - Ok(metadata) => metadata, - Err(_) => { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Directory not found: {}", dir.display()), - )); - } - }; - - // Check if output file already exists - if output_metadata_result.is_ok() { - return Err(std::io::Error::new( - std::io::ErrorKind::AlreadyExists, - format!("Output file already exist: {}", output.display()), - )); - } - - // Check if input is a file - if !input_metadata.is_file() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Input path is not a file: {}", input.display()), - )); - } - - // Check if dir is a directory - if !dir_metadata.is_dir() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotADirectory, - format!("Input path is not a directory: {}", dir.display()), - )); - } - - Ok((input, dir, output)) -} diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs deleted file mode 100644 index cc16202..0000000 --- a/systems/storage/src/store/cdc.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Rabin fingerprint parameters for CDC -const WINDOW_SIZE: usize = 48; -const MIN_CHUNK_RATIO: f64 = 0.75; -const MAX_CHUNK_RATIO: f64 = 2.0; -const BASE_TARGET_MASK: u64 = 0xFFFF; - -/// Rabin fingerprint polynomial (irreducible polynomial) -const POLYNOMIAL: u64 = 0x3DA3358B4DC173; - -/// Precomputed table for Rabin fingerprint sliding window -static FINGERPRINT_TABLE: [u64; 256] = { - let mut table = [0u64; 256]; - let mut i = 0; - while i < 256 { - let mut fingerprint = i as u64; - let mut j = 0; - while j < 8 { - if fingerprint & 1 != 0 { - fingerprint = (fingerprint >> 1) ^ POLYNOMIAL; - } else { - fingerprint >>= 1; - } - j += 1; - } - table[i] = fingerprint; - i += 1; - } - table -}; - -/// Calculate Rabin fingerprint for a byte -#[inline] -fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 { - let old_byte_index = old_byte as usize; - - let fingerprint = - (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index]; - fingerprint -} - -/// Split data using Content-Defined Chunking with Rabin fingerprinting -pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { - let avg_size = avg_size as usize; - - if avg_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Average chunk size must be greater than 0", - ) - .into()); - } - - // Calculate min and max chunk sizes based on average size - let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize; - let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize; - - // Ensure reasonable bounds - let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes - let max_chunk_size = max_chunk_size.max(avg_size * 2); - - let target_mask = { - let scale_factor = avg_size as f64 / (64.0 * 1024.0); - let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64; - mask_value.max(0xC000) - }; - - let mut chunks = Vec::new(); - let mut start = 0; - let data_len = data.len(); - - while start < data_len { - let chunk_start = start; - - let search_end = (start + max_chunk_size).min(data_len); - let mut boundary_found = false; - let mut boundary_pos = search_end; - - let mut window = Vec::with_capacity(WINDOW_SIZE); - let mut fingerprint: u64 = 0; - - for i in start..search_end { - let byte = data[i]; - - // Update sliding window - if window.len() >= WINDOW_SIZE { - let old_byte = window.remove(0); - fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte); - } else { - fingerprint = (fingerprint << 8) ^ (byte as u64); - } - window.push(byte); - - if i - chunk_start >= min_chunk_size { - if (fingerprint & target_mask) == 0 { - boundary_found = true; - boundary_pos = i + 1; - break; - } - } - } - - let chunk_end = if boundary_found { - boundary_pos - } else { - search_end - }; - - let chunk_data = data[chunk_start..chunk_end].to_vec(); - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = chunk_end; - } - - if chunks.len() > 1 { - let mut merged_chunks = Vec::new(); - let mut i = 0; - - while i < chunks.len() { - let current_chunk = &chunks[i]; - - if i < chunks.len() - 1 { - let next_chunk = &chunks[i + 1]; - if current_chunk.data.len() < min_chunk_size - && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size - { - // Merge chunks - let mut merged_data = current_chunk.data.clone(); - merged_data.extend_from_slice(&next_chunk.data); - let merged_chunk = crate::store::create_chunk(merged_data); - merged_chunks.push(merged_chunk); - i += 2; - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } - - chunks = merged_chunks; - } - - Ok(ChunkingResult { - chunks, - total_size: data_len as u64, - }) -} - -/// Split file using CDC with specified average chunk size -pub async fn write_file_cdc<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - avg_size: u32, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::cdc(avg_size); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Test function for CDC chunking -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_cdc_basic() { - let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect(); - let mut data = Vec::new(); - for _ in 0..10 { - data.extend_from_slice(&pattern); - } - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have at least one chunk - assert!(!result.chunks.is_empty()); - - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 10KB data with 4KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds (more relaxed for CDC) - let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - let chunk_size = chunk.data.len(); - // CDC can produce chunks smaller than min_size due to content patterns - // But they should not exceed max_size - assert!( - chunk_size <= max_size, - "Chunk size {} exceeds maximum {}", - chunk_size, - max_size - ); - } - } - - #[test] - fn test_cdc_small_file() { - // Small file should result in single chunk - let data = vec![1, 2, 3, 4, 5]; - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_cdc_large_avg_size() { - // Test with larger average size (256KB) - let mut data = Vec::new(); - for i in 0..(256 * 1024 * 3) { - // 768KB of data - data.push((i % 256) as u8); - } - - let result = split_cdc_impl(&data, 256 * 1024).unwrap(); - - // With 768KB data and 256KB average, should have reasonable number of chunks - // CDC is content-defined, so exact count varies - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 768KB data with 256KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds - let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - // CDC can produce chunks smaller than min_size - // But they should not exceed max_size - assert!( - chunk.data.len() <= max_size, - "Chunk size {} exceeds maximum {}", - chunk.data.len(), - max_size - ); - } - } - - #[test] - fn test_cdc_zero_avg_size() { - let data = vec![1, 2, 3]; - - let result = split_cdc_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_rabin_fingerprint() { - // Test that rabin_fingerprint_byte function is deterministic - let test_bytes = [0x01, 0x02, 0x03, 0x04]; - - // Calculate fingerprint with specific sequence - let mut fingerprint1 = 0; - for &byte in &test_bytes { - fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte); - } - - // Calculate again with same inputs - let mut fingerprint2 = 0; - for &byte in &test_bytes { - fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte); - } - - // Same input should produce same output - assert_eq!(fingerprint1, fingerprint2); - - // Test with different old_byte produces different result - let mut fingerprint3 = 0; - for &byte in &test_bytes { - fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte); - } - - // Different old_byte should produce different fingerprint - assert_ne!(fingerprint1, fingerprint3); - } -} diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs deleted file mode 100644 index 044cc1c..0000000 --- a/systems/storage/src/store/fixed.rs +++ /dev/null @@ -1,417 +0,0 @@ -use std::path::PathBuf; -use std::time::Instant; - -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; -use tokio::task; - -use crate::{ - error::StorageIOError, - store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, -}; - -/// Split data using fixed-size chunking -pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - while start < total_size { - let end = (start + chunk_size).min(total_size); - let chunk_data = data[start..end].to_vec(); - - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = end; - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file using fixed-size chunking -pub async fn write_file_fixed<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - fixed_size: u32, -) -> Result<(), StorageIOError> { - let config = StorageConfig::fixed_size(fixed_size); - write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Split file using fixed-size chunking with parallel processing -async fn write_file_parallel( - file_to_write: impl Into<PathBuf>, - storage_dir: impl Into<PathBuf>, - output_index_file: impl Into<PathBuf>, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; - - // Store chunks in parallel and create index - let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy with parallel processing -async fn split_into_chunks_parallel( - data: &[u8], - policy: &crate::store::ChunkingPolicy, -) -> Result<ChunkingResult, StorageIOError> { - match policy { - crate::store::ChunkingPolicy::FixedSize(chunk_size) => { - split_fixed_parallel(data, *chunk_size).await - } - _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking - } -} - -/// Split data using fixed-size chunking with parallel processing -async fn split_fixed_parallel( - data: &[u8], - chunk_size: u32, -) -> Result<ChunkingResult, StorageIOError> { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let total_size = data.len(); - let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division - - // Create a vector to hold chunk boundaries - let mut chunk_boundaries = Vec::with_capacity(num_chunks); - let mut start = 0; - - while start < total_size { - let end = (start + chunk_size).min(total_size); - chunk_boundaries.push((start, end)); - start = end; - } - - // Process chunks in parallel using Tokio tasks - let chunks: Vec<crate::store::Chunk> = if chunk_boundaries.len() > 1 { - // Use parallel processing for multiple chunks - let mut tasks = Vec::with_capacity(chunk_boundaries.len()); - - for (start, end) in chunk_boundaries { - let chunk_data = data[start..end].to_vec(); - - // Spawn a blocking task for each chunk - tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); - } - - // Wait for all tasks to complete - let mut chunks = Vec::with_capacity(tasks.len()); - for task in tasks { - match task.await { - Ok(chunk) => chunks.push(chunk), - Err(e) => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Task join error: {}", e), - ) - .into()); - } - } - } - - chunks - } else { - // Single chunk, no need for parallel processing - chunk_boundaries - .into_iter() - .map(|(start, end)| { - let chunk_data = data[start..end].to_vec(); - create_chunk(chunk_data) - }) - .collect() - }; - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Store chunks in the storage directory in parallel and return index entries -async fn store_chunks_parallel( - chunks: &[crate::store::Chunk], - storage_dir: &std::path::Path, -) -> Result<Vec<IndexEntry>, StorageIOError> { - let mut tasks = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - let chunk = chunk.clone(); - let storage_dir = storage_dir.to_path_buf(); - let writed_counter = writed_counter.clone(); - - // Spawn async task for each chunk storage operation - tasks.push(task::spawn(async move { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir, hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - Ok::<IndexEntry, StorageIOError>(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }) - })); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - // Wait for all tasks to complete - let mut index_entries = Vec::with_capacity(chunks.len()); - for task in tasks { - let entry = task.await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) - })??; - index_entries.push(entry); - } - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &std::path::Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Utility function to calculate optimal fixed chunk size based on file size -pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { - if target_chunks == 0 || file_size == 0 { - return 64 * 1024; // Default 64KB - } - - let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; - - // Round to nearest power of 2 for better performance - let rounded_size = if chunk_size <= 1024 { - // Small chunks: use exact size - chunk_size - } else { - // Larger chunks: round to nearest power of 2 - let mut size = chunk_size; - size -= 1; - size |= size >> 1; - size |= size >> 2; - size |= size >> 4; - size |= size >> 8; - size |= size >> 16; - size += 1; - size - }; - - // Ensure minimum and maximum bounds - rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max -} - -/// Split file with automatic chunk size calculation -pub async fn write_file_fixed_auto<I: Into<PathBuf>, J: Into<PathBuf>, K: Into<PathBuf>>( - file_to_write: I, - storage_dir: J, - output_index_file: K, - target_chunks: usize, -) -> Result<(), StorageIOError> { - let file_path = file_to_write.into(); - let storage_dir = storage_dir.into(); - let output_index_file = output_index_file.into(); - - let file_size = fs::metadata(&file_path).await?.len(); - - let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); - write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_fixed_chunking_basic() { - // Create 10KB of test data - let data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); - - // Split into 1KB chunks - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have 10 chunks - assert_eq!(result.chunks.len(), 10); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk sizes (last chunk may be smaller) - for (i, chunk) in result.chunks.iter().enumerate() { - if i < 9 { - assert_eq!(chunk.data.len(), 1024); - } else { - assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly - } - } - } - - #[test] - fn test_fixed_chunking_uneven() { - // Create 5.5KB of test data - let data: Vec<u8> = (0..5632).map(|i| (i % 256) as u8).collect(); - - // Split into 2KB chunks - let result = split_fixed_impl(&data, 2048).unwrap(); - - // Should have 3 chunks (2048 + 2048 + 1536) - assert_eq!(result.chunks.len(), 3); - - // Verify chunk sizes - assert_eq!(result.chunks[0].data.len(), 2048); - assert_eq!(result.chunks[1].data.len(), 2048); - assert_eq!(result.chunks[2].data.len(), 1536); - - // Verify data integrity - let mut reconstructed = Vec::new(); - for chunk in &result.chunks { - reconstructed.extend_from_slice(&chunk.data); - } - assert_eq!(reconstructed, data); - } - - #[test] - fn test_fixed_chunking_small_file() { - // Small file smaller than chunk size - let data = vec![1, 2, 3, 4, 5]; - - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_fixed_chunking_zero_size() { - let data = vec![1, 2, 3]; - - let result = split_fixed_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_calculate_optimal_chunk_size() { - // Test basic calculation - assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB - - // Test rounding to power of 2 - assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB - - // Test minimum bound - assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB - - // Test edge cases - assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default - assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default - - // Test large file - assert_eq!( - calculate_optimal_chunk_size(100 * 1024 * 1024, 10), - 16 * 1024 * 1024 - ); // 100MB / 10 = 10MB, max is 16MB - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different data produces different hashes - let data1 = vec![1, 2, 3, 4, 5]; - let data2 = vec![1, 2, 3, 4, 6]; - - let result1 = split_fixed_impl(&data1, 1024).unwrap(); - let result2 = split_fixed_impl(&data2, 1024).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs deleted file mode 100644 index 971018b..0000000 --- a/systems/storage/src/store/line.rs +++ /dev/null @@ -1,393 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Split data by lines (newline characters) -pub fn split_by_lines_impl(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - // Iterate through data to find line boundaries - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - // Unix line ending - let line_end = i + 1; // Include \n - // Extract line data (include newline character) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' { - // Windows line ending - let line_end = i + 2; // Include both \r and \n - // Extract line data (include newline characters) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data (last line without newline) - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file (no lines) - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file by lines -pub async fn write_file_line<I: Into<PathBuf>>( - file_to_write: I, - storage_dir: I, - output_index_file: I, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::line(); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Utility function to split data by lines with custom line ending detection -pub fn split_by_lines_custom<E: LineEnding>(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - let mut i = 0; - while i < total_size { - if E::is_line_ending(data, i) { - let line_end = i + E::ending_length(data, i); - let line_data = data[start..line_end].to_vec(); - - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Trait for different line ending types -pub trait LineEnding { - /// Check if position i is the start of a line ending - fn is_line_ending(data: &[u8], i: usize) -> bool; - - /// Get the length of the line ending at position i - fn ending_length(data: &[u8], i: usize) -> usize; -} - -/// Unix line endings (\n) -pub struct UnixLineEnding; - -impl LineEnding for UnixLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i < data.len() && data[i] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 1 - } -} - -/// Windows line endings (\r\n) -pub struct WindowsLineEnding; - -impl LineEnding for WindowsLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 2 - } -} - -/// Mixed line endings (detects both Unix and Windows) -pub struct MixedLineEnding; - -impl LineEnding for MixedLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - if i < data.len() && data[i] == b'\n' { - true - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - true - } else { - false - } - } - - fn ending_length(data: &[u8], i: usize) -> usize { - if i < data.len() && data[i] == b'\n' { - 1 - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - 2 - } else { - 1 // Default to 1 if somehow called incorrectly - } - } -} - -/// Detect line ending type from data -pub fn detect_line_ending(data: &[u8]) -> LineEndingType { - let mut unix_count = 0; - let mut windows_count = 0; - - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - unix_count += 1; - i += 1; - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - windows_count += 1; - i += 2; - } else { - i += 1; - } - } - - if unix_count > windows_count { - LineEndingType::Unix - } else if windows_count > unix_count { - LineEndingType::Windows - } else { - LineEndingType::Mixed - } -} - -/// Line ending type enum -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum LineEndingType { - Unix, - Windows, - Mixed, -} - -impl LineEndingType { - /// Split data using the detected line ending type - pub fn split_by_lines(&self, data: &[u8]) -> Result<ChunkingResult, StorageIOError> { - match self { - LineEndingType::Unix => split_by_lines_custom::<UnixLineEnding>(data), - LineEndingType::Windows => split_by_lines_custom::<WindowsLineEnding>(data), - LineEndingType::Mixed => split_by_lines_custom::<MixedLineEnding>(data), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_line_chunking_unix() { - let data = b"Hello\nWorld\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - } - - #[test] - fn test_line_chunking_windows() { - let data = b"Hello\r\nWorld\r\nTest\r\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents (should include \r\n) - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\r\n"); - } - - #[test] - fn test_line_chunking_mixed() { - let data = b"Hello\nWorld\r\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - } - - #[test] - fn test_line_chunking_no_trailing_newline() { - let data = b"Hello\nWorld\nTest"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test"); - } - - #[test] - fn test_line_chunking_empty_lines() { - let data = b"Hello\n\nWorld\n\n\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 5 chunks (including empty lines) - // "Hello\n", "\n", "World\n", "\n", "\n" - assert_eq!(result.chunks.len(), 5); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"\n"); - assert_eq!(result.chunks[2].data, b"World\n"); - assert_eq!(result.chunks[3].data, b"\n"); - assert_eq!(result.chunks[4].data, b"\n"); - } - - #[test] - fn test_line_chunking_empty_file() { - let data = b""; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 1 empty chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data, b""); - } - - #[test] - fn test_detect_line_ending() { - // Test Unix detection - let unix_data = b"Hello\nWorld\n"; - assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix); - - // Test Windows detection - let windows_data = b"Hello\r\nWorld\r\n"; - assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows); - - // Test mixed detection - let mixed_data = b"Hello\nWorld\r\n"; - assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed); - - // Test no newlines - let no_newlines = b"Hello World"; - assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed); - } - - #[test] - fn test_custom_line_ending_unix() { - let data = b"Hello\nWorld\n"; - - let result = split_by_lines_custom::<UnixLineEnding>(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - } - - #[test] - fn test_custom_line_ending_windows() { - let data = b"Hello\r\nWorld\r\n"; - - let result = split_by_lines_custom::<WindowsLineEnding>(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - } - - #[test] - fn test_line_ending_type_split() { - let unix_data = b"Hello\nWorld\n"; - let windows_data = b"Hello\r\nWorld\r\n"; - let mixed_data = b"Hello\nWorld\r\n"; - - // Test Unix - let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap(); - assert_eq!(unix_result.chunks.len(), 2); - - // Test Windows - let windows_result = LineEndingType::Windows - .split_by_lines(windows_data) - .unwrap(); - assert_eq!(windows_result.chunks.len(), 2); - - // Test Mixed - let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap(); - assert_eq!(mixed_result.chunks.len(), 2); - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different lines produce different hashes - let data1 = b"Hello\n"; - let data2 = b"World\n"; - - let result1 = split_by_lines_impl(data1).unwrap(); - let result2 = split_by_lines_impl(data2).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} |
