summaryrefslogtreecommitdiff
path: root/systems/storage/src/store.rs
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-02-27 06:17:06 +0800
committer魏曹先生 <1992414357@qq.com>2026-02-27 06:17:06 +0800
commit76e78fe53c03c9b4c7fa029709f06ee86ce9c865 (patch)
tree4e3778dfb405b2c21b51df24331100b94f5356d9 /systems/storage/src/store.rs
parent748c8a3353df887ee4b01e0e1327aa95c1c7225a (diff)
Add storage system with chunk-based file storage
Diffstat (limited to 'systems/storage/src/store.rs')
-rw-r--r--systems/storage/src/store.rs493
1 file changed, 493 insertions, 0 deletions
diff --git a/systems/storage/src/store.rs b/systems/storage/src/store.rs
new file mode 100644
index 0000000..6492449
--- /dev/null
+++ b/systems/storage/src/store.rs
@@ -0,0 +1,493 @@
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use blake3;
+use log::{info, trace};
+use memmap2::Mmap;
+use tokio::fs;
+
+use crate::error::StorageIOError;
+
+pub mod cdc;
+pub mod fixed;
+pub mod line;
+
/// Configuration for the storage system; currently only selects how
/// files are split into chunks.
#[derive(Default, Debug)]
pub struct StorageConfig {
    /// Chunking policy for splitting files
    pub chunking_policy: ChunkingPolicy,
}
+
+impl StorageConfig {
+ /// Create a new StorageConfig with CDC chunking policy
+ pub fn cdc(avg_size: u32) -> Self {
+ Self {
+ chunking_policy: ChunkingPolicy::Cdc(avg_size),
+ }
+ }
+
+ /// Create a new StorageConfig with fixed-size chunking policy
+ pub fn fixed_size(chunk_size: u32) -> Self {
+ Self {
+ chunking_policy: ChunkingPolicy::FixedSize(chunk_size),
+ }
+ }
+
+ /// Create a new StorageConfig with line-based chunking policy
+ pub fn line() -> Self {
+ Self {
+ chunking_policy: ChunkingPolicy::Line,
+ }
+ }
+}
+
/// Strategy used by `split_into_chunks` to split a byte stream.
#[derive(Debug)]
pub enum ChunkingPolicy {
    /// Content-Defined Chunking using Rabin fingerprinting
    /// The `u32` value represents the desired average chunk size in bytes
    Cdc(u32),

    /// Fixed-size chunking
    /// The `u32` value represents the exact size of each chunk in bytes
    FixedSize(u32),

