A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Authoritative import fixes and improvements

- Will keep polling for, and importing, new operations now after reaching "the end"
- Tentative websocket client support (not tested, and doesn't actually reduce usage of HTTP /export when new operations are detected)
- The way we're forcing CometBFT to produce a new block is a major hack but it works. This hack will be redundant if we later decide to let create_empty_blocks be true.

gbl08ma 9f40f718 0fe993f3

+248 -67
+5 -4
abciapp/app.go
··· 47 47 lastProcessedProposalHash []byte 48 48 lastProcessedProposalExecTxResults []*processResult 49 49 50 - aocsByPLC map[string]*authoritativeOperationsCache 50 + aoc *authoritativeOperationsFetcher 51 51 52 - blockStore *bftstore.BlockStore 52 + blockStore *bftstore.BlockStore 53 + triggerBlockCreation func() 53 54 54 55 blockChallengeCoordinator *blockChallengeCoordinator 55 56 } ··· 86 87 indexDB: indexDB, 87 88 mempoolSubmitter: mempoolSubmitter, 88 89 snapshotDirectory: snapshotDirectory, 89 - aocsByPLC: make(map[string]*authoritativeOperationsCache), 90 90 } 91 91 92 92 if pv != nil { ··· 209 209 } 210 210 } 211 211 212 - func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore) error { 212 + func (d *DIDPLCApplication) FinishInitializing(blockStore *bftstore.BlockStore, triggerBlockCreation func()) error { 213 213 d.blockStore = blockStore 214 + d.triggerBlockCreation = triggerBlockCreation 214 215 215 216 var err error 216 217 d.blockChallengeCoordinator, err = newBlockChallengeCoordinator(d.runnerContext, d.logger, d.txFactory, blockStore, d.validatorPubKey)
+41 -20
abciapp/execution.go
··· 48 48 st := time.Now() 49 49 acceptedTx := make([][]byte, 0, len(req.Txs)) 50 50 toProcess := req.Txs 51 + deps := d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time) 51 52 for { 52 53 toTryNext := [][]byte{} 53 54 for _, tx := range toProcess { 54 - result, err := processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), tx) 55 + result, err := processTx(ctx, deps, tx) 55 56 if err != nil { 56 57 return nil, stacktrace.Propagate(err, "") 57 58 } ··· 77 78 toProcess = toTryNext 78 79 } 79 80 80 - maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx) 81 + maybeTx, err := d.maybeCreateAuthoritativeImportTx(ctx, deps.getAuthoritativeOperationsFetcher) 81 82 if err != nil { 82 83 // TODO don't fail absolutely silently always, we should at least check what the error is 83 - //return nil, stacktrace.Propagate(err, "") 84 + d.logger.Error("failed to create authoritative import transaction", "error", stacktrace.Propagate(err, "")) 84 85 } 85 86 86 87 if err == nil && len(maybeTx) != 0 { ··· 132 133 d.blockChallengeCoordinator.notifyOfIncomingBlockHeight(req.Height) 133 134 134 135 txResults := make([]*processResult, len(req.Txs)) 136 + deps := d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time) 135 137 for i, tx := range req.Txs { 136 138 result, action, processor, err := beginProcessTx(tx) 137 139 if err != nil { ··· 143 145 return &abcitypes.ResponseProcessProposal{Status: abcitypes.ResponseProcessProposal_REJECT}, nil 144 146 } 145 147 146 - result, err = finishProcessTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), processor, tx) 148 + result, err = finishProcessTx(ctx, deps, processor, tx) 147 149 if err != nil { 148 150 return nil, stacktrace.Propagate(err, "") 149 151 } ··· 266 268 func (d *DIDPLCApplication) transactionProcessorDependenciesForCommittedRead() TransactionProcessorDependencies { 267 269 readTx := d.txFactory.ReadCommitted() 268 270 return TransactionProcessorDependencies{ 269 - runnerContext: d.runnerContext, 270 - workingHeight: readTx.Height() + 1, 271 - plc: d.plc, 272 - readTx: readTx, 273 - writeTx: mo.None[transaction.Write](), 274 - aocsByPLC: d.aocsByPLC, 275 - blockChallengeCoordinator: d.blockChallengeCoordinator, 276 - blockStore: d.blockStore, 271 + workingHeight: readTx.Height() + 1, 272 + plc: d.plc, 273 + readTx: readTx, 274 + writeTx: mo.None[transaction.Write](), 275 + getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 276 + destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 277 + blockChallengeCoordinator: d.blockChallengeCoordinator, 278 + blockStore: d.blockStore, 277 279 } 278 280 } 279 281 ··· 287 289 } 288 290 289 291 return TransactionProcessorDependencies{ 290 - runnerContext: d.runnerContext, 291 - workingHeight: d.ongoingRead.Height(), 292 - readTx: d.ongoingRead, 293 - writeTx: writeTx, 294 - plc: d.plc, 295 - aocsByPLC: d.aocsByPLC, 296 - blockChallengeCoordinator: d.blockChallengeCoordinator, 297 - blockStore: d.blockStore, 292 + workingHeight: d.ongoingRead.Height(), 293 + readTx: d.ongoingRead, 294 + writeTx: writeTx, 295 + plc: d.plc, 296 + getAuthoritativeOperationsFetcher: d.buildAuthoritativeOperationsFetcher, 297 + destroyAuthoritativeOperationsFetcher: d.destroyAuthoritativeOperationsFetcher, 298 + blockChallengeCoordinator: d.blockChallengeCoordinator, 299 + blockStore: d.blockStore, 298 300 } 299 301 } 300 302 ··· 310 312 } 311 313 } 312 314 } 315 + 316 + func (d *DIDPLCApplication) buildAuthoritativeOperationsFetcher(plc string) *authoritativeOperationsFetcher { 317 + if d.aoc != nil && plc != d.aoc.plcURL { 318 + d.destroyAuthoritativeOperationsFetcher() 319 + } 320 + 321 + if d.aoc == nil { 322 + d.aoc = newAuthoritativeOperationsFetcher(d.runnerContext, d.logger, plc, d.triggerBlockCreation) 323 + } 324 + 325 + return d.aoc 326 + } 327 + 328 + func (d *DIDPLCApplication) destroyAuthoritativeOperationsFetcher() { 329 + if d.aoc != nil { 330 + d.aoc.cancel() 331 + } 332 + d.aoc = nil 333 + }
+146 -31
abciapp/import.go
··· 7 7 "encoding/binary" 8 8 "encoding/hex" 9 9 "encoding/json" 10 + "errors" 10 11 "fmt" 11 12 "net/http" 12 13 "net/url" ··· 14 15 "time" 15 16 16 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 + "github.com/coder/websocket" 19 + "github.com/coder/websocket/wsjson" 20 + cmtlog "github.com/cometbft/cometbft/libs/log" 17 21 "github.com/did-method-plc/go-didplc" 18 22 "github.com/ipfs/go-cid" 19 23 cbornode "github.com/ipfs/go-ipld-cbor" ··· 26 30 const MaxOpsPerImportTx = 1000 27 31 const OpsPerEagerFetch = 1000 28 32 29 - type authoritativeOperationsCache struct { 30 - mu sync.Mutex 33 + const EagerFetchFrequencyWhenBusy = 500 * time.Millisecond 34 + const EagerFetchFrequencyWhenIdle = 5 * time.Second 35 + const EagerFetchFrequencyWhenUsingWS = 5 * time.Minute // if we're using the websocket to discover new operations, then this is really meant as a fallback 36 + 37 + type authoritativeOperationsFetcher struct { 38 + logger cmtlog.Logger 39 + mu sync.Mutex 40 + cancel func() 41 + client *http.Client 42 + 43 + triggerBlockCreation func() 44 + 45 + importProgress uint64 31 46 32 - plcURL string 33 - operations map[uint64]logEntryWithSeq 34 - highestFetchedHeight uint64 47 + plcURL string 48 + operations map[uint64]logEntryWithSeq 49 + highestFetchedSeq uint64 50 + triedUsingWS bool 51 + isUsingWS bool 35 52 } 36 53 37 54 type logEntryWithSeq struct { ··· 39 56 Seq uint64 `json:"seq"` 40 57 } 41 58 42 - func newAuthoritativeOperationsCache(ctx context.Context, plc string) *authoritativeOperationsCache { 43 - aoc := &authoritativeOperationsCache{ 44 - plcURL: plc, 45 - operations: make(map[uint64]logEntryWithSeq), 59 + func newAuthoritativeOperationsFetcher(ctx context.Context, logger cmtlog.Logger, plc string, triggerBlockCreation func()) *authoritativeOperationsFetcher { 60 + ctx, cancel := context.WithCancel(ctx) 61 + aoc := &authoritativeOperationsFetcher{ 62 + logger: logger, 63 + cancel: cancel, 64 + client: &http.Client{Timeout: 30 * time.Second}, 65 + plcURL: plc, 66 + triggerBlockCreation: triggerBlockCreation, 67 + operations: make(map[uint64]logEntryWithSeq), 46 68 } 47 69 48 70 go func() { 49 - ticker := time.NewTicker(500 * time.Millisecond) 71 + tickerInterval := EagerFetchFrequencyWhenBusy 72 + fetchTicker := time.NewTicker(tickerInterval) 50 73 for { 51 74 select { 52 75 case <-ctx.Done(): 53 76 return 54 - case <-ticker.C: 55 - aoc.eagerlyFetch(ctx) 77 + case <-fetchTicker.C: 78 + desiredInterval := aoc.eagerlyFetch(ctx) 79 + if desiredInterval != tickerInterval { 80 + tickerInterval = desiredInterval 81 + fetchTicker.Reset(tickerInterval) 82 + } 56 83 } 57 84 } 58 85 }() ··· 60 87 return aoc 61 88 } 62 89 63 - func getOrCreateAuthoritativeOperationsCache(ctx context.Context, aocsByPLC map[string]*authoritativeOperationsCache, plc string) *authoritativeOperationsCache { 64 - aoc, ok := aocsByPLC[plc] 65 - if !ok { 66 - aoc = newAuthoritativeOperationsCache(ctx, plc) 67 - aocsByPLC[plc] = aoc 90 + func (a *authoritativeOperationsFetcher) maybeLaunchWebsocketClientInMutex(ctx context.Context, cursor uint64) { 91 + if a.triedUsingWS { 92 + return 93 + } 94 + a.triedUsingWS = true 95 + 96 + go a.launchWebsocketClient(ctx, cursor) 97 + } 98 + 99 + func (a *authoritativeOperationsFetcher) launchWebsocketClient(ctx context.Context, cursor uint64) { 100 + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) 101 + defer cancel() 102 + 103 + url, err := url.Parse(a.plcURL) 104 + if err != nil { 105 + // this really shouldn't happen 106 + a.logger.Error("authoritative operations fetcher failed to parse known good PLC URL", "error", stacktrace.Propagate(err, "")) 107 + return 108 + } 109 + 110 + url.Scheme = "wss" 111 + url = url.JoinPath(a.plcURL, "/export/stream") 112 + q := url.Query() 113 + q.Add("cursor", fmt.Sprint(cursor)) 114 + url.RawQuery = q.Encode() 115 + 116 + c, _, err := websocket.Dial(ctx, url.String(), &websocket.DialOptions{ 117 + HTTPClient: a.client, 118 + }) 119 + if err != nil { 120 + a.logger.Error("authoritative operations fetcher failed to dial websocket", 121 + "error", stacktrace.Propagate(err, ""), 122 + "url", url.String(), 123 + ) 124 + return 125 + } 126 + defer c.CloseNow() 127 + 128 + a.mu.Lock() 129 + a.isUsingWS = true 130 + a.mu.Unlock() 131 + 132 + defer func() { 133 + a.mu.Lock() 134 + defer a.mu.Unlock() 135 + a.triedUsingWS = false // so we try to reconnect if eagerlyFetch runs again 136 + a.isUsingWS = false 137 + }() 138 + 139 + a.logger.Info("authoritative operations fetcher connected to websocket") 140 + 141 + for { 142 + var entry logEntryWithSeq 143 + err := wsjson.Read(ctx, c, &entry) 144 + if err != nil { 145 + if errors.Is(err, context.Canceled) { 146 + return 147 + } 148 + var closeError websocket.CloseError 149 + if errors.As(err, &closeError) { 150 + a.logger.Error("authoritative operations fetcher websocket connection closed", 151 + "code", closeError.Code, 152 + "reason", closeError.Reason) 153 + return 154 + } 155 + 156 + a.logger.Error("authoritative operations fetcher failed to read from websocket", "error", stacktrace.Propagate(err, "")) 157 + return 158 + } 159 + 160 + a.mu.Lock() 161 + a.operations[entry.Seq] = entry 162 + a.highestFetchedSeq = max(a.highestFetchedSeq, entry.Seq) 163 + 164 + if a.highestFetchedSeq > a.importProgress { 165 + a.triggerBlockCreation() 166 + } 167 + a.mu.Unlock() 68 168 } 69 - return aoc 70 169 } 71 170 72 - func (a *authoritativeOperationsCache) eagerlyFetch(ctx context.Context) { 171 + func (a *authoritativeOperationsFetcher) eagerlyFetch(ctx context.Context) time.Duration { 73 172 a.mu.Lock() 74 173 defer a.mu.Unlock() 75 174 76 175 curOps := len(a.operations) 77 176 if curOps >= EagerFetchMaxOps { 78 - return 177 + return EagerFetchFrequencyWhenBusy 178 + } 179 + reachedEnd, err := a.fetchInMutex(ctx, a.highestFetchedSeq, OpsPerEagerFetch) 180 + if err != nil { 181 + a.logger.Error("authoritative operations fetcher failed eager fetch", "error", stacktrace.Propagate(err, "")) 79 182 } 80 - _, _ = a.fetchInMutex(ctx, a.highestFetchedHeight, OpsPerEagerFetch) 183 + 184 + if a.highestFetchedSeq > a.importProgress { 185 + a.triggerBlockCreation() 186 + } 187 + 188 + if err == nil && reachedEnd { 189 + a.maybeLaunchWebsocketClientInMutex(ctx, a.highestFetchedSeq) 190 + if a.isUsingWS { 191 + return EagerFetchFrequencyWhenUsingWS 192 + } 193 + return EagerFetchFrequencyWhenIdle 194 + } 195 + return EagerFetchFrequencyWhenBusy 81 196 } 82 197 83 - func (a *authoritativeOperationsCache) dropSeqBelowOrEqual(highestCommittedSeq uint64) { 198 + func (a *authoritativeOperationsFetcher) setImportProgress(highestCommittedSeq uint64) { 84 199 a.mu.Lock() 85 200 defer a.mu.Unlock() 86 201 202 + a.importProgress = highestCommittedSeq 203 + 87 204 for i := range a.operations { 88 205 if a.operations[i].Seq <= highestCommittedSeq { 89 206 delete(a.operations, i) ··· 91 208 } 92 209 } 93 210 94 - func (a *authoritativeOperationsCache) fetchInMutex(ctx context.Context, after, count uint64) (bool, error) { 95 - entries, _, err := fetchExportedBatchFromAuthoritativeSource(ctx, a.plcURL, after, count) 211 + func (a *authoritativeOperationsFetcher) fetchInMutex(ctx context.Context, after, count uint64) (bool, error) { 212 + entries, _, err := fetchExportedBatchFromAuthoritativeSource(ctx, a.client, a.plcURL, after, count) 96 213 if err != nil { 97 214 return false, stacktrace.Propagate(err, "") 98 215 } 99 216 100 217 for _, entry := range entries { 101 218 a.operations[entry.Seq] = entry 102 - a.highestFetchedHeight = max(a.highestFetchedHeight, entry.Seq) 219 + a.highestFetchedSeq = max(a.highestFetchedSeq, entry.Seq) 103 220 } 104 221 return uint64(len(entries)) < count, nil 105 222 } 106 223 107 - func (a *authoritativeOperationsCache) get(ctx context.Context, after, count uint64) ([]logEntryWithSeq, error) { 224 + func (a *authoritativeOperationsFetcher) get(ctx context.Context, after, count uint64) ([]logEntryWithSeq, error) { 108 225 a.mu.Lock() 109 226 defer a.mu.Unlock() 110 227 ··· 140 257 return result, nil 141 258 } 142 259 143 - func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, plcURL string, startAt, maxCount uint64) ([]logEntryWithSeq, uint64, error) { 260 + func fetchExportedBatchFromAuthoritativeSource(ctx context.Context, client *http.Client, plcURL string, startAt, maxCount uint64) ([]logEntryWithSeq, uint64, error) { 144 261 baseURL, err := url.JoinPath(plcURL, "/export") 145 262 if err != nil { 146 263 return nil, 0, stacktrace.Propagate(err, "") 147 264 } 148 - 149 - client := &http.Client{Timeout: 30 * time.Second} 150 265 151 266 entries := make([]logEntryWithSeq, 0, maxCount) 152 267 for { ··· 248 363 return hash.Sum(nil), nil 249 364 } 250 365 251 - func (d *DIDPLCApplication) maybeCreateAuthoritativeImportTx(ctx context.Context) ([]byte, error) { 366 + func (d *DIDPLCApplication) maybeCreateAuthoritativeImportTx(ctx context.Context, aocGetter func(plcURL string) *authoritativeOperationsFetcher) ([]byte, error) { 252 367 // use WorkingTreeVersion so we take into account any import operation that may have been processed in this block 253 368 readTx := d.txFactory.ReadWorking(time.Now()) 254 369 ··· 267 382 return nil, stacktrace.Propagate(err, "") 268 383 } 269 384 270 - aoc := getOrCreateAuthoritativeOperationsCache(d.runnerContext, d.aocsByPLC, plcURL) 385 + aoc := aocGetter(plcURL) 271 386 272 387 entries, err := aoc.get(ctx, cursor, OpsPerImportTx) 273 388 if err != nil {
+8 -8
abciapp/tx.go
··· 24 24 type TransactionAction string 25 25 26 26 type TransactionProcessorDependencies struct { 27 - runnerContext context.Context 28 - workingHeight int64 29 - readTx transaction.Read 30 - writeTx mo.Option[transaction.Write] 31 - plc plc.PLC 32 - aocsByPLC map[string]*authoritativeOperationsCache 33 - blockChallengeCoordinator *blockChallengeCoordinator 34 - blockStore *bftstore.BlockStore 27 + workingHeight int64 28 + readTx transaction.Read 29 + writeTx mo.Option[transaction.Write] 30 + plc plc.PLC 31 + getAuthoritativeOperationsFetcher func(plc string) *authoritativeOperationsFetcher 32 + destroyAuthoritativeOperationsFetcher func() 33 + blockChallengeCoordinator *blockChallengeCoordinator 34 + blockStore *bftstore.BlockStore 35 35 } 36 36 37 37 type TransactionProcessor func(ctx context.Context, deps TransactionProcessorDependencies, txBytes []byte) (*processResult, error)
+6 -3
abciapp/tx_import.go
··· 69 69 70 70 return &processResult{ 71 71 Code: 0, 72 + commitSideEffects: []func(){ 73 + deps.destroyAuthoritativeOperationsFetcher, 74 + }, 72 75 }, nil 73 76 } 74 77 ··· 111 114 }, nil 112 115 } 113 116 114 - aoc := getOrCreateAuthoritativeOperationsCache(deps.runnerContext, deps.aocsByPLC, expectedPlcUrl) 117 + fetcher := deps.getAuthoritativeOperationsFetcher(expectedPlcUrl) 115 118 116 119 expectedCursor, err := store.Consensus.AuthoritativeImportProgress(deps.readTx) 117 120 if err != nil { ··· 132 135 }, nil 133 136 } 134 137 135 - operations, err := aoc.get(ctx, expectedCursor, tx.Arguments.Count) 138 + operations, err := fetcher.get(ctx, expectedCursor, tx.Arguments.Count) 136 139 if err != nil { 137 140 return &processResult{ 138 141 Code: 4113, ··· 181 184 return &processResult{ 182 185 commitSideEffects: []func(){ 183 186 func() { 184 - aoc.dropSeqBelowOrEqual(newCursor) 187 + fetcher.setImportProgress(newCursor) 185 188 }, 186 189 }, 187 190 Code: 0,
+1
go.mod
··· 7 7 github.com/Yiling-J/theine-go v0.6.2 8 8 github.com/bits-and-blooms/bloom/v3 v3.7.1 9 9 github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 10 + github.com/coder/websocket v1.8.14 10 11 github.com/cometbft/cometbft v0.38.19 11 12 github.com/cometbft/cometbft-db v0.14.1 12 13 github.com/consensys/gnark v0.14.0
+2
go.sum
··· 45 45 github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= 46 46 github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= 47 47 github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= 48 + github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= 49 + github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= 48 50 github.com/cometbft/cometbft v0.38.19 h1:vNdtCkvhuwUlrcLPAyigV7lQpmmo+tAq8CsB8gZjEYw= 49 51 github.com/cometbft/cometbft v0.38.19/go.mod h1:UCu8dlHqvkAsmAFmWDRWNZJPlu6ya2fTWZlDrWsivwo= 50 52 github.com/cometbft/cometbft-db v0.14.1 h1:SxoamPghqICBAIcGpleHbmoPqy+crij/++eZz3DlerQ=
+39 -1
main.go
··· 8 8 "os" 9 9 "os/signal" 10 10 "path/filepath" 11 + "reflect" 11 12 "syscall" 12 13 "time" 14 + "unsafe" 13 15 16 + "github.com/cometbft/cometbft/mempool" 14 17 "github.com/cometbft/cometbft/p2p" 15 18 "github.com/cometbft/cometbft/privval" 16 19 "github.com/cometbft/cometbft/proxy" ··· 138 141 log.Fatalf("Creating node: %v", err) 139 142 } 140 143 144 + txsAvailableChan := getMempoolTxsAvailableChan(node) 145 + 141 146 mempoolSubmitter.node = node 142 147 143 - err = app.FinishInitializing(node.BlockStore()) 148 + err = app.FinishInitializing(node.BlockStore(), blockCreationTrigger(txsAvailableChan)) 144 149 if err != nil { 145 150 log.Fatalf("Finishing ABCI app initialization: %v", err) 146 151 } ··· 197 202 signal.Notify(c, os.Interrupt, syscall.SIGTERM) 198 203 <-c 199 204 } 205 + 206 + func getMempoolTxsAvailableChan(node *nm.Node) chan struct{} { 207 + clistMempool, ok := node.Mempool().(*mempool.CListMempool) 208 + if !ok { 209 + return nil 210 + } 211 + 212 + val := reflect.ValueOf(clistMempool) 213 + val = reflect.Indirect(val) 214 + field := val.FieldByName("txsAvailable") 215 + if field.IsZero() || !field.CanAddr() { 216 + return nil 217 + } 218 + 219 + chanIface := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() 220 + c, ok := chanIface.(chan struct{}) 221 + if !ok { 222 + return nil 223 + } 224 + return c 225 + } 226 + 227 + func blockCreationTrigger(txsAvailable chan struct{}) func() { 228 + return func() { 229 + if txsAvailable == nil { 230 + return 231 + } 232 + select { 233 + case txsAvailable <- struct{}{}: 234 + default: 235 + } 236 + } 237 + }