···11+// Copyright 2016 Google LLC.
22+// Use of this source code is governed by a BSD-style
33+// license that can be found in the LICENSE file.
44+55+// Package bundler supports bundling (batching) of items. Bundling amortizes an
66+// action with fixed costs over multiple items. For example, if an API provides
77+// an RPC that accepts a list of items as input, but clients would prefer
88+// adding items one at a time, then a Bundler can accept individual items from
99+// the client and bundle many of them into a single RPC.
1010+//
1111+// This package is experimental and subject to change without notice.
1212+package bundler
1313+1414+import (
1515+ "context"
1616+ "errors"
1717+ "sync"
1818+ "time"
1919+2020+ "golang.org/x/sync/semaphore"
2121+)
// mode records which of the two mutually exclusive entry points (Add or
// AddWait) a Bundler has been used with.
type mode int

// Default values applied by New, plus the default handler deadline.
const (
	// DefaultDelayThreshold is the default for Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second
	// DefaultBundleHandlerDeadline is the default deadline for the context
	// attached to bundle handling.
	DefaultBundleHandlerDeadline = time.Minute
	// DefaultBundleCountThreshold is the default for
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10
	// DefaultBundleByteThreshold is the default for
	// Bundler.BundleByteThreshold (1M).
	DefaultBundleByteThreshold = 1e6
	// DefaultBufferedByteLimit is the default for Bundler.BufferedByteLimit
	// (1G).
	DefaultBufferedByteLimit = 1e9
)

// Values of mode.
const (
	// none means neither Add nor AddWait has been called yet.
	none mode = iota
	// add means the Bundler is being used through Add.
	add
	// addWait means the Bundler is being used through AddWait.
	addWait
)

// ErrOverflow indicates that Bundler's stored bytes exceeds its
// BufferedByteLimit.
var ErrOverflow = errors.New("bundler: reached buffered byte limit")

// ErrOversizedItem indicates that an item's size exceeds the maximum bundle
// size.
var ErrOversizedItem = errors.New("bundler: item size exceeds bundle byte limit")

// errMixedMethods indicates that the mutually exclusive methods Add and
// AddWait have both been called on the same Bundler.
var errMixedMethods = errors.New("bundler: calls to Add and AddWait cannot be mixed")
5050+5151+// A Bundler collects items added to it into a bundle until the bundle
5252+// exceeds a given size, then calls a user-provided function to handle the
5353+// bundle.
5454+//
5555+// The exported fields are only safe to modify prior to the first call to Add
5656+// or AddWait.
5757+type Bundler[T any] struct {
5858+ // Starting from the time that the first message is added to a bundle, once
5959+ // this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
6060+ DelayThreshold time.Duration
6161+6262+ // Once a bundle has this many items, handle the bundle. Since only one
6363+ // item at a time is added to a bundle, no bundle will exceed this
6464+ // threshold, so it also serves as a limit. The default is
6565+ // DefaultBundleCountThreshold.
6666+ BundleCountThreshold int
6767+6868+ // Once the number of bytes in current bundle reaches this threshold, handle
6969+ // the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
7070+ // but does not cap the total size of a bundle.
7171+ BundleByteThreshold int
7272+7373+ // The maximum size of a bundle, in bytes. Zero means unlimited.
7474+ BundleByteLimit int
7575+7676+ // The maximum number of bytes that the Bundler will keep in memory before
7777+ // returning ErrOverflow. The default is DefaultBufferedByteLimit.
7878+ BufferedByteLimit int
7979+8080+ // The maximum number of handler invocations that can be running at once.
8181+ // The default is 1.
8282+ HandlerLimit int
8383+8484+ // ContextDeadline is the deadline for the context attached to bundle handling.
8585+ ContextDeadline time.Duration
8686+8787+ handler func(context.Context, []T) // called to handle a bundle
8888+8989+ mu sync.Mutex // guards access to fields below
9090+ flushTimer *time.Timer // implements DelayThreshold
9191+ handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them)
9292+ sem *semaphore.Weighted // enforces BufferedByteLimit
9393+ semOnce sync.Once // guards semaphore initialization
9494+ // The current bundle we're adding items to. Not yet in the queue.
9595+ // Appended to the queue once the flushTimer fires or the bundle
9696+ // thresholds/limits are reached. If curBundle is nil and tail is
9797+ // not, we first try to add items to tail. Once tail is full or handled,
9898+ // we create a new curBundle for the incoming item.
9999+ curBundle *bundle[T]
100100+ // The next bundle in the queue to be handled. Nil if the queue is
101101+ // empty.
102102+ head *bundle[T]
103103+ // The last bundle in the queue to be handled. Nil if the queue is
104104+ // empty. If curBundle is nil and tail isn't, we attempt to add new
105105+ // items to the tail until if becomes full or has been passed to the
106106+ // handler.
107107+ tail *bundle[T]
108108+ curFlush *sync.WaitGroup // counts outstanding bundles since last flush
109109+ prevFlush chan bool // signal used to wait for prior flush
110110+111111+ // The first call to Add or AddWait, mode will be add or addWait respectively.
112112+ // If there wasn't call yet then mode is none.
113113+ mode mode
114114+ // TODO: consider alternative queue implementation for head/tail bundle. see:
115115+ // https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
116116+}
// bundle groups items that were added one at a time so they can be passed to
// a handler together as a single slice.
type bundle[T any] struct {
	// items holds the bundled values in the order they were added.
	items []T

	// size is the combined size, in bytes, of all items.
	size int

	// next links bundles into the queue; bundles are handled in order.
	next *bundle[T]

	// flush is the counter that tracks flush completion.
	flush *sync.WaitGroup
}

