loading up the forgejo repo on tangled to test page performance
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Help to recover from corrupted levelqueue (#24912)

gitea.com experienced the corrupted LevelQueue bug again.

I think the problem is clear now: if the keys in LevelDB go
out of sync, the LevelQueue itself has no way to recover,
e.g.:

* LevelQueue.Len() reports 100
* LevelQueue.LPop() reports ErrNotFound = errors.New("no key found")

So the fix has to go into LevelDB directly and remove all of the queue's
keys in order to recover the corrupted LevelQueue.

More comments are in TestCorruptedLevelQueue.

authored by

wxiaoguang and committed by
GitHub
84c8ab9f 8faf9465

+162 -55
+25 -14
modules/queue/base_levelqueue.go
··· 5 5 6 6 import ( 7 7 "context" 8 + "sync/atomic" 8 9 9 10 "code.gitea.io/gitea/modules/nosql" 11 + "code.gitea.io/gitea/modules/queue/lqinternal" 10 12 11 13 "gitea.com/lunny/levelqueue" 14 + "github.com/syndtr/goleveldb/leveldb" 12 15 ) 13 16 14 17 type baseLevelQueue struct { 15 - internal *levelqueue.Queue 16 - conn string 17 - cfg *BaseConfig 18 + internal atomic.Pointer[levelqueue.Queue] 19 + 20 + conn string 21 + cfg *BaseConfig 22 + db *leveldb.DB 18 23 } 19 24 20 25 var _ baseQueue = (*baseLevelQueue)(nil) ··· 31 36 if err != nil { 32 37 return nil, err 33 38 } 34 - q := &baseLevelQueue{conn: conn, cfg: cfg} 35 - q.internal, err = levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false) 39 + q := &baseLevelQueue{conn: conn, cfg: cfg, db: db} 40 + lq, err := levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false) 36 41 if err != nil { 37 42 return nil, err 38 43 } 39 - 44 + q.internal.Store(lq) 40 45 return q, nil 41 46 } 42 47 43 48 func (q *baseLevelQueue) PushItem(ctx context.Context, data []byte) error { 44 - return baseLevelQueueCommon(q.cfg, q.internal, nil).PushItem(ctx, data) 49 + c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() }) 50 + return c.PushItem(ctx, data) 45 51 } 46 52 47 53 func (q *baseLevelQueue) PopItem(ctx context.Context) ([]byte, error) { 48 - return baseLevelQueueCommon(q.cfg, q.internal, nil).PopItem(ctx) 54 + c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() }) 55 + return c.PopItem(ctx) 49 56 } 50 57 51 58 func (q *baseLevelQueue) HasItem(ctx context.Context, data []byte) (bool, error) { ··· 53 60 } 54 61 55 62 func (q *baseLevelQueue) Len(ctx context.Context) (int, error) { 56 - return int(q.internal.Len()), nil 63 + return int(q.internal.Load().Len()), nil 57 64 } 58 65 59 66 func (q *baseLevelQueue) Close() error { 60 - err := q.internal.Close() 67 + err := q.internal.Load().Close() 61 68 _ = 
nosql.GetManager().CloseLevelDB(q.conn) 69 + q.db = nil // the db is not managed by us, it's managed by the nosql manager 62 70 return err 63 71 } 64 72 65 73 func (q *baseLevelQueue) RemoveAll(ctx context.Context) error { 66 - for q.internal.Len() > 0 { 67 - if _, err := q.internal.LPop(); err != nil { 68 - return err 69 - } 74 + lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName)) 75 + lq, err := levelqueue.NewQueue(q.db, []byte(q.cfg.QueueFullName), false) 76 + if err != nil { 77 + return err 70 78 } 79 + old := q.internal.Load() 80 + q.internal.Store(lq) 81 + _ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good 71 82 return nil 72 83 }
+9 -8
modules/queue/base_levelqueue_common.go
··· 17 17 "github.com/syndtr/goleveldb/leveldb" 18 18 ) 19 19 20 + // baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue 20 21 type baseLevelQueuePushPoper interface { 21 22 RPush(data []byte) error 22 23 LPop() ([]byte, error) ··· 24 25 } 25 26 26 27 type baseLevelQueueCommonImpl struct { 27 - length int 28 - internal baseLevelQueuePushPoper 29 - mu *sync.Mutex 28 + length int 29 + internalFunc func() baseLevelQueuePushPoper 30 + mu *sync.Mutex 30 31 } 31 32 32 33 func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error { ··· 36 37 defer q.mu.Unlock() 37 38 } 38 39 39 - cnt := int(q.internal.Len()) 40 + cnt := int(q.internalFunc().Len()) 40 41 if cnt >= q.length { 41 42 return true, nil 42 43 } 43 - retry, err = false, q.internal.RPush(data) 44 + retry, err = false, q.internalFunc().RPush(data) 44 45 if err == levelqueue.ErrAlreadyInQueue { 45 46 err = ErrAlreadyInQueue 46 47 } ··· 55 56 defer q.mu.Unlock() 56 57 } 57 58 58 - data, err = q.internal.LPop() 59 + data, err = q.internalFunc().LPop() 59 60 if err == levelqueue.ErrNotFound { 60 61 return true, nil, nil 61 62 } ··· 66 67 }) 67 68 } 68 69 69 - func baseLevelQueueCommon(cfg *BaseConfig, internal baseLevelQueuePushPoper, mu *sync.Mutex) *baseLevelQueueCommonImpl { 70 - return &baseLevelQueueCommonImpl{length: cfg.Length, internal: internal} 70 + func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl { 71 + return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc} 71 72 } 72 73 73 74 func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
+55
modules/queue/base_levelqueue_test.go
··· 6 6 import ( 7 7 "testing" 8 8 9 + "code.gitea.io/gitea/modules/queue/lqinternal" 9 10 "code.gitea.io/gitea/modules/setting" 10 11 12 + "gitea.com/lunny/levelqueue" 11 13 "github.com/stretchr/testify/assert" 14 + "github.com/syndtr/goleveldb/leveldb" 12 15 ) 13 16 14 17 func TestBaseLevelDB(t *testing.T) { ··· 21 24 testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false) 22 25 testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true) 23 26 } 27 + 28 + func TestCorruptedLevelQueue(t *testing.T) { 29 + // sometimes the levelqueue could be in a corrupted state, this test is to make sure it can recover from it 30 + dbDir := t.TempDir() + "/levelqueue-test" 31 + db, err := leveldb.OpenFile(dbDir, nil) 32 + if !assert.NoError(t, err) { 33 + return 34 + } 35 + defer db.Close() 36 + 37 + assert.NoError(t, db.Put([]byte("other-key"), []byte("other-value"), nil)) 38 + 39 + nameQueuePrefix := []byte("queue_name") 40 + nameSetPrefix := []byte("set_name") 41 + lq, err := levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false) 42 + assert.NoError(t, err) 43 + assert.NoError(t, lq.RPush([]byte("item-1"))) 44 + 45 + itemKey := lqinternal.QueueItemKeyBytes(nameQueuePrefix, 1) 46 + itemValue, err := db.Get(itemKey, nil) 47 + assert.NoError(t, err) 48 + assert.Equal(t, []byte("item-1"), itemValue) 49 + 50 + // there should be 5 keys in db: queue low, queue high, 1 queue item, 1 set item, and "other-key" 51 + keys := lqinternal.ListLevelQueueKeys(db) 52 + assert.Len(t, keys, 5) 53 + 54 + // delete the queue item key, to corrupt the queue 55 + assert.NoError(t, db.Delete(itemKey, nil)) 56 + // now the queue is corrupted, it never works again 57 + _, err = lq.LPop() 58 + assert.ErrorIs(t, err, levelqueue.ErrNotFound) 59 + assert.NoError(t, lq.Close()) 60 + 
61 + // remove all the queue related keys to reset the queue 62 + lqinternal.RemoveLevelQueueKeys(db, nameQueuePrefix) 63 + lqinternal.RemoveLevelQueueKeys(db, nameSetPrefix) 64 + // now there should be only 1 key in db: "other-key" 65 + keys = lqinternal.ListLevelQueueKeys(db) 66 + assert.Len(t, keys, 1) 67 + assert.Equal(t, []byte("other-key"), keys[0]) 68 + 69 + // re-create a queue from db 70 + lq, err = levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false) 71 + assert.NoError(t, err) 72 + assert.NoError(t, lq.RPush([]byte("item-new-1"))) 73 + // now the queue works again 74 + itemValue, err = lq.LPop() 75 + assert.NoError(t, err) 76 + assert.Equal(t, []byte("item-new-1"), itemValue) 77 + assert.NoError(t, lq.Close()) 78 + }
+25 -33
modules/queue/base_levelqueue_unique.go
··· 6 6 import ( 7 7 "context" 8 8 "sync" 9 - "unsafe" 9 + "sync/atomic" 10 10 11 11 "code.gitea.io/gitea/modules/nosql" 12 + "code.gitea.io/gitea/modules/queue/lqinternal" 12 13 13 14 "gitea.com/lunny/levelqueue" 14 15 "github.com/syndtr/goleveldb/leveldb" 15 16 ) 16 17 17 18 type baseLevelQueueUnique struct { 18 - internal *levelqueue.UniqueQueue 19 - conn string 20 - cfg *BaseConfig 19 + internal atomic.Pointer[levelqueue.UniqueQueue] 20 + 21 + conn string 22 + cfg *BaseConfig 23 + db *leveldb.DB 21 24 22 25 mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together 23 26 } ··· 29 32 if err != nil { 30 33 return nil, err 31 34 } 32 - q := &baseLevelQueueUnique{conn: conn, cfg: cfg} 33 - q.internal, err = levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false) 35 + q := &baseLevelQueueUnique{conn: conn, cfg: cfg, db: db} 36 + lq, err := levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false) 34 37 if err != nil { 35 38 return nil, err 36 39 } 37 - 40 + q.internal.Store(lq) 38 41 return q, nil 39 42 } 40 43 41 44 func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error { 42 - return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PushItem(ctx, data) 45 + c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() }) 46 + return c.PushItem(ctx, data) 43 47 } 44 48 45 49 func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) { 46 - return baseLevelQueueCommon(q.cfg, q.internal, &q.mu).PopItem(ctx) 50 + c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() }) 51 + return c.PopItem(ctx) 47 52 } 48 53 49 54 func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) { 50 55 q.mu.Lock() 51 56 defer q.mu.Unlock() 52 - return q.internal.Has(data) 57 + return q.internal.Load().Has(data) 53 58 
} 54 59 55 60 func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) { 56 61 q.mu.Lock() 57 62 defer q.mu.Unlock() 58 - return int(q.internal.Len()), nil 63 + return int(q.internal.Load().Len()), nil 59 64 } 60 65 61 66 func (q *baseLevelQueueUnique) Close() error { 62 67 q.mu.Lock() 63 68 defer q.mu.Unlock() 64 - err := q.internal.Close() 69 + err := q.internal.Load().Close() 70 + q.db = nil // the db is not managed by us, it's managed by the nosql manager 65 71 _ = nosql.GetManager().CloseLevelDB(q.conn) 66 72 return err 67 73 } ··· 69 75 func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error { 70 76 q.mu.Lock() 71 77 defer q.mu.Unlock() 72 - 73 - type levelUniqueQueue struct { 74 - q *levelqueue.Queue 75 - set *levelqueue.Set 76 - db *leveldb.DB 77 - } 78 - lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal)) 79 - 80 - for lq.q.Len() > 0 { 81 - if _, err := lq.q.LPop(); err != nil { 82 - return err 83 - } 84 - } 85 - 86 - // the "set" must be cleared after the "list" because there is no transaction. 87 - // it's better to have duplicate items than losing items. 88 - members, err := lq.set.Members() 78 + lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName)) 79 + lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.SetFullName)) 80 + lq, err := levelqueue.NewUniqueQueue(q.db, []byte(q.cfg.QueueFullName), []byte(q.cfg.SetFullName), false) 89 81 if err != nil { 90 - return err // seriously corrupted 82 + return err 91 83 } 92 - for _, v := range members { 93 - _, _ = lq.set.Remove(v) 94 - } 84 + old := q.internal.Load() 85 + q.internal.Store(lq) 86 + _ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good 95 87 return nil 96 88 }
+48
modules/queue/lqinternal/lqinternal.go
··· 1 + // Copyright 2023 The Gitea Authors. All rights reserved. 2 + // SPDX-License-Identifier: MIT 3 + 4 + package lqinternal 5 + 6 + import ( 7 + "bytes" 8 + "encoding/binary" 9 + 10 + "github.com/syndtr/goleveldb/leveldb" 11 + "github.com/syndtr/goleveldb/leveldb/opt" 12 + ) 13 + 14 + func QueueItemIDBytes(id int64) []byte { 15 + buf := make([]byte, 8) 16 + binary.PutVarint(buf, id) 17 + return buf 18 + } 19 + 20 + func QueueItemKeyBytes(prefix []byte, id int64) []byte { 21 + key := make([]byte, len(prefix), len(prefix)+1+8) 22 + copy(key, prefix) 23 + key = append(key, '-') 24 + return append(key, QueueItemIDBytes(id)...) 25 + } 26 + 27 + func RemoveLevelQueueKeys(db *leveldb.DB, namePrefix []byte) { 28 + keyPrefix := make([]byte, len(namePrefix)+1) 29 + copy(keyPrefix, namePrefix) 30 + keyPrefix[len(namePrefix)] = '-' 31 + 32 + it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict}) 33 + defer it.Release() 34 + for it.Next() { 35 + if bytes.HasPrefix(it.Key(), keyPrefix) { 36 + _ = db.Delete(it.Key(), nil) 37 + } 38 + } 39 + } 40 + 41 + func ListLevelQueueKeys(db *leveldb.DB) (res [][]byte) { 42 + it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict}) 43 + defer it.Release() 44 + for it.Next() { 45 + res = append(res, it.Key()) 46 + } 47 + return res 48 + }