this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

tools/jsonrpc2: permit func enqueuing into message daisychain

The existing JSONRPC AsyncHandler creates an infinite queue of messages
received from the client. Every message received instantly spawns a new
go-routine, which waits for the previous go-routine to finish work
before starting processing of its own message. This chain of go-routines
is synchronised via channels, which act as memory barriers.
Consequently, although every message the LSP server processes uses a
different go-routine, there is no need for additional synchronisation;
conceptually, the LSP remains single-threaded.

JSONRPC, and the LSP protocol, are designed to be bi-directional: either
agent can call the other. This leads to the possibility that both agents
call each other at the same time, and then block, each waiting for the
reply to their call.

There are currently two places, both in [server.UpdateWatchedFiles]
where the server makes calls to the client, but soon there will be more.
If the client behaves badly and does not reply, then this can block the
server. Adding timeouts on these calls, which we were missing, will
help; plus we don't particularly care about the response to these calls
- it's either an error or a void result, neither of which we currently
care about.

Nevertheless, testing shows that we significantly improve the odds of
these (and other) calls to the client working (read: working around
slightly faulty editors) if we avoid making calls to the client when we
know there's a queue of unprocessed messages (notifications, calls etc)
from the client. To do this, we enqueue the function making the call
onto the end of the queue of messages from the client. This is the
EnqueueHandler. This is never going to be perfect: there's always the
chance that the client and LSP both make calls at the same time. But if
we already know there's a call from the client arrived and pending
processing (thus the client is likely blocked waiting for a reply), then
we now always delay making calls back to the client until after the
client's calls are processed.

This mechanism also means arbitrary callback functions can be safely
injected into the processing queue without the need for any additional
synchronisation mechanism (locks etc).

Signed-off-by: Matthew Sackman <matthew@cue.works>
Change-Id: Ifbf525d23df5df70b64e96f55a1dd58a229a9770
Reviewed-on: https://cue.gerrithub.io/c/cue-lang/cue/+/1233760
Reviewed-by: Roger Peppe <rogpeppe@gmail.com>
TryBot-Result: CUEcueckoo <cueckoo@cuelang.org>

