diff options
Diffstat (limited to 'src/chunker/rw/storage/write/stream.rs')
| -rw-r--r-- | src/chunker/rw/storage/write/stream.rs | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/src/chunker/rw/storage/write/stream.rs b/src/chunker/rw/storage/write/stream.rs index 74a391b..c4e786f 100644 --- a/src/chunker/rw/storage/write/stream.rs +++ b/src/chunker/rw/storage/write/stream.rs @@ -1,4 +1,11 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{ + collections::HashMap, + path::Path, + sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }, +}; use crate::{ chunker::{ @@ -8,11 +15,11 @@ use crate::{ storage::{ChunkInfo, bidx::write_bidx_file, get_chunk_path}, }, }, - storage::{get_index_file_name, simple::display_boundaries}, + storage::{display_boundaries, get_index_file_name}, }; use butck_policies::chunk_stream_with; use just_progress::progress; -use log::{error, info, trace}; +use log::{debug, error, trace}; use tokio::sync::Mutex; pub async fn write_file_stream( @@ -26,6 +33,15 @@ pub async fn write_file_stream( path.display() ); + let file_size = tokio::fs::metadata(path) + .await + .map_err(|e| { + error!("Failed to get file metadata: {}", e); + ButckRWErrorKind::IOError(e) + })? + .len() as f32; + let readed: Arc<AtomicU32> = Arc::new(AtomicU32::new(0)); + // Check if policy is specified let policy_name = ctx.policy_name.as_ref().ok_or_else(|| { error!("No chunking policy specified for stream write"); @@ -80,6 +96,7 @@ pub async fn write_file_stream( stream_read_size, path, |chunk_data: Vec<u8>| { + let readed = readed.clone(); let storage_dir = storage_dir.clone(); let chunk_hash = chunk_hash; let progress_name = progress_name.clone(); @@ -87,6 +104,14 @@ pub async fn write_file_stream( let chunk_counter = Arc::clone(&chunk_counter); Box::pin(async move { + // Update readed + readed.fetch_add(chunk_data.len() as u32, Ordering::SeqCst); + let readed_f32 = readed.load(Ordering::SeqCst) as f32; + + // Update progress + let progress = readed_f32 / file_size; + progress::update_progress(&progress_name, progress); + // Increment chunk counter let mut counter = chunk_counter.lock().await; let chunk_index = *counter; @@ -116,9 +141,6 @@ pub async fn write_file_stream( hash: hash_hex, }); - // Update progress - progress::increase(&progress_name, 0.01); // Small increment per chunk - Ok(()) }) }, @@ -145,7 +167,7 @@ pub async fn write_file_stream( ButckRWErrorKind::IndexFileWriteFailed(e.to_string()) })?; - info!( + debug!( "Stream write completed for {}: {} chunks written", path.display(), chunk_infos.len() |
