···6868 constraint hosts_pkey primary key (hostname)
6969 );
70707171+ create table if not exists resync_buffer (
7272+ did text not null,
7373+ rkey text not null,
7474+ event_rkey text not null,
7575+ db_created_at timestamptz not null default now(),
7676+7777+ constraint resync_buffer_pkey primary key (did, rkey, event_rkey),
7878+ constraint resync_buffer_repo_fkey foreign key (did, rkey)
7979+ references repos (did, rkey) on delete cascade
8080+ );
8181+7182 create index if not exists idx_repos_aturi on repos (at_uri);
7283 create index if not exists idx_repos_db_updated_at on repos (db_updated_at desc);
7384 create index if not exists idx_hosts_db_updated_at on hosts (db_updated_at desc);
-13
knotmirror/db/repos.go
···5454 return nil
5555}
56565757-func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error {
5858- if _, err := e.ExecContext(ctx,
5959- `update repos
6060- set state = $1
6161- where did = $2 and rkey = $3`,
6262- state,
6363- did, rkey,
6464- ); err != nil {
6565- return fmt.Errorf("updating repo: %w", err)
6666- }
6767- return nil
6868-}
6969-7057func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error {
7158 if _, err := e.ExecContext(ctx,
7259 `delete from repos where did = $1 and rkey = $2`,
+217
knotmirror/db/resync_buffer.go
···11+package db
22+33+import (
44+ "context"
55+ "database/sql"
66+ "errors"
77+ "fmt"
88+99+ "github.com/bluesky-social/indigo/atproto/syntax"
1010+ "tangled.org/core/knotmirror/models"
1111+)
1212+1313+func BufferEvent(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, eventRkey string) error {
1414+ tx, err := e.BeginTx(ctx, nil)
1515+ if err != nil {
1616+ return fmt.Errorf("begin tx: %w", err)
1717+ }
1818+ defer tx.Rollback()
1919+2020+ var state models.RepoState
2121+ if err := tx.QueryRowContext(ctx,
2222+ `select state from repos where did = $1 and rkey = $2 for update`,
2323+ did, rkey,
2424+ ).Scan(&state); err != nil {
2525+ if errors.Is(err, sql.ErrNoRows) {
2626+ return nil
2727+ }
2828+ return fmt.Errorf("locking repo: %w", err)
2929+ }
3030+ if state != models.RepoStateResyncing {
3131+ return nil
3232+ }
3333+3434+ if _, err := tx.ExecContext(ctx,
3535+ `insert into resync_buffer (did, rkey, event_rkey)
3636+ values ($1, $2, $3)
3737+ on conflict (did, rkey, event_rkey) do nothing`,
3838+ did, rkey, eventRkey,
3939+ ); err != nil {
4040+ return fmt.Errorf("buffering event: %w", err)
4141+ }
4242+4343+ if err := tx.Commit(); err != nil {
4444+ return fmt.Errorf("commit: %w", err)
4545+ }
4646+ return nil
4747+}
4848+4949+func MarkDesync(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error {
5050+ tx, err := e.BeginTx(ctx, nil)
5151+ if err != nil {
5252+ return fmt.Errorf("begin tx: %w", err)
5353+ }
5454+ defer tx.Rollback()
5555+5656+ var state models.RepoState
5757+ if err := tx.QueryRowContext(ctx,
5858+ `select state from repos where did = $1 and rkey = $2 for update`,
5959+ did, rkey,
6060+ ).Scan(&state); err != nil {
6161+ if errors.Is(err, sql.ErrNoRows) {
6262+ return nil
6363+ }
6464+ return fmt.Errorf("locking repo: %w", err)
6565+ }
6666+ switch state {
6767+ case models.RepoStateActive, models.RepoStateDesynchronized, models.RepoStateError:
6868+ default:
6969+ return nil
7070+ }
7171+7272+ if _, err := tx.ExecContext(ctx,
7373+ `update repos set state = $1 where did = $2 and rkey = $3`,
7474+ models.RepoStateDesynchronized, did, rkey,
7575+ ); err != nil {
7676+ return fmt.Errorf("marking desync: %w", err)
7777+ }
7878+7979+ if err := tx.Commit(); err != nil {
8080+ return fmt.Errorf("commit: %w", err)
8181+ }
8282+ return nil
8383+}
8484+8585+func CompleteResync(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error {
8686+ tx, err := e.BeginTx(ctx, nil)
8787+ if err != nil {
8888+ return fmt.Errorf("begin tx: %w", err)
8989+ }
9090+ defer tx.Rollback()
9191+9292+ var state models.RepoState
9393+ if err := tx.QueryRowContext(ctx,
9494+ `select state from repos where did = $1 and rkey = $2 for update`,
9595+ did, rkey,
9696+ ).Scan(&state); err != nil {
9797+ if errors.Is(err, sql.ErrNoRows) {
9898+ return nil
9999+ }
100100+ return fmt.Errorf("locking repo: %w", err)
101101+ }
102102+ if state != models.RepoStateResyncing {
103103+ return nil
104104+ }
105105+106106+ res, err := tx.ExecContext(ctx,
107107+ `delete from resync_buffer where did = $1 and rkey = $2`,
108108+ did, rkey,
109109+ )
110110+ if err != nil {
111111+ return fmt.Errorf("draining buffer: %w", err)
112112+ }
113113+ deleted, _ := res.RowsAffected()
114114+115115+ finalState := models.RepoStateActive
116116+ if deleted > 0 {
117117+ finalState = models.RepoStateDesynchronized
118118+ }
119119+120120+ if _, err := tx.ExecContext(ctx,
121121+ `update repos
122122+ set error_msg = '',
123123+ retry_count = 0,
124124+ retry_after = 0,
125125+ state = $1
126126+ where did = $2 and rkey = $3`,
127127+ finalState, did, rkey,
128128+ ); err != nil {
129129+ return fmt.Errorf("finalising repo: %w", err)
130130+ }
131131+132132+ return tx.Commit()
133133+}
134134+135135+func CancelResync(ctx context.Context, e *sql.DB, aturi syntax.ATURI) error {
136136+ tx, err := e.BeginTx(ctx, nil)
137137+ if err != nil {
138138+ return fmt.Errorf("begin tx: %w", err)
139139+ }
140140+ defer tx.Rollback()
141141+142142+ var did syntax.DID
143143+ var rkey syntax.RecordKey
144144+ var state models.RepoState
145145+ if err := tx.QueryRowContext(ctx,
146146+ `select did, rkey, state from repos where at_uri = $1 for update`,
147147+ aturi,
148148+ ).Scan(&did, &rkey, &state); err != nil {
149149+ if errors.Is(err, sql.ErrNoRows) {
150150+ return nil
151151+ }
152152+ return fmt.Errorf("locking repo: %w", err)
153153+ }
154154+ if state != models.RepoStateResyncing {
155155+ return nil
156156+ }
157157+158158+ if _, err := tx.ExecContext(ctx,
159159+ `delete from resync_buffer where did = $1 and rkey = $2`,
160160+ did, rkey,
161161+ ); err != nil {
162162+ return fmt.Errorf("draining buffer: %w", err)
163163+ }
164164+165165+ if _, err := tx.ExecContext(ctx,
166166+ `update repos set state = $1 where did = $2 and rkey = $3`,
167167+ models.RepoStateSuspended, did, rkey,
168168+ ); err != nil {
169169+ return fmt.Errorf("suspending repo: %w", err)
170170+ }
171171+172172+ return tx.Commit()
173173+}
174174+175175+func FailResync(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState, errMsg string, retryCount int, retryAfter int64) error {
176176+ tx, err := e.BeginTx(ctx, nil)
177177+ if err != nil {
178178+ return fmt.Errorf("begin tx: %w", err)
179179+ }
180180+ defer tx.Rollback()
181181+182182+ var current models.RepoState
183183+ if err := tx.QueryRowContext(ctx,
184184+ `select state from repos where did = $1 and rkey = $2 for update`,
185185+ did, rkey,
186186+ ).Scan(¤t); err != nil {
187187+ if errors.Is(err, sql.ErrNoRows) {
188188+ return nil
189189+ }
190190+ return fmt.Errorf("locking repo: %w", err)
191191+ }
192192+ if current != models.RepoStateResyncing {
193193+ return nil
194194+ }
195195+196196+ if _, err := tx.ExecContext(ctx,
197197+ `delete from resync_buffer where did = $1 and rkey = $2`,
198198+ did, rkey,
199199+ ); err != nil {
200200+ return fmt.Errorf("draining buffer: %w", err)
201201+ }
202202+203203+ if _, err := tx.ExecContext(ctx,
204204+ `update repos
205205+ set error_msg = $1,
206206+ retry_count = $2,
207207+ retry_after = $3,
208208+ state = $4
209209+ where did = $5 and rkey = $6`,
210210+ errMsg, retryCount, retryAfter, state,
211211+ did, rkey,
212212+ ); err != nil {
213213+ return fmt.Errorf("finalising repo: %w", err)
214214+ }
215215+216216+ return tx.Commit()
217217+}