From 47e0ffd50427440696c245814517e4f5fa94ed83 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Sun, 8 Mar 2026 22:48:54 +0800 Subject: Move action system to legacy and remove storage system --- systems/storage/src/store/cdc.rs | 307 --------------------------- systems/storage/src/store/fixed.rs | 417 ------------------------------------- systems/storage/src/store/line.rs | 393 ---------------------------------- 3 files changed, 1117 deletions(-) delete mode 100644 systems/storage/src/store/cdc.rs delete mode 100644 systems/storage/src/store/fixed.rs delete mode 100644 systems/storage/src/store/line.rs (limited to 'systems/storage/src/store') diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs deleted file mode 100644 index cc16202..0000000 --- a/systems/storage/src/store/cdc.rs +++ /dev/null @@ -1,307 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Rabin fingerprint parameters for CDC -const WINDOW_SIZE: usize = 48; -const MIN_CHUNK_RATIO: f64 = 0.75; -const MAX_CHUNK_RATIO: f64 = 2.0; -const BASE_TARGET_MASK: u64 = 0xFFFF; - -/// Rabin fingerprint polynomial (irreducible polynomial) -const POLYNOMIAL: u64 = 0x3DA3358B4DC173; - -/// Precomputed table for Rabin fingerprint sliding window -static FINGERPRINT_TABLE: [u64; 256] = { - let mut table = [0u64; 256]; - let mut i = 0; - while i < 256 { - let mut fingerprint = i as u64; - let mut j = 0; - while j < 8 { - if fingerprint & 1 != 0 { - fingerprint = (fingerprint >> 1) ^ POLYNOMIAL; - } else { - fingerprint >>= 1; - } - j += 1; - } - table[i] = fingerprint; - i += 1; - } - table -}; - -/// Calculate Rabin fingerprint for a byte -#[inline] -fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 { - let old_byte_index = old_byte as usize; - - let fingerprint = - (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index]; - fingerprint -} - -/// Split data using Content-Defined Chunking with Rabin fingerprinting -pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result { - let avg_size = avg_size as usize; - - if avg_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Average chunk size must be greater than 0", - ) - .into()); - } - - // Calculate min and max chunk sizes based on average size - let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize; - let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize; - - // Ensure reasonable bounds - let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes - let max_chunk_size = max_chunk_size.max(avg_size * 2); - - let target_mask = { - let scale_factor = avg_size as f64 / (64.0 * 1024.0); - let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64; - mask_value.max(0xC000) - }; - - let mut chunks = Vec::new(); - let mut start = 0; - let data_len = data.len(); - - while start < data_len { - let chunk_start = start; - - let search_end = (start + max_chunk_size).min(data_len); - let mut boundary_found = false; - let mut boundary_pos = search_end; - - let mut window = Vec::with_capacity(WINDOW_SIZE); - let mut fingerprint: u64 = 0; - - for i in start..search_end { - let byte = data[i]; - - // Update sliding window - if window.len() >= WINDOW_SIZE { - let old_byte = window.remove(0); - fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte); - } else { - fingerprint = (fingerprint << 8) ^ (byte as u64); - } - window.push(byte); - - if i - chunk_start >= min_chunk_size { - if (fingerprint & target_mask) == 0 { - boundary_found = true; - boundary_pos = i + 1; - break; - } - } - } - - let chunk_end = if boundary_found { - boundary_pos - } else { - search_end - }; - - let chunk_data = data[chunk_start..chunk_end].to_vec(); - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = chunk_end; - } - - if chunks.len() > 1 { - let mut merged_chunks = Vec::new(); - let mut i = 0; - - while i < chunks.len() { - let current_chunk = &chunks[i]; - - if i < chunks.len() - 1 { - let next_chunk = &chunks[i + 1]; - if current_chunk.data.len() < min_chunk_size - && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size - { - // Merge chunks - let mut merged_data = current_chunk.data.clone(); - merged_data.extend_from_slice(&next_chunk.data); - let merged_chunk = crate::store::create_chunk(merged_data); - merged_chunks.push(merged_chunk); - i += 2; - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } else { - merged_chunks.push(current_chunk.clone()); - i += 1; - } - } - - chunks = merged_chunks; - } - - Ok(ChunkingResult { - chunks, - total_size: data_len as u64, - }) -} - -/// Split file using CDC with specified average chunk size -pub async fn write_file_cdc>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - avg_size: u32, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::cdc(avg_size); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Test function for CDC chunking -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_cdc_basic() { - let pattern: Vec = (0..1024).map(|i| (i % 256) as u8).collect(); - let mut data = Vec::new(); - for _ in 0..10 { - data.extend_from_slice(&pattern); - } - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have at least one chunk - assert!(!result.chunks.is_empty()); - - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 10KB data with 4KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds (more relaxed for CDC) - let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - let chunk_size = chunk.data.len(); - // CDC can produce chunks smaller than min_size due to content patterns - // But they should not exceed max_size - assert!( - chunk_size <= max_size, - "Chunk size {} exceeds maximum {}", - chunk_size, - max_size - ); - } - } - - #[test] - fn test_cdc_small_file() { - // Small file should result in single chunk - let data = vec![1, 2, 3, 4, 5]; - - let result = split_cdc_impl(&data, 4096).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_cdc_large_avg_size() { - // Test with larger average size (256KB) - let mut data = Vec::new(); - for i in 0..(256 * 1024 * 3) { - // 768KB of data - data.push((i % 256) as u8); - } - - let result = split_cdc_impl(&data, 256 * 1024).unwrap(); - - // With 768KB data and 256KB average, should have reasonable number of chunks - // CDC is content-defined, so exact count varies - assert!( - result.chunks.len() >= 1 && result.chunks.len() <= 10, - "Expected 1-10 chunks for 768KB data with 256KB average, got {}", - result.chunks.len() - ); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk size bounds - let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize; - - for chunk in &result.chunks { - // CDC can produce chunks smaller than min_size - // But they should not exceed max_size - assert!( - chunk.data.len() <= max_size, - "Chunk size {} exceeds maximum {}", - chunk.data.len(), - max_size - ); - } - } - - #[test] - fn test_cdc_zero_avg_size() { - let data = vec![1, 2, 3]; - - let result = split_cdc_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_rabin_fingerprint() { - // Test that rabin_fingerprint_byte function is deterministic - let test_bytes = [0x01, 0x02, 0x03, 0x04]; - - // Calculate fingerprint with specific sequence - let mut fingerprint1 = 0; - for &byte in &test_bytes { - fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte); - } - - // Calculate again with same inputs - let mut fingerprint2 = 0; - for &byte in &test_bytes { - fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte); - } - - // Same input should produce same output - assert_eq!(fingerprint1, fingerprint2); - - // Test with different old_byte produces different result - let mut fingerprint3 = 0; - for &byte in &test_bytes { - fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte); - } - - // Different old_byte should produce different fingerprint - assert_ne!(fingerprint1, fingerprint3); - } -} diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs deleted file mode 100644 index 044cc1c..0000000 --- a/systems/storage/src/store/fixed.rs +++ /dev/null @@ -1,417 +0,0 @@ -use std::path::PathBuf; -use std::time::Instant; - -use log::{info, trace}; -use memmap2::Mmap; -use tokio::fs; -use tokio::task; - -use crate::{ - error::StorageIOError, - store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, -}; - -/// Split data using fixed-size chunking -pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - while start < total_size { - let end = (start + chunk_size).min(total_size); - let chunk_data = data[start..end].to_vec(); - - let chunk = crate::store::create_chunk(chunk_data); - chunks.push(chunk); - - start = end; - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file using fixed-size chunking -pub async fn write_file_fixed>( - file_to_write: I, - storage_dir: I, - output_index_file: I, - fixed_size: u32, -) -> Result<(), StorageIOError> { - let config = StorageConfig::fixed_size(fixed_size); - write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Split file using fixed-size chunking with parallel processing -async fn write_file_parallel( - file_to_write: impl Into, - storage_dir: impl Into, - output_index_file: impl Into, - cfg: &StorageConfig, -) -> Result<(), StorageIOError> { - let (file_to_write, storage_dir, output_index_file) = - precheck(file_to_write, storage_dir, output_index_file).await?; - - info!("Starting file write: {}", file_to_write.display()); - let start_time = Instant::now(); - - // Memory map the entire file - let file = std::fs::File::open(&file_to_write)?; - let mmap = unsafe { Mmap::map(&file)? }; - let data = &mmap[..]; - - // Split into chunks based on policy - let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; - - // Store chunks in parallel and create index - let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; - - // Write index file - write_index_file(&index_entries, &output_index_file).await?; - - let duration = start_time.elapsed(); - info!( - "File write completed in {:?}: {}", - duration, - file_to_write.display() - ); - - Ok(()) -} - -/// Split data into chunks based on the specified policy with parallel processing -async fn split_into_chunks_parallel( - data: &[u8], - policy: &crate::store::ChunkingPolicy, -) -> Result { - match policy { - crate::store::ChunkingPolicy::FixedSize(chunk_size) => { - split_fixed_parallel(data, *chunk_size).await - } - _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking - } -} - -/// Split data using fixed-size chunking with parallel processing -async fn split_fixed_parallel( - data: &[u8], - chunk_size: u32, -) -> Result { - let chunk_size = chunk_size as usize; - - if chunk_size == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Chunk size must be greater than 0", - ) - .into()); - } - - let total_size = data.len(); - let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division - - // Create a vector to hold chunk boundaries - let mut chunk_boundaries = Vec::with_capacity(num_chunks); - let mut start = 0; - - while start < total_size { - let end = (start + chunk_size).min(total_size); - chunk_boundaries.push((start, end)); - start = end; - } - - // Process chunks in parallel using Tokio tasks - let chunks: Vec = if chunk_boundaries.len() > 1 { - // Use parallel processing for multiple chunks - let mut tasks = Vec::with_capacity(chunk_boundaries.len()); - - for (start, end) in chunk_boundaries { - let chunk_data = data[start..end].to_vec(); - - // Spawn a blocking task for each chunk - tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); - } - - // Wait for all tasks to complete - let mut chunks = Vec::with_capacity(tasks.len()); - for task in tasks { - match task.await { - Ok(chunk) => chunks.push(chunk), - Err(e) => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Task join error: {}", e), - ) - .into()); - } - } - } - - chunks - } else { - // Single chunk, no need for parallel processing - chunk_boundaries - .into_iter() - .map(|(start, end)| { - let chunk_data = data[start..end].to_vec(); - create_chunk(chunk_data) - }) - .collect() - }; - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Store chunks in the storage directory in parallel and return index entries -async fn store_chunks_parallel( - chunks: &[crate::store::Chunk], - storage_dir: &std::path::Path, -) -> Result, StorageIOError> { - let mut tasks = Vec::with_capacity(chunks.len()); - - let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - let total_chunks = chunks.len(); - - for chunk in chunks { - let chunk = chunk.clone(); - let storage_dir = storage_dir.to_path_buf(); - let writed_counter = writed_counter.clone(); - - // Spawn async task for each chunk storage operation - tasks.push(task::spawn(async move { - // Create storage directory structure based on hash - let hash_str = hex::encode(chunk.hash); - let chunk_dir = get_dir(storage_dir, hash_str.clone())?; - - // Create directory if it doesn't exist - if let Some(parent) = chunk_dir.parent() { - fs::create_dir_all(parent).await?; - } - - // Write chunk data - let chunk_path = chunk_dir.with_extension("chunk"); - if !chunk_path.exists() { - trace!("W: {}", hash_str); - fs::write(&chunk_path, &chunk.data).await?; - writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - Ok::(IndexEntry { - hash: chunk.hash, - size: chunk.data.len() as u32, - }) - })); - } - - let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); - info!( - "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", - writed_count, - total_chunks, - (writed_count as f32 / total_chunks as f32) * 100 as f32, - total_chunks - writed_count - ); - - // Wait for all tasks to complete - let mut index_entries = Vec::with_capacity(chunks.len()); - for task in tasks { - let entry = task.await.map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) - })??; - index_entries.push(entry); - } - - Ok(index_entries) -} - -/// Write index file containing chunk hashes and sizes -async fn write_index_file( - entries: &[IndexEntry], - output_path: &std::path::Path, -) -> Result<(), StorageIOError> { - let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size - - for entry in entries { - index_data.extend_from_slice(&entry.hash); - index_data.extend_from_slice(&entry.size.to_le_bytes()); - } - - fs::write(output_path, &index_data).await?; - Ok(()) -} - -/// Utility function to calculate optimal fixed chunk size based on file size -pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { - if target_chunks == 0 || file_size == 0 { - return 64 * 1024; // Default 64KB - } - - let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; - - // Round to nearest power of 2 for better performance - let rounded_size = if chunk_size <= 1024 { - // Small chunks: use exact size - chunk_size - } else { - // Larger chunks: round to nearest power of 2 - let mut size = chunk_size; - size -= 1; - size |= size >> 1; - size |= size >> 2; - size |= size >> 4; - size |= size >> 8; - size |= size >> 16; - size += 1; - size - }; - - // Ensure minimum and maximum bounds - rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max -} - -/// Split file with automatic chunk size calculation -pub async fn write_file_fixed_auto, J: Into, K: Into>( - file_to_write: I, - storage_dir: J, - output_index_file: K, - target_chunks: usize, -) -> Result<(), StorageIOError> { - let file_path = file_to_write.into(); - let storage_dir = storage_dir.into(); - let output_index_file = output_index_file.into(); - - let file_size = fs::metadata(&file_path).await?.len(); - - let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); - write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_fixed_chunking_basic() { - // Create 10KB of test data - let data: Vec = (0..10240).map(|i| (i % 256) as u8).collect(); - - // Split into 1KB chunks - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have 10 chunks - assert_eq!(result.chunks.len(), 10); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - - // Verify chunk sizes (last chunk may be smaller) - for (i, chunk) in result.chunks.iter().enumerate() { - if i < 9 { - assert_eq!(chunk.data.len(), 1024); - } else { - assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly - } - } - } - - #[test] - fn test_fixed_chunking_uneven() { - // Create 5.5KB of test data - let data: Vec = (0..5632).map(|i| (i % 256) as u8).collect(); - - // Split into 2KB chunks - let result = split_fixed_impl(&data, 2048).unwrap(); - - // Should have 3 chunks (2048 + 2048 + 1536) - assert_eq!(result.chunks.len(), 3); - - // Verify chunk sizes - assert_eq!(result.chunks[0].data.len(), 2048); - assert_eq!(result.chunks[1].data.len(), 2048); - assert_eq!(result.chunks[2].data.len(), 1536); - - // Verify data integrity - let mut reconstructed = Vec::new(); - for chunk in &result.chunks { - reconstructed.extend_from_slice(&chunk.data); - } - assert_eq!(reconstructed, data); - } - - #[test] - fn test_fixed_chunking_small_file() { - // Small file smaller than chunk size - let data = vec![1, 2, 3, 4, 5]; - - let result = split_fixed_impl(&data, 1024).unwrap(); - - // Should have exactly one chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data.len(), data.len()); - } - - #[test] - fn test_fixed_chunking_zero_size() { - let data = vec![1, 2, 3]; - - let result = split_fixed_impl(&data, 0); - assert!(result.is_err()); - - match result { - Err(StorageIOError::IOErr(e)) => { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } - _ => panic!("Expected IOErr with InvalidInput"), - } - } - - #[test] - fn test_calculate_optimal_chunk_size() { - // Test basic calculation - assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB - - // Test rounding to power of 2 - assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB - - // Test minimum bound - assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB - - // Test edge cases - assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default - assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default - - // Test large file - assert_eq!( - calculate_optimal_chunk_size(100 * 1024 * 1024, 10), - 16 * 1024 * 1024 - ); // 100MB / 10 = 10MB, max is 16MB - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different data produces different hashes - let data1 = vec![1, 2, 3, 4, 5]; - let data2 = vec![1, 2, 3, 4, 6]; - - let result1 = split_fixed_impl(&data1, 1024).unwrap(); - let result2 = split_fixed_impl(&data2, 1024).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs deleted file mode 100644 index 971018b..0000000 --- a/systems/storage/src/store/line.rs +++ /dev/null @@ -1,393 +0,0 @@ -use std::path::PathBuf; - -use crate::{error::StorageIOError, store::ChunkingResult}; - -/// Split data by lines (newline characters) -pub fn split_by_lines_impl(data: &[u8]) -> Result { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - // Iterate through data to find line boundaries - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - // Unix line ending - let line_end = i + 1; // Include \n - // Extract line data (include newline character) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' { - // Windows line ending - let line_end = i + 2; // Include both \r and \n - // Extract line data (include newline characters) - let line_data = data[start..line_end].to_vec(); - - // Create chunk for this line - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - // Move start to next line - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data (last line without newline) - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file (no lines) - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Split file by lines -pub async fn write_file_line>( - file_to_write: I, - storage_dir: I, - output_index_file: I, -) -> Result<(), StorageIOError> { - use crate::store::{StorageConfig, write_file}; - - let config = StorageConfig::line(); - write_file(file_to_write, storage_dir, output_index_file, &config).await -} - -/// Utility function to split data by lines with custom line ending detection -pub fn split_by_lines_custom(data: &[u8]) -> Result { - let mut chunks = Vec::new(); - let mut start = 0; - let total_size = data.len(); - - let mut i = 0; - while i < total_size { - if E::is_line_ending(data, i) { - let line_end = i + E::ending_length(data, i); - let line_data = data[start..line_end].to_vec(); - - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - - start = line_end; - i = line_end; - } else { - i += 1; - } - } - - // Handle remaining data - if start < total_size { - let line_data = data[start..].to_vec(); - let chunk = crate::store::create_chunk(line_data); - chunks.push(chunk); - } - - // Handle empty file - if chunks.is_empty() && total_size == 0 { - let chunk = crate::store::create_chunk(Vec::new()); - chunks.push(chunk); - } - - Ok(ChunkingResult { - chunks, - total_size: total_size as u64, - }) -} - -/// Trait for different line ending types -pub trait LineEnding { - /// Check if position i is the start of a line ending - fn is_line_ending(data: &[u8], i: usize) -> bool; - - /// Get the length of the line ending at position i - fn ending_length(data: &[u8], i: usize) -> usize; -} - -/// Unix line endings (\n) -pub struct UnixLineEnding; - -impl LineEnding for UnixLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i < data.len() && data[i] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 1 - } -} - -/// Windows line endings (\r\n) -pub struct WindowsLineEnding; - -impl LineEnding for WindowsLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' - } - - fn ending_length(_data: &[u8], _i: usize) -> usize { - 2 - } -} - -/// Mixed line endings (detects both Unix and Windows) -pub struct MixedLineEnding; - -impl LineEnding for MixedLineEnding { - fn is_line_ending(data: &[u8], i: usize) -> bool { - if i < data.len() && data[i] == b'\n' { - true - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - true - } else { - false - } - } - - fn ending_length(data: &[u8], i: usize) -> usize { - if i < data.len() && data[i] == b'\n' { - 1 - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - 2 - } else { - 1 // Default to 1 if somehow called incorrectly - } - } -} - -/// Detect line ending type from data -pub fn detect_line_ending(data: &[u8]) -> LineEndingType { - let mut unix_count = 0; - let mut windows_count = 0; - - let mut i = 0; - while i < data.len() { - if data[i] == b'\n' { - unix_count += 1; - i += 1; - } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { - windows_count += 1; - i += 2; - } else { - i += 1; - } - } - - if unix_count > windows_count { - LineEndingType::Unix - } else if windows_count > unix_count { - LineEndingType::Windows - } else { - LineEndingType::Mixed - } -} - -/// Line ending type enum -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum LineEndingType { - Unix, - Windows, - Mixed, -} - -impl LineEndingType { - /// Split data using the detected line ending type - pub fn split_by_lines(&self, data: &[u8]) -> Result { - match self { - LineEndingType::Unix => split_by_lines_custom::(data), - LineEndingType::Windows => split_by_lines_custom::(data), - LineEndingType::Mixed => split_by_lines_custom::(data), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_line_chunking_unix() { - let data = b"Hello\nWorld\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - - // Verify total size - let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); - assert_eq!(total_chunk_size, data.len()); - } - - #[test] - fn test_line_chunking_windows() { - let data = b"Hello\r\nWorld\r\nTest\r\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents (should include \r\n) - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\r\n"); - } - - #[test] - fn test_line_chunking_mixed() { - let data = b"Hello\nWorld\r\nTest\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - assert_eq!(result.chunks[2].data, b"Test\n"); - } - - #[test] - fn test_line_chunking_no_trailing_newline() { - let data = b"Hello\nWorld\nTest"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 3 chunks - assert_eq!(result.chunks.len(), 3); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - assert_eq!(result.chunks[2].data, b"Test"); - } - - #[test] - fn test_line_chunking_empty_lines() { - let data = b"Hello\n\nWorld\n\n\n"; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 5 chunks (including empty lines) - // "Hello\n", "\n", "World\n", "\n", "\n" - assert_eq!(result.chunks.len(), 5); - - // Verify chunk contents - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"\n"); - assert_eq!(result.chunks[2].data, b"World\n"); - assert_eq!(result.chunks[3].data, b"\n"); - assert_eq!(result.chunks[4].data, b"\n"); - } - - #[test] - fn test_line_chunking_empty_file() { - let data = b""; - - let result = split_by_lines_impl(data).unwrap(); - - // Should have 1 empty chunk - assert_eq!(result.chunks.len(), 1); - assert_eq!(result.chunks[0].data, b""); - } - - #[test] - fn test_detect_line_ending() { - // Test Unix detection - let unix_data = b"Hello\nWorld\n"; - assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix); - - // Test Windows detection - let windows_data = b"Hello\r\nWorld\r\n"; - assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows); - - // Test mixed detection - let mixed_data = b"Hello\nWorld\r\n"; - assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed); - - // Test no newlines - let no_newlines = b"Hello World"; - assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed); - } - - #[test] - fn test_custom_line_ending_unix() { - let data = b"Hello\nWorld\n"; - - let result = split_by_lines_custom::(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\n"); - assert_eq!(result.chunks[1].data, b"World\n"); - } - - #[test] - fn test_custom_line_ending_windows() { - let data = b"Hello\r\nWorld\r\n"; - - let result = split_by_lines_custom::(data).unwrap(); - - assert_eq!(result.chunks.len(), 2); - assert_eq!(result.chunks[0].data, b"Hello\r\n"); - assert_eq!(result.chunks[1].data, b"World\r\n"); - } - - #[test] - fn test_line_ending_type_split() { - let unix_data = b"Hello\nWorld\n"; - let windows_data = b"Hello\r\nWorld\r\n"; - let mixed_data = b"Hello\nWorld\r\n"; - - // Test Unix - let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap(); - assert_eq!(unix_result.chunks.len(), 2); - - // Test Windows - let windows_result = LineEndingType::Windows - .split_by_lines(windows_data) - .unwrap(); - assert_eq!(windows_result.chunks.len(), 2); - - // Test Mixed - let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap(); - assert_eq!(mixed_result.chunks.len(), 2); - } - - #[test] - fn test_chunk_hash_uniqueness() { - // Test that different lines produce different hashes - let data1 = b"Hello\n"; - let data2 = b"World\n"; - - let result1 = split_by_lines_impl(data1).unwrap(); - let result2 = split_by_lines_impl(data2).unwrap(); - - assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); - } -} -- cgit