diff options
| author | 魏曹先生 <1992414357@qq.com> | 2025-09-29 15:50:12 +0800 |
|---|---|---|
| committer | 魏曹先生 <1992414357@qq.com> | 2025-09-29 15:50:12 +0800 |
| commit | e98b298d583626ab505debe778d0beba303256c3 (patch) | |
| tree | 0a3507e7af8382e47d2c2f14a82df0460275ac63 | |
| parent | 2753a38ab627369c8bffce610b3106869f26dd61 (diff) | |
Add incremental transfer functionality and update core components
- Implement instance_incremental_transfer module for efficient file synchronization
- Add support for chunk-based file transfer with hash comparison
- Update TCP connection utilities to support incremental transfer protocol
- Enhance error handling and version management for file synchronization
- Update dependencies and integrate new functionality into main library
| -rw-r--r-- | crates/utils/tcp_connection/src/behaviour.rs | 1 | ||||
| -rw-r--r-- | crates/utils/tcp_connection/src/instance_incremental_transfer.rs | 1209 | ||||
| -rw-r--r-- | crates/utils/tcp_connection/src/lib.rs | 2 | ||||
| -rw-r--r-- | crates/vcs/src/data/sheet.rs | 163 | ||||
| -rw-r--r-- | crates/vcs/src/data/vault/virtual_file.rs | 4 |
5 files changed, 147 insertions, 1232 deletions
diff --git a/crates/utils/tcp_connection/src/behaviour.rs b/crates/utils/tcp_connection/src/behaviour.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/utils/tcp_connection/src/behaviour.rs @@ -0,0 +1 @@ + diff --git a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs b/crates/utils/tcp_connection/src/instance_incremental_transfer.rs deleted file mode 100644 index aeb2cb8..0000000 --- a/crates/utils/tcp_connection/src/instance_incremental_transfer.rs +++ /dev/null @@ -1,1209 +0,0 @@ -use std::path::{Path, PathBuf}; - -use tokio::fs::{File, OpenOptions, copy, create_dir_all, read, read_to_string, remove_file}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; - -use crate::{error::TcpTargetError, instance::ConnectionInstance}; - -// 增量传输协议版本 -const INCREMENTAL_TRANSFER_VERSION: u64 = 1; -// 块大小(字节) -const DEFAULT_CHUNK_SIZE: usize = 8192; -// 哈希大小(字节) -const HASH_SIZE: usize = 32; // blake3 produces 32-byte hashes -// 版本文件扩展名 -const VERSION_FILE_EXTENSION: &str = "ver"; -// 版本历史目录名 -const VERSION_HISTORY_DIR: &str = "diff"; -// 差异文件扩展名 -const DELTA_FILE_EXTENSION: &str = "delta"; - -// 协议模式常量 -const SERVER_DELTA_MODE: u8 = 1; -const SERVER_FULL_MODE: u8 = 2; -const CLIENT_UPDATE_MODE: u8 = 1; -const CLIENT_UPLOAD_MODE: u8 = 2; -const NO_CHANGE_MODE: u8 = 3; - -// 协议错误消息 -const ERR_INVALID_SERVER_RESPONSE: &str = "Invalid server response format"; -const ERR_VERSION_MISMATCH: &str = "Version mismatch detected"; -const ERR_CHUNK_INDEX_OUT_OF_BOUNDS: &str = "Chunk index out of bounds"; -const ERR_DELTA_FILE_CORRUPTED: &str = "Delta file corrupted or incomplete"; - -impl ConnectionInstance { - // ==================== 客户端功能 ==================== - - /// 客户端:增量更新到指定版本 - pub async fn client_update_to_version( - &mut self, - file_path: impl AsRef<Path>, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - println!( - "Client update: current_version={}, target_version={}", - current_version, target_version - ); - - if current_version >= target_version { - println!("Client update: Already up to date, skipping"); - return Ok(()); // 已经是最新版本 - } - - // 发送更新请求 - println!( - "Client update: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Client update: Sending mode: {}", CLIENT_UPDATE_MODE); - self.stream.write_all(&[CLIENT_UPDATE_MODE]).await?; // 客户端更新模式 - println!("Client update: Sending target version: {}", target_version); - self.stream.write_all(&target_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Client update: Request header sent"); - - // 执行增量更新 - println!("Client update: Starting incremental update..."); - let result = self - .client_perform_incremental_update(path, current_version, target_version) - .await; - println!( - "Client update: Incremental update completed, result: {:?}", - result - ); - result - } - - /// 客户端:增量上传变化到服务器 - pub async fn client_upload( - &mut self, - file_path: impl AsRef<Path>, - ) -> Result<i32, TcpTargetError> { - let path = file_path.as_ref(); - let current_version = self.get_current_version(path).await?; - - // 发送上传请求 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[2u8]).await?; // 客户端上传模式 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 执行增量上传 - let new_version = self - .client_perform_incremental_upload(path, current_version) - .await?; - - // 更新本地版本 - self.save_version(path, new_version).await?; - - Ok(new_version) - } - - // ==================== 服务端功能 ==================== - - /// 服务端:处理客户端更新请求 - pub async fn server_handle_client_update( - &mut self, - file_path: impl AsRef<Path>, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server: Reading protocol version..."); - // 读取协议版本 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Server: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - // 读取请求模式 - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Server: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - CLIENT_UPDATE_MODE => { - // 客户端更新模式 - println!("Server: Client update mode detected"); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let target_version = i32::from_be_bytes(version_buf); - println!("Server: Target version: {}", target_version); - - println!("Server: Sending version delta..."); - let result = self.server_send_version_delta(path, target_version).await; - println!("Server: Version delta sent, result: {:?}", result); - result - } - CLIENT_UPLOAD_MODE => { - // 客户端上传模式 - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let client_version = i32::from_be_bytes(version_buf); - - self.server_receive_client_changes(path, client_version) - .await - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, mode_buf[0] - ))); - } - } - } - - /// 服务端:发送指定版本的增量数据 - pub async fn server_send_delta_to_version( - &mut self, - file_path: impl AsRef<Path>, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送响应头 - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - self.stream.write_all(&[1u8]).await?; // 服务端增量模式 - - self.server_send_version_delta_internal(path, from_version, to_version) - .await - } - - // ==================== 内部实现 ==================== - - /// 客户端:执行增量更新 - async fn client_perform_incremental_update( - &mut self, - file_path: impl AsRef<Path>, - current_version: i32, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 发送客户端当前版本给服务器 - self.stream - .write_all(¤t_version.to_be_bytes()) - .await?; - self.stream.flush().await?; - - // 读取服务器响应 - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - - match mode_buf[0] { - SERVER_DELTA_MODE => { - // 服务端增量模式 - self.client_apply_delta(path, target_version).await - } - NO_CHANGE_MODE => { - // 无变化模式,不需要更新 - Ok(()) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:应用增量数据 - async fn client_apply_delta( - &mut self, - file_path: impl AsRef<Path>, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取增量类型 - let mut delta_type_buf = [0u8; 1]; - self.stream.read_exact(&mut delta_type_buf).await?; - - match delta_type_buf[0] { - SERVER_FULL_MODE => { - // 完整文件传输 - self.read_file(path).await?; - } - SERVER_DELTA_MODE => { - // 增量块传输 - self.client_receive_and_apply_chunks(path).await?; - } - NO_CHANGE_MODE => { - // 无变化模式 - // 不需要做任何操作 - } - _ => { - return Err(TcpTargetError::Protocol(format!( - "{}: unknown mode {}", - ERR_INVALID_SERVER_RESPONSE, delta_type_buf[0] - ))); - } - } - - // 更新本地版本 - self.save_version(path, target_version).await?; - - // 发送确认 - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - - Ok(()) - } - - /// 客户端:接收并应用增量块 - async fn client_receive_and_apply_chunks( - &mut self, - file_path: impl AsRef<Path>, - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, None) - .await - } - - /// 客户端:执行增量上传 - async fn client_perform_incremental_upload( - &mut self, - file_path: impl AsRef<Path>, - current_version: i32, - ) -> Result<i32, TcpTargetError> { - let path = file_path.as_ref(); - - // 读取服务器响应 - println!("Client upload: Reading server response version..."); - let mut version_buf = [0u8; 8]; - self.stream.read_exact(&mut version_buf).await?; - let version = u64::from_be_bytes(version_buf); - println!("Client upload: Received protocol version: {}", version); - - if version != INCREMENTAL_TRANSFER_VERSION { - return Err(TcpTargetError::Protocol( - "Unsupported incremental transfer version".to_string(), - )); - } - - println!("Client upload: Reading server response mode..."); - let mut mode_buf = [0u8; 1]; - self.stream.read_exact(&mut mode_buf).await?; - println!("Client upload: Received mode: {}", mode_buf[0]); - - match mode_buf[0] { - SERVER_FULL_MODE => { - // 服务端接收模式 - let new_version = self.client_send_changes(path, current_version).await?; - Ok(new_version) - } - _ => Err(TcpTargetError::Protocol( - "Invalid server response".to_string(), - )), - } - } - - /// 客户端:发送变化到服务器 - async fn client_send_changes( - &mut self, - file_path: impl AsRef<Path>, - client_version: i32, - ) -> Result<i32, TcpTargetError> { - let path = file_path.as_ref(); - println!( - "Client send changes: Starting to send changes, client_version={}", - client_version - ); - - // 发送确认给服务器 - println!("Client: Sending acknowledgment to server..."); - self.stream.write_all(&[1u8]).await?; - self.stream.flush().await?; - println!("Client: Acknowledgment sent"); - - // 发送客户端版本给服务器进行验证 - self.stream.write_all(&client_version.to_be_bytes()).await?; - self.stream.flush().await?; - - // 计算当前文件块哈希 - let current_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Client: Calculated {} current chunk hashes", - current_hashes.len() - ); - - // 发送块哈希 - println!("Client: Sending chunk hashes to server..."); - match self.send_chunk_hashes(¤t_hashes).await { - Ok(_) => println!("Client: Successfully sent chunk hashes"), - Err(e) => { - println!("Client: ERROR sending chunk hashes: {:?}", e); - return Err(e); - } - } - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed stream"), - Err(e) => { - println!("Client: ERROR flushing stream: {:?}", e); - return Err(e.into()); - } - } - - // 读取服务器需要的块列表 - println!("Client: Reading chunks needed from server..."); - let chunks_to_send = self.receive_chunks_to_send().await?; - println!("Client: Received chunks to send: {:?}", chunks_to_send); - - // 发送需要的块 - println!("Client send changes: Sending file chunks to server..."); - self.send_file_chunks(path, &chunks_to_send).await?; - println!("Client send changes: File chunks sent"); - - // 读取新版本号 - println!("Client send changes: Reading new version from server..."); - let mut version_buf = [0u8; 4]; - self.stream.read_exact(&mut version_buf).await?; - let new_version = i32::from_be_bytes(version_buf); - println!("Client send changes: Received new version: {}", new_version); - - Ok(new_version) - } - - /// 服务端:发送版本增量数据 - async fn server_send_version_delta( - &mut self, - file_path: impl AsRef<Path>, - target_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!("Server send version delta: Reading client version..."); - // 获取客户端当前版本(需要从协议中读取) - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server send version delta: Client version: {}", - client_version - ); - - // 发送响应头 - println!( - "Server send version delta: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!( - "Server send version delta: Sending mode: {}", - SERVER_DELTA_MODE - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 服务端增量模式 - self.stream.flush().await?; - println!("Server send version delta: Response header sent"); - - println!("Server send version delta: Calling internal delta function..."); - let result = self - .server_send_version_delta_internal(path, client_version, target_version) - .await; - println!( - "Server send version delta: Internal function completed, result: {:?}", - result - ); - result - } - - /// 服务端:内部发送版本增量 - async fn server_send_version_delta_internal( - &mut self, - file_path: impl AsRef<Path>, - from_version: i32, - to_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - println!( - "Server send version delta internal: from_version={}, to_version={}", - from_version, to_version - ); - - if from_version == to_version { - // 版本相同,不需要传输 - println!( - "Server send version delta internal: Versions are the same, sending NO_CHANGE_MODE" - ); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - return Ok(()); - } - - // 计算版本间的差异 - println!("Server send version delta internal: Calculating version delta..."); - let delta_chunks = self - .calculate_version_delta(path, from_version, to_version) - .await?; - println!( - "Server send version delta internal: Delta chunks count: {}", - delta_chunks.len() - ); - - if delta_chunks.is_empty() { - // 没有变化或目标版本不存在 - println!("Server send version delta internal: No changes, sending NO_CHANGE_MODE"); - self.stream.write_all(&[NO_CHANGE_MODE]).await?; // 无变化模式 - } else { - // 发送增量块 - println!( - "Server send version delta internal: Sending delta chunks with SERVER_DELTA_MODE" - ); - self.stream.write_all(&[SERVER_DELTA_MODE]).await?; // 增量块模式 - self.send_file_chunks(path, &delta_chunks).await?; - } - println!("Server send version delta internal: Completed successfully"); - Ok(()) - } - - /// 服务端:接收客户端变化 - async fn server_receive_client_changes( - &mut self, - file_path: impl AsRef<Path>, - client_version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - println!("Server receive changes: Client version: {}", client_version); - - // 验证客户端版本是否匹配服务器当前版本 - let server_version = self.get_current_version(path).await?; - if client_version != server_version { - return Err(TcpTargetError::Protocol(format!( - "{}: client {}, server {}", - ERR_VERSION_MISMATCH, client_version, server_version - ))); - } - - // 发送响应头 - println!( - "Server receive changes: Sending protocol version: {}", - INCREMENTAL_TRANSFER_VERSION - ); - self.stream - .write_all(&INCREMENTAL_TRANSFER_VERSION.to_be_bytes()) - .await?; - println!("Server receive changes: Sending mode: {}", SERVER_FULL_MODE); - self.stream.write_all(&[SERVER_FULL_MODE]).await?; // 服务端完整文件模式 - self.stream.flush().await?; - println!("Server receive changes: Response header sent"); - - // 接收客户端块哈希 - println!("Server receive changes: Receiving client chunk hashes..."); - // 等待客户端确认 - let mut ack_buf = [0u8; 1]; - self.stream.read_exact(&mut ack_buf).await?; - if ack_buf[0] != 1 { - return Err(TcpTargetError::Protocol( - "Client acknowledgment failed".to_string(), - )); - } - println!("Server receive changes: Client acknowledged, reading client version..."); - - // 读取客户端版本进行验证 - let mut client_version_buf = [0u8; 4]; - self.stream.read_exact(&mut client_version_buf).await?; - let received_client_version = i32::from_be_bytes(client_version_buf); - println!( - "Server receive changes: Received client version: {}", - received_client_version - ); - - // 验证客户端版本是否匹配 - if received_client_version != client_version { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {}, got {}", - ERR_VERSION_MISMATCH, client_version, received_client_version - ))); - } - - println!("Server receive changes: Reading chunk hashes..."); - let client_chunk_hashes = match self.receive_chunk_hashes().await { - Ok(hashes) => { - println!( - "Server receive changes: Successfully received {} client chunk hashes", - hashes.len() - ); - hashes - } - Err(e) => { - println!( - "Server receive changes: ERROR receiving chunk hashes: {:?}", - e - ); - return Err(e); - } - }; - - // 计算服务器当前块哈希 - let server_hashes = self.calculate_file_chunk_hashes(path).await?; - println!( - "Server: Calculated {} server chunk hashes", - server_hashes.len() - ); - - // 比较差异,确定需要更新的块 - let chunks_needed = self.compare_chunk_hashes(&client_chunk_hashes, &server_hashes); - println!( - "Server: Chunks needed after comparison: {:?}", - chunks_needed - ); - - // 发送需要的块列表 - println!("Server: Sending chunks needed list: {:?}", chunks_needed); - self.send_chunks_needed(&chunks_needed).await?; - - // 接收并应用客户端块 - self.receive_and_apply_chunks(path, &chunks_needed).await?; - - // 生成新版本号(这里简化:版本号+1) - let current_version = self.get_current_version(path).await?; - let new_version = current_version + 1; - - // 保存差异而不是完整文件 - self.save_version_delta(path, current_version, new_version, &chunks_needed) - .await?; - self.save_version(path, new_version).await?; - - // 发送新版本号给客户端 - println!( - "Server receive changes: Sending new version to client: {}", - new_version - ); - self.stream.write_all(&new_version.to_be_bytes()).await?; - self.stream.flush().await?; - println!("Server receive changes: New version sent to client"); - - Ok(()) - } - - // ==================== 工具函数 ==================== - - /// 计算文件的块哈希 - async fn calculate_file_chunk_hashes( - &self, - file_path: impl AsRef<Path>, - ) -> Result<Vec<[u8; HASH_SIZE]>, TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - let mut hashes = Vec::new(); - let mut buffer = vec![0u8; DEFAULT_CHUNK_SIZE]; - let mut bytes_read = 0; - - while bytes_read < file_size { - let bytes_to_read = (file_size - bytes_read).min(DEFAULT_CHUNK_SIZE as u64) as usize; - let chunk = &mut buffer[..bytes_to_read]; - - file.read_exact(chunk).await?; - - let hash = self.simple_chunk_hash(chunk); - hashes.push(hash); - - bytes_read += bytes_to_read as u64; - } - - Ok(hashes) - } - - /// 简单的块哈希函数 - fn simple_chunk_hash(&self, data: &[u8]) -> [u8; HASH_SIZE] { - // 使用稳定的blake3哈希算法,确保跨平台一致性 - let hash = blake3::hash(data); - let mut hash_bytes = [0u8; HASH_SIZE]; - hash_bytes[..32].copy_from_slice(hash.as_bytes()); - hash_bytes - } - - /// 发送块哈希列表 - async fn send_chunk_hashes( - &mut self, - hashes: &[[u8; HASH_SIZE]], - ) -> Result<(), TcpTargetError> { - let hash_count = hashes.len() as u32; - println!("Client: Sending {} chunk hashes", hash_count); - - match self.stream.write_all(&hash_count.to_be_bytes()).await { - Ok(_) => println!("Client: Successfully sent hash count"), - Err(e) => { - println!("Client: ERROR sending hash count: {:?}", e); - return Err(e.into()); - } - } - // Extra flush to ensure hash count is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed hash count"), - Err(e) => { - println!("Client: ERROR flushing hash count: {:?}", e); - return Err(e.into()); - } - } - - for (i, hash) in hashes.iter().enumerate() { - println!("Client: Sending chunk hash {}: {:?}", i, &hash[..8]); // Show first 8 bytes for debugging - match self.stream.write_all(hash).await { - Ok(_) => println!("Client: Successfully sent chunk hash {}", i), - Err(e) => { - println!("Client: ERROR sending chunk hash {}: {:?}", i, e); - return Err(e.into()); - } - } - } - - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed after sending hashes"), - Err(e) => { - println!("Client: ERROR flushing after sending hashes: {:?}", e); - return Err(e.into()); - } - } - println!("Client: All chunk hashes sent"); - // Extra flush to ensure data is sent before server reads - match self.stream.flush().await { - Ok(_) => println!("Client: Successfully flushed final"), - Err(e) => { - println!("Client: ERROR flushing final: {:?}", e); - return Err(e.into()); - } - } - Ok(()) - } - - /// 接收块哈希列表 - async fn receive_chunk_hashes(&mut self) -> Result<Vec<[u8; HASH_SIZE]>, TcpTargetError> { - println!("Server: Starting to receive chunk hashes..."); - - let mut count_buf = [0u8; 4]; - println!("Server: Reading chunk hash count..."); - self.stream.read_exact(&mut count_buf).await?; - let hash_count = u32::from_be_bytes(count_buf) as usize; - println!( - "Server: Reading {} chunk hashes, raw bytes: {:?}", - hash_count, count_buf - ); - - let mut hashes = Vec::with_capacity(hash_count); - let mut hash_buf = [0u8; HASH_SIZE]; - - for i in 0..hash_count { - println!("Server: Reading chunk hash {}...", i); - self.stream.read_exact(&mut hash_buf).await?; - println!("Server: Received chunk hash {}: {:?}", i, &hash_buf[..8]); // Show first 8 bytes for debugging - hashes.push(hash_buf); - } - - println!("Server: All {} chunk hashes received", hashes.len()); - Ok(hashes) - } - - /// 比较块哈希并返回需要更新的块索引 - fn compare_chunk_hashes( - &self, - new_hashes: &[[u8; HASH_SIZE]], - old_hashes: &[[u8; HASH_SIZE]], - ) -> Vec<usize> { - let mut chunks_needed = Vec::new(); - - for (i, (new_hash, old_hash)) in new_hashes.iter().zip(old_hashes.iter()).enumerate() { - if new_hash != old_hash { - chunks_needed.push(i); - } - } - - // 如果新文件有更多块,也需要更新 - for i in old_hashes.len()..new_hashes.len() { - chunks_needed.push(i); - } - - chunks_needed - } - - /// 接收需要传输的块列表 - async fn receive_chunks_to_send(&mut self) -> Result<Vec<usize>, TcpTargetError> { - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - let mut chunks = Vec::with_capacity(chunk_count); - let mut index_buf = [0u8; 4]; - - for _ in 0..chunk_count { - self.stream.read_exact(&mut index_buf).await?; - let index = u32::from_be_bytes(index_buf) as usize; - chunks.push(index); - } - - Ok(chunks) - } - - /// 发送需要接收的块列表 - async fn send_chunks_needed(&mut self, chunks: &[usize]) -> Result<(), TcpTargetError> { - let chunk_count = chunks.len() as u32; - self.stream.write_all(&chunk_count.to_be_bytes()).await?; - - for &chunk_index in chunks { - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 发送指定的文件块 - async fn send_file_chunks( - &mut self, - file_path: impl AsRef<Path>, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let mut file = File::open(path).await?; - let file_size = file.metadata().await?.len(); - - // 发送块数量 - self.stream - .write_all(&(chunk_indices.len() as u32).to_be_bytes()) - .await?; - - if chunk_indices.is_empty() { - self.stream.flush().await?; - return Ok(()); - } - - for &chunk_index in chunk_indices { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - if chunk_offset >= file_size { - continue; // 跳过超出文件范围的块 - } - - // 定位到块位置 - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 计算块大小 - let remaining_bytes = file_size - chunk_offset; - let chunk_size = remaining_bytes.min(DEFAULT_CHUNK_SIZE as u64) as usize; - - // 发送块索引和大小 - self.stream - .write_all(&(chunk_index as u32).to_be_bytes()) - .await?; - self.stream - .write_all(&(chunk_size as u32).to_be_bytes()) - .await?; - - // 发送块数据 - let mut buffer = vec![0u8; chunk_size]; - file.read_exact(&mut buffer).await?; - self.stream.write_all(&buffer).await?; - } - - self.stream.flush().await?; - Ok(()) - } - - /// 接收并应用增量块 - async fn receive_and_apply_chunks( - &mut self, - file_path: impl AsRef<Path>, - chunk_indices: &[usize], - ) -> Result<(), TcpTargetError> { - self.receive_and_apply_chunks_internal(file_path, Some(chunk_indices)) - .await - } - - /// 内部工具函数:接收并应用块数据 - async fn receive_and_apply_chunks_internal( - &mut self, - file_path: impl AsRef<Path>, - expected_indices: Option<&[usize]>, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 读取块数量 - let mut count_buf = [0u8; 4]; - self.stream.read_exact(&mut count_buf).await?; - let chunk_count = u32::from_be_bytes(count_buf) as usize; - - // 创建临时文件来接收完整内容 - let temp_path = path.with_extension("tmp"); - let temp_file = File::create(&temp_path).await?; - let mut temp_writer = BufWriter::new(temp_file); - - // 记录接收到的块数据 - let mut received_chunks = Vec::new(); - - for i in 0..chunk_count { - // 读取块索引和大小 - let mut index_buf = [0u8; 4]; - self.stream.read_exact(&mut index_buf).await?; - let chunk_index = u32::from_be_bytes(index_buf) as usize; - - // 验证块索引(如果提供了预期索引) - if let Some(expected) = expected_indices { - if i >= expected.len() || chunk_index != expected[i] { - return Err(TcpTargetError::Protocol(format!( - "{}: expected {:?}, got {}", - ERR_CHUNK_INDEX_OUT_OF_BOUNDS, - expected.get(i), - chunk_index - ))); - } - } - - let mut size_buf = [0u8; 4]; - self.stream.read_exact(&mut size_buf).await?; - let chunk_size = u32::from_be_bytes(size_buf) as usize; - - // 读取块数据 - let mut chunk_data = vec![0u8; chunk_size]; - self.stream.read_exact(&mut chunk_data).await?; - - // 记录接收到的块 - received_chunks.push((chunk_index, chunk_data)); - } - - // 按块索引排序并写入临时文件 - received_chunks.sort_by_key(|(index, _)| *index); - - for (chunk_index, chunk_data) in received_chunks { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut temp_writer, std::io::SeekFrom::Start(chunk_offset)) - .await?; - temp_writer.write_all(&chunk_data).await?; - } - - temp_writer.flush().await?; - - // 替换原文件 - tokio::fs::rename(&temp_path, path).await?; - - Ok(()) - } - - /// 获取当前文件版本 - async fn get_current_version( - &self, - file_path: impl AsRef<Path>, - ) -> Result<i32, TcpTargetError> { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - if !version_file_path.exists() { - return Ok(0); // 默认版本为0 - } - - let version_content = read_to_string(&version_file_path) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to read version file: {}", e)))?; - - version_content - .trim() - .parse::<i32>() - .map_err(|e| TcpTargetError::File(format!("Invalid version format: {}", e))) - } - - /// 保存文件版本 - async fn save_version( - &self, - file_path: impl AsRef<Path>, - version: i32, - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - let version_file_path = path.with_extension(VERSION_FILE_EXTENSION); - - tokio::fs::write(&version_file_path, version.to_string()) - .await - .map_err(|e| TcpTargetError::File(format!("Failed to write version file: {}", e)))?; - - Ok(()) - } - - /// 计算版本间的差异 - async fn calculate_version_delta( - &self, - file_path: impl AsRef<Path>, - from_version: i32, - to_version: i32, - ) -> Result<Vec<usize>, TcpTargetError> { - let path = file_path.as_ref(); - - // 如果目标版本不存在,返回空差异 - let current_version = self.get_current_version(path).await?; - if to_version > current_version { - return Ok(Vec::new()); - } - - // 获取源版本和目标版本的块哈希 - let from_hashes = self.get_version_chunk_hashes(path, from_version).await?; - let to_hashes = self.get_version_chunk_hashes(path, to_version).await?; - - // 比较差异 - let delta_chunks = self.compare_chunk_hashes(&to_hashes, &from_hashes); - - Ok(delta_chunks) - } - - /// 获取指定版本的块哈希 - async fn get_version_chunk_hashes( - &self, - file_path: impl AsRef<Path>, - version: i32, - ) -> Result<Vec<[u8; HASH_SIZE]>, TcpTargetError> { - let path = file_path.as_ref(); - - if version == 0 { - // 版本0表示空文件 - return Ok(Vec::new()); - } - - // 从版本历史中重建文件并计算哈希 - let reconstructed_path = self.reconstruct_version(path, version).await?; - let hashes = self - .calculate_file_chunk_hashes(&reconstructed_path) - .await?; - - // 清理临时文件 - let _ = remove_file(&reconstructed_path).await; - - Ok(hashes) - } - - /// 从差异重建指定版本的文件 - async fn reconstruct_version( - &self, - file_path: impl AsRef<Path>, - target_version: i32, - ) -> Result<PathBuf, TcpTargetError> { - let path = file_path.as_ref(); - let temp_path = path.with_extension(format!("temp_{}", target_version)); - - if target_version == 0 { - // 创建空文件 - tokio::fs::write(&temp_path, b"").await?; - return Ok(temp_path); - } - - // 从版本0开始,逐步应用差异 - let mut current_version = 0; - let mut current_path = path.with_extension("temp_base"); - tokio::fs::write(¤t_path, b"").await?; // 创建基础空文件 - - while current_version < target_version { - let next_version = current_version + 1; - let (delta_chunks, chunk_data_list) = self - .load_version_delta(path, current_version, next_version) - .await?; - - // 应用差异到新文件 - let next_path = path.with_extension(format!("temp_{}", next_version)); - self.apply_delta_to_file(¤t_path, &next_path, &delta_chunks, &chunk_data_list) - .await?; - - // 清理旧临时文件 - if current_version > 0 { - let _ = tokio::fs::remove_file(¤t_path).await; - } - current_path = next_path; - current_version = next_version; - } - - Ok(current_path) - } - - /// 保存版本差异 - async fn save_version_delta( - &self, - file_path: impl AsRef<Path>, - from_version: i32, - to_version: i32, - changed_chunks: &[usize], - ) -> Result<(), TcpTargetError> { - let path = file_path.as_ref(); - - // 创建版本历史目录 - let history_dir = path - .parent() - .ok_or_else(|| TcpTargetError::File("Invalid file path".to_string()))? - .join(VERSION_HISTORY_DIR); - - if !history_dir.exists() { - create_dir_all(&history_dir).await?; - } - - // 保存差异信息(记录变化的块索引和实际数据) - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name() - .ok_or_else(|| TcpTargetError::File("Invalid file name".to_string()))? - .to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - // 读取当前版本文件以获取变化块的实际数据 - let current_file_data = read(path).await?; - - let mut delta_data = Vec::new(); - - for &chunk_index in changed_chunks { - // 写入块索引 - delta_data.extend_from_slice(&(chunk_index as u32).to_be_bytes()); - - // 计算块的起始和结束位置 - let chunk_start = chunk_index * DEFAULT_CHUNK_SIZE; - let chunk_end = - std::cmp::min(chunk_start + DEFAULT_CHUNK_SIZE, current_file_data.len()); - - // 写入块数据大小 - let chunk_size = (chunk_end - chunk_start) as u32; - delta_data.extend_from_slice(&chunk_size.to_be_bytes()); - - // 写入块数据 - delta_data.extend_from_slice(¤t_file_data[chunk_start..chunk_end]); - } - - tokio::fs::write(&delta_path, &delta_data).await?; - - Ok(()) - } - - /// 加载版本差异 - async fn load_version_delta( - &self, - file_path: impl AsRef<Path>, - from_version: i32, - to_version: i32, - ) -> Result<(Vec<usize>, Vec<Vec<u8>>), TcpTargetError> { - let path = file_path.as_ref(); - let history_dir = path.parent().unwrap().join(VERSION_HISTORY_DIR); - - let delta_path = history_dir.join(format!( - "{}_{}_{}.{}", - path.file_name().unwrap().to_string_lossy(), - from_version, - to_version, - DELTA_FILE_EXTENSION - )); - - if !delta_path.exists() { - return Ok((Vec::new(), Vec::new())); - } - - let delta_data = read(&delta_path).await?; - let mut chunks = Vec::new(); - let mut chunk_data_list = Vec::new(); - - let mut offset = 0; - while offset < delta_data.len() { - // 读取块索引 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete index data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let index = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块大小 (4 bytes) - if offset + 4 > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete size data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_size = u32::from_be_bytes([ - delta_data[offset], - delta_data[offset + 1], - delta_data[offset + 2], - delta_data[offset + 3], - ]) as usize; - offset += 4; - - // 读取块数据 - if offset + chunk_size > delta_data.len() { - return Err(TcpTargetError::File(format!( - "{}: incomplete chunk data", - ERR_DELTA_FILE_CORRUPTED - ))); - } - let chunk_data = delta_data[offset..offset + chunk_size].to_vec(); - offset += chunk_size; - - chunks.push(index); - chunk_data_list.push(chunk_data); - } - - Ok((chunks, chunk_data_list)) - } - - /// 应用差异到文件 - async fn apply_delta_to_file( - &self, - source_path: impl AsRef<Path>, - target_path: impl AsRef<Path>, - delta_chunks: &[usize], - chunk_data_list: &[Vec<u8>], - ) -> Result<(), TcpTargetError> { - let source_path = source_path.as_ref(); - let target_path = target_path.as_ref(); - - // 复制源文件到目标文件 - copy(source_path, target_path).await?; - - if delta_chunks.is_empty() { - return Ok(()); - } - - // 打开目标文件进行修改 - let mut file = OpenOptions::new().write(true).open(target_path).await?; - - for (i, &chunk_index) in delta_chunks.iter().enumerate() { - let chunk_offset = (chunk_index * DEFAULT_CHUNK_SIZE) as u64; - tokio::io::AsyncSeekExt::seek(&mut file, std::io::SeekFrom::Start(chunk_offset)) - .await?; - - // 使用从delta文件中读取的实际块数据进行写入 - if i < chunk_data_list.len() { - file.write_all(&chunk_data_list[i]).await?; - } - } - - Ok(()) - } -} diff --git a/crates/utils/tcp_connection/src/lib.rs b/crates/utils/tcp_connection/src/lib.rs index a2615d1..6a2e599 100644 --- a/crates/utils/tcp_connection/src/lib.rs +++ b/crates/utils/tcp_connection/src/lib.rs @@ -3,6 +3,4 @@ pub mod instance; pub mod instance_challenge; -pub mod instance_incremental_transfer; - pub mod error; diff --git a/crates/vcs/src/data/sheet.rs b/crates/vcs/src/data/sheet.rs index 95599ff..a6220c9 100644 --- a/crates/vcs/src/data/sheet.rs +++ b/crates/vcs/src/data/sheet.rs @@ -97,8 +97,8 @@ impl<'a> Sheet<'a> { Ok(()) } - /// Remove an input package from the sheet - pub fn remove_input(&mut self, input_name: &InputName) -> Option<InputPackage> { + /// Deny and remove an input package from the sheet + pub fn deny_input(&mut self, input_name: &InputName) -> Option<InputPackage> { self.data .inputs .iter() @@ -106,14 +106,127 @@ impl<'a> Sheet<'a> { .map(|pos| self.data.inputs.remove(pos)) } - /// Add a mapping entry to the sheet - pub fn add_mapping(&mut self, sheet_path: SheetPathBuf, virtual_file_id: VirtualFileId) { - self.data.mapping.insert(sheet_path, virtual_file_id); + /// Accept an input package and insert to the sheet + pub fn accept_import( + &mut self, + input_name: &InputName, + insert_to: &SheetPathBuf, + ) -> Result<(), std::io::Error> { + // Remove inputs + let input = self + .inputs() + .iter() + .position(|input| input.name == *input_name) + .map(|pos| self.data.inputs.remove(pos)); + + // Ensure input is not empty + let Some(input) = input else { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Empty inputs.", + )); + }; + + // Insert to sheet + for (relative_path, virtual_file_id) in input.files { + let _ = self.add_mapping(insert_to.join(relative_path), virtual_file_id); + } + + Ok(()) + } + + /// Add (or Edit) a mapping entry to the sheet + /// + /// This operation performs safety checks to ensure the member has the right to add the mapping: + /// 1. If the virtual file ID doesn't exist in the vault, the mapping is added directly + /// 2. If the virtual file exists, check if the member has edit rights to the virtual file + /// 3. If member has edit rights, the mapping is not allowed to be modified and returns an error + /// 4. If member doesn't have edit rights, the mapping is allowed (member is giving up the file) + /// + /// Note: Full validation adds overhead - avoid frequent calls + pub async fn add_mapping( + &mut self, + sheet_path: SheetPathBuf, + virtual_file_id: VirtualFileId, + ) -> Result<(), std::io::Error> { + // Check if the virtual file exists in the vault + if self.vault_reference.virtual_file(&virtual_file_id).is_err() { + // Virtual file doesn't exist, add the mapping directly + self.data.mapping.insert(sheet_path, virtual_file_id); + return Ok(()); + } + + // Check if the holder has edit rights to the virtual file + match self + .vault_reference + .has_virtual_file_edit_right(self.holder(), &virtual_file_id) + .await + { + Ok(false) => { + // Holder doesn't have rights, add the mapping (member is giving up the file) + self.data.mapping.insert(sheet_path, virtual_file_id); + Ok(()) + } + Ok(true) => { + // Holder has edit rights, don't allow modifying the mapping + Err(std::io::Error::new( + std::io::ErrorKind::PermissionDenied, + "Member has edit rights to the virtual file, cannot modify mapping", + )) + } + Err(_) => { + // Error checking rights, don't allow modifying the mapping + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to check virtual file edit rights", + )) + } + } } /// Remove a mapping entry from the sheet - pub fn remove_mapping(&mut self, sheet_path: &SheetPathBuf) -> Option<VirtualFileId> { - self.data.mapping.remove(sheet_path) + /// + /// This operation performs safety checks to ensure the member has the right to remove the mapping: + /// 1. Member must NOT have edit rights to the virtual file to release it (ensuring clear ownership) + /// 2. If the virtual file doesn't exist, the mapping is removed but no ID is returned + /// 3. If member has no edit rights and the file exists, returns the removed virtual file ID + /// + /// Note: Full validation adds overhead - avoid frequent calls + pub async fn remove_mapping(&mut self, sheet_path: &SheetPathBuf) -> Option<VirtualFileId> { + let virtual_file_id = match self.data.mapping.get(sheet_path) { + Some(id) => id, + None => { + // The mapping entry doesn't exist, nothing to remove + return None; + } + }; + + // Check if the virtual file exists in the vault + if self.vault_reference.virtual_file(virtual_file_id).is_err() { + // Virtual file doesn't exist, remove the mapping and return None + self.data.mapping.remove(sheet_path); + return None; + } + + // Check if the holder has edit rights to the virtual file + match self + .vault_reference + .has_virtual_file_edit_right(self.holder(), virtual_file_id) + .await + { + Ok(false) => { + // Holder doesn't have rights, remove and return the virtual file ID + self.data.mapping.remove(sheet_path) + } + Ok(true) => { + // Holder has edit rights, don't remove the mapping + None + } + Err(_) => { + // Error checking rights, don't remove the mapping + None + } + } } /// Persist the sheet to disk @@ -178,9 +291,7 @@ impl<'a> Sheet<'a> { } // Find the longest common prefix among all paths - let common_prefix = paths.iter().skip(1).fold(paths[0].clone(), |prefix, path| { - Self::common_path_prefix(prefix, path) - }); + let common_prefix = Self::find_longest_common_prefix(paths); // Create output files with optimized relative paths let files = paths @@ -209,16 +320,28 @@ impl<'a> Sheet<'a> { }) } - /// Helper function to find common path prefix between two paths - fn common_path_prefix(path1: impl Into<PathBuf>, path2: impl Into<PathBuf>) -> PathBuf { - let path1 = path1.into(); - let path2 = path2.into(); + /// Helper function to find the longest common prefix among all paths + fn find_longest_common_prefix(paths: &[SheetPathBuf]) -> PathBuf { + if paths.is_empty() { + return PathBuf::new(); + } + + let first_path = &paths[0]; + let mut common_components = Vec::new(); + + for (component_idx, first_component) in first_path.components().enumerate() { + for path in paths.iter().skip(1) { + if let Some(component) = path.components().nth(component_idx) { + if component != first_component { + return common_components.into_iter().collect(); + } + } else { + return common_components.into_iter().collect(); + } + } + common_components.push(first_component); + } - path1 - .components() - .zip(path2.components()) - .take_while(|(a, b)| a == b) - .map(|(comp, _)| comp) - .collect() + common_components.into_iter().collect() } } diff --git a/crates/vcs/src/data/vault/virtual_file.rs b/crates/vcs/src/data/vault/virtual_file.rs index 83b1c82..fe83594 100644 --- a/crates/vcs/src/data/vault/virtual_file.rs +++ b/crates/vcs/src/data/vault/virtual_file.rs @@ -225,7 +225,7 @@ impl Vault { // Create metadata let mut meta = VirtualFileMeta { current_version: FIRST_VERSION.to_string(), - hold_member: String::default(), + hold_member: member_id.clone(), // The holder of the newly created virtual file is the creator by default version_description, histories: Vec::default(), }; @@ -244,6 +244,8 @@ impl Vault { } fs::rename(receive_path, move_path).await?; + // + Ok(new_id) } Err(e) => { |
