[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import mongoose, { Connection } from "mongoose";
2import { IdResolver, MemoryCache } from "@atp/identity";
3import * as models from "./models.ts";
4import { getResultFromDoc } from "../util.ts";
5import { getLogger } from "@logtape/logtape";
6import { ServerConfig } from "../../config.ts";
7
/** One hour expressed in milliseconds (1000 ms * 60 s * 60 min). */
const HOUR = 1000 * 60 * 60;
/** One day expressed in milliseconds (24 hours). */
const DAY = 24 * HOUR;
10
/**
 * MongoDB-backed data layer for the AppView.
 *
 * Owns a single Mongoose connection, the set of models registered on that
 * connection, and an identity resolver with an in-memory DID cache. Call
 * {@link Database.connect} before touching `models`; until then the field is
 * unassigned.
 */
export class Database {
  private connection!: Connection;
  public models!: models.DatabaseModels;
  public logger = getLogger(["appview", "database"]);
  public idResolver: IdResolver;

  /**
   * @param cfg Server configuration; `dbUri`, `dbName`, `dbUser` and `dbPass`
   *   are read when {@link Database.connect} is called.
   */
  constructor(private cfg: ServerConfig) {
    this.idResolver = new IdResolver({
      didCache: new MemoryCache(HOUR, DAY),
    });
  }

  /**
   * Replaces any userinfo (`user:password@`) in a connection URI with `***@`
   * so the URI can be logged without exposing credentials. URIs without
   * userinfo are returned unchanged.
   */
  private static redactUri(uri: string): string {
    // Match greedily up to the last "@" in the authority section so passwords
    // containing an unescaped "@" are still fully masked.
    return uri.replace(/^([a-z][a-z0-9+.-]*:\/\/)[^/?#]*@/i, "$1***@");
  }

  /**
   * Creates the Mongoose connection and registers every model on it.
   *
   * The connection is not awaited here: listeners are attached that log the
   * `connected`, `disconnected` and `error` events as they occur.
   *
   * @throws Error if no database URI is configured, or rethrows anything
   *   raised while creating the connection or registering models.
   */
  connect() {
    const uri = this.cfg.dbUri;
    const name = this.cfg.dbName;
    const user = this.cfg.dbUser;
    const pass = this.cfg.dbPass;
    if (!uri) {
      throw new Error("No database URI provided");
    }
    // Never log the raw URI: it may embed credentials.
    this.logger.info(`Connecting to ${Database.redactUri(uri)}`);

    try {
      this.connection = mongoose.createConnection(uri, {
        autoIndex: true,
        autoCreate: true,
        dbName: name,
        user,
        pass,
      });

      // Attach basic listeners for visibility
      this.connection.on("connected", () => {
        this.logger.info("MongoDB connection established");
      });
      this.connection.on("disconnected", () => {
        this.logger.warn("MongoDB connection disconnected");
      });
      this.connection.on("error", (err) => {
        this.logger.error("MongoDB connection error", { err });
      });

      // Initialize models
      this.models = {
        Record: this.connection.model<models.RecordDocument>(
          "Record",
          models.recordSchema,
        ),
        DuplicateRecord: this.connection.model<models.DuplicateRecordDocument>(
          "DuplicateRecord",
          models.duplicateRecordSchema,
        ),
        Like: this.connection.model<models.LikeDocument>(
          "Like",
          models.likeSchema,
        ),
        Post: this.connection.model<models.PostDocument>(
          "Post",
          models.postSchema,
        ),
        Reply: this.connection.model<models.ReplyDocument>(
          "Reply",
          models.replySchema,
        ),
        Story: this.connection.model<models.StoryDocument>(
          "Story",
          models.storySchema,
        ),
        Follow: this.connection.model<models.FollowDocument>(
          "Follow",
          models.followSchema,
        ),
        Block: this.connection.model<models.BlockDocument>(
          "Block",
          models.blockSchema,
        ),
        Profile: this.connection.model<models.ProfileDocument>(
          "Profile",
          models.profileSchema,
        ),
        Audio: this.connection.model<models.AudioDocument>(
          "Audio",
          models.audioSchema,
        ),
        Repost: this.connection.model<models.RepostDocument>(
          "Repost",
          models.repostSchema,
        ),
        Generator: this.connection.model<models.GeneratorDocument>(
          "Generator",
          models.generatorSchema,
        ),
        Takedown: this.connection.model<models.TakedownDocument>(
          "Takedown",
          models.takedownSchema,
        ),
        RepoTakedown: this.connection.model<models.RepoTakedownDocument>(
          "RepoTakedown",
          models.repoTakedownSchema,
        ),
        BlobTakedown: this.connection.model<models.BlobTakedownDocument>(
          "BlobTakedown",
          models.blobTakedownSchema,
        ),
        Actor: this.connection.model<models.ActorDocument>(
          "Actor",
          models.actorSchema,
        ),
        ActorSync: this.connection.model<models.ActorSyncDocument>(
          "ActorSync",
          models.actorSyncSchema,
        ),
        Preference: this.connection.model<models.PreferenceDocument>(
          "Preference",
          models.preferenceSchema,
        ),
        CursorState: this.connection.model<models.CursorStateDocument>(
          "CursorState",
          models.cursorStateSchema,
        ),
      };

      this.logger.info("Started connection to MongoDB");
    } catch (error) {
      this.logger.error("Failed to start connection to MongoDB", { error });
      throw error;
    }
  }

  /**
   * Closes the connection if one was created; a no-op when
   * {@link Database.connect} was never called.
   */
  async disconnect(): Promise<void> {
    if (this.connection) {
      await this.connection.close();
      this.logger.info("Disconnected from MongoDB");
    }
  }

  /**
   * Resolves a handle through the identity resolver.
   *
   * @param handle Handle to resolve.
   * @returns Whatever the resolver yields, or `undefined` if resolution
   *   throws (the error is logged, not propagated).
   */
  async resolveHandle(handle: string): Promise<string | undefined> {
    try {
      return await this.idResolver.handle.resolve(handle);
    } catch (err) {
      this.logger.error("Failed to resolve handle", { err, handle });
      return undefined;
    }
  }

  /**
   * Resolves a DID to its `did` / `handle` pair.
   *
   * @param did DID to resolve.
   * @returns The resolved `did` and `handle`, or `undefined` if resolution
   *   throws (the error is logged, not propagated).
   */
  async resolveDid(
    did: string,
  ): Promise<{ did: string; handle?: string } | undefined> {
    try {
      const data = await this.idResolver.did.resolveAtprotoData(did);
      return {
        did: data.did,
        handle: data.handle,
      };
    } catch (err) {
      this.logger.error("Failed to resolve DID", { err, did });
      return undefined;
    }
  }

  /**
   * Looks up the DID document and converts it with `getResultFromDoc`.
   *
   * Unlike {@link Database.resolveDid}, failures are not swallowed here:
   * resolver errors propagate to the caller.
   *
   * @throws Error with message "DID not found" when the resolver returns no
   *   document.
   */
  async getIdentityByDid(
    { did }: { did: string },
  ): Promise<{ did: string; handle?: string } | undefined> {
    const doc = await this.idResolver.did.resolve(did);
    if (!doc) {
      throw new Error("DID not found");
    }
    return getResultFromDoc(doc);
  }

  /**
   * Reads the persisted cursor stored under the identifier
   * `last_processed_cursor`.
   *
   * @returns The stored cursor value (including `0`), or `null` when no value
   *   is stored or the lookup fails (the error is logged).
   */
  async getCursorState(): Promise<number | null> {
    try {
      const cursorState = await this.models.CursorState.findOne({
        identifier: "last_processed_cursor",
      });
      // `??` rather than `||`: a stored cursor of 0 is a valid value and must
      // not be collapsed into "no cursor".
      return cursorState?.cursorValue ?? null;
    } catch (error) {
      this.logger.error("Failed to get cursor state", { error });
      return null;
    }
  }

  /**
   * Upserts the cursor stored under the identifier `last_processed_cursor`,
   * stamping `updatedAt` with the current time.
   *
   * Best-effort: a failed write is logged and otherwise ignored.
   *
   * @param cursorPosition Cursor value to persist.
   */
  async saveCursorState(cursorPosition: number): Promise<void> {
    try {
      await this.models.CursorState.findOneAndUpdate(
        { identifier: "last_processed_cursor" },
        {
          cursorValue: cursorPosition,
          updatedAt: new Date(),
        },
        { upsert: true },
      );
    } catch (error) {
      this.logger.error(
        "Failed to save cursor state",
        { error, cursorPosition },
      );
    }
  }
}