ipaacar_core/components/iu/
mod.rs

1use core::IUStatus;
2use std::collections::HashMap;
3use std::fmt::{Display, Formatter};
4use std::sync::{Arc, Weak};
5
6use log::{debug, error};
7use serde::{Deserialize, Serialize};
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::Backend;
12use crate::components::callback_list::CallbackList;
13use crate::components::iu::core::{IUCore, IUCoreError};
14use crate::{CallbackFuture, VoidOrAsyncError};
15
16pub mod core;
17pub(crate) mod links;
18
/// Envelope for everything an IU sends over the backend.
///
/// Values of this type are encoded with `rmp_serde` before being handed to the backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IUMessage { 
    /// A full copy of the IU core (sent by `IU::publish_over_backend`).
    Publish(IUCore),
    /// A single change to the IU identified by `uid`
    /// (sent by `IU::announce_change_over_backend`).
    Update { uid: String, message: IUUpdate },
}
24
/// One incremental change to an IU, as applied to the local core by `IU::process_update`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IUUpdate {
    /// Replace the payload with the contained JSON value.
    Payload(serde_json::Value),
    /// Set the closed status of the IU to the contained status.
    Close(IUStatus),
    /// Remove `target` from the link called `link_name`.
    LinkRemoveTarget {link_name: String, target: String },
    /// Add `target` to the link called `link_name`.
    LinkAddTarget { link_name: String, target: String },
    /// Remove the link with the contained name.
    LinkRemove(String),
}
33
/// Wrapper around the core IU with a backend to announce any changes.
/// Takes no mutable references; all mutable state lives behind async `RwLock`s.
/// This is necessary, because any callbacks might try to access the core IU as well.
/// For this reason, if you change this code, ALWAYS drop the lock guard on `core` before
/// announcing changes or running callbacks, or you can deadlock.
pub struct IU<B: Backend + Send + Sync> {
    /// The wrapped IU state.
    core: RwLock<IUCore>,
    /// Backend used to send publish and update messages.
    backend: Arc<B>,
    /// Channel every message of this IU is sent on; built by the constructors as
    /// `{component_name}/{category}/IU/{uid}`.
    default_channel: String,
    /// Uid of this IU, kept outside the lock so it can be read without locking `core`.
    uid: String,
    /// Callbacks that `process_update` invokes after an update has been applied.
    update_callbacks: RwLock<CallbackList<(Arc<Self>, Arc<IUUpdate>)>>,
    /// Weak self-reference, upgraded to pass an `Arc<Self>` to the update callbacks.
    cb_ref: Weak<Self>,
}
47
48impl<B: Backend + Send + Sync> IU<B> {
49    ///////////////////////////////////////////
50    // Constructors
51    ///////////////////////////////////////////
52    pub fn new(
53        category: impl Into<String>,
54        component_name: impl Into<String>,
55        owner_buffer_uid: impl Into<String>,
56        payload: serde_json::Value,
57        backend: Arc<B>,
58    ) -> Arc<Self> {
59        let uid = Uuid::new_v4().to_string();
60        let component_name = component_name.into();
61        let owner_buffer_uid = owner_buffer_uid.into();
62        let category = category.into();
63        let default_channel = format!("{}/{}/IU/{}", component_name, &category, &uid);
64        let update_callbacks = RwLock::new(CallbackList::new());
65
66        Arc::new_cyclic(|w| Self {
67            core: RwLock::new(IUCore::new(
68                category,
69                component_name,
70                Some(owner_buffer_uid),
71                payload,
72                Some(uid.clone()),
73            )),
74            backend,
75            default_channel,
76            uid,
77            update_callbacks,
78            cb_ref: Weak::clone(w),
79        })
80    }
81
82    pub fn from_core(
83        mut core: IUCore,
84        owner_buffer_uid: Option<String>,
85        backend: Arc<B>,
86    ) -> Arc<Self> {
87        let uid = core.uid.clone();
88        if let Some(bid) = owner_buffer_uid {
89            core.owner_buffer_uid = Some(bid)
90        }
91        let default_channel = format!("{}/{}/IU/{}", &core.component_name, &core.category, &uid);
92        let core = RwLock::new(core);
93        let update_callbacks = RwLock::new(CallbackList::new());
94        Arc::new_cyclic(|w| Self {
95            core,
96            backend,
97            default_channel,
98            uid,
99            update_callbacks,
100            cb_ref: Weak::clone(w),
101        })
102    }
103
104    ///////////////////////////////////////////
105    // Wrapper specific
106    ///////////////////////////////////////////
107
108    pub(crate) async fn process_update(
109        &self,
110        update: IUUpdate,
111    ) -> VoidOrAsyncError {
112        let mut core_locked = self.core.write().await;
113        match &update {
114            IUUpdate::Payload(payload) => core_locked.payload = payload.clone(),
115            IUUpdate::Close(status) => core_locked.closed_status = Some(*status),
116            IUUpdate::LinkAddTarget { link_name, target: targets } => {
117                core_locked.links.add_target_to_link(link_name, targets);
118            }
119            IUUpdate::LinkRemoveTarget { link_name, target } => {
120                core_locked.links.remove_target_from_link(link_name, target)?;
121            }
122            IUUpdate::LinkRemove(link_name) => {
123                core_locked.links.remove_link(link_name)?;
124            }
125        };
126        drop(core_locked);
127        let update = Arc::new(update);
128        let update_callbacks = self.update_callbacks.read().await;
129        match self.cb_ref.upgrade() {
130            Some(iu) => update_callbacks.call((iu, update)).await,
131            None => error!("Callbacks triggered for non existent IU. This should never happen."),
132        };
133        Ok(())
134    }
135
136    pub async fn get_uid(&self) -> &str {
137        &self.uid
138    }
139
140    pub async fn on_update<F>(&self, cb: F)
141    where
142        F: Fn((Arc<Self>, Arc<IUUpdate>)) -> CallbackFuture<()>,
143        F: Send + Sync + 'static,
144    {
145        let mut update_callbacks = self.update_callbacks.write().await;
146        update_callbacks.push(cb);
147    }
148
149    ///////////////////////////////////////////
150    // Core functionality
151    ///////////////////////////////////////////
152
153    /// Commits this IU. Can (or should) only be done by the owner of the IU.
154    /// Is checked via a passed id, which might make misuse possible.
155    ///
156    /// Returns IU::CommittedByNonOwner if buffer_id is not the owner.
157    pub async fn update_status(&self, buffer_id: &str, status: IUStatus) -> VoidOrAsyncError {
158        let mut core_locked = self.core.write().await;
159        let commit = core_locked.update_status(buffer_id, status).await;
160        drop(core_locked);
161        if commit.is_ok() {
162            self.announce_change_over_backend(IUUpdate::Close(status)).await?;
163        }
164        commit
165    }
166
167    ///////////////////////////////////////////
168    // Core Getters
169    //
170    // Need to:
171    // Lock Core
172    ///////////////////////////////////////////
173
174    pub async fn is_committed(&self) -> bool {
175        let core_locked = self.core.read().await;
176        matches!(core_locked.closed_status, Some(IUStatus::Committed))
177    }
178
179    pub async fn get_payload(&self) -> serde_json::Value {
180        let core_locked = self.core.read().await;
181        core_locked.payload.clone()
182    }
183
184    pub async fn get_category(&self) -> String {
185        let core_locked = self.core.read().await;
186        core_locked.category.clone()
187    }
188
189    pub async fn get_component_name(&self) -> String {
190        let core_locked = self.core.read().await;
191        core_locked.component_name.clone()
192    }
193
194    pub async fn get_owner_buffer_uid(&self) -> Option<String> {
195        let core_locked = self.core.read().await;
196        core_locked.owner_buffer_uid.clone()
197    }
198
199    pub async fn get_links(&self) -> HashMap<String, Vec<String>> {
200        let core_locked = self.core.read().await;
201        core_locked.links.link_map.clone()
202    }
203
204    pub async fn get_status(&self) -> Option<IUStatus> {
205        let core_locked = self.core.read().await;
206        core_locked.closed_status
207    }
208
209    ///////////////////////////////////////////
210    // Core Setters
211    //
212    // Need to:
213    // Lock Core
214    // Check if committed
215    // if not -> modify & drop lock
216    // Announce Change
217    ///////////////////////////////////////////
218
219    pub async fn set_payload(&self, new: serde_json::Value) -> VoidOrAsyncError {
220        let mut core_locked = self.core.write().await;
221        if core_locked.is_open() {
222            core_locked.payload = new.clone();
223            drop(core_locked);
224            self.announce_change_over_backend(IUUpdate::Payload(new)).await
225        } else {
226            Err(Box::new(IUCoreError::IUNotOpen))
227        }
228    }
229
230    /// adds a target to a link. Target should be an id. If the link doesn't exist, will be created.
231    pub async fn add_target_to_link(
232        &self,
233        link_name: &str,
234        target: impl Into<String>,
235    ) -> VoidOrAsyncError {
236        let target = target.into();
237        let mut core_locked = self.core.write().await;
238        if core_locked.is_open() {
239            core_locked.add_target_to_link(link_name, &target);
240            drop(core_locked);
241            self.announce_change_over_backend(IUUpdate::LinkAddTarget { 
242                link_name: link_name.to_string(), 
243                target 
244            }).await
245        } else {
246            Err(Box::new(IUCoreError::IUNotOpen))
247        }
248    }
249
250    /// Removes a target from a link. Target should be an id. If the link would have no more
251    /// targets after removal, it will be removed as well.
252    ///
253    /// Can return respective errors, if the target or link is not found.
254    pub async fn remove_target_from_link(&self, link_name: &str, target: &str) -> VoidOrAsyncError {
255        let mut core_locked = self.core.write().await;
256        if core_locked.is_open() {
257            core_locked.remove_target_from_link(link_name, target)?;
258            drop(core_locked);
259            self.announce_change_over_backend(IUUpdate::LinkRemoveTarget { 
260                link_name: link_name.to_string(), 
261                target: target.to_string() 
262            }).await
263        } else {
264            Err(Box::new(IUCoreError::IUNotOpen))
265        }
266    }
267
268    pub async fn remove_link(&self, link_name: &str) -> VoidOrAsyncError {
269        let mut core_locked = self.core.write().await;
270        if core_locked.is_open() {
271            core_locked.remove_link(link_name)?;
272            drop(core_locked);
273            self.announce_change_over_backend(IUUpdate::LinkRemove(link_name.to_string())).await
274        } else {
275            Err(Box::new(IUCoreError::IUNotOpen))
276        }
277    }
278
279    ///////////////////////////////////////////
280    // Backend Stuff
281    ///////////////////////////////////////////
282
283    pub async fn announce_change_over_backend(&self, update: IUUpdate) -> VoidOrAsyncError {
284        let bytes = rmp_serde::to_vec(&IUMessage::Update {
285            uid: self.uid.clone(),
286            message: update,
287        })?;
288        self.backend
289            .send_message(&self.default_channel, bytes)
290            .await?;
291        debug!("IU {} announced change.", self.uid);
292        Ok(())
293    }
294
295    pub async fn publish_over_backend(&self) -> VoidOrAsyncError {
296        let bytes = rmp_serde::to_vec(&IUMessage::Publish(self.core.read().await.clone()))?;
297        self.backend
298            .send_message(&self.default_channel, bytes)
299            .await?;
300        debug!("IU {} published.", self.uid);
301        Ok(())
302    }
303}
304
305impl<B: Backend + Send + Sync> Display for IU<B> {
306    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307        write!(f, "ID: {}", self.uid)
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use crate::backend::mqtt::MqttBackend;
314    use crate::components::buffer::output::OutputBuffer;
315    use crate::components::iu::core::IUStatus;
316    use crate::components::iu::{IUUpdate, IU};
317    use crate::setup_test_logger;
318    use log::info;
319    use serde_json::json;
320    use tokio::sync::Notify;
321    use tokio::time::timeout;
322    use std::sync::Arc;
323    use std::time::Duration;
324
325
326    #[tokio::test]
327    async fn process_update_test() {
328        setup_test_logger();
329        let mut ob = OutputBuffer::<MqttBackend>::new(
330            "process_update_test",
331            "IUTest",
332            "localhost:1883",
333            None,
334        )
335        .await
336        .unwrap();
337        let iu = ob.create_new_iu("process_update_test", json!({"test": 123}), None).await.unwrap();
338        iu.process_update(IUUpdate::Payload(json!({"test": 456}))).await.unwrap();
339        assert!(iu.get_payload().await == json!({"test": 456}));
340
341        iu.process_update(IUUpdate::LinkAddTarget {
342            link_name: "test_link".to_string(),
343            target: "target1".to_string(),
344        }).await.unwrap();
345        iu.process_update(IUUpdate::LinkAddTarget {
346            link_name: "test_link".to_string(),
347            target: "target2".to_string(),
348        }).await.unwrap();
349        let links = iu.get_links().await.get("test_link").unwrap().clone();
350        assert!(links.contains(&"target1".to_string()));
351        assert!(links.contains(&"target2".to_string()));
352
353        iu.process_update(IUUpdate::LinkRemoveTarget {
354            link_name: "test_link".to_string(),
355            target: "target1".to_string(),
356        }).await.unwrap();
357        assert!(!iu.get_links().await.get("test_link").unwrap().contains(&"target1".to_string()));
358
359        iu.process_update(IUUpdate::LinkRemove("test_link".to_string())).await.unwrap();
360        assert!(!iu.get_links().await.contains_key("test_link"));
361
362        iu.process_update(IUUpdate::Close(IUStatus::Committed)).await.unwrap();
363        assert!(matches!(iu.get_status().await, Some(IUStatus::Committed)));
364    }
365    
366
367    #[tokio::test]
368    async fn update_cb_test() {
369        setup_test_logger();
370        let callback_processed = Arc::new(Notify::new());
371        let mut ub: OutputBuffer<MqttBackend> =
372            OutputBuffer::new("ddd", "IUTest", "localhost:1883", None)
373                .await
374                .unwrap();
375        let iu = IU::new(
376            "whatever",
377            "IUTest",
378            ub.uid.clone(),
379            serde_json::Value::default(),
380            Arc::clone(&ub.backend),
381        );
382        ub.publish_iu(Arc::clone(&iu)).await.unwrap();
383        let cp = Arc::clone(&callback_processed);
384        iu.on_update(move |_| {
385            let cbp = Arc::clone(&cp);
386            Box::pin(async move {
387                info!("I WAS HERE!!");
388                cbp.notify_one();
389            })
390        })
391        .await;
392        iu.set_payload(serde_json::Value::default()).await.unwrap();
393        // Wait for update to go through broker
394        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
395        assert!(timeout(Duration::from_millis(500), callback_processed.notified()).await.is_ok());
396    }
397}