diff options
Diffstat (limited to 'crates/vcs_actions/src/actions')
| -rw-r--r-- | crates/vcs_actions/src/actions/local_actions.rs | 163 |
1 files changed, 119 insertions, 44 deletions
diff --git a/crates/vcs_actions/src/actions/local_actions.rs b/crates/vcs_actions/src/actions/local_actions.rs index 59dd972..869d14b 100644 --- a/crates/vcs_actions/src/actions/local_actions.rs +++ b/crates/vcs_actions/src/actions/local_actions.rs @@ -7,9 +7,11 @@ use serde::{Deserialize, Serialize}; use tcp_connection::error::TcpTargetError; use vcs_data::data::{ local::{ + cached_sheet::CachedSheet, config::LocalConfig, latest_info::{LatestInfo, SheetInfo}, }, + sheet::{SheetData, SheetName}, vault::config::VaultUuid, }; @@ -126,60 +128,133 @@ pub async fn update_to_latest_info_action( Err(e) => return Ok(UpdateToLatestInfoResult::AuthorizeFailed(e.to_string())), }; - if ctx.is_proc_on_remote() { - let vault = try_get_vault(&ctx)?; + // Sync Latest Info + { + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + + // Build latest info + let mut latest_info = LatestInfo::default(); - // Build latest info - let mut latest_info = LatestInfo::default(); - - // Sheet - let mut member_owned = Vec::new(); - let mut member_visible = Vec::new(); - - for sheet in vault.sheets().await? { - if sheet.holder().is_some() && sheet.holder().unwrap() == &member_id { - member_owned.push(sheet.name().clone()); - } else { - member_visible.push(SheetInfo { - sheet_name: sheet.name().clone(), - holder_name: match sheet.holder() { - Some(holder) => Some(holder.clone()), - None => None, - }, - }); + // Sheet + let mut member_owned = Vec::new(); + let mut member_visible = Vec::new(); + + for sheet in vault.sheets().await? { + if sheet.holder().is_some() && sheet.holder().unwrap() == &member_id { + member_owned.push(sheet.name().clone()); + } else { + member_visible.push(SheetInfo { + sheet_name: sheet.name().clone(), + holder_name: match sheet.holder() { + Some(holder) => Some(holder.clone()), + None => None, + }, + }); + } } - } - latest_info.my_sheets = member_owned; - latest_info.other_sheets = member_visible; + latest_info.my_sheets = member_owned; + latest_info.other_sheets = member_visible; - // RefSheet - let ref_sheet_data = vault.sheet(&"ref".to_string()).await?.to_data(); - latest_info.ref_sheet_content = ref_sheet_data; + // RefSheet + let ref_sheet_data = vault.sheet(&"ref".to_string()).await?.to_data(); + latest_info.ref_sheet_content = ref_sheet_data; - // Members - let members = vault.members().await?; - latest_info.vault_members = members; + // Members + let members = vault.members().await?; + latest_info.vault_members = members; - // Send - instance - .lock() - .await - .write_large_msgpack(latest_info, 512 as u16) - .await?; + // Send + instance + .lock() + .await + .write_large_msgpack(latest_info, 512 as u16) + .await?; + } - return Ok(UpdateToLatestInfoResult::Success); + if ctx.is_proc_on_local() { + let latest_info = instance + .lock() + .await + .read_large_msgpack::<LatestInfo>(512 as u16) + .await?; + LatestInfo::write(&latest_info).await?; + } } - if ctx.is_proc_on_local() { - let latest_info = instance - .lock() - .await - .read_large_msgpack::<LatestInfo>(512 as u16) - .await?; - LatestInfo::write(&latest_info).await?; + // Sync Remote Sheets + { + if ctx.is_proc_on_local() { + let Ok(latest_info) = LatestInfo::read().await else { + return Err(TcpTargetError::NotFound( + "Latest info not found.".to_string(), + )); + }; + + // Collect all local versions + let mut local_versions = vec![]; + for request_sheet in latest_info.my_sheets { + let Ok(data) = + CachedSheet::cached_sheet_data(member_id.clone(), request_sheet.clone()).await + else { + local_versions.push((request_sheet, 0)); + continue; + }; + local_versions.push((request_sheet, data.write_count())); + } + + // Send the version list + let len = local_versions.len(); + instance.lock().await.write_msgpack(local_versions).await?; + + if len < 1 { + return Ok(UpdateToLatestInfoResult::Success); + } + } - return Ok(UpdateToLatestInfoResult::Success); + // Send data to local + if ctx.is_proc_on_remote() { + let vault = try_get_vault(&ctx)?; + let mut mut_instance = instance.lock().await; + + let local_versions = mut_instance.read_msgpack::<Vec<(SheetName, i32)>>().await?; + + for (sheet_name, version) in local_versions.iter() { + let sheet = vault.sheet(sheet_name).await?; + if let Some(holder) = sheet.holder() { + if holder == &member_id && &sheet.write_count() != version { + mut_instance.write_msgpack(true).await?; + mut_instance + .write_large_msgpack((sheet_name, sheet.to_data()), 1024u16) + .await?; + } + } + } + mut_instance.write_msgpack(false).await?; + return Ok(UpdateToLatestInfoResult::Success); + } + + // Receive data + if ctx.is_proc_on_local() { + let mut mut_instance = instance.lock().await; + loop { + let in_coming: bool = mut_instance.read_msgpack().await?; + if in_coming { + let (sheet_name, data): (SheetName, SheetData) = + mut_instance.read_large_msgpack(1024u16).await?; + + let Some(path) = CachedSheet::cached_sheet_path(member_id.clone(), sheet_name) + else { + return Err(TcpTargetError::NotFound("Workspace not found".to_string())); + }; + + SheetData::write_to(&data, path).await?; + } else { + return Ok(UpdateToLatestInfoResult::Success); + } + } + } } Err(TcpTargetError::NoResult("No result.".to_string())) |