+130 -35
+11 -17
internal/golangorgx/gopls/lsprpc/lsprpc.go
··· 55 55 options := settings.DefaultOptions(s.optionsOverrides) 56 56 svr := server.New(s.cache, client, options) 57 57 svrID := svr.ID() 58 - // Clients may or may not send a shutdown message. Make sure the server is 59 - // shut down. 60 - // 61 - // TODO(ms): temporarily disabled because it introduces a 62 - // data-race: this is a moment of genuine concurrency. It would be 63 - // much better to inject the shutdown message onto the end of the 64 - // jsonrpc2 stream somehow. For now, there's nothing important to 65 - // do for shutdown, so disabling this is fine, and it solves the 66 - // data-race. It's possible we could get away with moving the 67 - // <-conn.Done() call within the defer, prior to the Shutdown call 68 - // - that would provide the necessary memory barries. TBD. 69 - // 70 - // defer svr.Shutdown(ctx) 58 + 59 + handlers, enqueue := protocol.HandlersWithEnqueue( 60 + handshaker(svrID, s.daemon, 61 + protocol.ServerHandler(svr, jsonrpc2.MethodNotFound))) 62 + svr.SetEnqueuer(enqueue.Enqueue) 63 + 64 + // Clients may or may not send a shutdown message. Make sure the 65 + // server is shut down. 66 + defer enqueue.Enqueue(func() { svr.Shutdown(ctx) }) 67 + 71 68 ctx = protocol.WithClient(ctx, client) 72 - conn.Go(ctx, 73 - protocol.Handlers( 74 - handshaker(svrID, s.daemon, 75 - protocol.ServerHandler(svr, jsonrpc2.MethodNotFound)))) 69 + conn.Go(ctx, handlers) 76 70 if s.daemon { 77 71 log.Printf("Server %s: connected", svrID) 78 72 defer log.Printf("Server %s: exited", svrID)
+5
internal/golangorgx/gopls/protocol/protocol.go
··· 110 110 jsonrpc2.MustReplyHandler(handler))) 111 111 } 112 112 113 + func HandlersWithEnqueue(handler jsonrpc2.Handler) (jsonrpc2.Handler, *jsonrpc2.EnqueueHandler) { 114 + enqueueHandler := jsonrpc2.NewEnqueueHandler(jsonrpc2.MustReplyHandler(handler)) 115 + return CancelHandler(enqueueHandler.Handle), enqueueHandler 116 + } 117 + 113 118 func CancelHandler(handler jsonrpc2.Handler) jsonrpc2.Handler { 114 119 handler, canceller := jsonrpc2.CancelHandler(handler) 115 120 return func(ctx context.Context, reply jsonrpc2.Replier, req jsonrpc2.Request) error {
+62
internal/golangorgx/tools/jsonrpc2/handler.go
··· 107 107 return nil 108 108 } 109 109 } 110 + 111 + // EnqueueHandler behaves exactly the same as [AsyncHandler]: each 112 + // request is processed in its own goroutine, and requests are 113 + // enqueued non-blocking. 114 + // 115 + // Unlike [AsyncHandler], it also allows arbitrary functions to be 116 + // added to the queue. 117 + type EnqueueHandler struct { 118 + lock sync.Mutex 119 + nextRequest chan struct{} 120 + handler Handler 121 + } 122 + 123 + func NewEnqueueHandler(handler Handler) *EnqueueHandler { 124 + nextRequest := make(chan struct{}) 125 + close(nextRequest) 126 + return &EnqueueHandler{ 127 + nextRequest: nextRequest, 128 + handler: handler, 129 + } 130 + } 131 + 132 + // Handle implements [Handler] 133 + func (h *EnqueueHandler) Handle(ctx context.Context, reply Replier, req Request) error { 134 + h.lock.Lock() 135 + waitForPrevious := h.nextRequest 136 + h.nextRequest = make(chan struct{}) 137 + unlockNext := h.nextRequest 138 + h.lock.Unlock() 139 + 140 + innerReply := reply 141 + reply = func(ctx context.Context, result any, err error) error { 142 + close(unlockNext) 143 + return innerReply(ctx, result, err) 144 + } 145 + _, queueDone := event.Start(ctx, "queued") 146 + go func() { 147 + <-waitForPrevious 148 + queueDone() 149 + if err := h.handler(ctx, reply, req); err != nil { 150 + event.Error(ctx, "jsonrpc2 async message delivery failed", err) 151 + } 152 + }() 153 + return nil 154 + } 155 + 156 + // Enqueue adds the fun to the end of the queue. It is 157 + // non-blocking. The fun will be run in its own go-routine, and will 158 + // not run until the previous item in the queue has finished running. 159 + func (h *EnqueueHandler) Enqueue(fun func()) { 160 + h.lock.Lock() 161 + waitForPrevious := h.nextRequest 162 + h.nextRequest = make(chan struct{}) 163 + unlockNext := h.nextRequest 164 + h.lock.Unlock() 165 + 166 + go func() { 167 + <-waitForPrevious 168 + fun() 169 + close(unlockNext) 170 + }() 171 + }
+7 -1
internal/lsp/cache/workspace.go
··· 67 67 files map[protocol.DocumentURI]*File 68 68 69 69 standalone *Standalone 70 + 71 + // enqueue allows for a function to be added to the incoming queue 72 + // of messages from the client. The enqueue function itself is 73 + // non-blocking. 74 + enqueue func(func()) 70 75 } 71 76 72 - func NewWorkspace(cache *Cache, client protocol.Client, debugLog func(string)) *Workspace { 77 + func NewWorkspace(cache *Cache, client protocol.Client, debugLog func(string), enqueue func(func())) *Workspace { 73 78 overlayFS := fscache.NewOverlayFS(cache.fs) 74 79 w := &Workspace{ 75 80 registry: &registryWrapper{ ··· 83 88 modules: make(map[protocol.DocumentURI]*Module), 84 89 mappers: make(map[*token.File]*protocol.Mapper), 85 90 files: make(map[protocol.DocumentURI]*File), 91 + enqueue: enqueue, 86 92 } 87 93 w.standalone = NewStandalone(w) 88 94 return w
+1 -1
internal/lsp/server/initialize.go
··· 173 173 } 174 174 } 175 175 176 - s.workspace = cache.NewWorkspace(s.cache, s.client, s.debugLog) 176 + s.workspace = cache.NewWorkspace(s.cache, s.client, s.debugLog, s.enqueue) 177 177 178 178 err = s.maybeUseWorkspaceFolders(ctx) 179 179 // Initialized is a notification, so if there's an error, we show
+14
internal/lsp/server/server.go
··· 28 28 // ID returns a unique, human-readable string for this server, for 29 29 // the purpose of log messages and debugging. 30 30 ID() string 31 + 32 + // SetEnqueuer allows the server's enqueue function to be set. This 33 + // exists only because of awkwardness around the creation of the 34 + // server and the JSONRPC handlers. 35 + SetEnqueuer(func(func())) 31 36 } 32 37 33 38 // New creates an LSP server and binds it to handle incoming client ··· 97 102 // The map field may be reassigned but the map itself is immutable. 98 103 watchedGlobPatterns map[protocol.RelativePattern]struct{} 99 104 watchingIDCounter int 105 + 106 + // enqueue allows for a function to be added to the incoming queue 107 + // of messages from the client. The enqueue function itself is 108 + // non-blocking. 109 + enqueue func(func()) 100 110 } 101 111 102 112 var _ ServerWithID = (*server)(nil) 103 113 104 114 func (s *server) ID() string { return s.id } 115 + 116 + func (s *server) SetEnqueuer(enqueue func(func())) { 117 + s.enqueue = enqueue 118 + } 105 119 106 120 // Shutdown implements the 'shutdown' LSP handler. It releases resources 107 121 // associated with the server and waits for all ongoing work to complete.
+30 -16
internal/lsp/server/watching.go
··· 19 19 "fmt" 20 20 "path" 21 21 "path/filepath" 22 + "time" 22 23 23 24 "cuelang.org/go/internal/golangorgx/gopls/protocol" 24 25 ) ··· 87 88 }) 88 89 } 89 90 90 - err := s.client.RegisterCapability(ctx, &protocol.RegistrationParams{ 91 - Registrations: []protocol.Registration{{ 92 - ID: WatchedFilesCapabilityID(curID), 93 - Method: "workspace/didChangeWatchedFiles", 94 - RegisterOptions: protocol.DidChangeWatchedFilesRegistrationOptions{ 95 - Watchers: watchers, 96 - }, 97 - }}, 91 + s.enqueue(func() { 92 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 93 + defer cancel() 94 + err := s.client.RegisterCapability(ctx, 95 + &protocol.RegistrationParams{ 96 + Registrations: []protocol.Registration{{ 97 + ID: WatchedFilesCapabilityID(curID), 98 + Method: "workspace/didChangeWatchedFiles", 99 + RegisterOptions: protocol.DidChangeWatchedFilesRegistrationOptions{ 100 + Watchers: watchers, 101 + }, 102 + }}, 103 + }) 104 + if err != nil { 105 + s.debugLog(err.Error()) 106 + } 98 107 }) 99 - if err != nil { 100 - return err 101 - } 102 108 103 109 if oldID > 0 { 104 - return s.client.UnregisterCapability(ctx, &protocol.UnregistrationParams{ 105 - Unregisterations: []protocol.Unregistration{{ 106 - ID: WatchedFilesCapabilityID(oldID), 107 - Method: "workspace/didChangeWatchedFiles", 108 - }}, 110 + s.enqueue(func() { 111 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 112 + defer cancel() 113 + err := s.client.UnregisterCapability(ctx, 114 + &protocol.UnregistrationParams{ 115 + Unregisterations: []protocol.Unregistration{{ 116 + ID: WatchedFilesCapabilityID(oldID), 117 + Method: "workspace/didChangeWatchedFiles", 118 + }}, 119 + }) 120 + if err != nil { 121 + s.debugLog(err.Error()) 122 + } 109 123 }) 110 124 } 111 125