Unified Agent + reusable Go agent core.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: harden console runtime config generations

Lyric 807d6729 76a69b6c

+1147 -394
+2 -1
cmd/mistermorph/consolecmd/agent_settings.go
··· 18 18 "github.com/quailyquaily/mistermorph/integration" 19 19 "github.com/quailyquaily/mistermorph/internal/configbootstrap" 20 20 "github.com/quailyquaily/mistermorph/internal/configutil" 21 + "github.com/quailyquaily/mistermorph/internal/fsstore" 21 22 "github.com/quailyquaily/mistermorph/internal/llmbench" 22 23 "github.com/quailyquaily/mistermorph/internal/llmutil" 23 24 "github.com/quailyquaily/mistermorph/internal/pathutil" ··· 245 246 writeError(w, http.StatusInternalServerError, err.Error()) 246 247 return 247 248 } 248 - if err := os.WriteFile(configPath, serialized, 0o600); err != nil { 249 + if err := fsstore.WriteTextAtomic(configPath, string(serialized), fsstore.FileOptions{DirPerm: 0o755, FilePerm: 0o600}); err != nil { 249 250 writeError(w, http.StatusInternalServerError, err.Error()) 250 251 return 251 252 }
+2 -1
cmd/mistermorph/consolecmd/console_settings.go
··· 12 12 "github.com/quailyquaily/mistermorph/integration" 13 13 "github.com/quailyquaily/mistermorph/internal/channelopts" 14 14 "github.com/quailyquaily/mistermorph/internal/configbootstrap" 15 + "github.com/quailyquaily/mistermorph/internal/fsstore" 15 16 "github.com/spf13/viper" 16 17 "gopkg.in/yaml.v3" 17 18 ) ··· 194 195 writeError(w, http.StatusInternalServerError, err.Error()) 195 196 return 196 197 } 197 - if err := os.WriteFile(configPath, serialized, 0o600); err != nil { 198 + if err := fsstore.WriteTextAtomic(configPath, string(serialized), fsstore.FileOptions{DirPerm: 0o755, FilePerm: 0o600}); err != nil { 198 199 writeError(w, http.StatusInternalServerError, err.Error()) 199 200 return 200 201 }
+24 -12
cmd/mistermorph/consolecmd/local_endpoint_client.go
··· 10 10 ) 11 11 12 12 type inProcessRuntimeEndpointClient struct { 13 - handler http.Handler 14 - authToken string 13 + handler func() http.Handler 14 + authToken func() string 15 15 canSubmit func() bool 16 16 } 17 17 18 - func newInProcessRuntimeEndpointClient(handler http.Handler, authToken string, canSubmit func() bool) *inProcessRuntimeEndpointClient { 18 + func newInProcessRuntimeEndpointClient(handler func() http.Handler, authToken func() string, canSubmit func() bool) *inProcessRuntimeEndpointClient { 19 19 return &inProcessRuntimeEndpointClient{ 20 20 handler: handler, 21 - authToken: strings.TrimSpace(authToken), 21 + authToken: authToken, 22 22 canSubmit: canSubmit, 23 23 } 24 24 } 25 25 26 - func (c *inProcessRuntimeEndpointClient) readyHandler() error { 26 + func (c *inProcessRuntimeEndpointClient) currentHandler() (http.Handler, error) { 27 27 if c == nil || c.handler == nil { 28 - return fmt.Errorf("daemon handler is not configured") 28 + return nil, fmt.Errorf("daemon handler getter is not configured") 29 + } 30 + handler := c.handler() 31 + if handler == nil { 32 + return nil, fmt.Errorf("daemon handler is not configured") 29 33 } 30 - return nil 34 + return handler, nil 31 35 } 32 36 33 37 func (c *inProcessRuntimeEndpointClient) ready() error { 34 - if err := c.readyHandler(); err != nil { 38 + if _, err := c.currentHandler(); err != nil { 35 39 return err 36 40 } 37 - if strings.TrimSpace(c.authToken) == "" { 41 + if strings.TrimSpace(c.currentAuthToken()) == "" { 38 42 return fmt.Errorf("daemon server auth token is not configured") 39 43 } 40 44 return nil 41 45 } 42 46 47 + func (c *inProcessRuntimeEndpointClient) currentAuthToken() string { 48 + if c == nil || c.authToken == nil { 49 + return "" 50 + } 51 + return strings.TrimSpace(c.authToken()) 52 + } 53 + 43 54 func (c *inProcessRuntimeEndpointClient) Health(ctx context.Context) (runtimeEndpointHealth, error) { 44 55 status, raw, err := c.roundTrip(ctx, http.MethodGet, "/health", nil, false) 45 56 if err != nil { ··· 70 81 } 71 82 72 83 func (c *inProcessRuntimeEndpointClient) roundTrip(ctx context.Context, method, target string, body []byte, includeAuth bool) (int, []byte, error) { 73 - if err := c.readyHandler(); err != nil { 84 + handler, err := c.currentHandler() 85 + if err != nil { 74 86 return 0, nil, err 75 87 } 76 88 if ctx == nil { ··· 85 97 return 0, nil, err 86 98 } 87 99 if includeAuth { 88 - req.Header.Set("Authorization", "Bearer "+c.authToken) 100 + req.Header.Set("Authorization", "Bearer "+c.currentAuthToken()) 89 101 } 90 102 if len(body) > 0 { 91 103 req.Header.Set("Content-Type", "application/json") 92 104 } 93 105 94 106 rec := newBufferedResponseWriter() 95 - c.handler.ServeHTTP(rec, req) 107 + handler.ServeHTTP(rec, req) 96 108 return rec.StatusCode(), rec.Body(), nil 97 109 } 98 110
+20 -4
cmd/mistermorph/consolecmd/local_endpoint_client_test.go
··· 20 20 }, 21 21 HealthEnabled: true, 22 22 }) 23 - client := newInProcessRuntimeEndpointClient(handler, "dev-token", func() bool { return true }) 23 + client := newInProcessRuntimeEndpointClient( 24 + func() http.Handler { return handler }, 25 + func() string { return "dev-token" }, 26 + func() bool { return true }, 27 + ) 24 28 25 29 health, err := client.Health(context.Background()) 26 30 if err != nil { ··· 46 50 AuthToken: "dev-token", 47 51 HealthEnabled: true, 48 52 }) 49 - client := newInProcessRuntimeEndpointClient(handler, "dev-token", func() bool { return true }) 53 + client := newInProcessRuntimeEndpointClient( 54 + func() http.Handler { return handler }, 55 + func() string { return "dev-token" }, 56 + func() bool { return true }, 57 + ) 50 58 51 59 status, raw, err := client.Proxy(context.Background(), http.MethodGet, "/overview", nil) 52 60 if err != nil { ··· 73 81 return daemonruntime.SubmitTaskResponse{}, nil 74 82 }, 75 83 }) 76 - client := newInProcessRuntimeEndpointClient(handler, "dev-token", func() bool { return false }) 84 + client := newInProcessRuntimeEndpointClient( 85 + func() http.Handler { return handler }, 86 + func() string { return "dev-token" }, 87 + func() bool { return false }, 88 + ) 77 89 78 90 health, err := client.Health(context.Background()) 79 91 if err != nil { ··· 92 104 return daemonruntime.SubmitTaskResponse{}, nil 93 105 }, 94 106 }) 95 - client := newInProcessRuntimeEndpointClient(handler, "dev-token", func() bool { return true }) 107 + client := newInProcessRuntimeEndpointClient( 108 + func() http.Handler { return handler }, 109 + func() string { return "dev-token" }, 110 + func() bool { return true }, 111 + ) 96 112 97 113 status, raw, err := client.Proxy(context.Background(), http.MethodPost, "/tasks", nil) 98 114 if err != nil {
+445 -146
cmd/mistermorph/consolecmd/local_runtime.go
··· 35 35 "github.com/quailyquaily/mistermorph/internal/mcphost" 36 36 "github.com/quailyquaily/mistermorph/internal/memoryruntime" 37 37 "github.com/quailyquaily/mistermorph/internal/outputfmt" 38 + "github.com/quailyquaily/mistermorph/internal/pathutil" 38 39 "github.com/quailyquaily/mistermorph/internal/personautil" 39 40 "github.com/quailyquaily/mistermorph/internal/promptprofile" 40 41 "github.com/quailyquaily/mistermorph/internal/skillsutil" ··· 69 70 AutoRenameTopic bool 70 71 WakeSignal daemonruntime.PokeInput 71 72 Version uint64 73 + Generation *consoleLocalRuntimeGeneration 72 74 } 73 75 74 76 type consoleLocalRuntimeBundle struct { ··· 83 85 commonDeps depsutil.CommonDependencies 84 86 } 85 87 88 + type consoleLocalRuntimeGeneration struct { 89 + generation uint64 90 + reader *viper.Viper 91 + logger *slog.Logger 92 + commonDeps depsutil.CommonDependencies 93 + bundle *consoleLocalRuntimeBundle 94 + memRuntime runtimecore.MemoryRuntime 95 + contactsSvc *contacts.Service 96 + 97 + mu sync.Mutex 98 + refs int 99 + retired bool 100 + cleaned bool 101 + } 102 + 86 103 type consoleLocalRuntime struct { 87 - logger *slog.Logger 88 104 inspectors *consoleInspectors 89 105 store *daemonruntime.ConsoleFileStore 90 106 bus *busruntime.Inproc 91 107 runner *runtimecore.ConversationRunner[string, consoleLocalTaskJob] 92 - contactsSvc *contacts.Service 93 - bundleMu sync.RWMutex 94 - bundle *consoleLocalRuntimeBundle 95 - runtimeConfigMu sync.RWMutex 96 - runtimeConfig consoleLocalRuntimeConfigSnapshot 108 + generationMu sync.RWMutex 109 + generation *consoleLocalRuntimeGeneration 110 + nextGeneration uint64 111 + pendingJobsMu sync.Mutex 112 + pendingJobs map[string]consoleLocalTaskJob 97 113 managedRuntimeMu sync.RWMutex 98 114 managedRuntimeRunning map[string]bool 99 - memRuntime runtimecore.MemoryRuntime 100 115 workersCtx context.Context 101 116 heartbeatMu sync.Mutex 102 117 streamHub *consoleStreamHub 103 118 heartbeatState *heartbeatutil.State 104 119 heartbeatPokeRequests chan heartbeatloop.PokeRequest 105 120 heartbeatCancel context.CancelFunc 121 + handlerMu sync.RWMutex 106 122 handler http.Handler 107 123 authToken string 108 124 cancelWorkers context.CancelFunc ··· 110 126 } 111 127 112 128 func newConsoleLocalRuntime(cfg serveConfig, reader *viper.Viper) (*consoleLocalRuntime, error) { 113 - logger, err := logutil.LoggerFromConfig(logutil.LoggerConfigFromReader(reader)) 114 - if err != nil { 115 - return nil, err 116 - } 117 - slog.SetDefault(logger) 118 129 inspectors, err := newConsoleInspectors(cfg.inspectPrompt, cfg.inspectRequest, "console", "console", "20060102_150405") 119 130 if err != nil { 120 131 return nil, err 121 132 } 122 133 out := &consoleLocalRuntime{ 123 - logger: logger, 124 - inspectors: inspectors, 125 - runtimeConfig: buildConsoleLocalRuntimeConfigSnapshot(logger, inspectors, reader), 126 - } 127 - snapshot := out.currentRuntimeConfig() 128 - bundle, err := buildConsoleLocalRuntimeBundle(logger, inspectors, snapshot) 129 - if err != nil { 130 - _ = inspectors.Close() 131 - return nil, err 134 + inspectors: inspectors, 135 + pendingJobs: map[string]consoleLocalTaskJob{}, 136 + managedRuntimeRunning: map[string]bool{}, 132 137 } 133 - commonDeps := snapshot.commonDeps 134 138 workersCtx, cancelWorkers := context.WithCancel(context.Background()) 135 - memRuntime, err := runtimecore.NewMemoryRuntime(commonDeps, runtimecore.MemoryRuntimeOptions{ 136 - Enabled: snapshot.reader.GetBool("memory.enabled"), 137 - ShortTermDays: snapshot.reader.GetInt("memory.short_term_days"), 138 - Logger: logger, 139 - }) 139 + out.workersCtx = workersCtx 140 + out.cancelWorkers = cancelWorkers 141 + gen, err := out.prepareGeneration(reader) 140 142 if err != nil { 141 143 _ = inspectors.Close() 142 144 cancelWorkers() 143 145 return nil, err 144 146 } 145 - if memRuntime.ProjectionWorker != nil { 146 - memRuntime.ProjectionWorker.Start(workersCtx) 147 - } 148 - 149 - authToken, err := consoleLocalRuntimeAuthTokenFromReader(snapshot.reader) 150 - if err != nil { 151 - _ = inspectors.Close() 152 - cancelWorkers() 153 - memRuntime.Cleanup() 154 - return nil, err 155 - } 147 + slog.SetDefault(gen.logger) 156 148 store, err := daemonruntime.NewConsoleFileStore(daemonruntime.ConsoleFileStoreOptions{ 157 - RootDir: statepaths.TaskTargetDir("console"), 158 - HeartbeatTopicID: strings.TrimSpace(snapshot.reader.GetString("tasks.targets.console.heartbeat_topic_id")), 159 - Persist: consoleTaskPersistenceEnabledFromReader(snapshot.reader), 149 + RootDir: consoleTaskTargetDirFromReader(gen.reader), 150 + HeartbeatTopicID: strings.TrimSpace(gen.reader.GetString("tasks.targets.console.heartbeat_topic_id")), 151 + Persist: consoleTaskPersistenceEnabledFromReader(gen.reader), 160 152 }) 161 153 if err != nil { 162 154 _ = inspectors.Close() 163 155 cancelWorkers() 164 - memRuntime.Cleanup() 156 + gen.cleanupNow() 165 157 return nil, err 166 158 } 167 - maxInFlight := snapshot.reader.GetInt("bus.max_inflight") 159 + maxInFlight := gen.reader.GetInt("bus.max_inflight") 168 160 if maxInFlight <= 0 { 169 161 maxInFlight = 1024 170 162 } 171 163 inprocBus, err := busruntime.StartInproc(busruntime.BootstrapOptions{ 172 164 MaxInFlight: maxInFlight, 173 - Logger: logger, 165 + Logger: gen.logger, 174 166 Component: "console", 175 167 }) 176 168 if err != nil { 177 169 _ = inspectors.Close() 178 170 cancelWorkers() 179 - memRuntime.Cleanup() 171 + gen.cleanupNow() 180 172 return nil, err 181 173 } 182 174 out.store = store 183 175 out.bus = inprocBus 184 176 out.streamHub = newConsoleStreamHub() 185 - out.bundle = bundle 186 - out.managedRuntimeRunning = map[string]bool{} 187 - out.memRuntime = memRuntime 188 - out.workersCtx = workersCtx 189 - out.contactsSvc = contacts.NewService(contacts.NewFileStore(statepaths.ContactsDir())) 190 - out.authToken = authToken 191 - out.cancelWorkers = cancelWorkers 192 177 out.runner = runtimecore.NewConversationRunner[string, consoleLocalTaskJob]( 193 178 workersCtx, 194 179 make(chan struct{}, 1), ··· 201 186 _ = inspectors.Close() 202 187 inprocBus.Close() 203 188 cancelWorkers() 204 - memRuntime.Cleanup() 189 + gen.cleanupNow() 190 + return nil, err 191 + } 192 + if err := out.applyPreparedGeneration(gen); err != nil { 193 + _ = inspectors.Close() 194 + inprocBus.Close() 195 + cancelWorkers() 196 + gen.cleanupNow() 205 197 return nil, err 206 198 } 207 - out.reloadHeartbeatLoop() 208 - out.handler = daemonruntime.NewHandler(out.routesOptions(strings.TrimSpace(authToken))) 209 199 return out, nil 210 200 } 211 201 ··· 299 289 return base64.RawURLEncoding.EncodeToString(raw), nil 300 290 } 301 291 302 - func (r *consoleLocalRuntime) currentRuntimeConfig() consoleLocalRuntimeConfigSnapshot { 292 + func consoleMemoryDirFromReader(r interface { 293 + GetString(string) string 294 + }) string { 303 295 if r == nil { 304 - return consoleLocalRuntimeConfigSnapshot{} 296 + return pathutil.ResolveStateChildDir("", "", "memory") 305 297 } 306 - r.runtimeConfigMu.RLock() 307 - defer r.runtimeConfigMu.RUnlock() 308 - return r.runtimeConfig 298 + return pathutil.ResolveStateChildDir(r.GetString("file_state_dir"), r.GetString("memory.dir_name"), "memory") 309 299 } 310 300 311 - func (r *consoleLocalRuntime) currentConfigReader() *viper.Viper { 312 - reader := r.currentRuntimeConfig().reader 313 - if reader == nil { 314 - return viper.New() 301 + func consoleContactsDirFromReader(r interface { 302 + GetString(string) string 303 + }) string { 304 + if r == nil { 305 + return pathutil.ResolveStateChildDir("", "", "contacts") 315 306 } 316 - return reader 307 + return pathutil.ResolveStateChildDir(r.GetString("file_state_dir"), r.GetString("contacts.dir_name"), "contacts") 317 308 } 318 309 319 - func (r *consoleLocalRuntime) setRuntimeConfig(snapshot consoleLocalRuntimeConfigSnapshot) { 310 + func consoleTaskTargetDirFromReader(r interface { 311 + GetString(string) string 312 + }) string { 320 313 if r == nil { 314 + return pathutil.ResolveStateChildDir("", "tasks/console", "tasks/console") 315 + } 316 + tasksDir := pathutil.ResolveStateChildDir(r.GetString("file_state_dir"), r.GetString("tasks.dir_name"), "tasks") 317 + return pathutil.ResolveStateChildDir(tasksDir, "console", "console") 318 + } 319 + 320 + func consoleStateDirFromReader(r interface { 321 + GetString(string) string 322 + }) string { 323 + if r == nil { 324 + return pathutil.ResolveStateDir("") 325 + } 326 + return pathutil.ResolveStateDir(r.GetString("file_state_dir")) 327 + } 328 + 329 + func consoleHeartbeatChecklistPathFromReader(r interface { 330 + GetString(string) string 331 + }) string { 332 + return pathutil.ResolveStateFile(consoleStateDirFromReader(r), statepaths.HeartbeatChecklistFilename) 333 + } 334 + 335 + func (g *consoleLocalRuntimeGeneration) acquire() { 336 + if g == nil { 321 337 return 322 338 } 323 - r.runtimeConfigMu.Lock() 324 - defer r.runtimeConfigMu.Unlock() 325 - r.runtimeConfig = snapshot 339 + g.mu.Lock() 340 + defer g.mu.Unlock() 341 + g.refs++ 342 + } 343 + 344 + func (g *consoleLocalRuntimeGeneration) release() { 345 + if g == nil { 346 + return 347 + } 348 + g.mu.Lock() 349 + if g.refs > 0 { 350 + g.refs-- 351 + } 352 + shouldCleanup := g.retired && g.refs == 0 && !g.cleaned 353 + if shouldCleanup { 354 + g.cleaned = true 355 + } 356 + g.mu.Unlock() 357 + if shouldCleanup { 358 + g.cleanupResources() 359 + } 326 360 } 327 361 328 - func (r *consoleLocalRuntime) commonDependencies() depsutil.CommonDependencies { 329 - return r.currentRuntimeConfig().commonDeps 362 + func (g *consoleLocalRuntimeGeneration) retire() { 363 + if g == nil { 364 + return 365 + } 366 + g.mu.Lock() 367 + g.retired = true 368 + shouldCleanup := g.refs == 0 && !g.cleaned 369 + if shouldCleanup { 370 + g.cleaned = true 371 + } 372 + g.mu.Unlock() 373 + if shouldCleanup { 374 + g.cleanupResources() 375 + } 376 + } 377 + 378 + func (g *consoleLocalRuntimeGeneration) cleanupNow() { 379 + if g == nil { 380 + return 381 + } 382 + g.mu.Lock() 383 + if g.cleaned { 384 + g.mu.Unlock() 385 + return 386 + } 387 + g.cleaned = true 388 + g.retired = true 389 + g.mu.Unlock() 390 + g.cleanupResources() 391 + } 392 + 393 + func (g *consoleLocalRuntimeGeneration) cleanupResources() { 394 + if g == nil { 395 + return 396 + } 397 + if g.bundle != nil && g.bundle.mcpHost != nil { 398 + _ = g.bundle.mcpHost.Close() 399 + } 400 + if g.memRuntime.Cleanup != nil { 401 + g.memRuntime.Cleanup() 402 + } 403 + } 404 + 405 + func (r *consoleLocalRuntime) prepareGeneration(reader *viper.Viper) (*consoleLocalRuntimeGeneration, error) { 406 + if r == nil { 407 + return nil, fmt.Errorf("console runtime is not initialized") 408 + } 409 + if reader == nil { 410 + reader = viper.New() 411 + } 412 + logger, err := logutil.LoggerFromConfig(logutil.LoggerConfigFromReader(reader)) 413 + if err != nil { 414 + return nil, err 415 + } 416 + snapshot := buildConsoleLocalRuntimeConfigSnapshot(logger, r.inspectors, reader) 417 + bundle, err := buildConsoleLocalRuntimeBundle(logger, r.inspectors, snapshot) 418 + if err != nil { 419 + return nil, err 420 + } 421 + memRuntime, err := runtimecore.NewMemoryRuntime(snapshot.commonDeps, runtimecore.MemoryRuntimeOptions{ 422 + Enabled: snapshot.reader.GetBool("memory.enabled"), 423 + ShortTermDays: snapshot.reader.GetInt("memory.short_term_days"), 424 + MemoryDir: consoleMemoryDirFromReader(snapshot.reader), 425 + Logger: logger, 426 + }) 427 + if err != nil { 428 + if bundle.mcpHost != nil { 429 + _ = bundle.mcpHost.Close() 430 + } 431 + return nil, err 432 + } 433 + r.generationMu.Lock() 434 + r.nextGeneration++ 435 + nextGeneration := r.nextGeneration 436 + r.generationMu.Unlock() 437 + contactsStore := contacts.NewFileStore(consoleContactsDirFromReader(snapshot.reader)) 438 + generation := &consoleLocalRuntimeGeneration{ 439 + generation: nextGeneration, 440 + reader: snapshot.reader, 441 + logger: logger, 442 + commonDeps: snapshot.commonDeps, 443 + bundle: bundle, 444 + memRuntime: memRuntime, 445 + contactsSvc: contacts.NewServiceWithOptions(contactsStore, contacts.ServiceOptions{ 446 + FailureCooldown: consoleContactsFailureCooldownFromReader(snapshot.reader), 447 + }), 448 + } 449 + return generation, nil 450 + } 451 + 452 + func (r *consoleLocalRuntime) currentGeneration() *consoleLocalRuntimeGeneration { 453 + if r == nil { 454 + return nil 455 + } 456 + r.generationMu.RLock() 457 + defer r.generationMu.RUnlock() 458 + return r.generation 459 + } 460 + 461 + func (r *consoleLocalRuntime) captureGeneration() (*consoleLocalRuntimeGeneration, error) { 462 + if r == nil { 463 + return nil, fmt.Errorf("console runtime is not initialized") 464 + } 465 + r.generationMu.RLock() 466 + generation := r.generation 467 + if generation != nil { 468 + generation.acquire() 469 + } 470 + r.generationMu.RUnlock() 471 + if generation == nil { 472 + return nil, fmt.Errorf("console runtime generation is not initialized") 473 + } 474 + return generation, nil 475 + } 476 + 477 + func (r *consoleLocalRuntime) currentLogger() *slog.Logger { 478 + if generation := r.currentGeneration(); generation != nil && generation.logger != nil { 479 + return generation.logger 480 + } 481 + return slog.Default() 482 + } 483 + 484 + func (r *consoleLocalRuntime) currentAuthToken() string { 485 + if r == nil { 486 + return "" 487 + } 488 + r.handlerMu.RLock() 489 + defer r.handlerMu.RUnlock() 490 + return strings.TrimSpace(r.authToken) 330 491 } 331 492 332 - func (r *consoleLocalRuntime) currentBundle() *consoleLocalRuntimeBundle { 493 + func (r *consoleLocalRuntime) currentHandler() http.Handler { 333 494 if r == nil { 334 495 return nil 335 496 } 336 - r.bundleMu.RLock() 337 - defer r.bundleMu.RUnlock() 338 - return r.bundle 497 + r.handlerMu.RLock() 498 + defer r.handlerMu.RUnlock() 499 + return r.handler 500 + } 501 + 502 + func (r *consoleLocalRuntime) currentConfigReader() *viper.Viper { 503 + generation := r.currentGeneration() 504 + if generation == nil { 505 + return viper.New() 506 + } 507 + reader := generation.reader 508 + if reader == nil { 509 + return viper.New() 510 + } 511 + return reader 339 512 } 340 513 341 - func (r *consoleLocalRuntime) defaultLLMConfig() (string, string) { 342 - if r != nil { 343 - route, err := depsutil.ResolveLLMRouteFromCommon(r.commonDependencies(), llmutil.RoutePurposeMainLoop) 514 + func (r *consoleLocalRuntime) applyPreparedGeneration(generation *consoleLocalRuntimeGeneration) error { 515 + if r == nil { 516 + return fmt.Errorf("console runtime is not initialized") 517 + } 518 + var reader *viper.Viper 519 + if generation != nil { 520 + reader = generation.reader 521 + } 522 + authToken, err := consoleLocalRuntimeAuthTokenFromReader(reader) 523 + if err != nil { 524 + authToken = "" 525 + } 526 + if generation != nil && r.store != nil { 527 + if err := r.store.ApplyConfig(daemonruntime.ConsoleFileStoreOptions{ 528 + RootDir: consoleTaskTargetDirFromReader(generation.reader), 529 + HeartbeatTopicID: strings.TrimSpace(generation.reader.GetString("tasks.targets.console.heartbeat_topic_id")), 530 + Persist: consoleTaskPersistenceEnabledFromReader(generation.reader), 531 + }); err != nil { 532 + return err 533 + } 534 + } 535 + r.generationMu.Lock() 536 + prevGeneration := r.generation 537 + r.generation = generation 538 + r.generationMu.Unlock() 539 + r.handlerMu.Lock() 540 + r.authToken = authToken 541 + r.handler = daemonruntime.NewHandler(r.routesOptions(strings.TrimSpace(authToken))) 542 + r.handlerMu.Unlock() 543 + if generation != nil && generation.memRuntime.ProjectionWorker != nil && r.workersCtx != nil { 544 + generation.memRuntime.ProjectionWorker.Start(r.workersCtx) 545 + } 546 + slog.SetDefault(r.currentLogger()) 547 + r.reloadHeartbeatLoop() 548 + if prevGeneration != nil { 549 + prevGeneration.retire() 550 + } 551 + return nil 552 + } 553 + 554 + func defaultLLMConfigForGeneration(generation *consoleLocalRuntimeGeneration) (string, string) { 555 + if generation != nil { 556 + route, err := depsutil.ResolveLLMRouteFromCommon(generation.commonDeps, llmutil.RoutePurposeMainLoop) 344 557 if err == nil { 345 558 return strings.TrimSpace(route.ClientConfig.Provider), strings.TrimSpace(route.ClientConfig.Model) 346 559 } 347 560 } 348 - bundle := r.currentBundle() 561 + var bundle *consoleLocalRuntimeBundle 562 + if generation != nil { 563 + bundle = generation.bundle 564 + } 349 565 if bundle == nil { 350 566 return "", "" 351 567 } ··· 393 609 if r == nil { 394 610 return fmt.Errorf("console runtime is not initialized") 395 611 } 396 - snapshot := buildConsoleLocalRuntimeConfigSnapshot(r.logger, r.inspectors, reader) 397 - nextBundle, err := buildConsoleLocalRuntimeBundle(r.logger, r.inspectors, snapshot) 612 + generation, err := r.prepareGeneration(reader) 398 613 if err != nil { 399 614 return err 400 615 } 401 - r.bundleMu.Lock() 402 - prevBundle := r.bundle 403 - r.bundle = nextBundle 404 - r.bundleMu.Unlock() 405 - r.setRuntimeConfig(snapshot) 406 - r.reloadHeartbeatLoop() 407 - if prevBundle != nil && prevBundle.mcpHost != nil { 408 - _ = prevBundle.mcpHost.Close() 616 + if err := r.applyPreparedGeneration(generation); err != nil { 617 + generation.cleanupNow() 618 + return err 409 619 } 410 620 return nil 411 621 } ··· 414 624 if r == nil { 415 625 return 416 626 } 627 + r.pendingJobsMu.Lock() 628 + for taskID, job := range r.pendingJobs { 629 + if job.Generation != nil { 630 + job.Generation.release() 631 + } 632 + delete(r.pendingJobs, taskID) 633 + } 634 + r.pendingJobsMu.Unlock() 417 635 if r.bus != nil { 418 636 _ = r.bus.Close() 419 637 } ··· 427 645 if r.cancelWorkers != nil { 428 646 r.cancelWorkers() 429 647 } 430 - if r.memRuntime.Cleanup != nil { 431 - r.memRuntime.Cleanup() 648 + generation := r.currentGeneration() 649 + r.generationMu.Lock() 650 + r.generation = nil 651 + r.generationMu.Unlock() 652 + if generation != nil { 653 + generation.cleanupNow() 432 654 } 433 655 if r.inspectors != nil { 434 656 _ = r.inspectors.Close() 435 657 } 436 - bundle := r.currentBundle() 437 - if bundle != nil && bundle.mcpHost != nil { 438 - _ = bundle.mcpHost.Close() 439 - } 440 658 } 441 659 442 660 func consoleLLMCredentialsWarning(route llmutil.ResolvedRoute) string { ··· 466 684 Ref: consoleLocalEndpointRef, 467 685 Name: consoleLocalEndpointName, 468 686 URL: consoleLocalEndpointURL, 469 - Client: newInProcessRuntimeEndpointClient(r.handler, r.authToken, r.canSubmit), 687 + Client: newInProcessRuntimeEndpointClient(r.currentHandler, r.currentAuthToken, r.canSubmit), 470 688 } 471 689 } 472 690 473 691 func (r *consoleLocalRuntime) canSubmit() bool { 474 - if r == nil { 692 + generation, err := r.captureGeneration() 693 + if err != nil { 694 + return false 695 + } 696 + defer generation.release() 697 + return canSubmitGeneration(generation) 698 + } 699 + 700 + func canSubmitGeneration(generation *consoleLocalRuntimeGeneration) bool { 701 + if generation == nil { 475 702 return false 476 703 } 477 - bundle := r.currentBundle() 704 + bundle := generation.bundle 478 705 if bundle == nil || bundle.taskRuntime == nil { 479 706 return false 480 707 } ··· 498 725 poke = r.pokeHeartbeat 499 726 } 500 727 return daemonruntime.RoutesOptions{ 501 - Mode: "console", 502 - AgentNameFunc: func() string { return personautil.LoadAgentName(statepaths.FileStateDir()) }, 728 + Mode: "console", 729 + AgentNameFunc: func() string { 730 + generation := r.currentGeneration() 731 + if generation == nil { 732 + return personautil.LoadAgentName(consoleStateDirFromReader(viper.New())) 733 + } 734 + return personautil.LoadAgentName(consoleStateDirFromReader(generation.reader)) 735 + }, 503 736 AuthToken: strings.TrimSpace(authToken), 504 737 TaskReader: r.store, 505 738 TopicReader: r.store, ··· 507 740 Submit: r.submitTask, 508 741 HealthEnabled: true, 509 742 Overview: func(ctx context.Context) (map[string]any, error) { 510 - provider, model := r.defaultLLMConfig() 511 - reader := r.currentConfigReader() 743 + generation, err := r.captureGeneration() 744 + if err != nil { 745 + return nil, err 746 + } 747 + defer generation.release() 748 + provider, model := defaultLLMConfigForGeneration(generation) 749 + reader := generation.reader 512 750 return map[string]any{ 513 751 "llm": map[string]any{ 514 752 "provider": provider, ··· 561 799 } 562 800 563 801 func (r *consoleLocalRuntime) submitTask(ctx context.Context, req daemonruntime.SubmitTaskRequest) (daemonruntime.SubmitTaskResponse, error) { 564 - timeout := consoleDefaultTimeoutFromReader(r.currentConfigReader()) 802 + generation, err := r.captureGeneration() 803 + if err != nil { 804 + return daemonruntime.SubmitTaskResponse{}, err 805 + } 806 + releaseGeneration := true 807 + defer func() { 808 + if releaseGeneration { 809 + generation.release() 810 + } 811 + }() 812 + timeout := consoleDefaultTimeoutFromReader(generation.reader) 565 813 if strings.TrimSpace(req.Timeout) != "" { 566 814 d, err := time.ParseDuration(strings.TrimSpace(req.Timeout)) 567 815 if err != nil || d <= 0 { ··· 575 823 Ref: "web/console", 576 824 }) 577 825 task := strings.TrimSpace(req.Task) 578 - if output, handled := r.handleConsoleModelCommand(task); handled { 579 - return r.submitSyntheticTask(task, output, timeout, strings.TrimSpace(req.TopicID), strings.TrimSpace(req.TopicTitle), trigger) 826 + if output, handled := r.handleConsoleModelCommand(generation.reader, task); handled { 827 + resp, err := r.submitSyntheticTask(generation, task, output, timeout, strings.TrimSpace(req.TopicID), strings.TrimSpace(req.TopicTitle), trigger) 828 + if err == nil { 829 + releaseGeneration = false 830 + } 831 + return resp, err 580 832 } 581 833 model := strings.TrimSpace(req.Model) 582 - return r.submitTaskViaBus( 834 + resp, err := r.submitTaskViaBus( 583 835 ctx, 836 + generation, 584 837 task, 585 838 model, 586 839 timeout, ··· 588 841 strings.TrimSpace(req.TopicTitle), 589 842 trigger, 590 843 ) 844 + if err == nil { 845 + releaseGeneration = false 846 + } 847 + return resp, err 591 848 } 592 849 593 - func (r *consoleLocalRuntime) handleConsoleModelCommand(task string) (string, bool) { 850 + func (r *consoleLocalRuntime) handleConsoleModelCommand(reader *viper.Viper, task string) (string, bool) { 594 851 output, handled, err := llmselect.ExecuteCommandText( 595 - llmutil.RuntimeValuesFromReader(r.currentConfigReader()), 852 + llmutil.RuntimeValuesFromReader(reader), 596 853 llmselect.ProcessStore(), 597 854 task, 598 855 ) ··· 605 862 return output, true 606 863 } 607 864 608 - func (r *consoleLocalRuntime) submitSyntheticTask(task string, output string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (daemonruntime.SubmitTaskResponse, error) { 609 - job, _, err := r.acceptTask(task, "", timeout, topicID, topicTitle, trigger) 865 + func (r *consoleLocalRuntime) submitSyntheticTask(generation *consoleLocalRuntimeGeneration, task string, output string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (daemonruntime.SubmitTaskResponse, error) { 866 + job, _, err := r.acceptTask(generation, task, "", timeout, topicID, topicTitle, trigger) 610 867 if err != nil { 611 868 return daemonruntime.SubmitTaskResponse{}, err 612 869 } 870 + defer generation.release() 613 871 finishedAt := time.Now().UTC() 614 872 r.store.Update(job.TaskID, func(info *daemonruntime.TaskInfo) { 615 873 info.Status = daemonruntime.TaskDone ··· 629 887 } 630 888 631 889 func (r *consoleLocalRuntime) enqueueTask(ctx context.Context, task string, model string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (daemonruntime.SubmitTaskResponse, error) { 632 - job, resp, err := r.acceptTask(task, model, timeout, topicID, topicTitle, trigger) 890 + generation, err := r.captureGeneration() 633 891 if err != nil { 634 892 return daemonruntime.SubmitTaskResponse{}, err 635 893 } 894 + job, resp, err := r.acceptTask(generation, task, model, timeout, topicID, topicTitle, trigger) 895 + if err != nil { 896 + generation.release() 897 + return daemonruntime.SubmitTaskResponse{}, err 898 + } 636 899 err = r.runner.Enqueue(ctx, job.ConversationKey, func(version uint64) consoleLocalTaskJob { 637 900 job.Version = version 638 901 return job 639 902 }) 640 903 if err != nil { 904 + generation.release() 641 905 runtimecore.MarkTaskFailed(r.store, job.TaskID, strings.TrimSpace(err.Error()), daemonruntime.IsContextDeadline(ctx, err)) 642 906 return daemonruntime.SubmitTaskResponse{}, err 643 907 } ··· 648 912 if r == nil { 649 913 return 650 914 } 915 + if job.Generation == nil { 916 + runtimecore.MarkTaskFailed(r.store, job.TaskID, "console task generation is not initialized", false) 917 + return 918 + } 919 + defer job.Generation.release() 920 + logger := job.Generation.logger 921 + if logger == nil { 922 + logger = r.currentLogger() 923 + } 651 924 runtimecore.MarkTaskRunning(r.store, job.TaskID) 652 925 if r.streamHub != nil { 653 926 r.streamHub.PublishStatus(job.TaskID, string(daemonruntime.TaskRunning)) 654 927 } 655 - if r.logger != nil { 656 - r.logger.Info("console_stream_enabled", 928 + if logger != nil { 929 + logger.Info("console_stream_enabled", 657 930 "task_id", job.TaskID, 658 931 "conversation_key", conversationKey, 659 932 "topic_id", strings.TrimSpace(job.TopicID), ··· 661 934 ) 662 935 } 663 936 664 - replySink := newConsoleReplySink(r.streamHub, job.TaskID, r.logger) 665 - eventSink := newConsoleEventPreviewSink(r.streamHub, job.TaskID, r.logger) 666 - if bundle := r.currentBundle(); bundle != nil { 667 - eventSink.observer = newConsoleLLMObserver(bundle.taskRuntime, job.Model, r.logger) 937 + replySink := newConsoleReplySink(r.streamHub, job.TaskID, logger) 938 + eventSink := newConsoleEventPreviewSink(r.streamHub, job.TaskID, logger) 939 + if bundle := job.Generation.bundle; bundle != nil { 940 + eventSink.observer = newConsoleLLMObserver(bundle.taskRuntime, job.Model, logger) 668 941 } 669 942 streamer := streaming.NewFinalOutputStreamer(streaming.FinalOutputStreamerOptions{ 670 943 Sink: replySink, 671 944 }) 672 - streamTracker := newConsoleStreamTracker(r.logger, job.TaskID) 945 + streamTracker := newConsoleStreamTracker(logger, job.TaskID) 673 946 onStream := func(event llm.StreamEvent) error { 674 947 return streamTracker.Handle(event, streamer.Handle) 675 948 } ··· 729 1002 if r == nil { 730 1003 return nil, nil, fmt.Errorf("console runtime is not initialized") 731 1004 } 1005 + if job.Generation == nil { 1006 + return nil, nil, fmt.Errorf("console task generation is not initialized") 1007 + } 1008 + generation := job.Generation 732 1009 ctx = llmstats.WithRunID(ctx, job.TaskID) 733 1010 task := strings.TrimSpace(job.Task) 734 1011 if task == "" { ··· 736 1013 } 737 1014 model := strings.TrimSpace(job.Model) 738 1015 if model == "" { 739 - _, model = r.defaultLLMConfig() 1016 + _, model = defaultLLMConfigForGeneration(generation) 740 1017 } 741 1018 historyMsgs, currentMsg, err := r.buildConsolePromptMessages(job) 742 1019 if err != nil { ··· 747 1024 Source: "console", 748 1025 SubjectID: memSubjectID, 749 1026 } 750 - reader := r.currentConfigReader() 751 - if reader.GetBool("memory.enabled") && r.memRuntime.Orchestrator != nil && memSubjectID != "" { 1027 + reader := generation.reader 1028 + if reader.GetBool("memory.enabled") && generation.memRuntime.Orchestrator != nil && memSubjectID != "" { 752 1029 memoryHooks.InjectionEnabled = reader.GetBool("memory.injection.enabled") 753 1030 memoryHooks.InjectionMaxItems = reader.GetInt("memory.injection.max_items") 754 1031 memoryHooks.PrepareInjection = func(maxItems int) (string, error) { 755 - return r.memRuntime.Orchestrator.PrepareInjection(memoryruntime.PrepareInjectionRequest{ 1032 + return generation.memRuntime.Orchestrator.PrepareInjection(memoryruntime.PrepareInjectionRequest{ 756 1033 SubjectID: memSubjectID, 757 1034 RequestContext: memory.ContextPrivate, 758 1035 MaxItems: maxItems, 759 1036 }) 760 1037 } 761 1038 memoryHooks.Record = func(_ *agent.Final, finalOutput string) error { 762 - _, err := r.memRuntime.Orchestrator.Record(buildConsoleMemoryRecordRequest(job, memSubjectID, finalOutput)) 1039 + _, err := generation.memRuntime.Orchestrator.Record(buildConsoleMemoryRecordRequest(job, memSubjectID, finalOutput)) 763 1040 return err 764 1041 } 765 1042 memoryHooks.NotifyRecorded = func() { 766 - if r.memRuntime.ProjectionWorker != nil { 767 - r.memRuntime.ProjectionWorker.NotifyRecordAppended() 1043 + if generation.memRuntime.ProjectionWorker != nil { 1044 + generation.memRuntime.ProjectionWorker.NotifyRecordAppended() 768 1045 } 769 1046 } 770 1047 } ··· 783 1060 promptprofile.AppendWakeSignalBlock(spec, wakeSignal) 784 1061 } 785 1062 } 786 - bundle := r.currentBundle() 1063 + bundle := generation.bundle 787 1064 if bundle == nil || bundle.taskRuntime == nil { 788 1065 return nil, nil, fmt.Errorf("console task runtime is not initialized") 789 1066 } ··· 853 1130 } 854 1131 if title := consoleTopicTitleFromOutput(finalOutput); title != "" { 855 1132 if err := r.store.SetTopicTitle(topicID, title); err != nil { 856 - r.logger.Debug("console_topic_title_update_failed", "topic_id", topicID, "error", err.Error()) 1133 + r.currentLogger().Debug("console_topic_title_update_failed", "topic_id", topicID, "error", err.Error()) 857 1134 } 858 1135 return 859 1136 } ··· 861 1138 return 862 1139 } 863 1140 1141 + if job.Generation != nil { 1142 + job.Generation.acquire() 1143 + } 864 1144 go func() { 1145 + if job.Generation != nil { 1146 + defer job.Generation.release() 1147 + } 865 1148 if current, ok := r.store.GetTopic(topicID); ok && current != nil && current.LLMTitleGeneratedAt != nil { 866 1149 return 867 1150 } 868 1151 ctx, cancel := context.WithTimeout(context.Background(), consoleTopicTitleTimeout) 869 1152 defer cancel() 870 1153 871 - title, err := r.generateTopicTitle(ctx, taskText, finalOutput) 1154 + title, err := r.generateTopicTitle(ctx, job.Generation, taskText, finalOutput) 872 1155 if err != nil { 873 - r.logger.Debug("console_topic_title_generate_failed", "topic_id", topicID, "error", err.Error()) 1156 + r.currentLogger().Debug("console_topic_title_generate_failed", "topic_id", topicID, "error", err.Error()) 874 1157 return 875 1158 } 876 1159 if err := r.store.SetTopicTitleFromLLM(topicID, title); err != nil { 877 - r.logger.Debug("console_topic_title_update_failed", "topic_id", topicID, "error", err.Error()) 1160 + r.currentLogger().Debug("console_topic_title_update_failed", "topic_id", topicID, "error", err.Error()) 878 1161 } 879 1162 }() 880 1163 } 881 1164 882 - func (r *consoleLocalRuntime) generateTopicTitle(ctx context.Context, task string, finalOutput string) (string, error) { 883 - if r == nil { 884 - return "", fmt.Errorf("console runtime is not initialized") 1165 + func (r *consoleLocalRuntime) generateTopicTitle(ctx context.Context, generation *consoleLocalRuntimeGeneration, task string, finalOutput string) (string, error) { 1166 + if generation == nil { 1167 + return "", fmt.Errorf("console runtime generation is not initialized") 885 1168 } 886 - commonDeps := r.commonDependencies() 887 - route, err := depsutil.ResolveLLMRouteFromCommon(commonDeps, llmutil.RoutePurposeMainLoop) 1169 + route, err := depsutil.ResolveLLMRouteFromCommon(generation.commonDeps, llmutil.RoutePurposeMainLoop) 888 1170 if err != nil { 889 1171 return "", err 890 1172 } 891 - client, err := depsutil.CreateClientFromCommon(commonDeps, route) 1173 + client, err := depsutil.CreateClientFromCommon(generation.commonDeps, route) 892 1174 if err != nil { 893 1175 return "", err 894 1176 } 895 1177 model := strings.TrimSpace(route.ClientConfig.Model) 896 1178 if model == "" { 897 - _, model = r.defaultLLMConfig() 1179 + _, model = defaultLLMConfigForGeneration(generation) 898 1180 } 899 1181 task = daemonruntime.TruncateUTF8(strings.Join(strings.Fields(task), " "), 1200) 900 1182 finalOutput = daemonruntime.TruncateUTF8(strings.Join(strings.Fields(finalOutput), " "), 1200) ··· 1099 1381 if r == nil { 1100 1382 return "runtime_unavailable" 1101 1383 } 1102 - model := func() string { 1103 - _, model := r.defaultLLMConfig() 1104 - return model 1384 + generation, err := r.captureGeneration() 1385 + if err != nil { 1386 + return err.Error() 1387 + } 1388 + releaseGeneration := true 1389 + defer func() { 1390 + if releaseGeneration { 1391 + generation.release() 1392 + } 1105 1393 }() 1394 + _, model := defaultLLMConfigForGeneration(generation) 1106 1395 trigger := daemonruntime.TaskTrigger{ 1107 1396 Source: "heartbeat", 1108 1397 Event: "heartbeat_tick", ··· 1113 1402 trigger.Ref = "console/poke" 1114 1403 } 1115 1404 job, _, err := r.acceptTask( 1405 + generation, 1116 1406 task, 1117 1407 model, 1118 - consoleDefaultTimeoutFromReader(r.currentConfigReader()), 1408 + consoleDefaultTimeoutFromReader(generation.reader), 1119 1409 r.store.HeartbeatTopicID(), 1120 1410 daemonruntime.ConsoleHeartbeatTopicTitle, 1121 1411 trigger, ··· 1128 1418 job.Version = version 1129 1419 return job 1130 1420 }); err != nil { 1421 + generation.release() 1131 1422 runtimecore.MarkTaskFailed(r.store, job.TaskID, strings.TrimSpace(err.Error()), daemonruntime.IsContextDeadline(ctx, err)) 1132 1423 return err.Error() 1133 1424 } 1425 + releaseGeneration = false 1134 1426 return "" 1135 1427 } 1136 1428 ··· 1149 1441 r.heartbeatMu.Unlock() 1150 1442 return 1151 1443 } 1152 - hbCfg := channelopts.HeartbeatConfigFromReader(r.currentConfigReader()) 1444 + generation := r.currentGeneration() 1445 + if generation == nil { 1446 + r.heartbeatState = nil 1447 + r.heartbeatMu.Unlock() 1448 + return 1449 + } 1450 + hbCfg := channelopts.HeartbeatConfigFromReader(generation.reader) 1153 1451 if !hbCfg.Enabled || hbCfg.Interval <= 0 { 1154 1452 r.heartbeatMu.Unlock() 1155 1453 return ··· 1158 1456 r.heartbeatState = &heartbeatutil.State{} 1159 1457 } 1160 1458 hbState := r.heartbeatState 1161 - hbChecklist := statepaths.HeartbeatChecklistPath() 1459 + hbChecklist := consoleHeartbeatChecklistPathFromReader(generation.reader) 1460 + logger := generation.logger 1162 1461 hbCtx, cancel := context.WithCancel(r.workersCtx) 1163 1462 pokeRequests := make(chan heartbeatloop.PokeRequest) 1164 1463 r.heartbeatCancel = cancel ··· 1184 1483 switch result.Outcome { 1185 1484 case heartbeatutil.TickBuildError: 1186 1485 if strings.TrimSpace(result.AlertMessage) != "" { 1187 - r.logger.Warn("heartbeat_alert", "source", "console", "message", result.AlertMessage) 1486 + logger.Warn("heartbeat_alert", "source", "console", "message", result.AlertMessage) 1188 1487 } else if result.BuildError != nil { 1189 - r.logger.Warn("heartbeat_task_error", "source", "console", "error", result.BuildError.Error()) 1488 + logger.Warn("heartbeat_task_error", "source", "console", "error", result.BuildError.Error()) 1190 1489 } 1191 1490 case heartbeatutil.TickSkipped: 1192 1491 if result.SkipReason == consoleHeartbeatSkipNoLLM { 1193 1492 break 1194 1493 } 1195 - r.logger.Debug("heartbeat_skip", "source", "console", "reason", result.SkipReason) 1494 + logger.Debug("heartbeat_skip", "source", "console", "reason", result.SkipReason) 1196 1495 } 1197 1496 return result 1198 1497 }
+85 -31
cmd/mistermorph/consolecmd/local_runtime_bus.go
··· 20 20 consoleDisplayName = "Console User" 21 21 ) 22 22 23 - func (r *consoleLocalRuntime) submitTaskViaBus(ctx context.Context, task string, model string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (daemonruntime.SubmitTaskResponse, error) { 24 - job, resp, err := r.acceptTask(task, model, timeout, topicID, topicTitle, trigger) 23 + func (r *consoleLocalRuntime) submitTaskViaBus(ctx context.Context, generation *consoleLocalRuntimeGeneration, task string, model string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (daemonruntime.SubmitTaskResponse, error) { 24 + job, resp, err := r.acceptTask(generation, task, model, timeout, topicID, topicTitle, trigger) 25 25 if err != nil { 26 26 return daemonruntime.SubmitTaskResponse{}, err 27 27 } 28 + r.pendingJobsMu.Lock() 29 + r.pendingJobs[job.TaskID] = job 30 + r.pendingJobsMu.Unlock() 28 31 if err := r.publishConsoleInbound(ctx, job); err != nil { 32 + r.pendingJobsMu.Lock() 33 + delete(r.pendingJobs, job.TaskID) 34 + r.pendingJobsMu.Unlock() 35 + if generation != nil { 36 + generation.release() 37 + } 29 38 runtimecore.MarkTaskFailed(r.store, job.TaskID, strings.TrimSpace(err.Error()), daemonruntime.IsContextDeadline(ctx, err)) 30 39 return daemonruntime.SubmitTaskResponse{}, err 31 40 } 32 41 return resp, nil 33 42 } 34 43 35 - func (r *consoleLocalRuntime) acceptTask(task string, model string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (consoleLocalTaskJob, daemonruntime.SubmitTaskResponse, error) { 44 + func (r *consoleLocalRuntime) acceptTask(generation *consoleLocalRuntimeGeneration, task string, model string, timeout time.Duration, topicID string, topicTitle string, trigger daemonruntime.TaskTrigger) (consoleLocalTaskJob, daemonruntime.SubmitTaskResponse, error) { 36 45 if r == nil || r.store == nil { 37 46 return consoleLocalTaskJob{}, daemonruntime.SubmitTaskResponse{}, fmt.Errorf("console runtime is not initialized") 38 47 } 48 + if generation == nil { 49 + return consoleLocalTaskJob{}, daemonruntime.SubmitTaskResponse{}, fmt.Errorf("console runtime generation is not initialized") 50 + } 39 51 now := time.Now().UTC() 40 52 seq := r.seq.Add(1) 41 53 taskID := daemonruntime.BuildTaskID("console", now.UnixNano(), seq, rand.Uint64()) ··· 75 87 CreatedAt: now, 76 88 Trigger: trigger, 77 89 AutoRenameTopic: autoRenameTopic, 90 + Generation: generation, 78 91 } 79 92 return job, daemonruntime.SubmitTaskResponse{ 80 93 ID: taskID, ··· 102 115 if ctx == nil { 103 116 return fmt.Errorf("context is required") 104 117 } 118 + sessionID := consoleBusSessionID(job.TopicID) 105 119 payloadBase64, err := busruntime.EncodeMessageEnvelope(busruntime.TopicChatMessage, busruntime.MessageEnvelope{ 106 120 MessageID: strings.TrimSpace(job.TaskID), 107 121 Text: strings.TrimSpace(job.Task), 108 122 SentAt: job.CreatedAt.UTC().Format(time.RFC3339), 109 - SessionID: consoleBusSessionID(job.TopicID), 123 + SessionID: sessionID, 110 124 }) 111 125 if err != nil { 112 126 return err ··· 123 137 PayloadBase64: payloadBase64, 124 138 CreatedAt: job.CreatedAt.UTC(), 125 139 Extensions: busruntime.MessageExtensions{ 126 - SessionID: consoleBusSessionID(job.TopicID), 140 + SessionID: sessionID, 127 141 ChatType: "private", 128 142 FromUserRef: consoleParticipantKey, 129 143 FromUsername: consoleUsername, ··· 156 170 func (r *consoleLocalRuntime) handleConsoleBusInbound(ctx context.Context, msg busruntime.BusMessage) error { 157 171 if msg.Channel != busruntime.ChannelConsole { 158 172 return fmt.Errorf("unsupported inbound channel: %s", msg.Channel) 159 - } 160 - if r.contactsSvc != nil { 161 - if err := r.contactsSvc.ObserveInboundBusMessage(context.Background(), msg, time.Now().UTC()); err != nil { 162 - r.logger.Warn("contacts_observe_bus_error", "channel", msg.Channel, "idempotency_key", msg.IdempotencyKey, "error", err.Error()) 163 - } 164 173 } 165 174 taskID := strings.TrimSpace(msg.CorrelationID) 166 175 if taskID == "" { ··· 170 179 } 171 180 taskID = strings.TrimSpace(envelope.MessageID) 172 181 } 182 + job, foundPending := r.takePendingJob(taskID) 183 + generation := job.Generation 184 + if !foundPending { 185 + var err error 186 + generation, err = r.captureGeneration() 187 + if err != nil { 188 + return err 189 + } 190 + job.Generation = generation 191 + } 192 + logger := r.currentLogger() 193 + if generation != nil && generation.logger != nil { 194 + logger = generation.logger 195 + } 196 + if generation != nil && generation.contactsSvc != nil { 197 + if err := generation.contactsSvc.ObserveInboundBusMessage(context.Background(), msg, time.Now().UTC()); err != nil { 198 + logger.Warn("contacts_observe_bus_error", "channel", msg.Channel, "idempotency_key", msg.IdempotencyKey, "error", err.Error()) 199 + } 200 + } 173 201 stored, exists := r.store.Get(taskID) 174 202 if !exists || stored == nil { 203 + if generation != nil { 204 + generation.release() 205 + } 175 206 return fmt.Errorf("console task %q not found", taskID) 176 207 } 177 - trigger, ok := r.store.GetTrigger(taskID) 178 - if !ok { 179 - trigger = daemonruntime.TaskTrigger{ 180 - Source: "ui", 181 - Event: "chat_submit", 182 - Ref: "web/console", 208 + if !foundPending { 209 + trigger, ok := r.store.GetTrigger(taskID) 210 + if !ok { 211 + trigger = daemonruntime.TaskTrigger{ 212 + Source: "ui", 213 + Event: "chat_submit", 214 + Ref: "web/console", 215 + } 183 216 } 184 - } 185 - autoRename := false 186 - if topic, ok := r.store.GetTopic(stored.TopicID); ok && topic != nil { 187 - autoRename = shouldAutoRenameConsoleTopic(stored.TopicID, strings.TrimSpace(stored.Task), strings.TrimSpace(topic.Title), r.store.HeartbeatTopicID()) 188 - } 189 - job := consoleLocalTaskJob{ 190 - TaskID: stored.ID, 191 - ConversationKey: buildConsoleConversationKey(stored.TopicID), 192 - TopicID: stored.TopicID, 193 - Task: stored.Task, 194 - Model: stored.Model, 195 - Timeout: parseConsoleTaskTimeout(stored.Timeout, consoleDefaultTimeoutFromReader(r.currentConfigReader())), 196 - CreatedAt: stored.CreatedAt, 197 - Trigger: trigger, 198 - AutoRenameTopic: autoRename, 217 + autoRename := false 218 + if topic, ok := r.store.GetTopic(stored.TopicID); ok && topic != nil { 219 + autoRename = shouldAutoRenameConsoleTopic(stored.TopicID, strings.TrimSpace(stored.Task), strings.TrimSpace(topic.Title), r.store.HeartbeatTopicID()) 220 + } 221 + job = consoleLocalTaskJob{ 222 + TaskID: stored.ID, 223 + ConversationKey: buildConsoleConversationKey(stored.TopicID), 224 + TopicID: stored.TopicID, 225 + Task: stored.Task, 226 + Model: stored.Model, 227 + Timeout: parseConsoleTaskTimeout(stored.Timeout, consoleDefaultTimeoutFromReader(generation.reader)), 228 + CreatedAt: stored.CreatedAt, 229 + Trigger: trigger, 230 + AutoRenameTopic: autoRename, 231 + Generation: generation, 232 + } 199 233 } 200 234 if err := r.runner.Enqueue(ctx, job.ConversationKey, func(version uint64) consoleLocalTaskJob { 201 235 job.Version = version 202 236 return job 203 237 }); err != nil { 238 + if generation != nil { 239 + generation.release() 240 + } 204 241 runtimecore.MarkTaskFailed(r.store, job.TaskID, strings.TrimSpace(err.Error()), daemonruntime.IsContextDeadline(ctx, err)) 205 242 return err 206 243 } 207 244 return nil 245 + } 246 + 247 + func (r *consoleLocalRuntime) takePendingJob(taskID string) (consoleLocalTaskJob, bool) { 248 + if r == nil { 249 + return consoleLocalTaskJob{}, false 250 + } 251 + taskID = strings.TrimSpace(taskID) 252 + if taskID == "" { 253 + return consoleLocalTaskJob{}, false 254 + } 255 + r.pendingJobsMu.Lock() 256 + defer r.pendingJobsMu.Unlock() 257 + job, ok := r.pendingJobs[taskID] 258 + if ok { 259 + delete(r.pendingJobs, taskID) 260 + } 261 + return job, ok 208 262 } 209 263 210 264 func parseConsoleTaskTimeout(raw string, fallback time.Duration) time.Duration {
+83
cmd/mistermorph/consolecmd/local_runtime_test.go
··· 10 10 "time" 11 11 12 12 "github.com/quailyquaily/mistermorph/agent" 13 + busruntime "github.com/quailyquaily/mistermorph/internal/bus" 14 + runtimecore "github.com/quailyquaily/mistermorph/internal/channelruntime/core" 13 15 heartbeatloop "github.com/quailyquaily/mistermorph/internal/channelruntime/heartbeat" 14 16 "github.com/quailyquaily/mistermorph/internal/chathistory" 15 17 "github.com/quailyquaily/mistermorph/internal/daemonruntime" 16 18 "github.com/quailyquaily/mistermorph/internal/heartbeatutil" 19 + "github.com/spf13/viper" 17 20 ) 18 21 19 22 func TestConsoleLocalRoutesOptionsPoke(t *testing.T) { ··· 29 32 } 30 33 31 34 func TestConsoleLocalRoutesOptionsOverviewHeartbeatRunning(t *testing.T) { 35 + reader := viper.New() 36 + reader.Set("telegram.bot_token", "tg-token") 37 + reader.Set("slack.bot_token", "slack-bot") 38 + reader.Set("slack.app_token", "slack-app") 32 39 rt := &consoleLocalRuntime{ 40 + generation: &consoleLocalRuntimeGeneration{reader: reader}, 33 41 heartbeatState: &heartbeatutil.State{}, 34 42 heartbeatPokeRequests: make(chan heartbeatloop.PokeRequest), 35 43 } ··· 371 379 t.Fatalf("history[last] = %#v, want persisted answer 8 outbound", last) 372 380 } 373 381 } 382 + 383 + func TestConsoleLocalRuntimeHandleConsoleBusInboundUsesPendingJobGeneration(t *testing.T) { 384 + store, err := daemonruntime.NewConsoleFileStore(daemonruntime.ConsoleFileStoreOptions{ 385 + HeartbeatTopicID: "_heartbeat", 386 + Persist: false, 387 + }) 388 + if err != nil { 389 + t.Fatalf("NewConsoleFileStore() error = %v", err) 390 + } 391 + 392 + workerCtx, cancel := context.WithCancel(context.Background()) 393 + defer cancel() 394 + jobs := make(chan consoleLocalTaskJob, 1) 395 + rt := &consoleLocalRuntime{ 396 + store: store, 397 + pendingJobs: map[string]consoleLocalTaskJob{}, 398 + } 399 + rt.runner = runtimecore.NewConversationRunner[string, consoleLocalTaskJob]( 400 + workerCtx, 401 + make(chan struct{}, 1), 402 + 1, 403 + func(_ context.Context, _ string, job consoleLocalTaskJob) { 404 + jobs <- job 405 + }, 406 + ) 407 + 408 + oldReader := viper.New() 409 + oldReader.Set("timeout", "2m") 410 + newReader := viper.New() 411 + newReader.Set("timeout", "9m") 412 + oldGeneration := &consoleLocalRuntimeGeneration{generation: 1, reader: oldReader} 413 + newGeneration := &consoleLocalRuntimeGeneration{generation: 2, reader: newReader} 414 + rt.generation = newGeneration 415 + 416 + oldGeneration.acquire() 417 + job, _, err := rt.acceptTask( 418 + oldGeneration, 419 + "hello", 420 + "", 421 + time.Minute, 422 + "", 423 + "", 424 + daemonruntime.TaskTrigger{Source: "ui", Event: "chat_submit", Ref: "web/console"}, 425 + ) 426 + if err != nil { 427 + t.Fatalf("acceptTask() error = %v", err) 428 + } 429 + rt.pendingJobs[job.TaskID] = job 430 + 431 + err = rt.handleConsoleBusInbound(context.Background(), busruntime.BusMessage{ 432 + Channel: busruntime.ChannelConsole, 433 + Direction: busruntime.DirectionInbound, 434 + CorrelationID: job.TaskID, 435 + }) 436 + if err != nil { 437 + t.Fatalf("handleConsoleBusInbound() error = %v", err) 438 + } 439 + 440 + select { 441 + case queued := <-jobs: 442 + if queued.Generation != oldGeneration { 443 + t.Fatalf("queued.Generation = %#v, want old generation %#v", queued.Generation, oldGeneration) 444 + } 445 + if queued.Timeout != time.Minute { 446 + t.Fatalf("queued.Timeout = %v, want %v", queued.Timeout, time.Minute) 447 + } 448 + queued.Generation.release() 449 + case <-time.After(2 * time.Second): 450 + t.Fatal("timed out waiting for queued job") 451 + } 452 + 453 + if _, ok := rt.pendingJobs[job.TaskID]; ok { 454 + t.Fatalf("pendingJobs[%q] still exists, want removed after enqueue", job.TaskID) 455 + } 456 + }
+123 -69
cmd/mistermorph/consolecmd/managed_runtime.go
··· 33 33 ) 34 34 35 35 type managedRuntimeSupervisor struct { 36 - mu sync.Mutex 37 - kinds []string 38 - configReader *viper.Viper 39 - inspectPrompt bool 40 - inspectRequest bool 41 - localRuntime *consoleLocalRuntime 42 - parentCtx context.Context 43 - cancel context.CancelFunc 44 - onFatal func(error) 45 - generation uint64 36 + mu sync.Mutex 37 + kinds []string 38 + configReader *viper.Viper 39 + pendingPrepared *managedRuntimePrepared 40 + inspectPrompt bool 41 + inspectRequest bool 42 + localRuntime *consoleLocalRuntime 43 + parentCtx context.Context 44 + cancel context.CancelFunc 45 + onFatal func(error) 46 + generation uint64 47 + } 48 + 49 + type managedRuntimePrepared struct { 50 + reader *viper.Viper 51 + kinds []string 52 + children []managedPreparedRuntime 53 + } 54 + 55 + type managedPreparedRuntime struct { 56 + kind string 57 + run func(context.Context) error 58 + cleanup func() 46 59 } 47 60 48 61 type managedRuntimeConfigError struct { ··· 104 117 } 105 118 s.parentCtx = ctx 106 119 s.onFatal = onFatal 107 - return s.startLocked() 120 + if s.pendingPrepared == nil && s.configReader != nil { 121 + prepared, err := s.prepareReloadLocked(s.configReader) 122 + if err != nil { 123 + return err 124 + } 125 + s.pendingPrepared = prepared 126 + } 127 + return s.applyPreparedLocked(s.pendingPrepared) 108 128 } 109 129 110 130 func (s *managedRuntimeSupervisor) ReloadConfig(reader *viper.Viper) error { 111 131 if s == nil { 112 132 return nil 113 133 } 134 + prepared, err := s.PrepareReload(reader) 135 + if err != nil { 136 + return err 137 + } 138 + return s.ApplyPrepared(prepared) 139 + } 140 + 141 + func (s *managedRuntimeSupervisor) PrepareReload(reader *viper.Viper) (*managedRuntimePrepared, error) { 142 + if s == nil { 143 + return &managedRuntimePrepared{}, nil 144 + } 145 + s.mu.Lock() 146 + defer s.mu.Unlock() 147 + return s.prepareReloadLocked(reader) 148 + } 149 + 150 + func (s *managedRuntimeSupervisor) prepareReloadLocked(reader *viper.Viper) (*managedRuntimePrepared, error) { 114 151 if reader == nil { 115 152 reader = viper.GetViper() 116 153 } 117 154 kinds, err := managedRuntimeKindsFromReader(reader) 118 155 if err != nil { 119 - return err 156 + return nil, err 120 157 } 121 - s.mu.Lock() 122 - defer s.mu.Unlock() 123 - s.stopLocked() 124 - s.configReader = reader 125 - s.kinds = append([]string(nil), kinds...) 126 - if s.parentCtx == nil { 127 - return nil 158 + prepared := &managedRuntimePrepared{ 159 + reader: reader, 160 + kinds: append([]string(nil), kinds...), 128 161 } 129 - return s.startLocked() 162 + for _, kind := range kinds { 163 + run, cleanup, err := s.buildRuntime(kind, reader) 164 + if err != nil { 165 + prepared.cleanup() 166 + return nil, err 167 + } 168 + prepared.children = append(prepared.children, managedPreparedRuntime{ 169 + kind: kind, 170 + run: run, 171 + cleanup: cleanup, 172 + }) 173 + } 174 + return prepared, nil 130 175 } 131 176 132 - func (s *managedRuntimeSupervisor) Close() { 133 - if s == nil { 177 + func (p *managedRuntimePrepared) cleanup() { 178 + if p == nil { 134 179 return 135 180 } 181 + for _, child := range p.children { 182 + if child.cleanup != nil { 183 + child.cleanup() 184 + } 185 + } 186 + } 187 + 188 + func (s *managedRuntimeSupervisor) ApplyPrepared(prepared *managedRuntimePrepared) error { 189 + if s == nil { 190 + if prepared != nil { 191 + prepared.cleanup() 192 + } 193 + return nil 194 + } 136 195 s.mu.Lock() 137 196 defer s.mu.Unlock() 138 - s.stopLocked() 139 - s.parentCtx = nil 140 - s.onFatal = nil 197 + return s.applyPreparedLocked(prepared) 141 198 } 142 199 143 - func (s *managedRuntimeSupervisor) startLocked() error { 144 - if len(s.kinds) == 0 { 145 - return nil 200 + func (s *managedRuntimeSupervisor) applyPreparedLocked(prepared *managedRuntimePrepared) error { 201 + if prepared == nil { 202 + prepared = &managedRuntimePrepared{reader: viper.New()} 146 203 } 204 + if s.pendingPrepared != nil && s.pendingPrepared != prepared { 205 + s.pendingPrepared.cleanup() 206 + } 207 + s.pendingPrepared = nil 147 208 if s.parentCtx == nil { 148 - return fmt.Errorf("managed runtime supervisor parent context is not set") 209 + s.configReader = prepared.reader 210 + s.kinds = append([]string(nil), prepared.kinds...) 211 + s.pendingPrepared = prepared 212 + return nil 213 + } 214 + s.stopLocked() 215 + s.configReader = prepared.reader 216 + s.kinds = append([]string(nil), prepared.kinds...) 217 + if len(prepared.children) == 0 { 218 + return nil 149 219 } 150 220 runCtx, cancel := context.WithCancel(s.parentCtx) 151 221 s.cancel = cancel 152 222 s.generation++ 153 223 generation := s.generation 154 - for _, kind := range s.kinds { 155 - run, cleanup, err := s.buildRuntimeLocked(kind) 156 - if err != nil { 157 - if isManagedRuntimeConfigError(err) { 158 - s.logger().Warn("managed_runtime_skipped_invalid_config", "kind", kind, "error", err) 159 - if cleanup != nil { 160 - cleanup() 161 - } 162 - if s.localRuntime != nil { 163 - s.localRuntime.SetManagedRuntimeRunning(kind, false) 164 - } 165 - continue 166 - } 167 - cancel() 168 - s.cancel = nil 169 - if s.localRuntime != nil { 170 - for _, item := range s.kinds { 171 - s.localRuntime.SetManagedRuntimeRunning(item, false) 172 - } 173 - } 174 - return err 175 - } 224 + for _, child := range prepared.children { 176 225 if s.localRuntime != nil { 177 - s.localRuntime.SetManagedRuntimeRunning(kind, true) 226 + s.localRuntime.SetManagedRuntimeRunning(child.kind, true) 178 227 } 179 - go s.runManagedRuntime(runCtx, generation, kind, run, cleanup) 228 + go s.runManagedRuntime(runCtx, generation, child.kind, child.run, child.cleanup) 180 229 } 181 230 return nil 182 231 } 183 232 233 + func (s *managedRuntimeSupervisor) Close() { 234 + if s == nil { 235 + return 236 + } 237 + s.mu.Lock() 238 + defer s.mu.Unlock() 239 + s.stopLocked() 240 + if s.pendingPrepared != nil { 241 + s.pendingPrepared.cleanup() 242 + s.pendingPrepared = nil 243 + } 244 + s.parentCtx = nil 245 + s.onFatal = nil 246 + } 247 + 184 248 func (s *managedRuntimeSupervisor) stopLocked() { 185 249 if s.cancel != nil { 186 250 s.cancel() ··· 193 257 } 194 258 } 195 259 196 - func (s *managedRuntimeSupervisor) buildRuntimeLocked(kind string) (func(context.Context) error, func(), error) { 197 - reader := s.currentConfigReaderLocked() 260 + func (s *managedRuntimeSupervisor) buildRuntime(kind string, reader *viper.Viper) (func(context.Context) error, func(), error) { 261 + if reader == nil { 262 + reader = viper.GetViper() 263 + } 198 264 runtimeValues := llmutil.RuntimeValuesFromReader(reader) 199 265 switch kind { 200 266 case managedRuntimeTelegram: ··· 271 337 } 272 338 273 339 func (s *managedRuntimeSupervisor) logger() *slog.Logger { 274 - if s != nil && s.localRuntime != nil && s.localRuntime.logger != nil { 275 - return s.localRuntime.logger 340 + if s != nil && s.localRuntime != nil { 341 + return s.localRuntime.currentLogger() 276 342 } 277 343 return slog.Default() 278 344 } ··· 286 352 return normalizeManagedRuntimeKinds(r.GetStringSlice("console.managed_runtimes")) 287 353 } 288 354 289 - func isManagedRuntimeConfigError(err error) bool { 290 - var target managedRuntimeConfigError 291 - return errors.As(err, &target) 292 - } 293 - 294 355 func newManagedRuntimeTaskStore(kind string, maxItems int) (daemonruntime.TaskView, error) { 295 356 switch kind { 296 357 case managedRuntimeTelegram, managedRuntimeSlack: ··· 328 389 s.mu.Lock() 329 390 defer s.mu.Unlock() 330 391 return s.generation == generation 331 - } 332 - 333 - func (s *managedRuntimeSupervisor) currentConfigReaderLocked() *viper.Viper { 334 - if s != nil && s.configReader != nil { 335 - return s.configReader 336 - } 337 - return viper.GetViper() 338 392 } 339 393 340 394 func buildManagedRuntimeDepsFromReader(logger *slog.Logger, reader *viper.Viper) (depsutil.CommonDependencies, func()) {
+24 -11
cmd/mistermorph/consolecmd/managed_runtime_test.go
··· 1 1 package consolecmd 2 2 3 3 import ( 4 - "context" 5 4 "testing" 6 5 7 6 "github.com/spf13/viper" 8 7 ) 9 8 10 - func TestManagedRuntimeSupervisorStartSkipsConfigError(t *testing.T) { 11 - viper.Reset() 12 - t.Cleanup(viper.Reset) 13 - viper.Set("console.managed_runtimes", []string{"telegram"}) 9 + func TestManagedRuntimeSupervisorReloadRejectsInvalidConfigWithoutMutatingState(t *testing.T) { 10 + local := &consoleLocalRuntime{managedRuntimeRunning: map[string]bool{}} 11 + local.SetManagedRuntimeRunning("telegram", true) 12 + supervisor := newManagedRuntimeSupervisor(local, false, false) 13 + 14 + current := viper.New() 15 + current.Set("console.managed_runtimes", []string{"telegram"}) 16 + current.Set("telegram.bot_token", "old-token") 17 + supervisor.configReader = current 18 + supervisor.kinds = []string{"telegram"} 19 + 20 + next := viper.New() 21 + next.Set("console.managed_runtimes", []string{"telegram"}) 14 22 15 - supervisor := newManagedRuntimeSupervisor(nil, false, false) 16 - if err := supervisor.ReloadConfig(viper.GetViper()); err != nil { 17 - t.Fatalf("ReloadConfig() error = %v, want nil", err) 23 + err := supervisor.ReloadConfig(next) 24 + if err == nil { 25 + t.Fatal("ReloadConfig() error = nil, want invalid config error") 26 + } 27 + if got := supervisor.configReader.GetString("telegram.bot_token"); got != "old-token" { 28 + t.Fatalf("configReader.telegram.bot_token = %q, want %q", got, "old-token") 29 + } 30 + if len(supervisor.kinds) != 1 || supervisor.kinds[0] != "telegram" { 31 + t.Fatalf("kinds = %#v, want [telegram]", supervisor.kinds) 18 32 } 19 - if err := supervisor.Start(context.Background(), nil); err != nil { 20 - t.Fatalf("Start() error = %v, want nil", err) 33 + if !local.isManagedRuntimeRunning("telegram") { 34 + t.Fatal("telegram running = false, want unchanged true") 21 35 } 22 - supervisor.Close() 23 36 } 24 37 25 38 func TestManagedRuntimeKindsFromReaderRejectsUnsupportedValue(t *testing.T) {
+25 -4
cmd/mistermorph/consolecmd/serve.go
··· 460 460 } 461 461 462 462 func (s *server) logger() *slog.Logger { 463 - if s != nil && s.localRuntime != nil && s.localRuntime.logger != nil { 464 - return s.localRuntime.logger 463 + if s != nil && s.localRuntime != nil { 464 + return s.localRuntime.currentLogger() 465 465 } 466 466 return slog.Default() 467 467 } ··· 518 518 if err != nil { 519 519 return err 520 520 } 521 + var preparedLocal *consoleLocalRuntimeGeneration 521 522 if s.localRuntime != nil { 522 - if err := s.localRuntime.ReloadAgentConfigFromReader(reader); err != nil { 523 + preparedLocal, err = s.localRuntime.prepareGeneration(reader) 524 + if err != nil { 523 525 return err 524 526 } 525 527 } 528 + var preparedManaged *managedRuntimePrepared 526 529 if s.managed != nil { 527 - if err := s.managed.ReloadConfig(reader); err != nil { 530 + preparedManaged, err = s.managed.PrepareReload(reader) 531 + if err != nil { 532 + if preparedLocal != nil { 533 + preparedLocal.cleanupNow() 534 + } 535 + return err 536 + } 537 + } 538 + if preparedLocal != nil { 539 + if err := s.localRuntime.applyPreparedGeneration(preparedLocal); err != nil { 540 + preparedLocal.cleanupNow() 541 + if preparedManaged != nil { 542 + preparedManaged.cleanup() 543 + } 544 + return err 545 + } 546 + } 547 + if preparedManaged != nil { 548 + if err := s.managed.ApplyPrepared(preparedManaged); err != nil { 528 549 return err 529 550 } 530 551 }
+2 -1
cmd/mistermorph/consolecmd/setup_repair.go
··· 8 8 "path/filepath" 9 9 "strings" 10 10 11 + "github.com/quailyquaily/mistermorph/internal/fsstore" 11 12 "github.com/quailyquaily/mistermorph/internal/onboardingcheck" 12 13 "github.com/quailyquaily/mistermorph/internal/pathutil" 13 14 "github.com/spf13/viper" ··· 115 116 if st, err := os.Stat(path); err == nil { 116 117 mode = st.Mode().Perm() 117 118 } 118 - return os.WriteFile(path, content, mode) 119 + return fsstore.WriteTextAtomic(path, string(content), fsstore.FileOptions{FilePerm: mode}) 119 120 }
+6 -5
cmd/mistermorph/consolecmd/streaming.go
··· 325 325 writeError(w, http.StatusInternalServerError, "failed to create stream ticket") 326 326 return 327 327 } 328 - if s.localRuntime != nil && s.localRuntime.logger != nil { 329 - s.localRuntime.logger.Debug("console_stream_ticket_created", 328 + if s.localRuntime != nil { 329 + s.localRuntime.currentLogger().Debug("console_stream_ticket_created", 330 330 "expires_at", expiresAt.Format(time.RFC3339Nano), 331 331 ) 332 332 } ··· 368 368 return 369 369 } 370 370 defer conn.Close() 371 - if s.localRuntime != nil && s.localRuntime.logger != nil { 372 - s.localRuntime.logger.Info("console_stream_ws_connected", 371 + if s.localRuntime != nil { 372 + logger := s.localRuntime.currentLogger() 373 + logger.Info("console_stream_ws_connected", 373 374 "task_id", taskID, 374 375 "remote_addr", strings.TrimSpace(r.RemoteAddr), 375 376 ) 376 - defer s.localRuntime.logger.Info("console_stream_ws_disconnected", 377 + defer logger.Info("console_stream_ws_disconnected", 377 378 "task_id", taskID, 378 379 "remote_addr", strings.TrimSpace(r.RemoteAddr), 379 380 )
+254 -108
docs/feat/feat_20260418_console_config_runtime_snapshots.md
··· 25 25 26 26 这带来几个结构性问题: 27 27 28 - 1. Web API 同时承担了“持久化配置”和“驱动运行时更新”两种职责。 29 - 2. runtime 不是围绕不可变快照工作,而是在多个阶段反复读取进程级可变配置。 30 - 3. 外部直接修改 `config.yaml` 时,没有一个清晰、统一、自动的快照重建路径。 31 - 4. 并发语义不清晰:运行中的任务、heartbeat、managed runtime 到底看到的是哪一版配置,并没有明确定义。 32 - 5. 全局 `viper` 在运行期仍会被再次写 defaults,已经出现真实 panic。 28 + 1. Web API 同时承担了“持久化配置”和“驱动运行时更新”两种职责。 29 + 2. runtime 不是围绕不可变快照工作,而是在多个阶段反复读取进程级可变配置。 30 + 3. 外部直接修改 `config.yaml` 时,没有一个清晰、统一、自动的快照重建路径。 31 + 4. 并发语义不清晰:运行中的任务、heartbeat、managed runtime 到底看到的是哪一版配置,并没有明确定义。 32 + 5. 全局 `viper` 在运行期仍会被再次写 defaults,已经出现真实 panic。 33 33 34 34 与之相对,`integration.Runtime` 已经采用了更合理的模型: 35 35 ··· 64 64 65 65 因此,新方案必须显式解决两件事: 66 66 67 - 1. 默认值应用只能发生在受控的 snapshot 构建阶段,不能在 runtime 热路径里再次写全局 `viper`。 68 - 2. runtime 运行时读取必须切到只读 snapshot,而不是继续共享读写同一个进程级 `viper`。 67 + 1. 默认值应用只能发生在受控的 snapshot 构建阶段,不能在 runtime 热路径里再次写全局 `viper`。 68 + 2. runtime 运行时读取必须切到只读 snapshot,而不是继续共享读写同一个进程级 `viper`。 69 69 70 70 ## 2) 设计原则 71 71 72 72 本次重构建议明确采用以下原则: 73 73 74 - 1. `config.yaml` 是持久化真相,不是运行时对象。 75 - 2. runtime 只消费“配置快照”,不直接依赖全局 `viper` 的即时值。 76 - 3. 当 `config.yaml` 变化时,系统重新生成一份新快照。 77 - 4. 每个 runtime 自己决定如何原子切换到新快照,并自己保证并发安全。 78 - 5. Console Web API 只负责更新 `config.yaml`,不直接负责更新快照。 79 - 6. 无效配置不能污染当前运行中的最后一份有效快照。 74 + 1. `config.yaml` 是持久化真相,不是运行时对象。 75 + 2. runtime 只消费“配置快照”,不直接依赖全局 `viper` 的即时值。 76 + 3. 当 `config.yaml` 变化时,系统重新生成一份新快照。 77 + 4. 每个 runtime 自己决定如何原子切换到新快照,并自己保证并发安全。 78 + 5. Console Web API 只负责更新 `config.yaml`,不直接负责更新快照。 79 + 6. 无效配置不能污染当前运行中的最后一份有效快照。 80 80 81 81 核心结论就是一句话: 82 82 ··· 86 86 87 87 本方案的目标是: 88 88 89 - 1. 让 `console serve` 先运行在“配置文件 -> 快照 -> runtime”模型上。 90 - 2. 让 `consoleLocalRuntime`、`managedRuntimeSupervisor`、未来其他长生命周期 runtime 都以 snapshot 为输入。 91 - 3. 让配置变更无论来自 Web API 还是外部编辑器,都走同一条快照重建路径。 92 - 4. 明确运行中任务与新任务对配置版本的可见性规则。 93 - 5. 收缩全局 `viper` 在 Console 进程中的职责,使其主要退回到“进程启动参数 + config 路径发现”。 94 - 6. 消除“运行期再次 `SetDefault` 全局 `viper`”这一类并发 panic 根因。 89 + 1. 让 `console serve` 先运行在“配置文件 -> 快照 -> runtime”模型上。 90 + 2. 让 `consoleLocalRuntime`、`managedRuntimeSupervisor`、未来其他长生命周期 runtime 都以 snapshot 为输入。 91 + 3. 让配置变更无论来自 Web API 还是外部编辑器,都走同一条快照重建路径。 92 + 4. 明确运行中任务与新任务对配置版本的可见性规则。 93 + 5. 收缩全局 `viper` 在 Console 进程中的职责,使其主要退回到“进程启动参数 + config 路径发现”。 94 + 6. 消除“运行期再次 `SetDefault` 全局 `viper`”这一类并发 panic 根因。 95 95 96 96 这里补一个范围约束: 97 97 ··· 103 103 104 104 本次方案不打算: 105 105 106 - 1. 重做 `config.yaml` 的字段结构。 107 - 2. 立即消灭整个仓库里所有 `FromViper()` 辅助函数。 108 - 3. 在第一阶段改造所有 CLI 子命令;首要范围是 `console serve`。 109 - 4. 改变外部 `/settings/*` API 的基本输入输出形状。 110 - 5. 在当前阶段引入一个很泛的订阅框架或跨进程配置分发体系。 106 + 1. 重做 `config.yaml` 的字段结构。 107 + 2. 立即消灭整个仓库里所有 `FromViper()` 辅助函数。 108 + 3. 在第一阶段改造所有 CLI 子命令;首要范围是 `console serve`。 109 + 4. 改变外部 `/settings/*` API 的基本输入输出形状。 110 + 5. 在当前阶段引入一个很泛的订阅框架或跨进程配置分发体系。 111 111 112 112 ## 5) 当前问题的本质 113 113 ··· 142 142 143 143 只要这两点不改,哪怕把现有 reload 流程整理得更漂亮,类似竞争条件仍然会反复出现。 144 144 145 + ## 5.1) 当前已经确认的问题 146 + 147 + 经过这轮实现和 review,已经确认当前问题不只是在“reload 分散”,而是在运行时语义上还没有真正形成 generation 边界。 148 + 149 + ### A. 一个 task 在运行中会混用多代配置 150 + 151 + 当前 `consoleLocalRuntime` 在 task 路径里并没有在入口处一次性冻结 generation。 152 + 153 + 实际行为是: 154 + 155 + - 提交阶段会从当前 reader 计算 timeout / model command 156 + - 执行阶段又会重新读取当前 reader 157 + - 真正调用 `taskRuntime.Run(...)` 前再重新读取当前 bundle 158 + - 异步的 topic title 生成还会再次读取当前 `commonDeps` 159 + 160 + 这意味着只要 reload 发生在这些步骤之间,同一个 task 就可能出现: 161 + 162 + - model 取自一代 163 + - memory 开关取自另一代 164 + - tools / MCP / guard 再取自第三代 165 + 166 + 这和“next-generation 只影响新任务”的目标是直接冲突的。 167 + 168 + ### B. 旧 bundle 会被过早清理 169 + 170 + 当前 local runtime reload 在替换 bundle 后,立即关闭旧 `mcpHost`。 171 + 172 + 但运行中的 task 会继续持有旧 `taskRuntime.BaseRegistry`,而 MCP tool adapter 绑定的是旧 session。 173 + 174 + 结果就是: 175 + 176 + - 旧任务看起来还在跑 177 + - 但它后续再调用 MCP tool 时,session 可能已经被关掉 178 + 179 + 这说明“替换当前 bundle”和“回收旧 generation 资源”不是同一个时刻的动作。 180 + 181 + ### C. memory runtime 还不是 generation 资源 182 + 183 + 当前 `memRuntime` 只在启动时创建一次,后续 reload 不会重建。 184 + 185 + 因此: 186 + 187 + - `memory.enabled: false -> true` 不会真正生效 188 + - `memory.short_term_days` 这类初始化参数不会更新 189 + - 当前 reader 和底层 memory 实例之间可能来自不同 generation 190 + 191 + ### D. managed runtime reload 不是事务性的 192 + 193 + 当前 `managedRuntimeSupervisor.ReloadConfig()` 的顺序是: 194 + 195 + 1. 先停旧 child 196 + 2. 再尝试按新 reader 启动 197 + 198 + 这样一来,只要新配置暂时不完整,例如 live edit 时 token 少了一半,就会出现: 199 + 200 + - 旧 telegram/slack 已停 201 + - 新 child 又起不来 202 + - 进程要等下一次成功 reload 才恢复 203 + 204 + 这不符合“坏配置不能污染当前最后一份有效运行态”的原则。 205 + 206 + ### E. 配置写入不是原子替换 207 + 208 + 当前 settings / repair 路径直接 `os.WriteFile(...)` 覆盖 `config.yaml`。 209 + 210 + 配合轮询 reload,会放大两个问题: 211 + 212 + - watcher 可能读到中间态文件 213 + - managed reload 更容易被临时坏配置打断 214 + 215 + 也就是说,即使 runtime generation 做对了,配置发布动作本身也应该是原子的。 216 + 217 + ### F. 进程级对象与 generation 对象的边界还没定清 218 + 219 + 当前还有一批对象既不像纯启动期对象,也没有真正并入 generation: 220 + 221 + - `logger` 222 + - `ConsoleFileStore` 的 `persist` / `heartbeat_topic_id` 223 + - local endpoint handler / auth token 224 + - `bus.max_inflight` 对应的 inproc bus 225 + 226 + 这些对象如果不先明确分类,后面还会不断出现“reader 变了,但实例没变”的同类问题。 227 + 228 + ## 5.2) 根因 229 + 230 + 根因可以压成一句话: 231 + 232 + > 现在代码实现的是“reader snapshot”,不是“runtime generation”。 233 + 234 + 真正需要被冻结的不是一份 `viper` reader,而是一整套运行时对象: 235 + 236 + - logger / log options 237 + - route resolver / client factory / prompt deps 238 + - task runtime bundle 239 + - MCP host 240 + - memory runtime 241 + - 以及这个 generation 的 cleanup 逻辑 242 + 243 + 只有把这些东西作为一个整体,在 task 进入队列时一次性绑定,`next-generation` 语义才真的成立。 244 + 145 245 ## 6) 总体方案 146 246 147 247 ## 6.1 新增统一的配置快照管理层 ··· 169 269 170 270 它负责: 171 271 172 - 1. 按 repo 级统一规则解析配置路径。 173 - 2. 用独立的临时 `viper` 读取并展开 `config.yaml`。 174 - 3. 应用 defaults、环境变量展开、必要的 normalize。 175 - 4. 生成一份不可变的 `ProcessConfigSnapshot`。 176 - 5. 给这份快照分配 `generation`、`loaded_at`、`source_path`、`content_hash`。 177 - 6. 在配置无效时保留“最后一份有效快照”,并记录最近一次加载失败。 272 + 1. 按 repo 级统一规则解析配置路径。 273 + 2. 用独立的临时 `viper` 读取并展开 `config.yaml`。 274 + 3. 应用 defaults、环境变量展开、必要的 normalize。 275 + 4. 生成一份不可变的 `ProcessConfigSnapshot`。 276 + 5. 给这份快照分配 `generation`、`loaded_at`、`source_path`、`content_hash`。 277 + 6. 在配置无效时保留“最后一份有效快照”,并记录最近一次加载失败。 178 278 179 279 其中第 1 条建议明确固定为: 180 280 ··· 245 345 246 346 建议在 `console serve` 内部引入文件变更监听: 247 347 248 - 1. watcher 绑定的是 canonical config resolver 语义,而不是某一次启动时碰巧命中的单一路径。 249 - 2. 如果显式指定了 `--config`,就监听该路径。 250 - 3. 如果没有显式路径,就按 `./config.yaml` -> `~/.morph/config.yaml` 的候选顺序解析当前 source,并在 source 切换时重新绑定 watcher。 251 - 4. 使用 debounce + content hash 去抖。 252 - 5. 文件变化后重新加载 snapshot。 253 - 6. 解析成功则发布新 generation。 254 - 7. 解析失败则保留旧 generation,并暴露错误状态。 348 + 1. watcher 绑定的是 canonical config resolver 语义,而不是某一次启动时碰巧命中的单一路径。 349 + 2. 如果显式指定了 `--config`,就监听该路径。 350 + 3. 如果没有显式路径,就按 `./config.yaml` -> `~/.morph/config.yaml` 的候选顺序解析当前 source,并在 source 切换时重新绑定 watcher。 351 + 4. 使用 debounce + content hash 去抖。 352 + 5. 文件变化后重新加载 snapshot。 353 + 6. 解析成功则发布新 generation。 354 + 7. 解析失败则保留旧 generation,并暴露错误状态。 255 355 256 356 实现上可以用: 257 357 ··· 304 404 - heartbeat 相关运行参数 305 405 - 默认 provider/model 视图 306 406 407 + 更重要的是,`consoleLocalRuntime` 内部要把这些配置真正收束成一个 generation 对象,而不是把 reader 暴露给 task 路径继续临时读取。 408 + 409 + 建议内部形态接近: 410 + 411 + ```go 412 + type consoleLocalRuntimeGeneration struct { 413 + generation uint64 414 + reader *viper.Viper 415 + logger *slog.Logger 416 + commonDeps depsutil.CommonDependencies 417 + bundle *consoleLocalRuntimeBundle 418 + memory runtimecore.MemoryRuntime 419 + cleanup func() 420 + } 421 + ``` 422 + 423 + 然后让每个 `consoleLocalTaskJob` 在入队时显式绑定一个 generation: 424 + 425 + ```go 426 + type consoleLocalTaskJob struct { 427 + ... 428 + Generation *consoleLocalRuntimeGeneration 429 + } 430 + ``` 431 + 307 432 切换策略建议是: 308 433 309 - 1. 先用新 snapshot 在堆上完整构建新 bundle。 310 - 2. 构建成功后再原子替换当前 bundle。 311 - 3. 替换完成后清理旧 bundle 的 MCP host 等资源。 312 - 4. 运行中的 task 继续持有旧 bundle 直到完成;新 task 使用新 bundle。 434 + 1. 先用新 snapshot 在堆上完整构建新 generation。 435 + 2. 构建成功后再原子替换 `currentGeneration`。 436 + 3. 新提交 task 从 `currentGeneration` 抓一个 generation 引用后再入队。 437 + 4. task 执行期间只允许使用 job 自己绑定的 generation。 438 + 5. 旧 generation 进入 retired 状态,但不立即 cleanup。 439 + 6. 只有当旧 generation 的引用计数归零时,才真正关闭 MCP host / memory runtime 等资源。 313 440 314 441 这会形成清晰语义: 315 442 ··· 328 455 329 456 建议拆成两层: 330 457 331 - 1. `managedRuntimeSupervisor.ApplySnapshot(snap ManagedRuntimeSnapshot)` 458 + 1. `managedRuntimeSupervisor.ApplySnapshot(snap ManagedRuntimeSnapshot)` 332 459 2. 每个 child runtime 自己维护自己的 active snapshot / run generation 333 460 334 461 `ManagedRuntimeSnapshot` 至少应包含: ··· 339 466 340 467 supervisor 的职责应该变成: 341 468 342 - 1. 对比旧 snapshot 和新 snapshot。 343 - 2. 识别: 344 - - 新增 kind 345 - - 删除 kind 346 - - 同 kind 配置是否变化 347 - 3. 针对变化的 child 做最小化重建或重启。 469 + 1. 先基于新 snapshot 预构建并验证下一代 child runtime。 470 + 2. 只有全部准备完成后,才切换当前运行代。 471 + 3. 切换成功后再停旧 child。 472 + 4. 若新配置无效,则拒绝发布新 generation,保留旧 child 继续运行,并记录错误。 348 473 349 474 这里建议明确两种配置变化: 350 475 351 - 1. transport 变化 352 - - 如 token、poll/socket 参数、allowed IDs、group trigger mode 476 + 1. transport 变化 477 + - 如 token、poll/socket 参数、allowed IDs、group trigger mode 353 478 - 需要重启 child loop 354 - 2. task-execution 变化 355 - - 如 llm、tools、guard、skills 479 + 2. task-execution 变化 480 + - 如 llm、tools、guard、skills 356 481 - 可以通过 child 内部 bundle snapshot 切换解决 357 482 358 - 如果第一阶段不想做那么细,也可以先统一为“child 全量重启”,但接口边界应先设计成 snapshot 驱动,避免以后继续和 `viper` 绑死。 483 + 如果第一阶段不想做那么细,也可以先统一为“child 全量重启”,但也必须保持这条语义: 484 + 485 + - 验证失败时,旧 child 不停 486 + - 只有下一代可运行时,才允许切换 359 487 360 488 ## 7.3 其他长生命周期逻辑 361 489 ··· 366 494 - setup repair 之后的运行时可见状态 367 495 - health / diagnostics 页面对当前 generation 的展示 368 496 497 + 但这里还要再补一个分类原则: 498 + 499 + - `task-bound`:必须跟随 job generation 冻结 500 + 例如 LLM route、tools、MCP、memory、topic title LLM 调用 501 + - `runtime-current`:允许在 generation 切换后立刻影响后续新任务或新 tick 502 + 例如 heartbeat scheduler interval、health/overview 展示 503 + - `process-bound`:如果暂时不做热更新,就必须在文档里明确写成 boot-only 504 + 否则就要补对应的动态更新路径 505 + 506 + 当前这条分类规则没有先定,所以实现里才会混出很多半热更新状态。 507 + 369 508 ## 8) Web API 职责重划 370 509 371 510 重构后,`PUT /api/settings/agent`、`PUT /api/settings/console`、setup repair 的职责都应该收缩为: 372 511 373 - 1. 读取当前 `config.yaml` 文档 512 + 1. 读取当前 `config.yaml` 文档 374 513 2. 合并 patch 375 514 3. 校验 YAML / 结构有效性 376 515 4. 写回 `config.yaml` ··· 441 580 442 581 这样前端可以区分三种状态: 443 582 444 - 1. 配置已写入且已生效 445 - 2. 配置已写入但新快照尚未应用 446 - 3. 配置已写入但快照生成失败,当前仍运行在旧快照上 583 + 1. 配置已写入且已生效 584 + 2. 配置已写入但新快照尚未应用 585 + 3. 配置已写入但快照生成失败,当前仍运行在旧快照上 447 586 448 587 这对于“Web API 只写文件”的模型很重要,否则用户会误以为写入成功就等于运行时已生效。 449 588 ··· 451 590 452 591 建议按下面的顺序改,风险最小。 453 592 454 - ### Phase 1:引入快照管理器,但先不改外部行为 593 + ### Phase 1:先把 generation 边界写死,再改实现 455 594 456 - 1. 新增 repo 级共享的 config path resolver,明确 read/write 两种 mode 和 `--config` -> `./config.yaml` -> `~/.morph/config.yaml` 的顺序。 457 - 2. 新增 `ConfigManager` 和 `ProcessConfigSnapshot`。 458 - 3. 启动时加载第一版 snapshot。 459 - 4. 把 shared defaults authority 收敛到 `internal/configdefaults.Apply`;`integration.ApplyViperDefaults` 若保留,则先调用前者。 460 - 5. manager 内先挂 `consoleLocalRuntime` 与 `managedRuntimeSupervisor` 两个 consumer。 461 - 6. 把 `cmd/mistermorph/registry.go` 那条 `configdefaults.Apply(viper.GetViper())` 调用链移回主启动路径,只在进程初始化阶段执行一次。 462 - 7. runtime 路径后续彻底不再碰 `SetDefault(...)` 或其他全局 defaults 写入。 463 - 8. 加入 watcher,但先只做日志和状态记录。 464 - 9. 在此基础上,暂时保留现有 handler 中的 `viper.Set + reload` 逻辑,确保行为不变。 595 + 1. 明确 `next-generation` 语义:新配置只影响后续新任务。 596 + 2. 列出 local runtime / managed runtime / heartbeat / settings 写入 的生命周期边界。 597 + 3. 给这些语义补测试目标,先把预期行为钉住。 465 598 466 599 目标: 467 600 468 - - 先把“统一快照生成”做出来 469 - - 先统一 defaults authority 和 config path 语义 470 - - 先消掉已知的全局 `viper` 并发写 panic 471 - - phase 1 的最小可测成果是:运行期不再存在 `configdefaults.Apply(viper.GetViper())` 472 - - 避免一上来同时改配置加载和 runtime 生命周期 601 + - 不再把 reader 当成 generation 602 + - 不再让 task 路径继续偷看“当前配置” 473 603 474 - ### Phase 2:让 `consoleLocalRuntime` 支持 `ApplySnapshot` 604 + ### Phase 2:把 `consoleLocalRuntime` 改成真正的 generation 模型 475 605 476 - 1. 把 `ReloadAgentConfig()` 改造成基于显式 snapshot 的重建函数。 477 - 2. bundle 改为原子替换。 478 - 3. task 执行路径显式绑定提交时拿到的 bundle。 606 + 1. 引入 `consoleLocalRuntimeGeneration`。 607 + 2. 让 `acceptTask()` / bus inbound / heartbeat enqueue 在入队时显式绑定 generation。 608 + 3. 让 `runTask()` / event preview / topic title 只使用 job 自己的 generation。 609 + 4. 把 memory runtime 也并入 generation。 610 + 5. 用延迟 cleanup 解决旧 MCP host / memory runtime 的回收时机。 479 611 480 612 目标: 481 613 482 - - local runtime 先脱离全局 `viper` 614 + - 旧任务继续跑完 615 + - 新任务使用新配置 616 + - 不再出现 task 混用多代配置 483 617 484 - ### Phase 3:让 `managedRuntimeSupervisor` 支持 `ApplySnapshot` 618 + ### Phase 3:把 local runtime 的配套对象补齐 485 619 486 - 1. supervisor 从 snapshot diff 决定 child 的启停。 487 - 2. child runtime 不再直接读全局 `viper`。 488 - 3. 把 `UpdateKinds()` / `Restart()` 逐步收缩为内部实现细节。 620 + 1. 处理 logger、overview、heartbeat loop、local endpoint 等 current-generation 读路径。 621 + 2. 明确 `ConsoleFileStore`、handler、auth token、task persistence、heartbeat topic 的更新语义。 622 + 3. 把配置写入改成原子替换,避免 poller 读到中间态。 489 623 490 624 目标: 491 625 492 - - managed runtime 也改为 snapshot 驱动 626 + - local runtime 侧所有热更新语义一致 627 + - 不再存在“reader 已变、实例没变”的明显断层 493 628 494 - ### Phase 4:删除 Web API 中的运行时副作用 629 + ### Phase 4:把 `managedRuntimeSupervisor` 改成验证后切换 495 630 496 - 1. 删除 `agent_settings.go` 中的 `viper.Set + ReloadAgentConfig + managed.Restart`。 497 - 2. 删除 `console_settings.go` 中的 `viper.Set + UpdateKinds/ReloadAgentConfig`。 498 - 3. 删除 `setup_repair.go` 中的对应 reload 链路。 499 - 4. watcher 检测到文件变化后统一发布新 snapshot。 631 + 1. supervisor 从 snapshot diff 决定 child 的启停。 632 + 2. 先验证并预构建下一代,再切换。 633 + 3. 配置无效时拒绝切换,保留当前 child 继续跑。 634 + 4. child runtime 不再直接读全局 `viper`。 500 635 501 636 目标: 502 637 503 - - 完成职责切分 638 + - 坏配置不再把现有 telegram/slack 拉下来 639 + - managed runtime 也对齐到 generation 语义 504 640 505 - ### Phase 5:清理与测试回收 641 + ### Phase 5:收口到统一的快照发布路径 506 642 507 - 1. 调整依赖全局 `viper` 的单元测试。 508 - 2. 给 snapshot generation / watcher / apply semantics 补测试。 509 - 3. 视情况新增显式“重新加载配置”命令或调试入口,仅作为辅助,不作为主路径。 643 + 1. Web API / setup repair 只负责写 `config.yaml`。 644 + 2. watcher 或统一 reload 路径负责发布新 generation。 645 + 3. 调整依赖旧 reload 语义的测试与文档。 646 + 647 + 目标: 648 + 649 + - 配置编辑与运行时切换彻底解耦 510 650 511 651 ## 12) 测试建议 512 652 513 653 至少补以下测试: 514 654 515 - 1. `config.yaml` 变化后会产生新的 snapshot generation。 516 - 2. 环境变量展开发生在 snapshot 构建阶段,而不是 handler 的特殊逻辑里。 517 - 3. 无效 YAML 不会替换当前有效 snapshot。 518 - 4. `consoleLocalRuntime` 切换 snapshot 时,旧任务继续跑,新任务用新配置。 519 - 这条不能只靠纯单元测试,至少要有一个集成测试,跑真实 goroutine 和 bundle 切换时序。 520 - 5. `managedRuntimeSupervisor` 能正确处理 kind 增删与配置变更。 521 - 6. `PUT /settings/*` 成功后,即使不直接调用 reload,最终也能通过 watcher 生效。 522 - 7. 并发启动 telegram / heartbeat / registry / logging 读取路径时,不会再因为运行期 `SetDefault` 全局 `viper` 而触发 data race 或 `concurrent map read and map write`。 655 + 1. `config.yaml` 变化后会产生新的 snapshot generation。 656 + 2. 环境变量展开发生在 snapshot 构建阶段,而不是 handler 的特殊逻辑里。 657 + 3. 无效 YAML 不会替换当前有效 snapshot。 658 + 4. `consoleLocalRuntime` 切换 snapshot 时,旧任务继续跑,新任务用新配置。 659 + 这条不能只靠纯单元测试,至少要有一个集成测试,跑真实 goroutine 和 bundle 切换时序。 660 + 5. 同一个 task 在执行期间不会混用多代 generation。 661 + 至少覆盖:model / memory / taskRuntime / topic title 这几条路径。 662 + 6. 旧 generation 的 MCP host 不会在仍有 task 引用时被提前关闭。 663 + 7. `memory.enabled`、`memory.short_term_days` 在下一代新任务上正确生效。 664 + 8. `managedRuntimeSupervisor` 在新配置无效时保留当前 child 继续运行。 665 + 9. settings / repair 对 `config.yaml` 的写入是原子替换,poller 不会吃到中间态。 666 + 10. `managedRuntimeSupervisor` 能正确处理 kind 增删与配置变更。 667 + 11. `PUT /settings/*` 成功后,即使不直接调用 reload,最终也能通过 watcher 生效。 668 + 12. 并发启动 telegram / heartbeat / registry / logging 读取路径时,不会再因为运行期 `SetDefault` 全局 `viper` 而触发 data race 或 `concurrent map read and map write`。 523 669 524 670 ## 13) 后续扩展(暂不纳入当前需求) 525 671 526 672 以下方向可以保留为后续扩展,但不建议现在一起做: 527 673 528 - 1. 让单独启动的 `telegram` / `slack` / `line` / `lark` 进程也统一接入相同的 config manager 形态。 529 - 2. 暴露通用 `Watch/Subscribe` 能力,供进程内更多组件订阅 snapshot 变化。 530 - 3. 在观测面板里按 `local / telegram / slack` 分别展示更细粒度的 `runtime_generation`。 531 - 4. 把目前 console 优先的 snapshot 结构继续抽象成更通用的跨命令配置运行时框架。 674 + 1. 让单独启动的 `telegram` / `slack` / `line` / `lark` 进程也统一接入相同的 config manager 形态。 675 + 2. 暴露通用 `Watch/Subscribe` 能力,供进程内更多组件订阅 snapshot 变化。 676 + 3. 在观测面板里按 `local / telegram / slack` 分别展示更细粒度的 `runtime_generation`。 677 + 4. 把目前 console 优先的 snapshot 结构继续抽象成更通用的跨命令配置运行时框架。 532 678 533 679 ## 14) 风险与取舍 534 680 ··· 546 692 - 暴露 generation/status 547 693 - 前端在写入后短轮询配置状态,直到 generation 更新或返回加载失败 548 694 549 - ### 风险 3:运行中的 bundle 切换容易造成资源泄漏 695 + ### 风险 3:generation 延迟回收容易造成资源泄漏 550 696 551 697 控制方式: 552 698 553 - - 统一 bundle 生命周期 554 - - 所有可关闭资源只挂在 bundle 上 555 - - 原子替换后集中关闭旧 bundle 699 + - 统一 generation 生命周期 700 + - 所有可关闭资源只挂在 generation 上 701 + - 通过引用计数或等价机制延迟回收旧 generation 556 702 557 703 ## 15) 推荐结论 558 704
+7 -1
internal/channelruntime/core/memory.go
··· 2 2 3 3 import ( 4 4 "log/slog" 5 + "strings" 5 6 6 7 "github.com/quailyquaily/mistermorph/internal/channelruntime/depsutil" 7 8 "github.com/quailyquaily/mistermorph/internal/llmutil" ··· 14 15 type MemoryRuntimeOptions struct { 15 16 Enabled bool 16 17 ShortTermDays int 18 + MemoryDir string 17 19 Logger *slog.Logger 18 20 Decorate func(client llm.Client, route llmutil.ResolvedRoute) llm.Client 19 21 } ··· 31 33 if !opts.Enabled { 32 34 return out, nil 33 35 } 34 - mgr := memory.NewManager(statepaths.MemoryDir(), opts.ShortTermDays) 36 + memoryDir := strings.TrimSpace(opts.MemoryDir) 37 + if memoryDir == "" { 38 + memoryDir = statepaths.MemoryDir() 39 + } 40 + mgr := memory.NewManager(memoryDir, opts.ShortTermDays) 35 41 journal := mgr.NewJournal(memory.JournalOptions{}) 36 42 draftResolver, err := memoryruntime.NewConfiguredDraftResolver(memoryruntime.DraftResolverFactoryOptions{ 37 43 ResolveLLMRoute: d.ResolveLLMRoute,
+45
internal/daemonruntime/console_store.go
··· 83 83 if s == nil { 84 84 return "_heartbeat" 85 85 } 86 + s.mu.RLock() 87 + defer s.mu.RUnlock() 86 88 return s.heartbeatTopicID 89 + } 90 + 91 + func (s *ConsoleFileStore) ApplyConfig(opts ConsoleFileStoreOptions) error { 92 + if s == nil { 93 + return fmt.Errorf("console task store is nil") 94 + } 95 + rootDir := strings.TrimSpace(opts.RootDir) 96 + if opts.Persist && rootDir == "" { 97 + return fmt.Errorf("console task store root dir is required") 98 + } 99 + heartbeatTopicID := strings.TrimSpace(opts.HeartbeatTopicID) 100 + if heartbeatTopicID == "" { 101 + heartbeatTopicID = "_heartbeat" 102 + } 103 + now := time.Now().UTC() 104 + 105 + s.mu.Lock() 106 + defer s.mu.Unlock() 107 + 108 + oldRootDir := s.rootDir 109 + oldPersist := s.persist 110 + 111 + s.rootDir = filepath.Clean(rootDir) 112 + s.logDir = filepath.Join(filepath.Clean(rootDir), "log") 113 + s.topicPath = filepath.Join(filepath.Clean(rootDir), "topic.json") 114 + s.heartbeatTopicID = heartbeatTopicID 115 + s.persist = opts.Persist 116 + 117 + if !s.persist { 118 + return nil 119 + } 120 + if err := s.persistTopicsLocked(now); err != nil { 121 + return err 122 + } 123 + if oldPersist && s.rootDir == oldRootDir { 124 + return nil 125 + } 126 + for _, item := range s.items { 127 + if err := s.appendTaskEventLocked(item, now, s.triggerForTaskLocked(item.ID, TaskTrigger{})); err != nil { 128 + return err 129 + } 130 + } 131 + return nil 87 132 } 88 133 89 134 func (s *ConsoleFileStore) CreateTopic(title string) (TopicInfo, error) {