summary | refs | log | tree | commit | diff
path: root/src/chunker
diff options
context:
space:
mode:
Diffstat (limited to 'src/chunker')
-rw-r--r-- src/chunker/constants.rs 2
-rw-r--r-- src/chunker/context.rs 12
-rw-r--r-- src/chunker/entry.rs 39
-rw-r--r-- src/chunker/rw/error.rs 6
-rw-r--r-- src/chunker/rw/storage.rs 41
-rw-r--r-- src/chunker/rw/storage/bidx.rs 157
-rw-r--r-- src/chunker/rw/storage/build.rs 82
-rw-r--r-- src/chunker/rw/storage/hash.rs 38
-rw-r--r-- src/chunker/rw/storage/write.rs 7
-rw-r--r-- src/chunker/rw/storage/write/simple.rs 171
-rw-r--r-- src/chunker/rw/storage/write/stream.rs 151
11 files changed, 460 insertions, 246 deletions
diff --git a/src/chunker/constants.rs b/src/chunker/constants.rs
index 5e4870e..11cb994 100644
--- a/src/chunker/constants.rs
+++ b/src/chunker/constants.rs
@@ -1,3 +1,3 @@
pub const BUTCK_STORAGE_DIR_NAME: &str = ".butck";
pub const BUTCK_INDEX_FILE_SUFFIX: &str = "bidx";
-pub const BUTCK_INDEX_MAGIC: [u8; 4] = [0xfe, 0xe1, 0xf0, 0x0d];
+pub const BUTCK_INDEX_MAGIC: [u8; 4] = *b"G00d";
diff --git a/src/chunker/context.rs b/src/chunker/context.rs
index 79254f5..b7418f0 100644
--- a/src/chunker/context.rs
+++ b/src/chunker/context.rs
@@ -3,8 +3,9 @@ use std::{collections::HashMap, env::current_dir, path::PathBuf, process::exit,
use log::{error, warn};
use crate::{
- chunker::constants::BUTCK_STORAGE_DIR_NAME, core::hash::ChunkWriteHash, special_argument,
- special_flag, utils::file_input_solve::parse_path_input,
+ chunker::{constants::BUTCK_STORAGE_DIR_NAME, rw::storage::hash::ChunkWriteHash},
+ special_argument, special_flag,
+ utils::file_input_solve::parse_path_input,
};
#[derive(Debug, Default)]
@@ -69,9 +70,10 @@ impl ButckContext {
fn apply_stream_read(&mut self, args: &mut Vec<String>) {
if let Some(size_str) = special_argument!(args, "-S", "--stream-read")
- && let Ok(size) = size_str.parse::<u32>() {
- self.stream_read = Some(size);
- }
+ && let Ok(size) = size_str.parse::<u32>()
+ {
+ self.stream_read = Some(size);
+ }
}
fn apply_memmap_read(&mut self, args: &mut Vec<String>) -> bool {
diff --git a/src/chunker/entry.rs b/src/chunker/entry.rs
deleted file mode 100644
index 4fdb1f8..0000000
--- a/src/chunker/entry.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-use std::process::exit;
-
-use log::info;
-
-use crate::chunker::{
- context::ButckContext,
- rw::{self, error::ButckRWError},
-};
-
-pub async fn entry(ctx: ButckContext, args: Vec<String>) -> Result<(), ButckRWError> {
- if let Some(subcommand) = args.first() {
- return match subcommand.as_str() {
- "write" => rw::storage::write(ctx).await,
- "build" => rw::storage::build(ctx).await,
- "policies" => {
- butck_policies::policies()
- .iter()
- .for_each(|p| info!("{}", p));
- return Ok(());
- }
- _ => {
- print_help();
- exit(1)
- }
- };
- }
- Ok(())
-}
-
-pub fn print_help() {
- println!("{}", include_str!("../../resources/helps/butck.txt").trim());
-}
-
-pub fn print_version() {
- println!(
- "{}",
- include_str!("../../resources/version_info.txt").trim()
- );
-}
diff --git a/src/chunker/rw/error.rs b/src/chunker/rw/error.rs
index 7f263a5..d113b96 100644
--- a/src/chunker/rw/error.rs
+++ b/src/chunker/rw/error.rs
@@ -34,6 +34,12 @@ pub enum ButckRWErrorKind {
#[error("Chunking failed: {0}")]
ChunkFailed(#[from] ChunkFailed),
+ #[error("Chunking process failed: {0}")]
+ ChunkingFailed(String),
+
+ #[error("Failed to write index file: {0}")]
+ IndexFileWriteFailed(String),
+
#[error("IO error: {0}")]
IOError(#[from] std::io::Error),
}
diff --git a/src/chunker/rw/storage.rs b/src/chunker/rw/storage.rs
index 13452d0..135ad4c 100644
--- a/src/chunker/rw/storage.rs
+++ b/src/chunker/rw/storage.rs
@@ -1,4 +1,6 @@
+pub mod bidx;
pub mod build;
+pub mod hash;
pub mod write;
pub use build::build;
@@ -13,52 +15,21 @@ pub struct ChunkInfo {
pub index: usize,
/// Hash of the chunk (hex string)
pub hash: String,
- /// Size of the chunk in bytes
- pub size: usize,
- /// Start position in the original file
- pub start: usize,
- /// End position in the original file (exclusive)
- pub end: usize,
}
-/// 根据hash值计算chunk文件的存储路径
-///
-/// # 参数
-/// - `storage_dir`: 存储目录
-/// - `hash_hex`: chunk的hash值(16进制字符串)
-///
-/// # 返回
-/// 返回chunk文件的完整路径
+/// Calculate the storage path of a chunk file based on its hash value
pub fn get_chunk_path(storage_dir: &Path, hash_hex: &str) -> PathBuf {
let first_slice = &hash_hex[0..2];
- let second_slice = &hash_hex[2..4];
- storage_dir
- .join(first_slice)
- .join(second_slice)
- .join(hash_hex)
+ storage_dir.join(first_slice).join(hash_hex)
}
-/// 根据hash字节数组计算chunk文件的存储路径
-///
-/// # 参数
-/// - `storage_dir`: 存储目录
-/// - `hash_bytes`: chunk的hash值(字节数组)
-///
-/// # 返回
-/// 返回chunk文件的完整路径
+/// Calculate the storage path of a chunk file based on its hash byte array
pub fn get_chunk_path_from_bytes(storage_dir: &Path, hash_bytes: &[u8; 32]) -> PathBuf {
let hash_hex = hex::encode(hash_bytes);
get_chunk_path(storage_dir, &hash_hex)
}
-/// 生成唯一的文件路径,如果文件已存在则添加数字后缀
-///
-/// # 参数
-/// - `output_dir`: 输出目录
-/// - `desired_filename`: 期望的文件名
-///
-/// # 返回
-/// 返回唯一的文件路径
+/// Generate a unique file path, adding a numeric suffix if the file already exists
pub fn generate_unique_path(output_dir: &Path, desired_filename: &str) -> PathBuf {
let desired_path = output_dir.join(desired_filename);
let mut candidate = desired_path.clone();
diff --git a/src/chunker/rw/storage/bidx.rs b/src/chunker/rw/storage/bidx.rs
new file mode 100644
index 0000000..783ded6
--- /dev/null
+++ b/src/chunker/rw/storage/bidx.rs
@@ -0,0 +1,157 @@
+//! Bidx (Butchunker Index) file format utilities
+//!
+//! The bidx file format:
+//! - Magic number: [u8; 4] = b"G00d"
+//! - Original filename length: u16 (little-endian)
+//! - Original filename: [u8] (UTF-8, no null terminator)
+//! - Chunk hashes: [u8; 32][u8; 32][u8; 32]... (binary hashes, not hex strings)
+
+use std::io::{self, Write};
+use std::path::Path;
+
+use crate::chunker::constants::BUTCK_INDEX_MAGIC;
+use crate::chunker::rw::storage::ChunkInfo;
+
+/// Write a bidx index file
+pub fn write_bidx_file(
+ index_path: &Path,
+ chunk_infos: &[ChunkInfo],
+ original_file_path: &Path,
+) -> io::Result<()> {
+ let file = std::fs::File::create(index_path)?;
+ let mut writer = io::BufWriter::new(file);
+
+ // Magic bytes
+ writer.write_all(&BUTCK_INDEX_MAGIC)?;
+
+ // Get original filename
+ let filename = original_file_path
+ .file_name()
+ .and_then(|n| n.to_str())
+ .unwrap_or("unknown");
+ let filename_bytes = filename.as_bytes();
+
+ // Validate filename length
+ if filename_bytes.len() > u16::MAX as usize {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Filename too long: {} bytes", filename_bytes.len()),
+ ));
+ }
+
+ // Write filename length as u16
+ let filename_len = filename_bytes.len() as u16;
+ writer.write_all(&filename_len.to_le_bytes())?;
+
+ // Write filename bytes
+ writer.write_all(filename_bytes)?;
+
+ // Write chunk hashes
+ for chunk_info in chunk_infos {
+ // Convert hex hash to 32-byte binary representation
+ let hash_bytes = match hex::decode(&chunk_info.hash) {
+ Ok(bytes) => bytes,
+ Err(e) => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Failed to decode hash hex '{}': {}", chunk_info.hash, e),
+ ));
+ }
+ };
+
+ // Ensure hash is exactly 32 bytes
+ if hash_bytes.len() != 32 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Hash must be 32 bytes, got {} bytes", hash_bytes.len()),
+ ));
+ }
+
+ // Write hash
+ writer.write_all(&hash_bytes)?;
+ }
+
+ writer.flush()?;
+ Ok(())
+}
+
+/// Read a bidx index file
+pub fn read_bidx_file(index_path: &Path) -> io::Result<(String, Vec<ChunkInfo>)> {
+ use std::io::Read;
+
+ let mut file = std::fs::File::open(index_path)?;
+ let mut buffer = Vec::new();
+ file.read_to_end(&mut buffer)?;
+
+ if buffer.len() < 4 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "File too short to contain magic number",
+ ));
+ }
+
+ // Check magic number
+ if &buffer[0..4] != BUTCK_INDEX_MAGIC {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Invalid magic number",
+ ));
+ }
+
+ let mut offset = 4;
+
+ // Read filename length
+ if offset + 2 > buffer.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "File too short to contain filename length",
+ ));
+ }
+ let filename_len = u16::from_le_bytes([buffer[offset], buffer[offset + 1]]) as usize;
+ offset += 2;
+
+ // Read filename
+ if offset + filename_len > buffer.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "File too short to contain filename",
+ ));
+ }
+ let filename_bytes = &buffer[offset..offset + filename_len];
+ let filename = String::from_utf8(filename_bytes.to_vec()).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Filename is not valid UTF-8: {}", e),
+ )
+ })?;
+ offset += filename_len;
+
+ // Read chunk hashes
+ let mut chunk_infos = Vec::new();
+ let hash_size = 32;
+
+ while offset + hash_size <= buffer.len() {
+ // Read hash
+ let hash_bytes = &buffer[offset..offset + hash_size];
+ let hash = hex::encode(hash_bytes);
+ offset += hash_size;
+
+ chunk_infos.push(ChunkInfo {
+ index: chunk_infos.len(),
+ hash,
+ });
+ }
+
+ // Check if we read exactly all data
+ if offset != buffer.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "File contains {} extra bytes after chunk hashes",
+ buffer.len() - offset
+ ),
+ ));
+ }
+
+ Ok((filename, chunk_infos))
+}
diff --git a/src/chunker/rw/storage/build.rs b/src/chunker/rw/storage/build.rs
index 7608b5c..51b5bf5 100644
--- a/src/chunker/rw/storage/build.rs
+++ b/src/chunker/rw/storage/build.rs
@@ -1,13 +1,12 @@
use futures::future::join_all;
use just_progress::progress;
use log::{error, info, trace};
-use memmap2::Mmap;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use tokio::{fs::File, io::AsyncWriteExt};
use crate::{
chunker::{
- constants::{BUTCK_INDEX_FILE_SUFFIX, BUTCK_INDEX_MAGIC},
+ constants::BUTCK_INDEX_FILE_SUFFIX,
context::ButckContext,
rw::error::{ButckRWError, ButckRWErrorKind},
rw::storage,
@@ -52,64 +51,47 @@ async fn rebuild_from_bidx(
bidx_path: &PathBuf,
ctx: &ButckContext,
) -> Result<(), ButckRWErrorKind> {
- // Validate file extension
- if let Some(ext) = bidx_path.extension()
- && ext != BUTCK_INDEX_FILE_SUFFIX
+ // Validate file suffix
+ if let Some(suffix) = bidx_path.extension()
+ && suffix != BUTCK_INDEX_FILE_SUFFIX
{
return Err(ButckRWErrorKind::InvalidBidxFormat);
}
info!("Rebuilding from bidx file: {}", bidx_path.display());
- // Read bidx file content
- let bidx_content = if ctx.memmap_read {
- let file = File::open(bidx_path).await?;
- let mmap = unsafe { Mmap::map(&file)? };
- mmap.to_vec()
- } else {
- tokio::fs::read(bidx_path).await?
- };
-
- // Verify file size includes at least the header
- if bidx_content.len() < 6 {
- return Err(ButckRWErrorKind::InvalidBidxFormat);
- }
-
- // Validate MAGIC bytes
- if bidx_content[0..4] != BUTCK_INDEX_MAGIC {
- return Err(ButckRWErrorKind::InvalidBidxFormat);
- }
-
- // Read filename
- let filename_len = u16::from_le_bytes([bidx_content[4], bidx_content[5]]) as usize;
- if bidx_content.len() < 6 + filename_len {
- return Err(ButckRWErrorKind::InvalidBidxFormat);
- }
- let filename_bytes = &bidx_content[6..6 + filename_len];
- let original_filename = String::from_utf8(filename_bytes.to_vec())
- .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;
+ // Use the unified bidx file reader
+ let (original_filename, chunk_infos) =
+ crate::chunker::rw::storage::bidx::read_bidx_file(bidx_path).map_err(|e| {
+ error!("Failed to read bidx file: {}", e);
+ ButckRWErrorKind::InvalidBidxFormat
+ })?;
trace!("Original filename from bidx: {}", original_filename);
- let hash_data_start = 6 + filename_len;
- let hash_data = &bidx_content[hash_data_start..];
-
- // Verify that hash data size is a multiple of 32 bytes
- if hash_data.len() % 32 != 0 {
- return Err(ButckRWErrorKind::InvalidBidxFormat);
- }
-
- let chunk_count = hash_data.len() / 32;
+ let chunk_count = chunk_infos.len();
info!("Found {} chunks in bidx file", chunk_count);
+ // Extract hash bytes from chunk infos
let mut chunk_hashes = Vec::with_capacity(chunk_count);
- for i in 0..chunk_count {
- let start = i * 32;
- let end = start + 32;
- let hash_bytes: [u8; 32] = hash_data[start..end]
- .try_into()
- .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;
- chunk_hashes.push(hash_bytes);
+ for chunk_info in &chunk_infos {
+ match hex::decode(&chunk_info.hash) {
+ Ok(hash_bytes) => {
+ if hash_bytes.len() == 32 {
+ let hash_array: [u8; 32] = hash_bytes
+ .try_into()
+ .map_err(|_| ButckRWErrorKind::InvalidBidxFormat)?;
+ chunk_hashes.push(hash_array);
+ } else {
+ error!("Invalid hash length: {} bytes", hash_bytes.len());
+ return Err(ButckRWErrorKind::InvalidBidxFormat);
+ }
+ }
+ Err(e) => {
+ error!("Failed to decode hash hex: {}", e);
+ return Err(ButckRWErrorKind::InvalidBidxFormat);
+ }
+ }
}
trace!("Parsed {} chunk hashes", chunk_hashes.len());
@@ -210,7 +192,7 @@ async fn read_chunk(
progress_name: &str,
step: f64,
hash_hex: String,
- storage_dir: &PathBuf,
+ storage_dir: &Path,
chunk_index: usize,
) -> Result<Vec<u8>, ButckRWErrorKind> {
trace!("read_chunk[{}]: Starting, hash: {}", chunk_index, hash_hex);
diff --git a/src/chunker/rw/storage/hash.rs b/src/chunker/rw/storage/hash.rs
new file mode 100644
index 0000000..8eaa641
--- /dev/null
+++ b/src/chunker/rw/storage/hash.rs
@@ -0,0 +1,38 @@
+use blake3::Hasher as Blake3Hasher;
+use sha2::{Digest as Sha2Digest, Sha256};
+
+const SALT: &[u8] = b"Dude@";
+
+#[derive(Debug, Default, Clone, Copy)]
+pub enum ChunkWriteHash {
+ #[default]
+ Blake3,
+ Sha256,
+}
+
+impl ChunkWriteHash {
+ pub fn hash(&self, d: &[u8]) -> [u8; 32] {
+ match self {
+ ChunkWriteHash::Blake3 => hash_blake3(d),
+ ChunkWriteHash::Sha256 => hash_sha256(d),
+ }
+ }
+}
+
+/// Compute the Blake3 hash of the data with a salt
+/// Returns a 32-byte hash value
+pub fn hash_blake3(d: &[u8]) -> [u8; 32] {
+ let mut hasher = Blake3Hasher::new();
+ hasher.update(SALT);
+ hasher.update(d);
+ *hasher.finalize().as_bytes()
+}
+
+/// Compute the SHA-256 hash of the data with a salt
+/// Returns a 32-byte hash value
+pub fn hash_sha256(d: &[u8]) -> [u8; 32] {
+ let mut hasher = Sha256::new();
+ hasher.update(SALT);
+ hasher.update(d);
+ hasher.finalize().into()
+}
diff --git a/src/chunker/rw/storage/write.rs b/src/chunker/rw/storage/write.rs
index 8b3acc7..9348901 100644
--- a/src/chunker/rw/storage/write.rs
+++ b/src/chunker/rw/storage/write.rs
@@ -1,4 +1,7 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{
+ collections::HashMap,
+ path::{Path, PathBuf},
+};
use log::trace;
@@ -73,7 +76,7 @@ async fn write_file(
}
}
-pub fn get_index_file_name(path: &PathBuf, ctx: &ButckContext) -> PathBuf {
+pub fn get_index_file_name(path: &Path, ctx: &ButckContext) -> PathBuf {
let output_file = if let Some(output_file) = &ctx.output_file {
return output_file.clone();
} else {
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs
index 75b9bd7..38aecfc 100644
--- a/src/chunker/rw/storage/write/simple.rs
+++ b/src/chunker/rw/storage/write/simple.rs
@@ -1,15 +1,20 @@
use futures::future::join_all;
use just_progress::progress;
use log::{error, info, trace};
-use std::{collections::HashMap, path::PathBuf};
+use std::{
+ collections::HashMap,
+ path::{Path, PathBuf},
+};
use tokio::{fs::File, io::AsyncReadExt};
use crate::{
chunker::{
context::ButckContext,
- rw::{error::ButckRWErrorKind, storage},
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash},
+ },
},
- core::hash::ChunkWriteHash,
storage::get_index_file_name,
utils::size_display::size_display,
};
@@ -64,7 +69,7 @@ async fn read_file(
}
async fn write_file_to_storage(
- path: &PathBuf,
+ path: &Path,
ctx: &ButckContext,
chunk_boundaries: Vec<u32>,
raw_data: &[u8],
@@ -110,16 +115,15 @@ async fn write_file_to_storage(
end - 1,
end - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- end,
- ));
+ };
+ tasks.push(write_chunk(params));
chunk_index += 1;
} else {
trace!(
@@ -141,16 +145,15 @@ async fn write_file_to_storage(
raw_data.len() - 1,
raw_data.len() - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- raw_data.len(),
- ));
+ };
+ tasks.push(write_chunk(params));
}
trace!("Total chunks prepared for writing: {}", tasks.len());
@@ -192,78 +195,81 @@ async fn write_file_to_storage(
Ok(())
}
-async fn write_chunk(
- progress_name: &str,
+struct WriteChunkParams {
+ progress_name: String,
step: f64,
- chunk_data: &[u8],
- output_dir: &PathBuf,
- chunk_hash: &ChunkWriteHash,
+ chunk_data: Vec<u8>,
+ output_dir: PathBuf,
+ chunk_hash: ChunkWriteHash,
chunk_index: usize,
- start: usize,
- end: usize,
-) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> {
+}
+
+async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWErrorKind> {
trace!(
"write_chunk[{}]: Starting, data size: {} bytes",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
trace!(
"write_chunk[{}]: Computing hash with algorithm: {:?}",
- chunk_index, chunk_hash
+ params.chunk_index, params.chunk_hash
);
- let hash_bytes = chunk_hash.hash(chunk_data);
+ let hash_bytes = params.chunk_hash.hash(&params.chunk_data);
trace!(
"write_chunk[{}]: Hash computed: {:?}",
- chunk_index, hash_bytes
+ params.chunk_index, hash_bytes
);
let hash_hex = hex::encode(hash_bytes);
- trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex);
+ trace!(
+ "write_chunk[{}]: Hash hex: {}",
+ params.chunk_index, hash_hex
+ );
- let file_path = storage::get_chunk_path(output_dir, &hash_hex);
+ let file_path = storage::get_chunk_path(&params.output_dir, &hash_hex);
if let Some(parent_dir) = file_path.parent() {
trace!(
"write_chunk[{}]: Creating directory structure: {}",
- chunk_index,
+ params.chunk_index,
parent_dir.display()
);
tokio::fs::create_dir_all(parent_dir).await?;
- trace!("write_chunk[{}]: Directory created", chunk_index);
+ trace!("write_chunk[{}]: Directory created", params.chunk_index);
}
trace!(
"write_chunk[{}]: File path: {}",
- chunk_index,
+ params.chunk_index,
file_path.display()
);
trace!(
"write_chunk[{}]: Writing {} bytes to file",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
if !file_path.exists() {
- tokio::fs::write(&file_path, chunk_data).await?;
+ tokio::fs::write(&file_path, &params.chunk_data).await?;
} else {
trace!(
"write_chunk[{}]: File already exists, skipping",
- chunk_index
+ params.chunk_index
);
}
- trace!("write_chunk[{}]: File written successfully", chunk_index);
- progress::increase(progress_name, step as f32);
- Ok(crate::chunker::rw::storage::ChunkInfo {
- index: chunk_index,
+ trace!(
+ "write_chunk[{}]: File written successfully",
+ params.chunk_index
+ );
+ progress::increase(&params.progress_name, params.step as f32);
+ Ok(ChunkInfo {
+ index: params.chunk_index,
hash: hash_hex,
- size: chunk_data.len(),
- start,
- end,
})
}
-async fn get_boundaries<'a>(
+async fn get_boundaries(
raw_data: &[u8],
ctx: &ButckContext,
params: &HashMap<&str, &str>,
@@ -276,69 +282,14 @@ async fn get_boundaries<'a>(
}
async fn write_index_file(
- index_path: &PathBuf,
- chunk_infos: &[crate::chunker::rw::storage::ChunkInfo],
- original_file_path: &PathBuf,
+ index_path: &Path,
+ chunk_infos: &[ChunkInfo],
+ original_file_path: &Path,
) -> Result<(), std::io::Error> {
- use std::io::Write;
-
- let file = std::fs::File::create(index_path)?;
- let mut writer = std::io::BufWriter::new(file);
-
- // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes
- use crate::chunker::constants::BUTCK_INDEX_MAGIC;
-
- // Write magic bytes
- writer.write_all(&BUTCK_INDEX_MAGIC)?;
-
- // Get original filename as bytes
- let filename = original_file_path
- .file_name()
- .and_then(|n| n.to_str())
- .unwrap_or("unknown");
- let filename_bytes = filename.as_bytes();
-
- // Write filename length as u16 (little-endian)
- if filename_bytes.len() > u16::MAX as usize {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- format!("Filename too long: {} bytes", filename_bytes.len()),
- ));
- }
- let filename_len = filename_bytes.len() as u16;
- writer.write_all(&filename_len.to_le_bytes())?;
-
- // Write filename bytes
- writer.write_all(filename_bytes)?;
-
- // Write chunk hashes: [u8; 32][u8; 32][u8; 32]...
- for chunk_info in chunk_infos {
- // Convert hex hash to bytes
- match hex::decode(&chunk_info.hash) {
- Ok(hash_bytes) => {
- if hash_bytes.len() == 32 {
- writer.write_all(&hash_bytes)?;
- } else {
- // Pad or truncate to 32 bytes if needed
- let mut fixed_hash = [0u8; 32];
- let len = hash_bytes.len().min(32);
- fixed_hash[..len].copy_from_slice(&hash_bytes[..len]);
- writer.write_all(&fixed_hash)?;
- }
- }
- Err(e) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Failed to decode hash hex: {}", e),
- ));
- }
- }
- }
-
- Ok(())
+ write_bidx_file(index_path, chunk_infos, original_file_path)
}
-async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) {
+pub async fn display_boundaries(chunk_boundaries: &[u32], total_bytes: usize) {
let total_chunks = chunk_boundaries.len() + 1;
let (total_value, total_unit) = size_display(total_bytes);
info!(
diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs
index 020cfcd..092cee7 100644
--- a/src/chunker/rw/storage/write/stream.rs
+++ b/src/chunker/rw/storage/write/stream.rs
@@ -1,12 +1,155 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{collections::HashMap, path::Path, sync::Arc};
-use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind};
+use crate::{
+ chunker::{
+ context::ButckContext,
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path},
+ },
+ },
+ storage::{get_index_file_name, simple::display_boundaries},
+};
+use butck_policies::chunk_stream_with;
+use just_progress::progress;
+use log::{error, info, trace};
+use tokio::sync::Mutex;
pub async fn write_file_stream(
- path: &PathBuf,
+ path: &Path,
stream_read_size: u32,
ctx: &ButckContext,
params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
- todo!()
+ trace!(
+ "write_file_stream: Starting stream write for {}",
+ path.display()
+ );
+
+ // Check if policy is specified
+ let policy_name = ctx.policy_name.as_ref().ok_or_else(|| {
+ error!("No chunking policy specified for stream write");
+ ButckRWErrorKind::ChunkingPolicyNotSpecified
+ })?;
+
+ // Create progress bar
+ let progress_name = format!(
+ "Write `{}`",
+ path.file_name().unwrap_or_default().to_string_lossy()
+ );
+ progress::update_progress(&progress_name, 0.0);
+
+ // Collect chunk information
+ let chunk_infos = Arc::new(Mutex::new(Vec::new()));
+ let chunk_counter = Arc::new(Mutex::new(0usize));
+ let output_dir = ctx.output_dir.clone();
+ let chunk_hash = ctx.chunk_hash;
+
+ // If only displaying boundaries, use chunk_stream_display_boundaries
+ if ctx.display_boundaries {
+ let boundaries = butck_policies::chunk_stream_display_boundaries(
+ policy_name,
+ stream_read_size,
+ path,
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Calculate total file size by reading the file
+ let total_bytes = tokio::fs::metadata(path)
+ .await
+ .map_err(|e| {
+ error!("Failed to get file metadata: {}", e);
+ ButckRWErrorKind::IOError(e)
+ })?
+ .len() as usize;
+
+ // Display boundaries information
+ display_boundaries(&boundaries, total_bytes).await;
+
+ return Ok(());
+ }
+
+ // Call chunk_stream_with with callback to write chunks
+ chunk_stream_with(
+ policy_name,
+ stream_read_size,
+ path,
+ |chunk_data: Vec<u8>| {
+ let output_dir = output_dir.clone();
+ let chunk_hash = chunk_hash;
+ let progress_name = progress_name.clone();
+ let chunk_infos = Arc::clone(&chunk_infos);
+ let chunk_counter = Arc::clone(&chunk_counter);
+
+ Box::pin(async move {
+ // Increment chunk counter
+ let mut counter = chunk_counter.lock().await;
+ let chunk_index = *counter;
+ *counter += 1;
+
+ // Compute hash
+ let hash_bytes = chunk_hash.hash(&chunk_data);
+ let hash_hex = hex::encode(hash_bytes);
+
+ // Build file path
+ let file_path = get_chunk_path(&output_dir, &hash_hex);
+
+ // Create directory if needed
+ if let Some(parent_dir) = file_path.parent() {
+ tokio::fs::create_dir_all(parent_dir).await?;
+ }
+
+ // Write chunk if file doesn't exist
+ if !file_path.exists() {
+ tokio::fs::write(&file_path, &chunk_data).await?;
+ }
+
+ // Store chunk info
+ let mut infos = chunk_infos.lock().await;
+ infos.push(ChunkInfo {
+ index: chunk_index,
+ hash: hash_hex,
+ });
+
+ // Update progress
+ progress::increase(&progress_name, 0.01); // Small increment per chunk
+
+ Ok(())
+ })
+ },
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Complete progress
+ progress::complete(&progress_name);
+
+ // Get chunk infos
+ let chunk_infos = chunk_infos.lock().await;
+
+ // Write index file
+ let index_file_name = get_index_file_name(path, ctx);
+
+ // Use the unified bidx file writer
+ write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| {
+ error!("Failed to write index file: {}", e);
+ ButckRWErrorKind::IndexFileWriteFailed(e.to_string())
+ })?;
+
+ info!(
+ "Stream write completed for {}: {} chunks written",
+ path.display(),
+ chunk_infos.len()
+ );
+
+ Ok(())
}