loading up the forgejo repo on tangled to test page performance
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix queue worker incorrectly stopped when there are still more items in the queue (#29532)

Without `case <-t.C`, the workers would stop incorrectly, the test won't
pass. For the worse case, there might be only one running worker
processing the queue items for long time because other workers are
stopped. The root cause is related to the logic of doDispatchBatchToWorker.
It isn't a serious problem at the moment, so keep it as-is.

(cherry picked from commit 6465f94a2d26cdacc232fddc20f98d98df61ddac)

authored by

wxiaoguang and committed by
Earl Warren
e1592974 e2371743

+42 -9
+16 -4
modules/queue/workergroup.go
··· 60 60 full = true 61 61 } 62 62 63 + // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum" 64 + // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later 65 + // So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary. 63 66 q.workerNumMu.Lock() 64 67 noWorker := q.workerNum == 0 65 68 if full || noWorker { ··· 143 146 log.Debug("Queue %q starts new worker", q.GetName()) 144 147 defer log.Debug("Queue %q stops idle worker", q.GetName()) 145 148 149 + atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging 150 + 146 151 t := time.NewTicker(workerIdleDuration) 152 + defer t.Stop() 153 + 147 154 keepWorking := true 148 155 stopWorking := func() { 149 156 q.workerNumMu.Lock() ··· 158 165 case batch, ok := <-q.batchChan: 159 166 if !ok { 160 167 stopWorking() 161 - } else { 162 - q.doWorkerHandle(batch) 163 - t.Reset(workerIdleDuration) 168 + continue 169 + } 170 + q.doWorkerHandle(batch) 171 + // reset the idle ticker, and drain the tick after reset in case a tick is already triggered 172 + t.Reset(workerIdleDuration) 173 + select { 174 + case <-t.C: 175 + default: 164 176 } 165 177 case <-t.C: 166 178 q.workerNumMu.Lock() 167 - keepWorking = q.workerNum <= 1 179 + keepWorking = q.workerNum <= 1 // keep the last worker running 168 180 if !keepWorking { 169 181 q.workerNum-- 170 182 }
+2
modules/queue/workerqueue.go
··· 40 40 workerMaxNum int 41 41 workerActiveNum int 42 42 workerNumMu sync.Mutex 43 + 44 + workerStartedCounter int32 43 45 } 44 46 45 47 type flushType chan struct{}
+24 -5
modules/queue/workerqueue_test.go
··· 11 11 "time" 12 12 13 13 "code.gitea.io/gitea/modules/setting" 14 + "code.gitea.io/gitea/modules/test" 14 15 15 16 "github.com/stretchr/testify/assert" 16 17 ) ··· 175 176 } 176 177 177 178 func TestWorkerPoolQueueActiveWorkers(t *testing.T) { 178 - oldWorkerIdleDuration := workerIdleDuration 179 - workerIdleDuration = 300 * time.Millisecond 180 - defer func() { 181 - workerIdleDuration = oldWorkerIdleDuration 182 - }() 179 + defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)() 183 180 184 181 handler := func(items ...int) (unhandled []int) { 185 182 time.Sleep(100 * time.Millisecond) ··· 250 247 q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) 251 248 assert.EqualValues(t, 20, q.GetQueueItemNumber()) 252 249 } 250 + 251 + func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { 252 + defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() 253 + 254 + handler := func(items ...int) (unhandled []int) { 255 + time.Sleep(50 * time.Millisecond) 256 + return nil 257 + } 258 + 259 + q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) 260 + stop := runWorkerPoolQueue(q) 261 + for i := 0; i < 20; i++ { 262 + assert.NoError(t, q.Push(i)) 263 + } 264 + 265 + time.Sleep(500 * time.Millisecond) 266 + assert.EqualValues(t, 2, q.GetWorkerNumber()) 267 + assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) 268 + // when the queue never becomes empty, the existing workers should keep working 269 + assert.EqualValues(t, 2, q.workerStartedCounter) 270 + stop() 271 + }