summaryrefslogtreecommitdiff
path: root/systems/storage/src/store
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-02-27 06:17:06 +0800
committer魏曹先生 <1992414357@qq.com>2026-02-27 06:17:06 +0800
commit76e78fe53c03c9b4c7fa029709f06ee86ce9c865 (patch)
tree4e3778dfb405b2c21b51df24331100b94f5356d9 /systems/storage/src/store
parent748c8a3353df887ee4b01e0e1327aa95c1c7225a (diff)
Add storage system with chunk-based file storage
Diffstat (limited to 'systems/storage/src/store')
-rw-r--r--systems/storage/src/store/cdc.rs307
-rw-r--r--systems/storage/src/store/fixed.rs417
-rw-r--r--systems/storage/src/store/line.rs393
3 files changed, 1117 insertions, 0 deletions
diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs
new file mode 100644
index 0000000..cc16202
--- /dev/null
+++ b/systems/storage/src/store/cdc.rs
@@ -0,0 +1,307 @@
+use std::path::PathBuf;
+
+use crate::{error::StorageIOError, store::ChunkingResult};
+
+/// Sliding-window length, in bytes, used while fingerprinting for CDC.
+const WINDOW_SIZE: usize = 48;
+const MIN_CHUNK_RATIO: f64 = 0.75; // minimum chunk size as a fraction of the average size
+const MAX_CHUNK_RATIO: f64 = 2.0; // maximum chunk size as a multiple of the average size
+const BASE_TARGET_MASK: u64 = 0xFFFF; // boundary mask for a 64 KiB average; scaled for other averages
+
+/// Polynomial XORed in while building `FINGERPRINT_TABLE` (described by the author as irreducible; not verified here).
+const POLYNOMIAL: u64 = 0x3DA3358B4DC173;
+
+/// Per-byte lookup table consumed by the sliding-window fingerprint update.
+///
+/// Evaluated at compile time by `build_fingerprint_table`.
+static FINGERPRINT_TABLE: [u64; 256] = build_fingerprint_table();
+
+/// Builds the 256-entry table: entry `b` is the value `b` after eight
+/// shift-right steps, XORing in `POLYNOMIAL` whenever the bit shifted out is set.
+const fn build_fingerprint_table() -> [u64; 256] {
+    let mut entries = [0u64; 256];
+    let mut byte = 0;
+    // `for` loops are not available in const evaluation, hence manual counters.
+    while byte < entries.len() {
+        let mut value = byte as u64;
+        let mut step = 0;
+        while step < 8 {
+            let low_bit_set = value & 1 != 0;
+            value >>= 1;
+            if low_bit_set {
+                value ^= POLYNOMIAL;
+            }
+            step += 1;
+        }
+        entries[byte] = value;
+        byte += 1;
+    }
+    entries
+}
+
+/// Advances the rolling fingerprint by one byte.
+///
+/// Shifts `old_fingerprint` left by 8 bits, XORs in `new_byte`, and XORs in
+/// the table entry for `old_byte` (the byte leaving the window).
+#[inline]
+fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 {
+    let outgoing = FINGERPRINT_TABLE[usize::from(old_byte)];
+    let shifted = old_fingerprint << 8;
+    shifted ^ u64::from(new_byte) ^ outgoing
+}
+
+/// Split data using Content-Defined Chunking with Rabin fingerprinting
+///
+/// A chunk ends at the first position, past the minimum chunk size, where the
+/// rolling fingerprint has no bits in common with a mask derived from
+/// `avg_size`. If no such position is found the chunk is cut at the maximum
+/// chunk size (or at the end of `data`). The fingerprint is restarted for
+/// every chunk.
+///
+/// * minimum size: `avg_size * MIN_CHUNK_RATIO`, but at least 512 bytes
+/// * maximum size: `avg_size * MAX_CHUNK_RATIO`, but at least `2 * avg_size`
+///
+/// # Errors
+///
+/// Returns an `InvalidInput` I/O error (converted into [`StorageIOError`])
+/// when `avg_size` is 0.
+pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> {
+    let avg_size = avg_size as usize;
+
+    if avg_size == 0 {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::InvalidInput,
+            "Average chunk size must be greater than 0",
+        )
+        .into());
+    }
+
+    // Size bounds derived from the average. Saturating arithmetic keeps very
+    // large averages from overflowing `usize` on 32-bit targets.
+    let min_chunk_size = ((avg_size as f64 * MIN_CHUNK_RATIO) as usize).max(512);
+    let max_chunk_size =
+        ((avg_size as f64 * MAX_CHUNK_RATIO) as usize).max(avg_size.saturating_mul(2));
+
+    // Scale the base mask (defined for a 64 KiB average) to the requested
+    // average, never going below 0xC000.
+    let target_mask = {
+        let scale_factor = avg_size as f64 / (64.0 * 1024.0);
+        ((BASE_TARGET_MASK as f64 * scale_factor) as u64).max(0xC000)
+    };
+
+    let data_len = data.len();
+    let mut chunks = Vec::new();
+    let mut start = 0;
+
+    while start < data_len {
+        let search_end = start.saturating_add(max_chunk_size).min(data_len);
+        let window = &data[start..search_end];
+        let mut chunk_len = window.len();
+        let mut fingerprint: u64 = 0;
+
+        for (offset, &byte) in window.iter().enumerate() {
+            // The byte leaving the window sits WINDOW_SIZE positions back in
+            // the input, so no separate window buffer has to be maintained.
+            fingerprint = if offset >= WINDOW_SIZE {
+                rabin_fingerprint_byte(fingerprint, window[offset - WINDOW_SIZE], byte)
+            } else {
+                (fingerprint << 8) ^ u64::from(byte)
+            };
+
+            if offset >= min_chunk_size && fingerprint & target_mask == 0 {
+                chunk_len = offset + 1;
+                break;
+            }
+        }
+
+        let chunk_end = start + chunk_len;
+        chunks.push(crate::store::create_chunk(data[start..chunk_end].to_vec()));
+        start = chunk_end;
+    }
+
+    // Merge an undersized chunk with its successor when the pair still fits in
+    // the maximum size. Chunks are moved, not cloned, into the result.
+    // NOTE(review): with the boundaries produced above, an undersized chunk
+    // that has a successor is always exactly `max_chunk_size` long, so this
+    // pass looks like it can never merge anything - confirm the intent.
+    let chunks = if chunks.len() > 1 {
+        let mut merged_chunks = Vec::with_capacity(chunks.len());
+        let mut pending = chunks.into_iter().peekable();
+
+        while let Some(current) = pending.next() {
+            let partner = pending.next_if(|next| {
+                current.data.len() < min_chunk_size
+                    && current.data.len() + next.data.len() <= max_chunk_size
+            });
+
+            match partner {
+                Some(next) => {
+                    let mut merged_data =
+                        Vec::with_capacity(current.data.len() + next.data.len());
+                    merged_data.extend_from_slice(&current.data);
+                    merged_data.extend_from_slice(&next.data);
+                    merged_chunks.push(crate::store::create_chunk(merged_data));
+                }
+                None => merged_chunks.push(current),
+            }
+        }
+
+        merged_chunks
+    } else {
+        chunks
+    };
+
+    Ok(ChunkingResult {
+        chunks,
+        total_size: data_len as u64,
+    })
+}
+
+/// Writes a file through `write_file` using a CDC storage configuration
+/// built from `avg_size`.
+pub async fn write_file_cdc<I: Into<PathBuf>>(
+    file_to_write: I,
+    storage_dir: I,
+    output_index_file: I,
+    avg_size: u32,
+) -> Result<(), StorageIOError> {
+    use crate::store::{StorageConfig, write_file};
+
+    write_file(
+        file_to_write,
+        storage_dir,
+        output_index_file,
+        &StorageConfig::cdc(avg_size),
+    )
+    .await
+}
+
+/// Unit tests for CDC chunking
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Sum of the data lengths of all chunks.
+    fn total_len(result: &ChunkingResult) -> usize {
+        result.chunks.iter().map(|c| c.data.len()).sum()
+    }
+
+    #[test]
+    fn test_cdc_basic() {
+        // 10 repetitions of a 1 KiB pattern cycling through every byte value.
+        let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();
+        let data = pattern.repeat(10);
+
+        let result = split_cdc_impl(&data, 4096).unwrap();
+        let chunk_count = result.chunks.len();
+
+        assert!(chunk_count != 0);
+        assert!(
+            (1..=10).contains(&chunk_count),
+            "Expected 1-10 chunks for 10KB data with 4KB average, got {}",
+            chunk_count
+        );
+
+        // Chunks must cover the input exactly.
+        assert_eq!(total_len(&result), data.len());
+
+        // Chunks may be smaller than the minimum, but never larger than the maximum.
+        let max_size = (4096.0 * MAX_CHUNK_RATIO) as usize;
+        for chunk_size in result.chunks.iter().map(|c| c.data.len()) {
+            assert!(
+                chunk_size <= max_size,
+                "Chunk size {} exceeds maximum {}",
+                chunk_size,
+                max_size
+            );
+        }
+    }
+
+    #[test]
+    fn test_cdc_small_file() {
+        // Input far below the minimum chunk size yields a single chunk.
+        let data = vec![1, 2, 3, 4, 5];
+
+        let result = split_cdc_impl(&data, 4096).unwrap();
+
+        assert_eq!(result.chunks.len(), 1);
+        assert_eq!(result.chunks[0].data.len(), data.len());
+    }
+
+    #[test]
+    fn test_cdc_large_avg_size() {
+        // 768KB of data with a 256KB average.
+        let data: Vec<u8> = (0..(256 * 1024 * 3)).map(|i| (i % 256) as u8).collect();
+
+        let result = split_cdc_impl(&data, 256 * 1024).unwrap();
+        let chunk_count = result.chunks.len();
+
+        // Content-defined boundaries make the exact count vary.
+        assert!(
+            (1..=10).contains(&chunk_count),
+            "Expected 1-10 chunks for 768KB data with 256KB average, got {}",
+            chunk_count
+        );
+
+        assert_eq!(total_len(&result), data.len());
+
+        let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize;
+        for chunk in &result.chunks {
+            assert!(
+                chunk.data.len() <= max_size,
+                "Chunk size {} exceeds maximum {}",
+                chunk.data.len(),
+                max_size
+            );
+        }
+    }
+
+    #[test]
+    fn test_cdc_zero_avg_size() {
+        let data = vec![1, 2, 3];
+
+        let result = split_cdc_impl(&data, 0);
+        assert!(result.is_err());
+
+        let Err(StorageIOError::IOErr(e)) = result else {
+            panic!("Expected IOErr with InvalidInput");
+        };
+        assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
+    }
+
+    #[test]
+    fn test_rabin_fingerprint() {
+        let test_bytes = [0x01, 0x02, 0x03, 0x04];
+
+        // Folds the test bytes into a fingerprint using a fixed outgoing byte.
+        let roll = |old_byte: u8| {
+            test_bytes
+                .iter()
+                .fold(0u64, |fp, &byte| rabin_fingerprint_byte(fp, old_byte, byte))
+        };
+
+        let fingerprint1 = roll(0xAA);
+        let fingerprint2 = roll(0xAA);
+        let fingerprint3 = roll(0xBB);
+
+        // Same inputs give the same fingerprint.
+        assert_eq!(fingerprint1, fingerprint2);
+
+        // A different outgoing byte gives a different fingerprint.
+        assert_ne!(fingerprint1, fingerprint3);
+    }
+}
diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs
new file mode 100644
index 0000000..044cc1c
--- /dev/null
+++ b/systems/storage/src/store/fixed.rs
@@ -0,0 +1,417 @@
+use std::path::PathBuf;
+use std::time::Instant;
+
+use log::{info, trace};
+use memmap2::Mmap;
+use tokio::fs;
+use tokio::task;
+
+use crate::{
+ error::StorageIOError,
+ store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck},
+};
+
+/// Cuts `data` into consecutive chunks of `chunk_size` bytes; the final chunk
+/// holds whatever remains and may be shorter. Empty input yields no chunks.
+///
+/// # Errors
+///
+/// Returns an `InvalidInput` I/O error (converted into [`StorageIOError`])
+/// when `chunk_size` is 0.
+pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> {
+    if chunk_size == 0 {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::InvalidInput,
+            "Chunk size must be greater than 0",
+        )
+        .into());
+    }
+
+    let chunks = data
+        .chunks(chunk_size as usize)
+        .map(|piece| crate::store::create_chunk(piece.to_vec()))
+        .collect();
+
+    Ok(ChunkingResult {
+        chunks,
+        total_size: data.len() as u64,
+    })
+}
+
+/// Writes a file using a fixed-size storage configuration built from
+/// `fixed_size`, via the parallel write path.
+pub async fn write_file_fixed<I: Into<PathBuf>>(
+    file_to_write: I,
+    storage_dir: I,
+    output_index_file: I,
+    fixed_size: u32,
+) -> Result<(), StorageIOError> {
+    write_file_parallel(
+        file_to_write,
+        storage_dir,
+        output_index_file,
+        &StorageConfig::fixed_size(fixed_size),
+    )
+    .await
+}
+
+/// Memory-maps the input file, splits it into chunks, stores the chunks and
+/// writes the index file, logging the start and the elapsed time.
+///
+/// The three paths are first passed through `precheck`, whose returned paths
+/// are used for the rest of the operation.
+async fn write_file_parallel(
+    file_to_write: impl Into<PathBuf>,
+    storage_dir: impl Into<PathBuf>,
+    output_index_file: impl Into<PathBuf>,
+    cfg: &StorageConfig,
+) -> Result<(), StorageIOError> {
+    let (source_path, chunk_root, index_path) =
+        precheck(file_to_write, storage_dir, output_index_file).await?;
+
+    info!("Starting file write: {}", source_path.display());
+    let started = Instant::now();
+
+    // Map the whole file into memory instead of reading it into a buffer.
+    let source = std::fs::File::open(&source_path)?;
+    // NOTE(review): the mapping is only sound while the file is not truncated
+    // or modified by someone else; nothing here enforces that - confirm.
+    let mapped = unsafe { Mmap::map(&source)? };
+
+    // Chunk according to the configured policy.
+    let split = split_into_chunks_parallel(&mapped[..], &cfg.chunking_policy).await?;
+
+    // Persist the chunks, then the index describing them.
+    let entries = store_chunks_parallel(&split.chunks, &chunk_root).await?;
+    write_index_file(&entries, &index_path).await?;
+
+    info!(
+        "File write completed in {:?}: {}",
+        started.elapsed(),
+        source_path.display()
+    );
+
+    Ok(())
+}
+
+/// Dispatches on the chunking policy: `FixedSize` uses the parallel
+/// fixed-size splitter; every other policy falls back to sequential
+/// fixed-size chunking with 64 KiB chunks.
+async fn split_into_chunks_parallel(
+    data: &[u8],
+    policy: &crate::store::ChunkingPolicy,
+) -> Result<ChunkingResult, StorageIOError> {
+    if let crate::store::ChunkingPolicy::FixedSize(chunk_size) = policy {
+        return split_fixed_parallel(data, *chunk_size).await;
+    }
+
+    // Fallback for non-fixed chunking
+    split_fixed_impl(data, 64 * 1024)
+}
+
+/// Fixed-size chunking where, for inputs spanning more than one chunk, each
+/// chunk is built on a blocking task.
+///
+/// Chunks are returned in input order. An input that fits in one chunk is
+/// processed inline.
+///
+/// # Errors
+///
+/// Returns an `InvalidInput` I/O error when `chunk_size` is 0, and an `Other`
+/// I/O error when a spawned task fails to join.
+async fn split_fixed_parallel(
+    data: &[u8],
+    chunk_size: u32,
+) -> Result<ChunkingResult, StorageIOError> {
+    if chunk_size == 0 {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::InvalidInput,
+            "Chunk size must be greater than 0",
+        )
+        .into());
+    }
+
+    let pieces: Vec<&[u8]> = data.chunks(chunk_size as usize).collect();
+
+    let chunks: Vec<crate::store::Chunk> = match pieces.as_slice() {
+        [] => Vec::new(),
+        // Single chunk, no need for parallel processing
+        [only] => vec![create_chunk(only.to_vec())],
+        many => {
+            // Spawn every task up front so they can run concurrently.
+            let handles: Vec<_> = many
+                .iter()
+                .map(|piece| {
+                    let owned = piece.to_vec();
+                    task::spawn_blocking(move || create_chunk(owned))
+                })
+                .collect();
+
+            // Await in spawn order to keep the chunks in input order.
+            let mut finished = Vec::with_capacity(handles.len());
+            for handle in handles {
+                let chunk = handle.await.map_err(|e| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::Other,
+                        format!("Task join error: {}", e),
+                    )
+                })?;
+                finished.push(chunk);
+            }
+            finished
+        }
+    };
+
+    Ok(ChunkingResult {
+        chunks,
+        total_size: data.len() as u64,
+    })
+}
+
+/// Store chunks in the storage directory in parallel and return index entries
+///
+/// One task is spawned per chunk. Each task derives the chunk's location from
+/// the hex-encoded hash via `get_dir`, creates the parent directory, and
+/// writes the data to a `.chunk` file unless that file already exists.
+/// Entries are returned in the same order as `chunks`.
+///
+/// The summary log line is emitted only after every task has finished, so the
+/// written/skipped counts it reports are final.
+///
+/// # Errors
+///
+/// Returns the first task failure encountered while awaiting in order: either
+/// the task's own storage error or an `Other` I/O error for a join failure.
+async fn store_chunks_parallel(
+    chunks: &[crate::store::Chunk],
+    storage_dir: &std::path::Path,
+) -> Result<Vec<IndexEntry>, StorageIOError> {
+    let total_chunks = chunks.len();
+    let written_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let mut tasks = Vec::with_capacity(total_chunks);
+
+    for chunk in chunks {
+        let chunk = chunk.clone();
+        let storage_dir = storage_dir.to_path_buf();
+        let written_counter = written_counter.clone();
+
+        // Spawn async task for each chunk storage operation
+        tasks.push(task::spawn(async move {
+            // Create storage directory structure based on hash
+            let hash_str = hex::encode(chunk.hash);
+            let chunk_dir = get_dir(storage_dir, hash_str.clone())?;
+
+            // Create directory if it doesn't exist
+            if let Some(parent) = chunk_dir.parent() {
+                fs::create_dir_all(parent).await?;
+            }
+
+            // Write chunk data; an existing file is treated as a duplicate.
+            let chunk_path = chunk_dir.with_extension("chunk");
+            if !chunk_path.exists() {
+                trace!("W: {}", hash_str);
+                fs::write(&chunk_path, &chunk.data).await?;
+                written_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            }
+
+            Ok::<IndexEntry, StorageIOError>(IndexEntry {
+                hash: chunk.hash,
+                size: chunk.data.len() as u32,
+            })
+        }));
+    }
+
+    // Wait for all tasks to complete before reading the counter.
+    let mut index_entries = Vec::with_capacity(total_chunks);
+    for task in tasks {
+        let entry = task.await.map_err(|e| {
+            std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e))
+        })??;
+        index_entries.push(entry);
+    }
+
+    let written_count = written_counter.load(std::sync::atomic::Ordering::Relaxed);
+    // Avoid 0/0 = NaN when there was nothing to store.
+    let written_percent = if total_chunks == 0 {
+        0.0
+    } else {
+        (written_count as f32 / total_chunks as f32) * 100.0
+    };
+    info!(
+        "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)",
+        written_count,
+        total_chunks,
+        written_percent,
+        total_chunks - written_count
+    );
+
+    Ok(index_entries)
+}
+
+/// Serializes the index as the concatenation, per entry, of the hash bytes
+/// followed by the size in little-endian order, and writes it to `output_path`.
+async fn write_index_file(
+    entries: &[IndexEntry],
+    output_path: &std::path::Path,
+) -> Result<(), StorageIOError> {
+    // 32 bytes hash + 4 bytes size
+    let mut buffer = Vec::with_capacity(36 * entries.len());
+
+    for IndexEntry { hash, size } in entries {
+        buffer.extend_from_slice(hash);
+        buffer.extend_from_slice(&size.to_le_bytes());
+    }
+
+    fs::write(output_path, &buffer).await?;
+    Ok(())
+}
+
+/// Utility function to calculate optimal fixed chunk size based on file size
+///
+/// Divides `file_size` by `target_chunks` (rounding up). Results above 1 KiB
+/// are rounded up to the next power of two. The final value is clamped to the
+/// range 1 KiB ..= 16 MiB.
+///
+/// Returns the 64 KiB default when `file_size` or `target_chunks` is 0.
+///
+/// The per-chunk size is capped at 16 MiB before rounding, so very large
+/// files cannot overflow `u32`.
+pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 {
+    const DEFAULT_CHUNK_SIZE: u32 = 64 * 1024;
+    const MIN_CHUNK_SIZE: u32 = 1024;
+    const MAX_CHUNK_SIZE: u32 = 16 * 1024 * 1024;
+
+    if target_chunks == 0 || file_size == 0 {
+        return DEFAULT_CHUNK_SIZE;
+    }
+
+    // Ceiling division in u64; `usize` always fits in u64 on supported targets.
+    let per_chunk = file_size.div_ceil(target_chunks as u64);
+
+    // Cap before narrowing: MAX_CHUNK_SIZE is itself a power of two, so the
+    // rounding below can never exceed it (and never overflow u32).
+    let per_chunk = per_chunk.min(u64::from(MAX_CHUNK_SIZE)) as u32;
+
+    let rounded_size = if per_chunk <= MIN_CHUNK_SIZE {
+        // Small chunks: use exact size (raised to the minimum below)
+        per_chunk
+    } else {
+        per_chunk.next_power_of_two()
+    };
+
+    rounded_size.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
+}
+
+/// Writes a file with fixed-size chunking, deriving the chunk size from the
+/// file's length on disk and the requested number of chunks.
+pub async fn write_file_fixed_auto<I: Into<PathBuf>, J: Into<PathBuf>, K: Into<PathBuf>>(
+    file_to_write: I,
+    storage_dir: J,
+    output_index_file: K,
+    target_chunks: usize,
+) -> Result<(), StorageIOError> {
+    let source: PathBuf = file_to_write.into();
+    let chunk_root: PathBuf = storage_dir.into();
+    let index_path: PathBuf = output_index_file.into();
+
+    let source_len = fs::metadata(&source).await?.len();
+
+    write_file_fixed(
+        source,
+        chunk_root,
+        index_path,
+        calculate_optimal_chunk_size(source_len, target_chunks),
+    )
+    .await
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Deterministic payload of `len` bytes cycling through 0..=255.
+    fn cycling_bytes(len: usize) -> Vec<u8> {
+        (0..len).map(|i| (i % 256) as u8).collect()
+    }
+
+    #[test]
+    fn test_fixed_chunking_basic() {
+        // 10KB of data split into 1KB chunks
+        let data = cycling_bytes(10240);
+
+        let result = split_fixed_impl(&data, 1024).unwrap();
+
+        assert_eq!(result.chunks.len(), 10);
+
+        let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
+        assert_eq!(total_chunk_size, data.len());
+
+        // 10240 is an exact multiple of 1024, so every chunk - including the
+        // last one - is full-sized.
+        for chunk in &result.chunks {
+            assert_eq!(chunk.data.len(), 1024);
+        }
+    }
+
+    #[test]
+    fn test_fixed_chunking_uneven() {
+        // 5.5KB of data split into 2KB chunks: 2048 + 2048 + 1536
+        let data = cycling_bytes(5632);
+
+        let result = split_fixed_impl(&data, 2048).unwrap();
+
+        assert_eq!(result.chunks.len(), 3);
+        assert_eq!(result.chunks[0].data.len(), 2048);
+        assert_eq!(result.chunks[1].data.len(), 2048);
+        assert_eq!(result.chunks[2].data.len(), 1536);
+
+        // Concatenating the chunks must reproduce the input.
+        let mut reconstructed = Vec::new();
+        for chunk in &result.chunks {
+            reconstructed.extend_from_slice(&chunk.data);
+        }
+        assert_eq!(reconstructed, data);
+    }
+
+    #[test]
+    fn test_fixed_chunking_small_file() {
+        // Input smaller than the chunk size yields exactly one chunk.
+        let data = vec![1, 2, 3, 4, 5];
+
+        let result = split_fixed_impl(&data, 1024).unwrap();
+
+        assert_eq!(result.chunks.len(), 1);
+        assert_eq!(result.chunks[0].data.len(), data.len());
+    }
+
+    #[test]
+    fn test_fixed_chunking_zero_size() {
+        let data = vec![1, 2, 3];
+
+        let result = split_fixed_impl(&data, 0);
+        assert!(result.is_err());
+
+        match result {
+            Err(StorageIOError::IOErr(e)) => {
+                assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
+            }
+            _ => panic!("Expected IOErr with InvalidInput"),
+        }
+    }
+
+    #[test]
+    fn test_calculate_optimal_chunk_size() {
+        // 1MB / 16 = 64KB
+        assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024);
+
+        // ~58.8KB rounds up to 64KB
+        assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024);
+
+        // 10 bytes per chunk, but min is 1KB
+        assert_eq!(calculate_optimal_chunk_size(100, 10), 1024);
+
+        // Zero file size or zero target falls back to the default
+        assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024);
+        assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024);
+
+        // 100MB / 10 = 10MB, rounded up to the 16MB maximum
+        assert_eq!(
+            calculate_optimal_chunk_size(100 * 1024 * 1024, 10),
+            16 * 1024 * 1024
+        );
+    }
+
+    #[test]
+    fn test_chunk_hash_uniqueness() {
+        // Different data must produce different hashes.
+        let data1 = vec![1, 2, 3, 4, 5];
+        let data2 = vec![1, 2, 3, 4, 6];
+
+        let result1 = split_fixed_impl(&data1, 1024).unwrap();
+        let result2 = split_fixed_impl(&data2, 1024).unwrap();
+
+        assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash);
+    }
+}
diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs
new file mode 100644
index 0000000..971018b
--- /dev/null
+++ b/systems/storage/src/store/line.rs
@@ -0,0 +1,393 @@
+use std::path::PathBuf;
+
+use crate::{error::StorageIOError, store::ChunkingResult};
+
+/// Splits `data` into one chunk per line, cutting right after every `\n`.
+///
+/// Line terminators stay inside their chunk, so a `\r\n` pair is kept intact
+/// at the end of its line. Trailing bytes without a terminator form a final
+/// chunk, and empty input yields a single empty chunk.
+pub fn split_by_lines_impl(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
+    let mut chunks: Vec<_> = data
+        .split_inclusive(|&byte| byte == b'\n')
+        .map(|line| crate::store::create_chunk(line.to_vec()))
+        .collect();
+
+    // Handle empty file (no lines)
+    if data.is_empty() {
+        chunks.push(crate::store::create_chunk(Vec::new()));
+    }
+
+    Ok(ChunkingResult {
+        chunks,
+        total_size: data.len() as u64,
+    })
+}
+
+/// Writes a file through `write_file` using the line-based storage
+/// configuration.
+pub async fn write_file_line<I: Into<PathBuf>>(
+    file_to_write: I,
+    storage_dir: I,
+    output_index_file: I,
+) -> Result<(), StorageIOError> {
+    use crate::store::{StorageConfig, write_file};
+
+    write_file(
+        file_to_write,
+        storage_dir,
+        output_index_file,
+        &StorageConfig::line(),
+    )
+    .await
+}
+
+/// Utility function to split data by lines with custom line ending detection
+///
+/// Scans `data` and cuts a chunk after every line ending reported by `E`; the
+/// ending bytes stay inside the chunk. Trailing bytes without an ending form
+/// a final chunk, and empty input yields a single empty chunk.
+///
+/// The length reported by `E::ending_length` is clamped to at least 1 and to
+/// the end of `data`, so an implementation that reports 0 or overshoots the
+/// buffer cannot stall the scan or cause an out-of-bounds slice.
+pub fn split_by_lines_custom<E: LineEnding>(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
+    let total_size = data.len();
+    let mut chunks = Vec::new();
+    let mut start = 0;
+
+    let mut i = 0;
+    while i < total_size {
+        if !E::is_line_ending(data, i) {
+            i += 1;
+            continue;
+        }
+
+        // Always advance by at least one byte and never past the buffer.
+        let ending_len = E::ending_length(data, i).max(1);
+        let line_end = i.saturating_add(ending_len).min(total_size);
+
+        chunks.push(crate::store::create_chunk(data[start..line_end].to_vec()));
+
+        start = line_end;
+        i = line_end;
+    }
+
+    // Handle remaining data (last line without a line ending)
+    if start < total_size {
+        chunks.push(crate::store::create_chunk(data[start..].to_vec()));
+    }
+
+    // Handle empty file
+    if chunks.is_empty() && total_size == 0 {
+        chunks.push(crate::store::create_chunk(Vec::new()));
+    }
+
+    Ok(ChunkingResult {
+        chunks,
+        total_size: total_size as u64,
+    })
+}
+
+/// Strategy for recognising line endings; used as the type parameter of `split_by_lines_custom`.
+pub trait LineEnding {
+ /// Returns `true` when a line ending starts at byte index `i` of `data`.
+ fn is_line_ending(data: &[u8], i: usize) -> bool;
+
+ /// Returns the length in bytes of the line ending that starts at index `i`; called after `is_line_ending` returned `true` for the same `i`.
+ fn ending_length(data: &[u8], i: usize) -> usize;
+}
+
+/// Line-ending strategy that recognises a single `\n` byte.
+pub struct UnixLineEnding;
+
+impl LineEnding for UnixLineEnding {
+    fn is_line_ending(data: &[u8], i: usize) -> bool {
+        // `get` returns `None` for an out-of-range index.
+        data.get(i) == Some(&b'\n')
+    }
+
+    fn ending_length(_data: &[u8], _i: usize) -> usize {
+        // `\n` is always one byte.
+        1
+    }
+}
+
+/// Line-ending strategy that recognises only the two-byte `\r\n` sequence.
+pub struct WindowsLineEnding;
+
+impl LineEnding for WindowsLineEnding {
+    fn is_line_ending(data: &[u8], i: usize) -> bool {
+        // Matches when the bytes from `i` onward begin with `\r\n`.
+        matches!(data.get(i..), Some([b'\r', b'\n', ..]))
+    }
+
+    fn ending_length(_data: &[u8], _i: usize) -> usize {
+        // `\r\n` is always two bytes.
+        2
+    }
+}
+
+/// Line-ending strategy that accepts both `\n` and `\r\n`.
+pub struct MixedLineEnding;
+
+impl LineEnding for MixedLineEnding {
+    fn is_line_ending(data: &[u8], i: usize) -> bool {
+        match data.get(i) {
+            Some(b'\n') => true,
+            // A `\r` only counts when a `\n` follows it immediately.
+            Some(b'\r') => data.get(i + 1) == Some(&b'\n'),
+            _ => false,
+        }
+    }
+
+    fn ending_length(data: &[u8], i: usize) -> usize {
+        match data.get(i) {
+            Some(b'\r') if data.get(i + 1) == Some(&b'\n') => 2,
+            // `\n`, or the default of 1 if somehow called on a non-ending.
+            _ => 1,
+        }
+    }
+}
+
+/// Classifies `data` by its dominant line ending.
+///
+/// Every `\n` is counted once: as a Windows ending when the byte right before
+/// it is `\r`, otherwise as a Unix ending. The larger count wins; a tie
+/// (including input with no newlines at all) is reported as `Mixed`.
+pub fn detect_line_ending(data: &[u8]) -> LineEndingType {
+    let mut unix_count = 0i32;
+    let mut windows_count = 0i32;
+
+    let mut previous: Option<u8> = None;
+    for &byte in data {
+        if byte == b'\n' {
+            if previous == Some(b'\r') {
+                windows_count += 1;
+            } else {
+                unix_count += 1;
+            }
+        }
+        previous = Some(byte);
+    }
+
+    match unix_count.cmp(&windows_count) {
+        std::cmp::Ordering::Greater => LineEndingType::Unix,
+        std::cmp::Ordering::Less => LineEndingType::Windows,
+        std::cmp::Ordering::Equal => LineEndingType::Mixed,
+    }
+}
+
+/// Line-ending classification returned by `detect_line_ending`; selects the splitter in `split_by_lines`.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum LineEndingType {
+ Unix, // bare `\n` endings outnumber `\r\n`
+ Windows, // `\r\n` endings outnumber bare `\n`
+ Mixed, // counts are equal, including data with no newlines
+}
+
+impl LineEndingType {
+    /// Splits `data` into line chunks with the splitter matching this
+    /// line-ending type.
+    pub fn split_by_lines(&self, data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
+        // Pick the concrete splitter first, then run it once.
+        let splitter: fn(&[u8]) -> Result<ChunkingResult, StorageIOError> = match *self {
+            LineEndingType::Unix => split_by_lines_custom::<UnixLineEnding>,
+            LineEndingType::Windows => split_by_lines_custom::<WindowsLineEnding>,
+            LineEndingType::Mixed => split_by_lines_custom::<MixedLineEnding>,
+        };
+        splitter(data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Asserts that a chunking result holds exactly the `expected` lines,
+    /// in order.
+    fn assert_chunks(result: &ChunkingResult, expected: &[&str]) {
+        assert_eq!(result.chunks.len(), expected.len());
+        for (chunk, want) in result.chunks.iter().zip(expected) {
+            assert_eq!(chunk.data, want.as_bytes());
+        }
+    }
+
+    #[test]
+    fn test_line_chunking_unix() {
+        let data = b"Hello\nWorld\nTest\n";
+
+        let result = split_by_lines_impl(data).unwrap();
+
+        assert_chunks(&result, &["Hello\n", "World\n", "Test\n"]);
+
+        // Chunks must cover the input exactly.
+        let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum();
+        assert_eq!(total_chunk_size, data.len());
+    }
+
+    #[test]
+    fn test_line_chunking_windows() {
+        // The `\r\n` pair stays inside each chunk.
+        let result = split_by_lines_impl(b"Hello\r\nWorld\r\nTest\r\n").unwrap();
+
+        assert_chunks(&result, &["Hello\r\n", "World\r\n", "Test\r\n"]);
+    }
+
+    #[test]
+    fn test_line_chunking_mixed() {
+        let result = split_by_lines_impl(b"Hello\nWorld\r\nTest\n").unwrap();
+
+        assert_chunks(&result, &["Hello\n", "World\r\n", "Test\n"]);
+    }
+
+    #[test]
+    fn test_line_chunking_no_trailing_newline() {
+        let result = split_by_lines_impl(b"Hello\nWorld\nTest").unwrap();
+
+        assert_chunks(&result, &["Hello\n", "World\n", "Test"]);
+    }
+
+    #[test]
+    fn test_line_chunking_empty_lines() {
+        // Empty lines become chunks holding just the newline.
+        let result = split_by_lines_impl(b"Hello\n\nWorld\n\n\n").unwrap();
+
+        assert_chunks(&result, &["Hello\n", "\n", "World\n", "\n", "\n"]);
+    }
+
+    #[test]
+    fn test_line_chunking_empty_file() {
+        // Empty input yields one empty chunk.
+        let result = split_by_lines_impl(b"").unwrap();
+
+        assert_chunks(&result, &[""]);
+    }
+
+    #[test]
+    fn test_detect_line_ending() {
+        let cases: [(&[u8], LineEndingType); 4] = [
+            (b"Hello\nWorld\n", LineEndingType::Unix),
+            (b"Hello\r\nWorld\r\n", LineEndingType::Windows),
+            // One ending of each kind is a tie.
+            (b"Hello\nWorld\r\n", LineEndingType::Mixed),
+            // No newlines at all is also a tie.
+            (b"Hello World", LineEndingType::Mixed),
+        ];
+
+        for (data, expected) in cases {
+            assert_eq!(detect_line_ending(data), expected);
+        }
+    }
+
+    #[test]
+    fn test_custom_line_ending_unix() {
+        let result = split_by_lines_custom::<UnixLineEnding>(b"Hello\nWorld\n").unwrap();
+
+        assert_chunks(&result, &["Hello\n", "World\n"]);
+    }
+
+    #[test]
+    fn test_custom_line_ending_windows() {
+        let result = split_by_lines_custom::<WindowsLineEnding>(b"Hello\r\nWorld\r\n").unwrap();
+
+        assert_chunks(&result, &["Hello\r\n", "World\r\n"]);
+    }
+
+    #[test]
+    fn test_line_ending_type_split() {
+        let unix_result = LineEndingType::Unix
+            .split_by_lines(b"Hello\nWorld\n")
+            .unwrap();
+        assert_eq!(unix_result.chunks.len(), 2);
+
+        let windows_result = LineEndingType::Windows
+            .split_by_lines(b"Hello\r\nWorld\r\n")
+            .unwrap();
+        assert_eq!(windows_result.chunks.len(), 2);
+
+        let mixed_result = LineEndingType::Mixed
+            .split_by_lines(b"Hello\nWorld\r\n")
+            .unwrap();
+        assert_eq!(mixed_result.chunks.len(), 2);
+    }
+
+    #[test]
+    fn test_chunk_hash_uniqueness() {
+        // Different lines must produce different hashes.
+        let result1 = split_by_lines_impl(b"Hello\n").unwrap();
+        let result2 = split_by_lines_impl(b"World\n").unwrap();
+
+        assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash);
+    }
+}