// Mirror of https://github.com/roostorg/osprey
// github.com/roostorg/osprey
1use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc};
2
3use futures::{
4 future::{BoxFuture, Shared},
5 FutureExt,
6};
7use parking_lot::RwLock;
8
/// A single cache slot: either a resolved value, or a shared handle to an
/// in-flight resolution that concurrent callers can all await.
enum CachedFuture<T, E>
where
    T: Clone,
    E: Clone,
{
    /// The resolver completed with `Ok`; the successful value is cached.
    Ready(T),
    /// Resolution is still in flight. The `Shared` wrapper lets every caller
    /// clone and await the same underlying boxed future.
    Pending(Shared<BoxFuture<'static, Result<T, E>>>),
}
17
/// Borrowed view of a cache slot, handed to the `CachedFutures::fold` callback.
pub enum CachedFutureRef<'a, T> {
    /// The slot holds a resolved value.
    Ready(&'a T),
    /// The slot's resolution is still in flight; its value is not observable here,
    /// only the fact that it is pending.
    Pending,
}
22
/// A keyed cache of async computations that de-duplicates concurrent work:
/// callers asking for the same key share one in-flight future, and successful
/// results are retained for later lookups.
///
/// Cloning is cheap — clones share the same underlying map through the `Arc`.
#[derive(Clone)]
pub struct CachedFutures<K, T, E>
where
    K: Hash + Eq + Clone,
    T: Clone,
    E: Clone,
{
    /// Keyed slots; the `RwLock` favors the read-heavy lookup fast path.
    cache: Arc<RwLock<HashMap<K, CachedFuture<T, E>>>>,
}
32
33impl<K, T, E> Default for CachedFutures<K, T, E>
34where
35 K: Hash + Eq + Clone,
36 T: Clone,
37 E: Clone,
38{
39 fn default() -> Self {
40 Self {
41 cache: Default::default(),
42 }
43 }
44}
45impl<K, T, E> Debug for CachedFutures<K, T, E>
46where
47 K: Hash + Eq + Clone,
48 T: Clone,
49 E: Clone,
50{
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("CachedFutures")
53 .field("cache_size", &self.cache.read().len())
54 .finish()
55 }
56}
57
impl<K, T, E> CachedFutures<K, T, E>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
    E: Clone + Send + Sync + 'static,
{
    /// Given a key, tries to re-use the successful result of `resolver`, otherwise, will execute resolver,
    /// and cache its result if the result is Ok().
    ///
    /// Concurrent callers with the same key share a single in-flight resolution: the
    /// pending future is stored as a `Shared` future, so `resolver` runs at most once
    /// per cache miss. Errors are never cached (see `create_resolver_future`), so a
    /// later call with the same key will retry.
    pub async fn get_or_cache_default<F, U>(&self, key: K, resolver: F) -> Result<T, E>
    where
        F: FnOnce(K) -> U,
        U: Future<Output = Result<T, E>> + Send + 'static,
    {
        // Check to see if we already have a cached value - fast path with the read lock!
        // The read guard is a temporary inside this `match`, so it is released before
        // we either await or try to take the write lock below.
        let maybe_pending_future = match self.cache.read().get(&key) {
            Some(CachedFuture::Ready(value)) => return Ok(value.clone()),
            Some(CachedFuture::Pending(future)) => Some(future.clone()),
            None => None,
        };

        let future = match maybe_pending_future {
            Some(future) => future,
            None => {
                // The future does not exist yet, so now we're going to acquire a write lock, so we can start resolution:
                let mut write_locked_cache = self.cache.write();

                // Repeat the read check first, a writer may have raced us and inserted the future:
                match write_locked_cache.get(&key) {
                    Some(CachedFuture::Ready(value)) => return Ok(value.clone()),
                    Some(CachedFuture::Pending(future)) => future.clone(),
                    None => {
                        // NOTE(review): `resolver` itself is invoked here (inside
                        // create_resolver_future) while the write lock is still held.
                        // That is fine as long as constructing the future is cheap and
                        // does not touch this cache; a resolver that synchronously
                        // re-enters this cache would deadlock, since parking_lot locks
                        // are not reentrant — confirm callers' resolvers are lazy.
                        let future = self.create_resolver_future(resolver, &key);
                        write_locked_cache.insert(key, CachedFuture::Pending(future.clone()));
                        future
                    }
                }
            }
        };

        // Any write guard taken above was scoped to the `None` arm and has been dropped
        // by now — no lock is held across this await.
        future.await
    }

    /// Reduces over the cached futures, allowing one to introspect all of the resolved future values, or which futures are pending.
    ///
    /// The read lock is held for the whole iteration, so `reducer` must not call back
    /// into this cache (e.g. `drop` or `retain_ready`) or it will deadlock.
    pub fn fold<I, F>(&self, mut init: I, reducer: F) -> I
    where
        F: Fn(I, (&K, CachedFutureRef<'_, T>)) -> I,
    {
        let cache = self.cache.read();
        for (k, v) in cache.iter() {
            let future_ref = match v {
                CachedFuture::Ready(v) => CachedFutureRef::Ready(v),
                // Pending values are not observable; only their presence is reported.
                CachedFuture::Pending(_) => CachedFutureRef::Pending,
            };

            init = (reducer)(init, (k, future_ref));
        }

        init
    }

    /// Reduces over the ready cached futures, allowing one to introspect all of the resolved future values.
    /// Pending entries are skipped: their accumulator value passes through unchanged.
    pub fn fold_ready<I, F>(&self, init: I, reducer: F) -> I
    where
        F: Fn(I, (&K, &T)) -> I,
    {
        self.fold(init, |init, (k, v)| match v {
            CachedFutureRef::Ready(v) => (reducer)(init, (k, v)),
            CachedFutureRef::Pending => init,
        })
    }

    /// Drops the cached entry for `key`, whether it is ready or pending.
    ///
    /// NOTE(review): this does not cancel an in-flight resolution. If the removed
    /// entry was `Pending`, the resolver future keeps running for anyone already
    /// awaiting it, and on success its completion handler will re-insert a `Ready`
    /// value for this key (see `create_resolver_future`).
    pub fn drop(&self, key: &K) {
        self.cache.write().remove(key);
    }

    /// Iterates over all the *ready* cached futures, retaining the ones that match the predicate
    /// function, and returning the number of futures which have been disposed of (where the
    /// predicate function has returned false).
    pub fn retain_ready<F>(&self, mut f: F) -> usize
    where
        F: FnMut(&T) -> bool,
    {
        let mut cache = self.cache.write();
        let mut num_disposed = 0;
        cache.retain(|_, v| {
            match v {
                // Always retain pending entries, as they are in-flight.
                CachedFuture::Pending(_) => true,
                CachedFuture::Ready(r) => {
                    let should_retain = (f)(r);

                    if !should_retain {
                        num_disposed += 1;
                    }

                    should_retain
                }
            }
        });

        num_disposed
    }

    /// Builds the shared future that runs `resolver(key)` and writes the outcome back
    /// into the cache: an `Ok` result replaces the `Pending` slot with `Ready`, while
    /// an `Err` removes the slot entirely so the next lookup retries.
    ///
    /// `resolver` is called eagerly here to obtain the future; only the returned
    /// future's body is deferred until the `Shared` handle is first polled.
    fn create_resolver_future<F, U>(
        &self,
        resolver: F,
        key: &K,
    ) -> Shared<BoxFuture<'static, Result<T, E>>>
    where
        F: FnOnce(K) -> U,
        U: Future<Output = Result<T, E>> + Send + 'static,
    {
        let resolver_future = (resolver)(key.clone());
        // Clone the Arc'd map and the key so the writeback closure is 'static.
        let cache = self.cache.clone();
        let key_clone = key.clone();
        async move {
            let result = resolver_future.await;
            match result {
                Ok(result) => {
                    // If the result was a success, we'll cache that result now.
                    cache
                        .write()
                        .insert(key_clone, CachedFuture::Ready(result.clone()));
                    Ok(result)
                }
                Err(error) => {
                    // We will not cache errors!
                    cache.write().remove(&key_clone);
                    Err(error)
                }
            }
        }
        .boxed()
        .shared()
    }
}