    /// Line-based chunking
    /// Each line becomes a separate chunk
    Line,
}
+
+impl Default for ChunkingPolicy {
+ fn default() -> Self {
+ ChunkingPolicy::Cdc(64 * 1024) // Default to 64KB CDC
+ }
+}
+
/// Represents a chunk of data with its hash and content
///
/// `hash` is the Blake3 digest of `data` (see `create_chunk`), used as
/// the chunk's content address in the store.
#[derive(Debug, Clone)]
pub struct Chunk {
    /// Blake3 hash of the chunk content
    pub hash: [u8; 32],
    /// Raw chunk data
    pub data: Vec<u8>,
}
+
/// Represents an index entry pointing to a chunk
///
/// Serialized on disk as a fixed 36-byte record: the 32-byte hash
/// followed by `size` as a little-endian u32 (see `write_index_file`
/// and `read_index_file`).
#[derive(Debug, Clone)]
pub struct IndexEntry {
    /// Blake3 hash of the chunk
    pub hash: [u8; 32],
    /// Size of the chunk in bytes
    pub size: u32,
}
+
/// Result of chunking a file
#[derive(Debug)]
pub struct ChunkingResult {
    /// List of chunks extracted from the file
    pub chunks: Vec<Chunk>,
    /// Total size of the original file
    // NOTE(review): populated by the policy-specific chunker impls in the
    // cdc/fixed/line submodules, which are not visible here — verify they
    // all set it to the input length.
    pub total_size: u64,
}
+
+/// Split a file into chunks and store them in the repository, then output the index file to the specified directory
+pub async fn write_file(
+ file_to_write: impl Into<PathBuf>,
+ storage_dir: impl Into<PathBuf>,
+ output_index_file: impl Into<PathBuf>,
+ cfg: &StorageConfig,
+) -> Result<(), StorageIOError> {
+ let (file_to_write, storage_dir, output_index_file) =
+ precheck(file_to_write, storage_dir, output_index_file).await?;
+
+ info!("Starting file write: {}", file_to_write.display());
+ let start_time = Instant::now();
+
+ // Memory map the entire file
+ let file = std::fs::File::open(&file_to_write)?;
+ let mmap = unsafe { Mmap::map(&file)? };
+ let data = &mmap[..];
+
+ // Split into chunks based on policy
+ let chunking_result = split_into_chunks(&data, &cfg.chunking_policy)?;
+
+ // Store chunks and create index
+ let index_entries = store_chunks(&chunking_result.chunks, &storage_dir).await?;
+
+ // Write index file
+ write_index_file(&index_entries, &output_index_file).await?;
+
+ let duration = start_time.elapsed();
+ info!(
+ "File write completed in {:?}: {}",
+ duration,
+ file_to_write.display()
+ );
+
+ Ok(())
+}
+
+/// Split data into chunks based on the specified policy
+pub fn split_into_chunks(
+ data: &[u8],
+ policy: &ChunkingPolicy,
+) -> Result<ChunkingResult, StorageIOError> {
+ match policy {
+ ChunkingPolicy::Cdc(avg_size) => split_cdc(data, *avg_size),
+ ChunkingPolicy::FixedSize(chunk_size) => split_fixed(data, *chunk_size),
+ ChunkingPolicy::Line => split_by_lines(data),
+ }
+}
+
+/// Split data using Content-Defined Chunking
+fn split_cdc(data: &[u8], avg_size: u32) -> Result<ChunkingResult, StorageIOError> {
+ use crate::store::cdc::split_cdc_impl;
+ split_cdc_impl(data, avg_size)
+}
+
+/// Split data using fixed-size chunking
+fn split_fixed(data: &[u8], chunk_size: u32) -> Result<ChunkingResult, StorageIOError> {
+ use crate::store::fixed::split_fixed_impl;
+ split_fixed_impl(data, chunk_size)
+}
+
+/// Split data by lines
+fn split_by_lines(data: &[u8]) -> Result<ChunkingResult, StorageIOError> {
+ use crate::store::line::split_by_lines_impl;
+ split_by_lines_impl(data)
+}
+
+/// Store chunks in the storage directory and return index entries
+async fn store_chunks(
+ chunks: &[Chunk],
+ storage_dir: &Path,
+) -> Result<Vec<IndexEntry>, StorageIOError> {
+ let mut index_entries = Vec::with_capacity(chunks.len());
+
+ let writed_counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let total_chunks = chunks.len();
+
+ for chunk in chunks {
+ // Create storage directory structure based on hash
+ let hash_str = hex::encode(chunk.hash);
+ let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?;
+
+ // Create directory if it doesn't exist
+ if let Some(parent) = chunk_dir.parent() {
+ fs::create_dir_all(parent).await?;
+ }
+
+ // Write chunk data
+ let chunk_path = chunk_dir.with_extension("chunk");
+ if !chunk_path.exists() {
+ trace!("W: {}", hash_str);
+ fs::write(&chunk_path, &chunk.data).await?;
+ writed_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ }
+
+ // Add to index
+ index_entries.push(IndexEntry {
+ hash: chunk.hash,
+ size: chunk.data.len() as u32,
+ });
+ }
+
+ let writed_count = writed_counter.load(std::sync::atomic::Ordering::Relaxed);
+ info!(
+ "Chunk storage completed: {}/{} ({}%) chunks written ({} duplicates skipped)",
+ writed_count,
+ total_chunks,
+ (writed_count as f32 / total_chunks as f32) * 100 as f32,
+ total_chunks - writed_count
+ );
+
+ Ok(index_entries)
+}
+
+/// Write index file containing chunk hashes and sizes
+async fn write_index_file(
+ entries: &[IndexEntry],
+ output_path: &Path,
+) -> Result<(), StorageIOError> {
+ let mut index_data = Vec::with_capacity(entries.len() * 36); // 32 bytes hash + 4 bytes size
+
+ for entry in entries {
+ index_data.extend_from_slice(&entry.hash);
+ index_data.extend_from_slice(&entry.size.to_le_bytes());
+ }
+
+ fs::write(output_path, &index_data).await?;
+ Ok(())
+}
+
+/// Build a file from the repository to the specified directory using an index file
+pub async fn build_file(
+ index_to_build: impl Into<PathBuf>,
+ storage_dir: impl Into<PathBuf>,
+ output_file: impl Into<PathBuf>,
+) -> Result<(), StorageIOError> {
+ let (index_to_build, storage_dir, output_file) =
+ precheck(index_to_build, storage_dir, output_file).await?;
+
+ info!(
+ "Starting file build from index: {}",
+ index_to_build.display()
+ );
+ let start_time = Instant::now();
+
+ // Read index file
+ let index_entries = read_index_file(&index_to_build).await?;
+
+ // Reconstruct file from chunks
+ let reconstructed_data = reconstruct_from_chunks(&index_entries, &storage_dir).await?;
+
+ // Write output file
+ fs::write(&output_file, &reconstructed_data).await?;
+
+ let duration = start_time.elapsed();
+ info!(
+ "File build completed in {:?}: {} -> {}",
+ duration,
+ index_to_build.display(),
+ output_file.display()
+ );
+
+ Ok(())
+}
+
+/// Read index file and parse entries
+async fn read_index_file(index_path: &Path) -> Result<Vec<IndexEntry>, StorageIOError> {
+ // Open file and memory map it
+ let file = std::fs::File::open(index_path)?;
+ let mmap = unsafe { Mmap::map(&file)? };
+
+ // Each entry is 36 bytes (32 bytes hash + 4 bytes size)
+ if mmap.len() % 36 != 0 {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "Invalid index file format",
+ )
+ .into());
+ }
+
+ let num_entries = mmap.len() / 36;
+ let mut entries = Vec::with_capacity(num_entries);
+
+ for i in 0..num_entries {
+ let start = i * 36;
+ let hash_start = start;
+ let size_start = start + 32;
+
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&mmap[hash_start..hash_start + 32]);
+
+ let size = u32::from_le_bytes([
+ mmap[size_start],
+ mmap[size_start + 1],
+ mmap[size_start + 2],
+ mmap[size_start + 3],
+ ]);
+
+ entries.push(IndexEntry { hash, size });
+ }
+
+ Ok(entries)
+}
+
+/// Reconstruct file data from chunks using index entries
+async fn reconstruct_from_chunks(
+ entries: &[IndexEntry],
+ storage_dir: &Path,
+) -> Result<Vec<u8>, StorageIOError> {
+ let mut reconstructed_data = Vec::new();
+
+ for entry in entries {
+ // Get chunk path from hash
+ let hash_str = hex::encode(entry.hash);
+ let chunk_dir = get_dir(storage_dir.to_path_buf(), hash_str.clone())?;
+ let chunk_path = chunk_dir.with_extension("chunk");
+
+ trace!("R: {}", hash_str);
+
+ // Memory map the chunk file
+ let file = std::fs::File::open(&chunk_path)?;
+ let mmap = unsafe { Mmap::map(&file)? };
+
+ // Verify chunk size matches index
+ if mmap.len() != entry.size as usize {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!(
+ "Chunk size mismatch: expected {}, got {}",
+ entry.size,
+ mmap.len()
+ ),
+ )
+ .into());
+ }
+
+ // Verify chunk hash
+ let actual_hash = blake3_digest(&mmap);
+ let expected_hash = entry.hash;
+ if actual_hash != expected_hash {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!(
+ "Chunk hash mismatch: expected {}, got {}",
+ hex::encode(expected_hash),
+ hex::encode(actual_hash)
+ ),
+ )
+ .into());
+ }
+
+ // Append to reconstructed data
+ reconstructed_data.extend_from_slice(&mmap);
+ }
+
+ Ok(reconstructed_data)
+}
+
+/// Calculate Blake3 hash of data
+fn blake3_digest(data: &[u8]) -> [u8; 32] {
+ let hash = blake3::hash(data);
+ *hash.as_bytes()
+}
+
+/// Calculate and return the corresponding storage subdirectory path based on the given storage directory and hash string
+pub fn get_dir(storage_dir: PathBuf, hash_str: String) -> Result<PathBuf, StorageIOError> {
+ if hash_str.len() < 4 {
+ return Err(StorageIOError::HashTooShort);
+ }
+ let dir = storage_dir
+ .join(&hash_str[0..2])
+ .join(&hash_str[2..4])
+ .join(&hash_str);
+ Ok(dir)
+}
+
+/// Calculate Blake3 hash of data and create a Chunk
+pub fn create_chunk(data: Vec<u8>) -> Chunk {
+ let hash = blake3_digest(&data);
+ Chunk { hash, data }
+}
+
+/// Read a file and split it into chunks using the specified policy
+pub async fn chunk_file(
+ file_path: impl Into<PathBuf>,
+ policy: &ChunkingPolicy,
+) -> Result<ChunkingResult, StorageIOError> {
+ let file_path = file_path.into();
+ info!("Starting chunking: {}", file_path.display());
+ let start_time = Instant::now();
+
+ // Memory map the file
+ let file = std::fs::File::open(&file_path)?;
+ let mmap = unsafe { Mmap::map(&file)? };
+ let data = &mmap[..];
+
+ let result = split_into_chunks(data, policy);
+
+ let duration = start_time.elapsed();
+ info!(
+ "Chunking completed in {:?}: {}",
+ duration,
+ file_path.display()
+ );
+
+ result
+}
+
+/// Get chunk statistics
+pub fn get_chunk_stats(chunks: &[Chunk]) -> ChunkStats {
+ let total_chunks = chunks.len();
+ let total_size: usize = chunks.iter().map(|c| c.data.len()).sum();
+ let avg_size = if total_chunks > 0 {
+ total_size / total_chunks
+ } else {
+ 0
+ };
+ let min_size = chunks.iter().map(|c| c.data.len()).min().unwrap_or(0);
+ let max_size = chunks.iter().map(|c| c.data.len()).max().unwrap_or(0);
+
+ ChunkStats {
+ total_chunks,
+ total_size: total_size as u64,
+ avg_size,
+ min_size,
+ max_size,
+ }
+}
+
/// Statistics about chunks
///
/// Produced by `get_chunk_stats`; all sizes are in bytes and every field
/// is zero for an empty chunk list.
#[derive(Debug, Clone)]
pub struct ChunkStats {
    /// Number of chunks
    pub total_chunks: usize,
    /// Sum of all chunk sizes
    pub total_size: u64,
    /// Mean chunk size (integer division)
    pub avg_size: usize,
    /// Smallest chunk size
    pub min_size: usize,
    /// Largest chunk size
    pub max_size: usize,
}
+
+/// Pre-check whether the input file path, directory path, and output path are valid
+pub async fn precheck(
+ input_file: impl Into<PathBuf>,
+ dir: impl Into<PathBuf>,
+ output_file: impl Into<PathBuf>,
+) -> Result<(PathBuf, PathBuf, PathBuf), std::io::Error> {
+ let (input, dir, output) = (input_file.into(), dir.into(), output_file.into());
+
+ // Perform all checks in parallel
+ let (input_metadata_result, dir_metadata_result, output_metadata_result) = tokio::join!(
+ fs::metadata(&input),
+ fs::metadata(&dir),
+ fs::metadata(&output)
+ );
+
+ // Check if input file exists
+ let input_metadata = match input_metadata_result {
+ Ok(metadata) => metadata,
+ Err(_) => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("Input file not found: {}", input.display()),
+ ));
+ }
+ };
+
+ // Check if directory exists
+ let dir_metadata = match dir_metadata_result {
+ Ok(metadata) => metadata,
+ Err(_) => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("Directory not found: {}", dir.display()),
+ ));
+ }
+ };
+
+ // Check if output file already exists
+ if output_metadata_result.is_ok() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::AlreadyExists,
+ format!("Output file already exist: {}", output.display()),
+ ));
+ }
+
+ // Check if input is a file
+ if !input_metadata.is_file() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ format!("Input path is not a file: {}", input.display()),
+ ));
+ }
+
+ // Check if dir is a directory
+ if !dir_metadata.is_dir() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::NotADirectory,
+ format!("Input path is not a directory: {}", dir.display()),
+ ));
+ }
+
+ Ok((input, dir, output))
+}