diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/bin/butck.rs | 8 | ||||
| -rw-r--r-- | src/bin/butckrepo-guide.rs | 13 | ||||
| -rw-r--r-- | src/bin/butckrepo-refresh.rs | 32 | ||||
| -rw-r--r-- | src/chunker.rs | 1 | ||||
| -rw-r--r-- | src/chunker/constants.rs | 2 | ||||
| -rw-r--r-- | src/chunker/context.rs | 12 | ||||
| -rw-r--r-- | src/chunker/entry.rs | 39 | ||||
| -rw-r--r-- | src/chunker/rw/error.rs | 6 | ||||
| -rw-r--r-- | src/chunker/rw/storage.rs | 41 | ||||
| -rw-r--r-- | src/chunker/rw/storage/bidx.rs | 157 | ||||
| -rw-r--r-- | src/chunker/rw/storage/build.rs | 82 | ||||
| -rw-r--r-- | src/chunker/rw/storage/hash.rs (renamed from src/core/hash.rs) | 2 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write.rs | 7 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write/simple.rs | 171 | ||||
| -rw-r--r-- | src/chunker/rw/storage/write/stream.rs | 151 | ||||
| -rw-r--r-- | src/core.rs | 1 | ||||
| -rw-r--r-- | src/entry.rs | 50 | ||||
| -rw-r--r-- | src/lib.rs | 4 | ||||
| -rw-r--r-- | src/utils.rs | 2 | ||||
| -rw-r--r-- | src/utils/cmd_macros.rs (renamed from src/macros.rs) | 0 | ||||
| -rw-r--r-- | src/utils/log.rs (renamed from src/log.rs) | 0 |
21 files changed, 505 insertions, 276 deletions
diff --git a/src/bin/butck.rs b/src/bin/butck.rs index 6a81fbb..0d0e102 100644 --- a/src/bin/butck.rs +++ b/src/bin/butck.rs @@ -3,11 +3,11 @@ use std::process::exit; use butchunker::{ chunker::{ context::ButckContext, - entry::{entry, print_help, print_version}, rw::error::{ButckRWError, ButckRWErrorKind}, }, - log::init_logger, + entry::{entry, print_help, print_version}, special_argument, special_flag, + utils::log::init_logger, }; use just_progress::{progress, renderer}; use log::error; @@ -104,6 +104,10 @@ fn handle_entry_result(r: Result<(), ButckRWError>) { ButckRWErrorKind::ChunkFailed(_chunk_failed) => error!("Chunk failed"), ButckRWErrorKind::IOError(error) => error!("IO error: {}", error), ButckRWErrorKind::InvalidBidxFormat => error!("Invalid bidx format"), + ButckRWErrorKind::ChunkingFailed(reason) => error!("Chunking failed: {}", reason), + ButckRWErrorKind::IndexFileWriteFailed(reason) => { + error!("Failed to write index file: {}", reason) + } }, } } diff --git a/src/bin/butckrepo-guide.rs b/src/bin/butckrepo-guide.rs deleted file mode 100644 index d694ba5..0000000 --- a/src/bin/butckrepo-guide.rs +++ /dev/null @@ -1,13 +0,0 @@ -use colored::Colorize; - -fn main() { - println!("Welcome to Butchunker!"); - println!( - "Please add your policy crates to the `{}` directory", - "./policy/".bright_green() - ); - println!( - "Then run `{}` to update the policy registry", - "cargo run --bin butckrepo-refresh".bright_green() - ); -} diff --git a/src/bin/butckrepo-refresh.rs b/src/bin/butckrepo-refresh.rs index 9184efb..2b3d841 100644 --- a/src/bin/butckrepo-refresh.rs +++ b/src/bin/butckrepo-refresh.rs @@ -19,8 +19,8 @@ async fn main() { println!("Updating policies ..."); let (mut lib_rs_template, mut cargo_toml_template) = { - let lib_rs_template_path = current_dir.join("policy/_policies/src/lib.rs.t"); - let cargo_toml_template_path = current_dir.join("policy/_policies/Cargo.toml.t"); + let lib_rs_template_path = current_dir.join(LIB_RS_TEMPLATE_PATH); + let cargo_toml_template_path = current_dir.join(CARGO_TOML_TEMPLATE_PATH); let lib_rs_content = fs::read_to_string(&lib_rs_template_path) .await @@ -65,7 +65,10 @@ async fn main() { tmpl_param!(lib_rs_template, policy_count = cargo_toml_pathes.len()); - let collect_futures = cargo_toml_pathes.iter().map(collect).collect::<Vec<_>>(); + let collect_futures = cargo_toml_pathes + .iter() + .map(|path| collect(path)) + .collect::<Vec<_>>(); for policy in futures::future::join_all(collect_futures).await { let Some(policy) = policy else { continue }; @@ -96,8 +99,12 @@ async fn main() { crate_name = policy.crate_name, stream_struct_id = stream_struct_id ) }, - policy_names { ( - name = policy.crate_name, + match_arms_stream_display { ( + crate_name = policy.crate_name, + stream_struct_id = stream_struct_id + ) }, + stream_policy_names { ( + name = policy.crate_name ) } }); } else { @@ -120,8 +127,15 @@ async fn main() { crate_name = policy.crate_name, stream_struct_id = stream_struct_id ) }, + match_arms_stream_display { ( + crate_name = policy.crate_name, + stream_struct_id = stream_struct_id + ) }, policy_names { ( name = policy.crate_name, + ) }, + stream_policy_names { ( + name = policy.crate_name ) } }); } @@ -162,7 +176,9 @@ struct CollectedPolicy { stream_struct_id: Option<String>, } -async fn collect(policy_crate_path: &PathBuf) -> Option<CollectedPolicy> { +type MatchedFuncInfo = (String, bool, Option<String>, bool, Option<String>); + +async fn collect(policy_crate_path: &Path) -> Option<CollectedPolicy> { let lib_rs_path = policy_crate_path.join("src").join("lib.rs"); let lib_rs_content = fs::read_to_string(&lib_rs_path).await.ok()?; @@ -227,9 +243,7 @@ async fn collect(policy_crate_path: &PathBuf) -> Option<CollectedPolicy> { }) } -fn collect_matched_func( - lib_rs_content: &str, -) -> Option<(String, bool, Option<String>, bool, Option<String>)> { +fn collect_matched_func(lib_rs_content: &str) -> Option<MatchedFuncInfo> { let syntax_tree = syn::parse_file(lib_rs_content).ok()?; let mut matched_func = None; diff --git a/src/chunker.rs b/src/chunker.rs index 3143f68..d8a745b 100644 --- a/src/chunker.rs +++ b/src/chunker.rs @@ -1,4 +1,3 @@ pub mod constants; pub mod context; -pub mod entry; pub mod rw; diff --git a/src/chunker/constants.rs b/src/chunker/constants.rs index 5e4870e..11cb994 100644 --- a/src/chunker/constants.rs +++ b/src/chunker/constants.rs @@ -1,3 +1,3 @@ pub const BUTCK_STORAGE_DIR_NAME: &str = ".butck"; pub const BUTCK_INDEX_FILE_SUFFIX: &str = "bidx"; -pub const BUTCK_INDEX_MAGIC: [u8; 4] = [0xfe, 0xe1, 0xf0, 0x0d]; +pub const BUTCK_INDEX_MAGIC: [u8; 4] = *b"G00d"; diff --git a/src/chunker/context.rs b/src/chunker/context.rs index 79254f5..b7418f0 100644 --- a/src/chunker/context.rs +++ b/src/chunker/context.rs @@ -3,8 +3,9 @@ use std::{collections::HashMap, env::current_dir, path::PathBuf, process::exit, use log::{error, warn}; use crate::{ - chunker::constants::BUTCK_STORAGE_DIR_NAME, core::hash::ChunkWriteHash, special_argument, - special_flag, utils::file_input_solve::parse_path_input, + chunker::{constants::BUTCK_STORAGE_DIR_NAME, rw::storage::hash::ChunkWriteHash}, + special_argument, special_flag, + utils::file_input_solve::parse_path_input, }; #[derive(Debug, Default)] @@ -69,9 +70,10 @@ impl ButckContext { fn apply_stream_read(&mut self, args: &mut Vec<String>) { if let Some(size_str) = special_argument!(args, "-S", "--stream-read") - && let Ok(size) = size_str.parse::<u32>() { - self.stream_read = Some(size); - } + && let Ok(size) = size_str.parse::<u32>() + { + self.stream_read = Some(size); + } } fn apply_memmap_read(&mut self, args: &mut Vec<String>) -> bool { diff --git a/src/chunker/entry.rs b/src/chunker/entry.rs deleted file mode 100644 index 4fdb1f8..0000000 --- a/src/chunker/entry.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::process::exit; - -use log::info; - -use crate::chunker::{ - context::ButckContext, - rw::{self, error::ButckRWError}, -}; - -pub async fn entry(ctx: ButckContext, args: Vec<String>) -> Result<(), ButckRWError> { - if let Some(subcommand) = args.first() { - return match subcommand.as_str() { - "write" => rw::storage::write(ctx).await, - "build" => rw::storage::build(ctx).await, - "policies" => { - butck_policies::policies() - .iter() - .for_each(|p| info!("{}", p)); - return Ok(()); - } - _ => { - print_help(); - exit(1) - } - }; - } - Ok(()) -} - -pub fn print_help() { - println!("{}", include_str!("../../resources/helps/butck.txt").trim()); -} - -pub fn print_version() { - println!( - "{}", - include_str!("../../resources/version_info.txt").trim() - ); -} diff --git a/src/chunker/rw/error.rs b/src/chunker/rw/error.rs index 7f263a5..d113b96 100644 --- a/src/chunker/rw/error.rs +++ b/src/chunker/rw/error.rs @@ -34,6 +34,12 @@ pub enum ButckRWErrorKind { #[error("Chunking failed: {0}")] ChunkFailed(#[from] ChunkFailed), + #[error("Chunking process failed: {0}")] + ChunkingFailed(String), + + #[error("Failed to write index file: {0}")] + IndexFileWriteFailed(String), + #[error("IO error: {0}")] IOError(#[from] std::io::Error), } diff --git a/src/chunker/rw/storage.rs b/src/chunker/rw/storage.rs index 13452d0..135ad4c 100644 --- a/src/chunker/rw/storage.rs +++ b/src/chunker/rw/storage.rs @@ -1,4 +1,6 @@ +pub mod bidx; pub mod build; +pub mod hash; pub mod write; pub use build::build; @@ -13,52 +15,21 @@ pub struct ChunkInfo { pub index: usize, /// Hash of the chunk (hex string) pub hash: String, - /// Size of the chunk in bytes - pub size: usize, - /// Start position in the original file - pub start: usize, - /// End position in the original file (exclusive) - pub end: usize, } -/// 根据hash值计算chunk文件的存储路径 -/// -/// # 参数 -/// - `storage_dir`: 存储目录 -/// - `hash_hex`: chunk的hash值(16进制字符串) -/// -/// # 返回 -/// 返回chunk文件的完整路径 +/// Calculate the storage path of a chunk file based on its hash value pub fn get_chunk_path(storage_dir: &Path, hash_hex: &str) -> PathBuf { let first_slice = &hash_hex[0..2]; - let second_slice = &hash_hex[2..4]; - storage_dir - .join(first_slice) - .join(second_slice) - .join(hash_hex) + storage_dir.join(first_slice).join(hash_hex) } -/// 根据hash字节数组计算chunk文件的存储路径 -/// -/// # 参数 -/// - `storage_dir`: 存储目录 -/// - `hash_bytes`: chunk的hash值(字节数组) -/// -/// # 返回 -/// 返回chunk文件的完整路径 +/// Calculate the storage path of a chunk file based on its hash byte array pub fn get_chunk_path_from_bytes(storage_dir: &Path, hash_bytes: &[u8; 32]) -> PathBuf { let hash_hex = hex::encode(hash_bytes); get_chunk_path(storage_dir, &hash_hex) } -/// 生成唯一的文件路径,如果文件已存在则添加数字后缀 -/// -/// # 参数 -/// - `output_dir`: 输出目录 -/// - `desired_filename`: 期望的文件名 -/// -/// # 返回 -/// 返回唯一的文件路径 +/// Generate a unique file path, adding a numeric suffix if the file already exists pub fn generate_unique_path(output_dir: &Path, desired_filename: &str) -> PathBuf { let desired_path = output_dir.join(desired_filename); let mut candidate = desired_path.clone(); diff --git a/src/chunker/rw/storage/bidx.rs b/src/chunker/rw/storage/bidx.rs new file mode 100644 index 0000000..783ded6 --- /dev/null +++ b/src/chunker/rw/storage/bidx.rs @@ -0,0 +1,157 @@ +//! Bidx (Butchunker Index) file format utilities +//! +//! The bidx file format: +//! - Magic number: [u8; 4] = b"G00d" +//! - Original filename length: u16 (little-endian) +//! - Original filename: [u8] (UTF-8, no null terminator) +//! - Chunk hashes: [u8; 32][u8; 32][u8; 32]... (binary hashes, not hex strings) + +use std::io::{self, Write}; +use std::path::Path; + +use crate::chunker::constants::BUTCK_INDEX_MAGIC; +use crate::chunker::rw::storage::ChunkInfo; + +/// Write a bidx index file +pub fn write_bidx_file( + index_path: &Path, + chunk_infos: &[ChunkInfo], + original_file_path: &Path, +) -> io::Result<()> { + let file = std::fs::File::create(index_path)?; + let mut writer = io::BufWriter::new(file); + + // Magic bytes + writer.write_all(&BUTCK_INDEX_MAGIC)?; + + // Get original filename + let filename = original_file_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + let filename_bytes = filename.as_bytes(); + + // Validate filename length + if filename_bytes.len() > u16::MAX as usize { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Filename too long: {} bytes", filename_bytes.len()), + )); + } + + // Write filename length as u16 + let filename_len = filename_bytes.len() as u16; + writer.write_all(&filename_len.to_le_bytes())?; + + // Write filename bytes + writer.write_all(filename_bytes)?; + + // Write chunk hashes + for chunk_info in chunk_infos { + // Convert hex hash to 32-byte binary representation + let hash_bytes = match hex::decode(&chunk_info.hash) { + Ok(bytes) => bytes, + Err(e) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to decode hash hex '{}': {}", chunk_info.hash, e), + )); + } + }; + + // Ensure hash is exactly 32 bytes + if hash_bytes.len() != 32 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Hash must be 32 bytes, got {} bytes", hash_bytes.len()), + )); + } + + // Write hash + writer.write_all(&hash_bytes)?; + } + + writer.flush()?; + Ok(()) +} + +/// Read a bidx index file +pub fn read_bidx_file(index_path: &Path) -> io::Result<(String, Vec<ChunkInfo>)> { + use std::io::Read; + + let mut file = std::fs::File::open(index_path)?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + + if buffer.len() < 4 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "File too short to contain magic number", + )); + } + + // Check magic number + if &buffer[0..4] != BUTCK_INDEX_MAGIC { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid magic number", + )); + } + + let mut offset = 4; + + // Read filename length + if offset + 2 > buffer.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "File too short to contain filename length", + )); + } + let filename_len = u16::from_le_bytes([buffer[offset], buffer[offset + 1]]) as usize; + offset += 2; + + // Read filename + if offset + filename_len > buffer.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "File too short to contain filename", + )); + } + let filename_bytes = &buffer[offset..offset + filename_len]; + let filename = String::from_utf8(filename_bytes.to_vec()).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Filename is not valid UTF-8: {}", e), + ) + })?; + offset += filename_len; + + // Read chunk hashes + let mut chunk_infos = Vec::new(); + let hash_size = 32; + + while offset + hash_size <= buffer.len() { + // Read hash + let hash_bytes = &buffer[offset..offset + hash_size]; + let hash = hex::encode(hash_bytes); + offset += hash_size; + + chunk_infos.push(ChunkInfo { + index: chunk_infos.len(), + hash, + }); + } + + // Check if we read exactly all data + if offset != buffer.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "File contains {} extra bytes after chunk hashes", + buffer.len() - offset + ), + )); + } + + Ok((filename, chunk_infos)) +} diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs index 7608b5c..51b5bf5 100644 --- a/src/chunker/rw/storage/build.rs +++ b/src/chunker/rw/storage/build.rs @@ -1,13 +1,12 @@ use futures::future::join_all; use just_progress::progress; use log::{error, info, trace}; -use memmap2::Mmap; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use tokio::{fs::File, io::AsyncWriteExt}; use crate::{ chunker::{ - constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC}, + constants::BUTCK_INDEX_FILE_SUFFIX, context::ButckContext, rw::error::{ButckRWError, ButckRWErrorKind}, rw::storage, @@ -52,64 +51,47 @@ async fn rebuild_from_bidx( bidx_path: &PathBuf, ctx: &ButckContext, ) -> Result<(), ButckRWErrorKind> { - // Validate file extension - if let Some(ext) = bidx_path.extension() - && ext != BUTCK_INDEX_FILE_SUFFIX + // Validate file suffix + if let Some(suffix) = bidx_path.extension() + && suffix != BUTCK_INDEX_FILE_SUFFIX { return Err(ButckRWErrorKind::InvalidBidxFormat); } info!("Rebuilding from bidx file: {}", bidx_path.display()); - // Read bidx file content - let bidx_content = if ctx.memmap_read { - let file = File::open(bidx_path).await?; - let mmap = unsafe { Mmap::map(&file)? }; - mmap.to_vec() - } else { - tokio::fs::read(bidx_path).await? - }; - - // Verify file size includes at least the header - if bidx_content.len() < 6 { - return Err(ButckRWErrorKind::InvalidBidxFormat); - } - - // Validate MAGIC bytes - if bidx_content[0..4] != BUTCK_INDEX_MAGIC { - return Err(ButckRWErrorKind::InvalidBidxFormat); - } - - // Read filename - let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize; - if bidx_content.len() < 6 + filename_len { - return Err(ButckRWErrorKind::InvalidBidxFormat); - } - let filename_bytes = &bidx_content[6..6 + filename_len]; - let original_filename = String::from_utf8(filename_bytes.to_vec()) - .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + // Use the unified bidx file reader + let (original_filename, chunk_infos) = + crate::chunker::rw::storage::bidx::read_bidx_file(bidx_path).map_err(|e| { + error!("Failed to read bidx file: {}", e); + ButckRWErrorKind::InvalidBidxFormat + })?; trace!("Original filename from bidx: {}", original_filename); - let hash_data_start = 6 + filename_len; - let hash_data = &bidx_content[hash_data_start..]; - - // Verify that hash data size is a multiple of 32 bytes - if hash_data.len() % 32 != 0 { - return Err(ButckRWErrorKind::InvalidBidxFormat); - } - - let chunk_count = hash_data.len() / 32; + let chunk_count = chunk_infos.len(); info!("Found {} chunks in bidx file", chunk_count); + // Extract hash bytes from chunk infos let mut chunk_hashes = Vec::with_capacity(chunk_count); - for i in 0..chunk_count { - let start = i * 32; - let end = start + 32; - let hash_bytes: [u8; 32] = hash_data[start..end] - .try_into() - .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; - chunk_hashes.push(hash_bytes); + for chunk_info in &chunk_infos { + match hex::decode(&chunk_info.hash) { + Ok(hash_bytes) => { + if hash_bytes.len() == 32 { + let hash_array: [u8; 32] = hash_bytes + .try_into() + .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?; + chunk_hashes.push(hash_array); + } else { + error!("Invalid hash length: {} bytes", hash_bytes.len()); + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + } + Err(e) => { + error!("Failed to decode hash hex: {}", e); + return Err(ButckRWErrorKind::InvalidBidxFormat); + } + } } trace!("Parsed {} chunk hashes", chunk_hashes.len()); @@ -210,7 +192,7 @@ async fn read_chunk( progress_name: &str, step: f64, hash_hex: String, - storage_dir: &PathBuf, + storage_dir: &Path, chunk_index: usize, ) -> Result<Vec<u8>, ButckRWErrorKind> { trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex); diff --git a/src/core/hash.rs b/src/chunker/rw/storage/hash.rs index 36a62b3..8eaa641 100644 --- a/src/core/hash.rs +++ b/src/chunker/rw/storage/hash.rs @@ -3,7 +3,7 @@ use sha2::{Digest as Sha2Digest, Sha256}; const SALT: &[u8] = b"Dude@"; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] pub enum ChunkWriteHash { #[default] Blake3, diff --git a/src/chunker/rw/storage/write.rs b/src/chunker/rw/storage/write.rs index 8b3acc7..9348901 100644 --- a/src/chunker/rw/storage/write.rs +++ b/src/chunker/rw/storage/write.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, path::PathBuf}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; use log::trace; @@ -73,7 +76,7 @@ async fn write_file( } } -pub fn get_index_file_name(path: &PathBuf, ctx: &ButckContext) -> PathBuf { +pub fn get_index_file_name(path: &Path, ctx: &ButckContext) -> PathBuf { let output_file = if let Some(output_file) = &ctx.output_file { return output_file.clone(); } else { diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs index 75b9bd7..38aecfc 100644 --- a/src/chunker/rw/storage/write/simple.rs +++ b/src/chunker/rw/storage/write/simple.rs @@ -1,15 +1,20 @@ use futures::future::join_all; use just_progress::progress; use log::{error, info, trace}; -use std::{collections::HashMap, path::PathBuf}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; use tokio::{fs::File, io::AsyncReadExt}; use crate::{ chunker::{ context::ButckContext, - rw::{error::ButckRWErrorKind, storage}, + rw::{ + error::ButckRWErrorKind, + storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash}, + }, }, - core::hash::ChunkWriteHash, storage::get_index_file_name, utils::size_display::size_display, }; @@ -64,7 +69,7 @@ async fn read_file( } async fn write_file_to_storage( - path: &PathBuf, + path: &Path, ctx: &ButckContext, chunk_boundaries: Vec<u32>, raw_data: &[u8], @@ -110,16 +115,15 @@ async fn write_file_to_storage( end - 1, end - start ); - tasks.push(write_chunk( - progress_name.as_str(), + let params = WriteChunkParams { + progress_name: progress_name.clone(), step, - chunk_data, - &ctx.output_dir, - &ctx.chunk_hash, + chunk_data: chunk_data.to_vec(), + output_dir: ctx.output_dir.clone(), + chunk_hash: ctx.chunk_hash, chunk_index, - start, - end, - )); + }; + tasks.push(write_chunk(params)); chunk_index += 1; } else { trace!( @@ -141,16 +145,15 @@ async fn write_file_to_storage( raw_data.len() - 1, raw_data.len() - start ); - tasks.push(write_chunk( - progress_name.as_str(), + let params = WriteChunkParams { + progress_name: progress_name.clone(), step, - chunk_data, - &ctx.output_dir, - &ctx.chunk_hash, + chunk_data: chunk_data.to_vec(), + output_dir: ctx.output_dir.clone(), + chunk_hash: ctx.chunk_hash, chunk_index, - start, - raw_data.len(), - )); + }; + tasks.push(write_chunk(params)); } trace!("Total chunks prepared for writing: {}", tasks.len()); @@ -192,78 +195,81 @@ async fn write_file_to_storage( Ok(()) } -async fn write_chunk( - progress_name: &str, +struct WriteChunkParams { + progress_name: String, step: f64, - chunk_data: &[u8], - output_dir: &PathBuf, - chunk_hash: &ChunkWriteHash, + chunk_data: Vec<u8>, + output_dir: PathBuf, + chunk_hash: ChunkWriteHash, chunk_index: usize, - start: usize, - end: usize, -) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> { +} + +async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWErrorKind> { trace!( "write_chunk[{}]: Starting, data size: {} bytes", - chunk_index, - chunk_data.len() + params.chunk_index, + params.chunk_data.len() ); trace!( "write_chunk[{}]: Computing hash with algorithm: {:?}", - chunk_index, chunk_hash + params.chunk_index, params.chunk_hash ); - let hash_bytes = chunk_hash.hash(chunk_data); + let hash_bytes = params.chunk_hash.hash(¶ms.chunk_data); trace!( "write_chunk[{}]: Hash computed: {:?}", - chunk_index, hash_bytes + params.chunk_index, hash_bytes ); let hash_hex = hex::encode(hash_bytes); - trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex); + trace!( + "write_chunk[{}]: Hash hex: {}", + params.chunk_index, hash_hex + ); - let file_path = storage::get_chunk_path(output_dir, &hash_hex); + let file_path = storage::get_chunk_path(¶ms.output_dir, &hash_hex); if let Some(parent_dir) = file_path.parent() { trace!( "write_chunk[{}]: Creating directory structure: {}", - chunk_index, + params.chunk_index, parent_dir.display() ); tokio::fs::create_dir_all(parent_dir).await?; - trace!("write_chunk[{}]: Directory created", chunk_index); + trace!("write_chunk[{}]: Directory created", params.chunk_index); } trace!( "write_chunk[{}]: File path: {}", - chunk_index, + params.chunk_index, file_path.display() ); trace!( "write_chunk[{}]: Writing {} bytes to file", - chunk_index, - chunk_data.len() + params.chunk_index, + params.chunk_data.len() ); if !file_path.exists() { - tokio::fs::write(&file_path, chunk_data).await?; + tokio::fs::write(&file_path, ¶ms.chunk_data).await?; } else { trace!( "write_chunk[{}]: File already exists, skipping", - chunk_index + params.chunk_index ); } - trace!("write_chunk[{}]: File written successfully", chunk_index); - progress::increase(progress_name, step as f32); - Ok(crate::chunker::rw::storage::ChunkInfo { - index: chunk_index, + trace!( + "write_chunk[{}]: File written successfully", + params.chunk_index + ); + progress::increase(¶ms.progress_name, params.step as f32); + Ok(ChunkInfo { + index: params.chunk_index, hash: hash_hex, - size: chunk_data.len(), - start, - end, }) } -async fn get_boundaries<'a>( +async fn get_boundaries( raw_data: &[u8], ctx: &ButckContext, params: &HashMap<&str, &str>, @@ -276,69 +282,14 @@ async fn get_boundaries<'a>( } async fn write_index_file( - index_path: &PathBuf, - chunk_infos: &[crate::chunker::rw::storage::ChunkInfo], - original_file_path: &PathBuf, + index_path: &Path, + chunk_infos: &[ChunkInfo], + original_file_path: &Path, ) -> Result<(), std::io::Error> { - use std::io::Write; - - let file = std::fs::File::create(index_path)?; - let mut writer = std::io::BufWriter::new(file); - - // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes - use crate::chunker::constants::BUTCK_INDEX_MAGIC; - - // Write magic bytes - writer.write_all(&BUTCK_INDEX_MAGIC)?; - - // Get original filename as bytes - let filename = original_file_path - .file_name() - .and_then(|n| n.to_str()) - .unwrap_or("unknown"); - let filename_bytes = filename.as_bytes(); - - // Write filename length as u16 (little-endian) - if filename_bytes.len() > u16::MAX as usize { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!("Filename too long: {} bytes", filename_bytes.len()), - )); - } - let filename_len = filename_bytes.len() as u16; - writer.write_all(&filename_len.to_le_bytes())?; - - // Write filename bytes - writer.write_all(filename_bytes)?; - - // Write chunk hashes: [u8; 32][u8; 32][u8; 32]... - for chunk_info in chunk_infos { - // Convert hex hash to bytes - match hex::decode(&chunk_info.hash) { - Ok(hash_bytes) => { - if hash_bytes.len() == 32 { - writer.write_all(&hash_bytes)?; - } else { - // Pad or truncate to 32 bytes if needed - let mut fixed_hash = [0u8; 32]; - let len = hash_bytes.len().min(32); - fixed_hash[..len].copy_from_slice(&hash_bytes[..len]); - writer.write_all(&fixed_hash)?; - } - } - Err(e) => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Failed to decode hash hex: {}", e), - )); - } - } - } - - Ok(()) + write_bidx_file(index_path, chunk_infos, original_file_path) } -async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) { +pub async fn display_boundaries(chunk_boundaries: &[u32], total_bytes: usize) { let total_chunks = chunk_boundaries.len() + 1; let (total_value, total_unit) = size_display(total_bytes); info!( diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs index 020cfcd..092cee7 100644 --- a/src/chunker/rw/storage/write/stream.rs +++ b/src/chunker/rw/storage/write/stream.rs @@ -1,12 +1,155 @@ -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::HashMap, path::Path, sync::Arc}; -use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind}; +use crate::{ + chunker::{ + context::ButckContext, + rw::{ + error::ButckRWErrorKind, + storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path}, + }, + }, + storage::{get_index_file_name, simple::display_boundaries}, +}; +use butck_policies::chunk_stream_with; +use just_progress::progress; +use log::{error, info, trace}; +use tokio::sync::Mutex; pub async fn write_file_stream( - path: &PathBuf, + path: &Path, stream_read_size: u32, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { - todo!() + trace!( + "write_file_stream: Starting stream write for {}", + path.display() + ); + + // Check if policy is specified + let policy_name = ctx.policy_name.as_ref().ok_or_else(|| { + error!("No chunking policy specified for stream write"); + ButckRWErrorKind::ChunkingPolicyNotSpecified + })?; + + // Create progress bar + let progress_name = format!( + "Write `{}`", + path.file_name().unwrap_or_default().to_string_lossy() + ); + progress::update_progress(&progress_name, 0.0); + + // Collect chunk information + let chunk_infos = Arc::new(Mutex::new(Vec::new())); + let chunk_counter = Arc::new(Mutex::new(0usize)); + let output_dir = ctx.output_dir.clone(); + let chunk_hash = ctx.chunk_hash; + + // If only displaying boundaries, use chunk_stream_display_boundaries + if ctx.display_boundaries { + let boundaries = butck_policies::chunk_stream_display_boundaries( + policy_name, + stream_read_size, + path, + params, + ) + .await + .map_err(|e| { + error!("Stream chunking failed: {}", e); + ButckRWErrorKind::ChunkingFailed(e.to_string()) + })?; + + // Calculate total file size by reading the file + let total_bytes = tokio::fs::metadata(path) + .await + .map_err(|e| { + error!("Failed to get file metadata: {}", e); + ButckRWErrorKind::IOError(e) + })? + .len() as usize; + + // Display boundaries information + display_boundaries(&boundaries, total_bytes).await; + + return Ok(()); + } + + // Call chunk_stream_with with callback to write chunks + chunk_stream_with( + policy_name, + stream_read_size, + path, + |chunk_data: Vec<u8>| { + let output_dir = output_dir.clone(); + let chunk_hash = chunk_hash; + let progress_name = progress_name.clone(); + let chunk_infos = Arc::clone(&chunk_infos); + let chunk_counter = Arc::clone(&chunk_counter); + + Box::pin(async move { + // Increment chunk counter + let mut counter = chunk_counter.lock().await; + let chunk_index = *counter; + *counter += 1; + + // Compute hash + let hash_bytes = chunk_hash.hash(&chunk_data); + let hash_hex = hex::encode(hash_bytes); + + // Build file path + let file_path = get_chunk_path(&output_dir, &hash_hex); + + // Create directory if needed + if let Some(parent_dir) = file_path.parent() { + tokio::fs::create_dir_all(parent_dir).await?; + } + + // Write chunk if file doesn't exist + if !file_path.exists() { + tokio::fs::write(&file_path, &chunk_data).await?; + } + + // Store chunk info + let mut infos = chunk_infos.lock().await; + infos.push(ChunkInfo { + index: chunk_index, + hash: hash_hex, + }); + + // Update progress + progress::increase(&progress_name, 0.01); // Small increment per chunk + + Ok(()) + }) + }, + params, + ) + .await + .map_err(|e| { + error!("Stream chunking failed: {}", e); + ButckRWErrorKind::ChunkingFailed(e.to_string()) + })?; + + // Complete progress + progress::complete(&progress_name); + + // Get chunk infos + let chunk_infos = chunk_infos.lock().await; + + // Write index file + let index_file_name = get_index_file_name(path, ctx); + + // Use the unified bidx file writer + write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| { + error!("Failed to write index file: {}", e); + ButckRWErrorKind::IndexFileWriteFailed(e.to_string()) + })?; + + info!( + "Stream write completed for {}: {} chunks written", + path.display(), + chunk_infos.len() + ); + + Ok(()) } diff --git a/src/core.rs b/src/core.rs deleted file mode 100644 index ec5d33c..0000000 --- a/src/core.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hash; diff --git a/src/entry.rs b/src/entry.rs new file mode 100644 index 0000000..4e91e59 --- /dev/null +++ b/src/entry.rs @@ -0,0 +1,50 @@ +use std::process::exit; + +use crate::chunker::{ + context::ButckContext, + rw::{self, error::ButckRWError}, +}; + +pub async fn entry(ctx: ButckContext, args: Vec<String>) -> Result<(), ButckRWError> { + if let Some(subcommand) = args.first() { + return match subcommand.as_str() { + "write" => rw::storage::write(ctx).await, + "build" => rw::storage::build(ctx).await, + "policies" => { + let policies = butck_policies::policies(); + let stream_policies = butck_policies::stream_policies(); + let mut all_policies: Vec<_> = + policies.iter().chain(stream_policies.iter()).collect(); + all_policies.sort(); + all_policies.dedup(); + all_policies.iter().for_each(|p| println!("{}", p)); + return Ok(()); + } + "simple_policies" => { + butck_policies::policies() + .iter() + .for_each(|p| println!("{}", p)); + return Ok(()); + } + "stream_policies" => { + butck_policies::stream_policies() + .iter() + .for_each(|p| println!("{}", p)); + return Ok(()); + } + _ => { + print_help(); + exit(1) + } + }; + } + Ok(()) +} + +pub fn print_help() { + println!("{}", include_str!("../resources/helps/butck.txt").trim()); +} + +pub fn print_version() { + println!("{}", include_str!("../resources/version_info.txt").trim()); +} @@ -1,7 +1,5 @@ pub mod chunker; -pub mod core; -pub mod log; -pub mod macros; +pub mod entry; pub mod utils; pub mod storage { diff --git a/src/utils.rs b/src/utils.rs index b64c0c4..32fb545 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,2 +1,4 @@ +pub mod cmd_macros; pub mod file_input_solve; +pub mod log; pub mod size_display; diff --git a/src/macros.rs b/src/utils/cmd_macros.rs index 11b8da4..11b8da4 100644 --- a/src/macros.rs +++ b/src/utils/cmd_macros.rs diff --git a/src/log.rs b/src/utils/log.rs index 5fc6160..5fc6160 100644 --- a/src/log.rs +++ b/src/utils/log.rs |
