From 76e78fe53c03c9b4c7fa029709f06ee86ce9c865 Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Fri, 27 Feb 2026 06:17:06 +0800 Subject: Add storage system with chunk-based file storage --- systems/storage/src/store/fixed.rs | 417 +++++++++++++++++++++++++++++++++++++ 1 file changed, 417 insertions(+) create mode 100644 systems/storage/src/store/fixed.rs (limited to 'systems/storage/src/store/fixed.rs') diff --git a/systems/storage/src/store/fixed.rs b/systems/storage/src/store/fixed.rs new file mode 100644 index 0000000..044cc1c --- /dev/null +++ b/systems/storage/src/store/fixed.rs @@ -0,0 +1,417 @@ +use std::path::PathBuf; +use std::time::Instant; + +use log::{info, trace}; +use memmap2::Mmap; +use tokio::fs; +use tokio::task; + +use crate::{ + error::StorageIOError, + store::{ChunkingResult, IndexEntry, StorageConfig, create_chunk, get_dir, precheck}, +}; + +/// Split data using fixed-size chunking +pub fn split_fixed_impl(data: &[u8], chunk_size: u32) -> Result { + let chunk_size = chunk_size as usize; + + if chunk_size == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Chunk size must be greater than 0", + ) + .into()); + } + + let mut chunks = Vec::new(); + let mut start = 0; + let total_size = data.len(); + + while start < total_size { + let end = (start + chunk_size).min(total_size); + let chunk_data = data[start..end].to_vec(); + + let chunk = crate::store::create_chunk(chunk_data); + chunks.push(chunk); + + start = end; + } + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Split file using fixed-size chunking +pub async fn write_file_fixed>( + file_to_write: I, + storage_dir: I, + output_index_file: I, + fixed_size: u32, +) -> Result<(), StorageIOError> { + let config = StorageConfig::fixed_size(fixed_size); + write_file_parallel(file_to_write, storage_dir, output_index_file, &config).await +} + +/// Split file using fixed-size chunking with parallel processing +async fn write_file_parallel( + file_to_write: impl Into, + storage_dir: impl Into, + output_index_file: impl Into, + cfg: &StorageConfig, +) -> Result<(), StorageIOError> { + let (file_to_write, storage_dir, output_index_file) = + precheck(file_to_write, storage_dir, output_index_file).await?; + + info!("Starting file write: {}", file_to_write.display()); + let start_time = Instant::now(); + + // Memory map the entire file + let file = std::fs::File::open(&file_to_write)?; + let mmap = unsafe { Mmap::map(&file)? }; + let data = &mmap[..]; + + // Split into chunks based on policy + let chunking_result = split_into_chunks_parallel(&data, &cfg.chunking_policy).await?; + + // Store chunks in parallel and create index + let index_entries = store_chunks_parallel(&chunking_result.chunks, &storage_dir).await?; + + // Write index file + write_index_file(&index_entries, &output_index_file).await?; + + let duration = start_time.elapsed(); + info!( + "File write completed in {:?}: {}", + duration, + file_to_write.display() + ); + + Ok(()) +} + +/// Split data into chunks based on the specified policy with parallel processing +async fn split_into_chunks_parallel( + data: &[u8], + policy: &crate::store::ChunkingPolicy, +) -> Result { + match policy { + crate::store::ChunkingPolicy::FixedSize(chunk_size) => { + split_fixed_parallel(data, *chunk_size).await + } + _ => split_fixed_impl(data, 64 * 1024), // Fallback for non-fixed chunking + } +} + +/// Split data using fixed-size chunking with parallel processing +async fn split_fixed_parallel( + data: &[u8], + chunk_size: u32, +) -> Result { + let chunk_size = chunk_size as usize; + + if chunk_size == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Chunk size must be greater than 0", + ) + .into()); + } + + let total_size = data.len(); + let num_chunks = (total_size + chunk_size - 1) / chunk_size; // Ceiling division + + // Create a vector to hold chunk boundaries + let mut chunk_boundaries = Vec::with_capacity(num_chunks); + let mut start = 0; + + while start < total_size { + let end = (start + chunk_size).min(total_size); + chunk_boundaries.push((start, end)); + start = end; + } + + // Process chunks in parallel using Tokio tasks + let chunks: Vec = if chunk_boundaries.len() > 1 { + // Use parallel processing for multiple chunks + let mut tasks = Vec::with_capacity(chunk_boundaries.len()); + + for (start, end) in chunk_boundaries { + let chunk_data = data[start..end].to_vec(); + + // Spawn a blocking task for each chunk + tasks.push(task::spawn_blocking(move || create_chunk(chunk_data))); + } + + // Wait for all tasks to complete + let mut chunks = Vec::with_capacity(tasks.len()); + for task in tasks { + match task.await { + Ok(chunk) => chunks.push(chunk), + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Task join error: {}", e), + ) + .into()); + } + } + } + + chunks + } else { + // Single chunk, no need for parallel processing + chunk_boundaries + .into_iter() + .map(|(start, end)| { + let chunk_data = data[start..end].to_vec(); + create_chunk(chunk_data) + }) + .collect() + }; + + Ok(ChunkingResult { + chunks, + total_size: total_size as u64, + }) +} + +/// Store chunks in the storage directory in parallel and return index entries +async fn store_chunks_parallel( + chunks: &[crate::store::Chunk], + storage_dir: &std::path::Path, +) -> Result, StorageIOError> { + let mut tasks = Vec::with_capacity(chunks.len()); + + let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let total_chunks = chunks.len(); + + for chunk in chunks { + let chunk = chunk.clone(); + let storage_dir = storage_dir.to_path_buf(); + let writed_counter = writed_counter.clone(); + + // Spawn async task for each chunk storage operation + tasks.push(task::spawn(async move { + // Create storage directory structure based on hash + let hash_str = hex::encode(chunk.hash); + let chunk_dir = get_dir(storage_dir, hash_str.clone())?; + + // Create directory if it doesn't exist + if let Some(parent) = chunk_dir.parent() { + fs::create_dir_all(parent).await?; + } + + // Write chunk data + let chunk_path = chunk_dir.with_extension("chunk"); + if !chunk_path.exists() { + trace!("W: {}", hash_str); + fs::write(&chunk_path, &chunk.data).await?; + writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + Ok::(IndexEntry { + hash: chunk.hash, + size: chunk.data.len() as u32, + }) + })); + } + + let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed); + info!( + "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)", + writed_count, + total_chunks, + (writed_count as f32 / total_chunks as f32) * 100 as f32, + total_chunks - writed_count + ); + + // Wait for all tasks to complete + let mut index_entries = Vec::with_capacity(chunks.len()); + for task in tasks { + let entry = task.await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, format!("Task join error: {}", e)) + })??; + index_entries.push(entry); + } + + Ok(index_entries) +} + +/// Write index file containing chunk hashes and sizes +async fn write_index_file( + entries: &[IndexEntry], + output_path: &std::path::Path, +) -> Result<(), StorageIOError> { + let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size + + for entry in entries { + index_data.extend_from_slice(&entry.hash); + index_data.extend_from_slice(&entry.size.to_le_bytes()); + } + + fs::write(output_path, &index_data).await?; + Ok(()) +} + +/// Utility function to calculate optimal fixed chunk size based on file size +pub fn calculate_optimal_chunk_size(file_size: u64, target_chunks: usize) -> u32 { + if target_chunks == 0 || file_size == 0 { + return 64 * 1024; // Default 64KB + } + + let chunk_size = (file_size as f64 / target_chunks as f64).ceil() as u32; + + // Round to nearest power of 2 for better performance + let rounded_size = if chunk_size <= 1024 { + // Small chunks: use exact size + chunk_size + } else { + // Larger chunks: round to nearest power of 2 + let mut size = chunk_size; + size -= 1; + size |= size >> 1; + size |= size >> 2; + size |= size >> 4; + size |= size >> 8; + size |= size >> 16; + size += 1; + size + }; + + // Ensure minimum and maximum bounds + rounded_size.max(1024).min(16 * 1024 * 1024) // 1KB min, 16MB max +} + +/// Split file with automatic chunk size calculation +pub async fn write_file_fixed_auto, J: Into, K: Into>( + file_to_write: I, + storage_dir: J, + output_index_file: K, + target_chunks: usize, +) -> Result<(), StorageIOError> { + let file_path = file_to_write.into(); + let storage_dir = storage_dir.into(); + let output_index_file = output_index_file.into(); + + let file_size = fs::metadata(&file_path).await?.len(); + + let chunk_size = calculate_optimal_chunk_size(file_size, target_chunks); + write_file_fixed(file_path, storage_dir, output_index_file, chunk_size).await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fixed_chunking_basic() { + // Create 10KB of test data + let data: Vec = (0..10240).map(|i| (i % 256) as u8).collect(); + + // Split into 1KB chunks + let result = split_fixed_impl(&data, 1024).unwrap(); + + // Should have 10 chunks + assert_eq!(result.chunks.len(), 10); + + // Verify total size + let total_chunk_size: usize = result.chunks.iter().map(|c| c.data.len()).sum(); + assert_eq!(total_chunk_size, data.len()); + + // Verify chunk sizes (last chunk may be smaller) + for (i, chunk) in result.chunks.iter().enumerate() { + if i < 9 { + assert_eq!(chunk.data.len(), 1024); + } else { + assert_eq!(chunk.data.len(), 1024); // 10240 / 1024 = 10 exactly + } + } + } + + #[test] + fn test_fixed_chunking_uneven() { + // Create 5.5KB of test data + let data: Vec = (0..5632).map(|i| (i % 256) as u8).collect(); + + // Split into 2KB chunks + let result = split_fixed_impl(&data, 2048).unwrap(); + + // Should have 3 chunks (2048 + 2048 + 1536) + assert_eq!(result.chunks.len(), 3); + + // Verify chunk sizes + assert_eq!(result.chunks[0].data.len(), 2048); + assert_eq!(result.chunks[1].data.len(), 2048); + assert_eq!(result.chunks[2].data.len(), 1536); + + // Verify data integrity + let mut reconstructed = Vec::new(); + for chunk in &result.chunks { + reconstructed.extend_from_slice(&chunk.data); + } + assert_eq!(reconstructed, data); + } + + #[test] + fn test_fixed_chunking_small_file() { + // Small file smaller than chunk size + let data = vec![1, 2, 3, 4, 5]; + + let result = split_fixed_impl(&data, 1024).unwrap(); + + // Should have exactly one chunk + assert_eq!(result.chunks.len(), 1); + assert_eq!(result.chunks[0].data.len(), data.len()); + } + + #[test] + fn test_fixed_chunking_zero_size() { + let data = vec![1, 2, 3]; + + let result = split_fixed_impl(&data, 0); + assert!(result.is_err()); + + match result { + Err(StorageIOError::IOErr(e)) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + } + _ => panic!("Expected IOErr with InvalidInput"), + } + } + + #[test] + fn test_calculate_optimal_chunk_size() { + // Test basic calculation + assert_eq!(calculate_optimal_chunk_size(1024 * 1024, 16), 64 * 1024); // 1MB / 16 = 64KB + + // Test rounding to power of 2 + assert_eq!(calculate_optimal_chunk_size(1000 * 1000, 17), 64 * 1024); // ~58.8KB rounds to 64KB + + // Test minimum bound + assert_eq!(calculate_optimal_chunk_size(100, 10), 1024); // 10 bytes per chunk, but min is 1KB + + // Test edge cases + assert_eq!(calculate_optimal_chunk_size(0, 10), 64 * 1024); // Default + assert_eq!(calculate_optimal_chunk_size(1000, 0), 64 * 1024); // Default + + // Test large file + assert_eq!( + calculate_optimal_chunk_size(100 * 1024 * 1024, 10), + 16 * 1024 * 1024 + ); // 100MB / 10 = 10MB, max is 16MB + } + + #[test] + fn test_chunk_hash_uniqueness() { + // Test that different data produces different hashes + let data1 = vec![1, 2, 3, 4, 5]; + let data2 = vec![1, 2, 3, 4, 6]; + + let result1 = split_fixed_impl(&data1, 1024).unwrap(); + let result2 = split_fixed_impl(&data2, 1024).unwrap(); + + assert_ne!(result1.chunks[0].hash, result2.chunks[0].hash); + } +} -- cgit