summaryrefslogtreecommitdiff
path: root/src/chunker/rw/storage/write
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2026-03-07 19:37:52 +0800
committer魏曹先生 <1992414357@qq.com>2026-03-07 19:37:52 +0800
commit9e7c0fd45e169929156bdb317b10d7bb3db65f8b (patch)
tree94c1e0e6cafe996b7b7da8dfd6e1ff1a04539cda /src/chunker/rw/storage/write
parent22926ce29e3f8e040ec349401aeb6a77f32eae72 (diff)
Add callback support to chunk_stream_with and implement stream writing
Diffstat (limited to 'src/chunker/rw/storage/write')
-rw-r--r--src/chunker/rw/storage/write/simple.rs171
-rw-r--r--src/chunker/rw/storage/write/stream.rs151
2 files changed, 208 insertions, 114 deletions
diff --git a/src/chunker/rw/storage/write/simple.rs b/src/chunker/rw/storage/write/simple.rs
index 75b9bd7..38aecfc 100644
--- a/src/chunker/rw/storage/write/simple.rs
+++ b/src/chunker/rw/storage/write/simple.rs
@@ -1,15 +1,20 @@
use futures::future::join_all;
use just_progress::progress;
use log::{error, info, trace};
-use std::{collections::HashMap, path::PathBuf};
+use std::{
+ collections::HashMap,
+ path::{Path, PathBuf},
+};
use tokio::{fs::File, io::AsyncReadExt};
use crate::{
chunker::{
context::ButckContext,
- rw::{error::ButckRWErrorKind, storage},
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{self, ChunkInfo, bidx::write_bidx_file, hash::ChunkWriteHash},
+ },
},
- core::hash::ChunkWriteHash,
storage::get_index_file_name,
utils::size_display::size_display,
};
@@ -64,7 +69,7 @@ async fn read_file(
}
async fn write_file_to_storage(
- path: &PathBuf,
+ path: &Path,
ctx: &ButckContext,
chunk_boundaries: Vec<u32>,
raw_data: &[u8],
@@ -110,16 +115,15 @@ async fn write_file_to_storage(
end - 1,
end - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- end,
- ));
+ };
+ tasks.push(write_chunk(params));
chunk_index += 1;
} else {
trace!(
@@ -141,16 +145,15 @@ async fn write_file_to_storage(
raw_data.len() - 1,
raw_data.len() - start
);
- tasks.push(write_chunk(
- progress_name.as_str(),
+ let params = WriteChunkParams {
+ progress_name: progress_name.clone(),
step,
- chunk_data,
- &ctx.output_dir,
- &ctx.chunk_hash,
+ chunk_data: chunk_data.to_vec(),
+ output_dir: ctx.output_dir.clone(),
+ chunk_hash: ctx.chunk_hash,
chunk_index,
- start,
- raw_data.len(),
- ));
+ };
+ tasks.push(write_chunk(params));
}
trace!("Total chunks prepared for writing: {}", tasks.len());
@@ -192,78 +195,81 @@ async fn write_file_to_storage(
Ok(())
}
-async fn write_chunk(
- progress_name: &str,
+struct WriteChunkParams {
+ progress_name: String,
step: f64,
- chunk_data: &[u8],
- output_dir: &PathBuf,
- chunk_hash: &ChunkWriteHash,
+ chunk_data: Vec<u8>,
+ output_dir: PathBuf,
+ chunk_hash: ChunkWriteHash,
chunk_index: usize,
- start: usize,
- end: usize,
-) -> Result<crate::chunker::rw::storage::ChunkInfo, ButckRWErrorKind> {
+}
+
+async fn write_chunk(params: WriteChunkParams) -> Result<ChunkInfo, ButckRWErrorKind> {
trace!(
"write_chunk[{}]: Starting, data size: {} bytes",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
trace!(
"write_chunk[{}]: Computing hash with algorithm: {:?}",
- chunk_index, chunk_hash
+ params.chunk_index, params.chunk_hash
);
- let hash_bytes = chunk_hash.hash(chunk_data);
+ let hash_bytes = params.chunk_hash.hash(&params.chunk_data);
trace!(
"write_chunk[{}]: Hash computed: {:?}",
- chunk_index, hash_bytes
+ params.chunk_index, hash_bytes
);
let hash_hex = hex::encode(hash_bytes);
- trace!("write_chunk[{}]: Hash hex: {}", chunk_index, hash_hex);
+ trace!(
+ "write_chunk[{}]: Hash hex: {}",
+ params.chunk_index, hash_hex
+ );
- let file_path = storage::get_chunk_path(output_dir, &hash_hex);
+ let file_path = storage::get_chunk_path(&params.output_dir, &hash_hex);
if let Some(parent_dir) = file_path.parent() {
trace!(
"write_chunk[{}]: Creating directory structure: {}",
- chunk_index,
+ params.chunk_index,
parent_dir.display()
);
tokio::fs::create_dir_all(parent_dir).await?;
- trace!("write_chunk[{}]: Directory created", chunk_index);
+ trace!("write_chunk[{}]: Directory created", params.chunk_index);
}
trace!(
"write_chunk[{}]: File path: {}",
- chunk_index,
+ params.chunk_index,
file_path.display()
);
trace!(
"write_chunk[{}]: Writing {} bytes to file",
- chunk_index,
- chunk_data.len()
+ params.chunk_index,
+ params.chunk_data.len()
);
if !file_path.exists() {
- tokio::fs::write(&file_path, chunk_data).await?;
+ tokio::fs::write(&file_path, &params.chunk_data).await?;
} else {
trace!(
"write_chunk[{}]: File already exists, skipping",
- chunk_index
+ params.chunk_index
);
}
- trace!("write_chunk[{}]: File written successfully", chunk_index);
- progress::increase(progress_name, step as f32);
- Ok(crate::chunker::rw::storage::ChunkInfo {
- index: chunk_index,
+ trace!(
+ "write_chunk[{}]: File written successfully",
+ params.chunk_index
+ );
+ progress::increase(&params.progress_name, params.step as f32);
+ Ok(ChunkInfo {
+ index: params.chunk_index,
hash: hash_hex,
- size: chunk_data.len(),
- start,
- end,
})
}
-async fn get_boundaries<'a>(
+async fn get_boundaries(
raw_data: &[u8],
ctx: &ButckContext,
params: &HashMap<&str, &str>,
@@ -276,69 +282,14 @@ async fn get_boundaries<'a>(
}
async fn write_index_file(
- index_path: &PathBuf,
- chunk_infos: &[crate::chunker::rw::storage::ChunkInfo],
- original_file_path: &PathBuf,
+ index_path: &Path,
+ chunk_infos: &[ChunkInfo],
+ original_file_path: &Path,
) -> Result<(), std::io::Error> {
- use std::io::Write;
-
- let file = std::fs::File::create(index_path)?;
- let mut writer = std::io::BufWriter::new(file);
-
- // Write header: [u8; 4] magic + [u16] filename length + [u8] filename bytes
- use crate::chunker::constants::BUTCK_INDEX_MAGIC;
-
- // Write magic bytes
- writer.write_all(&BUTCK_INDEX_MAGIC)?;
-
- // Get original filename as bytes
- let filename = original_file_path
- .file_name()
- .and_then(|n| n.to_str())
- .unwrap_or("unknown");
- let filename_bytes = filename.as_bytes();
-
- // Write filename length as u16 (little-endian)
- if filename_bytes.len() > u16::MAX as usize {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- format!("Filename too long: {} bytes", filename_bytes.len()),
- ));
- }
- let filename_len = filename_bytes.len() as u16;
- writer.write_all(&filename_len.to_le_bytes())?;
-
- // Write filename bytes
- writer.write_all(filename_bytes)?;
-
- // Write chunk hashes: [u8; 32][u8; 32][u8; 32]...
- for chunk_info in chunk_infos {
- // Convert hex hash to bytes
- match hex::decode(&chunk_info.hash) {
- Ok(hash_bytes) => {
- if hash_bytes.len() == 32 {
- writer.write_all(&hash_bytes)?;
- } else {
- // Pad or truncate to 32 bytes if needed
- let mut fixed_hash = [0u8; 32];
- let len = hash_bytes.len().min(32);
- fixed_hash[..len].copy_from_slice(&hash_bytes[..len]);
- writer.write_all(&fixed_hash)?;
- }
- }
- Err(e) => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("Failed to decode hash hex: {}", e),
- ));
- }
- }
- }
-
- Ok(())
+ write_bidx_file(index_path, chunk_infos, original_file_path)
}
-async fn display_boundaries(chunk_boundaries: &Vec<u32>, total_bytes: usize) {
+pub async fn display_boundaries(chunk_boundaries: &[u32], total_bytes: usize) {
let total_chunks = chunk_boundaries.len() + 1;
let (total_value, total_unit) = size_display(total_bytes);
info!(
diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs
index 020cfcd..092cee7 100644
--- a/src/chunker/rw/storage/write/stream.rs
+++ b/src/chunker/rw/storage/write/stream.rs
@@ -1,12 +1,155 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{collections::HashMap, path::Path, sync::Arc};
-use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind};
+use crate::{
+ chunker::{
+ context::ButckContext,
+ rw::{
+ error::ButckRWErrorKind,
+ storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path},
+ },
+ },
+ storage::{get_index_file_name, simple::display_boundaries},
+};
+use butck_policies::chunk_stream_with;
+use just_progress::progress;
+use log::{error, info, trace};
+use tokio::sync::Mutex;
pub async fn write_file_stream(
- path: &PathBuf,
+ path: &Path,
stream_read_size: u32,
ctx: &ButckContext,
params: &HashMap<&str, &str>,
) -> Result<(), ButckRWErrorKind> {
- todo!()
+ trace!(
+ "write_file_stream: Starting stream write for {}",
+ path.display()
+ );
+
+ // Check if policy is specified
+ let policy_name = ctx.policy_name.as_ref().ok_or_else(|| {
+ error!("No chunking policy specified for stream write");
+ ButckRWErrorKind::ChunkingPolicyNotSpecified
+ })?;
+
+ // Create progress bar
+ let progress_name = format!(
+ "Write `{}`",
+ path.file_name().unwrap_or_default().to_string_lossy()
+ );
+ progress::update_progress(&progress_name, 0.0);
+
+ // Collect chunk information
+ let chunk_infos = Arc::new(Mutex::new(Vec::new()));
+ let chunk_counter = Arc::new(Mutex::new(0usize));
+ let output_dir = ctx.output_dir.clone();
+ let chunk_hash = ctx.chunk_hash;
+
+ // If only displaying boundaries, use chunk_stream_display_boundaries
+ if ctx.display_boundaries {
+ let boundaries = butck_policies::chunk_stream_display_boundaries(
+ policy_name,
+ stream_read_size,
+ path,
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Calculate total file size by reading the file
+ let total_bytes = tokio::fs::metadata(path)
+ .await
+ .map_err(|e| {
+ error!("Failed to get file metadata: {}", e);
+ ButckRWErrorKind::IOError(e)
+ })?
+ .len() as usize;
+
+ // Display boundaries information
+ display_boundaries(&boundaries, total_bytes).await;
+
+ return Ok(());
+ }
+
+ // Call chunk_stream_with with callback to write chunks
+ chunk_stream_with(
+ policy_name,
+ stream_read_size,
+ path,
+ |chunk_data: Vec<u8>| {
+ let output_dir = output_dir.clone();
+ let chunk_hash = chunk_hash;
+ let progress_name = progress_name.clone();
+ let chunk_infos = Arc::clone(&chunk_infos);
+ let chunk_counter = Arc::clone(&chunk_counter);
+
+ Box::pin(async move {
+ // Increment chunk counter
+ let mut counter = chunk_counter.lock().await;
+ let chunk_index = *counter;
+ *counter += 1;
+
+ // Compute hash
+ let hash_bytes = chunk_hash.hash(&chunk_data);
+ let hash_hex = hex::encode(hash_bytes);
+
+ // Build file path
+ let file_path = get_chunk_path(&output_dir, &hash_hex);
+
+ // Create directory if needed
+ if let Some(parent_dir) = file_path.parent() {
+ tokio::fs::create_dir_all(parent_dir).await?;
+ }
+
+ // Write chunk if file doesn't exist
+ if !file_path.exists() {
+ tokio::fs::write(&file_path, &chunk_data).await?;
+ }
+
+ // Store chunk info
+ let mut infos = chunk_infos.lock().await;
+ infos.push(ChunkInfo {
+ index: chunk_index,
+ hash: hash_hex,
+ });
+
+ // Update progress
+ progress::increase(&progress_name, 0.01); // Small increment per chunk
+
+ Ok(())
+ })
+ },
+ params,
+ )
+ .await
+ .map_err(|e| {
+ error!("Stream chunking failed: {}", e);
+ ButckRWErrorKind::ChunkingFailed(e.to_string())
+ })?;
+
+ // Complete progress
+ progress::complete(&progress_name);
+
+ // Get chunk infos
+ let chunk_infos = chunk_infos.lock().await;
+
+ // Write index file
+ let index_file_name = get_index_file_name(path, ctx);
+
+ // Use the unified bidx file writer
+ write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| {
+ error!("Failed to write index file: {}", e);
+ ButckRWErrorKind::IndexFileWriteFailed(e.to_string())
+ })?;
+
+ info!(
+ "Stream write completed for {}: {} chunks written",
+ path.display(),
+ chunk_infos.len()
+ );
+
+ Ok(())
}