Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 193 lines 5.9 kB view raw
1use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc}; 2 3use futures::{ 4 future::{BoxFuture, Shared}, 5 FutureExt, 6}; 7use parking_lot::RwLock; 8 9enum CachedFuture<T, E> 10where 11 T: Clone, 12 E: Clone, 13{ 14 Ready(T), 15 Pending(Shared<BoxFuture<'static, Result<T, E>>>), 16} 17 18pub enum CachedFutureRef<'a, T> { 19 Ready(&'a T), 20 Pending, 21} 22 23#[derive(Clone)] 24pub struct CachedFutures<K, T, E> 25where 26 K: Hash + Eq + Clone, 27 T: Clone, 28 E: Clone, 29{ 30 cache: Arc<RwLock<HashMap<K, CachedFuture<T, E>>>>, 31} 32 33impl<K, T, E> Default for CachedFutures<K, T, E> 34where 35 K: Hash + Eq + Clone, 36 T: Clone, 37 E: Clone, 38{ 39 fn default() -> Self { 40 Self { 41 cache: Default::default(), 42 } 43 } 44} 45impl<K, T, E> Debug for CachedFutures<K, T, E> 46where 47 K: Hash + Eq + Clone, 48 T: Clone, 49 E: Clone, 50{ 51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 52 f.debug_struct("CachedFutures") 53 .field("cache_size", &self.cache.read().len()) 54 .finish() 55 } 56} 57 58impl<K, T, E> CachedFutures<K, T, E> 59where 60 K: Hash + Eq + Clone + Send + Sync + 'static, 61 T: Clone + Send + Sync + 'static, 62 E: Clone + Send + Sync + 'static, 63{ 64 /// Given a key, tries to re-use the successful result of `resolver`, otherwise, will execute resolver, 65 /// and cache its result if the result is Ok(). 66 pub async fn get_or_cache_default<F, U>(&self, key: K, resolver: F) -> Result<T, E> 67 where 68 F: FnOnce(K) -> U, 69 U: Future<Output = Result<T, E>> + Send + 'static, 70 { 71 // Check to see if we already have a cached value - fast path with the read lock! 72 let maybe_pending_future = match self.cache.read().get(&key) { 73 Some(CachedFuture::Ready(value)) => return Ok(value.clone()), 74 Some(CachedFuture::Pending(future)) => Some(future.clone()), 75 None => None, 76 }; 77 78 let future = match maybe_pending_future { 79 Some(future) => future, 80 None => { 81 // The future does not exist yet, so now we're going to acquire a write lock, so we can start resolution: 82 let mut write_locked_cache = self.cache.write(); 83 84 // Repeat the read check first, a writer may have raced us and inserted the future: 85 match write_locked_cache.get(&key) { 86 Some(CachedFuture::Ready(value)) => return Ok(value.clone()), 87 Some(CachedFuture::Pending(future)) => future.clone(), 88 None => { 89 let future = self.create_resolver_future(resolver, &key); 90 write_locked_cache.insert(key, CachedFuture::Pending(future.clone())); 91 future 92 } 93 } 94 } 95 }; 96 97 future.await 98 } 99 100 /// Reduces over the cached futures, allowing one to introspect all of the resolved future values, or which futures are pending. 101 pub fn fold<I, F>(&self, mut init: I, reducer: F) -> I 102 where 103 F: Fn(I, (&K, CachedFutureRef<'_, T>)) -> I, 104 { 105 let cache = self.cache.read(); 106 for (k, v) in cache.iter() { 107 let future_ref = match v { 108 CachedFuture::Ready(v) => CachedFutureRef::Ready(v), 109 CachedFuture::Pending(_) => CachedFutureRef::Pending, 110 }; 111 112 init = (reducer)(init, (k, future_ref)); 113 } 114 115 init 116 } 117 118 /// Reduces over the ready cached futures, allowing one to introspect all of the resolved future values. 119 pub fn fold_ready<I, F>(&self, init: I, reducer: F) -> I 120 where 121 F: Fn(I, (&K, &T)) -> I, 122 { 123 self.fold(init, |init, (k, v)| match v { 124 CachedFutureRef::Ready(v) => (reducer)(init, (k, v)), 125 CachedFutureRef::Pending => init, 126 }) 127 } 128 129 // drop a cached future 130 pub fn drop(&self, key: &K) { 131 self.cache.write().remove(key); 132 } 133 134 /// Iterates over all the *ready* cached futures, retaining ones that are match the predicate function, and returning the number 135 /// of futures which have been disposed of (where the predicate function has returned false). 136 pub fn retain_ready<F>(&self, mut f: F) -> usize 137 where 138 F: FnMut(&T) -> bool, 139 { 140 let mut cache = self.cache.write(); 141 let mut num_disposed = 0; 142 cache.retain(|_, v| { 143 match v { 144 // Always retain pending entries, as they are in-flight. 145 CachedFuture::Pending(_) => true, 146 CachedFuture::Ready(r) => { 147 let should_retain = (f)(r); 148 149 if !should_retain { 150 num_disposed += 1; 151 } 152 153 should_retain 154 } 155 } 156 }); 157 158 num_disposed 159 } 160 161 fn create_resolver_future<F, U>( 162 &self, 163 resolver: F, 164 key: &K, 165 ) -> Shared<BoxFuture<'static, Result<T, E>>> 166 where 167 F: FnOnce(K) -> U, 168 U: Future<Output = Result<T, E>> + Send + 'static, 169 { 170 let resolver_future = (resolver)(key.clone()); 171 let cache = self.cache.clone(); 172 let key_clone = key.clone(); 173 async move { 174 let result = resolver_future.await; 175 match result { 176 Ok(result) => { 177 // If the result was a success, we'll cache that result now. 178 cache 179 .write() 180 .insert(key_clone, CachedFuture::Ready(result.clone())); 181 Ok(result) 182 } 183 Err(error) => { 184 // We will not cache errors! 185 cache.write().remove(&key_clone); 186 Err(error) 187 } 188 } 189 } 190 .boxed() 191 .shared() 192 } 193}