diff options
Diffstat (limited to 'systems/storage/src')
-rw-r--r--  systems/storage/src/error.rs         8
-rw-r--r--  systems/storage/src/lib.rs           2
-rw-r--r--  systems/storage/src/store.rs       493
-rw-r--r--  systems/storage/src/store/cdc.rs   307
-rw-r--r--  systems/storage/src/store/fixed.rs 417
-rw-r--r--  systems/storage/src/store/line.rs  393
6 files changed, 1620 insertions, 0 deletions
diff --git a/systems/storage/src/error.rs b/systems/storage/src/error.rs new file mode 100644 index 0000000..4b3ad7e --- /dev/null +++ b/systems/storage/src/error.rs @@ -0,0 +1,8 @@ +#[derive(Debug, thiserror::Error)] +pub enum StorageIOError { + #[error("IO error: {0}")] + IOErr(#[from] std::io::Error), + + #[error("Hash too short")] + HashTooShort, +} diff --git a/systems/storage/src/lib.rs b/systems/storage/src/lib.rs new file mode 100644 index 0000000..4f89971 --- /dev/null +++ b/systems/storage/src/lib.rs @@ -0,0 +1,2 @@ +pub mod error; +pub mod store; diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs new file mode 100644 index 0000000..6492449 --- /dev/null +++ b/systems/storage/src/store.rs @@ -0,0 +1,493 @@ +use std::path::{Path, PathBuf}; +use std::time::Instant; + +use blake3; +use log::{info, trace}; +use memmap2::Mmap; +use tokio::fs; + +use crate::error::StorageIOError; + +pub mod cdc; +pub mod fixed; +pub mod line; + +#[derive(Default, Debug)] +pub struct StorageConfig { + /// Chunking policy for splitting files + pub chunking_policy: ChunkingPolicy, +} + +impl StorageConfig { + /// Create a new StorageConfig with CDC chunking policy + pub fn cdc(avg_size: u32) -> Self { + Self { + chunking_policy: ChunkingPolicy::Cdc(avg_size), + } + } + + /// Create a new StorageConfig with fixed-size chunking policy + pub fn fixed_size(chunk_size: u32) -> Self { + Self { + chunking_policy: ChunkingPolicy::FixedSize(chunk_size), + } + } + + /// Create a new StorageConfig with line-based chunking policy + pub fn line() -> Self { + Self { + chunking_policy: ChunkingPolicy::Line, + } + } +} + +#[derive(Debug)] +pub enum ChunkingPolicy { + /// Content-Defined Chunking using Rabin fingerprinting + /// The `u32` value represents the desired average chunk size in bytes + Cdc(u32), + + /// Fixed-size chunking + /// The `u32` value represents the exact size of each chunk in bytes + FixedSize(u32), + + /// Line-based chunking + /// Each line becomes a 
separate chunk + Line, +} + +impl Default for ChunkingPolicy { + fn default() -> Self { + ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC + } +} + +/// Represents a chunk of data with its hash and content +#[derive(Debug, Clone)] +pub struct Chunk { + /// Blake3 hash of the chunk content + pub hash: [u8; 32], + /// Raw chunk data + pub data: Vec<u8>, +} + +/// Represents an index entry pointing to a chunk +#[derive(Debug, Clone)] +pub struct IndexEntry { + /// Blake3 hash of the chunk + pub hash: [u8; 32], + /// Size of the chunk in bytes + pub size: u32, +} + +/// Result of chunking a file +#[derive(Debug)] +pub struct ChunkingResult { + /// List of chunks extracted from the file + pub chunks: Vec<Chunk>, + /// Total size of the original file + pub total_size: u64, +} + +/// Split a file into chunks and store them in the repository, then output the index file to the specified directory +pub async fn write_file( + file_to_write: impl Into<PathBuf>, + storage_dir: impl Into<PathBuf>, + output_index_file: impl Into<PathBuf>, + cfg: &StorageConfig, +) -> Result<(), StorageIOError> { + let (file_to_write, storage_dir, output_index_file) = + precheck(file_to_write, storage_dir, output_index_file).await?; + + info!("Starting file write: {}", file_to_write.display()); + let start_time = Instant::now(); + + // Memory map the entire file + let file = std::fs::File::open(&file_to_write)?; + let mmap = unsafe { Mmap::map(&file)? 
}; + let data = &mmap[..]; + + // Split into chunks based on policy + let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?; + + // Store chunks and create index + let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?; + + // Write index file + write_index_file(&index_entries, &output_index_file).await?; + + let duration = start_time.elapsed(); + info!( + "File write completed in {:?}: {}", + duration, + file_to_write.display() + ); + + Ok(()) +} + +/// Split data into chunks based on the specified policy +pub fn split_into_chunks( + data: &[u8], + policy: &ChunkingPolicy, +) -> Result<ChunkingResult, StorageIOError> { + match policy { + ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size), + ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size), + ChunkingPolicy::Line => split_by_lines(data), + } +} + +/// Split data using Content-Defined Chunking +fn split_cdc(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { + use crate::store::cdc::split_cdc_impl; + split_cdc_impl(data, avg_size) +} + +/// Split data using fixed-size chunking +fn split_fixed(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { + use crate::store::fixed::split_fixed_impl; + split_fixed_impl(data, chunk_size) +} + +/// Split data by lines +fn split_by_lines(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { + use crate::store::line::split_by_lines_impl; + split_by_lines_impl(data) +} + +/// Store chunks in the storage directory and return index entries +async fn store_chunks( + chunks: &[Chunk], + storage_dir: &Path, +) -> Result<Vec<IndexEntry>, StorageIOError> { + let mut index_entries = Vec::with_capacity(chunks.len()); + + let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let total_chunks = chunks.len(); + + for chunk in chunks { + // Create storage directory structure based on hash + let hash_str = hex::encode(chunk.hash); + let chunk_dir = 
get_dir(storage_dir.to_path_buf(), hash_str.clone())?; + + // Create directory if it doesn't exist + if let Some(parent) = chunk_dir.parent() { + fs::create_dir_all(parent).await?; + } + + // Write chunk data + let chunk_path = chunk_dir.with_extension("chunk"); + if !chunk_path.exists() { + trace!("W: {}", hash_str); + fs::write(&chunk_path, &chunk.data).await?; + writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + // Add to index + index_entries.push(IndexEntry { + hash: chunk.hash, + size: chunk.data.len() as u32, + }); + } + + let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); + info!( + "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", + writed_count, + total_chunks, + (writed_count as f32 / total_chunks as f32) * 100 as f32, + total_chunks - writed_count + ); + + Ok(index_entries) +} + +/// Write index file containing chunk hashes and sizes +async fn write_index_file( + entries: &[IndexEntry], + output_path: &Path, +) -> Result<(), StorageIOError> { + let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size + + for entry in entries { + index_data.extend_from_slice(&entry.hash); + index_data.extend_from_slice(&entry.size.to_le_bytes()); + } + + fs::write(output_path, &index_data).await?; + Ok(()) +} + +/// Build a file from the repository to the specified directory using an index file +pub async fn build_file( + index_to_build: impl Into<PathBuf>, + storage_dir: impl Into<PathBuf>, + output_file: impl Into<PathBuf>, +) -> Result<(), StorageIOError> { + let (index_to_build, storage_dir, output_file) = + precheck(index_to_build, storage_dir, output_file).await?; + + info!( + "Starting file build from index: {}", + index_to_build.display() + ); + let start_time = Instant::now(); + + // Read index file + let index_entries = read_index_file(&index_to_build).await?; + + // Reconstruct file from chunks + let reconstructed_data = 
reconstruct_from_chunks(&index_entries, &storage_dir).await?; + + // Write output file + fs::write(&output_file, &reconstructed_data).await?; + + let duration = start_time.elapsed(); + info!( + "File build completed in {:?}: {} -> {}", + duration, + index_to_build.display(), + output_file.display() + ); + + Ok(()) +} + +/// Read index file and parse entries +async fn read_index_file(index_path: &Path) -> Result<Vec<IndexEntry>, StorageIOError> { + // Open file and memory map it + let file = std::fs::File::open(index_path)?; + let mmap = unsafe { Mmap::map(&file)? }; + + // Each entry is 36 bytes (32 bytes hash + 4 bytes size) + if mmap.len() % 36 != 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid index file format", + ) + .into()); + } + + let num_entries = mmap.len() / 36; + let mut entries = Vec::with_capacity(num_entries); + + for i in 0..num_entries { + let start = i * 36; + let hash_start = start; + let size_start = start + 32; + + let mut hash = [0u8; 32]; + hash.copy_from_slice(&mmap[hash_start..hash_start + 32]); + + let size = u32::from_le_bytes([ + mmap[size_start], + mmap[size_start + 1], + mmap[size_start + 2], + mmap[size_start + 3], + ]); + + entries.push(IndexEntry { hash, size }); + } + + Ok(entries) +} + +/// Reconstruct file data from chunks using index entries +async fn reconstruct_from_chunks( + entries: &[IndexEntry], + storage_dir: &Path, +) -> Result<Vec<u8>, StorageIOError> { + let mut reconstructed_data = Vec::new(); + + for entry in entries { + // Get chunk path from hash + let hash_str = hex::encode(entry.hash); + let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?; + let chunk_path = chunk_dir.with_extension("chunk"); + + trace!("R: {}", hash_str); + + // Memory map the chunk file + let file = std::fs::File::open(&chunk_path)?; + let mmap = unsafe { Mmap::map(&file)? 
}; + + // Verify chunk size matches index + if mmap.len() != entry.size as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Chunk size mismatch: expected {}, got {}", + entry.size, + mmap.len() + ), + ) + .into()); + } + + // Verify chunk hash + let actual_hash = blake3_digest(&mmap); + let expected_hash = entry.hash; + if actual_hash != expected_hash { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Chunk hash mismatch: expected {}, got {}", + hex::encode(expected_hash), + hex::encode(actual_hash) + ), + ) + .into()); + } + + // Append to reconstructed data + reconstructed_data.extend_from_slice(&mmap); + } + + Ok(reconstructed_data) +} + +/// Calculate Blake3 hash of data +fn blake3_digest(data: &[u8]) -> [u8; 32] { + let hash = blake3::hash(data); + *hash.as_bytes() +} + +/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string +pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result<PathBuf, StorageIOError> { + if hash_str.len() < 4 { + return Err(StorageIOError::HashTooShort); + } + let dir = storage_dir + .join(&hash_str[0..2]) + .join(&hash_str[2..4]) + .join(&hash_str); + Ok(dir) +} + +/// Calculate Blake3 hash of data and create a Chunk +pub fn create_chunk(data: Vec<u8>) -> Chunk { + let hash = blake3_digest(&data); + Chunk { hash, data } +} + +/// Read a file and split it into chunks using the specified policy +pub async fn chunk_file( + file_path: impl Into<PathBuf>, + policy: &ChunkingPolicy, +) -> Result<ChunkingResult, StorageIOError> { + let file_path = file_path.into(); + info!("Starting chunking: {}", file_path.display()); + let start_time = Instant::now(); + + // Memory map the file + let file = std::fs::File::open(&file_path)?; + let mmap = unsafe { Mmap::map(&file)? 
}; + let data = &mmap[..]; + + let result = split_into_chunks(data, policy); + + let duration = start_time.elapsed(); + info!( + "Chunking completed in {:?}: {}", + duration, + file_path.display() + ); + + result +} + +/// Get chunk statistics +pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats { + let total_chunks = chunks.len(); + let total_size: usize = chunks.iter().map(|c| c.data.len()).sum(); + let avg_size = if total_chunks > 0 { + total_size / total_chunks + } else { + 0 + }; + let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0); + let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0); + + ChunkStats { + total_chunks, + total_size: total_size as u64, + avg_size, + min_size, + max_size, + } +} + +/// Statistics about chunks +#[derive(Debug, Clone)] +pub struct ChunkStats { + pub total_chunks: usize, + pub total_size: u64, + pub avg_size: usize, + pub min_size: usize, + pub max_size: usize, +} + +/// Pre-check whether the input file path, directory path, and output path are valid +pub async fn precheck( + input_file: impl Into<PathBuf>, + dir: impl Into<PathBuf>, + output_file: impl Into<PathBuf>, +) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> { + let (input, dir, output) = (input_file.into(), dir.into(), output_file.into()); + + // Perform all checks in parallel + let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!( + fs::metadata(&input), + fs::metadata(&dir), + fs::metadata(&output) + ); + + // Check if input file exists + let input_metadata = match input_metadata_result { + Ok(metadata) => metadata, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Input file not found: {}", input.display()), + )); + } + }; + + // Check if directory exists + let dir_metadata = match dir_metadata_result { + Ok(metadata) => metadata, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Directory not found: {}", 
dir.display()), + )); + } + }; + + // Check if output file already exists + if output_metadata_result.is_ok() { + return Err(std::io::Error::new( + std::io::ErrorKind::AlreadyExists, + format!("Output file already exist: {}", output.display()), + )); + } + + // Check if input is a file + if !input_metadata.is_file() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Input path is not a file: {}", input.display()), + )); + } + + // Check if dir is a directory + if !dir_metadata.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::NotADirectory, + format!("Input path is not a directory: {}", dir.display()), + )); + } + + Ok((input, dir, output)) +} diff --git a/systems/storage/src/store/cdc.rs b/systems/storage/src/store/cdc.rs new file mode 100644 index 0000000..cc16202 --- /dev/null +++ b/systems/storage/src/store/cdc.rs @@ -0,0 +1,307 @@ +use std::path::PathBuf; + +use crate::{error::StorageIOError, store::ChunkingResult}; + +/// Rabin fingerprint parameters for CDC +const WINDOW_SIZE: usize = 48; +const MIN_CHUNK_RATIO: f64 = 0.75; +const MAX_CHUNK_RATIO: f64 = 2.0; +const BASE_TARGET_MASK: u64 = 0xFFFF; + +/// Rabin fingerprint polynomial (irreducible polynomial) +const POLYNOMIAL: u64 = 0x3DA3358B4DC173; + +/// Precomputed table for Rabin fingerprint sliding window +static FINGERPRINT_TABLE: [u64; 256] = { + let mut table = [0u64; 256]; + let mut i = 0; + while i < 256 { + let mut fingerprint = i as u64; + let mut j = 0; + while j < 8 { + if fingerprint & 1 != 0 { + fingerprint = (fingerprint >> 1) ^ POLYNOMIAL; + } else { + fingerprint >>= 1; + } + j += 1; + } + table[i] = fingerprint; + i += 1; + } + table +}; + +/// Calculate Rabin fingerprint for a byte +#[inline] +fn rabin_fingerprint_byte(old_fingerprint: u64, old_byte: u8, new_byte: u8) -> u64 { + let old_byte_index = old_byte as usize; + + let fingerprint = + (old_fingerprint << 8) ^ (new_byte as u64) ^ FINGERPRINT_TABLE[old_byte_index]; + fingerprint +} + 
+/// Split data using Content-Defined Chunking with Rabin fingerprinting +pub fn split_cdc_impl(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> { + let avg_size = avg_size as usize; + + if avg_size == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Average chunk size must be greater than 0", + ) + .into()); + } + + // Calculate min and max chunk sizes based on average size + let min_chunk_size = (avg_size as f64 * MIN_CHUNK_RATIO) as usize; + let max_chunk_size = (avg_size as f64 * MAX_CHUNK_RATIO) as usize; + + // Ensure reasonable bounds + let min_chunk_size = min_chunk_size.max(512); // At least 512 bytes + let max_chunk_size = max_chunk_size.max(avg_size * 2); + + let target_mask = { + let scale_factor = avg_size as f64 / (64.0 * 1024.0); + let mask_value = (BASE_TARGET_MASK as f64 * scale_factor) as u64; + mask_value.max(0xC000) + }; + + let mut chunks = Vec::new(); + let mut start = 0; + let data_len = data.len(); + + while start < data_len { + let chunk_start = start; + + let search_end = (start + max_chunk_size).min(data_len); + let mut boundary_found = false; + let mut boundary_pos = search_end; + + let mut window = Vec::with_capacity(WINDOW_SIZE); + let mut fingerprint: u64 = 0; + + for i in start..search_end { + let byte = data[i]; + + // Update sliding window + if window.len() >= WINDOW_SIZE { + let old_byte = window.remove(0); + fingerprint = rabin_fingerprint_byte(fingerprint, old_byte, byte); + } else { + fingerprint = (fingerprint << 8) ^ (byte as u64); + } + window.push(byte); + + if i - chunk_start >= min_chunk_size { + if (fingerprint & target_mask) == 0 { + boundary_found = true; + boundary_pos = i + 1; + break; + } + } + } + + let chunk_end = if boundary_found { + boundary_pos + } else { + search_end + }; + + let chunk_data = data[chunk_start..chunk_end].to_vec(); + let chunk = crate::store::create_chunk(chunk_data); + chunks.push(chunk); + + start = chunk_end; + } + + if chunks.len() > 1 { + 
let mut merged_chunks = Vec::new(); + let mut i = 0; + + while i < chunks.len() { + let current_chunk = &chunks[i]; + + if i < chunks.len() - 1 { + let next_chunk = &chunks[i + 1]; + if current_chunk.data.len() < min_chunk_size + && current_chunk.data.len() + next_chunk.data.len() <= max_chunk_size + { + // Merge chunks + let mut merged_data = current_chunk.data.clone(); + merged_data.extend_from_slice(&next_chunk.data); + let merged_chunk = crate::store::create_chunk(merged_data); + merged_chunks.push(merged_chunk); + i += 2; + } else { + merged_chunks.push(current_chunk.clone()); + i += 1; + } + } else { + merged_chunks.push(current_chunk.clone()); + i += 1; + } + } + + chunks = merged_chunks; + } + + Ok(ChunkingResult { + chunks, + total_size: data_len as u64, + }) +} + +/// Split file using CDC with specified average chunk size +pub async fn write_file_cdc<I: Into<PathBuf>>( + file_to_write: I, + storage_dir: I, + output_index_file: I, + avg_size: u32, +) -> Result<(), StorageIOError> { + use crate::store::{StorageConfig, write_file}; + + let config = StorageConfig::cdc(avg_size); + write_file(file_to_write, storage_dir, output_index_file, &config).await +} + +/// Test function for CDC chunking +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cdc_basic() { + let pattern: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect(); + let mut data = Vec::new(); + for _ in 0..10 { + data.extend_from_slice(&pattern); + } + + let result = split_cdc_impl(&data, 4096).unwrap(); + + // Should have at least one chunk + assert!(!result.chunks.is_empty()); + + assert!( + result.chunks.len() >= 1 && result.chunks.len() <= 10, + "Expected 1-10 chunks for 10KB data with 4KB average, got {}", + result.chunks.len() + ); + + // Verify total size + let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); + assert_eq!(total_chunk_size, data.len()); + + // Verify chunk size bounds (more relaxed for CDC) + let max_size = (4096.0 * MAX_CHUNK_RATIO) 
as usize; + + for chunk in &result.chunks { + let chunk_size = chunk.data.len(); + // CDC can produce chunks smaller than min_size due to content patterns + // But they should not exceed max_size + assert!( + chunk_size <= max_size, + "Chunk size {} exceeds maximum {}", + chunk_size, + max_size + ); + } + } + + #[test] + fn test_cdc_small_file() { + // Small file should result in single chunk + let data = vec![1, 2, 3, 4, 5]; + + let result = split_cdc_impl(&data, 4096).unwrap(); + + // Should have exactly one chunk + assert_eq!(result.chunks.len(), 1); + assert_eq!(result.chunks[0].data.len(), data.len()); + } + + #[test] + fn test_cdc_large_avg_size() { + // Test with larger average size (256KB) + let mut data = Vec::new(); + for i in 0..(256 * 1024 * 3) { + // 768KB of data + data.push((i % 256) as u8); + } + + let result = split_cdc_impl(&data, 256 * 1024).unwrap(); + + // With 768KB data and 256KB average, should have reasonable number of chunks + // CDC is content-defined, so exact count varies + assert!( + result.chunks.len() >= 1 && result.chunks.len() <= 10, + "Expected 1-10 chunks for 768KB data with 256KB average, got {}", + result.chunks.len() + ); + + // Verify total size + let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); + assert_eq!(total_chunk_size, data.len()); + + // Verify chunk size bounds + let max_size = ((256 * 1024) as f64 * MAX_CHUNK_RATIO) as usize; + + for chunk in &result.chunks { + // CDC can produce chunks smaller than min_size + // But they should not exceed max_size + assert!( + chunk.data.len() <= max_size, + "Chunk size {} exceeds maximum {}", + chunk.data.len(), + max_size + ); + } + } + + #[test] + fn test_cdc_zero_avg_size() { + let data = vec![1, 2, 3]; + + let result = split_cdc_impl(&data, 0); + assert!(result.is_err()); + + match result { + Err(StorageIOError::IOErr(e)) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + } + _ => panic!("Expected IOErr with InvalidInput"), + } + } 
+ + #[test] + fn test_rabin_fingerprint() { + // Test that rabin_fingerprint_byte function is deterministic + let test_bytes = [0x01, 0x02, 0x03, 0x04]; + + // Calculate fingerprint with specific sequence + let mut fingerprint1 = 0; + for &byte in &test_bytes { + fingerprint1 = rabin_fingerprint_byte(fingerprint1, 0xAA, byte); + } + + // Calculate again with same inputs + let mut fingerprint2 = 0; + for &byte in &test_bytes { + fingerprint2 = rabin_fingerprint_byte(fingerprint2, 0xAA, byte); + } + + // Same input should produce same output + assert_eq!(fingerprint1, fingerprint2); + + // Test with different old_byte produces different result + let mut fingerprint3 = 0; + for &byte in &test_bytes { + fingerprint3 = rabin_fingerprint_byte(fingerprint3, 0xBB, byte); + } + + // Different old_byte should produce different fingerprint + assert_ne!(fingerprint1, fingerprint3); + } +} diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs new file mode 100644 index 0000000..044cc1c --- /dev/null +++ b/systems/storage/src/store/fixed.rs @@ -0,0 +1,417 @@ +use std::path::PathBuf; +use std::time::Instant; + +use log::{info, trace}; +use memmap2::Mmap; +use tokio::fs; +use tokio::task; + +use crate::{ + error::StorageIOError, + store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, +}; + +/// Split data using fixed-size chunking +pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> { + let chunk_size = chunk_size as usize; + + if chunk_size == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Chunk size must be greater than 0", + ) + .into()); + } + + let mut chunks = Vec::new(); + let mut start = 0; + let total_size = data.len(); + + while start < total_size { + let end = (start + chunk_size).min(total_size); + let chunk_data = data[start..end].to_vec(); + + let chunk = crate::store::create_chunk(chunk_data); + chunks.push(chunk); + + start = end; 
+ } + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Split file using fixed-size chunking +pub async fn write_file_fixed<I: Into<PathBuf>>( + file_to_write: I, + storage_dir: I, + output_index_file: I, + fixed_size: u32, +) -> Result<(), StorageIOError> { + let config = StorageConfig::fixed_size(fixed_size); + write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await +} + +/// Split file using fixed-size chunking with parallel processing +async fn write_file_parallel( + file_to_write: impl Into<PathBuf>, + storage_dir: impl Into<PathBuf>, + output_index_file: impl Into<PathBuf>, + cfg: &StorageConfig, +) -> Result<(), StorageIOError> { + let (file_to_write, storage_dir, output_index_file) = + precheck(file_to_write, storage_dir, output_index_file).await?; + + info!("Starting file write: {}", file_to_write.display()); + let start_time = Instant::now(); + + // Memory map the entire file + let file = std::fs::File::open(&file_to_write)?; + let mmap = unsafe { Mmap::map(&file)? 
}; + let data = &mmap[..]; + + // Split into chunks based on policy + let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; + + // Store chunks in parallel and create index + let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; + + // Write index file + write_index_file(&index_entries, &output_index_file).await?; + + let duration = start_time.elapsed(); + info!( + "File write completed in {:?}: {}", + duration, + file_to_write.display() + ); + + Ok(()) +} + +/// Split data into chunks based on the specified policy with parallel processing +async fn split_into_chunks_parallel( + data: &[u8], + policy: &crate::store::ChunkingPolicy, +) -> Result<ChunkingResult, StorageIOError> { + match policy { + crate::store::ChunkingPolicy::FixedSize(chunk_size) => { + split_fixed_parallel(data, *chunk_size).await + } + _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking + } +} + +/// Split data using fixed-size chunking with parallel processing +async fn split_fixed_parallel( + data: &[u8], + chunk_size: u32, +) -> Result<ChunkingResult, StorageIOError> { + let chunk_size = chunk_size as usize; + + if chunk_size == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Chunk size must be greater than 0", + ) + .into()); + } + + let total_size = data.len(); + let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division + + // Create a vector to hold chunk boundaries + let mut chunk_boundaries = Vec::with_capacity(num_chunks); + let mut start = 0; + + while start < total_size { + let end = (start + chunk_size).min(total_size); + chunk_boundaries.push((start, end)); + start = end; + } + + // Process chunks in parallel using Tokio tasks + let chunks: Vec<crate::store::Chunk> = if chunk_boundaries.len() > 1 { + // Use parallel processing for multiple chunks + let mut tasks = Vec::with_capacity(chunk_boundaries.len()); + + for (start, end) in 
chunk_boundaries { + let chunk_data = data[start..end].to_vec(); + + // Spawn a blocking task for each chunk + tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); + } + + // Wait for all tasks to complete + let mut chunks = Vec::with_capacity(tasks.len()); + for task in tasks { + match task.await { + Ok(chunk) => chunks.push(chunk), + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Task join error: {}", e), + ) + .into()); + } + } + } + + chunks + } else { + // Single chunk, no need for parallel processing + chunk_boundaries + .into_iter() + .map(|(start, end)| { + let chunk_data = data[start..end].to_vec(); + create_chunk(chunk_data) + }) + .collect() + }; + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Store chunks in the storage directory in parallel and return index entries +async fn store_chunks_parallel( + chunks: &[crate::store::Chunk], + storage_dir: &std::path::Path, +) -> Result<Vec<IndexEntry>, StorageIOError> { + let mut tasks = Vec::with_capacity(chunks.len()); + + let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let total_chunks = chunks.len(); + + for chunk in chunks { + let chunk = chunk.clone(); + let storage_dir = storage_dir.to_path_buf(); + let writed_counter = writed_counter.clone(); + + // Spawn async task for each chunk storage operation + tasks.push(task::spawn(async move { + // Create storage directory structure based on hash + let hash_str = hex::encode(chunk.hash); + let chunk_dir = get_dir(storage_dir, hash_str.clone())?; + + // Create directory if it doesn't exist + if let Some(parent) = chunk_dir.parent() { + fs::create_dir_all(parent).await?; + } + + // Write chunk data + let chunk_path = chunk_dir.with_extension("chunk"); + if !chunk_path.exists() { + trace!("W: {}", hash_str); + fs::write(&chunk_path, &chunk.data).await?; + writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + 
Ok::<IndexEntry, StorageIOError>(IndexEntry { + hash: chunk.hash, + size: chunk.data.len() as u32, + }) + })); + } + + let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); + info!( + "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", + writed_count, + total_chunks, + (writed_count as f32 / total_chunks as f32) * 100 as f32, + total_chunks - writed_count + ); + + // Wait for all tasks to complete + let mut index_entries = Vec::with_capacity(chunks.len()); + for task in tasks { + let entry = task.await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) + })??; + index_entries.push(entry); + } + + Ok(index_entries) +} + +/// Write index file containing chunk hashes and sizes +async fn write_index_file( + entries: &[IndexEntry], + output_path: &std::path::Path, +) -> Result<(), StorageIOError> { + let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size + + for entry in entries { + index_data.extend_from_slice(&entry.hash); + index_data.extend_from_slice(&entry.size.to_le_bytes()); + } + + fs::write(output_path, &index_data).await?; + Ok(()) +} + +/// Utility function to calculate optimal fixed chunk size based on file size +pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { + if target_chunks == 0 || file_size == 0 { + return 64 * 1024; // Default 64KB + } + + let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; + + // Round to nearest power of 2 for better performance + let rounded_size = if chunk_size <= 1024 { + // Small chunks: use exact size + chunk_size + } else { + // Larger chunks: round to nearest power of 2 + let mut size = chunk_size; + size -= 1; + size |= size >> 1; + size |= size >> 2; + size |= size >> 4; + size |= size >> 8; + size |= size >> 16; + size += 1; + size + }; + + // Ensure minimum and maximum bounds + rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB 
min, 16MB max +} + +/// Split file with automatic chunk size calculation +pub async fn write_file_fixed_auto<I: Into<PathBuf>, J: Into<PathBuf>, K: Into<PathBuf>>( + file_to_write: I, + storage_dir: J, + output_index_file: K, + target_chunks: usize, +) -> Result<(), StorageIOError> { + let file_path = file_to_write.into(); + let storage_dir = storage_dir.into(); + let output_index_file = output_index_file.into(); + + let file_size = fs::metadata(&file_path).await?.len(); + + let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); + write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fixed_chunking_basic() { + // Create 10KB of test data + let data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); + + // Split into 1KB chunks + let result = split_fixed_impl(&data, 1024).unwrap(); + + // Should have 10 chunks + assert_eq!(result.chunks.len(), 10); + + // Verify total size + let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); + assert_eq!(total_chunk_size, data.len()); + + // Verify chunk sizes (last chunk may be smaller) + for (i, chunk) in result.chunks.iter().enumerate() { + if i < 9 { + assert_eq!(chunk.data.len(), 1024); + } else { + assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly + } + } + } + + #[test] + fn test_fixed_chunking_uneven() { + // Create 5.5KB of test data + let data: Vec<u8> = (0..5632).map(|i| (i % 256) as u8).collect(); + + // Split into 2KB chunks + let result = split_fixed_impl(&data, 2048).unwrap(); + + // Should have 3 chunks (2048 + 2048 + 1536) + assert_eq!(result.chunks.len(), 3); + + // Verify chunk sizes + assert_eq!(result.chunks[0].data.len(), 2048); + assert_eq!(result.chunks[1].data.len(), 2048); + assert_eq!(result.chunks[2].data.len(), 1536); + + // Verify data integrity + let mut reconstructed = Vec::new(); + for chunk in &result.chunks { + 
reconstructed.extend_from_slice(&chunk.data); + } + assert_eq!(reconstructed, data); + } + + #[test] + fn test_fixed_chunking_small_file() { + // Small file smaller than chunk size + let data = vec![1, 2, 3, 4, 5]; + + let result = split_fixed_impl(&data, 1024).unwrap(); + + // Should have exactly one chunk + assert_eq!(result.chunks.len(), 1); + assert_eq!(result.chunks[0].data.len(), data.len()); + } + + #[test] + fn test_fixed_chunking_zero_size() { + let data = vec![1, 2, 3]; + + let result = split_fixed_impl(&data, 0); + assert!(result.is_err()); + + match result { + Err(StorageIOError::IOErr(e)) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + } + _ => panic!("Expected IOErr with InvalidInput"), + } + } + + #[test] + fn test_calculate_optimal_chunk_size() { + // Test basic calculation + assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB + + // Test rounding to power of 2 + assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB + + // Test minimum bound + assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB + + // Test edge cases + assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default + assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default + + // Test large file + assert_eq!( + calculate_optimal_chunk_size(100 * 1024 * 1024, 10), + 16 * 1024 * 1024 + ); // 100MB / 10 = 10MB, max is 16MB + } + + #[test] + fn test_chunk_hash_uniqueness() { + // Test that different data produces different hashes + let data1 = vec![1, 2, 3, 4, 5]; + let data2 = vec![1, 2, 3, 4, 6]; + + let result1 = split_fixed_impl(&data1, 1024).unwrap(); + let result2 = split_fixed_impl(&data2, 1024).unwrap(); + + assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); + } +} diff --git a/systems/storage/src/store/line.rs b/systems/storage/src/store/line.rs new file mode 100644 index 0000000..971018b --- /dev/null +++ 
b/systems/storage/src/store/line.rs @@ -0,0 +1,393 @@ +use std::path::PathBuf; + +use crate::{error::StorageIOError, store::ChunkingResult}; + +/// Split data by lines (newline characters) +pub fn split_by_lines_impl(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { + let mut chunks = Vec::new(); + let mut start = 0; + let total_size = data.len(); + + // Iterate through data to find line boundaries + let mut i = 0; + while i < data.len() { + if data[i] == b'\n' { + // Unix line ending + let line_end = i + 1; // Include \n + // Extract line data (include newline character) + let line_data = data[start..line_end].to_vec(); + + // Create chunk for this line + let chunk = crate::store::create_chunk(line_data); + chunks.push(chunk); + + // Move start to next line + start = line_end; + i = line_end; + } else if data[i] == b'\r' && i + 1 < data.len() && data[i + 1] == b'\n' { + // Windows line ending + let line_end = i + 2; // Include both \r and \n + // Extract line data (include newline characters) + let line_data = data[start..line_end].to_vec(); + + // Create chunk for this line + let chunk = crate::store::create_chunk(line_data); + chunks.push(chunk); + + // Move start to next line + start = line_end; + i = line_end; + } else { + i += 1; + } + } + + // Handle remaining data (last line without newline) + if start < total_size { + let line_data = data[start..].to_vec(); + let chunk = crate::store::create_chunk(line_data); + chunks.push(chunk); + } + + // Handle empty file (no lines) + if chunks.is_empty() && total_size == 0 { + let chunk = crate::store::create_chunk(Vec::new()); + chunks.push(chunk); + } + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Split file by lines +pub async fn write_file_line<I: Into<PathBuf>>( + file_to_write: I, + storage_dir: I, + output_index_file: I, +) -> Result<(), StorageIOError> { + use crate::store::{StorageConfig, write_file}; + + let config = StorageConfig::line(); + write_file(file_to_write, 
storage_dir, output_index_file, &config).await +} + +/// Utility function to split data by lines with custom line ending detection +pub fn split_by_lines_custom<E: LineEnding>(data: &[u8]) -> Result<ChunkingResult, StorageIOError> { + let mut chunks = Vec::new(); + let mut start = 0; + let total_size = data.len(); + + let mut i = 0; + while i < total_size { + if E::is_line_ending(data, i) { + let line_end = i + E::ending_length(data, i); + let line_data = data[start..line_end].to_vec(); + + let chunk = crate::store::create_chunk(line_data); + chunks.push(chunk); + + start = line_end; + i = line_end; + } else { + i += 1; + } + } + + // Handle remaining data + if start < total_size { + let line_data = data[start..].to_vec(); + let chunk = crate::store::create_chunk(line_data); + chunks.push(chunk); + } + + // Handle empty file + if chunks.is_empty() && total_size == 0 { + let chunk = crate::store::create_chunk(Vec::new()); + chunks.push(chunk); + } + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Trait for different line ending types +pub trait LineEnding { + /// Check if position i is the start of a line ending + fn is_line_ending(data: &[u8], i: usize) -> bool; + + /// Get the length of the line ending at position i + fn ending_length(data: &[u8], i: usize) -> usize; +} + +/// Unix line endings (\n) +pub struct UnixLineEnding; + +impl LineEnding for UnixLineEnding { + fn is_line_ending(data: &[u8], i: usize) -> bool { + i < data.len() && data[i] == b'\n' + } + + fn ending_length(_data: &[u8], _i: usize) -> usize { + 1 + } +} + +/// Windows line endings (\r\n) +pub struct WindowsLineEnding; + +impl LineEnding for WindowsLineEnding { + fn is_line_ending(data: &[u8], i: usize) -> bool { + i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' + } + + fn ending_length(_data: &[u8], _i: usize) -> usize { + 2 + } +} + +/// Mixed line endings (detects both Unix and Windows) +pub struct MixedLineEnding; + +impl LineEnding for 
MixedLineEnding { + fn is_line_ending(data: &[u8], i: usize) -> bool { + if i < data.len() && data[i] == b'\n' { + true + } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { + true + } else { + false + } + } + + fn ending_length(data: &[u8], i: usize) -> usize { + if i < data.len() && data[i] == b'\n' { + 1 + } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { + 2 + } else { + 1 // Default to 1 if somehow called incorrectly + } + } +} + +/// Detect line ending type from data +pub fn detect_line_ending(data: &[u8]) -> LineEndingType { + let mut unix_count = 0; + let mut windows_count = 0; + + let mut i = 0; + while i < data.len() { + if data[i] == b'\n' { + unix_count += 1; + i += 1; + } else if i + 1 < data.len() && data[i] == b'\r' && data[i + 1] == b'\n' { + windows_count += 1; + i += 2; + } else { + i += 1; + } + } + + if unix_count > windows_count { + LineEndingType::Unix + } else if windows_count > unix_count { + LineEndingType::Windows + } else { + LineEndingType::Mixed + } +} + +/// Line ending type enum +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum LineEndingType { + Unix, + Windows, + Mixed, +} + +impl LineEndingType { + /// Split data using the detected line ending type + pub fn split_by_lines(&self, data: &[u8]) -> Result<ChunkingResult, StorageIOError> { + match self { + LineEndingType::Unix => split_by_lines_custom::<UnixLineEnding>(data), + LineEndingType::Windows => split_by_lines_custom::<WindowsLineEnding>(data), + LineEndingType::Mixed => split_by_lines_custom::<MixedLineEnding>(data), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_line_chunking_unix() { + let data = b"Hello\nWorld\nTest\n"; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 3 chunks + assert_eq!(result.chunks.len(), 3); + + // Verify chunk contents + assert_eq!(result.chunks[0].data, b"Hello\n"); + assert_eq!(result.chunks[1].data, b"World\n"); + 
assert_eq!(result.chunks[2].data, b"Test\n"); + + // Verify total size + let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); + assert_eq!(total_chunk_size, data.len()); + } + + #[test] + fn test_line_chunking_windows() { + let data = b"Hello\r\nWorld\r\nTest\r\n"; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 3 chunks + assert_eq!(result.chunks.len(), 3); + + // Verify chunk contents (should include \r\n) + assert_eq!(result.chunks[0].data, b"Hello\r\n"); + assert_eq!(result.chunks[1].data, b"World\r\n"); + assert_eq!(result.chunks[2].data, b"Test\r\n"); + } + + #[test] + fn test_line_chunking_mixed() { + let data = b"Hello\nWorld\r\nTest\n"; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 3 chunks + assert_eq!(result.chunks.len(), 3); + + // Verify chunk contents + assert_eq!(result.chunks[0].data, b"Hello\n"); + assert_eq!(result.chunks[1].data, b"World\r\n"); + assert_eq!(result.chunks[2].data, b"Test\n"); + } + + #[test] + fn test_line_chunking_no_trailing_newline() { + let data = b"Hello\nWorld\nTest"; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 3 chunks + assert_eq!(result.chunks.len(), 3); + + // Verify chunk contents + assert_eq!(result.chunks[0].data, b"Hello\n"); + assert_eq!(result.chunks[1].data, b"World\n"); + assert_eq!(result.chunks[2].data, b"Test"); + } + + #[test] + fn test_line_chunking_empty_lines() { + let data = b"Hello\n\nWorld\n\n\n"; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 5 chunks (including empty lines) + // "Hello\n", "\n", "World\n", "\n", "\n" + assert_eq!(result.chunks.len(), 5); + + // Verify chunk contents + assert_eq!(result.chunks[0].data, b"Hello\n"); + assert_eq!(result.chunks[1].data, b"\n"); + assert_eq!(result.chunks[2].data, b"World\n"); + assert_eq!(result.chunks[3].data, b"\n"); + assert_eq!(result.chunks[4].data, b"\n"); + } + + #[test] + fn test_line_chunking_empty_file() { + let data 
= b""; + + let result = split_by_lines_impl(data).unwrap(); + + // Should have 1 empty chunk + assert_eq!(result.chunks.len(), 1); + assert_eq!(result.chunks[0].data, b""); + } + + #[test] + fn test_detect_line_ending() { + // Test Unix detection + let unix_data = b"Hello\nWorld\n"; + assert_eq!(detect_line_ending(unix_data), LineEndingType::Unix); + + // Test Windows detection + let windows_data = b"Hello\r\nWorld\r\n"; + assert_eq!(detect_line_ending(windows_data), LineEndingType::Windows); + + // Test mixed detection + let mixed_data = b"Hello\nWorld\r\n"; + assert_eq!(detect_line_ending(mixed_data), LineEndingType::Mixed); + + // Test no newlines + let no_newlines = b"Hello World"; + assert_eq!(detect_line_ending(no_newlines), LineEndingType::Mixed); + } + + #[test] + fn test_custom_line_ending_unix() { + let data = b"Hello\nWorld\n"; + + let result = split_by_lines_custom::<UnixLineEnding>(data).unwrap(); + + assert_eq!(result.chunks.len(), 2); + assert_eq!(result.chunks[0].data, b"Hello\n"); + assert_eq!(result.chunks[1].data, b"World\n"); + } + + #[test] + fn test_custom_line_ending_windows() { + let data = b"Hello\r\nWorld\r\n"; + + let result = split_by_lines_custom::<WindowsLineEnding>(data).unwrap(); + + assert_eq!(result.chunks.len(), 2); + assert_eq!(result.chunks[0].data, b"Hello\r\n"); + assert_eq!(result.chunks[1].data, b"World\r\n"); + } + + #[test] + fn test_line_ending_type_split() { + let unix_data = b"Hello\nWorld\n"; + let windows_data = b"Hello\r\nWorld\r\n"; + let mixed_data = b"Hello\nWorld\r\n"; + + // Test Unix + let unix_result = LineEndingType::Unix.split_by_lines(unix_data).unwrap(); + assert_eq!(unix_result.chunks.len(), 2); + + // Test Windows + let windows_result = LineEndingType::Windows + .split_by_lines(windows_data) + .unwrap(); + assert_eq!(windows_result.chunks.len(), 2); + + // Test Mixed + let mixed_result = LineEndingType::Mixed.split_by_lines(mixed_data).unwrap(); + assert_eq!(mixed_result.chunks.len(), 2); + } + + #[test] 
+ fn test_chunk_hash_uniqueness() { + // Test that different lines produce different hashes + let data1 = b"Hello\n"; + let data2 = b"World\n"; + + let result1 = split_by_lines_impl(data1).unwrap(); + let result2 = split_by_lines_impl(data2).unwrap(); + + assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); + } +} |
