ipaacar_core/components/iu/
mod.rs
1use core::IUStatus;
2use std::collections::HashMap;
3use std::fmt::{Display, Formatter};
4use std::sync::{Arc, Weak};
5
6use log::{debug, error};
7use serde::{Deserialize, Serialize};
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::Backend;
12use crate::components::callback_list::CallbackList;
13use crate::components::iu::core::{IUCore, IUCoreError};
14use crate::{CallbackFuture, VoidOrAsyncError};
15
16pub mod core;
17pub(crate) mod links;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub enum IUMessage {
21 Publish(IUCore),
22 Update { uid: String, message: IUUpdate },
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum IUUpdate {
27 Payload(serde_json::Value),
28 Close(IUStatus),
29 LinkRemoveTarget {link_name: String, target: String },
30 LinkAddTarget { link_name: String, target: String },
31 LinkRemove(String),
32}
33
34pub struct IU<B: Backend + Send + Sync> {
40 core: RwLock<IUCore>,
41 backend: Arc<B>,
42 default_channel: String,
43 uid: String,
44 update_callbacks: RwLock<CallbackList<(Arc<Self>, Arc<IUUpdate>)>>,
45 cb_ref: Weak<Self>,
46}
47
48impl<B: Backend + Send + Sync> IU<B> {
49 pub fn new(
53 category: impl Into<String>,
54 component_name: impl Into<String>,
55 owner_buffer_uid: impl Into<String>,
56 payload: serde_json::Value,
57 backend: Arc<B>,
58 ) -> Arc<Self> {
59 let uid = Uuid::new_v4().to_string();
60 let component_name = component_name.into();
61 let owner_buffer_uid = owner_buffer_uid.into();
62 let category = category.into();
63 let default_channel = format!("{}/{}/IU/{}", component_name, &category, &uid);
64 let update_callbacks = RwLock::new(CallbackList::new());
65
66 Arc::new_cyclic(|w| Self {
67 core: RwLock::new(IUCore::new(
68 category,
69 component_name,
70 Some(owner_buffer_uid),
71 payload,
72 Some(uid.clone()),
73 )),
74 backend,
75 default_channel,
76 uid,
77 update_callbacks,
78 cb_ref: Weak::clone(w),
79 })
80 }
81
82 pub fn from_core(
83 mut core: IUCore,
84 owner_buffer_uid: Option<String>,
85 backend: Arc<B>,
86 ) -> Arc<Self> {
87 let uid = core.uid.clone();
88 if let Some(bid) = owner_buffer_uid {
89 core.owner_buffer_uid = Some(bid)
90 }
91 let default_channel = format!("{}/{}/IU/{}", &core.component_name, &core.category, &uid);
92 let core = RwLock::new(core);
93 let update_callbacks = RwLock::new(CallbackList::new());
94 Arc::new_cyclic(|w| Self {
95 core,
96 backend,
97 default_channel,
98 uid,
99 update_callbacks,
100 cb_ref: Weak::clone(w),
101 })
102 }
103
104 pub(crate) async fn process_update(
109 &self,
110 update: IUUpdate,
111 ) -> VoidOrAsyncError {
112 let mut core_locked = self.core.write().await;
113 match &update {
114 IUUpdate::Payload(payload) => core_locked.payload = payload.clone(),
115 IUUpdate::Close(status) => core_locked.closed_status = Some(*status),
116 IUUpdate::LinkAddTarget { link_name, target: targets } => {
117 core_locked.links.add_target_to_link(link_name, targets);
118 }
119 IUUpdate::LinkRemoveTarget { link_name, target } => {
120 core_locked.links.remove_target_from_link(link_name, target)?;
121 }
122 IUUpdate::LinkRemove(link_name) => {
123 core_locked.links.remove_link(link_name)?;
124 }
125 };
126 drop(core_locked);
127 let update = Arc::new(update);
128 let update_callbacks = self.update_callbacks.read().await;
129 match self.cb_ref.upgrade() {
130 Some(iu) => update_callbacks.call((iu, update)).await,
131 None => error!("Callbacks triggered for non existent IU. This should never happen."),
132 };
133 Ok(())
134 }
135
136 pub async fn get_uid(&self) -> &str {
137 &self.uid
138 }
139
140 pub async fn on_update<F>(&self, cb: F)
141 where
142 F: Fn((Arc<Self>, Arc<IUUpdate>)) -> CallbackFuture<()>,
143 F: Send + Sync + 'static,
144 {
145 let mut update_callbacks = self.update_callbacks.write().await;
146 update_callbacks.push(cb);
147 }
148
149 pub async fn update_status(&self, buffer_id: &str, status: IUStatus) -> VoidOrAsyncError {
158 let mut core_locked = self.core.write().await;
159 let commit = core_locked.update_status(buffer_id, status).await;
160 drop(core_locked);
161 if commit.is_ok() {
162 self.announce_change_over_backend(IUUpdate::Close(status)).await?;
163 }
164 commit
165 }
166
167 pub async fn is_committed(&self) -> bool {
175 let core_locked = self.core.read().await;
176 matches!(core_locked.closed_status, Some(IUStatus::Committed))
177 }
178
179 pub async fn get_payload(&self) -> serde_json::Value {
180 let core_locked = self.core.read().await;
181 core_locked.payload.clone()
182 }
183
184 pub async fn get_category(&self) -> String {
185 let core_locked = self.core.read().await;
186 core_locked.category.clone()
187 }
188
189 pub async fn get_component_name(&self) -> String {
190 let core_locked = self.core.read().await;
191 core_locked.component_name.clone()
192 }
193
194 pub async fn get_owner_buffer_uid(&self) -> Option<String> {
195 let core_locked = self.core.read().await;
196 core_locked.owner_buffer_uid.clone()
197 }
198
199 pub async fn get_links(&self) -> HashMap<String, Vec<String>> {
200 let core_locked = self.core.read().await;
201 core_locked.links.link_map.clone()
202 }
203
204 pub async fn get_status(&self) -> Option<IUStatus> {
205 let core_locked = self.core.read().await;
206 core_locked.closed_status
207 }
208
209 pub async fn set_payload(&self, new: serde_json::Value) -> VoidOrAsyncError {
220 let mut core_locked = self.core.write().await;
221 if core_locked.is_open() {
222 core_locked.payload = new.clone();
223 drop(core_locked);
224 self.announce_change_over_backend(IUUpdate::Payload(new)).await
225 } else {
226 Err(Box::new(IUCoreError::IUNotOpen))
227 }
228 }
229
230 pub async fn add_target_to_link(
232 &self,
233 link_name: &str,
234 target: impl Into<String>,
235 ) -> VoidOrAsyncError {
236 let target = target.into();
237 let mut core_locked = self.core.write().await;
238 if core_locked.is_open() {
239 core_locked.add_target_to_link(link_name, &target);
240 drop(core_locked);
241 self.announce_change_over_backend(IUUpdate::LinkAddTarget {
242 link_name: link_name.to_string(),
243 target
244 }).await
245 } else {
246 Err(Box::new(IUCoreError::IUNotOpen))
247 }
248 }
249
250 pub async fn remove_target_from_link(&self, link_name: &str, target: &str) -> VoidOrAsyncError {
255 let mut core_locked = self.core.write().await;
256 if core_locked.is_open() {
257 core_locked.remove_target_from_link(link_name, target)?;
258 drop(core_locked);
259 self.announce_change_over_backend(IUUpdate::LinkRemoveTarget {
260 link_name: link_name.to_string(),
261 target: target.to_string()
262 }).await
263 } else {
264 Err(Box::new(IUCoreError::IUNotOpen))
265 }
266 }
267
268 pub async fn remove_link(&self, link_name: &str) -> VoidOrAsyncError {
269 let mut core_locked = self.core.write().await;
270 if core_locked.is_open() {
271 core_locked.remove_link(link_name)?;
272 drop(core_locked);
273 self.announce_change_over_backend(IUUpdate::LinkRemove(link_name.to_string())).await
274 } else {
275 Err(Box::new(IUCoreError::IUNotOpen))
276 }
277 }
278
279 pub async fn announce_change_over_backend(&self, update: IUUpdate) -> VoidOrAsyncError {
284 let bytes = rmp_serde::to_vec(&IUMessage::Update {
285 uid: self.uid.clone(),
286 message: update,
287 })?;
288 self.backend
289 .send_message(&self.default_channel, bytes)
290 .await?;
291 debug!("IU {} announced change.", self.uid);
292 Ok(())
293 }
294
295 pub async fn publish_over_backend(&self) -> VoidOrAsyncError {
296 let bytes = rmp_serde::to_vec(&IUMessage::Publish(self.core.read().await.clone()))?;
297 self.backend
298 .send_message(&self.default_channel, bytes)
299 .await?;
300 debug!("IU {} published.", self.uid);
301 Ok(())
302 }
303}
304
305impl<B: Backend + Send + Sync> Display for IU<B> {
306 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307 write!(f, "ID: {}", self.uid)
308 }
309}
310
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use log::info;
    use serde_json::json;
    use tokio::sync::Notify;
    use tokio::time::timeout;

    use crate::backend::mqtt::MqttBackend;
    use crate::components::buffer::output::OutputBuffer;
    use crate::components::iu::core::IUStatus;
    use crate::components::iu::{IUUpdate, IU};
    use crate::setup_test_logger;

    /// Feeds every `IUUpdate` variant through `process_update` and checks the
    /// state the getters report afterwards.
    ///
    /// NOTE(review): presumably needs an MQTT broker at localhost:1883 - confirm.
    #[tokio::test]
    async fn process_update_test() {
        setup_test_logger();
        let mut buffer = OutputBuffer::<MqttBackend>::new(
            "process_update_test",
            "IUTest",
            "localhost:1883",
            None,
        )
        .await
        .unwrap();
        let iu = buffer
            .create_new_iu("process_update_test", json!({"test": 123}), None)
            .await
            .unwrap();

        // Payload replacement.
        let replacement = IUUpdate::Payload(json!({"test": 456}));
        iu.process_update(replacement).await.unwrap();
        assert_eq!(iu.get_payload().await, json!({"test": 456}));

        // Adding two targets to the same link.
        for target in ["target1", "target2"] {
            iu.process_update(IUUpdate::LinkAddTarget {
                link_name: "test_link".to_string(),
                target: target.to_string(),
            })
            .await
            .unwrap();
        }
        let link_map = iu.get_links().await;
        let targets = link_map.get("test_link").unwrap();
        assert!(targets.iter().any(|t| t == "target1"));
        assert!(targets.iter().any(|t| t == "target2"));

        // Removing a single target keeps the link itself.
        iu.process_update(IUUpdate::LinkRemoveTarget {
            link_name: "test_link".to_string(),
            target: "target1".to_string(),
        })
        .await
        .unwrap();
        let link_map = iu.get_links().await;
        let targets = link_map.get("test_link").unwrap();
        assert!(!targets.iter().any(|t| t == "target1"));

        // Removing the whole link.
        iu.process_update(IUUpdate::LinkRemove("test_link".to_string()))
            .await
            .unwrap();
        assert!(!iu.get_links().await.contains_key("test_link"));

        // Closing the IU.
        iu.process_update(IUUpdate::Close(IUStatus::Committed))
            .await
            .unwrap();
        assert!(matches!(iu.get_status().await, Some(IUStatus::Committed)));
    }

    /// Registers an update callback, changes the payload and waits for the
    /// callback to signal that it ran.
    ///
    /// NOTE(review): presumably needs an MQTT broker at localhost:1883 - confirm.
    #[tokio::test]
    async fn update_cb_test() {
        setup_test_logger();
        let callback_processed = Arc::new(Notify::new());
        let mut buffer: OutputBuffer<MqttBackend> =
            OutputBuffer::new("ddd", "IUTest", "localhost:1883", None)
                .await
                .unwrap();
        let iu = IU::new(
            "whatever",
            "IUTest",
            buffer.uid.clone(),
            serde_json::Value::default(),
            Arc::clone(&buffer.backend),
        );
        buffer.publish_iu(Arc::clone(&iu)).await.unwrap();

        let signal = Arc::clone(&callback_processed);
        iu.on_update(move |_| {
            let signal = Arc::clone(&signal);
            Box::pin(async move {
                info!("I WAS HERE!!");
                signal.notify_one();
            })
        })
        .await;

        iu.set_payload(serde_json::Value::default()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let waited = timeout(Duration::from_millis(500), callback_processed.notified()).await;
        assert!(waited.is_ok());
    }
}