very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib] update FilterControl to return a builder for easier application

dawn 394c057e d1d19573

+189 -142
+7 -3
examples/statusphere.rs
··· 173 173 174 174 // discover only repos that publish xyz.statusphere.status records, 175 175 // and only store that collection (all other record types are dropped). 176 - hydrant.filter.set_mode(FilterMode::Filter).await?; 177 - hydrant.filter.set_signals([COLLECTION]).await?; 178 - hydrant.filter.set_collections([COLLECTION]).await?; 176 + hydrant 177 + .filter 178 + .set_mode(FilterMode::Filter) 179 + .set_signals([COLLECTION]) 180 + .set_collections([COLLECTION]) 181 + .apply() 182 + .await?; 179 183 180 184 // replay all persisted events from the start to rebuild the in-memory index, 181 185 // then switch to live tail. since the index is in-memory, we always need the
+9 -6
src/api/filter.rs
··· 1 - use crate::control::Hydrant; 1 + use crate::control::{FilterPatch, Hydrant}; 2 2 use crate::filter::{FilterMode, SetUpdate}; 3 3 use axum::{ 4 4 Json, Router, ··· 28 28 } 29 29 30 30 #[derive(Deserialize)] 31 - pub struct FilterPatch { 31 + pub struct PatchFilterBody { 32 32 pub mode: Option<FilterMode>, 33 33 pub signals: Option<SetUpdate>, 34 34 pub collections: Option<SetUpdate>, ··· 37 37 38 38 pub async fn handle_patch_filter( 39 39 State(hydrant): State<Hydrant>, 40 - Json(patch): Json<FilterPatch>, 40 + Json(body): Json<PatchFilterBody>, 41 41 ) -> Result<Json<FilterSnapshot>, (StatusCode, String)> { 42 - hydrant 43 - .filter 44 - .patch(patch.mode, patch.signals, patch.collections, patch.excludes) 42 + let mut p = FilterPatch::new(&hydrant.filter); 43 + p.mode = body.mode; 44 + p.signals = body.signals; 45 + p.collections = body.collections; 46 + p.excludes = body.excludes; 47 + p.apply() 45 48 .await 46 49 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 47 50 .map(Json)
+173 -133
src/control.rs
··· 789 789 } 790 790 791 791 /// set the indexing mode. see [`FilterControl`] for mode semantics. 792 - pub async fn set_mode(&self, mode: FilterMode) -> Result<FilterSnapshot> { 793 - self.patch(Some(mode), None, None, None).await 792 + pub fn set_mode(&self, mode: FilterMode) -> FilterPatch { 793 + FilterPatch::new(self).set_mode(mode) 794 794 } 795 795 796 796 /// replace the entire signals set. existing signals are removed. 797 - pub async fn set_signals( 798 - &self, 799 - signals: impl IntoIterator<Item = impl Into<String>>, 800 - ) -> Result<FilterSnapshot> { 801 - self.patch( 802 - None, 803 - Some(SetUpdate::Set( 804 - signals.into_iter().map(Into::into).collect(), 805 - )), 806 - None, 807 - None, 808 - ) 809 - .await 797 + pub fn set_signals(&self, signals: impl IntoIterator<Item = impl Into<String>>) -> FilterPatch { 798 + FilterPatch::new(self).set_signals(signals) 810 799 } 811 800 812 801 /// add multiple signals without disturbing existing ones. 813 - pub async fn append_signals( 802 + pub fn append_signals( 814 803 &self, 815 804 signals: impl IntoIterator<Item = impl Into<String>>, 816 - ) -> Result<FilterSnapshot> { 817 - self.patch( 818 - None, 819 - Some(SetUpdate::Patch( 820 - signals.into_iter().map(|s| (s.into(), true)).collect(), 821 - )), 822 - None, 823 - None, 824 - ) 825 - .await 805 + ) -> FilterPatch { 806 + FilterPatch::new(self).append_signals(signals) 826 807 } 827 808 828 809 /// add a single signal. no-op if already present. 829 - pub async fn add_signal(&self, signal: impl Into<String>) -> Result<FilterSnapshot> { 830 - self.patch( 831 - None, 832 - Some(SetUpdate::Patch([(signal.into(), true)].into())), 833 - None, 834 - None, 835 - ) 836 - .await 810 + pub fn add_signal(&self, signal: impl Into<String>) -> FilterPatch { 811 + FilterPatch::new(self).add_signal(signal) 837 812 } 838 813 839 814 /// remove a single signal. no-op if not present. 840 - pub async fn remove_signal(&self, signal: impl Into<String>) -> Result<FilterSnapshot> { 841 - self.patch( 842 - None, 843 - Some(SetUpdate::Patch([(signal.into(), false)].into())), 844 - None, 845 - None, 846 - ) 847 - .await 815 + pub fn remove_signal(&self, signal: impl Into<String>) -> FilterPatch { 816 + FilterPatch::new(self).remove_signal(signal) 848 817 } 849 818 850 819 /// replace the entire collections set. pass an empty iterator to store all collections. 851 - pub async fn set_collections( 820 + pub fn set_collections( 852 821 &self, 853 822 collections: impl IntoIterator<Item = impl Into<String>>, 854 - ) -> Result<FilterSnapshot> { 855 - self.patch( 856 - None, 857 - None, 858 - Some(SetUpdate::Set( 859 - collections.into_iter().map(Into::into).collect(), 860 - )), 861 - None, 862 - ) 863 - .await 823 + ) -> FilterPatch { 824 + FilterPatch::new(self).set_collections(collections) 864 825 } 865 826 866 827 /// add multiple collections without disturbing existing ones. 867 - pub async fn append_collections( 828 + pub fn append_collections( 868 829 &self, 869 830 collections: impl IntoIterator<Item = impl Into<String>>, 870 - ) -> Result<FilterSnapshot> { 871 - self.patch( 872 - None, 873 - None, 874 - Some(SetUpdate::Patch( 875 - collections.into_iter().map(|c| (c.into(), true)).collect(), 876 - )), 877 - None, 878 - ) 879 - .await 831 + ) -> FilterPatch { 832 + FilterPatch::new(self).append_collections(collections) 880 833 } 881 834 882 835 /// add a single collection filter. no-op if already present. 883 - pub async fn add_collection(&self, collection: impl Into<String>) -> Result<FilterSnapshot> { 884 - self.patch( 885 - None, 886 - None, 887 - Some(SetUpdate::Patch([(collection.into(), true)].into())), 888 - None, 889 - ) 890 - .await 836 + pub fn add_collection(&self, collection: impl Into<String>) -> FilterPatch { 837 + FilterPatch::new(self).add_collection(collection) 891 838 } 892 839 893 840 /// remove a single collection filter. no-op if not present. 894 - pub async fn remove_collection(&self, collection: impl Into<String>) -> Result<FilterSnapshot> { 895 - self.patch( 896 - None, 897 - None, 898 - Some(SetUpdate::Patch([(collection.into(), false)].into())), 899 - None, 900 - ) 901 - .await 841 + pub fn remove_collection(&self, collection: impl Into<String>) -> FilterPatch { 842 + FilterPatch::new(self).remove_collection(collection) 902 843 } 903 844 904 845 /// replace the entire excludes set. 905 - pub async fn set_excludes( 846 + pub fn set_excludes( 906 847 &self, 907 848 excludes: impl IntoIterator<Item = impl Into<String>>, 908 - ) -> Result<FilterSnapshot> { 909 - self.patch( 910 - None, 911 - None, 912 - None, 913 - Some(SetUpdate::Set( 914 - excludes.into_iter().map(Into::into).collect(), 915 - )), 916 - ) 917 - .await 849 + ) -> FilterPatch { 850 + FilterPatch::new(self).set_excludes(excludes) 918 851 } 919 852 920 853 /// add multiple DIDs to the excludes set without disturbing existing ones. 921 - pub async fn append_excludes( 854 + pub fn append_excludes( 922 855 &self, 923 856 excludes: impl IntoIterator<Item = impl Into<String>>, 924 - ) -> Result<FilterSnapshot> { 925 - self.patch( 926 - None, 927 - None, 928 - None, 929 - Some(SetUpdate::Patch( 930 - excludes.into_iter().map(|d| (d.into(), true)).collect(), 931 - )), 932 - ) 933 - .await 857 + ) -> FilterPatch { 858 + FilterPatch::new(self).append_excludes(excludes) 934 859 } 935 860 936 861 /// add a single DID to the excludes set. no-op if already excluded. 937 - pub async fn add_exclude(&self, did: impl Into<String>) -> Result<FilterSnapshot> { 938 - self.patch( 939 - None, 940 - None, 941 - None, 942 - Some(SetUpdate::Patch([(did.into(), true)].into())), 943 - ) 944 - .await 862 + pub fn add_exclude(&self, did: impl Into<String>) -> FilterPatch { 863 + FilterPatch::new(self).add_exclude(did) 945 864 } 946 865 947 866 /// remove a single DID from the excludes set. no-op if not present. 948 - pub async fn remove_exclude(&self, did: impl Into<String>) -> Result<FilterSnapshot> { 949 - self.patch( 950 - None, 951 - None, 952 - None, 953 - Some(SetUpdate::Patch([(did.into(), false)].into())), 954 - ) 955 - .await 867 + pub fn remove_exclude(&self, did: impl Into<String>) -> FilterPatch { 868 + FilterPatch::new(self).remove_exclude(did) 956 869 } 870 + } 957 871 958 - /// apply a batch patch atomically. all provided fields are updated in a single db transaction. 959 - /// returns the updated [`FilterSnapshot`]. this is the primitive all other `FilterControl` methods delegate to. 960 - pub async fn patch( 961 - &self, 962 - mode: Option<FilterMode>, 963 - signals: Option<SetUpdate>, 964 - collections: Option<SetUpdate>, 965 - excludes: Option<SetUpdate>, 966 - ) -> Result<FilterSnapshot> { 967 - let filter_ks = self.0.db.filter.clone(); 968 - let inner = self.0.db.inner.clone(); 969 - let filter_handle = self.0.filter.clone(); 872 + /// a staged set of filter mutations. all methods accumulate changes without touching 873 + /// the database. call [`FilterPatch::apply`] to commit the entire patch atomically. 874 + /// 875 + /// obtain an instance by calling any mutation method on [`FilterControl`], or via 876 + /// [`FilterPatch::new`] to start from a blank patch. 877 + pub struct FilterPatch { 878 + state: Arc<AppState>, 879 + /// if set, replaces the current indexing mode. 880 + pub mode: Option<FilterMode>, 881 + /// if set, replaces or patches the signals set. 882 + pub signals: Option<SetUpdate>, 883 + /// if set, replaces or patches the collections set. 884 + pub collections: Option<SetUpdate>, 885 + /// if set, replaces or patches the excludes set. 886 + pub excludes: Option<SetUpdate>, 887 + } 888 + 889 + impl FilterPatch { 890 + /// create a new blank patch associated with the given [`FilterControl`]. 891 + pub fn new(control: &FilterControl) -> Self { 892 + Self { 893 + state: control.0.clone(), 894 + mode: None, 895 + signals: None, 896 + collections: None, 897 + excludes: None, 898 + } 899 + } 900 + 901 + /// set the indexing mode. see [`FilterControl`] for mode semantics. 902 + pub fn set_mode(mut self, mode: FilterMode) -> Self { 903 + self.mode = Some(mode); 904 + self 905 + } 906 + 907 + /// replace the entire signals set. existing signals are removed. 908 + pub fn set_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self { 909 + self.signals = Some(SetUpdate::Set( 910 + signals.into_iter().map(Into::into).collect(), 911 + )); 912 + self 913 + } 914 + 915 + /// add multiple signals without disturbing existing ones. 916 + pub fn append_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self { 917 + self.signals = Some(SetUpdate::Patch( 918 + signals.into_iter().map(|s| (s.into(), true)).collect(), 919 + )); 920 + self 921 + } 922 + 923 + /// add a single signal. no-op if already present. 924 + pub fn add_signal(mut self, signal: impl Into<String>) -> Self { 925 + self.signals = Some(SetUpdate::Patch([(signal.into(), true)].into())); 926 + self 927 + } 928 + 929 + /// remove a single signal. no-op if not present. 930 + pub fn remove_signal(mut self, signal: impl Into<String>) -> Self { 931 + self.signals = Some(SetUpdate::Patch([(signal.into(), false)].into())); 932 + self 933 + } 934 + 935 + /// replace the entire collections set. pass an empty iterator to store all collections. 936 + pub fn set_collections( 937 + mut self, 938 + collections: impl IntoIterator<Item = impl Into<String>>, 939 + ) -> Self { 940 + self.collections = Some(SetUpdate::Set( 941 + collections.into_iter().map(Into::into).collect(), 942 + )); 943 + self 944 + } 945 + 946 + /// add multiple collections without disturbing existing ones. 947 + pub fn append_collections( 948 + mut self, 949 + collections: impl IntoIterator<Item = impl Into<String>>, 950 + ) -> Self { 951 + self.collections = Some(SetUpdate::Patch( 952 + collections.into_iter().map(|c| (c.into(), true)).collect(), 953 + )); 954 + self 955 + } 956 + 957 + /// add a single collection filter. no-op if already present. 958 + pub fn add_collection(mut self, collection: impl Into<String>) -> Self { 959 + self.collections = Some(SetUpdate::Patch([(collection.into(), true)].into())); 960 + self 961 + } 962 + 963 + /// remove a single collection filter. no-op if not present. 964 + pub fn remove_collection(mut self, collection: impl Into<String>) -> Self { 965 + self.collections = Some(SetUpdate::Patch([(collection.into(), false)].into())); 966 + self 967 + } 968 + 969 + /// replace the entire excludes set. 970 + pub fn set_excludes(mut self, excludes: impl IntoIterator<Item = impl Into<String>>) -> Self { 971 + self.excludes = Some(SetUpdate::Set( 972 + excludes.into_iter().map(Into::into).collect(), 973 + )); 974 + self 975 + } 976 + 977 + /// add multiple DIDs to the excludes set without disturbing existing ones. 978 + pub fn append_excludes( 979 + mut self, 980 + excludes: impl IntoIterator<Item = impl Into<String>>, 981 + ) -> Self { 982 + self.excludes = Some(SetUpdate::Patch( 983 + excludes.into_iter().map(|d| (d.into(), true)).collect(), 984 + )); 985 + self 986 + } 987 + 988 + /// add a single DID to the excludes set. no-op if already excluded. 989 + pub fn add_exclude(mut self, did: impl Into<String>) -> Self { 990 + self.excludes = Some(SetUpdate::Patch([(did.into(), true)].into())); 991 + self 992 + } 993 + 994 + /// remove a single DID from the excludes set. no-op if not present. 995 + pub fn remove_exclude(mut self, did: impl Into<String>) -> Self { 996 + self.excludes = Some(SetUpdate::Patch([(did.into(), false)].into())); 997 + self 998 + } 999 + 1000 + /// commit the patch atomically to the database and update the in-memory filter. 1001 + /// returns the updated [`FilterSnapshot`]. 1002 + pub async fn apply(self) -> Result<FilterSnapshot> { 1003 + let filter_ks = self.state.db.filter.clone(); 1004 + let inner = self.state.db.inner.clone(); 1005 + let filter_handle = self.state.filter.clone(); 1006 + let mode = self.mode; 1007 + let signals = self.signals; 1008 + let collections = self.collections; 1009 + let excludes = self.excludes; 970 1010 971 1011 let new_filter = tokio::task::spawn_blocking(move || { 972 1012 let mut batch = inner.batch(); ··· 977 1017 .await 978 1018 .into_diagnostic()??; 979 1019 980 - let excludes = { 981 - let filter_ks = self.0.db.filter.clone(); 1020 + let exclude_list = { 1021 + let filter_ks = self.state.db.filter.clone(); 982 1022 tokio::task::spawn_blocking(move || { 983 1023 db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX) 984 1024 }) ··· 994 1034 .iter() 995 1035 .map(|s| s.to_string()) 996 1036 .collect(), 997 - excludes, 1037 + excludes: exclude_list, 998 1038 }; 999 1039 1000 1040 filter_handle.store(Arc::new(new_filter));