summaryrefslogtreecommitdiff
path: root/crates/vcs_actions/src/actions
diff options
context:
space:
mode:
author魏曹先生 <1992414357@qq.com>2025-11-06 22:10:28 +0800
committer魏曹先生 <1992414357@qq.com>2025-11-06 22:10:28 +0800
commit2779eb3782b1f9876ff300878b58836fcceb3a2e (patch)
tree8238966d767d3009713ba05c468fad9a58be920d /crates/vcs_actions/src/actions
parent6170f4ecfdcdd097b3a37c50cfd3d0e98a4be18b (diff)
feat: Overhaul local-remote synchronization
- Add cached sheet data synchronization - Implement bidirectional sheet version checking - Enhance update_to_latest_info_action with sheet sync - Add support for SheetData and CachedSheet integration
Diffstat (limited to 'crates/vcs_actions/src/actions')
-rw-r--r--crates/vcs_actions/src/actions/local_actions.rs163
1 files changed, 119 insertions, 44 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs
index 59dd972..869d14b 100644
--- a/crates/vcs_actions/src/actions/local_actions.rs
+++ b/crates/vcs_actions/src/actions/local_actions.rs
@@ -7,9 +7,11 @@ use serde::{Deserialize, Serialize};
use tcp_connection::error::TcpTargetError;
use vcs_data::data::{
local::{
+ cached_sheet::CachedSheet,
config::LocalConfig,
latest_info::{LatestInfo, SheetInfo},
},
+ sheet::{SheetData, SheetName},
vault::config::VaultUuid,
};
@@ -126,60 +128,133 @@ pub async fn update_to_latest_info_action(
Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())),
};
- if ctx.is_proc_on_remote() {
- let vault = try_get_vault(&ctx)?;
+ // Sync Latest Info
+ {
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+
+ // Build latest info
+ let mut latest_info = LatestInfo::default();
- // Build latest info
- let mut latest_info = LatestInfo::default();
-
- // Sheet
- let mut member_owned = Vec::new();
- let mut member_visible = Vec::new();
-
- for sheet in vault.sheets().await? {
- if sheet.holder().is_some() && sheet.holder().unwrap() == &member_id {
- member_owned.push(sheet.name().clone());
- } else {
- member_visible.push(SheetInfo {
- sheet_name: sheet.name().clone(),
- holder_name: match sheet.holder() {
- Some(holder) => Some(holder.clone()),
- None => None,
- },
- });
+ // Sheet
+ let mut member_owned = Vec::new();
+ let mut member_visible = Vec::new();
+
+ for sheet in vault.sheets().await? {
+ if sheet.holder().is_some() && sheet.holder().unwrap() == &member_id {
+ member_owned.push(sheet.name().clone());
+ } else {
+ member_visible.push(SheetInfo {
+ sheet_name: sheet.name().clone(),
+ holder_name: match sheet.holder() {
+ Some(holder) => Some(holder.clone()),
+ None => None,
+ },
+ });
+ }
}
- }
- latest_info.my_sheets = member_owned;
- latest_info.other_sheets = member_visible;
+ latest_info.my_sheets = member_owned;
+ latest_info.other_sheets = member_visible;
- // RefSheet
- let ref_sheet_data = vault.sheet(&"ref".to_string()).await?.to_data();
- latest_info.ref_sheet_content = ref_sheet_data;
+ // RefSheet
+ let ref_sheet_data = vault.sheet(&"ref".to_string()).await?.to_data();
+ latest_info.ref_sheet_content = ref_sheet_data;
- // Members
- let members = vault.members().await?;
- latest_info.vault_members = members;
+ // Members
+ let members = vault.members().await?;
+ latest_info.vault_members = members;
- // Send
- instance
- .lock()
- .await
- .write_large_msgpack(latest_info, 512 as u16)
- .await?;
+ // Send
+ instance
+ .lock()
+ .await
+ .write_large_msgpack(latest_info, 512 as u16)
+ .await?;
+ }
- return Ok(UpdateToLatestInfoResult::Success);
+ if ctx.is_proc_on_local() {
+ let latest_info = instance
+ .lock()
+ .await
+ .read_large_msgpack::<LatestInfo>(512 as u16)
+ .await?;
+ LatestInfo::write(&latest_info).await?;
+ }
}
- if ctx.is_proc_on_local() {
- let latest_info = instance
- .lock()
- .await
- .read_large_msgpack::<LatestInfo>(512 as u16)
- .await?;
- LatestInfo::write(&latest_info).await?;
+ // Sync Remote Sheets
+ {
+ if ctx.is_proc_on_local() {
+ let Ok(latest_info) = LatestInfo::read().await else {
+ return Err(TcpTargetError::NotFound(
+ "Latest info not found.".to_string(),
+ ));
+ };
+
+ // Collect all local versions
+ let mut local_versions = vec![];
+ for request_sheet in latest_info.my_sheets {
+ let Ok(data) =
+ CachedSheet::cached_sheet_data(member_id.clone(), request_sheet.clone()).await
+ else {
+ local_versions.push((request_sheet, 0));
+ continue;
+ };
+ local_versions.push((request_sheet, data.write_count()));
+ }
+
+ // Send the version list
+ let len = local_versions.len();
+ instance.lock().await.write_msgpack(local_versions).await?;
+
+ if len < 1 {
+ return Ok(UpdateToLatestInfoResult::Success);
+ }
+ }
- return Ok(UpdateToLatestInfoResult::Success);
+ // Send data to local
+ if ctx.is_proc_on_remote() {
+ let vault = try_get_vault(&ctx)?;
+ let mut mut_instance = instance.lock().await;
+
+ let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?;
+
+ for (sheet_name, version) in local_versions.iter() {
+ let sheet = vault.sheet(sheet_name).await?;
+ if let Some(holder) = sheet.holder() {
+ if holder == &member_id && &sheet.write_count() != version {
+ mut_instance.write_msgpack(true).await?;
+ mut_instance
+ .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16)
+ .await?;
+ }
+ }
+ }
+ mut_instance.write_msgpack(false).await?;
+ return Ok(UpdateToLatestInfoResult::Success);
+ }
+
+ // Receive data
+ if ctx.is_proc_on_local() {
+ let mut mut_instance = instance.lock().await;
+ loop {
+ let in_coming: bool = mut_instance.read_msgpack().await?;
+ if in_coming {
+ let (sheet_name, data): (SheetName, SheetData) =
+ mut_instance.read_large_msgpack(1024u16).await?;
+
+ let Some(path) = CachedSheet::cached_sheet_path(member_id.clone(), sheet_name)
+ else {
+ return Err(TcpTargetError::NotFound("Workspace not found".to_string()));
+ };
+
+ SheetData::write_to(&data, path).await?;
+ } else {
+ return Ok(UpdateToLatestInfoResult::Success);
+ }
+ }
+ }
}
Err(TcpTargetError::NoResult("No result.".to_string()))