summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src/actions/local_actions.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/vcs_actions/src/actions/local_actions.rs')
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs248
1 files changed, 219 insertions, 29 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index 869d14b..c9e5db3 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -1,18 +1,27 @@
-use std::net::SocketAddr;
+use std::{
+ collections::HashMap,
+ io::{Error, ErrorKind},
+ net::SocketAddr,
+ path::PathBuf,
+};
use action_system::{action::ActionContext, macros::action_gen};
use cfg_file::config::ConfigFile;
use log::info;
use serde::{Deserialize, Serialize};
use tcp_connection::error::TcpTargetError;
+use tokio::time::Instant;
use vcs_data::data::{
local::{
cached_sheet::CachedSheet,
config::LocalConfig,
latest_info::{LatestInfo, SheetInfo},
+ local_sheet::LocalSheetData,
+ member_held::MemberHeld,
},
+ member::MemberId,
sheet::{SheetData, SheetName},
- vault::config::VaultUuid,
+ vault::{config::VaultUuid, virtual_file::VirtualFileId},
};
use crate::actions::{
@@ -114,6 +123,12 @@ pub enum UpdateToLatestInfoResult {
// Fail
AuthorizeFailed(String),
+ SyncCachedSheetFail(SyncCachedSheetFailReason),
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum SyncCachedSheetFailReason {
+ PathAlreadyExist(PathBuf),
}
#[action_gen]
@@ -128,6 +143,8 @@ pub async fn update_to_latest_info_action(
Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())),
};
+ info!("Sending latest info to {}", member_id);
+
// Sync Latest Info
{
if ctx.is_proc_on_remote() {
@@ -174,15 +191,18 @@ pub async fn update_to_latest_info_action(
}
if ctx.is_proc_on_local() {
- let latest_info = instance
+ let mut latest_info = instance
.lock()
.await
.read_large_msgpack::<LatestInfo>(512 as u16)
.await?;
+ latest_info.update_instant = Some(Instant::now());
LatestInfo::write(&latest_info).await?;
}
}
+ info!("Update sheets to {}", member_id);
+
// Sync Remote Sheets
{
if ctx.is_proc_on_local() {
@@ -195,10 +215,10 @@ pub async fn update_to_latest_info_action(
// Collect all local versions
let mut local_versions = vec![];
for request_sheet in latest_info.my_sheets {
- let Ok(data) =
- CachedSheet::cached_sheet_data(member_id.clone(), request_sheet.clone()).await
- else {
- local_versions.push((request_sheet, 0));
+ let Ok(data) = CachedSheet::cached_sheet_data(&request_sheet).await else {
+ // For newly created sheets, the version is 0.
+ // Send -1 to distinguish from 0, ensuring the upstream will definitely send the sheet information
+ local_versions.push((request_sheet, -1));
continue;
};
local_versions.push((request_sheet, data.write_count()));
@@ -209,12 +229,53 @@ pub async fn update_to_latest_info_action(
instance.lock().await.write_msgpack(local_versions).await?;
if len < 1 {
- return Ok(UpdateToLatestInfoResult::Success);
- }
- }
+ // Don't return here, continue to next section
+ } else {
+ // Send data to local
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ let local_versions =
+ mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?;
+
+ for (sheet_name, local_write_count) in local_versions.iter() {
+ let sheet = vault.sheet(sheet_name).await?;
+ if let Some(holder) = sheet.holder() {
+ if holder == &member_id && &sheet.write_count() != local_write_count {
+ mut_instance.write_msgpack(true).await?;
+ mut_instance
+ .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16)
+ .await?;
+ }
+ }
+ }
+ mut_instance.write_msgpack(false).await?;
+ }
- // Send data to local
- if ctx.is_proc_on_remote() {
+ // Receive data
+ if ctx.is_proc_on_local() {
+ let mut mut_instance = instance.lock().await;
+ loop {
+ let in_coming: bool = mut_instance.read_msgpack().await?;
+ if in_coming {
+ let (sheet_name, data): (SheetName, SheetData) =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ let Some(path) = CachedSheet::cached_sheet_path(sheet_name) else {
+ return Err(TcpTargetError::NotFound(
+ "Workspace not found".to_string(),
+ ));
+ };
+
+ SheetData::write_to(&data, path).await?;
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ } else if ctx.is_proc_on_remote() {
let vault = try_get_vault(&ctx)?;
let mut mut_instance = instance.lock().await;
@@ -232,30 +293,159 @@ pub async fn update_to_latest_info_action(
}
}
mut_instance.write_msgpack(false).await?;
- return Ok(UpdateToLatestInfoResult::Success);
}
+ }
- // Receive data
+ info!("Fetch held status to {}", member_id);
+
+ // Sync Held Info
+ {
if ctx.is_proc_on_local() {
+ let Ok(latest_info) = LatestInfo::read().await else {
+ return Err(TcpTargetError::NotFound(
+ "Latest info not found.".to_string(),
+ ));
+ };
+
+ // Collect files that need to know the holder
+ let mut holder_wants_know = Vec::new();
+ for sheet_name in &latest_info.my_sheets {
+ if let Ok(sheet_data) = CachedSheet::cached_sheet_data(sheet_name).await {
+ holder_wants_know
+ .extend(sheet_data.mapping().values().map(|value| value.id.clone()));
+ }
+ }
+
+ // Send request
let mut mut_instance = instance.lock().await;
- loop {
- let in_coming: bool = mut_instance.read_msgpack().await?;
- if in_coming {
- let (sheet_name, data): (SheetName, SheetData) =
- mut_instance.read_large_msgpack(1024u16).await?;
-
- let Some(path) = CachedSheet::cached_sheet_path(member_id.clone(), sheet_name)
- else {
- return Err(TcpTargetError::NotFound("Workspace not found".to_string()));
- };
-
- SheetData::write_to(&data, path).await?;
- } else {
- return Ok(UpdateToLatestInfoResult::Success);
+ mut_instance
+ .write_large_msgpack(&holder_wants_know, 1024u16)
+ .await?;
+
+ // Receive information and write to local
+ let result: HashMap<VirtualFileId, Option<MemberId>> =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Read configuration file
+ let path = MemberHeld::held_file_path(&member_id)?;
+ let mut member_held = match MemberHeld::read_from(&path).await {
+ Ok(r) => r,
+ Err(_) => MemberHeld::default(),
+ };
+
+ // Write the received information
+ member_held.update_held_status(result);
+
+ // Write
+ MemberHeld::write_to(&member_held, &path).await?;
+ }
+
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ // Read the request
+ let holder_wants_know: Vec<VirtualFileId> =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ // Organize the information
+ let mut result: HashMap<VirtualFileId, Option<MemberId>> = HashMap::new();
+ for id in holder_wants_know {
+ let Ok(meta) = vault.virtual_file_meta(&id).await else {
+ continue;
+ };
+ result.insert(
+ id,
+ if meta.hold_member().is_empty() {
+ None
+ } else {
+ Some(meta.hold_member().to_string())
+ },
+ );
+ }
+
+ // Send information
+ mut_instance.write_large_msgpack(&result, 1024u16).await?;
+ }
+ }
+
+ // Sync cached sheet to local sheet
+ if ctx.is_proc_on_local() {
+ let workspace = try_get_local_workspace(&ctx)?;
+ let local_sheet_paths =
+ extract_sheet_names_from_paths(workspace.local_sheet_paths().await?)?;
+ let cached_sheet_paths =
+ extract_sheet_names_from_paths(CachedSheet::cached_sheet_paths().await?)?;
+
+ // Match cached sheets and local heets, and sync content
+ for (cached_sheet_name, _cached_sheet_path) in cached_sheet_paths {
+ // Get local sheet path by cached_sheet_name
+ let Some(local_sheet_path) = local_sheet_paths.get(&cached_sheet_name) else {
+ continue;
+ };
+
+ // Read cached sheet and local sheet
+ let cached_sheet = CachedSheet::cached_sheet_data(&cached_sheet_name).await?;
+ let Ok(local_sheet_data) = LocalSheetData::read_from(local_sheet_path).await else {
+ continue;
+ };
+ let mut local_sheet =
+ local_sheet_data.wrap_to_local_sheet(&workspace, "".to_string(), "".to_string());
+
+ // Read cached id mapping
+ let Some(cached_sheet_id_mapping) = cached_sheet.id_mapping() else {
+ continue;
+ };
+
+ for (cached_item_id, cached_item_path) in cached_sheet_id_mapping.iter() {
+ let path_by_id = { local_sheet.path_by_id(cached_item_id).cloned() };
+
+ // Get local path
+ let Some(local_path) = path_by_id else {
+ continue;
+ };
+
+ if &local_path == cached_item_path {
+ continue;
+ }
+
+ // If path not match, try to move
+ let move_result = local_sheet.move_mapping(&local_path, cached_item_path);
+ match move_result {
+ Err(e) => match e.kind() {
+ ErrorKind::AlreadyExists => {
+ return Ok(UpdateToLatestInfoResult::SyncCachedSheetFail(
+ SyncCachedSheetFailReason::PathAlreadyExist(
+ cached_item_path.clone(),
+ ),
+ ));
+ }
+ _ => return Err(e.into()),
+ },
+ _ => {}
}
+ local_sheet.write_to_path(&local_sheet_path).await?
}
}
}
- Err(TcpTargetError::NoResult("No result.".to_string()))
+ Ok(UpdateToLatestInfoResult::Success)
+}
+
+/// Extract sheet names from file paths
+fn extract_sheet_names_from_paths(
+ paths: Vec<PathBuf>,
+) -> Result<HashMap<SheetName, PathBuf>, std::io::Error> {
+ let mut result = HashMap::new();
+ for p in paths {
+ let sheet_name = p
+ .file_stem()
+ .and_then(|s| s.to_str())
+ .map(|s| s.to_string())
+ .ok_or_else(|| {
+ std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid file name")
+ })?;
+ result.insert(sheet_name, p);
+ }
+ Ok(result)
}