[mirror] a bluesky bot to post golang projects
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #12 from till/improve-s3-cache

authored by

Till! and committed by
GitHub
fdaddedc fe167de0

+312 -122
+27
.github/workflows/pr.yml
··· 1 + name: pr 2 + 3 + on: 4 + pull_request: 5 + 6 + concurrency: 7 + group: pr-${{ github.event.number }} 8 + cancel-in-progress: true 9 + 10 + jobs: 11 + lint: 12 + runs-on: ubuntu-latest 13 + steps: 14 + - uses: actions/checkout@v4 15 + - uses: actions/setup-go@v5 16 + with: 17 + go-version-file: go.mod 18 + - uses: docker://morphy/revive-action:v2 19 + 20 + test: 21 + runs-on: ubuntu-latest 22 + steps: 23 + - uses: actions/checkout@v4 24 + - uses: actions/setup-go@v5 25 + with: 26 + go-version-file: go.mod 27 + - run: go test -v ./...
+86 -65
cmd/bot/main.go
··· 1 + // package main is the entry point for this application 1 2 package main 2 3 3 4 import ( ··· 14 15 bk "github.com/tailscale/go-bluesky" 15 16 "github.com/till/golangoss-bluesky/internal/bluesky" 16 17 "github.com/till/golangoss-bluesky/internal/content" 18 + "github.com/till/golangoss-bluesky/internal/utils" 17 19 "github.com/urfave/cli/v2" 18 20 ) 19 21 20 22 var ( 21 - blueskyHandle string = "till+bluesky-golang@lagged.biz" 22 - blueskyAppKey string = "" 23 - 24 - cacheBucket string = "golangoss-cache-bucket" 25 - 26 - ctx context.Context 23 + blueskyHandle = "till+bluesky-golang@lagged.biz" 24 + blueskyAppKey = "" 27 25 28 26 // for cache 29 - awsEndpoint string = "" 30 - awsAccessKeyId string = "" 31 - awsSecretKey string = "" 27 + awsEndpoint = "" 28 + awsAccessKeyID = "" 29 + awsSecretKey = "" 30 + cacheBucket = "golangoss-cache-bucket" 32 31 33 32 // for github crawling 34 - githubToken string = "" 33 + githubToken = "" 35 34 36 35 checkInterval time.Duration = 15 * time.Minute 36 + // How long to wait before retrying after a connection failure 37 + reconnectDelay time.Duration = 2 * time.Minute 37 38 ) 38 39 39 40 func init() { 40 41 slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 41 42 Level: slog.LevelDebug, 42 43 }))) 44 + } 45 + 46 + // connectBluesky establishes a connection to Bluesky and logs in 47 + func connectBluesky(ctx context.Context) (*bk.Client, error) { 48 + client, err := bk.Dial(ctx, bk.ServerBskySocial) 49 + if err != nil { 50 + return nil, fmt.Errorf("failed to open connection: %v", err) 51 + } 43 52 44 - ctx = context.Background() 53 + if err := client.Login(ctx, blueskyHandle, blueskyAppKey); err != nil { 54 + client.Close() 55 + switch { 56 + case errors.Is(err, bk.ErrMasterCredentials): 57 + return nil, fmt.Errorf("you're not allowed to use your full-access credentials, please create an appkey") 58 + case errors.Is(err, bk.ErrLoginUnauthorized): 59 + return nil, fmt.Errorf("username of 
application password seems incorrect, please double check") 60 + default: 61 + return nil, fmt.Errorf("login failed: %v", err) 62 + } 63 + } 64 + 65 + return client, nil 66 + } 67 + 68 + // runWithReconnect attempts to run the bot with automatic reconnection on failure 69 + func runWithReconnect(ctx context.Context, mc *minio.Client) error { 70 + for { 71 + client, err := connectBluesky(ctx) 72 + if err != nil { 73 + slog.Error("failed to connect to Bluesky", "error", err) 74 + slog.Info("retrying connection", "delay", reconnectDelay) 75 + time.Sleep(reconnectDelay) 76 + continue 77 + } 78 + 79 + c := bluesky.Client{ 80 + Client: client, 81 + } 82 + 83 + cacheClient := content.NewCacheClientS3(ctx, mc, cacheBucket) 84 + 85 + // Initialize and start the cleanup handler 86 + cleanup := content.NewS3Cleanup(mc, cacheBucket) 87 + cleanup.Start(ctx) 88 + defer cleanup.Stop() 89 + 90 + if err := content.Start(githubToken, cacheClient); err != nil { 91 + slog.Error("failed to start service", "error", err) 92 + client.Close() 93 + time.Sleep(reconnectDelay) 94 + continue 95 + } 96 + 97 + // Run the main loop 98 + for { 99 + slog.DebugContext(ctx, "checking...") 100 + if err := content.Do(ctx, c); err != nil { 101 + if !errors.Is(err, content.ErrCouldNotContent) { 102 + slog.Error("error during content check", "error", err) 103 + client.Close() 104 + time.Sleep(reconnectDelay) 105 + break 106 + } 107 + slog.DebugContext(ctx, "backing off...") 108 + } 109 + 110 + time.Sleep(checkInterval) 111 + } 112 + } 45 113 } 46 114 47 115 func main() { ··· 71 139 Name: "aws-access-key-id", 72 140 EnvVars: []string{"AWS_ACCESS_KEY_ID"}, 73 141 Required: true, 74 - Destination: &awsAccessKeyId, 142 + Destination: &awsAccessKeyID, 75 143 }, 76 144 &cli.StringFlag{ 77 145 Name: "aws-secret-key", ··· 88 156 }, 89 157 90 158 Action: func(cCtx *cli.Context) error { 91 - // FIXME: run this in a control loop; or we crash the app 92 - client, err := bk.Dial(ctx, bk.ServerBskySocial) 93 - if err 
!= nil { 94 - return fmt.Errorf("failed to open connection: %v", err) 95 - } 96 - defer client.Close() 97 - 98 - if err := client.Login(ctx, blueskyHandle, blueskyAppKey); err != nil { 99 - switch { 100 - case errors.Is(err, bk.ErrMasterCredentials): 101 - return fmt.Errorf("you're not allowed to use your full-access credentials, please create an appkey") 102 - case errors.Is(err, bk.ErrLoginUnauthorized): 103 - return fmt.Errorf("username of application password seems incorrect, please double check") 104 - default: 105 - return fmt.Errorf("something else went wrong, please look at the returned error") 106 - } 107 - } 108 - 109 - // init s3 client 159 + // Initialize S3 client 110 160 mc, err := minio.New(awsEndpoint, &minio.Options{ 111 - Creds: credentials.NewStaticV4(awsAccessKeyId, awsSecretKey, ""), 161 + Creds: credentials.NewStaticV4(awsAccessKeyID, awsSecretKey, ""), 112 162 Secure: true, 113 163 }) 114 164 if err != nil { 115 165 return fmt.Errorf("failed to initialize minio client: %v", err) 116 166 } 117 167 118 - // ensure the bucket exists 119 - if err := mc.MakeBucket(ctx, cacheBucket, minio.MakeBucketOptions{}); err != nil { 168 + // Ensure the bucket exists 169 + if err := mc.MakeBucket(cCtx.Context, cacheBucket, minio.MakeBucketOptions{}); err != nil { 120 170 return fmt.Errorf("failed to create bucket: %v", err) 121 171 } 122 172 123 - c := bluesky.Client{ 124 - Client: client, 125 - } 126 - 127 - cacheClient := &content.CacheClientS3{ 128 - MC: mc, 129 - Bucket: cacheBucket, 130 - CTX: ctx, 131 - } 132 - 133 - if err := content.Start(githubToken, cacheClient); err != nil { 134 - return fmt.Errorf("failed to start service: %v", err) 135 - } 136 - 137 - var runErr error 138 - 139 - for { 140 - slog.DebugContext(ctx, "checking...") 141 - if err := content.Do(ctx, c); err != nil { 142 - if !errors.Is(err, content.ErrCouldNotContent) { 143 - runErr = err 144 - break 145 - } 146 - slog.DebugContext(ctx, "backing off...") 147 - } 148 - 149 - 
time.Sleep(checkInterval) 150 - } 151 - return runErr 173 + return runWithReconnect(cCtx.Context, mc) 152 174 }, 153 175 } 154 176 155 177 if err := bot.Run(os.Args); err != nil { 156 - slog.ErrorContext(ctx, err.Error()) 178 + utils.LogError(err) 157 179 os.Exit(1) 158 180 } 159 - 160 181 }
+2 -1
internal/bluesky/bluesky.go
··· 16 16 bk "github.com/tailscale/go-bluesky" 17 17 ) 18 18 19 + // Client wraps the official bluesky sdk 19 20 type Client struct { 20 21 Client *bk.Client 21 22 } ··· 51 52 text += "\n\n" + hashtags 52 53 } 53 54 54 - var startRepoURL int64 = 0 55 + var startRepoURL int64 55 56 56 57 facets := []*bsky.RichtextFacet{} 57 58 facets = append(facets, addFacet(
+2
internal/bluesky/doc.go
··· 1 + // Package bluesky wraps the official bluesky sdk 2 + package bluesky
-39
internal/content/cache.go
··· 1 - package content 2 - 3 - import ( 4 - "context" 5 - "log/slog" 6 - "sync" 7 - "time" 8 - 9 - "github.com/go-redis/redis/v8" 10 - ) 11 - 12 - // CacheClientProcess is an in process cache that adheres to larry's interface 13 - type CacheClientProcess struct { 14 - store sync.Map 15 - } 16 - 17 - func (c *CacheClientProcess) Set(key string, value interface{}, exp time.Duration) error { 18 - slog.Debug("set", slog.String("key", key), slog.Any("value", value)) 19 - c.store.Store(key, value) 20 - return nil 21 - } 22 - 23 - func (c *CacheClientProcess) Get(key string) (string, error) { 24 - slog.Debug("get", slog.String("key", key)) 25 - val, status := c.store.Load(key) 26 - if !status { 27 - return "", redis.Nil 28 - } 29 - return val.(string), nil 30 - } 31 - 32 - func (c *CacheClientProcess) Del(key string) error { 33 - c.store.Delete(key) 34 - return nil 35 - } 36 - 37 - func (c *CacheClientProcess) Scan(key string, action func(context.Context, string) error) error { 38 - return nil 39 - }
+8 -2
internal/content/content.go
··· 3 3 import ( 4 4 "context" 5 5 "errors" 6 + "fmt" 6 7 "log/slog" 7 8 "strings" 8 9 ··· 10 11 "github.com/ezeoleaf/larry/config" 11 12 "github.com/ezeoleaf/larry/provider/github" 12 13 "github.com/till/golangoss-bluesky/internal/bluesky" 14 + "github.com/till/golangoss-bluesky/internal/utils" 13 15 ) 14 16 15 17 var ( 16 - provider github.Provider 18 + provider github.Provider 19 + 20 + // ErrCouldNotContent is returned when content cannot be fetched 17 21 ErrCouldNotContent = errors.New("could not get content") 18 22 ) 19 23 24 + // Start bootstraps the content provider 20 25 func Start(token string, cacheClient cache.Client) error { 21 26 cfg := config.Config{ 22 27 Language: "go", ··· 26 31 return nil 27 32 } 28 33 34 + // Do gets content from the provider and posts it to bluesky 29 35 func Do(ctx context.Context, c bluesky.Client) error { 30 36 p, err := provider.GetContentToPublish() 31 37 if err != nil { 32 - slog.Error("error fetching content", slog.Any("err", err)) 38 + utils.LogError(fmt.Errorf("error fetching content: %w", err)) 33 39 return ErrCouldNotContent 34 40 } 35 41
+2
internal/content/doc.go
··· 1 + // Package content fetches Go projects to post to Bluesky and provides an S3-backed cache implementation 2 + package content
+52 -15
internal/content/s3_cache.go
··· 13 13 14 14 // CacheClientS3 is a small cache that is backed by an S3-compatible store 15 15 type CacheClientS3 struct { 16 - MC *minio.Client 17 - Bucket string 18 - CTX context.Context 16 + mc *minio.Client 17 + bucket string 18 + ctx context.Context 19 + defaultExpiration time.Duration 19 20 } 20 21 21 - func (c *CacheClientS3) Set(key string, value interface{}, exp time.Duration) error { 22 - data, err := json.Marshal(value) 23 - if err != nil { 22 + // NewCacheClientS3 creates a new S3 cache client with default settings 23 + func NewCacheClientS3(ctx context.Context, mc *minio.Client, bucket string) *CacheClientS3 { 24 + return &CacheClientS3{ 25 + mc: mc, 26 + bucket: bucket, 27 + ctx: ctx, 28 + defaultExpiration: 24 * time.Hour, 29 + } 30 + } 31 + 32 + // Set sets a value in the cache 33 + func (c *CacheClientS3) Set(key string, value any, exp time.Duration) error { 34 + var data bytes.Buffer 35 + if err := json.NewEncoder(&data).Encode(value); err != nil { 24 36 return err 25 37 } 26 38 27 - r := bytes.NewReader(data) 39 + r := bytes.NewReader(data.Bytes()) 40 + 41 + // Use the provided expiration time or fall back to default 42 + expiration := exp 43 + if expiration == 0 { 44 + expiration = c.defaultExpiration 45 + } 46 + 47 + // Calculate the expiration time 48 + expiresAt := time.Now().Add(expiration) 49 + 50 + // Set metadata to track expiration 51 + metadata := map[string]string{ 52 + "expires-at": expiresAt.Format(time.RFC3339), 53 + } 28 54 29 - _, err = c.MC.PutObject(c.CTX, c.Bucket, key, r, int64(r.Len()), minio.PutObjectOptions{ 30 - Expires: time.Now().Add(exp), 55 + _, err := c.mc.PutObject(c.ctx, c.bucket, key, r, int64(r.Len()), minio.PutObjectOptions{ 56 + UserMetadata: metadata, 31 57 }) 32 58 return err 33 59 } ··· 36 62 // does not exist, in other case we can use minio.ToErrorResponse(err) to extract more details about the 37 63 // potential S3 related error 38 64 func (c *CacheClientS3) Get(key string) (string, error) { 39 - if _, 
err := c.MC.StatObject(c.CTX, c.Bucket, key, minio.StatObjectOptions{}); err != nil { 65 + // First check if object exists and get its metadata 66 + objInfo, err := c.mc.StatObject(c.ctx, c.bucket, key, minio.StatObjectOptions{}) 67 + if err != nil { 40 68 return "", redis.Nil 41 69 } 42 70 43 - object, err := c.MC.GetObject(c.CTX, c.Bucket, key, minio.GetObjectOptions{}) 71 + if expiresAt, ok := objInfo.UserMetadata["expires-at"]; ok { 72 + expTime, err := time.Parse(time.RFC3339, expiresAt) 73 + if err == nil && time.Now().After(expTime) { 74 + // Object has expired, delete it and return not found 75 + _ = c.Del(key) // Ignore delete error 76 + return "", redis.Nil 77 + } 78 + } 79 + 80 + object, err := c.mc.GetObject(c.ctx, c.bucket, key, minio.GetObjectOptions{}) 44 81 if err != nil { 45 82 return "", err 46 83 } 47 84 48 85 var val any 49 - 50 86 if err := json.NewDecoder(object).Decode(&val); err != nil { 51 87 return "", err 52 88 } ··· 62 98 } 63 99 } 64 100 101 + // Del deletes a value from the cache 65 102 func (c *CacheClientS3) Del(key string) error { 66 - return c.MC.RemoveObject(c.CTX, c.Bucket, key, minio.RemoveObjectOptions{ 103 + return c.mc.RemoveObject(c.ctx, c.bucket, key, minio.RemoveObjectOptions{ 67 104 ForceDelete: true, 68 105 }) 69 106 } 70 107 71 - func (c *CacheClientS3) Scan(key string, action func(context.Context, string) error) error { 72 - 108 + // Scan satisfies the interface for the cache client 109 + func (c *CacheClientS3) Scan(_ string, _ func(context.Context, string) error) error { 73 110 return nil 74 111 }
+111
internal/content/s3_cleanup.go
··· 1 + package content 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + 8 + "github.com/minio/minio-go/v7" 9 + "github.com/till/golangoss-bluesky/internal/utils" 10 + ) 11 + 12 + // S3Cleanup handles background cleanup of expired objects in S3 13 + type S3Cleanup struct { 14 + mc *minio.Client 15 + bucket string 16 + // cleanupInterval is how often to run the cleanup routine 17 + cleanupInterval time.Duration 18 + // stopCleanup is used to signal the cleanup routine to stop 19 + stopCleanup chan struct{} 20 + } 21 + 22 + // NewS3Cleanup creates a new S3 cleanup handler 23 + func NewS3Cleanup(mc *minio.Client, bucket string) *S3Cleanup { 24 + return &S3Cleanup{ 25 + mc: mc, 26 + bucket: bucket, 27 + cleanupInterval: 24 * time.Hour, 28 + stopCleanup: make(chan struct{}), 29 + } 30 + } 31 + 32 + // Start begins the background cleanup routine 33 + func (c *S3Cleanup) Start(ctx context.Context) { 34 + go c.cleanupRoutine(ctx) 35 + } 36 + 37 + // Stop stops the background cleanup routine 38 + func (c *S3Cleanup) Stop() { 39 + close(c.stopCleanup) 40 + } 41 + 42 + // cleanupRoutine periodically checks for and deletes expired objects 43 + func (c *S3Cleanup) cleanupRoutine(ctx context.Context) { 44 + ticker := time.NewTicker(c.cleanupInterval) 45 + defer ticker.Stop() 46 + 47 + for { 48 + select { 49 + case <-ticker.C: 50 + if err := c.cleanupExpired(ctx); err != nil { 51 + utils.LogErrorWithContext(ctx, fmt.Errorf("failed to cleanup expired objects: %w", err)) 52 + } 53 + case <-c.stopCleanup: 54 + return 55 + } 56 + } 57 + } 58 + 59 + // cleanupExpired scans the bucket for expired objects and deletes them 60 + func (c *S3Cleanup) cleanupExpired(ctx context.Context) error { 61 + filter := minio.ListObjectsOptions{ 62 + Recursive: true, 63 + WithMetadata: true, 64 + } 65 + 66 + objectsCh := c.mc.ListObjects(ctx, c.bucket, filter) 67 + for obj := range objectsCh { 68 + if obj.Err != nil { 69 + utils.LogErrorWithContext(ctx, fmt.Errorf("failed to list objects: %w", 
obj.Err)) 70 + continue 71 + } 72 + 73 + // Check if object has expiration metadata 74 + expiresAt, ok := obj.UserMetadata["expires-at"] 75 + if !ok { 76 + utils.LogErrorWithContext( 77 + ctx, 78 + fmt.Errorf("no expiration metadata found for object: %s", obj.Key), 79 + ) 80 + if err := c.delete(ctx, obj); err != nil { 81 + return err 82 + } 83 + continue 84 + } 85 + 86 + expTime, err := time.Parse(time.RFC3339, expiresAt) 87 + if err != nil { 88 + utils.LogError(fmt.Errorf("failed to parse expiration time (%s): %w", obj.Key, err)) 89 + continue 90 + } 91 + if !time.Now().After(expTime) { 92 + continue 93 + } 94 + 95 + // Object has expired, delete it 96 + if err := c.delete(ctx, obj); err != nil { 97 + return err 98 + } 99 + } 100 + return nil 101 + } 102 + 103 + // delete an object from s3 104 + func (c *S3Cleanup) delete(ctx context.Context, obj minio.ObjectInfo) error { 105 + if err := c.mc.RemoveObject(ctx, c.bucket, obj.Key, minio.RemoveObjectOptions{ 106 + ForceDelete: true, 107 + }); err != nil { 108 + return fmt.Errorf("failed to delete expired object (%s): %w", obj.Key, err) 109 + } 110 + return nil 111 + }
+22
internal/utils/utils.go
··· 1 + // Package utils provides utility/convenience functions 2 + package utils 3 + 4 + import ( 5 + "context" 6 + "log/slog" 7 + ) 8 + 9 + // LogError logs an error 10 + func LogError(err error) { 11 + slog.Error(err.Error(), LogAttr(err)) 12 + } 13 + 14 + // LogErrorWithContext logs an error with a context 15 + func LogErrorWithContext(ctx context.Context, err error) { 16 + slog.ErrorContext(ctx, err.Error(), LogAttr(err)) 17 + } 18 + 19 + // LogAttr returns a slog.Attr for an error 20 + func LogAttr(err error) slog.Attr { 21 + return slog.Any("error", err) 22 + }