this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

hepa: periodically refresh auth session (#496)

This works around a current issue in prod where actions are failing to
persist because the session expires.

The real fix for the underlying issue (IMHO) is to re-implement
`xrpc.Client` to handle this in a more elegant and robust way. That is
sort-of tracked in https://github.com/bluesky-social/indigo/issues/378

I was initially confused why this was only coming up when taking
actions, because the client is used for a number of reads (GET) as well.
Those GETs mostly use *admin* auth (a token in the HTTP Authorization
header), not login/session auth (which requires refresh), which explains
why it mostly works, then fails for actions.

authored by

bnewbold and committed by
GitHub
f36f09d2 452b41b7

+46
+8
cmd/hepa/main.go
··· 193 193 } 194 194 }() 195 195 196 + if srv.engine.AdminClient != nil { 197 + go func() { 198 + if err := srv.RunRefreshAdminClient(ctx); err != nil { 199 + slog.Error("session refresh failed", "err", err) 200 + } 201 + }() 202 + } 203 + 196 204 // the main service loop 197 205 if err := srv.RunConsumer(ctx); err != nil { 198 206 return fmt.Errorf("failure consuming and processing firehose: %w", err)
+38
cmd/hepa/server.go
··· 191 191 return err 192 192 } 193 193 194 + // Periodically refreshes the engine's admin XRPC client JWT auth token. 195 + // 196 + // Expects to be run in a goroutine, and to be the only running code which touches the auth fields (aka, there is no locking). 197 + // TODO: this is a hack until we have an XRPC client which handles these details automatically. 198 + func (s *Server) RunRefreshAdminClient(ctx context.Context) error { 199 + if s.engine.AdminClient == nil { 200 + return nil 201 + } 202 + ac := s.engine.AdminClient 203 + ticker := time.NewTicker(1 * time.Hour) 204 + for { 205 + select { 206 + case <-ticker.C: 207 + // uses a temporary xrpc client instead of the existing one because we need to put refreshJwt in the position of accessJwt, and that would cause an error for any concurrent requests 208 + tmpClient := xrpc.Client{ 209 + Host: ac.Host, 210 + Auth: &xrpc.AuthInfo{ 211 + Did: ac.Auth.Did, 212 + Handle: ac.Auth.Handle, 213 + AccessJwt: ac.Auth.RefreshJwt, 214 + RefreshJwt: ac.Auth.RefreshJwt, 215 + }, 216 + } 217 + refresh, err := comatproto.ServerRefreshSession(ctx, &tmpClient) 218 + if err != nil { 219 + // don't return an error, just log, and attempt again on the next tick 220 + s.logger.Error("failed to refresh admin client session", "err", err, "host", ac.Host) 221 + } else { 222 + s.engine.AdminClient.Auth.RefreshJwt = refresh.RefreshJwt 223 + s.engine.AdminClient.Auth.AccessJwt = refresh.AccessJwt 224 + s.logger.Info("refreshed admin client session") 225 + } 226 + case <-ctx.Done(): 227 + return nil 228 + } 229 + } 230 + } 231 + 194 232 // this method runs in a loop, persisting the current cursor state every 5 seconds 195 233 func (s *Server) RunPersistCursor(ctx context.Context) error { 196 234