summaryrefslogtreecommitdiff
path: root/crates/utils/tcp_connection/tcp_connection_test
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2025-09-29 13:54:28 +0800
committer魏曹先生 <1992414357@qq.com>2025-09-29 13:54:28 +0800
commit2753a38ab627369c8bffce610b3106869f26dd61 (patch)
tree367198d276a0fc55eaf5b51d5e7a9766556e1bbe /crates/utils/tcp_connection/tcp_connection_test
parent7f6a6600bbb6ec4a76e7cbd6bdbaec2875a80f5d (diff)
Add incremental transfer functionality and update TCP connection utilities
- Add instance_incremental_transfer module for handling incremental data transfers - Add test_incremental_transfer module for testing incremental transfer functionality - Update TCP connection library to support new incremental transfer features - Update Cargo.toml dependencies for TCP connection utilities - Update main library to integrate new TCP connection functionality
Diffstat (limited to 'crates/utils/tcp_connection/tcp_connection_test')
-rw-r--r--crates/utils/tcp_connection/tcp_connection_test/src/lib.rs3
-rw-r--r--crates/utils/tcp_connection/tcp_connection_test/src/test_incremental_transfer.rs284
2 files changed, 287 insertions, 0 deletions
diff --git a/crates/utils/tcp_connection/tcp_connection_test/src/lib.rs b/crates/utils/tcp_connection/tcp_connection_test/src/lib.rs
index c9372d4..c7a3adb 100644
--- a/crates/utils/tcp_connection/tcp_connection_test/src/lib.rs
+++ b/crates/utils/tcp_connection/tcp_connection_test/src/lib.rs
@@ -13,5 +13,8 @@ pub mod test_file_transfer;
#[cfg(test)]
pub mod test_msgpack;
+#[cfg(test)]
+pub mod test_incremental_transfer;
+
pub mod test_utils;
pub use test_utils::*;
diff --git a/crates/utils/tcp_connection/tcp_connection_test/src/test_incremental_transfer.rs b/crates/utils/tcp_connection/tcp_connection_test/src/test_incremental_transfer.rs
new file mode 100644
index 0000000..b4834bf
--- /dev/null
+++ b/crates/utils/tcp_connection/tcp_connection_test/src/test_incremental_transfer.rs
@@ -0,0 +1,284 @@
+#[cfg(test)]
+mod test_incremental_transfer {
+ use std::path::PathBuf;
+
+ use tokio::fs::{self, File};
+ use tokio::io::AsyncWriteExt;
+ use tokio::net::{TcpListener, TcpStream};
+
+ use tcp_connection::error::TcpTargetError;
+ use tcp_connection::instance::ConnectionConfig;
+ use tcp_connection::instance::ConnectionInstance;
+
+ const TEST_PORT: u16 = 54321;
+ const TEST_FILE_CONTENT: &str =
+ "Hello, this is a test file content for incremental transfer testing.";
+ const TEST_FILE_CONTENT_UPDATED: &str = "Hello, this is UPDATED test file content for incremental transfer testing with more changes.";
+ const TEST_FILE_CONTENT_SECOND_UPDATE: &str = "Second update with completely different content";
+
+ async fn setup_test_file(file_path: &PathBuf, content: &str) -> Result<(), TcpTargetError> {
+ if let Some(parent) = file_path.parent() {
+ fs::create_dir_all(parent).await?;
+ }
+
+ let mut file = File::create(file_path).await?;
+ file.write_all(content.as_bytes()).await?;
+ file.flush().await?;
+ Ok(())
+ }
+
+ async fn cleanup_test_file(file_path: &PathBuf) -> Result<(), TcpTargetError> {
+ if file_path.exists() {
+ fs::remove_file(file_path).await?;
+ }
+
+ // Clean up version files
+ let version_file = file_path.with_extension("v");
+ if version_file.exists() {
+ fs::remove_file(version_file).await?;
+ }
+
+ // Clean up version history directory
+ let version_dir = file_path.parent().unwrap().join("diff");
+ if version_dir.exists() {
+ fs::remove_dir_all(version_dir).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn read_file_content(file_path: &PathBuf) -> Result<String, TcpTargetError> {
+ let content = fs::read_to_string(file_path).await?;
+ Ok(content)
+ }
+
+ #[tokio::test]
+ async fn test_incremental_transfer_basic_flow() {
+ let server_file = PathBuf::from("res/.temp/test_data/server_file_basic.txt");
+ let client_file = PathBuf::from("res/.temp/test_data/client_file_basic.txt");
+
+ // Setup test files
+ setup_test_file(&server_file, TEST_FILE_CONTENT)
+ .await
+ .unwrap();
+ setup_test_file(&client_file, TEST_FILE_CONTENT)
+ .await
+ .unwrap();
+
+ let listener = TcpListener::bind(format!("127.0.0.1:{}", TEST_PORT))
+ .await
+ .unwrap();
+ let server_addr = listener.local_addr().unwrap();
+
+ // Server task
+ let server_file_clone = server_file.clone();
+ let server_handle = tokio::spawn(async move {
+ println!("Server: Waiting for client connection...");
+ let (stream, _) = listener.accept().await.unwrap();
+ println!("Server: Client connected");
+ let config = ConnectionConfig::default();
+ let mut server_instance = ConnectionInstance::with_config(stream, config);
+
+ println!("Server: Handling client update request...");
+ // Handle client update request
+ server_instance
+ .server_handle_client_update(&server_file_clone)
+ .await
+ .unwrap();
+ println!("Server: Client update handled successfully");
+ });
+
+ // Client task
+ let client_file_clone = client_file.clone();
+ let client_handle = tokio::spawn(async move {
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+ println!("Client: Connecting to server...");
+ let stream = TcpStream::connect(server_addr).await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut client_instance = ConnectionInstance::with_config(stream, config);
+
+ println!("Client: Starting update to version 1...");
+ // Update client file to version 1
+ client_instance
+ .client_update_to_version(&client_file_clone, 1)
+ .await
+ .unwrap();
+ println!("Client: Update completed successfully");
+ });
+
+ // Wait for both tasks to complete
+ let (server_result, client_result) = tokio::join!(server_handle, client_handle);
+ server_result.unwrap();
+ client_result.unwrap();
+
+ // Verify both files still have the same content
+ let server_content = read_file_content(&server_file).await.unwrap();
+ let client_content = read_file_content(&client_file).await.unwrap();
+
+ assert_eq!(server_content, client_content);
+ assert_eq!(server_content, TEST_FILE_CONTENT);
+
+ // Cleanup
+ cleanup_test_file(&server_file).await.unwrap();
+ cleanup_test_file(&client_file).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_incremental_upload_basic_flow() {
+ let server_file = PathBuf::from("res/.temp/test_data/server_file_upload.txt");
+ let client_file = PathBuf::from("res/.temp/test_data/client_file_upload.txt");
+
+ // Setup test files - client has updated content
+ setup_test_file(&server_file, TEST_FILE_CONTENT)
+ .await
+ .unwrap();
+ setup_test_file(&client_file, TEST_FILE_CONTENT_UPDATED)
+ .await
+ .unwrap();
+
+ let listener = TcpListener::bind(format!("127.0.0.1:{}", TEST_PORT + 1))
+ .await
+ .unwrap();
+ let server_addr = listener.local_addr().unwrap();
+
+ // Server task
+ let server_file_clone = server_file.clone();
+ let server_handle = tokio::spawn(async move {
+ let (stream, _) = listener.accept().await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut server_instance = ConnectionInstance::with_config(stream, config);
+
+ // Handle client upload request
+ server_instance
+ .server_handle_client_update(&server_file_clone)
+ .await
+ .unwrap();
+ });
+
+ // Client task
+ let client_file_clone = client_file.clone();
+ let client_handle = tokio::spawn(async move {
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+ let stream = TcpStream::connect(server_addr).await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut client_instance = ConnectionInstance::with_config(stream, config);
+
+ // Upload client changes to server
+ let new_version = client_instance
+ .client_upload(&client_file_clone)
+ .await
+ .unwrap();
+ assert_eq!(new_version, 1); // First upload should create version 1
+ });
+
+ // Wait for both tasks to complete
+ let (server_result, client_result) = tokio::join!(server_handle, client_handle);
+ server_result.unwrap();
+ client_result.unwrap();
+
+ // Verify server file was updated with client content
+ let server_content = read_file_content(&server_file).await.unwrap();
+ let expected_content = TEST_FILE_CONTENT_UPDATED;
+
+ assert_eq!(server_content, expected_content);
+
+ // Cleanup
+ cleanup_test_file(&server_file).await.unwrap();
+ cleanup_test_file(&client_file).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_version_increment_after_upload() {
+ let server_file = PathBuf::from("res/.temp/test_data/server_file_version.txt");
+ let client_file = PathBuf::from("res/.temp/test_data/client_file_version.txt");
+
+ setup_test_file(&server_file, TEST_FILE_CONTENT)
+ .await
+ .unwrap();
+ setup_test_file(&client_file, TEST_FILE_CONTENT_UPDATED)
+ .await
+ .unwrap();
+
+ let listener = TcpListener::bind(format!("127.0.0.1:{}", TEST_PORT + 2))
+ .await
+ .unwrap();
+ let server_addr = listener.local_addr().unwrap();
+
+ // First upload
+ let server_file_clone = server_file.clone();
+ let server_handle1 = tokio::spawn(async move {
+ let (stream, _) = listener.accept().await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut server_instance = ConnectionInstance::with_config(stream, config);
+ server_instance
+ .server_handle_client_update(&server_file_clone)
+ .await
+ .unwrap();
+ });
+
+ let client_file_clone = client_file.clone();
+ let client_handle1 = tokio::spawn(async move {
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+ let stream = TcpStream::connect(server_addr).await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut client_instance = ConnectionInstance::with_config(stream, config);
+ let version = client_instance
+ .client_upload(&client_file_clone)
+ .await
+ .unwrap();
+ assert_eq!(version, 1);
+ });
+
+ let (server_result1, client_result1) = tokio::join!(server_handle1, client_handle1);
+ server_result1.unwrap();
+ client_result1.unwrap();
+
+ // Second upload with different content
+ let updated_content2 = TEST_FILE_CONTENT_SECOND_UPDATE;
+ setup_test_file(&client_file, updated_content2)
+ .await
+ .unwrap();
+
+ let listener2 = TcpListener::bind(format!("127.0.0.1:{}", TEST_PORT + 3))
+ .await
+ .unwrap();
+ let server_addr2 = listener2.local_addr().unwrap();
+
+ let server_file_clone2 = server_file.clone();
+ let server_handle2 = tokio::spawn(async move {
+ let (stream, _) = listener2.accept().await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut server_instance = ConnectionInstance::with_config(stream, config);
+ server_instance
+ .server_handle_client_update(&server_file_clone2)
+ .await
+ .unwrap();
+ });
+
+ let client_file_clone2 = client_file.clone();
+ let client_handle2 = tokio::spawn(async move {
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+ let stream = TcpStream::connect(server_addr2).await.unwrap();
+ let config = ConnectionConfig::default();
+ let mut client_instance = ConnectionInstance::with_config(stream, config);
+ let version = client_instance
+ .client_upload(&client_file_clone2)
+ .await
+ .unwrap();
+ assert_eq!(version, 2); // Should increment to version 2
+ });
+
+ let (server_result2, client_result2) = tokio::join!(server_handle2, client_handle2);
+ server_result2.unwrap();
+ client_result2.unwrap();
+
+ // Verify final content
+ let server_content = read_file_content(&server_file).await.unwrap();
+ assert_eq!(server_content, updated_content2);
+
+ cleanup_test_file(&server_file).await.unwrap();
+ cleanup_test_file(&client_file).await.unwrap();
+ }
+}