// add records size against the bundle's running total and appends item to
// it. Callers must hold the owning Bundler's mu.
func (bu *bundle[T]) add(item T, size int) {
	bu.size += size
	bu.items = append(bu.items, item)
}
133133+134134+// New creates a new Bundler.
135135+//
136136+// handler is a function that will be called on each bundle. If itemExample is
137137+// of type T, the argument to handler is of type []T. handler is always called
138138+// sequentially for each bundle, and never in parallel.
139139+//
140140+// Configure the Bundler by setting its thresholds and limits before calling
141141+// any of its methods.
142142+func New[T any](handler func(context.Context, []T)) *Bundler[T] {
143143+ b := &Bundler[T]{
144144+ DelayThreshold: DefaultDelayThreshold,
145145+ BundleCountThreshold: DefaultBundleCountThreshold,
146146+ BundleByteThreshold: DefaultBundleByteThreshold,
147147+ BufferedByteLimit: DefaultBufferedByteLimit,
148148+ HandlerLimit: 1,
149149+150150+ handler: handler,
151151+ curFlush: &sync.WaitGroup{},
152152+ }
153153+ return b
154154+}
155155+156156+func (b *Bundler[T]) initSemaphores() {
157157+ // Create the semaphores lazily, because the user may set limits
158158+ // after NewBundler.
159159+ b.semOnce.Do(func() {
160160+ b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
161161+ })
162162+}
163163+164164+// enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
165165+// handled immediately if we are below HandlerLimit. It requires that b.mu is
166166+// locked.
167167+func (b *Bundler[T]) enqueueCurBundle() {
168168+ // We don't require callers to check if there is a pending bundle. It
169169+ // may have already been appended to the queue. If so, return early.
170170+ if b.curBundle == nil {
171171+ return
172172+ }
173173+ // If we are below the HandlerLimit, the queue must be empty. Handle
174174+ // immediately with a new goroutine.
175175+ if b.handlerCount < b.HandlerLimit {
176176+ b.handlerCount++
177177+ go b.handle(b.curBundle)
178178+ } else if b.tail != nil {
179179+ // There are bundles on the queue, so append to the end
180180+ b.tail.next = b.curBundle
181181+ b.tail = b.curBundle
182182+ } else {
183183+ // The queue is empty, so initialize the queue
184184+ b.head = b.curBundle
185185+ b.tail = b.curBundle
186186+ }
187187+ b.curBundle = nil
188188+ if b.flushTimer != nil {
189189+ b.flushTimer.Stop()
190190+ b.flushTimer = nil
191191+ }
192192+}
193193+194194+// setMode sets the state of Bundler's mode. If mode was defined before
195195+// and passed state is different from it then return an error.
196196+func (b *Bundler[T]) setMode(m mode) error {
197197+ b.mu.Lock()
198198+ defer b.mu.Unlock()
199199+ if b.mode == m || b.mode == none {
200200+ b.mode = m
201201+ return nil
202202+ }
203203+ return errMixedMethods
204204+}
205205+206206+// canFit returns true if bu can fit an additional item of size bytes based
207207+// on the limits of Bundler b.
208208+func (b *Bundler[T]) canFit(bu *bundle[T], size int) bool {
209209+ return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
210210+ (b.BundleCountThreshold <= 0 || len(bu.items) < b.BundleCountThreshold)
211211+}
212212+213213+// Add adds item to the current bundle. It marks the bundle for handling and
214214+// starts a new one if any of the thresholds or limits are exceeded.
215215+// The type of item must be assignable to the itemExample parameter of the NewBundler
216216+// method, otherwise there will be a panic.
217217+//
218218+// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
219219+// the item can never be handled. Add returns ErrOversizedItem in this case.
220220+//
221221+// If adding the item would exceed the maximum memory allowed
222222+// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
223223+// memory, Add returns ErrOverflow.
224224+//
225225+// Add never blocks.
226226+func (b *Bundler[T]) Add(item T, size int) error {
227227+ if err := b.setMode(add); err != nil {
228228+ return err
229229+ }
230230+ // If this item exceeds the maximum size of a bundle,
231231+ // we can never send it.
232232+ if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
233233+ return ErrOversizedItem
234234+ }
235235+236236+ // If adding this item would exceed our allotted memory
237237+ // footprint, we can't accept it.
238238+ // (TryAcquire also returns false if anything is waiting on the semaphore,
239239+ // so calls to Add and AddWait shouldn't be mixed.)
240240+ b.initSemaphores()
241241+ if !b.sem.TryAcquire(int64(size)) {
242242+ return ErrOverflow
243243+ }
244244+245245+ b.mu.Lock()
246246+ defer b.mu.Unlock()
247247+ return b.add(item, size)
248248+}
249249+250250+// add adds item to the tail of the bundle queue or curBundle depending on space
251251+// and nil-ness (see inline comments). It marks curBundle for handling (by
252252+// appending it to the queue) if any of the thresholds or limits are exceeded.
253253+// curBundle is lazily initialized. It requires that b.mu is locked.
254254+func (b *Bundler[T]) add(item T, size int) error {
255255+ // If we don't have a curBundle, see if we can add to the queue tail.
256256+ if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
257257+ b.tail.add(item, size)
258258+ return nil
259259+ }
260260+261261+ // If we can't fit in the existing curBundle, move it onto the queue.
262262+ if b.curBundle != nil && !b.canFit(b.curBundle, size) {
263263+ b.enqueueCurBundle()
264264+ }
265265+266266+ // Create a curBundle if we don't have one.
267267+ if b.curBundle == nil {
268268+ b.curFlush.Add(1)
269269+ b.curBundle = &bundle[T]{
270270+ items: []T{},
271271+ flush: b.curFlush,
272272+ }
273273+ }
274274+275275+ // Add the item.
276276+ b.curBundle.add(item, size)
277277+278278+ // If curBundle is ready for handling, move it to the queue.
279279+ if b.curBundle.size >= b.BundleByteThreshold ||
280280+ len(b.curBundle.items) == b.BundleCountThreshold {
281281+ b.enqueueCurBundle()
282282+ }
283283+284284+ // If we created a new bundle and it wasn't immediately handled, set a timer
285285+ if b.curBundle != nil && b.flushTimer == nil {
286286+ b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
287287+ }
288288+289289+ return nil
290290+}
291291+292292+// tryHandleBundles is the timer callback that handles or queues any current
293293+// bundle after DelayThreshold time, even if the bundle isn't completely full.
294294+func (b *Bundler[T]) tryHandleBundles() {
295295+ b.mu.Lock()
296296+ b.enqueueCurBundle()
297297+ b.mu.Unlock()
298298+}
299299+300300+// next returns the next bundle that is ready for handling and removes it from
301301+// the internal queue. It requires that b.mu is locked.
302302+func (b *Bundler[T]) next() *bundle[T] {
303303+ if b.head == nil {
304304+ return nil
305305+ }
306306+ out := b.head
307307+ b.head = b.head.next
308308+ if b.head == nil {
309309+ b.tail = nil
310310+ }
311311+ out.next = nil
312312+ return out
313313+}
314314+315315+// handle calls the user-specified handler on the given bundle. handle is
316316+// intended to be run as a goroutine. After the handler returns, we update the
317317+// byte total. handle continues processing additional bundles that are ready.
318318+// If no more bundles are ready, the handler count is decremented and the
319319+// goroutine ends.
320320+func (b *Bundler[T]) handle(bu *bundle[T]) {
321321+ for bu != nil {
322322+ ctx, cancel := context.WithTimeout(context.Background(), b.ContextDeadline)
323323+ b.handler(ctx, bu.items)
324324+ cancel()
325325+ bu = b.postHandle(bu)
326326+ }
327327+}
328328+329329+func (b *Bundler[T]) postHandle(bu *bundle[T]) *bundle[T] {
330330+ b.mu.Lock()
331331+ defer b.mu.Unlock()
332332+333333+ b.sem.Release(int64(bu.size))
334334+ bu.flush.Done()
335335+336336+ bu = b.next()
337337+ if bu == nil {
338338+ b.handlerCount--
339339+ }
340340+ return bu
341341+}
342342+343343+// AddWait adds item to the current bundle. It marks the bundle for handling and
344344+// starts a new one if any of the thresholds or limits are exceeded.
345345+//
346346+// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
347347+// the item can never be handled. AddWait returns ErrOversizedItem in this case.
348348+//
349349+// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
350350+// AddWait blocks until space is available or ctx is done.
351351+//
352352+// Calls to Add and AddWait should not be mixed on the same Bundler.
353353+func (b *Bundler[T]) AddWait(ctx context.Context, item T, size int) error {
354354+ if err := b.setMode(addWait); err != nil {
355355+ return err
356356+ }
357357+ // If this item exceeds the maximum size of a bundle,
358358+ // we can never send it.
359359+ if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
360360+ return ErrOversizedItem
361361+ }
362362+ // If adding this item would exceed our allotted memory footprint, block
363363+ // until space is available. The semaphore is FIFO, so there will be no
364364+ // starvation.
365365+ b.initSemaphores()
366366+ if err := b.sem.Acquire(ctx, int64(size)); err != nil {
367367+ return err
368368+ }
369369+370370+ b.mu.Lock()
371371+ defer b.mu.Unlock()
372372+ return b.add(item, size)
373373+}
374374+375375+// Flush invokes the handler for all remaining items in the Bundler and waits
376376+// for it to return.
377377+func (b *Bundler[T]) Flush() {
378378+ b.mu.Lock()
379379+380380+ // If a curBundle is pending, move it to the queue.
381381+ b.enqueueCurBundle()
382382+383383+ // Store a pointer to the WaitGroup that counts outstanding bundles
384384+ // in the current flush and create a new one to track the next flush.
385385+ wg := b.curFlush
386386+ b.curFlush = &sync.WaitGroup{}
387387+388388+ // Flush must wait for all prior, outstanding flushes to complete.
389389+ // We use a channel to communicate completion between each flush in
390390+ // the sequence.
391391+ prev := b.prevFlush
392392+ next := make(chan bool)
393393+ b.prevFlush = next
394394+395395+ b.mu.Unlock()
396396+397397+ // Wait until the previous flush is finished.
398398+ if prev != nil {
399399+ <-prev
400400+ }
401401+402402+ // Wait until this flush is finished.
403403+ wg.Wait()
404404+405405+ // Allow the next flush to finish.
406406+ close(next)
407407+}
+37
bundler/bundler_test.go
···11+package bundler
22+33+import (
44+ "context"
55+ "testing"
66+)
77+88+func TestBundler(t *testing.T) {
99+ input := []int{1, 2, 3, 4}
1010+ done := false
1111+ b := New[int](func(_ context.Context, data []int) {
1212+ if len(data) != 4 {
1313+ t.Errorf("Wanted len(data) == %d, got: %d", len(input), len(data))
1414+ }
1515+1616+ sum := 0
1717+ const wantSum = 10
1818+ for _, i := range data {
1919+ sum += i
2020+ }
2121+2222+ if sum != wantSum {
2323+ t.Errorf("wanted sum of inputs to be %d, got: %d", wantSum, sum)
2424+ }
2525+ done = true
2626+ })
2727+2828+ for _, i := range input {
2929+ b.Add(i, 1)
3030+ }
3131+3232+ b.Flush()
3333+3434+ if !done {
3535+ t.Fatal("function wasn't called")
3636+ }
3737+}
+45-3
cmd/hythlodaeus/main.go
···66 "log"
77 "os"
88 "path/filepath"
99+ "time"
9101111+ islog "git.xeserv.us/Techaro/hythlodaeus/internal/slog"
1012 "git.xeserv.us/Techaro/hythlodaeus/server"
1313+ "git.xeserv.us/Techaro/hythlodaeus/telemetry"
1114 "git.xeserv.us/Techaro/hythlodaeus/watcher"
1215 "github.com/facebookgo/flagenv"
1616+ _ "github.com/joho/godotenv/autoload"
1317 "golang.org/x/sync/errgroup"
1418 "k8s.io/client-go/kubernetes"
1519 "k8s.io/client-go/rest"
···1721)
18221923var (
2020- httpBind = flag.String("http-bind", ":80", "the host:port to bind plain HTTP")
2121- httpsBind = flag.String("https-bind", ":443", "the host:port to bind secure HTTPS")
2424+ grpcBind = flag.String("grpc-bind", ":8393", "the host:port to bind the grpc API")
2525+ grpcCert = flag.String("grpc-cert", "./var/tls.crt", "the TLS cert to serve grpc")
2626+ grpcKey = flag.String("grpc-key", "./var/tls.key", "the TLS key to serve grpc")
2727+ httpBind = flag.String("http-bind", ":80", "the host:port to bind plain HTTP")
2828+ httpsBind = flag.String("https-bind", ":443", "the host:port to bind secure HTTPS")
2929+ telemetryEnable = flag.Bool("telemetry-enable", false, "if true, enable request telemetry bundling to object storage")
3030+ telemetryBucket = flag.String("telemetry-bucket", "relayd-logs", "object storage bucket to dump logs to")
3131+ telemetryPathStyle = flag.Bool("telemetry-path-style", false, "if true, use s3 path style")
3232+ telemetryHost = flag.String("telemetry-host", hostname(), "hostname to disambiguate telemetry")
3333+ telemetryBundleCount = flag.Int("telemetry-bundle-count", 512, "maximum number of items per telemetry bundle")
3434+ telemetryContextDeadline = flag.Duration("telemetry-context-deadline", time.Minute, "maximum time for the telemetry context deadline")
3535+ telemetryDelayThreshold = flag.Duration("telemetry-delay-threshold", time.Minute, "how long log messages should live in memory before they are written to object storage")
2236)
// hostname returns the name reported by os.Hostname, or the fixed fallback
// "hythlodaeus" when the lookup fails.
func hostname() string {
	if name, err := os.Hostname(); err == nil {
		return name
	}
	return "hythlodaeus"
}
4545+2446func main() {
2547 flagenv.Parse()
2648 flag.Parse()
27495050+ islog.Init()
5151+2852 client, err := kubernetes.NewForConfig(getKubernetesConfig())
2953 if err != nil {
3054 log.Fatalf("can't make kubernetes client: %v", err)
3155 }
32563333- s := server.New(server.WithHTTPBind(*httpBind), server.WithHTTPSBind(*httpsBind))
5757+ s, err := server.New(
5858+ server.WithGRPCBind(*grpcBind),
5959+ server.WithGRPCCert(*grpcCert),
6060+ server.WithGRPCKey(*grpcKey),
6161+ server.WithHTTPBind(*httpBind),
6262+ server.WithHTTPSBind(*httpsBind),
6363+ server.WithTelemetryConfig(telemetry.Config{
6464+ Bucket: *telemetryBucket,
6565+ PathStyle: *telemetryPathStyle,
6666+ Host: *telemetryHost,
6767+ BundleCount: *telemetryBundleCount,
6868+ ContextDeadline: *telemetryContextDeadline,
6969+ DelayThreshold: *telemetryDelayThreshold,
7070+ }),
7171+ )
7272+ if err != nil {
7373+ log.Fatalf("can't make server: %v", err)
7474+ }
7575+3476 w := watcher.New(client, func(payload *watcher.Payload) {
3577 s.Update(payload)
3678 })