this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pkg/tools/http: experimental implementation of http.Serve

This allows cue cmd to be used to serve requests. Our intention
is to ultimately replace cue cmd with something more principled.
This implementation allows us to get more experience with using
cue cmd for serving, so that we can make better design decisions.

Signed-off-by: Marcel van Lohuizen <mpvl@gmail.com>
Change-Id: I5eb048cb3f303bdc631c4a16eb8ade211d4c45eb
Reviewed-on: https://cue.gerrithub.io/c/cue-lang/cue/+/1221921
Reviewed-by: Roger Peppe <rogpeppe@gmail.com>
TryBot-Result: CUEcueckoo <cueckoo@cuelang.org>
Unity-Result: CUE porcuepine <cue.porcuepine@gmail.com>

+1402 -55
+7
cmd/cue/cmd/custom.go
··· 18 18 19 19 import ( 20 20 "fmt" 21 + "os" 22 + "os/signal" 21 23 "strings" 22 24 "sync/atomic" 23 25 ··· 212 214 // Return early if anything was in error 213 215 if err := c.Run(cmd.Context()); err != nil { 214 216 return err 217 + } 218 + if itask.Background.Load() { 219 + signalChan := make(chan os.Signal, 1) 220 + signal.Notify(signalChan, os.Interrupt) 221 + <-signalChan 215 222 } 216 223 217 224 if !didWork.Load() {
+81
cmd/cue/cmd/script_test.go
··· 263 263 fmt.Fprintln(ts.Stdout(), s) 264 264 } 265 265 }, 266 + // curl is a simple HTTP client for testscripts. 267 + // Supports: -X METHOD, -H header, -d data, -L (follow redirects), -w format, -f (fail on error) 268 + "curl": func(ts *testscript.TestScript, neg bool, args []string) { 269 + method := "GET" 270 + var headers []string 271 + var data string 272 + followRedirects := false 273 + writeFormat := "" 274 + failOnError := false 275 + 276 + var reqURL string 277 + for i := 0; i < len(args); i++ { 278 + arg := args[i] 279 + switch { 280 + case arg == "-X" && i+1 < len(args): 281 + i++ 282 + method = args[i] 283 + case arg == "-H" && i+1 < len(args): 284 + i++ 285 + headers = append(headers, args[i]) 286 + case arg == "-d" && i+1 < len(args): 287 + i++ 288 + data = args[i] 289 + if method == "GET" { 290 + method = "POST" 291 + } 292 + case arg == "-L": 293 + followRedirects = true 294 + case arg == "-f": 295 + failOnError = true 296 + case arg == "-w" && i+1 < len(args): 297 + i++ 298 + writeFormat = args[i] 299 + case !strings.HasPrefix(arg, "-"): 300 + reqURL = arg 301 + } 302 + } 303 + if reqURL == "" { 304 + ts.Fatalf("curl: no URL specified") 305 + } 306 + 307 + var body io.Reader 308 + if data != "" { 309 + body = strings.NewReader(data) 310 + } 311 + req, err := http.NewRequest(method, reqURL, body) 312 + ts.Check(err) 313 + 314 + for _, h := range headers { 315 + key, val, _ := strings.Cut(h, ":") 316 + req.Header.Add(strings.TrimSpace(key), strings.TrimSpace(val)) 317 + } 318 + 319 + client := &http.Client{} 320 + if !followRedirects { 321 + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { 322 + return http.ErrUseLastResponse 323 + } 324 + } 325 + 326 + resp, err := client.Do(req) 327 + ts.Check(err) 328 + defer resp.Body.Close() 329 + 330 + _, err = io.Copy(ts.Stdout(), resp.Body) 331 + ts.Check(err) 332 + 333 + // Handle -w format (mainly for adding newline) 334 + if writeFormat != "" { 335 + fmt.Fprint(ts.Stdout(), strings.ReplaceAll(writeFormat, `\n`, "\n")) 336 + } 337 + 338 + // Check for HTTP errors when -f is used 339 + failed := failOnError && resp.StatusCode >= 400 340 + if failed && !neg { 341 + ts.Fatalf("curl: HTTP %d", resp.StatusCode) 342 + } 343 + if !failed && neg { 344 + ts.Fatalf("curl: expected failure but got HTTP %d", resp.StatusCode) 345 + } 346 + }, 266 347 }, 267 348 Setup: func(e *testscript.Env) error { 268 349 // If a testscript loads CUE packages but forgot to set up a cue.mod,
+364
cmd/cue/cmd/testdata/script/cmd_serve.txtar
··· 1 + exec cue cmd serve & 2 + 3 + exec sleep 1 4 + 5 + curl -X GET http://localhost:8082/test 6 + stdout 'ack: using method GET' 7 + 8 + curl -X GET http://localhost:8082/one/1/two/i/i 9 + stdout 'ack: one/1 two/i/i' 10 + 11 + curl -X GET http://localhost:8082/authz/alice 12 + stdout 'true' 13 + 14 + curl -X GET http://localhost:8082/authz/bob 15 + stdout 'false' 16 + 17 + curl -w '\n' -X GET http://localhost:8082/schema/api/vm?r=research 18 + cmp stdout out/vm-research 19 + 20 + curl -L 'http://localhost:8082/api/vm?u=alice' -H 'Content-Type: application/json' -d '{"os":"Linux","arch":"x86","mem":10000000000,"disk":20000000000,"access": {"public":true}}' 21 + stdout 'OK' 22 + 23 + curl -L 'http://localhost:8082/authz' -H 'Content-Type: application/json' -d '{"user":"alice"}' 24 + stdout 'true' 25 + 26 + curl -L 'http://localhost:8082/authz' -H 'Content-Type: application/json' -d '{"user":"bob"}' 27 + stdout 'false' 28 + 29 + # Sibling task pattern - request data flows to another task 30 + curl -L 'http://localhost:8082/echo' -H 'Content-Type: text/plain' -d 'hello from sibling task' 31 + stdout 'hello from sibling task' 32 + 33 + # Custom status code examples 34 + 35 + # Valid request returns 201 Created 36 + curl -w '\n' -L 'http://localhost:8082/api/vm/create?u=alice' -H 'Content-Type: application/json' -d '{}' 37 + cmp stdout out/create-201 38 + 39 + # Custom error endpoint returns 400 Bad Request 40 + ! curl -f -w '\n' -L 'http://localhost:8082/api/error' -H 'Content-Type: application/json' -d '{}' 41 + cmp stdout out/create-400 42 + 43 + # Error cases 44 + 45 + # Malformed JSON input 46 + ! curl -f -L 'http://localhost:8082/authz' -H 'Content-Type: application/json' -d 'not valid json' 47 + cmp stdout out/error-malformed-json 48 + 49 + # Research user (bob) trying to create a public VM - violates access: public: false constraint 50 + ! curl -f -L 'http://localhost:8082/api/vm?u=bob' -H 'Content-Type: application/json' -d '{"os":"Linux","arch":"x86","mem":10000000000,"disk":20000000000,"access":{"public":true}}' 51 + cmp stdout out/error-public-vm 52 + 53 + # Invalid OS value 54 + ! curl -f -L 'http://localhost:8082/api/vm?u=alice' -H 'Content-Type: application/json' -d '{"os":"FreeBSD","arch":"x86","mem":10000000000,"disk":20000000000,"access":{"public":true}}' 55 + cmp stdout out/error-invalid-os 56 + 57 + stop 58 + 59 + -- authz_tool.cue -- 60 + package vm 61 + 62 + import "encoding/json" 63 + 64 + #Authz: { 65 + user!: string 66 + 67 + authz: *false | bool 68 + 69 + authz: allow && !deny 70 + 71 + deny: *false | bool // default(false) 72 + allow: *false | bool // default(true) 73 + 74 + if user == "alice" { 75 + allow: true 76 + } 77 + } 78 + 79 + // We have two kinds of authz endpoints: one that takes the user 80 + // as part of the URL path, and one that takes a JSON body. 81 + // 82 + // The main purpose here is to show that requests can be routed based on their 83 + // method type. 84 + // 85 + // This pattern constraint sets up the common aspects for both. 86 + command: serve: [=~"^/authz"]: ServeJSON & { 87 + request: value: #Authz 88 + response: value: request.value.authz 89 + } 90 + 91 + // authz returns a boolean indicating whether the user is authorized. 92 + // The user is specified as part of the URL path. 93 + command: serve: "/authz/{user}": { 94 + method: "GET" 95 + request: value: user: request.pathValues.user 96 + } 97 + 98 + // authz returns a boolean indicating whether the user is authorized. 99 + // The user is specified in the JSON body. 100 + command: serve: "/authz": { 101 + method: "POST" 102 + request: value: json.Unmarshal(request.body) 103 + } 104 + 105 + -- api_tool.cue -- 106 + package vm 107 + 108 + import ( 109 + "encoding/json" 110 + "encoding/openapi" 111 + ) 112 + 113 + //#Machine describes a virtual machine. 114 + #Machine: { 115 + os!: "Windows" | "Linux" | "MacOS" 116 + arch!: "x86" | "arm64" 117 + mem?: int 118 + disk?: int 119 + access?: { 120 + public?: bool 121 + } 122 + } 123 + 124 + // MachineByRole defines different machine configurations that are allowed 125 + // based on user role. 126 + MachineByRole: { 127 + admin: #Machine 128 + 129 + // Interns are only afforded smaller machines. 130 + intern: #Machine&{ 131 + mem?: <=4Gi 132 + disk?: <=100G 133 + } 134 + 135 + research: #Machine&{ 136 + // The research team cannot launch public-facing machines. 137 + access?: public?: false 138 + os?: "Linux" | "Windows" 139 + } 140 + default: null 141 + } 142 + 143 + Membership: alice: "admin" 144 + Membership: bob: "research" 145 + Membership: taylor: "intern" 146 + 147 + // This API gets the allowed VM configurations for a given user defined as 148 + // OpenAPI. The user is specified as a query parameter. 149 + // If no user is given, a generic OpenAPI schema for Machine is returned. 150 + command: serve: "/schema/api/vm": { 151 + routing: method: "GET" 152 + 153 + // Define default for user query parameter. 154 + request: form: r: *[""] | [...string] 155 + 156 + let role = request.form.r[0] 157 + let X = [// select first 158 + if MachineByRole[role] != _|_ { 159 + config: { 160 + version: "3.0.0" 161 + info: title: "Machine for \(role)" 162 + } 163 + 164 + // TODO: note: if this is a disjunction it won't get converted to 165 + // OpenAPI properly. 166 + 167 + machine: MachineByRole[role] 168 + }, 169 + { 170 + config: { 171 + version: "3.0.0" 172 + info: title: "Machine" 173 + } 174 + machine: #Machine 175 + }, // default 176 + ][0] 177 + 178 + response: body: json.Indent( 179 + openapi.MarshalSchema(X.config, {#Machine: X.machine}), "", " ") 180 + } 181 + 182 + // This API simulates creating a new VM for a given user, specified as a query 183 + // parameter. The machine is requested in the JSON body. It is than checked 184 + // that the user can actually use the machine. 185 + command: serve: "/api/vm": { 186 + routing: method: "POST" 187 + 188 + // Further constrain user permissions. 189 + request: form: u!: [string] 190 + 191 + let user = request.form.u[0] 192 + let role = Membership[user] 193 + 194 + // Validate the request to create a new VM against the VM schema. 195 + X: #Machine 196 + X: json.Unmarshal(request.body) 197 + X: MachineByRole[role] 198 + 199 + response: { 200 + header: "Content-Type": "application/json" 201 + 202 + result: "OK" 203 + body: json.Marshal(result) 204 + } 205 + } 206 + 207 + // This API demonstrates returning a custom 201 Created status code. 208 + command: serve: "/api/vm/create": { 209 + routing: method: "POST" 210 + 211 + request: form: u!: [string, ...string] 212 + 213 + response: { 214 + header: "Content-Type": "application/json" 215 + statusCode: 201 216 + body: json.Marshal({ 217 + status: "created" 218 + user: request.form.u[0] 219 + }) 220 + } 221 + } 222 + 223 + // This API demonstrates returning a custom 400 Bad Request status code. 224 + command: serve: "/api/error": { 225 + routing: method: "POST" 226 + 227 + response: { 228 + header: "Content-Type": "application/json" 229 + statusCode: 400 230 + body: json.Marshal({ 231 + error: "missing required parameter: u" 232 + }) 233 + } 234 + } 235 + 236 + -- server_tool.cue -- 237 + package vm 238 + 239 + import ( 240 + "encoding/json" 241 + "tool/exec" // Uncomment when experimenting with sibling task pattern 242 + "tool/http" 243 + ) 244 + 245 + // Test serves as an endpoint for simple testing. 246 + command: serve: "/test": http.Serve & { 247 + request: _ 248 + response: body: "ack: using method \(request.method)" 249 + } 250 + 251 + // This endpoint demonstrates capture of all path parameters. 252 + command: serve: "/one/{one}/{two...}": http.Serve & { 253 + request: _ 254 + response: body: "ack: one/\(request.pathValues.one) \(request.pathValues.two)" 255 + } 256 + 257 + // This endpoint demonstrates a sibling task (defined as a separate command) 258 + // that reads from the request and produces output used in the response. 259 + command: serve: "/echo": http.Serve & { 260 + routing: method: "POST" 261 + request: _ 262 + // Reference the sibling command's output 263 + response: body: command.echo.stdout 264 + } 265 + 266 + // The echo task is defined as a sibling command that reads from the serve request. 267 + command: echo: exec.Run & { 268 + cmd: ["cat"] 269 + stdin: command.serve."/echo".request.body 270 + stdout: string // Explicitly capture stdout 271 + } 272 + 273 + // ServeJSON is a helper for endpoints that serve JSON. 274 + ServeJSON: { 275 + response: { 276 + value!: _ 277 + body: json.Marshal(value) 278 + } 279 + } 280 + 281 + // All our endpoints serve from the same address. 282 + // We take the convention that the name of the "subcommand" is the path prefix. 283 + command: serve: [Prefix=string]: http.Serve & { 284 + listenAddr: ":8082" 285 + routing: path: Prefix 286 + } 287 + -- out/create-201 -- 288 + {"status":"created","user":"alice"} 289 + -- out/create-400 -- 290 + {"error":"missing required parameter: u"} 291 + -- out/error-malformed-json -- 292 + error handling request: command.serve."/authz".request.value: error in call to encoding/json.Unmarshal: json: invalid JSON: 293 + ./authz_tool.cue:43:18 294 + 295 + -- out/error-public-vm -- 296 + error handling request: command.serve."/api/vm".X.access.public: conflicting values false and true: 297 + ./api_tool.cue:32:21 298 + ./api_tool.cue:92:5 299 + json:1:84 300 + 301 + -- out/error-invalid-os -- 302 + error handling request: command.serve."/api/vm".X.os: 3 errors in empty disjunction: 303 + command.serve."/api/vm".X.os: conflicting values "Linux" and "FreeBSD": 304 + ./api_tool.cue:10:23 305 + ./api_tool.cue:90:5 306 + json:1:7 307 + command.serve."/api/vm".X.os: conflicting values "MacOS" and "FreeBSD": 308 + ./api_tool.cue:10:33 309 + ./api_tool.cue:90:5 310 + json:1:7 311 + command.serve."/api/vm".X.os: conflicting values "Windows" and "FreeBSD": 312 + ./api_tool.cue:10:11 313 + ./api_tool.cue:90:5 314 + json:1:7 315 + 316 + -- out/vm-research -- 317 + { 318 + "openapi": "3.0.0", 319 + "info": { 320 + "title": "Machine for research" 321 + }, 322 + "paths": {}, 323 + "components": { 324 + "schemas": { 325 + "Machine": { 326 + "type": "object", 327 + "required": [ 328 + "arch", 329 + "os" 330 + ], 331 + "properties": { 332 + "access": { 333 + "type": "object", 334 + "properties": { 335 + "public": { 336 + "type": "boolean" 337 + } 338 + } 339 + }, 340 + "arch": { 341 + "type": "string", 342 + "enum": [ 343 + "x86", 344 + "arm64" 345 + ] 346 + }, 347 + "disk": { 348 + "type": "integer" 349 + }, 350 + "mem": { 351 + "type": "integer" 352 + }, 353 + "os": { 354 + "type": "string", 355 + "enum": [ 356 + "Linux", 357 + "Windows" 358 + ] 359 + } 360 + } 361 + } 362 + } 363 + } 364 + }
+89 -16
internal/task/task.go
··· 143 143 144 144 // flowFunc takes a Runner and a schema v, which should only be defined for 145 145 // legacy task ids. 146 - func (c Context) flowFunc(runner Runner, v cue.Value) flow.RunnerFunc { 147 - return flow.RunnerFunc(func(t *flow.Task) error { 148 - // Set task-specific values. 149 - c.Context = t.Context() 150 - c.Obj = t.Value() 151 - if v.Exists() { 152 - c.Obj = c.Obj.Unify(v) 146 + func (c Context) flowFunc(runner Runner, v cue.Value) flow.Runner { 147 + wrapper := &flowRunner{c: c, runner: runner, v: v} 148 + // If the runner declares it is a service, return a 149 + // wrapper that implements both Runner and Service. 150 + if ce, ok := runner.(flow.Service); ok && ce.IsService() { 151 + return &flowRunnerWithService{flowRunner: wrapper} 152 + } 153 + return wrapper 154 + } 155 + 156 + // flowRunner wraps a task.Runner to implement flow.Runner. 157 + type flowRunner struct { 158 + c Context 159 + runner Runner 160 + v cue.Value 161 + } 162 + 163 + func (r *flowRunner) Run(t *flow.Task, err error) error { 164 + // Set task-specific values. 165 + r.c.Context = t.Context() 166 + r.c.Obj = t.Value() 167 + if r.v.Exists() { 168 + r.c.Obj = r.c.Obj.Unify(r.v) 169 + } 170 + value, runErr := r.runner.Run(&r.c) 171 + if runErr != nil { 172 + return runErr 173 + } 174 + if value != nil { 175 + _ = t.Fill(value) 176 + } 177 + return nil 178 + } 179 + 180 + // flowRunnerWithService wraps a flowRunner and also implements 181 + // flow.Service. 182 + type flowRunnerWithService struct { 183 + *flowRunner 184 + } 185 + 186 + func (r *flowRunnerWithService) IsService() bool { 187 + return true 188 + } 189 + 190 + // ForkRunLoop is used to serve an external event. It makes a copy of the 191 + // configuration that results from the first phase and than patches the 192 + // task at path to run the given runner, instead of the initialization phase. 193 + func (c Context) ForkRunLoop(ctx context.Context, path cue.Path, v cue.Value, r Runner) *flow.Controller { 194 + cfg := &flow.Config{ 195 + Root: path, 196 + InferTasks: true, 197 + IgnoreConcrete: true, 198 + RunInferredTasks: true, // Run inferred tasks since inputs are filled 199 + } 200 + 201 + // Fill the root with the request data. The v value contains the filled 202 + // serve task, including request fields. We need to ensure dependent tasks 203 + // can access this data through references. 204 + root := c.Root.FillPath(path, v) 205 + 206 + taskFunc := func(v cue.Value) (flow.Runner, error) { 207 + // The node itself has a function to continue. 208 + if v.Path().String() == path.String() { 209 + return c.flowFunc(r, cue.Value{}), nil 153 210 } 154 - value, err := runner.Run(&c) 155 - if err != nil { 156 - return err 157 - } 158 - if value != nil { 159 - _ = t.Fill(value) 160 - } 161 - return nil 162 - }) 211 + var didWork atomic.Bool 212 + // if !didWork.Load() { 213 + // return nil, fmt.Errorf("%v: no tasks found", cmdPath) 214 + // } 215 + return c.TaskFunc(&didWork)(v) 216 + } 217 + 218 + return flow.New(cfg, root, taskFunc) 163 219 } 164 220 165 221 // taskError wraps some error values to retain position information about the ··· 207 263 // Runner runs given the current value and returns a new value which is to 208 264 // be unified with the original result. 209 265 Run(ctx *Context) (results interface{}, err error) 266 + } 267 + 268 + // Background indicates whether the task is running in the background after 269 + // finishing. 270 + var Background atomic.Bool 271 + 272 + // BackgroundTask must be used by a task to indicate that it is running in the 273 + // background. 274 + // TODO: this is a hack. We should have a better way to indicate this. Also, 275 + // introduce mechanism to cancel and background tasks, detect when they are 276 + // done, and collect errors. 277 + // Maybe do something like put a sync.WaitGroup (or errgroup.Group) inside 278 + // task.Context and have a way to add to it (perhaps just expose a Go method) 279 + // and wait for it to complete (in practice you'd probably select on that 280 + // finishing and os.Interrupt)? 281 + func (c *Context) BackgroundTask() { 282 + Background.Store(true) 210 283 } 211 284 212 285 // Register registers a task for cue commands.
+152 -11
pkg/tool/http/http.cue
··· 40 40 41 41 request: { 42 42 body?: bytes | string 43 - header: [string]: string | [...string] 44 - trailer: [string]: string | [...string] 43 + header: [string]: [string, ...string] 44 + trailer: [string]: [string, ...string] 45 45 } 46 46 response: { 47 47 status: string ··· 53 53 } 54 54 } 55 55 56 - // TODO: support serving once we have the cue serve command. 57 - // Serve: { 58 - // port: int 56 + // Serve launches a task that listens on the given port and serves HTTP 57 + // requests. (EXPERIMENTAL) 59 58 // 60 - // cert: string 61 - // key: string 59 + // Serve support HTTP multiplexing. Multiple tasks can be configured to be 60 + // served from the same address. Serve will multiplex these different instances 61 + // based on the serving path and, optionally, method. 62 62 // 63 - // handle: [Pattern=string]: Message & { 64 - // pattern: Pattern 65 - // } 66 - // } 63 + // For more details see the documentation of the routing parameters such as 64 + // path and method. 65 + Serve: { 66 + $id: _id 67 + _id: "tool/http.Serve" 68 + 69 + // listenAddr is the address to listen on (e.g., ":8080", "localhost:8000"). 70 + // This field is required to avoid accidentally binding to privileged ports. 71 + listenAddr!: string 72 + 73 + // routing configures the HTTP routes that are served. 74 + // 75 + // Routing is done based on path and methods (TODO: allow host as well) 76 + // 77 + // Literal (that is, non-wildcard) parts of a pattern match the 78 + // corresponding parts of a request case-sensitively. 79 + // 80 + // If no method is given it matches every method. If routing.method is set to 81 + // "GET", it matches both GET and HEAD requests. Otherwise, the method must 82 + // match exactly. 83 + // 84 + // TODO: When no host is given, every host is matched. A pattern with a host 85 + // matches URLs on that host only. 86 + // 87 + // A path can include wildcard segments of the form {NAME} or {NAME...}. For 88 + // example, "/b/{bucket}/o/{objectname...}". The wildcard name must be a 89 + // valid Go identifier. Wildcards must be full path segments: they must be 90 + // preceded by a slash and followed by either a slash or the end of the 91 + // string. For example, "/b_{bucket}" is not a valid pattern. 92 + // 93 + // Normally a wildcard matches only a single path segment, ending at the 94 + // next literal slash (not %2F) in the request URL. But if "..." is 95 + // present, then the wildcard matches the remainder of the URL path, 96 + // including slashes. (Therefore it is invalid for a "..." wildcard to 97 + // appear anywhere but at the end of a pattern.) The match for a wildcard 98 + // can be obtained from request.pathValues with the wildcard's name. A 99 + // trailing slash in a path acts as an anonymous "..." wildcard. 100 + // 101 + // The special wildcard {$} matches only the end of the URL. For example, 102 + // the pattern "/{$}" matches only the path "/", whereas the pattern "/" 103 + // matches every path. 104 + // 105 + // For matching, both pattern paths and incoming request paths are unescaped 106 + // segment by segment. So, for example, the path "/a%2Fb/100%25" is treated 107 + // as having two segments, "a/b" and "100%". The pattern "/a%2fb/" matches 108 + // it, but the pattern "/a/b/" does not. 109 + // 110 + // 111 + // Precedence 112 + // 113 + // If two or more patterns match a request, then the most specific pattern 114 + // takes precedence. A pattern P1 is more specific than P2 if P1 matches a 115 + // strict subset of P2’s requests; that is, if P2 matches all the requests 116 + // of P1 and more. If neither is more specific, then the patterns conflict. 117 + // There is one exception to this rule: if two patterns would otherwise 118 + // conflict and one has a host while the other does not, then the pattern 119 + // with the host takes precedence. If a pattern conflicts with another 120 + // pattern that is already registered the task will panic. 121 + // 122 + // As an example of the general rule, "/images/thumbnails/" is more specific 123 + // than "/images/", so both can be registered. The former matches paths 124 + // beginning with "/images/thumbnails/" and the latter will match any other 125 + // path in the "/images/" subtree. 126 + // 127 + // As another example, consider a route with path "/" and method "GET" versus 128 + // a route with path "/index.html" and no method: both match a GET request 129 + // for "/index.html", but the former matches all other GET and HEAD requests, 130 + // while the latter matches any request for "/index.html" that uses a 131 + // different method. These routes would conflict. 132 + routing: { 133 + // path sets the path to route to. It may include wildcard segments 134 + // as described above. 135 + path: *"/" | =~"^/" 136 + 137 + // method optionally sets the HTTP method to match (e.g. "GET" | 138 + // "POST"). If not set, all methods are accepted. 139 + method?: string 140 + } 141 + 142 + // TODO: 143 + // - schemes: string // e.g. "http" | "https" 144 + // - TLS 145 + 146 + // request holds data about the incoming HTTP request. 147 + // 148 + // Fields marked [runtime] are populated automatically when a request is 149 + // received. Users can add constraints to these fields to validate input, 150 + // for example: `form: u!: [string]` to require a query parameter "u". 151 + // 152 + // The value field is for user-defined parsing of the request body. 153 + request: { 154 + // method is the HTTP method (GET, POST, PUT, etc.). [runtime] 155 + method: string 156 + 157 + // url is the full request URL. [runtime] 158 + url: string 159 + 160 + // body is the raw request body. [runtime] 161 + body: *bytes | string 162 + 163 + // value can be set by the user to hold a parsed representation of 164 + // the body. For example: `value: json.Unmarshal(body)` 165 + value?: _ 166 + 167 + // pathValues contains values extracted from URL path wildcards. 168 + // For example, with routing.path: "/users/{id}", a request to 169 + // "/users/123" would have pathValues: {id: "123"}. [runtime] 170 + pathValues: [string]: string 171 + 172 + // form contains the parsed form data, including both the URL 173 + // query parameters and POST/PUT/PATCH form bodies. [runtime] 174 + form: [string]: [string, ...string] 175 + 176 + // header contains the request headers. Each header key maps to a 177 + // non-empty list of values, as HTTP allows multiple values per header. 178 + // [runtime] 179 + header: [string]: [string, ...string] 180 + 181 + // trailer contains the request trailers. Each trailer key maps to a 182 + // non-empty list of values. [runtime] 183 + trailer: [string]: [string, ...string] 184 + } 185 + 186 + // response defines the HTTP response to send back to the client. All fields 187 + // are optional and user-defined. 188 + // 189 + // Note: sub-tasks (e.g., exec.Run) cannot be nested within response fields. 190 + // However, sibling tasks defined outside the Serve block can reference 191 + // request data and their output can be used in the response. 192 + response: { 193 + // statusCode sets the HTTP status code. If not set, 200 is used. 194 + statusCode?: int 195 + 196 + // body is the response body to send. 197 + body?: *bytes | string 198 + 199 + // header sets response headers. Each key can be set to either a single 200 + // string value or a list of values for headers with multiple values. 201 + header?: [string]: string | [string, ...string] 202 + 203 + // trailer sets response trailers. Each key can be set to either a single 204 + // string value or a list of values. 205 + trailer?: [string]: string | [string, ...string] 206 + } 207 + }
+19 -2
pkg/tool/http/http.go
··· 29 29 30 30 func init() { 31 31 task.Register("tool/http.Do", newHTTPCmd) 32 + task.Register("tool/http.Serve", newServeCmd) 32 33 33 34 // For backwards compatibility. 34 35 task.Register("http", newHTTPCmd) ··· 176 177 } 177 178 h := http.Header{} 178 179 for iter.Next() { 179 - str, err := iter.Value().String() 180 + key := iter.Selector().Unquoted() 181 + val := iter.Value() 182 + 183 + // Handle single string value 184 + if s, err := val.String(); err == nil { 185 + h.Add(key, s) 186 + continue 187 + } 188 + 189 + // Each header value is a list of strings [string, ...string] 190 + list, err := val.List() 180 191 if err != nil { 181 192 return nil, err 182 193 } 183 - h.Add(iter.Selector().Unquoted(), str) 194 + for list.Next() { 195 + str, err := list.Value().String() 196 + if err != nil { 197 + return nil, err 198 + } 199 + h.Add(key, str) 200 + } 184 201 } 185 202 return h, nil 186 203 }
+1 -1
pkg/tool/http/http_test.go
··· 128 128 } 129 129 `, 130 130 field: "header", 131 - out: "header.\"1\": cannot use value 'a' (type bytes) as string", 131 + out: "header.\"1\": cannot use value 'a' (type bytes) as list", 132 132 }, { 133 133 req: ` 134 134 header: 1
+179 -13
pkg/tool/http/pkg.go
··· 30 30 // 31 31 // request: { 32 32 // body?: bytes | string 33 - // header: [string]: string | [...string] 34 - // trailer: [string]: string | [...string] 33 + // header: [string]: [string, ...string] 34 + // trailer: [string]: [string, ...string] 35 35 // } 36 36 // response: { 37 37 // status: string ··· 43 43 // } 44 44 // } 45 45 // 46 - // // TODO: support serving once we have the cue serve command. 47 - // // Serve: { 48 - // // port: int 46 + // // Serve launches a task that listens on the given port and serves HTTP 47 + // // requests. (EXPERIMENTAL) 49 48 // // 50 - // // cert: string 51 - // // key: string 49 + // // Serve support HTTP multiplexing. Multiple tasks can be configured to be 50 + // // served from the same address. Serve will multiplex these different instances 51 + // // based on the serving path and, optionally, method. 52 52 // // 53 - // // handle: [Pattern=string]: Message & { 54 - // // pattern: Pattern 55 - // // } 56 - // // } 53 + // // For more details see the documentation of the routing parameters such as 54 + // // path and method. 55 + // Serve: { 56 + // $id: _id 57 + // _id: "tool/http.Serve" 58 + // 59 + // // listenAddr is the address to listen on (e.g., ":8080", "localhost:8000"). 60 + // // This field is required to avoid accidentally binding to privileged ports. 61 + // listenAddr!: string 62 + // 63 + // // routing configures the HTTP routes that are served. 64 + // // 65 + // // Routing is done based on path and methods (TODO: allow host as well) 66 + // // 67 + // // Literal (that is, non-wildcard) parts of a pattern match the 68 + // // corresponding parts of a request case-sensitively. 69 + // // 70 + // // If no method is given it matches every method. If routing.method is set to 71 + // // "GET", it matches both GET and HEAD requests. Otherwise, the method must 72 + // // match exactly. 73 + // // 74 + // // TODO: When no host is given, every host is matched. A pattern with a host 75 + // // matches URLs on that host only. 76 + // // 77 + // // A path can include wildcard segments of the form {NAME} or {NAME...}. For 78 + // // example, "/b/{bucket}/o/{objectname...}". The wildcard name must be a 79 + // // valid Go identifier. Wildcards must be full path segments: they must be 80 + // // preceded by a slash and followed by either a slash or the end of the 81 + // // string. For example, "/b_{bucket}" is not a valid pattern. 82 + // // 83 + // // Normally a wildcard matches only a single path segment, ending at the 84 + // // next literal slash (not %2F) in the request URL. But if "..." is 85 + // // present, then the wildcard matches the remainder of the URL path, 86 + // // including slashes. (Therefore it is invalid for a "..." wildcard to 87 + // // appear anywhere but at the end of a pattern.) The match for a wildcard 88 + // // can be obtained from request.pathValues with the wildcard's name. A 89 + // // trailing slash in a path acts as an anonymous "..." wildcard. 90 + // // 91 + // // The special wildcard {$} matches only the end of the URL. For example, 92 + // // the pattern "/{$}" matches only the path "/", whereas the pattern "/" 93 + // // matches every path. 94 + // // 95 + // // For matching, both pattern paths and incoming request paths are unescaped 96 + // // segment by segment. So, for example, the path "/a%2Fb/100%25" is treated 97 + // // as having two segments, "a/b" and "100%". The pattern "/a%2fb/" matches 98 + // // it, but the pattern "/a/b/" does not. 99 + // // 100 + // // 101 + // // Precedence 102 + // // 103 + // // If two or more patterns match a request, then the most specific pattern 104 + // // takes precedence. A pattern P1 is more specific than P2 if P1 matches a 105 + // // strict subset of P2’s requests; that is, if P2 matches all the requests 106 + // // of P1 and more. If neither is more specific, then the patterns conflict. 107 + // // There is one exception to this rule: if two patterns would otherwise 108 + // // conflict and one has a host while the other does not, then the pattern 109 + // // with the host takes precedence. If a pattern conflicts with another 110 + // // pattern that is already registered the task will panic. 111 + // // 112 + // // As an example of the general rule, "/images/thumbnails/" is more specific 113 + // // than "/images/", so both can be registered. The former matches paths 114 + // // beginning with "/images/thumbnails/" and the latter will match any other 115 + // // path in the "/images/" subtree. 116 + // // 117 + // // As another example, consider a route with path "/" and method "GET" versus 118 + // // a route with path "/index.html" and no method: both match a GET request 119 + // // for "/index.html", but the former matches all other GET and HEAD requests, 120 + // // while the latter matches any request for "/index.html" that uses a 121 + // // different method. These routes would conflict. 122 + // routing: { 123 + // // path sets the path to route to. It may include wildcard segments 124 + // // as described above. 125 + // path: *"/" | =~"^/" 126 + // 127 + // // method optionally sets the HTTP method to match (e.g. "GET" | 128 + // // "POST"). If not set, all methods are accepted. 129 + // method?: string 130 + // } 131 + // 132 + // // TODO: 133 + // // - schemes: string // e.g. "http" | "https" 134 + // // - TLS 135 + // 136 + // // request holds data about the incoming HTTP request. 137 + // // 138 + // // Fields marked [runtime] are populated automatically when a request is 139 + // // received. Users can add constraints to these fields to validate input, 140 + // // for example: `form: u!: [string]` to require a query parameter "u". 141 + // // 142 + // // The value field is for user-defined parsing of the request body. 143 + // request: { 144 + // // method is the HTTP method (GET, POST, PUT, etc.). [runtime] 145 + // method: string 146 + // 147 + // // url is the full request URL. [runtime] 148 + // url: string 149 + // 150 + // // body is the raw request body. [runtime] 151 + // body: *bytes | string 152 + // 153 + // // value can be set by the user to hold a parsed representation of 154 + // // the body. For example: `value: json.Unmarshal(body)` 155 + // value?: _ 156 + // 157 + // // pathValues contains values extracted from URL path wildcards. 158 + // // For example, with routing.path: "/users/{id}", a request to 159 + // // "/users/123" would have pathValues: {id: "123"}. [runtime] 160 + // pathValues: [string]: string 161 + // 162 + // // form contains the parsed form data, including both the URL 163 + // // query parameters and POST/PUT/PATCH form bodies. [runtime] 164 + // form: [string]: [string, ...string] 165 + // 166 + // // header contains the request headers. Each header key maps to a 167 + // // non-empty list of values, as HTTP allows multiple values per header. 168 + // // [runtime] 169 + // header: [string]: [string, ...string] 170 + // 171 + // // trailer contains the request trailers. Each trailer key maps to a 172 + // // non-empty list of values. [runtime] 173 + // trailer: [string]: [string, ...string] 174 + // } 175 + // 176 + // // response defines the HTTP response to send back to the client. All fields 177 + // // are optional and user-defined. 178 + // // 179 + // // Note: sub-tasks (e.g., exec.Run) cannot be nested within response fields. 180 + // // However, sibling tasks defined outside the Serve block can reference 181 + // // request data and their output can be used in the response. 182 + // response: { 183 + // // statusCode sets the HTTP status code. If not set, 200 is used. 184 + // statusCode?: int 185 + // 186 + // // body is the response body to send. 187 + // body?: *bytes | string 188 + // 189 + // // header sets response headers. Each key can be set to either a single 190 + // // string value or a list of values for headers with multiple values. 191 + // header?: [string]: string | [string, ...string] 192 + // 193 + // // trailer sets response trailers. Each key can be set to either a single 194 + // // string value or a list of values. 195 + // trailer?: [string]: string | [string, ...string] 196 + // } 197 + // } 57 198 package http 58 199 59 200 import ( ··· 86 227 } 87 228 request: { 88 229 body?: bytes | string 89 - header: [string]: string | [...string] 90 - trailer: [string]: string | [...string] 230 + header: [string]: [string, ...string] 231 + trailer: [string]: [string, ...string] 91 232 } 92 233 response: { 93 234 status: string ··· 95 236 body: *bytes | string 96 237 header: [string]: string | [...string] 97 238 trailer: [string]: string | [...string] 239 + } 240 + } 241 + Serve: { 242 + $id: _id 243 + _id: "tool/http.Serve" 244 + listenAddr!: string 245 + routing: { 246 + path: *"/" | =~"^/" 247 + method?: string 248 + } 249 + request: { 250 + method: string 251 + url: string 252 + body: *bytes | string 253 + value?: _ 254 + pathValues: [string]: string 255 + form: [string]: [string, ...string] 256 + header: [string]: [string, ...string] 257 + trailer: [string]: [string, ...string] 258 + } 259 + response: { 260 + statusCode?: int 261 + body?: *bytes | string 262 + header?: [string]: string | [string, ...string] 263 + trailer?: [string]: string | [string, ...string] 98 264 } 99 265 } 100 266 }`,
+270
pkg/tool/http/serve.go
··· 1 + // Copyright 2023 CUE Authors 2 + // 3 + // Licensed under the Apache License, Version 2.0 (the "License"); 4 + // you may not use this file except in compliance with the License. 5 + // You may obtain a copy of the License at 6 + // 7 + // http://www.apache.org/licenses/LICENSE-2.0 8 + // 9 + // Unless required by applicable law or agreed to in writing, software 10 + // distributed under the License is distributed on an "AS IS" BASIS, 11 + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 + // See the License for the specific language governing permissions and 13 + // limitations under the License. 14 + 15 + package http 16 + 17 + import ( 18 + "fmt" 19 + "io" 20 + "log" 21 + "net" 22 + "net/http" 23 + "os" 24 + "regexp" 25 + "sync" 26 + 27 + "cuelang.org/go/cue" 28 + "cuelang.org/go/cue/errors" 29 + "cuelang.org/go/internal/task" 30 + ) 31 + 32 + var ( 33 + muxers = map[string]*http.ServeMux{} 34 + // listeners stores net.Listener for each address to enable error checking 35 + // before starting the serve goroutine. 36 + listeners = map[string]net.Listener{} 37 + ) 38 + 39 + func newServeCmd(v cue.Value) (task.Runner, error) { 40 + return &listenCmd{}, nil 41 + } 42 + 43 + type listenCmd struct { 44 + w http.ResponseWriter 45 + body cue.Path 46 + } 47 + 48 + // IsService indicates that http.Serve acts as a service. 49 + // Other tasks can reference request fields (which are filled 50 + // at runtime) without creating a dependency cycle error. 51 + func (c *listenCmd) IsService() bool { 52 + return true 53 + } 54 + 55 + var m sync.Mutex 56 + 57 + // valueMu protects cue.Value operations which are not thread-safe. 58 + // This serializes all request handling, which is not ideal for performance. 59 + // TODO: remove this lock once cue.Value supports concurrent operations. 60 + var valueMu sync.Mutex 61 + 62 + var ( 63 + listenPath = cue.ParsePath("listenAddr") 64 + pathPath = cue.ParsePath("routing.path") 65 + methodPath = cue.ParsePath("routing.method") 66 + 67 + requestPath = cue.ParsePath("request") 68 + respBodyPath = cue.ParsePath("response.body") 69 + respStatusCodePath = cue.ParsePath("response.statusCode") 70 + responsePath = cue.ParsePath("response") 71 + ) 72 + 73 + // httpRequest represents the request data to fill into the CUE value. 74 + type httpRequest struct { 75 + Method string `json:"method"` 76 + URL string `json:"url"` 77 + Body []byte `json:"body"` 78 + Form map[string][]string `json:"form"` 79 + Header map[string][]string `json:"header"` 80 + PathValues map[string]string `json:"pathValues"` 81 + } 82 + 83 + func (c *listenCmd) Run(ctx *task.Context) (res interface{}, err error) { 84 + valueMu.Lock() 85 + v := ctx.Obj 86 + addr, err := v.LookupPath(listenPath).String() 87 + valueMu.Unlock() 88 + 89 + if err != nil { 90 + return nil, err 91 + } 92 + 93 + m.Lock() 94 + mux := muxers[addr] 95 + if mux == nil { 96 + // Create listener first to catch errors (e.g., port unavailable) 97 + // before starting the serve goroutine. 98 + ln, err := net.Listen("tcp", addr) 99 + if err != nil { 100 + m.Unlock() 101 + return nil, fmt.Errorf("cannot listen on %s: %w", addr, err) 102 + } 103 + 104 + mux = http.NewServeMux() 105 + muxers[addr] = mux 106 + listeners[addr] = ln 107 + 108 + log.Printf("listening on %v\n", addr) 109 + 110 + // TODO: use Server at some point. 111 + go http.Serve(ln, mux) 112 + } 113 + m.Unlock() 114 + 115 + valueMu.Lock() 116 + url := "/" 117 + if p := v.LookupPath(pathPath); p.Exists() { 118 + url, err = p.String() 119 + if err != nil { 120 + valueMu.Unlock() 121 + return nil, err 122 + } 123 + } 124 + 125 + vars := extractPathVariables(url) 126 + 127 + if m := v.LookupPath(methodPath); m.Exists() { 128 + method, err := m.String() 129 + if err != nil { 130 + valueMu.Unlock() 131 + return nil, err 132 + } 133 + url = fmt.Sprintf("%s %s", method, url) 134 + } 135 + 136 + path := v.Path() 137 + valueMu.Unlock() 138 + 139 + log.Printf("adding handler for %v\n", url) 140 + mux.HandleFunc(url, func(w http.ResponseWriter, req *http.Request) { 141 + err := req.ParseForm() 142 + if err != nil { 143 + http.Error(w, fmt.Sprintf("cannot parse form: %v", err), http.StatusBadRequest) 144 + return 145 + } 146 + 147 + data, err := io.ReadAll(req.Body) 148 + if err != nil { 149 + http.Error(w, fmt.Sprintf("cannot read body: %v", err), http.StatusBadRequest) 150 + return 151 + } 152 + 153 + pathValues := make(map[string]string) 154 + for _, variable := range vars { 155 + if s := req.PathValue(variable); s != "" { 156 + pathValues[variable] = s 157 + } 158 + } 159 + 160 + valueMu.Lock() 161 + defer valueMu.Unlock() 162 + 163 + reqValue := v.FillPath(requestPath, httpRequest{ 164 + Method: req.Method, 165 + URL: req.URL.String(), 166 + Body: data, 167 + Form: req.Form, 168 + Header: req.Header, 169 + PathValues: pathValues, 170 + }) 171 + 172 + handle := &serveCmd{w: w} 173 + 174 + c := req.Context() 175 + controller := ctx.ForkRunLoop(c, path, reqValue, handle) 176 + 177 + if err := controller.Run(c); err != nil { 178 + cwd, _ := os.Getwd() 179 + details := errors.Details(err, &errors.Config{Cwd: cwd, ToSlash: true}) 180 + // TODO: return JSON-formatted error response for consistency with 181 + // successful responses (e.g. {"error": "...", "details": "..."}). 182 + http.Error(w, fmt.Sprintf("error handling request: %v", details), http.StatusInternalServerError) 183 + return 184 + } 185 + }) 186 + 187 + ctx.BackgroundTask() 188 + return nil, nil 189 + } 190 + 191 + // variableRegex is a regular expression to find all instances of {variableName} in a path. 192 + // It captures the content inside the braces. 193 + var variableRegex = regexp.MustCompile(`\{([^{}\.]+)(\.\.\.)?\}`) 194 + 195 + // extractPathVariables parses a URL pattern string and returns a slice of the variable names. 196 + // For example, given "/users/{userID}/posts/{postID}", it returns ["userID", "postID"]. 197 + // The special pattern {$} (exact path match in http.ServeMux) is excluded. 198 + func extractPathVariables(pattern string) []string { 199 + matches := variableRegex.FindAllStringSubmatch(pattern, -1) 200 + if matches == nil { 201 + return nil 202 + } 203 + 204 + var variables []string 205 + for _, match := range matches { 206 + // The first submatch (index 1) is the captured group, which is the variable name. 207 + // Skip {$} which is a special http.ServeMux pattern for exact path matching. 208 + if name := match[1]; name != "$" { 209 + variables = append(variables, name) 210 + } 211 + } 212 + return variables 213 + } 214 + 215 + type serveCmd struct { 216 + w http.ResponseWriter 217 + body cue.Path 218 + } 219 + 220 + // IsService indicates that http.Serve should not be reported as part 221 + // of task cycles during request handling via ForkRunLoop. 222 + func (c *serveCmd) IsService() bool { 223 + return true 224 + } 225 + 226 + func (c *serveCmd) Run(ctx *task.Context) (res interface{}, err error) { 227 + v := ctx.Obj 228 + 229 + response := v.LookupPath(responsePath) 230 + headers, err := parseHeaders(response, "header") 231 + if err != nil { 232 + http.Error(c.w, fmt.Sprintf("cannot parse headers: %v", err), http.StatusBadRequest) 233 + return nil, err 234 + } 235 + trailers, err := parseHeaders(response, "trailer") 236 + if err != nil { 237 + http.Error(c.w, fmt.Sprintf("cannot parse trailers: %v", err), http.StatusBadRequest) 238 + return nil, err 239 + } 240 + 241 + v = v.LookupPath(respBodyPath) 242 + 243 + b, err := v.Bytes() 244 + if err != nil { 245 + http.Error(c.w, fmt.Sprintf("cannot encode response: %v", err), http.StatusBadRequest) 246 + } 247 + 248 + for k, v := range headers { 249 + for _, v := range v { 250 + c.w.Header().Set(k, v) 251 + } 252 + } 253 + 254 + for k, v := range trailers { 255 + for _, v := range v { 256 + c.w.Header().Set(k, v) 257 + } 258 + } 259 + 260 + // Set status code if specified, otherwise defaults to 200 261 + if sc := ctx.Obj.LookupPath(respStatusCodePath); sc.Exists() { 262 + if code, err := sc.Int64(); err == nil { 263 + c.w.WriteHeader(int(code)) 264 + } 265 + } 266 + 267 + c.w.Write(b) 268 + 269 + return nil, nil 270 + }
+9
tools/flow/cycle.go
··· 63 63 } 64 64 65 65 func (cc *cycleChecker) addCycleError(start *Task) { 66 + // If any task in the cycle is marked as a service, don't report 67 + // the error. This allows services like http.Serve to have bidirectional 68 + // references with other tasks that read from request fields. 69 + for _, t := range cc.stack { 70 + if t.isService { 71 + return 72 + } 73 + } 74 + 66 75 err := &cycleError{} 67 76 68 77 for _, t := range cc.stack {
+129
tools/flow/defer_test.go
··· 1 + package flow_test 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "cuelang.org/go/cue" 8 + "cuelang.org/go/cue/cuecontext" 9 + "cuelang.org/go/internal/task" 10 + "cuelang.org/go/tools/flow" 11 + ) 12 + 13 + // TestStaticDeferredMixing verifies that static tasks (dependencies known at 14 + // start) are NOT deferred even if they are dependencies of a task that is part 15 + // of a cycle. 16 + // 17 + // The previous "sibling task" heuristic (waiting for tasks with 0 dependencies) 18 + // would incorrectly defer independent tasks if they happened to be leaf nodes 19 + // in the dependency graph, potentially causing them to never run if the 20 + // controller was waiting for them to complete before starting the server. 21 + func TestStaticDeferredMixing(t *testing.T) { 22 + ctx := cuecontext.New() 23 + 24 + // This configuration has: 25 + // 1. A server task (root) that excludes itself from cycles. 26 + // 2. A static task (setup) that runs immediately. 27 + // 3. A dynamic task (handler) that depends on the server's request (runtime coverage). 28 + // 4. A dependency from server to setup (server needs setup done). 29 + // 30 + // In a correct implementation: 31 + // - 'setup' should NOT be deferred because it has no runtime dependency. 32 + // - 'server' waits for 'setup'. 33 + // - 'handler' is deferred until valid request. 34 + val := ctx.CompileString(` 35 + // Server task (Service) 36 + root: { 37 + $id: "valStub" 38 + $exclude: true 39 + // Depends on static task 40 + dep: setup.done 41 + } 42 + 43 + // Static task - should run immediately 44 + setup: { 45 + $id: "valStub" 46 + done: true 47 + } 48 + 49 + // Dynamic task - depends on runtime value from root 50 + handler: { 51 + $id: "valStub" 52 + // Depends on runtime field 53 + input: root.request.body 54 + } 55 + `) 56 + 57 + if err := val.Err(); err != nil { 58 + t.Fatal(err) 59 + } 60 + 61 + // Track which tasks ran 62 + ran := make(map[string]bool) 63 + 64 + // Register a stub runner 65 + runner := func(v cue.Value) (flow.Runner, error) { 66 + selectors := v.Path().Selectors() 67 + if len(selectors) == 0 { 68 + return nil, nil // Not a task 69 + } 70 + name := selectors[0].String() 71 + exclude := false 72 + if v.LookupPath(cue.ParsePath("$exclude")).Exists() { 73 + exclude = true 74 + } 75 + 76 + return &stubRunner{ 77 + name: name, 78 + exclude: exclude, 79 + runFunc: func() error { 80 + ran[name] = true 81 + return nil 82 + }, 83 + }, nil 84 + } 85 + 86 + c := flow.New(nil, val, flow.TaskFunc(runner)) 87 + 88 + // Run the flow 89 + err := c.Run(context.Background()) 90 + if err != nil { 91 + t.Fatalf("Flow run failed: %v", err) 92 + } 93 + 94 + // Verify 'setup' ran. It is a static dependency of 'root'. 95 + // If 'setup' was incorrectly deferred, 'root' would wait forever for it, 96 + // or if 'root' started, 'setup' wouldn't have run yet. 97 + if !ran["setup"] { 98 + t.Error("Static task 'setup' did not run. It may have been incorrectly deferred.") 99 + } 100 + 101 + // Verify 'root' ran (it waits for setup) 102 + if !ran["root"] { 103 + t.Error("Root task 'root' did not run.") 104 + } 105 + 106 + // Verify 'handler' did NOT run (it depends on runtime input) 107 + if ran["handler"] { 108 + t.Error("Dynamic task 'handler' ran prematurely. It should be deferred.") 109 + } 110 + } 111 + 112 + type stubRunner struct { 113 + name string 114 + exclude bool 115 + runFunc func() error 116 + } 117 + 118 + func (r *stubRunner) Run(t *flow.Task, err error) error { 119 + return r.runFunc() 120 + } 121 + 122 + func (r *stubRunner) IsService() bool { 123 + return r.exclude 124 + } 125 + 126 + // Needed to implement task.Runner for registration (though strictly flow.Runner is interface) 127 + func (r *stubRunner) RunTask(ctx *task.Context) (results interface{}, err error) { 128 + return nil, r.runFunc() 129 + }
+38 -4
tools/flow/flow.go
··· 112 112 Run(t *Task, err error) error 113 113 } 114 114 115 + // Service is an optional interface that Runner implementations can 116 + // provide to indicate that the task describes a service. Services are 117 + // typically long-running tasks (like http.Serve) that produce values 118 + // at runtime (e.g. requests) that other tasks may depend on. 119 + // 120 + // Services are treated specially in the dependency graph: 121 + // 1. They are excluded from cycle detection filters. 122 + // 2. Dependencies on services are treated as runtime dependencies, 123 + // causing dependent tasks to be deferred until the service runs. 124 + type Service interface { 125 + IsService() bool 126 + } 127 + 115 128 // A RunnerFunc runs a Task. 116 129 type RunnerFunc func(t *Task) error 117 130 ··· 147 160 // updated. This includes directly after initialization. The task may be 148 161 // nil if this call is not the result of a task completing. 149 162 UpdateFunc func(c *Controller, t *Task) error 163 + 164 + // RunInferredTasks, when true, prevents inferred tasks from being deferred. 165 + // This is used by ForkRunLoop where inferred tasks should run immediately 166 + // because their inputs are already filled. 167 + RunInferredTasks bool 150 168 } 151 169 152 170 // A Controller defines a set of Tasks to be executed. ··· 343 361 ctxt *adt.OpContext 344 362 r Runner 345 363 346 - index int 347 - path cue.Path 348 - key string 349 - labels []adt.Feature 364 + index int 365 + path cue.Path 366 + key string 367 + labels []adt.Feature 368 + isService bool // if true, this task is a service (excluded from cycles) 369 + deferred bool // if true, this task is skipped at runtime (discovered from Service) 370 + runtimeDeps []*Task // dependencies on Service tasks 350 371 351 372 // Dynamic 352 373 update adt.Expr ··· 412 433 if dep == nil || dep == t { 413 434 return 414 435 } 436 + // Skip dependencies on tasks that are services. 437 + // These are typically long-running tasks (like http.Serve) that fill 438 + // their values at runtime, so waiting for them would cause deadlock. 439 + if dep.isService { 440 + t.runtimeDeps = append(t.runtimeDeps, dep) 441 + return 442 + } 443 + // Skip dependencies on deferred tasks. These tasks won't run at startup 444 + // and are expected to run later via ForkRunLoop. 445 + if dep.deferred { 446 + return 447 + } 448 + 415 449 if t.deps == nil { 416 450 t.deps = map[*Task]bool{} 417 451 t.pathDeps = map[string][]*Task{}
+5
tools/flow/run.go
··· 57 57 58 58 // Mark tasks as Ready. 59 59 for _, t := range c.tasks { 60 + // Skip deferred tasks - they're expected to run later 61 + // via ForkRunLoop when their inputs become available. 62 + if t.deferred { 63 + continue 64 + } 60 65 switch t.state { 61 66 case Waiting: 62 67 waiting = true
+59 -8
tools/flow/tasks.go
··· 54 54 } 55 55 } 56 56 57 + // Calculate deferred tasks (those waiting for runtime inputs). 58 + // Only needed if we're not already running inferred tasks. 59 + if !c.cfg.RunInferredTasks { 60 + changed := true 61 + for changed { 62 + changed = false 63 + for _, t := range c.tasks { 64 + if t.deferred { 65 + continue 66 + } 67 + 68 + // Defer if task has runtime dependencies 69 + shouldDefer := len(t.runtimeDeps) > 0 70 + 71 + // Or if task depends on a deferred task (propagation) 72 + if !shouldDefer { 73 + for d := range t.deps { 74 + if d.deferred { 75 + shouldDefer = true 76 + break 77 + } 78 + } 79 + } 80 + 81 + // Service tasks (like Serve) are never deferred 82 + if shouldDefer && !t.isService { 83 + t.deferred = true 84 + changed = true 85 + } 86 + } 87 + } 88 + 89 + // Prune dependencies: Service tasks shouldn't wait for deferred tasks 90 + for _, t := range c.tasks { 91 + if t.isService { 92 + var kept []*Task 93 + for _, d := range t.depTasks { 94 + if !d.deferred { 95 + kept = append(kept, d) 96 + } 97 + } 98 + t.depTasks = kept 99 + } 100 + } 101 + } 102 + 57 103 // Check if there are cycles in the task dependencies. 58 104 if err := checkCycle(c.tasks); err != nil { 59 105 c.addErr(err, "cyclic task") ··· 129 175 if r != nil { 130 176 index := len(c.tasks) 131 177 t = &Task{ 132 - v: v, 133 - c: c, 134 - r: r, 135 - path: p, 136 - labels: w.Path(), 137 - key: key, 138 - index: index, 139 - err: errs, 178 + v: v, 179 + c: c, 180 + r: r, 181 + path: p, 182 + labels: w.Path(), 183 + key: key, 184 + index: index, 185 + err: errs, 186 + valueSeq: -1, // Ensure first updateTaskValue call updates the value 187 + } 188 + // If the runner declares it is a service, mark the task accordingly. 189 + if ce, ok := r.(Service); ok && ce.IsService() { 190 + t.isService = true 140 191 } 141 192 c.tasks = append(c.tasks, t) 142 193 c.keys[key] = t