A Kubernetes operator that bridges Hardware Security Module (HSM) data storage with Kubernetes Secrets, providing true secret portability th
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

idk

+350 -35
+1 -1
Makefile
··· 3 3 # To re-generate a bundle for another specific version without changing the standard setup, you can: 4 4 # - use the VERSION as arg of the bundle target (e.g make bundle VERSION=0.0.2) 5 5 # - use environment variables to overwrite this value (e.g export VERSION=0.0.2) 6 - VERSION ?= 0.5.28 6 + VERSION ?= 0.5.29 7 7 8 8 # CHANNELS define the bundle channels used in the bundle. 9 9 # Add a new line here if you would like to change its default config. (E.g CHANNELS = "candidate,fast,stable")
+2 -2
helm/hsm-secrets-operator/Chart.yaml
··· 2 2 name: hsm-secrets-operator 3 3 description: A Kubernetes operator that bridges Pico HSM binary data storage with Kubernetes Secrets 4 4 type: application 5 - version: 0.5.28 6 - appVersion: v0.5.28 5 + version: 0.5.29 6 + appVersion: v0.5.29 7 7 icon: https://raw.githubusercontent.com/cncf/artwork/master/projects/kubernetes/icon/color/kubernetes-icon-color.svg 8 8 home: https://github.com/evanjarrett/hsm-secrets-operator 9 9 sources:
+235 -22
internal/agent/connection_pool.go
··· 27 27 "github.com/evanjarrett/hsm-secrets-operator/internal/hsm" 28 28 ) 29 29 30 + // ClientWrapper wraps an HSM client to track usage and manage lifecycle 31 + type ClientWrapper struct { 32 + client hsm.Client 33 + pool *ConnectionPool 34 + endpoint string 35 + } 36 + 37 + // Implement hsm.Client interface methods by delegating to wrapped client 38 + func (cw *ClientWrapper) Initialize(ctx context.Context, config hsm.Config) error { 39 + return cw.client.Initialize(ctx, config) 40 + } 41 + 42 + func (cw *ClientWrapper) WriteSecret(ctx context.Context, path string, data hsm.SecretData) error { 43 + return cw.client.WriteSecret(ctx, path, data) 44 + } 45 + 46 + func (cw *ClientWrapper) ReadSecret(ctx context.Context, path string) (hsm.SecretData, error) { 47 + return cw.client.ReadSecret(ctx, path) 48 + } 49 + 50 + func (cw *ClientWrapper) DeleteSecret(ctx context.Context, path string) error { 51 + return cw.client.DeleteSecret(ctx, path) 52 + } 53 + 54 + func (cw *ClientWrapper) ListSecrets(ctx context.Context, prefix string) ([]string, error) { 55 + return cw.client.ListSecrets(ctx, prefix) 56 + } 57 + 58 + func (cw *ClientWrapper) WriteSecretWithMetadata(ctx context.Context, path string, data hsm.SecretData, metadata *hsm.SecretMetadata) error { 59 + return cw.client.WriteSecretWithMetadata(ctx, path, data, metadata) 60 + } 61 + 62 + func (cw *ClientWrapper) ReadMetadata(ctx context.Context, path string) (*hsm.SecretMetadata, error) { 63 + return cw.client.ReadMetadata(ctx, path) 64 + } 65 + 66 + func (cw *ClientWrapper) GetInfo(ctx context.Context) (*hsm.HSMInfo, error) { 67 + return cw.client.GetInfo(ctx) 68 + } 69 + 70 + func (cw *ClientWrapper) GetChecksum(ctx context.Context, path string) (string, error) { 71 + return cw.client.GetChecksum(ctx, path) 72 + } 73 + 74 + func (cw *ClientWrapper) IsConnected() bool { 75 + return cw.client.IsConnected() 76 + } 77 + 78 + func (cw *ClientWrapper) Close() error { 79 + // Mark client as no longer in use when closed 80 + cw.pool.mutex.Lock() 81 + if pooled, exists := cw.pool.clients[cw.endpoint]; exists { 82 + pooled.InUse = false 83 + cw.pool.logger.V(1).Info("Client marked as not in use", "endpoint", cw.endpoint) 84 + } 85 + cw.pool.mutex.Unlock() 86 + 87 + // Note: Don't close the underlying client here, let the pool manage it 88 + return nil 89 + } 90 + 30 91 // PooledClient represents a cached gRPC client with metadata 31 92 type PooledClient struct { 32 - Client hsm.Client 33 - Endpoint string 34 - CreatedAt time.Time 35 - LastUsed time.Time 93 + Client hsm.Client 94 + Endpoint string 95 + CreatedAt time.Time 96 + LastUsed time.Time 97 + UsageCount int64 // Track how many times this client has been used 98 + InUse bool // Track if client is currently being used 99 + } 100 + 101 + // ConnectionPoolMetrics tracks connection pool performance 102 + type ConnectionPoolMetrics struct { 103 + TotalConnections int64 104 + SuccessfulConnections int64 105 + FailedConnections int64 106 + ConnectionReuses int64 107 + HealthCheckPasses int64 108 + HealthCheckFailures int64 109 + ConnectionTimeouts int64 110 + RetryAttempts int64 36 111 } 37 112 38 113 // ConnectionPool manages a pool of gRPC connections to HSM agents ··· 44 119 cleanupInterval time.Duration 45 120 stopChan chan struct{} 46 121 stopOnce sync.Once 122 + metrics ConnectionPoolMetrics 47 123 } 48 124 49 125 // NewConnectionPool creates a new connection pool ··· 64 140 65 141 // GetClient returns a cached client or creates a new one 66 142 func (cp *ConnectionPool) GetClient(ctx context.Context, endpoint string, logger logr.Logger) (hsm.Client, error) { 143 + return cp.getClientWithRetry(ctx, endpoint, logger, 3) // Retry up to 3 times 144 + } 145 + 146 + // getClientWithRetry implements retry logic for client creation 147 + func (cp *ConnectionPool) getClientWithRetry(ctx context.Context, endpoint string, logger logr.Logger, maxRetries int) (hsm.Client, error) { 148 + var lastErr error 149 + 150 + for attempt := 1; attempt <= maxRetries; attempt++ { 151 + client, err := cp.getClientAttempt(ctx, endpoint, logger) 152 + if err == nil { 153 + // Perform health check on the returned client 154 + if wrapper, ok := client.(*ClientWrapper); ok { 155 + if !wrapper.IsConnected() { 156 + cp.logger.Info("Health check failed on new client", "endpoint", endpoint, 157 + "attempt", attempt, "error", "client not connected") 158 + // Remove the client and try again 159 + cp.RemoveClient(endpoint) 160 + lastErr = fmt.Errorf("health check failed: client not connected") 161 + if attempt < maxRetries { 162 + backoffDuration := time.Duration(attempt) * time.Second 163 + cp.logger.Info("Retrying client creation after backoff", 164 + "endpoint", endpoint, "attempt", attempt+1, "backoff", backoffDuration.String()) 165 + time.Sleep(backoffDuration) 166 + continue 167 + } 168 + } 169 + } 170 + return client, nil 171 + } 172 + 173 + lastErr = err 174 + if attempt < maxRetries { 175 + cp.metrics.RetryAttempts++ 176 + backoffDuration := time.Duration(attempt) * time.Second 177 + cp.logger.Info("Client creation failed, retrying after backoff", 178 + "endpoint", endpoint, "attempt", attempt, "error", err, "backoff", backoffDuration.String()) 179 + time.Sleep(backoffDuration) 180 + } 181 + } 182 + 183 + return nil, fmt.Errorf("failed to create client after %d attempts: %w", maxRetries, lastErr) 184 + } 185 + 186 + // getClientAttempt performs a single attempt to get or create a client 187 + func (cp *ConnectionPool) getClientAttempt(ctx context.Context, endpoint string, logger logr.Logger) (hsm.Client, error) { 67 188 cp.mutex.Lock() 68 189 defer cp.mutex.Unlock() 69 190 ··· 73 194 if pooled, exists := cp.clients[endpoint]; exists { 74 195 // Check if client is still valid and not too old 75 196 if now.Sub(pooled.CreatedAt) < cp.maxAge { 76 - // Update last used time 197 + // Check if client is currently in use - avoid closing active connections 198 + if pooled.InUse { 199 + cp.logger.Info("Client is in use, extending max age", "endpoint", endpoint, 200 + "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 201 + // Extend the client's life while it's in use 202 + pooled.CreatedAt = now.Add(-cp.maxAge / 2) 203 + } 204 + 205 + // Update usage tracking 77 206 pooled.LastUsed = now 78 - cp.logger.V(1).Info("Reusing cached gRPC client", "endpoint", endpoint, 79 - "age", now.Sub(pooled.CreatedAt).String()) 80 - return pooled.Client, nil 207 + pooled.UsageCount++ 208 + pooled.InUse = true 209 + 210 + cp.logger.Info("Reusing cached gRPC client", "endpoint", endpoint, 211 + "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 212 + cp.metrics.ConnectionReuses++ 213 + return &ClientWrapper{client: pooled.Client, pool: cp, endpoint: endpoint}, nil 81 214 } else { 82 - // Client is too old, close it and remove from cache 83 - cp.logger.V(1).Info("gRPC client expired, closing", "endpoint", endpoint, 84 - "age", now.Sub(pooled.CreatedAt).String()) 215 + // Client is too old, but check if it's in use 216 + if pooled.InUse { 217 + cp.logger.Info("Client expired but in use, extending life", "endpoint", endpoint, 218 + "age", now.Sub(pooled.CreatedAt).String()) 219 + pooled.CreatedAt = now.Add(-cp.maxAge / 2) 220 + pooled.LastUsed = now 221 + pooled.UsageCount++ 222 + return &ClientWrapper{client: pooled.Client, pool: cp, endpoint: endpoint}, nil 223 + } 224 + 225 + // Client is too old and not in use, close it and remove from cache 226 + cp.logger.Info("gRPC client expired, closing", "endpoint", endpoint, 227 + "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 85 228 if err := pooled.Client.Close(); err != nil { 86 229 cp.logger.V(1).Info("Error closing expired client", "endpoint", endpoint, "error", err) 87 230 } ··· 91 234 92 235 // Create new client 93 236 cp.logger.V(1).Info("Creating new gRPC client", "endpoint", endpoint) 237 + cp.metrics.TotalConnections++ 94 238 client, err := NewGRPCClient(endpoint, logger) 95 239 if err != nil { 240 + cp.metrics.FailedConnections++ 96 241 return nil, fmt.Errorf("failed to create gRPC client: %w", err) 97 242 } 98 243 99 244 // Initialize the connection 100 245 if err := client.Initialize(ctx, hsm.Config{}); err != nil { 246 + cp.metrics.FailedConnections++ 101 247 if closeErr := client.Close(); closeErr != nil { 102 248 cp.logger.V(1).Info("Error closing client after failed initialization", 103 249 "endpoint", endpoint, "error", closeErr) ··· 107 253 108 254 // Cache the client 109 255 cp.clients[endpoint] = &PooledClient{ 110 - Client: client, 111 - Endpoint: endpoint, 112 - CreatedAt: now, 113 - LastUsed: now, 256 + Client: client, 257 + Endpoint: endpoint, 258 + CreatedAt: now, 259 + LastUsed: now, 260 + UsageCount: 1, 261 + InUse: true, 114 262 } 115 263 116 264 cp.logger.Info("Created and cached new gRPC client", "endpoint", endpoint) 117 - return client, nil 265 + cp.metrics.SuccessfulConnections++ 266 + return &ClientWrapper{client: client, pool: cp, endpoint: endpoint}, nil 118 267 } 119 268 120 269 // RemoveClient removes a client from the pool (useful when agent pods restart) ··· 173 322 var toRemove []string 174 323 175 324 for endpoint, pooled := range cp.clients { 176 - // Remove if too old or unused for too long 177 - if now.Sub(pooled.CreatedAt) > cp.maxAge || now.Sub(pooled.LastUsed) > cp.maxAge { 325 + // Remove if too old or unused for too long, but not if currently in use 326 + shouldRemove := (now.Sub(pooled.CreatedAt) > cp.maxAge || now.Sub(pooled.LastUsed) > cp.maxAge) && !pooled.InUse 327 + if shouldRemove { 328 + cp.logger.V(1).Info("Marking client for cleanup", "endpoint", endpoint, 329 + "age", now.Sub(pooled.CreatedAt).String(), 330 + "last_used_ago", now.Sub(pooled.LastUsed).String(), 331 + "usage_count", pooled.UsageCount, "in_use", pooled.InUse) 178 332 toRemove = append(toRemove, endpoint) 333 + } else if pooled.InUse && (now.Sub(pooled.CreatedAt) > cp.maxAge) { 334 + cp.logger.Info("Client is old but still in use, keeping alive", "endpoint", endpoint, 335 + "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 179 336 } 180 337 } 181 338 ··· 198 355 cp.mutex.RLock() 199 356 defer cp.mutex.RUnlock() 200 357 358 + now := time.Now() 201 359 stats := make(map[string]interface{}) 202 360 stats["active_connections"] = len(cp.clients) 361 + stats["max_age_seconds"] = cp.maxAge.Seconds() 362 + stats["cleanup_interval_seconds"] = cp.cleanupInterval.Seconds() 203 363 204 - endpoints := make([]string, 0, len(cp.clients)) 205 - for endpoint := range cp.clients { 206 - endpoints = append(endpoints, endpoint) 364 + var totalUsage int64 365 + inUseCount := 0 366 + clientDetails := make([]map[string]interface{}, 0, len(cp.clients)) 367 + 368 + for endpoint, pooled := range cp.clients { 369 + totalUsage += pooled.UsageCount 370 + if pooled.InUse { 371 + inUseCount++ 372 + } 373 + 374 + clientDetails = append(clientDetails, map[string]interface{}{ 375 + "endpoint": endpoint, 376 + "age_seconds": now.Sub(pooled.CreatedAt).Seconds(), 377 + "last_used_seconds_ago": now.Sub(pooled.LastUsed).Seconds(), 378 + "usage_count": pooled.UsageCount, 379 + "in_use": pooled.InUse, 380 + }) 207 381 } 208 - stats["endpoints"] = endpoints 382 + 383 + stats["clients_in_use"] = inUseCount 384 + stats["total_usage_count"] = totalUsage 385 + stats["client_details"] = clientDetails 386 + 387 + // Add connection pool metrics 388 + stats["metrics"] = map[string]interface{}{ 389 + "total_connections": cp.metrics.TotalConnections, 390 + "successful_connections": cp.metrics.SuccessfulConnections, 391 + "failed_connections": cp.metrics.FailedConnections, 392 + "connection_reuses": cp.metrics.ConnectionReuses, 393 + "health_check_passes": cp.metrics.HealthCheckPasses, 394 + "health_check_failures": cp.metrics.HealthCheckFailures, 395 + "connection_timeouts": cp.metrics.ConnectionTimeouts, 396 + "retry_attempts": cp.metrics.RetryAttempts, 397 + } 209 398 210 399 return stats 211 400 } 401 + 402 + // HealthCheckClient verifies that a client connection is still healthy 403 + func (cp *ConnectionPool) HealthCheckClient(ctx context.Context, endpoint string) error { 404 + cp.mutex.RLock() 405 + pooled, exists := cp.clients[endpoint] 406 + cp.mutex.RUnlock() 407 + 408 + if !exists { 409 + return fmt.Errorf("no client found for endpoint %s", endpoint) 410 + } 411 + 412 + // Check if the client is still connected 413 + if !pooled.Client.IsConnected() { 414 + cp.metrics.HealthCheckFailures++ 415 + cp.logger.Info("Health check failed for client, removing from pool", 416 + "endpoint", endpoint, "error", "client not connected") 417 + cp.RemoveClient(endpoint) 418 + return fmt.Errorf("health check failed: client not connected") 419 + } 420 + 421 + cp.metrics.HealthCheckPasses++ 422 + cp.logger.V(1).Info("Health check passed for client", "endpoint", endpoint) 423 + return nil 424 + }
+112 -10
internal/mirror/manager.go
··· 21 21 "crypto/sha256" 22 22 "fmt" 23 23 "sort" 24 + "strings" 24 25 "time" 25 26 26 27 "github.com/go-logr/logr" ··· 658 659 return mm.executeMirrorPlans(ctx, plans, deviceLookup, logger), nil 659 660 } 660 661 661 - // discoverAllSecrets discovers all secrets present on any HSM device 662 + // discoverAllSecrets discovers all secrets present on any HSM device with retry logic 662 663 func (mm *MirrorManager) discoverAllSecrets(ctx context.Context, devices []hsmv1alpha1.DiscoveredDevice, logger logr.Logger) []string { 663 664 secretPaths := make(map[string]bool) 665 + failedDevices := make([]hsmv1alpha1.DiscoveredDevice, 0) 666 + 667 + logger.Info("Starting secret discovery", "totalDevices", len(devices)) 664 668 665 669 for _, device := range devices { 666 670 deviceId := device.SerialNumber 667 671 deviceLogger := logger.WithValues("device", deviceId) 668 672 669 - hsmClient, err := mm.agentManager.CreateGRPCClient(ctx, device, deviceLogger) 673 + secrets, err := mm.discoverDeviceSecretsWithRetry(ctx, device, deviceLogger, 3) 670 674 if err != nil { 671 - deviceLogger.Info("Failed to connect to device for discovery, skipping", "error", err) 675 + deviceLogger.Error(err, "Failed to discover secrets on device after retries") 676 + failedDevices = append(failedDevices, device) 672 677 continue 673 678 } 674 679 675 - secrets, err := hsmClient.ListSecrets(ctx, "") 676 - if err != nil { 677 - deviceLogger.Info("Failed to list secrets on device, skipping", "error", err) 678 - continue 680 + deviceLogger.Info("Successfully discovered secrets on device", "secretCount", len(secrets)) 681 + for _, secretPath := range secrets { 682 + secretPaths[secretPath] = true 679 683 } 684 + } 680 685 681 - deviceLogger.Info("Discovered secrets on device", "secretCount", len(secrets)) 682 - for _, secretPath := range secrets { 683 - secretPaths[secretPath] = true 686 + // Log summary of discovery results 687 + successCount := len(devices) - len(failedDevices) 688 + logger.Info("Discovery completed", 689 + "successfulDevices", successCount, 690 + "failedDevices", len(failedDevices), 691 + "totalSecretsFound", len(secretPaths)) 692 + 693 + if len(failedDevices) > 0 { 694 + for _, device := range failedDevices { 695 + logger.Info("Device failed discovery", "device", device.SerialNumber, "nodeName", device.NodeName, "devicePath", device.DevicePath) 684 696 } 685 697 } 686 698 ··· 692 704 sort.Strings(result) 693 705 694 706 return result 707 + } 708 + 709 + // discoverDeviceSecretsWithRetry attempts to discover secrets from a single device with retry logic 710 + func (mm *MirrorManager) discoverDeviceSecretsWithRetry(ctx context.Context, device hsmv1alpha1.DiscoveredDevice, logger logr.Logger, maxRetries int) ([]string, error) { 711 + var lastErr error 712 + 713 + for attempt := 1; attempt <= maxRetries; attempt++ { 714 + attemptLogger := logger.WithValues("attempt", attempt, "maxRetries", maxRetries) 715 + 716 + // Create client with connection pool retry logic 717 + hsmClient, err := mm.agentManager.CreateGRPCClient(ctx, device, attemptLogger) 718 + if err != nil { 719 + lastErr = fmt.Errorf("failed to create gRPC client: %w", err) 720 + attemptLogger.Info("Failed to connect to device", "error", err) 721 + 722 + if attempt < maxRetries { 723 + backoffDuration := time.Duration(attempt) * time.Second 724 + attemptLogger.Info("Retrying device connection after backoff", "backoff", backoffDuration.String()) 725 + time.Sleep(backoffDuration) 726 + continue 727 + } 728 + break 729 + } 730 + 731 + // Ensure client is closed after use 732 + defer func() { 733 + if closeErr := hsmClient.Close(); closeErr != nil { 734 + logger.V(1).Info("Error closing HSM client", "error", closeErr) 735 + } 736 + }() 737 + 738 + // Add timeout for list secrets operation 739 + listCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 740 + secrets, err := hsmClient.ListSecrets(listCtx, "") 741 + cancel() 742 + 743 + if err != nil { 744 + lastErr = fmt.Errorf("failed to list secrets: %w", err) 745 + attemptLogger.Info("Failed to list secrets on device", "error", err) 746 + 747 + // Check for specific connection-related errors that might benefit from retry 748 + if isConnectionError(err) && attempt < maxRetries { 749 + backoffDuration := time.Duration(attempt) * time.Second 750 + attemptLogger.Info("Connection error detected, retrying after backoff", 751 + "backoff", backoffDuration.String(), "error", err) 752 + time.Sleep(backoffDuration) 753 + continue 754 + } 755 + 756 + if attempt < maxRetries { 757 + backoffDuration := time.Duration(attempt) * time.Second 758 + attemptLogger.Info("Retrying list secrets after backoff", "backoff", backoffDuration.String()) 759 + time.Sleep(backoffDuration) 760 + continue 761 + } 762 + break 763 + } 764 + 765 + // Success case 766 + attemptLogger.Info("Successfully listed secrets", "secretCount", len(secrets)) 767 + return secrets, nil 768 + } 769 + 770 + return nil, fmt.Errorf("failed to discover secrets after %d attempts: %w", maxRetries, lastErr) 771 + } 772 + 773 + // isConnectionError checks if an error is related to connection issues that might benefit from retry 774 + func isConnectionError(err error) bool { 775 + if err == nil { 776 + return false 777 + } 778 + 779 + errStr := err.Error() 780 + connectionErrors := []string{ 781 + "grpc: the client connection is closing", 782 + "connection refused", 783 + "connection reset", 784 + "connection timeout", 785 + "context deadline exceeded", 786 + "rpc error: code = Canceled", 787 + "rpc error: code = Unavailable", 788 + } 789 + 790 + for _, connErr := range connectionErrors { 791 + if strings.Contains(errStr, connErr) { 792 + return true 793 + } 794 + } 795 + 796 + return false 695 797 } 696 798 697 799 // calculateChecksum calculates SHA256 checksum of secret data