From 9e7c0fd45e169929156bdb317b10d7bb3db65f8b Mon Sep 17 00:00:00 2001 From: 魏曹先生 <1992414357@qq.com> Date: Sat, 7 Mar 2026 19:37:52 +0800 Subject: Add callback support to chunk_stream_with and implement stream writing --- src/chunker/rw/storage/write/stream.rs | 151 ++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 4 deletions(-) (limited to 'src/chunker/rw/storage/write/stream.rs') diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs index 020cfcd..092cee7 100644 --- a/src/chunker/rw/storage/write/stream.rs +++ b/src/chunker/rw/storage/write/stream.rs @@ -1,12 +1,155 @@ -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::HashMap, path::Path, sync::Arc}; -use crate::chunker::{context::ButckContext, rw::error::ButckRWErrorKind}; +use crate::{ + chunker::{ + context::ButckContext, + rw::{ + error::ButckRWErrorKind, + storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path}, + }, + }, + storage::{get_index_file_name, simple::display_boundaries}, +}; +use butck_policies::chunk_stream_with; +use just_progress::progress; +use log::{error, info, trace}; +use tokio::sync::Mutex; pub async fn write_file_stream( - path: &PathBuf, + path: &Path, stream_read_size: u32, ctx: &ButckContext, params: &HashMap<&str, &str>, ) -> Result<(), ButckRWErrorKind> { - todo!() + trace!( + "write_file_stream: Starting stream write for {}", + path.display() + ); + + // Check if policy is specified + let policy_name = ctx.policy_name.as_ref().ok_or_else(|| { + error!("No chunking policy specified for stream write"); + ButckRWErrorKind::ChunkingPolicyNotSpecified + })?; + + // Create progress bar + let progress_name = format!( + "Write `{}`", + path.file_name().unwrap_or_default().to_string_lossy() + ); + progress::update_progress(&progress_name, 0.0); + + // Collect chunk information + let chunk_infos = Arc::new(Mutex::new(Vec::new())); + let chunk_counter = Arc::new(Mutex::new(0usize)); + let output_dir = ctx.output_dir.clone(); + let chunk_hash = ctx.chunk_hash; + + // If only displaying boundaries, use chunk_stream_display_boundaries + if ctx.display_boundaries { + let boundaries = butck_policies::chunk_stream_display_boundaries( + policy_name, + stream_read_size, + path, + params, + ) + .await + .map_err(|e| { + error!("Stream chunking failed: {}", e); + ButckRWErrorKind::ChunkingFailed(e.to_string()) + })?; + + // Calculate total file size by reading the file + let total_bytes = tokio::fs::metadata(path) + .await + .map_err(|e| { + error!("Failed to get file metadata: {}", e); + ButckRWErrorKind::IOError(e) + })? + .len() as usize; + + // Display boundaries information + display_boundaries(&boundaries, total_bytes).await; + + return Ok(()); + } + + // Call chunk_stream_with with callback to write chunks + chunk_stream_with( + policy_name, + stream_read_size, + path, + |chunk_data: Vec| { + let output_dir = output_dir.clone(); + let chunk_hash = chunk_hash; + let progress_name = progress_name.clone(); + let chunk_infos = Arc::clone(&chunk_infos); + let chunk_counter = Arc::clone(&chunk_counter); + + Box::pin(async move { + // Increment chunk counter + let mut counter = chunk_counter.lock().await; + let chunk_index = *counter; + *counter += 1; + + // Compute hash + let hash_bytes = chunk_hash.hash(&chunk_data); + let hash_hex = hex::encode(hash_bytes); + + // Build file path + let file_path = get_chunk_path(&output_dir, &hash_hex); + + // Create directory if needed + if let Some(parent_dir) = file_path.parent() { + tokio::fs::create_dir_all(parent_dir).await?; + } + + // Write chunk if file doesn't exist + if !file_path.exists() { + tokio::fs::write(&file_path, &chunk_data).await?; + } + + // Store chunk info + let mut infos = chunk_infos.lock().await; + infos.push(ChunkInfo { + index: chunk_index, + hash: hash_hex, + }); + + // Update progress + progress::increase(&progress_name, 0.01); // Small increment per chunk + + Ok(()) + }) + }, + params, + ) + .await + .map_err(|e| { + error!("Stream chunking failed: {}", e); + ButckRWErrorKind::ChunkingFailed(e.to_string()) + })?; + + // Complete progress + progress::complete(&progress_name); + + // Get chunk infos + let chunk_infos = chunk_infos.lock().await; + + // Write index file + let index_file_name = get_index_file_name(path, ctx); + + // Use the unified bidx file writer + write_bidx_file(&index_file_name, &chunk_infos, path).map_err(|e| { + error!("Failed to write index file: {}", e); + ButckRWErrorKind::IndexFileWriteFailed(e.to_string()) + })?; + + info!( + "Stream write completed for {}: {} chunks written", + path.display(), + chunk_infos.len() + ); + + Ok(()) } -- cgit