diff options
Diffstat (limited to 'src/chunker')
| -rw-r--r-- | src/chunker/constants.rs | 3 | ||||
| -rw-r--r-- | src/chunker/context.rs | 226 | ||||
| -rw-r--r-- | src/chunker/entry.rs | 39 | ||||
| -rw-r--r-- | src/chunker/rw.rs | 2 | ||||
| -rw-r--r-- | src/chunker/rw/error.rs | 61 | ||||
| -rw-r--r-- | src/chunker/rw/storage.rs | 88 | ||||
| -rw-r--r-- | src/chunker/rw/storage/build.rs | 250 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write.rs | 118 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write/simple.rs | 368 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write/stream.rs | 12 |
10 files changed, 1167 insertions, 0 deletions
diff --git a/src/chunker/constants.rs b/src/chunker/constants.rs new file mode 100644 index 0000000..5e4870e --- /dev/null +++ b/src/chunker/constants.rs @@ -0,0 +1,3 @@ +pub const BUTCK_STORAGE_DIR_NAME: &str = ".butck"; +pub const BUTCK_INDEX_FILE_SUFFIX: &str = "bidx"; +pub const BUTCK_INDEX_MAGIC: [u8; 4] = [0xfe, 0xe1, 0xf0, 0x0d]; diff --git a/src/chunker/context.rs b/src/chunker/context.rs new file mode 100644 index 0000000..79254f5 --- /dev/null +++ b/src/chunker/context.rs @@ -0,0 +1,226 @@ +use std::{collections::HashMap, env::current_dir, path::PathBuf, process::exit, str::FromStr}; + +use log::{error, warn}; + +use crate::{ + chunker::constants::BUTCK_STORAGE_DIR_NAME, core::hash::ChunkWriteHash, special_argument, + special_flag, utils::file_input_solve::parse_path_input, +}; + +#[derive(Debug, Default)] +pub struct ButckContext { + /// All input files + pub file_paths: Vec<PathBuf>, + + /// Path of Butck Storage + pub storage_path: Option<PathBuf>, + + // Display chunk boundaries + pub display_boundaries: bool, + + /// Whether to read in stream mode + pub stream_read: Option<u32>, + + /// Whether to read files using memory mapping + pub memmap_read: bool, + + /// Register name + pub register_name: Option<String>, + + /// Chunking policy name + pub policy_name: Option<String>, + + /// Hash algorithm used for chunking + pub chunk_hash: ChunkWriteHash, + + /// Output directory + pub output_dir: PathBuf, + + /// Output file (not available for some commands) + pub output_file: Option<PathBuf>, + + /// Override parameters + pub params: HashMap<String, String>, +} + +impl ButckContext { + /// Apply the args of ChunkerContext to itself + pub fn from_args(mut args: Vec<String>) -> Self { + let mut ctx = ButckContext::default(); + let recursive = ctx.read_recursive(&mut args); + ctx.apply_stream_read(&mut args); + ctx.apply_memmap_read(&mut args); + ctx.apply_register_name(&mut args); + ctx.apply_policy_name(&mut args); + ctx.apply_chunk_hash(&mut args); + ctx.apply_storage_dir(&mut args); + ctx.apply_output_paths(&mut args); + ctx.apply_params(&mut args); + ctx.apply_display_boundaries(&mut args); + + // Finally, parse path input + ctx.file_paths = parse_path_input(args, recursive, vec![BUTCK_STORAGE_DIR_NAME]); + ctx + } + + fn read_recursive(&mut self, args: &mut Vec<String>) -> bool { + special_flag!(args, "-r", "--recursive") + } + + fn apply_stream_read(&mut self, args: &mut Vec<String>) { + if let Some(size_str) = special_argument!(args, "-S", "--stream-read") + && let Ok(size) = size_str.parse::<u32>() { + self.stream_read = Some(size); + } + } + + fn apply_memmap_read(&mut self, args: &mut Vec<String>) -> bool { + special_flag!(args, "-m", "--memmap-read") + } + + fn apply_register_name(&mut self, args: &mut Vec<String>) { + self.register_name = special_argument!(args, "-R", "--register"); + } + + fn apply_policy_name(&mut self, args: &mut Vec<String>) { + self.policy_name = special_argument!(args, "-p", "--policy"); + } + + fn apply_chunk_hash(&mut self, args: &mut Vec<String>) { + let chunk_hash_str = special_argument!(args, "-H", "--chunk-hash"); + self.chunk_hash = match chunk_hash_str { + Some(ref s) => match s.as_str() { + "blake3" => ChunkWriteHash::Blake3, + "sha256" => ChunkWriteHash::Sha256, + _ => ChunkWriteHash::default(), + }, + None => ChunkWriteHash::default(), + }; + } + + fn apply_output_paths(&mut self, args: &mut Vec<String>) { + let output_dir_str = special_argument!(args, "-o", "--output-dir"); + let output_file_str = special_argument!(args, "-O", "--output-file"); + + let current_dir = current_dir().unwrap(); + + let output_dir = if let Some(output_dir_str) = output_dir_str { + let path = PathBuf::from(output_dir_str); + if path.exists() { Some(path) } else { None } + } else { + None + }; + + self.output_dir = if let Some(output_dir) = output_dir { + output_dir + } else if let Some(storage_path) = &self.storage_path { + storage_path.clone() + } else { + current_dir + }; + + self.output_file = output_file_str.map(PathBuf::from) + } + + fn apply_params(&mut self, args: &mut Vec<String>) { + while let Some(arg) = special_argument!(args, "+p", "+param") { + let split = arg.split('=').collect::<Vec<&str>>(); + if split.len() == 2 { + self.params + .insert(split[0].to_string(), split[1].to_string()); + } + } + } + + fn apply_storage_dir(&mut self, args: &mut Vec<String>) { + self.storage_path = { + let storage_override = match special_argument!(args, "-s", "--storage") { + Some(o) => { + let path = PathBuf::from_str(o.as_str()); + if let Ok(p) = &path { + Self::init_butck_storage(p.clone()); + } + path.ok() + } + None => None, + }; + Self::find_butck_storage_dir(storage_override) + }; + } + + fn apply_display_boundaries(&mut self, args: &mut Vec<String>) { + self.display_boundaries = special_flag!(args, "-D", "--display-boundaries"); + } + + fn init_butck_storage(path: PathBuf) -> Option<PathBuf> { + if !path.exists() { + // If the path does not exist, create it and initialize Butck Storage here + if let Err(e) = std::fs::create_dir_all(&path) { + error!("Failed to create directory '{}': {}", path.display(), e); + exit(1); + } + let butck_dir = path.join(BUTCK_STORAGE_DIR_NAME); + if let Err(e) = std::fs::create_dir_all(&butck_dir) { + error!( + "Failed to create '{}' directory: {}", + BUTCK_STORAGE_DIR_NAME, e + ); + exit(1); + } + Some(path) + } else { + let butck_dir = path.join(BUTCK_STORAGE_DIR_NAME); + + // Check if Butck Storage already exists + if butck_dir.exists() { + // Butck Storage already exists, return the path + Some(path) + } else { + // Butck Storage doesn't exist, create it with a warning if directory is not empty + let is_empty = path + .read_dir() + .map(|mut entries| entries.next().is_none()) + .unwrap_or(false); + + if !is_empty { + // Warn about creating storage in non-empty directory + warn!( + "Creating '{}' storage in non-empty directory: {}", + BUTCK_STORAGE_DIR_NAME, + path.display() + ); + } + + // Create Butck Storage directory + if let Err(e) = std::fs::create_dir_all(&butck_dir) { + error!( + "Failed to create '{}' directory: {}", + BUTCK_STORAGE_DIR_NAME, e + ); + exit(1); + } + Some(path) + } + } + } + + // Get the ButckStorage directory based on context + fn find_butck_storage_dir(from: Option<PathBuf>) -> Option<PathBuf> { + let mut current_dir = match from { + Some(path) => path, + None => std::env::current_dir().ok()?, + }; + + loop { + let butck_dir = current_dir.join(BUTCK_STORAGE_DIR_NAME); + if butck_dir.is_dir() { + return Some(current_dir); + } + + if !current_dir.pop() { + break; + } + } + None + } +} diff --git a/src/chunker/entry.rs b/src/chunker/entry.rs new file mode 100644 index 0000000..4fdb1f8 --- /dev/null +++ b/src/chunker/entry.rs @@ -0,0 +1,39 @@ +use std::process::exit; + +use log::info; + +use crate::chunker::{ + context::ButckContext, + rw::{self, error::ButckRWError}, +}; + +pub async fn entry(ctx: ButckContext, args: Vec<String>) -> Result<(), ButckRWError> { + if let Some(subcommand) = args.first() { + return match subcommand.as_str() { + "write" => rw::storage::write(ctx).await, + "build" => rw::storage::build(ctx).await, + "policies" => { + butck_policies::policies() + .iter() + .for_each(|p| info!("{}", p)); + return Ok(()); + } + _ => { + print_help(); + exit(1) + } + }; + } + Ok(()) +} + +pub fn print_help() { + println!("{}", include_str!("../../resources/helps/butck.txt").trim()); +} + +pub fn print_version() { + println!( + "{}", + include_str!("../../resources/version_info.txt").trim() + ); +} diff --git a/src/chunker/rw.rs b/src/chunker/rw.rs new file mode 100644 index 0000000..85e734e --- /dev/null +++ b/src/chunker/rw.rs @@ -0,0 +1,2 @@ +pub mod error; +pub mod storage; diff --git a/src/chunker/rw/error.rs b/src/chunker/rw/error.rs new file mode 100644 index 0000000..7f263a5 --- /dev/null +++ b/src/chunker/rw/error.rs @@ -0,0 +1,61 @@ +use butck_policies::error::ChunkFailed; + +use crate::chunker::context::ButckContext; + +#[derive(Debug)] +pub struct ButckRWError { + kind: ButckRWErrorKind, + ctx: ButckContext, +} + +#[derive(thiserror::Error, Debug)] +pub enum ButckRWErrorKind { + #[error("No butck storage found")] + NoButckStorageFound, + + #[error("Chunking policy not specified")] + ChunkingPolicyNotSpecified, + + #[error("Cannot enable both MemmapRead and StreamRead")] + ReadingMethodAmbiguous, + + #[error("Multiple input files specified but only one output file allowed")] + OutputCountMismatch, + + #[error("Invalid bidx file format")] + InvalidBidxFormat, + + #[error("Chunk not found in storage: {0}")] + ChunkNotFound(String), + + #[error("Failed to rebuild file: {0}")] + RebuildFailed(String), + + #[error("Chunking failed: {0}")] + ChunkFailed(#[from] ChunkFailed), + + #[error("IO error: {0}")] + IOError(#[from] std::io::Error), +} + +impl std::fmt::Display for ButckRWError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.kind) + } +} + +impl ButckRWError { + pub fn ctx(&self) -> &ButckContext { + &self.ctx + } + + pub fn kind(&self) -> &ButckRWErrorKind { + &self.kind + } +} + +impl ButckRWErrorKind { + pub fn pack(self, ctx: ButckContext) -> ButckRWError { + ButckRWError { kind: self, ctx } + } +} diff --git a/src/chunker/rw/storage.rs b/src/chunker/rw/storage.rs new file mode 100644 index 0000000..13452d0 --- /dev/null +++ b/src/chunker/rw/storage.rs @@ -0,0 +1,88 @@ +pub mod build; +pub mod write; + +pub use build::build; +pub use write::write; + +use std::path::{Path, PathBuf}; + +/// Information about a chunk for index file +#[derive(Debug, Clone)] +pub struct ChunkInfo { + /// Index of the chunk in the file + pub index: usize, + /// Hash of the chunk (hex string) + pub hash: String, + /// Size of the chunk in bytes + pub size: usize, + /// Start position in the original file + pub start: usize, + /// End position in the original file (exclusive) + pub end: usize, +} + +/// 根据hash值计算chunk文件的存储路径 +/// +/// # 参数 +/// - `storage_dir`: 存储目录 +/// - `hash_hex`: chunk的hash值(16进制字符串) +/// +/// # 返回 +/// 返回chunk文件的完整路径 +pub fn get_chunk_path(storage_dir: &Path, hash_hex: &str) -> PathBuf { + let first_slice = &hash_hex[0..2]; + let second_slice = &hash_hex[2..4]; + storage_dir + .join(first_slice) + .join(second_slice) + .join(hash_hex) +} + +/// 根据hash字节数组计算chunk文件的存储路径 +/// +/// # 参数 +/// - `storage_dir`: 存储目录 +/// - `hash_bytes`: chunk的hash值(字节数组) +/// +/// # 返回 +/// 返回chunk文件的完整路径 +pub fn get_chunk_path_from_bytes(storage_dir: &Path, hash_bytes: &[u8; 32]) -> PathBuf { + let hash_hex = hex::encode(hash_bytes); + get_chunk_path(storage_dir, &hash_hex) +} + +/// 生成唯一的文件路径,如果文件已存在则添加数字后缀 +/// +/// # 参数 +/// - `output_dir`: 输出目录 +/// - `desired_filename`: 期望的文件名 +/// +/// # 返回 +/// 返回唯一的文件路径 +pub fn generate_unique_path(output_dir: &Path, desired_filename: &str) -> PathBuf { + let desired_path = output_dir.join(desired_filename); + let mut candidate = desired_path.clone(); + + let mut counter = 1; + while candidate.exists() { + let path_buf = PathBuf::from(desired_filename); + if let Some(stem) = path_buf.file_stem() { + if let Some(ext) = path_buf.extension() { + let ext_str = ext.to_string_lossy(); + let new_name = if ext_str.is_empty() { + format!("{}_{}", stem.to_string_lossy(), counter) + } else { + format!("{}.{}_{}", stem.to_string_lossy(), ext_str, counter) + }; + candidate = output_dir.join(new_name); + } else { + candidate = output_dir.join(format!("{}_{}", stem.to_string_lossy(), counter)); + } + } else { + candidate = output_dir.join(format!("{}_{}", desired_filename, counter)); + } + counter += 1; + } + + candidate +} diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs new file mode 100644 index 0000000..7608b5c --- /dev/null +++ b/src/chunker/rw/storage/build.rs @@ -0,0 +1,250 @@ +use futures::future::join_all; +use just_progress::progress; +use log::{error, info, trace}; +use memmap2::Mmap; +use std::path::PathBuf; +use tokio::{fs::File, io::AsyncWriteExt}; + +use crate::{ + chunker::{ + constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC}, + context::ButckContext, + rw::error::{ButckRWError, ButckRWErrorKind}, + rw::storage, + }, + utils::size_display::size_display, +}; + +pub async fn build(ctx: ButckContext) -> Result<(), ButckRWError> { + if ctx.storage_path.is_none() { + return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx)); + } + if ctx.file_paths.is_empty() { + return Err( + ButckRWErrorKind::RebuildFailed("No bidx files specified".to_string()).pack(ctx), + ); + } + + let tasks: Vec<_> = ctx + .file_paths + .iter() + .map(|bidx_path| async { + trace!( + "Preparing to rebuild from bidx file `{}`", + bidx_path.display() + ); + rebuild_from_bidx(bidx_path, &ctx).await + }) + .collect(); + + let results = join_all(tasks).await; + + for result in results { + if let Err(e) = result { + return Err(e.pack(ctx)); + } + } + + Ok(()) +} + +async fn rebuild_from_bidx( + bidx_path: &PathBuf, + ctx: &ButckContext, +) -> Result<(), ButckRWErrorKind> { + // Validate file extension + if let Some(ext) = bidx_path.extension() + && ext != BUTCK_INDEX_FILE_SUFFIX + { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + info!("Rebuilding from bidx file: {}", bidx_path.display()); + + // Read bidx file content + let bidx_content = if ctx.memmap_read { + let file = File::open(bidx_path).await?; + let mmap = unsafe { Mmap::map(&file)? }; + mmap.to_vec() + } else { + tokio::fs::read(bidx_path).await? + }; + + // Verify file size includes at least the header + if bidx_content.len() < 6 { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + // Validate MAGIC bytes + if bidx_content[0..4] != BUTCK_INDEX_MAGIC { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + // Read filename + let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize; + if bidx_content.len() < 6 + filename_len { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + let filename_bytes = &bidx_content[6..6 + filename_len]; + let original_filename = String::from_utf8(filename_bytes.to_vec()) + .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + + trace!("Original filename from bidx: {}", original_filename); + + let hash_data_start = 6 + filename_len; + let hash_data = &bidx_content[hash_data_start..]; + + // Verify that hash data size is a multiple of 32 bytes + if hash_data.len() % 32 != 0 { + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + + let chunk_count = hash_data.len() / 32; + info!("Found {} chunks in bidx file", chunk_count); + + let mut chunk_hashes = Vec::with_capacity(chunk_count); + for i in 0..chunk_count { + let start = i * 32; + let end = start + 32; + let hash_bytes: [u8; 32] = hash_data[start..end] + .try_into() + .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + chunk_hashes.push(hash_bytes); + } + + trace!("Parsed {} chunk hashes", chunk_hashes.len()); + + // Determine output file path + let output_path = if let Some(output_file) = &ctx.output_file { + output_file.clone() + } else { + // Use the original filename read from the bidx file + storage::generate_unique_path(&ctx.output_dir, &original_filename) + }; + + info!("Rebuilding file to: {}", output_path.display()); + + let progress_name = format!("Rebuild `{}`", output_path.display()); + progress::update_progress(progress_name.as_str(), 0.0); + let step = 1.0 / chunk_count as f64; + + let mut tasks = Vec::with_capacity(chunk_count); + + for (index, hash_bytes) in chunk_hashes.iter().enumerate() { + let hash_hex = hex::encode(hash_bytes); + tasks.push(read_chunk( + progress_name.as_str(), + step, + hash_hex, + &ctx.output_dir, + index, + )); + } + + trace!("Starting parallel read of {} chunks", tasks.len()); + let results = join_all(tasks).await; + trace!("All read tasks completed"); + + // Collect chunk data and verify order + let mut chunk_data_list = Vec::with_capacity(chunk_count); + let mut success_count = 0; + + for (index, result) in results.into_iter().enumerate() { + match result { + Ok(chunk_data) => { + let chunk_size = chunk_data.len(); + success_count += 1; + chunk_data_list.push((index, chunk_data)); + trace!( + "Chunk {} read successfully, size: {} bytes", + index, chunk_size + ); + } + Err(e) => { + error!("Failed to read chunk {}: {:?}", index, e); + return Err(e); + } + } + } + + if success_count != chunk_count { + return Err(ButckRWErrorKind::ChunkNotFound(format!( + "Only {}/{} chunks found in storage", + success_count, chunk_count + ))); + } + + info!("All {} chunks read successfully", success_count); + + // Sort by index and concatenate files + chunk_data_list.sort_by_key(|(index, _)| *index); + + // Calculate total size + let total_size: usize = chunk_data_list.iter().map(|(_, data)| data.len()).sum(); + let (total_value, total_unit) = size_display(total_size); + info!( + "Rebuilding file: {} chunks, total size: {:.2} {} ({} bytes)", + chunk_count, total_value, total_unit, total_size + ); + + // Write to output file + trace!("Writing to output file: {}", output_path.display()); + let mut output_file = File::create(&output_path).await?; + + for (index, chunk_data) in chunk_data_list { + trace!("Writing chunk {} ({} bytes)", index, chunk_data.len()); + output_file.write_all(&chunk_data).await?; + progress::increase(progress_name.as_str(), step as f32); + } + + output_file.flush().await?; + + info!("File successfully rebuilt: {}", output_path.display()); + progress::complete(progress_name.as_str()); + + Ok(()) +} + +/// Read a single chunk from storage +async fn read_chunk( + progress_name: &str, + step: f64, + hash_hex: String, + storage_dir: &PathBuf, + chunk_index: usize, +) -> Result<Vec<u8>, ButckRWErrorKind> { + trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex); + + // Build chunk file path + let file_path = storage::get_chunk_path(storage_dir, &hash_hex); + + trace!( + "read_chunk[{}]: Looking for file at: {}", + chunk_index, + file_path.display() + ); + + // Read chunk file + match tokio::fs::read(&file_path).await { + Ok(data) => { + trace!( + "read_chunk[{}]: Read {} bytes successfully", + chunk_index, + data.len() + ); + progress::increase(progress_name, step as f32); + Ok(data) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + trace!("read_chunk[{}]: File not found", chunk_index); + Err(ButckRWErrorKind::ChunkNotFound(format!( + "Chunk {} (hash: {}) not found in storage", + chunk_index, hash_hex + ))) + } + Err(e) => { + trace!("read_chunk[{}]: Read failed: {:?}", chunk_index, e); + Err(ButckRWErrorKind::IOError(e)) + } + } +} diff --git a/src/chunker/rw/storage/write.rs b/src/chunker/rw/storage/write.rs new file mode 100644 index 0000000..8b3acc7 --- /dev/null +++ b/src/chunker/rw/storage/write.rs @@ -0,0 +1,118 @@ +use std::{collections::HashMap, path::PathBuf}; + +use log::trace; + +use crate::{ + chunker::{ + constants::BUTCK_INDEX_FILE_SUFFIX, + context::ButckContext, + rw::{ + error::{ButckRWError, ButckRWErrorKind}, + storage::generate_unique_path, + }, + }, + storage::{simple::write_file_simple, stream::write_file_stream}, +}; + +pub mod simple; +pub mod stream; + +pub async fn write(ctx: ButckContext) -> Result<(), ButckRWError> { + if ctx.storage_path.is_none() { + return Err(ButckRWErrorKind::NoButckStorageFound.pack(ctx)); + } + if ctx.policy_name.is_none() { + return Err(ButckRWErrorKind::ChunkingPolicyNotSpecified.pack(ctx)); + } + if ctx.file_paths.len() > 1 && ctx.output_file.is_some() { + return Err(ButckRWErrorKind::OutputCountMismatch.pack(ctx)); + } + + // Cannot enable both memory-mapped and stream reading simultaneously. + // Stream reading uses butck_policies::chunk_stream_with, + // while memory-mapped or default reading uses butck_policies::chunk_with. + if ctx.memmap_read && ctx.stream_read.is_some() { + return Err(ButckRWErrorKind::ReadingMethodAmbiguous.pack(ctx)); + } + + let param_refs: HashMap<&str, &str> = ctx + .params + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + let tasks: Vec<_> = ctx + .file_paths + .iter() + .map(|path| async { + trace!("Preparing to write file `{}`", path.display()); + write_file(path, &ctx, ¶m_refs).await + }) + .collect(); + + let results = futures::future::join_all(tasks).await; + + for result in results { + if let Err(e) = result { + return Err(e.pack(ctx)); + } + } + + Ok(()) +} + +async fn write_file( + path: &PathBuf, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + if let Some(stream_read_size) = ctx.stream_read { + write_file_stream(path, stream_read_size, ctx, params).await + } else { + write_file_simple(path, ctx, params).await + } +} + +pub fn get_index_file_name(path: &PathBuf, ctx: &ButckContext) -> PathBuf { + let output_file = if let Some(output_file) = &ctx.output_file { + return output_file.clone(); + } else { + ctx.output_dir.join(path.file_name().unwrap_or_default()) + }; + + // Append .bidx suffix directly to the original file name + let desired_filename = if let Some(ext) = output_file.extension() { + let ext_str = ext.to_string_lossy(); + if ext_str.is_empty() { + format!( + "{}.{}", + output_file + .file_stem() + .unwrap_or_default() + .to_string_lossy(), + BUTCK_INDEX_FILE_SUFFIX + ) + } else { + format!( + "{}.{}.{}", + output_file + .file_stem() + .unwrap_or_default() + .to_string_lossy(), + ext_str, + BUTCK_INDEX_FILE_SUFFIX + ) + } + } else { + format!( + "{}.{}", + output_file + .file_name() + .unwrap_or_default() + .to_string_lossy(), + BUTCK_INDEX_FILE_SUFFIX + ) + }; + + generate_unique_path(&ctx.output_dir, &desired_filename) +} diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs new file mode 100644 index 0000000..75b9bd7 --- /dev/null +++ b/src/chunker/rw/storage/write/simple.rs @@ -0,0 +1,368 @@ +use futures::future::join_all; +use just_progress::progress; +use log::{error, info, trace}; +use std::{collections::HashMap, path::PathBuf}; +use tokio::{fs::File, io::AsyncReadExt}; + +use crate::{ + chunker::{ + context::ButckContext, + rw::{error::ButckRWErrorKind, storage}, + }, + core::hash::ChunkWriteHash, + storage::get_index_file_name, + utils::size_display::size_display, +}; + +pub async fn write_file_simple( + path: &PathBuf, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + read_file(path, ctx, params).await?; + Ok(()) +} + +async fn read_file( + path: &PathBuf, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + let mut file = File::open(path).await?; + + // Use butck_policies::chunk_with to locate chunk boundaries in the file + if ctx.memmap_read { + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + let raw_data = &mmap[..]; + let (chunk_boundaries, total_bytes) = + (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); + + // If output boundaries, do not execute actual write logic + if ctx.display_boundaries { + display_boundaries(&chunk_boundaries, total_bytes).await; + return Ok(()); + } else { + write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; + } + } else { + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await?; + let raw_data = &contents[..]; + let (chunk_boundaries, total_bytes) = + (get_boundaries(raw_data, ctx, params).await?, raw_data.len()); + + // If output boundaries, do not execute actual write logic + if ctx.display_boundaries { + display_boundaries(&chunk_boundaries, total_bytes).await; + return Ok(()); + } else { + write_file_to_storage(path, ctx, chunk_boundaries, raw_data).await?; + } + }; + progress::clear_all(); + Ok(()) +} + +async fn write_file_to_storage( + path: &PathBuf, + ctx: &ButckContext, + chunk_boundaries: Vec<u32>, + raw_data: &[u8], +) -> Result<(), ButckRWErrorKind> { + let output_index_file = get_index_file_name(path, ctx); + + let chunk_count = chunk_boundaries.len() + 1; + let progress_name = format!("Write `{}`", path.display()); + + progress::update_progress(progress_name.as_str(), 0.0); + let step = 1.0 / chunk_count as f64; + + trace!("chunks_count={}", chunk_count); + trace!("chunk_hash={:?}", ctx.chunk_hash); + trace!("file_size={}", raw_data.len()); + trace!("output_index_file={}", output_index_file.display()); + trace!("policy_name={:?}", ctx.policy_name); + trace!("storage_dir={}", ctx.output_dir.display()); + + info!( + "{} chunks will be written to {}", + chunk_count, + ctx.output_dir.display() + ); + + tokio::fs::create_dir_all(&ctx.output_dir).await?; + trace!("Output directory created or already exists"); + + let mut tasks = Vec::new(); + let mut start = 0; + let mut chunk_index = 0; + + trace!("Processing chunk boundaries:"); + + for &boundary in &chunk_boundaries { + let end = boundary as usize; + if start < end && end <= raw_data.len() { + let chunk_data = &raw_data[start..end]; + trace!( + "Chunk {}: bytes {}..{} (size: {} bytes)", + chunk_index, + start, + end - 1, + end - start + ); + tasks.push(write_chunk( + progress_name.as_str(), + step, + chunk_data, + &ctx.output_dir, + &ctx.chunk_hash, + chunk_index, + start, + end, + )); + chunk_index += 1; + } else { + trace!( + "Skipping invalid chunk boundary: start={}, end={}, data_len={}", + start, + end, + raw_data.len() + ); + } + start = end; + } + + if start < raw_data.len() { + let chunk_data = &raw_data[start..]; + trace!( + "Chunk {}: bytes {}..{} (size: {} bytes) - final chunk", + chunk_index, + start, + raw_data.len() - 1, + raw_data.len() - start + ); + tasks.push(write_chunk( + progress_name.as_str(), + step, + chunk_data, + &ctx.output_dir, + &ctx.chunk_hash, + chunk_index, + start, + raw_data.len(), + )); + } + + trace!("Total chunks prepared for writing: {}", tasks.len()); + + trace!("Starting parallel write of {} chunks", tasks.len()); + let results = join_all(tasks).await; + trace!("All write tasks completed"); + + let mut success_count = 0; + let mut chunk_infos = Vec::new(); + + for result in results { + match result { + Ok(chunk_info) => { + success_count += 1; + chunk_infos.push(chunk_info); + } + Err(e) => { + trace!("Chunk write failed: {:?}", e); + return Err(e); + } + } + } + + info!("All {} chunks written successfully", success_count); + + // Write index file + trace!("Writing index file to: {}", output_index_file.display()); + if let Err(e) = write_index_file(&output_index_file, &chunk_infos, path).await { + error!("Failed to write index file: {}", e); + return Err(ButckRWErrorKind::IOError(e)); + } + info!("Index file written to: {}", output_index_file.display()); + + trace!("write_file_to_storage completed successfully"); + + progress::complete(progress_name.as_str()); + + Ok(()) +} + +async fn write_chunk( + progress_name: &str, + step: f64, + chunk_data: &[u8], + output_dir: &PathBuf, + chunk_hash: &ChunkWriteHash, + chunk_index: usize, + start: usize, + end: usize, +) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> { + trace!( + "write_chunk[{}]: Starting, data size: {} bytes", + chunk_index, + chunk_data.len() + ); + + trace!( + "write_chunk[{}]: Computing hash with algorithm: {:?}", + chunk_index, chunk_hash + ); + let hash_bytes = chunk_hash.hash(chunk_data); + trace!( + "write_chunk[{}]: Hash computed: {:?}", + chunk_index, hash_bytes + ); + + let hash_hex = hex::encode(hash_bytes); + trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex); + + let file_path = storage::get_chunk_path(output_dir, &hash_hex); + + if let Some(parent_dir) = file_path.parent() { + trace!( + "write_chunk[{}]: Creating directory structure: {}", + chunk_index, + parent_dir.display() + ); + tokio::fs::create_dir_all(parent_dir).await?; + trace!("write_chunk[{}]: Directory created", chunk_index); + } + + trace!( + "write_chunk[{}]: File path: {}", + chunk_index, + file_path.display() + ); + + trace!( + "write_chunk[{}]: Writing {} bytes to file", + chunk_index, + chunk_data.len() + ); + if !file_path.exists() { + tokio::fs::write(&file_path, chunk_data).await?; + } else { + trace!( + "write_chunk[{}]: File already exists, skipping", + chunk_index + ); + } + trace!("write_chunk[{}]: File written successfully", chunk_index); + progress::increase(progress_name, step as f32); + Ok(crate::chunker::rw::storage::ChunkInfo { + index: chunk_index, + hash: hash_hex, + size: chunk_data.len(), + start, + end, + }) +} + +async fn get_boundaries<'a>( + raw_data: &[u8], + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<Vec<u32>, ButckRWErrorKind> { + let policy_name = ctx.policy_name.as_ref().unwrap().as_str(); + match butck_policies::chunk_with(policy_name, raw_data, params).await { + Ok(s) => Ok(s), + Err(e) => Err(ButckRWErrorKind::ChunkFailed(e)), + } +} + +async fn write_index_file( + index_path: &PathBuf, + chunk_infos: &[crate::chunker::rw::storage::ChunkInfo], + original_file_path: &PathBuf, +) -> Result<(), std::io::Error> { + use std::io::Write; + + let file = std::fs::File::create(index_path)?; + let mut writer = std::io::BufWriter::new(file); + + // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes + use crate::chunker::constants::BUTCK_INDEX_MAGIC; + + // Write magic bytes + writer.write_all(&BUTCK_INDEX_MAGIC)?; + + // Get original filename as bytes + let filename = original_file_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + let filename_bytes = filename.as_bytes(); + + // Write filename length as u16 (little-endian) + if filename_bytes.len() > u16::MAX as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Filename too long: {} bytes", filename_bytes.len()), + )); + } + let filename_len = filename_bytes.len() as u16; + writer.write_all(&filename_len.to_le_bytes())?; + + // Write filename bytes + writer.write_all(filename_bytes)?; + + // Write chunk hashes: [u8; 32][u8; 32][u8; 32]... + for chunk_info in chunk_infos { + // Convert hex hash to bytes + match hex::decode(&chunk_info.hash) { + Ok(hash_bytes) => { + if hash_bytes.len() == 32 { + writer.write_all(&hash_bytes)?; + } else { + // Pad or truncate to 32 bytes if needed + let mut fixed_hash = [0u8; 32]; + let len = hash_bytes.len().min(32); + fixed_hash[..len].copy_from_slice(&hash_bytes[..len]); + writer.write_all(&fixed_hash)?; + } + } + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to decode hash hex: {}", e), + )); + } + } + } + + Ok(()) +} + +async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) { + let total_chunks = chunk_boundaries.len() + 1; + let (total_value, total_unit) = size_display(total_bytes); + info!( + "{} chunks, ({:.2} {}, {})", + total_chunks, total_value, total_unit, total_bytes + ); + let mut start = 0; + chunk_boundaries.iter().for_each(|p| { + let next = *p as usize; + let (size_value, size_unit) = size_display(next - start); + info!( + "{} - {} (size: {:.2} {})", + start, + next - 1, + size_value, + size_unit + ); + start = next; + }); + let last = start; + let r#final = total_bytes; + let (size_value, size_unit) = size_display(total_bytes - start); + info!( + "{} - {} (size: {:.2} {})", + last, r#final, size_value, size_unit + ); +} diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs new file mode 100644 index 0000000..020cfcd --- /dev/null +++ b/src/chunker/rw/storage/write/stream.rs @@ -0,0 +1,12 @@ +use std::{collections::HashMap, path::PathBuf}; + +use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind}; + +pub async fn write_file_stream( + path: &PathBuf, + stream_read_size: u32, + ctx: &ButckContext, + params: &HashMap<&str, &str>, +) -> Result<(), ButckRWErrorKind> { + todo!() +} |
