summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/vcs_actions/src')
-rw-r--r--crates/vcs_actions/src/actions.rs74
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs248
-rw-r--r--crates/vcs_actions/src/actions/virtual_file_actions.rs465
3 files changed, 749 insertions, 38 deletions
diff --git a/crates/vcs_actions/src/actions.rs b/crates/vcs_actions/src/actions.rs
index 51186fb..81dcd96 100644
--- a/crates/vcs_actions/src/actions.rs
+++ b/crates/vcs_actions/src/actions.rs
@@ -1,11 +1,18 @@
use std::sync::Arc;
use action_system::action::ActionContext;
+use cfg_file::config::ConfigFile;
use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
use tokio::sync::Mutex;
use vcs_data::{
constants::SERVER_PATH_MEMBER_PUB,
- data::{local::LocalWorkspace, member::MemberId, user::UserDirectory, vault::Vault},
+ data::{
+ local::{LocalWorkspace, config::LocalConfig},
+ member::MemberId,
+ sheet::SheetName,
+ user::UserDirectory,
+ vault::Vault,
+ },
};
pub mod local_actions;
@@ -117,6 +124,71 @@ pub async fn auth_member(
Err(TcpTargetError::NoResult("Auth failed.".to_string()))
}
+/// Get the current sheet name based on the context (local or remote).
+/// This function handles the communication between local and remote instances
+/// to verify and retrieve the current sheet name.
+///
+/// On local:
+/// - Reads the current sheet from local configuration
+/// - Sends the sheet name to remote for verification
+/// - Returns the sheet name if remote confirms it exists
+///
+/// On remote:
+/// - Receives sheet name from local
+/// - Verifies the sheet exists in the vault
+/// - Sends confirmation back to local
+///
+/// Returns the verified sheet name or an error if the sheet doesn't exist
+pub async fn get_current_sheet_name(
+ ctx: &ActionContext,
+ instance: &Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+) -> Result<SheetName, TcpTargetError> {
+ let mut mut_instance = instance.lock().await;
+ if ctx.is_proc_on_local() {
+ let config = LocalConfig::read().await?;
+ if let Some(sheet_name) = config.sheet_in_use() {
+ // Send sheet name
+ mut_instance.write_msgpack(sheet_name).await?;
+
+ // Read result
+ if mut_instance.read_msgpack::<bool>().await? {
+ return Ok(sheet_name.clone());
+ } else {
+ return Err(TcpTargetError::NotFound("Sheet not found".to_string()));
+ }
+ }
+ // Send empty sheet_name
+ mut_instance.write_msgpack("".to_string()).await?;
+
+ // Read result, since we know it's impossible to pass here, we just consume this result
+ let _ = mut_instance.read_msgpack::<bool>().await?;
+
+ return Err(TcpTargetError::NotFound("Sheet not found".to_string()));
+ }
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+
+ // Read sheet name
+ let sheet_name: SheetName = mut_instance.read_msgpack().await?;
+
+ // Check if sheet exists
+ if let Ok(sheet) = vault.sheet(&sheet_name).await {
+ if let Some(holder) = sheet.holder() {
+ if holder == member_id {
+ // Tell local the check is passed
+ mut_instance.write_msgpack(true).await?;
+ return Ok(sheet_name.clone());
+ }
+ }
+ }
+ // Tell local the check is not passed
+ mut_instance.write_msgpack(false).await?;
+ return Err(TcpTargetError::NotFound("Sheet not found".to_string()));
+ }
+ return Err(TcpTargetError::NoResult("NoResult".to_string()));
+}
+
/// The macro to write and return a result.
#[macro_export]
macro_rules! write_and_return {
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index 869d14b..c9e5db3 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -1,18 +1,27 @@
-use std::net::SocketAddr;
+use std::{
+ collections::HashMap,
+ io::{Error, ErrorKind},
+ net::SocketAddr,
+ path::PathBuf,
+};
use action_system::{action::ActionContext, macros::action_gen};
use cfg_file::config::ConfigFile;
use log::info;
use serde::{Deserialize, Serialize};
use tcp_connection::error::TcpTargetError;
+use tokio::time::Instant;
use vcs_data::data::{
local::{
cached_sheet::CachedSheet,
config::LocalConfig,
latest_info::{LatestInfo, SheetInfo},
+ local_sheet::LocalSheetData,
+ member_held::MemberHeld,
},
+ member::MemberId,
sheet::{SheetData, SheetName},
- vault::config::VaultUuid,
+ vault::{config::VaultUuid, virtual_file::VirtualFileId},
};
use crate::actions::{
@@ -114,6 +123,12 @@ pub enum UpdateToLatestInfoResult {
// Fail
AuthorizeFailed(String),
+ SyncCachedSheetFail(SyncCachedSheetFailReason),
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum SyncCachedSheetFailReason {
+ PathAlreadyExist(PathBuf),
}
#[action_gen]
@@ -128,6 +143,8 @@ pub async fn update_to_latest_info_action(
Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())),
};
+ info!("Sending latest info to {}", member_id);
+
// Sync Latest Info
{
if ctx.is_proc_on_remote() {
@@ -174,15 +191,18 @@ pub async fn update_to_latest_info_action(
}
if ctx.is_proc_on_local() {
- let latest_info = instance
+ let mut latest_info = instance
.lock()
.await
.read_large_msgpack::<LatestInfo>(512 as u16)
.await?;
+ latest_info.update_instant = Some(Instant::now());
LatestInfo::write(&latest_info).await?;
}
}
+ info!("Update sheets to {}", member_id);
+
// Sync Remote Sheets
{
if ctx.is_proc_on_local() {
@@ -195,10 +215,10 @@ pub async fn update_to_latest_info_action(
// Collect all local versions
let mut local_versions = vec![];
for request_sheet in latest_info.my_sheets {
- let Ok(data) =
- CachedSheet::cached_sheet_data(member_id.clone(), request_sheet.clone()).await
- else {
- local_versions.push((request_sheet, 0));
+ let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else {
+ // For newly created sheets, the version is 0.
+ // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information
+ local_versions.push((request_sheet, -1));
continue;
};
local_versions.push((request_sheet, data.write_count()));
@@ -209,12 +229,53 @@ pub async fn update_to_latest_info_action(
instance.lock().await.write_msgpack(local_versions).await?;
if len < 1 {
- return Ok(UpdateToLatestInfoResult::Success);
- }
- }
+ // Don't return here, continue to next section
+ } else {
+ // Send data to local
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ let local_versions =
+ mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?;
+
+ for (sheet_name, local_write_count) in local_versions.iter() {
+ let sheet = vault.sheet(sheet_name).await?;
+ if let Some(holder) = sheet.holder() {
+ if holder == &member_id && &sheet.write_count() != local_write_count {
+ mut_instance.write_msgpack(true).await?;
+ mut_instance
+ .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16)
+ .await?;
+ }
+ }
+ }
+ mut_instance.write_msgpack(false).await?;
+ }
- // Send data to local
- if ctx.is_proc_on_remote() {
+ // Receive data
+ if ctx.is_proc_on_local() {
+ let mut mut_instance = instance.lock().await;
+ loop {
+ let in_coming: bool = mut_instance.read_msgpack().await?;
+ if in_coming {
+ let (sheet_name, data): (SheetName, SheetData) =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else {
+ return Err(TcpTargetError::NotFound(
+ "Workspace not found".to_string(),
+ ));
+ };
+
+ SheetData::write_to(&data, path).await?;
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ } else if ctx.is_proc_on_remote() {
let vault = try_get_vault(&ctx)?;
let mut mut_instance = instance.lock().await;
@@ -232,30 +293,159 @@ pub async fn update_to_latest_info_action(
}
}
mut_instance.write_msgpack(false).await?;
- return Ok(UpdateToLatestInfoResult::Success);
}
+ }
- // Receive data
+ info!("Fetch held status to {}", member_id);
+
+ // Sync Held Info
+ {
if ctx.is_proc_on_local() {
+ let Ok(latest_info) = LatestInfo::read().await else {
+ return Err(TcpTargetError::NotFound(
+ "Latest info not found.".to_string(),
+ ));
+ };
+
+ // Collect files that need to know the holder
+ let mut holder_wants_know = Vec::new();
+ for sheet_name in &latest_info.my_sheets {
+ if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await {
+ holder_wants_know
+ .extend(sheet_data.mapping().values().map(|value| value.id.clone()));
+ }
+ }
+
+ // Send request
let mut mut_instance = instance.lock().await;
- loop {
- let in_coming: bool = mut_instance.read_msgpack().await?;
- if in_coming {
- let (sheet_name, data): (SheetName, SheetData) =
- mut_instance.read_large_msgpack(1024u16).await?;
-
- let Some(path) = CachedSheet::cached_sheet_path(member_id.clone(), sheet_name)
- else {
- return Err(TcpTargetError::NotFound("Workspace not found".to_string()));
- };
-
- SheetData::write_to(&data, path).await?;
- } else {
- return Ok(UpdateToLatestInfoResult::Success);
+ mut_instance
+ .write_large_msgpack(&holder_wants_know, 1024u16)
+ .await?;
+
+ // Receive information and write to local
+ let result: HashMap<VirtualFileId, Option<MemberId>> =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Read configuration file
+ let path = MemberHeld::held_file_path(&member_id)?;
+ let mut member_held = match MemberHeld::read_from(&path).await {
+ Ok(r) => r,
+ Err(_) => MemberHeld::default(),
+ };
+
+ // Write the received information
+ member_held.update_held_status(result);
+
+ // Write
+ MemberHeld::write_to(&member_held, &path).await?;
+ }
+
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ // Read the request
+ let holder_wants_know: Vec<VirtualFileId> =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Organize the information
+ let mut result: HashMap<VirtualFileId, Option<MemberId>> = HashMap::new();
+ for id in holder_wants_know {
+ let Ok(meta) = vault.virtual_file_meta(&id).await else {
+ continue;
+ };
+ result.insert(
+ id,
+ if meta.hold_member().is_empty() {
+ None
+ } else {
+ Some(meta.hold_member().to_string())
+ },
+ );
+ }
+
+ // Send information
+ mut_instance.write_large_msgpack(&result, 1024u16).await?;
+ }
+ }
+
+ // Sync cached sheet to local sheet
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let local_sheet_paths =
+ extract_sheet_names_from_paths(workspace.local_sheet_paths().await?)?;
+ let cached_sheet_paths =
+ extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?;
+
+ // Match cached sheets and local heets, and sync content
+ for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths {
+ // Get local sheet path by cached_sheet_name
+ let Some(local_sheet_path) = local_sheet_paths.get(&cached_sheet_name) else {
+ continue;
+ };
+
+ // Read cached sheet and local sheet
+ let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?;
+ let Ok(local_sheet_data) = LocalSheetData::read_from(local_sheet_path).await else {
+ continue;
+ };
+ let mut local_sheet =
+ local_sheet_data.wrap_to_local_sheet(&workspace, "".to_string(), "".to_string());
+
+ // Read cached id mapping
+ let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else {
+ continue;
+ };
+
+ for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() {
+ let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() };
+
+ // Get local path
+ let Some(local_path) = path_by_id else {
+ continue;
+ };
+
+ if &local_path == cached_item_path {
+ continue;
+ }
+
+ // If path not match, try to move
+ let move_result = local_sheet.move_mapping(&local_path, cached_item_path);
+ match move_result {
+ Err(e) => match e.kind() {
+ ErrorKind::AlreadyExists => {
+ return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail(
+ SyncCachedSheetFailReason::PathAlreadyExist(
+ cached_item_path.clone(),
+ ),
+ ));
+ }
+ _ => return Err(e.into()),
+ },
+ _ => {}
}
+ local_sheet.write_to_path(&local_sheet_path).await?
}
}
}
- Err(TcpTargetError::NoResult("No result.".to_string()))
+ Ok(UpdateToLatestInfoResult::Success)
+}
+
+/// Extract sheet names from file paths
+fn extract_sheet_names_from_paths(
+ paths: Vec<PathBuf>,
+) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> {
+ let mut result = HashMap::new();
+ for p in paths {
+ let sheet_name = p
+ .file_stem()
+ .and_then(|s| s.to_str())
+ .map(|s| s.to_string())
+ .ok_or_else(|| {
+ std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name")
+ })?;
+ result.insert(sheet_name, p);
+ }
+ Ok(result)
}
diff --git a/crates/vcs_actions/src/actions/virtual_file_actions.rs b/crates/vcs_actions/src/actions/virtual_file_actions.rs
index 3e801b0..fa71f1b 100644
--- a/crates/vcs_actions/src/actions/virtual_file_actions.rs
+++ b/crates/vcs_actions/src/actions/virtual_file_actions.rs
@@ -1,32 +1,481 @@
-use std::path::PathBuf;
+use std::{collections::HashSet, path::PathBuf, sync::Arc, time::SystemTime};
use action_system::{action::ActionContext, macros::action_gen};
+use cfg_file::config::ConfigFile;
use serde::{Deserialize, Serialize};
-use tcp_connection::error::TcpTargetError;
+use tcp_connection::{error::TcpTargetError, instance::ConnectionInstance};
+use tokio::{sync::Mutex, time::Instant};
+use vcs_data::data::{
+ local::{
+ cached_sheet::CachedSheet, file_status::AnalyzeResult, local_sheet::LocalMappingMetadata,
+ member_held::MemberHeld,
+ },
+ member::MemberId,
+ sheet::SheetName,
+ vault::virtual_file::{VirtualFileId, VirtualFileVersion, VirtualFileVersionDescription},
+};
-use crate::actions::{auth_member, check_connection_instance};
+use crate::actions::{
+ auth_member, check_connection_instance, get_current_sheet_name, try_get_local_workspace,
+ try_get_vault,
+};
+
+#[derive(Serialize, Deserialize)]
+pub struct TrackFileActionArguments {
+ // Path need to track
+ pub relative_pathes: HashSet<PathBuf>,
+
+ // Display progress bar
+ pub display_progressbar: bool,
+}
#[derive(Serialize, Deserialize)]
pub enum TrackFileActionResult {
- Success,
+ Done {
+ created: Vec<PathBuf>,
+ updated: Vec<PathBuf>,
+ synced: Vec<PathBuf>,
+ },
// Fail
AuthorizeFailed(String),
+
+ /// There are local move or missing items that have not been resolved,
+ /// this situation does not allow track
+ StructureChangesNotSolved,
+
+ CreateTaskFailed(CreateTaskResult),
+ UpdateTaskFailed(UpdateTaskResult),
+ SyncTaskFailed(SyncTaskResult),
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum CreateTaskResult {
+ Success(Vec<PathBuf>), // Success(success_relative_pathes)
+
+ /// Create file on existing path in the sheet
+ CreateFileOnExistPath(PathBuf),
+
+ /// Sheet not found
+ SheetNotFound(SheetName),
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum UpdateTaskResult {
+ Success(Vec<PathBuf>), // Success(success_relative_pathes)
}
+#[derive(Serialize, Deserialize)]
+pub enum SyncTaskResult {
+ Success(Vec<PathBuf>), // Success(success_relative_pathes)
+}
#[action_gen]
pub async fn track_file_action(
ctx: ActionContext,
- relative_pathes: Vec<PathBuf>,
+ arguments: TrackFileActionArguments,
) -> Result<TrackFileActionResult, TcpTargetError> {
+ let relative_pathes = arguments.relative_pathes;
let instance = check_connection_instance(&ctx)?;
// Auth Member
- if let Err(e) = auth_member(&ctx, instance).await {
- return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string()));
+ let member_id = match auth_member(&ctx, instance).await {
+ Ok(id) => id,
+ Err(e) => return Ok(TrackFileActionResult::AuthorizeFailed(e.to_string())),
};
- if ctx.is_proc_on_local() {}
+ // Check sheet
+ let sheet_name = get_current_sheet_name(&ctx, instance, &member_id).await?;
+
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let analyzed = AnalyzeResult::analyze_local_status(&workspace).await?;
+
+ if !analyzed.lost.is_empty() || !analyzed.moved.is_empty() {
+ return Ok(TrackFileActionResult::StructureChangesNotSolved);
+ }
+
+ let Some(sheet_in_use) = workspace.config().lock().await.sheet_in_use().clone() else {
+ return Err(TcpTargetError::NotFound("Sheet not found!".to_string()));
+ };
+
+ // Read local sheet and member held
+ let local_sheet = workspace.local_sheet(&member_id, &sheet_in_use).await?;
+ let cached_sheet = CachedSheet::cached_sheet_data(&sheet_in_use).await?;
+ let member_held = MemberHeld::read_from(MemberHeld::held_file_path(&member_id)?).await?;
+
+ let modified = analyzed
+ .modified
+ .intersection(&relative_pathes)
+ .cloned()
+ .collect::<HashSet<_>>();
+
+ // Filter out created files
+ let created_task = analyzed
+ .created
+ .intersection(&relative_pathes)
+ .cloned()
+ .collect::<HashSet<_>>();
+
+ // Filter out modified files that need to be updated
+ let update_task: HashSet<PathBuf> = {
+ let result = modified.iter().filter_map(|p| {
+ if let Ok(data) = local_sheet.mapping_data(p) {
+ let id = data.mapping_vfid();
+ if let Some(held_member) = member_held.file_holder(id) {
+ if held_member == &member_id {
+ return Some(p.clone());
+ }
+ }
+ };
+ return None;
+ });
+ result.collect()
+ };
+
+ // Filter out files that do not exist locally or have version inconsistencies and need to be synchronized
+ let sync_task: HashSet<PathBuf> = {
+ let other = relative_pathes
+ .difference(&created_task)
+ .cloned()
+ .collect::<HashSet<_>>()
+ .difference(&update_task)
+ .cloned()
+ .collect::<HashSet<_>>();
+
+ let result = other.iter().filter_map(|p| {
+ // In cached sheet
+ let Some(cached_sheet_mapping) = cached_sheet.mapping().get(p) else {
+ return None;
+ };
+
+ // Check if path mapping at local sheet
+ if let Ok(data) = local_sheet.mapping_data(p) {
+ // Version does not match
+ if data.version_when_updated() != &cached_sheet_mapping.version {
+ return Some(p.clone());
+ }
+
+ // File modified
+ if modified.contains(p) {
+ return Some(p.clone());
+ }
+ }
+
+ return None;
+ });
+ result.collect()
+ };
+
+ // Package tasks
+ let tasks: (HashSet<PathBuf>, HashSet<PathBuf>, HashSet<PathBuf>) =
+ (created_task, update_task, sync_task);
+
+ // Send to remote
+ {
+ let mut mut_instance = instance.lock().await;
+ mut_instance
+ .write_large_msgpack(tasks.clone(), 1024u16)
+ .await?;
+ // Drop mutex here
+ }
+
+ // Process create tasks
+ let success_create =
+ match proc_create_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.0)
+ .await
+ {
+ Ok(r) => match r {
+ CreateTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::CreateTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ // Process update tasks
+ let success_update =
+ match proc_update_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.1)
+ .await
+ {
+ Ok(r) => match r {
+ UpdateTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::UpdateTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ // Process sync tasks
+ let success_sync =
+ match proc_sync_tasks_local(&ctx, instance.clone(), &member_id, &sheet_name, tasks.2)
+ .await
+ {
+ Ok(r) => match r {
+ SyncTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::SyncTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ return Ok(TrackFileActionResult::Done {
+ created: success_create,
+ updated: success_update,
+ synced: success_sync,
+ });
+ }
+
+ if ctx.is_proc_on_remote() {
+ // Read tasks
+ let (created_task, update_task, sync_task): (
+ HashSet<PathBuf>,
+ HashSet<PathBuf>,
+ HashSet<PathBuf>,
+ ) = {
+ let mut mut_instance = instance.lock().await;
+ mut_instance.read_large_msgpack(1024u16).await?
+ };
+
+ // Process create tasks
+ let success_create = match proc_create_tasks_remote(
+ &ctx,
+ instance.clone(),
+ &member_id,
+ &sheet_name,
+ created_task,
+ )
+ .await
+ {
+ Ok(r) => match r {
+ CreateTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::CreateTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ // Process update tasks
+ let success_update = match proc_update_tasks_remote(
+ &ctx,
+ instance.clone(),
+ &member_id,
+ &sheet_name,
+ update_task,
+ )
+ .await
+ {
+ Ok(r) => match r {
+ UpdateTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::UpdateTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ // Process sync tasks
+ let success_sync = match proc_sync_tasks_remote(
+ &ctx,
+ instance.clone(),
+ &member_id,
+ &sheet_name,
+ sync_task,
+ )
+ .await
+ {
+ Ok(r) => match r {
+ SyncTaskResult::Success(relative_pathes) => relative_pathes,
+ _ => {
+ return Ok(TrackFileActionResult::SyncTaskFailed(r));
+ }
+ },
+ Err(e) => return Err(e),
+ };
+
+ return Ok(TrackFileActionResult::Done {
+ created: success_create,
+ updated: success_update,
+ synced: success_sync,
+ });
+ }
Err(TcpTargetError::NoResult("No result.".to_string()))
}
+
+async fn proc_create_tasks_local(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<CreateTaskResult, TcpTargetError> {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+ let mut local_sheet = workspace.local_sheet(member_id, sheet_name).await?;
+
+ // Wait for remote detection of whether the sheet exists
+ let has_sheet = mut_instance.read_msgpack::<bool>().await?;
+ if !has_sheet {
+ return Ok(CreateTaskResult::SheetNotFound(sheet_name.clone()));
+ }
+
+ // Wait for remote detection of whether the file exists
+ let (hasnt_duplicate, duplicate_path) = mut_instance.read_msgpack::<(bool, PathBuf)>().await?;
+ if !hasnt_duplicate {
+ return Ok(CreateTaskResult::CreateFileOnExistPath(duplicate_path));
+ }
+
+ let mut success_relative_pathes = Vec::new();
+
+ // Start sending files
+ for path in relative_paths {
+ let full_path = workspace.local_path().join(&path);
+
+ // Send file
+ if let Err(_) = mut_instance.write_file(&full_path).await {
+ continue;
+ }
+
+ // Read virtual file id and version
+ let (vfid, version, version_desc) = mut_instance
+ .read_msgpack::<(
+ VirtualFileId,
+ VirtualFileVersion,
+ VirtualFileVersionDescription,
+ )>()
+ .await?;
+
+ // Add mapping to local sheet
+ let hash = sha1_hash::calc_sha1(&full_path, 2048).await.unwrap().hash;
+ let time = std::fs::metadata(&full_path)?.modified()?;
+ local_sheet.add_mapping(
+ path.clone(),
+ LocalMappingMetadata::new(
+ hash, // hash_when_updated
+ time, // time_when_updated
+ std::fs::metadata(&full_path)?.len(), // size_when_updated
+ version_desc, // version_desc_when_updated
+ version, // version_when_updated
+ vfid, // mapping_vfid
+ time, // last_modifiy_check_itme
+ false, // last_modifiy_check_result
+ ),
+ )?;
+
+ success_relative_pathes.push(path);
+ }
+
+ // Write local sheet
+ local_sheet.write().await?;
+
+ Ok(CreateTaskResult::Success(success_relative_pathes))
+}
+
+async fn proc_create_tasks_remote(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<CreateTaskResult, TcpTargetError> {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ // Sheet check
+ let Ok(mut sheet) = vault.sheet(sheet_name).await else {
+ // Sheet not found
+ mut_instance.write_msgpack(false).await?;
+ return Ok(CreateTaskResult::SheetNotFound(sheet_name.to_string()));
+ };
+ mut_instance.write_msgpack(true).await?;
+
+ // Duplicate create precheck
+ for path in relative_paths.iter() {
+ if sheet.mapping().contains_key(path) {
+ // Duplicate file
+ mut_instance.write_msgpack((false, path)).await?;
+ return Ok(CreateTaskResult::CreateFileOnExistPath(path.clone()));
+ }
+ }
+ mut_instance.write_msgpack((true, PathBuf::new())).await?;
+
+ let mut success_relative_pathes = Vec::new();
+
+ // Start receiving files
+ for path in relative_paths {
+ // Read file and create virtual file
+ let Ok(vfid) = vault
+ .create_virtual_file_from_connection(&mut mut_instance, member_id)
+ .await
+ else {
+ continue;
+ };
+
+ // Record virtual file to sheet
+ let vf_meta = vault.virtual_file(&vfid)?.read_meta().await?;
+ sheet
+ .add_mapping(path.clone(), vfid.clone(), vf_meta.version_latest())
+ .await?;
+
+ // Tell client the virtual file id and version
+ mut_instance
+ .write_msgpack((
+ vfid,
+ vf_meta.version_latest(),
+ vf_meta
+ .version_description(vf_meta.version_latest())
+ .unwrap(),
+ ))
+ .await?;
+
+ success_relative_pathes.push(path);
+ }
+
+ sheet.persist().await?;
+
+ Ok(CreateTaskResult::Success(success_relative_pathes))
+}
+
+async fn proc_update_tasks_local(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<UpdateTaskResult, TcpTargetError> {
+ Ok(UpdateTaskResult::Success(Vec::new()))
+}
+
+async fn proc_update_tasks_remote(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<UpdateTaskResult, TcpTargetError> {
+ Ok(UpdateTaskResult::Success(Vec::new()))
+}
+
+async fn proc_sync_tasks_local(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<SyncTaskResult, TcpTargetError> {
+ Ok(SyncTaskResult::Success(Vec::new()))
+}
+
+async fn proc_sync_tasks_remote(
+ ctx: &ActionContext,
+ instance: Arc<Mutex<ConnectionInstance>>,
+ member_id: &MemberId,
+ sheet_name: &SheetName,
+ relative_paths: HashSet<PathBuf>,
+) -> Result<SyncTaskResult, TcpTargetError> {
+ Ok(SyncTaskResult::Success(Vec::new()))
+}