A Kubernetes operator that bridges Hardware Security Module (HSM) data storage with Kubernetes Secrets, providing true secret portability…

Update logs; add mirror sync via API; fix agent mirror syncing so it runs on startup and every 5 minutes instead of every 30 seconds; add checks on Available and LastSeen to determine how to reconcile agent nodes.

+782 -97
+3
internal/api/proxy_handlers.go
```diff
···
 
 	// PIN operations
 	hsmGroup.POST("/change-pin", s.proxyClient.ChangePIN)
+
+	// Mirror operations
+	hsmGroup.POST("/mirror/sync", s.handleMirrorSync)
 }
 
 // Health and info endpoints can stay local
```
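The new route is mounted under the existing `/api/v1/hsm` group, so the full path matches the `/api/v1/hsm/mirror/sync` URL used by the kubectl plugin client further down. A minimal sketch of calling the endpoint directly, assuming the operator API is reachable at a hypothetical `localhost:8090`:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical address; the real port comes from the operator's API configuration.
	url := "http://localhost:8090/api/v1/hsm/mirror/sync"
	body := bytes.NewBufferString(`{"force": true}`)

	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("status:", resp.Status) // expect 200 OK once a trigger is registered
}
```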
+55
internal/api/server.go
```diff
···
 	s.sendResponse(c, http.StatusOK, "Health check completed", health)
 }
 
+// MirrorTriggerInterface defines the interface for triggering mirror syncs
+type MirrorTriggerInterface interface {
+	TriggerMirror(reason, source string, force bool)
+}
+
+// Global mirror trigger for API access
+var globalMirrorTrigger MirrorTriggerInterface
+
+// SetMirrorTrigger sets the global mirror trigger for API access
+func SetMirrorTrigger(trigger MirrorTriggerInterface) {
+	globalMirrorTrigger = trigger
+}
+
+// handleMirrorSync triggers a manual mirror synchronization
+func (s *Server) handleMirrorSync(c *gin.Context) {
+	// Parse request body for force flag
+	var req struct {
+		Force bool `json:"force,omitempty"`
+	}
+	if err := c.ShouldBindJSON(&req); err != nil && err.Error() != "EOF" {
+		s.sendError(c, http.StatusBadRequest, "invalid_request", "Invalid request body", map[string]any{
+			"error": err.Error(),
+		})
+		return
+	}
+
+	// Get the global mirror trigger
+	if globalMirrorTrigger == nil {
+		s.sendError(c, http.StatusServiceUnavailable, "mirror_unavailable", "Mirror service not available", nil)
+		return
+	}
+
+	// Trigger the mirror sync
+	reason := "manual_api"
+	source := "api_endpoint"
+	if req.Force {
+		reason = "manual_api_force"
+	}
+
+	globalMirrorTrigger.TriggerMirror(reason, source, req.Force)
+
+	s.logger.Info("Manual mirror sync triggered via API",
+		"force", req.Force,
+		"source", c.ClientIP())
+
+	response := map[string]any{
+		"triggered": true,
+		"reason":    reason,
+		"force":     req.Force,
+		"message":   "Mirror synchronization triggered successfully",
+	}
+
+	s.sendResponse(c, http.StatusOK, "Mirror sync triggered", response)
+}
+
 // All HSM operations are now proxied to agents - no direct handlers needed
 
 // sendResponse sends a successful API response
```
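The handler depends on the package-level `globalMirrorTrigger`, which is wired up at startup via `api.SetMirrorTrigger` (see the runnable.go diff below); until then the endpoint returns 503. For unit tests, a hedged sketch of a test double satisfying `MirrorTriggerInterface` — the `fakeTrigger` type is hypothetical, not part of this diff:

```go
// fakeTrigger is a hypothetical test double that records the last trigger call.
type fakeTrigger struct {
	reason, source string
	force          bool
}

// TriggerMirror satisfies MirrorTriggerInterface by recording its arguments.
func (f *fakeTrigger) TriggerMirror(reason, source string, force bool) {
	f.reason, f.source, f.force = reason, source, force
}
```

Installing it with `SetMirrorTrigger(&fakeTrigger{})` before exercising the handler avoids the 503 path and lets a test assert on the recorded reason and force flag.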
-1
internal/controller/discovery_daemonset_controller.go
```diff
···
 	existing.Labels = desired.Labels
 
 	logger.Info("Updating discovery DaemonSet",
-		"device", hsmDevice.Name,
 		"daemonset", daemonSetName,
 		"specChanged", specChanged,
 		"labelsChanged", labelsChanged)
```
+180 -50
internal/controller/hsmpool_agent_controller.go
```diff
···
 	return ctrl.Result{}, client.IgnoreNotFound(err)
 }
 
-	logger.Info("Reconciling HSM agent deployment for pool", "pool", hsmPool.Name, "phase", hsmPool.Status.Phase)
+	logger.Info("Reconciling HSM agent deployment", "phase", hsmPool.Status.Phase)
 
 	// Only deploy agents for ready pools with discovered hardware
 	if hsmPool.Status.Phase == hsmv1alpha1.HSMPoolPhaseReady && len(hsmPool.Status.AggregatedDevices) > 0 {
···
 // Deployment creation and management functions
 
 // ensureAgentDeployments ensures agent deployments exist for all available devices in the pool
+// Handles device migrations by grouping devices by serial number and detecting state changes
 func (r *HSMPoolAgentReconciler) ensureAgentDeployments(ctx context.Context, hsmPool *hsmv1alpha1.HSMPool) error {
 	logger := log.FromContext(ctx)
 
+	// Group devices by serial number to detect migrations
+	devicesBySerial := make(map[string][]hsmv1alpha1.DiscoveredDevice)
+	for _, device := range hsmPool.Status.AggregatedDevices {
+		devicesBySerial[device.SerialNumber] = append(devicesBySerial[device.SerialNumber], device)
+	}
+
+	// Process each unique serial with stable ordering
+	serialNumbers := make([]string, 0, len(devicesBySerial))
+	for serial := range devicesBySerial {
+		serialNumbers = append(serialNumbers, serial)
+	}
+	sort.Strings(serialNumbers) // Stable ordering
+
 	var deploymentErrors []error
 
-	// Create stable device-to-index mapping by sorting devices by serial number
-	// This ensures the same device always gets the same index regardless of discovery order
-	availableDevices := make([]hsmv1alpha1.DiscoveredDevice, 0)
-	for _, device := range hsmPool.Status.AggregatedDevices {
-		if device.Available {
-			availableDevices = append(availableDevices, device)
+	for i, serial := range serialNumbers {
+		devices := devicesBySerial[serial]
+		agentName := fmt.Sprintf("%s-%s-%d", AgentNamePrefix, hsmPool.OwnerReferences[0].Name, i)
+
+		// Find the active device (if any) and lost device (if any)
+		var activeDevice *hsmv1alpha1.DiscoveredDevice
+		var lostDevice *hsmv1alpha1.DiscoveredDevice
+
+		for j := range devices {
+			dev := &devices[j]
+			if dev.Available {
+				activeDevice = dev
+			} else {
+				lostDevice = dev
+			}
+		}
+
+		// Decision logic based on device state
+		if activeDevice != nil && lostDevice != nil {
+			// Migration scenario - device moved nodes
+			timeSinceLost := time.Since(lostDevice.LastSeen.Time)
+			gracePeriod := 5 * time.Minute
+			if hsmPool.Spec.GracePeriod != nil {
+				gracePeriod = hsmPool.Spec.GracePeriod.Duration
+			}
+
+			if timeSinceLost < gracePeriod && activeDevice.NodeName != lostDevice.NodeName {
+				logger.Info("Device migration detected",
+					"serial", serial,
+					"from", lostDevice.NodeName,
+					"to", activeDevice.NodeName,
+					"timeSinceLost", timeSinceLost)
+
+				// Ensure agent is on the new node (will handle deletion/creation)
+				if err := r.ensureAgentOnNode(ctx, hsmPool, activeDevice, agentName); err != nil {
+					logger.Error(err, "Failed to ensure agent on new node after migration", "serial", serial)
+					deploymentErrors = append(deploymentErrors, fmt.Errorf("migration failed for %s: %w", serial, err))
+					continue
+				}
+			} else if timeSinceLost < gracePeriod {
+				// Device came back on same node within grace period
+				logger.Info("Device reconnected on same node",
+					"serial", serial,
+					"node", activeDevice.NodeName,
+					"timeSinceLost", timeSinceLost)
+
+				if err := r.ensureAgentOnNode(ctx, hsmPool, activeDevice, agentName); err != nil {
+					logger.Error(err, "Failed to ensure agent after reconnection", "serial", serial)
+					deploymentErrors = append(deploymentErrors, fmt.Errorf("reconnection failed for %s: %w", serial, err))
+					continue
+				}
+			}
+		} else if activeDevice != nil {
+			// Normal case - device is available
+			if err := r.ensureAgentOnNode(ctx, hsmPool, activeDevice, agentName); err != nil {
+				logger.Error(err, "Failed to ensure agent for available device", "serial", serial)
+				deploymentErrors = append(deploymentErrors, fmt.Errorf("agent creation failed for %s: %w", serial, err))
+				continue
+			}
+		} else if lostDevice != nil {
+			// Device is lost - check if we should clean up
+			timeSinceLost := time.Since(lostDevice.LastSeen.Time)
+			gracePeriod := 5 * time.Minute
+			if hsmPool.Spec.GracePeriod != nil {
+				gracePeriod = hsmPool.Spec.GracePeriod.Duration
+			}
+
+			if timeSinceLost > gracePeriod {
+				logger.Info("Cleaning up agent for lost device",
+					"serial", serial,
+					"lastNode", lostDevice.NodeName,
+					"timeSinceLost", timeSinceLost)
+
+				if err := r.deleteAgent(ctx, agentName, hsmPool.Namespace); err != nil {
+					logger.Error(err, "Failed to delete agent for lost device", "serial", serial)
+					deploymentErrors = append(deploymentErrors, fmt.Errorf("cleanup failed for %s: %w", serial, err))
+				}
+			} else {
+				logger.V(1).Info("Device lost but within grace period",
+					"serial", serial,
+					"timeSinceLost", timeSinceLost,
+					"gracePeriod", gracePeriod)
+			}
 		}
 	}
 
-	// Sort by serial number for stable index assignment
-	sort.Slice(availableDevices, func(i, j int) bool {
-		return availableDevices[i].SerialNumber < availableDevices[j].SerialNumber
-	})
+	// Return aggregated errors if any occurred
+	if len(deploymentErrors) > 0 {
+		return fmt.Errorf("deployment errors occurred: %v", deploymentErrors)
+	}
 
-	// Process each available device with stable index
-	for i, aggregatedDevice := range availableDevices {
-		agentName := fmt.Sprintf("%s-%d", r.generateAgentName(hsmPool), i)
+	return nil
+}
 
-		// Check if deployment already exists
-		var deployment appsv1.Deployment
-		err := r.Get(ctx, types.NamespacedName{
-			Name:      agentName,
-			Namespace: hsmPool.Namespace,
-		}, &deployment)
+// ensureAgentOnNode ensures an agent deployment exists on the correct node for the given device
+func (r *HSMPoolAgentReconciler) ensureAgentOnNode(ctx context.Context, hsmPool *hsmv1alpha1.HSMPool, device *hsmv1alpha1.DiscoveredDevice, agentName string) error {
+	logger := log.FromContext(ctx)
+
+	// Check if deployment exists
+	var deployment appsv1.Deployment
+	err := r.Get(ctx, types.NamespacedName{
+		Name:      agentName,
+		Namespace: hsmPool.Namespace,
+	}, &deployment)
+
+	if err == nil {
+		// Deployment exists - check if it's on the right node
+		if !r.isDeploymentOnNode(&deployment, device.NodeName) {
+			logger.Info("Agent on wrong node, recreating",
+				"agent", agentName,
+				"currentNode", r.getDeploymentNode(&deployment),
+				"targetNode", device.NodeName,
+				"serial", device.SerialNumber)
 
-		if err == nil {
-			// Deployment exists, check if it needs updating
+			// Delete and recreate
+			if err := r.Delete(ctx, &deployment); err != nil && !errors.IsNotFound(err) {
+				return fmt.Errorf("failed to delete outdated agent: %w", err)
+			}
+			// Fall through to create
+		} else {
+			// Agent is on correct node - check if other details need updating
 			needsUpdate, err := r.agentNeedsUpdate(ctx, &deployment, hsmPool)
 			if err != nil {
-				logger.Error(err, "Failed to check if agent deployment needs update", "deployment", agentName)
-				deploymentErrors = append(deploymentErrors, fmt.Errorf("failed to check deployment %s: %w", agentName, err))
-				continue
+				return fmt.Errorf("failed to check if agent needs update: %w", err)
 			}
 
-			// Check device-specific configuration for this specific device (deployment index matches device index)
 			if !needsUpdate {
-				needsUpdate = r.deploymentNeedsUpdateForDevice(&deployment, &aggregatedDevice)
+				needsUpdate = r.deploymentNeedsUpdateForDevice(&deployment, device)
 			}
 
 			if needsUpdate {
-				// Delete existing deployment to trigger recreation
-				logger.Info("Deleting outdated agent deployment", "deployment", agentName)
+				logger.Info("Agent needs updating, recreating",
+					"agent", agentName,
+					"node", device.NodeName,
+					"serial", device.SerialNumber)
+
 				if err := r.Delete(ctx, &deployment); err != nil && !errors.IsNotFound(err) {
-					logger.Error(err, "Failed to delete outdated agent deployment", "deployment", agentName)
-					deploymentErrors = append(deploymentErrors, fmt.Errorf("failed to delete deployment %s: %w", agentName, err))
-					continue
+					return fmt.Errorf("failed to delete outdated agent: %w", err)
 				}
-				// Fall through to create new deployment
+				// Fall through to create
 			} else {
-				// Deployment is up to date
-				logger.V(1).Info("Agent deployment is up to date", "deployment", agentName)
-				continue
+				// Agent is up to date
+				logger.V(1).Info("Agent deployment is up to date",
+					"agent", agentName,
+					"node", device.NodeName,
+					"serial", device.SerialNumber)
+				return nil
 			}
-		} else if !errors.IsNotFound(err) {
-			logger.Error(err, "Failed to check agent deployment", "deployment", agentName)
-			deploymentErrors = append(deploymentErrors, fmt.Errorf("failed to check deployment %s: %w", agentName, err))
-			continue
 		}
+	} else if !errors.IsNotFound(err) {
+		return fmt.Errorf("failed to check agent deployment: %w", err)
+	}
 
-		// Create new deployment
-		logger.Info("Creating agent deployment", "deployment", agentName, "device", aggregatedDevice.SerialNumber)
-		if err := r.createAgentDeployment(ctx, hsmPool, &aggregatedDevice, agentName); err != nil {
-			logger.Error(err, "Failed to create agent deployment", "deployment", agentName)
-			deploymentErrors = append(deploymentErrors, fmt.Errorf("failed to create deployment %s: %w", agentName, err))
-			continue
-		}
-	}
+	// Create agent deployment
+	logger.Info("Creating agent deployment",
+		"agent", agentName,
+		"node", device.NodeName,
+		"serial", device.SerialNumber)
+
+	return r.createAgentDeployment(ctx, hsmPool, device, agentName)
+}
 
-	// Return aggregated errors if any occurred
-	if len(deploymentErrors) > 0 {
-		return fmt.Errorf("deployment errors occurred: %v", deploymentErrors)
-	}
+// isDeploymentOnNode checks if a deployment is pinned to the specified node
+func (r *HSMPoolAgentReconciler) isDeploymentOnNode(deployment *appsv1.Deployment, nodeName string) bool {
+	if deployment.Spec.Template.Spec.NodeSelector != nil {
+		return deployment.Spec.Template.Spec.NodeSelector["kubernetes.io/hostname"] == nodeName
+	}
+	return false
+}
 
+// getDeploymentNode returns the node name that a deployment is pinned to
+func (r *HSMPoolAgentReconciler) getDeploymentNode(deployment *appsv1.Deployment) string {
+	if deployment.Spec.Template.Spec.NodeSelector != nil {
+		return deployment.Spec.Template.Spec.NodeSelector["kubernetes.io/hostname"]
+	}
+	return ""
+}
+
+// deleteAgent deletes an agent deployment by name
+func (r *HSMPoolAgentReconciler) deleteAgent(ctx context.Context, name, namespace string) error {
+	deployment := &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+	}
+	if err := r.Delete(ctx, deployment); err != nil && !errors.IsNotFound(err) {
+		return fmt.Errorf("failed to delete agent deployment %s: %w", name, err)
+	}
 	return nil
 }
```
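The per-serial logic above reduces to a small state table: an available and a lost record for the same serial within the grace period means a migration (different nodes) or a reconnection (same node); only a lost record past the grace period means cleanup. A self-contained sketch of that decision, using a simplified stand-in for `DiscoveredDevice` (the `device` type here is illustrative, not the real API type):

```go
package main

import (
	"fmt"
	"time"
)

// device is a simplified stand-in for hsmv1alpha1.DiscoveredDevice.
type device struct {
	NodeName string
	LastSeen time.Time
}

// decide compresses the controller's per-serial branches into one function.
func decide(active, lost *device, grace time.Duration) string {
	switch {
	case active != nil && lost != nil:
		if time.Since(lost.LastSeen) < grace {
			if active.NodeName != lost.NodeName {
				return "migrate agent to " + active.NodeName
			}
			return "re-ensure agent on " + active.NodeName
		}
		return "no action (stale lost record)"
	case active != nil:
		return "ensure agent on " + active.NodeName
	case lost != nil && time.Since(lost.LastSeen) > grace:
		return "clean up agent"
	default:
		return "wait out grace period"
	}
}

func main() {
	lost := &device{NodeName: "node-a", LastSeen: time.Now().Add(-2 * time.Minute)}
	active := &device{NodeName: "node-b", LastSeen: time.Now()}
	fmt.Println(decide(active, lost, 5*time.Minute)) // migrate agent to node-b
}
```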
+1 -1
internal/mirror/manager.go
```diff
···
 	return &MirrorManager{
 		client:            k8sClient,
 		agentManager:      agentManager,
-		logger:            logger.WithName("mirror-manager"),
+		logger:            logger,
 		operatorNamespace: operatorNamespace,
 	}
 }
```
+69 -18
internal/modes/discovery/discovery.go
```diff
···
 package discovery
 
 import (
+	"maps"
 	"context"
 	"encoding/json"
 	"flag"
···
 	}
 
 	// Add additional device info
-	for k, v := range usbDev.DeviceInfo {
-		device.DeviceInfo[k] = v
-	}
+	maps.Copy(device.DeviceInfo, usbDev.DeviceInfo)
 
 	devices = append(devices, device)
 }
···
 		currentDevices = []hsmv1alpha1.DiscoveredDevice{}
 	}
 
-	// Check if device already exists using fast lookup
+	// Check if device already exists using fast lookup (by serial number)
 	if existingIdx := d.findDeviceIndex(event.HSMDeviceName, device); existingIdx != -1 {
-		d.logger.V(2).Info("Device already in cache, updating last seen",
-			"device", event.HSMDeviceName,
-			"serial", device.SerialNumber)
-		currentDevices[existingIdx].LastSeen = device.LastSeen
+		existingDevice := currentDevices[existingIdx]
+
+		// Check if this is a migration (same serial, different node)
+		if !existingDevice.Available && existingDevice.NodeName != d.nodeName {
+			d.logger.Info("Device migrated from another node",
+				"device", event.HSMDeviceName,
+				"serial", device.SerialNumber,
+				"fromNode", existingDevice.NodeName,
+				"toNode", d.nodeName,
+				"fromPath", existingDevice.DevicePath,
+				"toPath", device.DevicePath)
+		} else if existingDevice.Available && existingDevice.NodeName == d.nodeName {
+			d.logger.V(2).Info("Device already available on this node, updating last seen",
+				"device", event.HSMDeviceName,
+				"serial", device.SerialNumber)
+		} else if !existingDevice.Available && existingDevice.NodeName == d.nodeName {
+			d.logger.Info("Device reconnected on same node",
+				"device", event.HSMDeviceName,
+				"serial", device.SerialNumber)
+		}
+
+		// Update the existing device entry with new information
+		currentDevices[existingIdx] = device
+		currentDevices[existingIdx].Available = true // Mark as available
+
 		d.updateDeviceCache(event.HSMDeviceName, currentDevices)
 		if err := d.updatePodAnnotation(ctx, event.HSMDeviceName, currentDevices); err != nil {
-			d.logger.Error(err, "Failed to update pod annotation after updating device last seen")
+			d.logger.Error(err, "Failed to update pod annotation after device reconnection")
 		}
 		return
 	}
···
 		return
 	}
 
-	d.logger.Info("Removed device from cache",
+	d.logger.Info("Marking device as lost in cache",
 		"device", event.HSMDeviceName,
 		"serial", device.SerialNumber,
 		"path", device.DevicePath)
 
-	// Remove device efficiently by swapping with last element
-	lastIdx := len(currentDevices) - 1
-	if deviceIdx != lastIdx {
-		currentDevices[deviceIdx] = currentDevices[lastIdx]
-	}
-	currentDevices = currentDevices[:lastIdx]
+	// Mark device as lost instead of removing it from cache
+	currentDevices[deviceIdx].Available = false
+	currentDevices[deviceIdx].LastSeen = metav1.Now() // Use LastSeen as "lost at" timestamp
 
 	d.updateDeviceCache(event.HSMDeviceName, currentDevices)
 
-	d.logger.Info("Updated device cache after removal",
+	d.logger.Info("Updated device cache after marking as lost",
 		"device", event.HSMDeviceName,
-		"remainingDevices", len(currentDevices))
+		"serial", device.SerialNumber,
+		"totalDevices", len(currentDevices))
 
 	// Update pod annotation
 	if err := d.updatePodAnnotation(ctx, event.HSMDeviceName, currentDevices); err != nil {
···
 	// Re-register specs in case any were missed
 	for _, hsmDevice := range hsmDeviceList.Items {
 		d.registerSpecForMonitoring(&hsmDevice)
+	}
+
+	// Clean up devices that have been lost for too long (5 minutes)
+	cleanupThreshold := 5 * time.Minute
+	for hsmDeviceName, devices := range d.deviceCache {
+		var activeDevices []hsmv1alpha1.DiscoveredDevice
+		cleanedCount := 0
+
+		for _, device := range devices {
+			if !device.Available && time.Since(device.LastSeen.Time) > cleanupThreshold {
+				// Skip this device, it's been gone too long
+				cleanedCount++
+				d.logger.V(1).Info("Cleaning up stale lost device from cache",
+					"device", hsmDeviceName,
+					"serial", device.SerialNumber,
+					"lastSeen", device.LastSeen.Time,
+					"timeSinceLost", time.Since(device.LastSeen.Time))
+				continue
+			}
+			activeDevices = append(activeDevices, device)
+		}
+
+		if cleanedCount > 0 {
+			d.logger.Info("Cleaned up stale lost devices",
+				"device", hsmDeviceName,
+				"cleanedCount", cleanedCount,
+				"remainingDevices", len(activeDevices))
+
+			d.updateDeviceCache(hsmDeviceName, activeDevices)
+			if err := d.updatePodAnnotation(ctx, hsmDeviceName, activeDevices); err != nil {
+				d.logger.Error(err, "Failed to update pod annotation after cleanup", "device", hsmDeviceName)
+			}
+		}
 	}
 
 	return nil
```
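Removal events now mark devices unavailable (reusing `LastSeen` as a lost-at timestamp) instead of deleting them, and a periodic pass sweeps entries that stay lost past the threshold. A runnable sketch of that sweep, with a simplified `cachedDevice` type standing in for the real `DiscoveredDevice`:

```go
package main

import (
	"fmt"
	"time"
)

// cachedDevice is a simplified stand-in for hsmv1alpha1.DiscoveredDevice.
type cachedDevice struct {
	Serial    string
	Available bool
	LastSeen  time.Time
}

// sweepLost drops devices that have been unavailable longer than threshold,
// mirroring the cleanup pass added to the discovery loop.
func sweepLost(devices []cachedDevice, threshold time.Duration) []cachedDevice {
	kept := make([]cachedDevice, 0, len(devices))
	for _, d := range devices {
		if !d.Available && time.Since(d.LastSeen) > threshold {
			continue // gone past the grace window; remove from cache
		}
		kept = append(kept, d)
	}
	return kept
}

func main() {
	now := time.Now()
	devices := []cachedDevice{
		{Serial: "A", Available: true, LastSeen: now},
		{Serial: "B", Available: false, LastSeen: now.Add(-10 * time.Minute)},
	}
	fmt.Println(len(sweepLost(devices, 5*time.Minute))) // 1
}
```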
+31 -1
internal/modes/manager/manager.go
```diff
···
 	apiPort        int
 	agentImage     string
 	discoveryImage string
+
+	// Mirror configuration
+	mirrorPeriodicInterval string // Duration string for periodic sync interval
+	mirrorDebounceWindow   string // Duration string for event debounce window
 }
 
 // parseFlags parses command line arguments and returns the configuration
···
 		"Container image for HSM agent pods")
 	fs.StringVar(&cfg.discoveryImage, "discovery-image", "",
 		"Container image for HSM discovery DaemonSet")
+
+	// Mirror configuration flags
+	fs.StringVar(&cfg.mirrorPeriodicInterval, "mirror-periodic-interval", "5m",
+		"Interval for periodic mirror safety sync (e.g., 5m, 10m, 1h)")
+	fs.StringVar(&cfg.mirrorDebounceWindow, "mirror-debounce-window", "5s",
+		"Debounce window for batching mirror events (e.g., 5s, 10s, 30s)")
 
 	if err := fs.Parse(args); err != nil {
 		return nil, err
···
 		setupLog.Info("API server will start", "port", cfg.apiPort)
 	}
 
+	// Create Kubernetes clientset for pod watching
+	k8sInterface, err := kubernetes.NewForConfig(mgr.GetConfig())
+	if err != nil {
+		setupLog.Error(err, "unable to create Kubernetes clientset for mirror manager")
+		return err
+	}
+
+	// Parse mirror configuration durations
+	periodicInterval, err := time.ParseDuration(cfg.mirrorPeriodicInterval)
+	if err != nil {
+		setupLog.Error(err, "invalid mirror periodic interval", "value", cfg.mirrorPeriodicInterval)
+		return err
+	}
+
+	debounceWindow, err := time.ParseDuration(cfg.mirrorDebounceWindow)
+	if err != nil {
+		setupLog.Error(err, "invalid mirror debounce window", "value", cfg.mirrorDebounceWindow)
+		return err
+	}
+
 	// Start device-scoped HSM mirroring in background
-	mirrorManagerRunnable := NewMirrorManagerRunnable(mgr.GetClient(), agentManager, operatorNamespace, ctrl.Log.WithName("device-mirror"))
+	mirrorManagerRunnable := NewMirrorManagerRunnable(mgr.GetClient(), k8sInterface, agentManager, operatorNamespace, ctrl.Log.WithName("agent-mirror"), periodicInterval, debounceWindow)
 	if err := mgr.Add(mirrorManagerRunnable); err != nil {
 		setupLog.Error(err, "unable to add mirror manager to manager")
 		return err
```
+230 -26
internal/modes/manager/runnable.go
··· 22 22 "time" 23 23 24 24 "github.com/go-logr/logr" 25 + corev1 "k8s.io/api/core/v1" 26 + "k8s.io/apimachinery/pkg/fields" 25 27 "k8s.io/client-go/kubernetes" 28 + "k8s.io/client-go/tools/cache" 26 29 "sigs.k8s.io/controller-runtime/pkg/client" 27 30 28 31 "github.com/evanjarrett/hsm-secrets-operator/internal/agent" ··· 61 64 return apiServer.Start(ctx) 62 65 } 63 66 67 + // MirrorTriggerEvent represents an event that should trigger mirroring 68 + type MirrorTriggerEvent struct { 69 + Reason string // "agent_ready", "agent_changed", "manual", "periodic" 70 + Force bool // Skip optimization checks 71 + Source string // Additional context (e.g., agent pod name) 72 + } 73 + 64 74 // MirrorManagerRunnable starts the HSM mirroring service 65 75 type MirrorManagerRunnable struct { 66 76 k8sClient client.Client 77 + k8sInterface *kubernetes.Clientset 67 78 agentManager *agent.Manager 68 79 operatorNamespace string 69 80 logger logr.Logger 81 + 82 + // Event-driven mirroring 83 + mirrorTrigger chan MirrorTriggerEvent 84 + periodicInterval time.Duration // Safety net interval (default 5 minutes) 85 + debounceWindow time.Duration // Wait for multiple changes (default 5 seconds) 70 86 } 71 87 72 88 // NewMirrorManagerRunnable creates a new mirror manager runnable 73 - func NewMirrorManagerRunnable(k8sClient client.Client, agentManager *agent.Manager, operatorNamespace string, logger logr.Logger) *MirrorManagerRunnable { 89 + func NewMirrorManagerRunnable(k8sClient client.Client, k8sInterface *kubernetes.Clientset, agentManager *agent.Manager, operatorNamespace string, logger logr.Logger, periodicInterval, debounceWindow time.Duration) *MirrorManagerRunnable { 74 90 return &MirrorManagerRunnable{ 75 91 k8sClient: k8sClient, 92 + k8sInterface: k8sInterface, 76 93 agentManager: agentManager, 77 94 operatorNamespace: operatorNamespace, 78 - logger: logger.WithName("mirror-manager-runnable"), 95 + logger: logger, 96 + mirrorTrigger: make(chan MirrorTriggerEvent, 10), // Buffered channel 97 + periodicInterval: periodicInterval, 98 + debounceWindow: debounceWindow, 99 + } 100 + } 101 + 102 + // setupAgentPodWatcher sets up a pod informer to watch for agent pod changes 103 + func (mmr *MirrorManagerRunnable) setupAgentPodWatcher(ctx context.Context) { 104 + // Create a pod watcher for agent pods in the operator namespace 105 + watchlist := cache.NewListWatchFromClient( 106 + mmr.k8sInterface.CoreV1().RESTClient(), 107 + "pods", 108 + mmr.operatorNamespace, 109 + fields.Everything(), 110 + ) 111 + 112 + // Create the informer using the new API 113 + _, controller := cache.NewInformerWithOptions(cache.InformerOptions{ 114 + ListerWatcher: watchlist, 115 + ObjectType: &corev1.Pod{}, 116 + ResyncPeriod: 5 * time.Minute, // Resync interval 117 + Handler: cache.ResourceEventHandlerFuncs{ 118 + UpdateFunc: func(oldObj, newObj any) { 119 + pod, ok := newObj.(*corev1.Pod) 120 + if !ok { 121 + return 122 + } 123 + 124 + // Only watch agent pods 125 + if !mmr.isAgentPod(pod) { 126 + return 127 + } 128 + 129 + // Check if pod became ready 130 + if mmr.isPodReady(pod) && !mmr.wasPodReady(oldObj.(*corev1.Pod)) { 131 + mmr.logger.Info("Agent pod became ready, triggering mirror sync", 132 + "pod", pod.Name, 133 + "node", pod.Spec.NodeName) 134 + 135 + mmr.TriggerMirror("agent_ready", pod.Name, false) 136 + } 137 + }, 138 + }, 139 + }) 140 + 141 + // Start the controller in a goroutine 142 + go controller.Run(ctx.Done()) 143 + 144 + mmr.logger.Info("Agent pod watcher started") 145 + } 146 + 147 + // isAgentPod checks if a 
pod is an HSM agent pod 148 + func (mmr *MirrorManagerRunnable) isAgentPod(pod *corev1.Pod) bool { 149 + if pod.Labels == nil { 150 + return false 151 + } 152 + component, exists := pod.Labels["app.kubernetes.io/component"] 153 + return exists && component == "hsm-agent" 154 + } 155 + 156 + // isPodReady checks if a pod is in ready state 157 + func (mmr *MirrorManagerRunnable) isPodReady(pod *corev1.Pod) bool { 158 + for _, condition := range pod.Status.Conditions { 159 + if condition.Type == corev1.PodReady { 160 + return condition.Status == corev1.ConditionTrue 161 + } 162 + } 163 + return false 164 + } 165 + 166 + // wasPodReady checks if the old pod was ready (for detecting transitions) 167 + func (mmr *MirrorManagerRunnable) wasPodReady(pod *corev1.Pod) bool { 168 + return mmr.isPodReady(pod) 169 + } 170 + 171 + // TriggerMirror sends a trigger event to the mirror manager 172 + func (mmr *MirrorManagerRunnable) TriggerMirror(reason, source string, force bool) { 173 + select { 174 + case mmr.mirrorTrigger <- MirrorTriggerEvent{ 175 + Reason: reason, 176 + Force: force, 177 + Source: source, 178 + }: 179 + mmr.logger.V(1).Info("Mirror trigger sent", "reason", reason, "source", source, "force", force) 180 + default: 181 + mmr.logger.V(1).Info("Mirror trigger channel full, dropping event", "reason", reason, "source", source) 79 182 } 80 183 } 81 184 82 185 // Start starts the mirroring service - implements manager.Runnable 83 186 func (mmr *MirrorManagerRunnable) Start(ctx context.Context) error { 84 - mmr.logger.Info("Starting HSM mirroring service") 187 + mmr.logger.Info("Starting event-driven HSM mirroring service", 188 + "periodicInterval", mmr.periodicInterval, 189 + "debounceWindow", mmr.debounceWindow) 190 + 191 + // Set global reference for API access 192 + api.SetMirrorTrigger(mmr) 85 193 86 194 // Create mirror manager 87 195 mirrorManager := mirror.NewMirrorManager(mmr.k8sClient, mmr.agentManager, mmr.logger, mmr.operatorNamespace) 88 196 89 - // Start mirroring cycle 90 - mirrorTicker := time.NewTicker(30 * time.Second) // Mirror every 30 seconds 91 - defer mirrorTicker.Stop() 92 - 93 - mmr.logger.Info("starting device-scoped HSM mirroring", "interval", "30s") 197 + // Set up agent pod watcher for event-driven mirroring 198 + mmr.setupAgentPodWatcher(ctx) 94 199 95 200 // Wait for agents to be ready before starting mirroring 96 201 mmr.logger.Info("waiting for HSM agents to be ready before starting mirroring") ··· 104 209 <-ctx.Done() 105 210 return nil 106 211 } 107 - mmr.logger.Info("HSM agents are ready, starting mirroring cycle") 212 + mmr.logger.Info("HSM agents are ready, starting event-driven mirroring") 213 + 214 + // Set up periodic safety net ticker (much less frequent than before) 215 + periodicTicker := time.NewTicker(mmr.periodicInterval) 216 + defer periodicTicker.Stop() 217 + 218 + // Track last sync to avoid unnecessary operations 219 + var lastSyncTime time.Time 220 + 221 + // Debounce timer for batching multiple rapid events 222 + var debounceTimer *time.Timer 223 + var pendingEvents []MirrorTriggerEvent 108 224 109 225 for { 110 226 select { 111 - case <-mirrorTicker.C: 112 - mmr.logger.Info("starting device-scoped mirroring cycle") 113 - mirrorCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 114 - result, err := mirrorManager.MirrorAllSecrets(mirrorCtx) 115 - cancel() 227 + case event := <-mmr.mirrorTrigger: 228 + mmr.logger.V(1).Info("Received mirror trigger event", 229 + "reason", event.Reason, 230 + "source", event.Source, 231 + 
"force", event.Force) 116 232 117 - if err != nil { 118 - mmr.logger.Error(err, "device-scoped mirroring failed") 233 + // Add to pending events for debouncing 234 + pendingEvents = append(pendingEvents, event) 235 + 236 + // Reset or start debounce timer 237 + if debounceTimer != nil { 238 + debounceTimer.Stop() 239 + } 240 + debounceTimer = time.AfterFunc(mmr.debounceWindow, func() { 241 + // Process batched events 242 + mmr.processPendingEvents(ctx, mirrorManager, pendingEvents, &lastSyncTime) 243 + pendingEvents = nil 244 + }) 245 + 246 + case <-periodicTicker.C: 247 + // Periodic safety net - only sync if it's been a while 248 + timeSinceLastSync := time.Since(lastSyncTime) 249 + if timeSinceLastSync > mmr.periodicInterval { 250 + mmr.logger.Info("Performing periodic safety mirror sync", 251 + "timeSinceLastSync", timeSinceLastSync) 252 + 253 + mmr.performMirrorSync(ctx, mirrorManager, "periodic", "", &lastSyncTime) 119 254 } else { 120 - mmr.logger.Info("device-scoped mirroring completed", 121 - "secretsProcessed", result.SecretsProcessed, 122 - "secretsUpdated", result.SecretsUpdated, 123 - "secretsCreated", result.SecretsCreated, 124 - "metadataRestored", result.MetadataRestored, 125 - "errors", len(result.Errors), 126 - "success", result.Success) 127 - if len(result.Errors) > 0 { 128 - mmr.logger.Info("mirroring errors details", "errors", result.Errors) 129 - } 255 + mmr.logger.V(1).Info("Skipping periodic sync, recent sync performed", 256 + "timeSinceLastSync", timeSinceLastSync) 130 257 } 258 + 131 259 case <-ctx.Done(): 132 260 mmr.logger.Info("Mirror manager context cancelled, stopping mirroring") 261 + if debounceTimer != nil { 262 + debounceTimer.Stop() 263 + } 133 264 return nil 134 265 } 135 266 } 136 267 } 268 + 269 + // processPendingEvents handles batched mirror trigger events 270 + func (mmr *MirrorManagerRunnable) processPendingEvents(ctx context.Context, mirrorManager *mirror.MirrorManager, events []MirrorTriggerEvent, lastSyncTime *time.Time) { 271 + if len(events) == 0 { 272 + return 273 + } 274 + 275 + // Determine if any event forces sync 276 + force := false 277 + reasons := make([]string, 0, len(events)) 278 + sources := make([]string, 0, len(events)) 279 + 280 + for _, event := range events { 281 + if event.Force { 282 + force = true 283 + } 284 + reasons = append(reasons, event.Reason) 285 + sources = append(sources, event.Source) 286 + } 287 + 288 + // Check if we should skip sync (optimization) 289 + if !force && time.Since(*lastSyncTime) < 30*time.Second { 290 + mmr.logger.V(1).Info("Skipping mirror sync due to recent sync", 291 + "timeSinceLastSync", time.Since(*lastSyncTime), 292 + "events", len(events)) 293 + return 294 + } 295 + 296 + combinedReason := fmt.Sprintf("batched(%s)", reasons[0]) 297 + if len(reasons) > 1 { 298 + combinedReason = fmt.Sprintf("batched(%d events)", len(events)) 299 + } 300 + 301 + mmr.logger.Info("Processing batched mirror events", 302 + "eventCount", len(events), 303 + "reasons", reasons, 304 + "sources", sources) 305 + 306 + mmr.performMirrorSync(ctx, mirrorManager, combinedReason, fmt.Sprintf("%v", sources), lastSyncTime) 307 + } 308 + 309 + // performMirrorSync executes the actual mirroring operation 310 + func (mmr *MirrorManagerRunnable) performMirrorSync(ctx context.Context, mirrorManager *mirror.MirrorManager, reason, source string, lastSyncTime *time.Time) { 311 + mmr.logger.Info("Starting mirror sync", 312 + "reason", reason, 313 + "source", source) 314 + 315 + mirrorCtx, cancel := context.WithTimeout(ctx, 
5*time.Minute) 316 + defer cancel() 317 + 318 + result, err := mirrorManager.MirrorAllSecrets(mirrorCtx) 319 + 320 + if err != nil { 321 + mmr.logger.Error(err, "Mirror sync failed", 322 + "reason", reason, 323 + "source", source) 324 + } else { 325 + *lastSyncTime = time.Now() 326 + mmr.logger.Info("Mirror sync completed", 327 + "reason", reason, 328 + "source", source, 329 + "secretsProcessed", result.SecretsProcessed, 330 + "secretsUpdated", result.SecretsUpdated, 331 + "secretsCreated", result.SecretsCreated, 332 + "metadataRestored", result.MetadataRestored, 333 + "errors", len(result.Errors), 334 + "success", result.Success) 335 + 336 + if len(result.Errors) > 0 { 337 + mmr.logger.V(1).Info("Mirror sync error details", "errors", result.Errors) 338 + } 339 + } 340 + }
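The debounce pattern above restarts a `time.AfterFunc` timer on every trigger, so a burst of agent-pod events collapses into a single `MirrorAllSecrets` call once the window goes quiet. A standalone sketch of the same shape (a mutex guards the pending slice here, since the `AfterFunc` callback runs on its own goroutine):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer batches rapid triggers into one flush after a quiet window.
type debouncer struct {
	mu      sync.Mutex
	timer   *time.Timer
	window  time.Duration
	pending []string
	flush   func([]string)
}

// trigger records a reason and restarts the quiet-window timer.
func (d *debouncer) trigger(reason string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pending = append(d.pending, reason)
	if d.timer != nil {
		d.timer.Stop()
	}
	d.timer = time.AfterFunc(d.window, func() {
		d.mu.Lock()
		batch := d.pending
		d.pending = nil
		d.mu.Unlock()
		d.flush(batch)
	})
}

func main() {
	done := make(chan struct{})
	d := &debouncer{window: 100 * time.Millisecond, flush: func(batch []string) {
		fmt.Println("flushed:", batch) // one flush for all three triggers
		close(done)
	}}
	d.trigger("agent_ready")
	d.trigger("agent_ready")
	d.trigger("manual_api")
	<-done
}
```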
+7
kubectl-hsm/cmd/main.go
```diff
···
 	# Check operator health
 	kubectl hsm health
 
+	# Trigger manual mirror synchronization
+	kubectl hsm mirror sync
+
+	# Force immediate mirror sync
+	kubectl hsm mirror sync --force
+
 For more information about the HSM Secrets Operator, visit:
 https://github.com/evanjarrett/hsm-secrets-operator`,
 		SilenceUsage: true,
···
 	// Add operational commands
 	cmd.AddCommand(commands.NewHealthCmd())
 	cmd.AddCommand(commands.NewDevicesCmd())
+	cmd.AddCommand(commands.NewMirrorCmd())
 
 	// Add authentication command
 	cmd.AddCommand(commands.NewAuthCmd())
```
+15
kubectl-hsm/pkg/client/client.go
```diff
···
 
 	return nil
 }
+
+// TriggerMirrorSync triggers manual mirror synchronization
+func (c *Client) TriggerMirrorSync(ctx context.Context, force bool) (*MirrorSyncResult, error) {
+	req := &MirrorSyncRequest{
+		Force: force,
+	}
+
+	var result MirrorSyncResult
+	err := c.doRequest(ctx, "POST", "/api/v1/hsm/mirror/sync", req, &result)
+	if err != nil {
+		return nil, fmt.Errorf("failed to trigger mirror sync: %w", err)
+	}
+
+	return &result, nil
+}
```
+13
kubectl-hsm/pkg/client/types.go
```diff
···
 	Errors  []string `json:"errors,omitempty"`
 	Message string   `json:"message"`
 }
+
+// MirrorSyncRequest represents a request to trigger mirror synchronization
+type MirrorSyncRequest struct {
+	Force bool `json:"force,omitempty"`
+}
+
+// MirrorSyncResult represents the result of a mirror sync trigger
+type MirrorSyncResult struct {
+	Triggered bool   `json:"triggered"`
+	Reason    string `json:"reason"`
+	Force     bool   `json:"force"`
+	Message   string `json:"message"`
+}
```
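For reference, a response body matching these fields might look like the following; the exact envelope depends on the server's `sendResponse` wrapper, which is not shown in this diff:

```json
{
  "triggered": true,
  "reason": "manual_api_force",
  "force": true,
  "message": "Mirror synchronization triggered successfully"
}
```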
+178
kubectl-hsm/pkg/commands/mirror.go
(new file)

```go
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package commands

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/spf13/cobra"
	"sigs.k8s.io/yaml"

	"github.com/evanjarrett/hsm-secrets-operator/kubectl-hsm/pkg/client"
)

// MirrorOptions holds options for the mirror command
type MirrorOptions struct {
	CommonOptions
	Force bool // Force sync even if recent sync occurred
}

// Complete fills in default values for the mirror options
func (o *MirrorOptions) Complete() error {
	return nil
}

// Validate checks that the mirror options are valid
func (o *MirrorOptions) Validate() error {
	return nil
}

// AddCommonFlags adds common flags to the command
func (o *MirrorOptions) AddCommonFlags(cmd *cobra.Command) {
	cmd.Flags().StringVarP(&o.Namespace, "namespace", "n", "", "namespace to use (defaults to current kubectl namespace)")
	cmd.Flags().StringVarP(&o.Output, "output", "o", "", "output format (table|json|yaml)")
	cmd.Flags().BoolVarP(&o.Verbose, "verbose", "v", false, "verbose output")
}

// NewMirrorCmd creates the mirror command
func NewMirrorCmd() *cobra.Command {
	opts := &MirrorOptions{}

	cmd := &cobra.Command{
		Use:   "mirror",
		Short: "Mirror management operations",
		Long: `Commands for managing HSM secret mirroring between devices.

The mirror commands allow you to manually trigger synchronization
of secrets between HSM devices, useful for ensuring consistency
after direct HSM modifications or troubleshooting.`,
	}

	// Add subcommands
	cmd.AddCommand(newMirrorSyncCmd(opts))

	return cmd
}

// newMirrorSyncCmd creates the mirror sync subcommand
func newMirrorSyncCmd(opts *MirrorOptions) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "sync [flags]",
		Short: "Trigger manual mirror synchronization",
		Long: `Manually trigger synchronization of secrets between HSM devices.

This command is useful when:
- You've made direct changes to HSM devices outside the operator
- You want to ensure immediate consistency across all devices
- You're troubleshooting mirror synchronization issues

Examples:
  # Trigger normal mirror sync
  kubectl hsm mirror sync

  # Force immediate sync (ignores recent sync optimization)
  kubectl hsm mirror sync --force

  # Trigger sync with specific output format
  kubectl hsm mirror sync -o json`,
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := opts.Complete(); err != nil {
				return fmt.Errorf("failed to complete options: %w", err)
			}

			if err := opts.Validate(); err != nil {
				return fmt.Errorf("invalid options: %w", err)
			}

			return runMirrorSync(opts)
		},
	}

	// Add flags
	cmd.Flags().BoolVar(&opts.Force, "force", false, "Force sync even if recent sync occurred")
	opts.AddCommonFlags(cmd)

	return cmd
}

// runMirrorSync executes the mirror sync operation
func runMirrorSync(opts *MirrorOptions) error {
	ctx := context.Background()

	// Create client manager
	cm, err := NewClientManager(opts.Namespace, opts.Verbose)
	if err != nil {
		return fmt.Errorf("failed to initialize client manager: %w", err)
	}
	defer cm.Close()

	// Get HSM client
	hsmClient, err := cm.GetClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to get HSM client: %w", err)
	}

	// Trigger mirror sync
	result, err := hsmClient.TriggerMirrorSync(ctx, opts.Force)
	if err != nil {
		return fmt.Errorf("failed to trigger mirror sync: %w", err)
	}

	// Output result
	return outputMirrorSyncResult(result, opts.Output)
}

// outputMirrorSyncResult formats and outputs the mirror sync result
func outputMirrorSyncResult(result *client.MirrorSyncResult, outputFormat string) error {
	switch outputFormat {
	case "json":
		encoder := json.NewEncoder(os.Stdout)
		encoder.SetIndent("", "  ")
		return encoder.Encode(result)

	case "yaml":
		yamlBytes, err := yaml.Marshal(result)
		if err != nil {
			return fmt.Errorf("failed to marshal result to YAML: %w", err)
		}
		fmt.Print(string(yamlBytes))
		return nil

	case "table", "":
		// Table format (default)
		fmt.Printf("Mirror Sync Triggered Successfully\n\n")
		fmt.Printf("Reason:    %s\n", result.Reason)
		fmt.Printf("Forced:    %t\n", result.Force)
		fmt.Printf("Status:    %s\n", result.Message)
		fmt.Printf("Triggered: %t\n", result.Triggered)
		fmt.Printf("\n")

		if result.Force {
			fmt.Printf("ℹ️  Force flag used - sync will run immediately regardless of recent syncs\n")
		} else {
			fmt.Printf("ℹ️  Normal sync triggered - may be debounced with other recent events\n")
		}

		return nil

	default:
		return fmt.Errorf("unsupported output format: %s", outputFormat)
	}
}
```