-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjsonl_event_store.rs
More file actions
136 lines (130 loc) · 4.02 KB
/
jsonl_event_store.rs
File metadata and controls
136 lines (130 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Generic JSON-lines event store.
//!
//! `JsonlEventStore` persists each event as a single line of JSON.
//! It accepts any event type that implements [`Serialize`] and [`DeserializeOwned`].
use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, BufReader, Write};
use std::marker::PhantomData;
use std::path::PathBuf;
use serde::{de::DeserializeOwned, Serialize};
/// Append-only storage backed by a JSON Lines file.
///
/// Each event is serialized to JSON and written on its own line. The store
/// can later be replayed to rebuild application state.
///
/// The store value itself holds only the path of the backing file; the event
/// type `T` is recorded at the type level through a zero-sized
/// [`PhantomData`] marker, so no event is kept in memory by the store.
///
/// # Examples
///
/// ```
/// use aei_framework::infrastructure::JsonlEventStore;
/// use serde::{Deserialize, Serialize};
/// use std::path::PathBuf;
///
/// #[derive(Debug, Serialize, Deserialize, PartialEq)]
/// struct MyEvent {
///     value: u32,
/// }
///
/// let path = PathBuf::from("events.log");
/// let mut store = JsonlEventStore::<MyEvent>::new(path.clone());
/// store.append(&MyEvent { value: 42 }).unwrap();
/// let events = store.load().unwrap();
/// assert_eq!(events, vec![MyEvent { value: 42 }]);
/// std::fs::remove_file(path).unwrap();
/// ```
#[derive(Debug)]
pub struct JsonlEventStore<T> {
/// Location of the JSON Lines file that events are written to and read from.
path: PathBuf,
/// Zero-sized marker binding the store to its event type `T`.
_marker: PhantomData<T>,
}
impl<T> JsonlEventStore<T> {
/// Creates a new store writing to the specified path.
///
/// # Arguments
///
/// * `path` - Location of the JSON Lines file.
pub fn new(path: PathBuf) -> Self {
Self {
path,
_marker: PhantomData,
}
}
}
impl<T> JsonlEventStore<T>
where
T: Serialize + DeserializeOwned,
{
/// Persist an event to the underlying storage.
///
/// # Arguments
///
/// * `event` - The event to append.
///
/// # Errors
///
/// Returns [`io::Error`] if the event cannot be serialized or written.
///
/// # Examples
///
/// ```
/// # use aei_framework::infrastructure::JsonlEventStore;
/// # use serde::{Deserialize, Serialize};
/// # use std::path::PathBuf;
/// # #[derive(Serialize, Deserialize)]
/// # struct MyEvent { value: u32 }
/// # let path = PathBuf::from("append.log");
/// # let mut store = JsonlEventStore::<MyEvent>::new(path.clone());
/// store.append(&MyEvent { value: 7 }).unwrap();
/// # std::fs::remove_file(path).unwrap();
/// ```
pub fn append(&mut self, event: &T) -> Result<(), io::Error> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
let json = serde_json::to_string(event).map_err(io::Error::other)?;
writeln!(file, "{json}")
}
/// Load all events in chronological order.
///
/// # Returns
///
/// A vector containing the deserialized events.
///
/// # Errors
///
/// Returns [`io::Error`] if the file cannot be read or an event fails to
/// deserialize.
///
/// # Examples
///
/// ```
/// # use aei_framework::infrastructure::JsonlEventStore;
/// # use serde::{Deserialize, Serialize};
/// # use std::path::PathBuf;
/// # #[derive(Debug, Serialize, Deserialize, PartialEq)]
/// # struct MyEvent { value: u32 }
/// # let path = PathBuf::from("load.log");
/// # let mut store = JsonlEventStore::<MyEvent>::new(path.clone());
/// store.append(&MyEvent { value: 1 }).unwrap();
/// let events = store.load().unwrap();
/// assert_eq!(events, vec![MyEvent { value: 1 }]);
/// # std::fs::remove_file(path).unwrap();
/// ```
pub fn load(&mut self) -> Result<Vec<T>, io::Error> {
let mut events = Vec::new();
if !self.path.exists() {
return Ok(events);
}
let file = File::open(&self.path)?;
let reader = BufReader::new(file);
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let event: T = serde_json::from_str(&line).map_err(io::Error::other)?;
events.push(event);
}
Ok(events)
}
}