A Kubernetes operator that bridges Hardware Security Module (HSM) data storage with Kubernetes Secrets, providing true secret portability th
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

use connection pool

+251 -44
+1 -1
Makefile
··· 3 3 # To re-generate a bundle for another specific version without changing the standard setup, you can: 4 4 # - use the VERSION as arg of the bundle target (e.g make bundle VERSION=0.0.2) 5 5 # - use environment variables to overwrite this value (e.g export VERSION=0.0.2) 6 - VERSION ?= 0.5.27 6 + VERSION ?= 0.5.28 7 7 8 8 # CHANNELS define the bundle channels used in the bundle. 9 9 # Add a new line here if you would like to change its default config. (E.g CHANNELS = "candidate,fast,stable")
+2 -2
helm/hsm-secrets-operator/Chart.yaml
··· 2 2 name: hsm-secrets-operator 3 3 description: A Kubernetes operator that bridges Pico HSM binary data storage with Kubernetes Secrets 4 4 type: application 5 - version: 0.5.27 6 - appVersion: v0.5.27 5 + version: 0.5.28 6 + appVersion: v0.5.28 7 7 icon: https://raw.githubusercontent.com/cncf/artwork/master/projects/kubernetes/icon/color/kubernetes-icon-color.svg 8 8 home: https://github.com/evanjarrett/hsm-secrets-operator 9 9 sources:
+211
internal/agent/connection_pool.go
··· 1 + /* 2 + Copyright 2025. 3 + 4 + Licensed under the Apache License, Version 2.0 (the "License"); 5 + you may not use this file except in compliance with the License. 6 + You may obtain a copy of the License at 7 + 8 + http://www.apache.org/licenses/LICENSE-2.0 9 + 10 + Unless required by applicable law or agreed to in writing, software 11 + distributed under the License is distributed on an "AS IS" BASIS, 12 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 + See the License for the specific language governing permissions and 14 + limitations under the License. 15 + */ 16 + 17 + package agent 18 + 19 + import ( 20 + "context" 21 + "fmt" 22 + "sync" 23 + "time" 24 + 25 + "github.com/go-logr/logr" 26 + 27 + "github.com/evanjarrett/hsm-secrets-operator/internal/hsm" 28 + ) 29 + 30 + // PooledClient represents a cached gRPC client with metadata 31 + type PooledClient struct { 32 + Client hsm.Client 33 + Endpoint string 34 + CreatedAt time.Time 35 + LastUsed time.Time 36 + } 37 + 38 + // ConnectionPool manages a pool of gRPC connections to HSM agents 39 + type ConnectionPool struct { 40 + clients map[string]*PooledClient // endpoint -> client 41 + mutex sync.RWMutex 42 + logger logr.Logger 43 + maxAge time.Duration // Maximum age before client is recreated 44 + cleanupInterval time.Duration 45 + stopChan chan struct{} 46 + stopOnce sync.Once 47 + } 48 + 49 + // NewConnectionPool creates a new connection pool 50 + func NewConnectionPool(logger logr.Logger) *ConnectionPool { 51 + pool := &ConnectionPool{ 52 + clients: make(map[string]*PooledClient), 53 + logger: logger.WithName("connection-pool"), 54 + maxAge: 5 * time.Minute, // Recreate connections every 5 minutes 55 + cleanupInterval: 1 * time.Minute, // Cleanup every minute 56 + stopChan: make(chan struct{}), 57 + } 58 + 59 + // Start background cleanup goroutine 60 + go pool.cleanupLoop() 61 + 62 + return pool 63 + } 64 + 65 + // GetClient returns a cached client or creates a new one 66 + func (cp *ConnectionPool) GetClient(ctx context.Context, endpoint string, logger logr.Logger) (hsm.Client, error) { 67 + cp.mutex.Lock() 68 + defer cp.mutex.Unlock() 69 + 70 + now := time.Now() 71 + 72 + // Check if we have a cached client 73 + if pooled, exists := cp.clients[endpoint]; exists { 74 + // Check if client is still valid and not too old 75 + if now.Sub(pooled.CreatedAt) < cp.maxAge { 76 + // Update last used time 77 + pooled.LastUsed = now 78 + cp.logger.V(1).Info("Reusing cached gRPC client", "endpoint", endpoint, 79 + "age", now.Sub(pooled.CreatedAt).String()) 80 + return pooled.Client, nil 81 + } else { 82 + // Client is too old, close it and remove from cache 83 + cp.logger.V(1).Info("gRPC client expired, closing", "endpoint", endpoint, 84 + "age", now.Sub(pooled.CreatedAt).String()) 85 + if err := pooled.Client.Close(); err != nil { 86 + cp.logger.V(1).Info("Error closing expired client", "endpoint", endpoint, "error", err) 87 + } 88 + delete(cp.clients, endpoint) 89 + } 90 + } 91 + 92 + // Create new client 93 + cp.logger.V(1).Info("Creating new gRPC client", "endpoint", endpoint) 94 + client, err := NewGRPCClient(endpoint, logger) 95 + if err != nil { 96 + return nil, fmt.Errorf("failed to create gRPC client: %w", err) 97 + } 98 + 99 + // Initialize the connection 100 + if err := client.Initialize(ctx, hsm.Config{}); err != nil { 101 + if closeErr := client.Close(); closeErr != nil { 102 + cp.logger.V(1).Info("Error closing client after failed initialization", 103 + "endpoint", endpoint, "error", closeErr) 104 + } 105 + return nil, fmt.Errorf("failed to initialize gRPC client: %w", err) 106 + } 107 + 108 + // Cache the client 109 + cp.clients[endpoint] = &PooledClient{ 110 + Client: client, 111 + Endpoint: endpoint, 112 + CreatedAt: now, 113 + LastUsed: now, 114 + } 115 + 116 + cp.logger.Info("Created and cached new gRPC client", "endpoint", endpoint) 117 + return client, nil 118 + } 119 + 120 + // RemoveClient removes a client from the pool (useful when agent pods restart) 121 + func (cp *ConnectionPool) RemoveClient(endpoint string) { 122 + cp.mutex.Lock() 123 + defer cp.mutex.Unlock() 124 + 125 + if pooled, exists := cp.clients[endpoint]; exists { 126 + cp.logger.Info("Removing client from pool", "endpoint", endpoint) 127 + if err := pooled.Client.Close(); err != nil { 128 + cp.logger.V(1).Info("Error closing removed client", "endpoint", endpoint, "error", err) 129 + } 130 + delete(cp.clients, endpoint) 131 + } 132 + } 133 + 134 + // Close closes all connections and stops the pool 135 + func (cp *ConnectionPool) Close() { 136 + cp.stopOnce.Do(func() { 137 + close(cp.stopChan) 138 + 139 + cp.mutex.Lock() 140 + defer cp.mutex.Unlock() 141 + 142 + cp.logger.Info("Closing connection pool", "cached_clients", len(cp.clients)) 143 + for endpoint, pooled := range cp.clients { 144 + if err := pooled.Client.Close(); err != nil { 145 + cp.logger.V(1).Info("Error closing pooled client", "endpoint", endpoint, "error", err) 146 + } 147 + } 148 + cp.clients = make(map[string]*PooledClient) 149 + }) 150 + } 151 + 152 + // cleanupLoop periodically removes unused/expired connections 153 + func (cp *ConnectionPool) cleanupLoop() { 154 + ticker := time.NewTicker(cp.cleanupInterval) 155 + defer ticker.Stop() 156 + 157 + for { 158 + select { 159 + case <-ticker.C: 160 + cp.cleanup() 161 + case <-cp.stopChan: 162 + return 163 + } 164 + } 165 + } 166 + 167 + // cleanup removes expired connections 168 + func (cp *ConnectionPool) cleanup() { 169 + cp.mutex.Lock() 170 + defer cp.mutex.Unlock() 171 + 172 + now := time.Now() 173 + var toRemove []string 174 + 175 + for endpoint, pooled := range cp.clients { 176 + // Remove if too old or unused for too long 177 + if now.Sub(pooled.CreatedAt) > cp.maxAge || now.Sub(pooled.LastUsed) > cp.maxAge { 178 + toRemove = append(toRemove, endpoint) 179 + } 180 + } 181 + 182 + if len(toRemove) > 0 { 183 + cp.logger.V(1).Info("Cleaning up expired connections", "count", len(toRemove)) 184 + for _, endpoint := range toRemove { 185 + if pooled, exists := cp.clients[endpoint]; exists { 186 + if err := pooled.Client.Close(); err != nil { 187 + cp.logger.V(1).Info("Error closing expired client during cleanup", 188 + "endpoint", endpoint, "error", err) 189 + } 190 + delete(cp.clients, endpoint) 191 + } 192 + } 193 + } 194 + } 195 + 196 + // GetStats returns pool statistics 197 + func (cp *ConnectionPool) GetStats() map[string]interface{} { 198 + cp.mutex.RLock() 199 + defer cp.mutex.RUnlock() 200 + 201 + stats := make(map[string]interface{}) 202 + stats["active_connections"] = len(cp.clients) 203 + 204 + endpoints := make([]string, 0, len(cp.clients)) 205 + for endpoint := range cp.clients { 206 + endpoints = append(endpoints, endpoint) 207 + } 208 + stats["endpoints"] = endpoints 209 + 210 + return stats 211 + }
+4 -3
internal/agent/grpc_client.go
··· 47 47 return nil, fmt.Errorf("endpoint cannot be empty") 48 48 } 49 49 50 - // Create gRPC connection with keepalive 50 + // Create gRPC connection with conservative keepalive settings 51 + // Reduce ping frequency to prevent "too_many_pings" errors 51 52 conn, err := grpc.NewClient(endpoint, 52 53 grpc.WithTransportCredentials(insecure.NewCredentials()), 53 54 grpc.WithKeepaliveParams(keepalive.ClientParameters{ 54 - Time: 10 * time.Second, 55 - Timeout: 3 * time.Second, 55 + Time: 30 * time.Second, // Reduced from 10s to 30s 56 + Timeout: 10 * time.Second, // Increased from 3s to 10s 56 57 PermitWithoutStream: true, 57 58 }), 58 59 )
+10
internal/agent/grpc_server.go
··· 26 26 "github.com/go-logr/logr" 27 27 "google.golang.org/grpc" 28 28 "google.golang.org/grpc/codes" 29 + "google.golang.org/grpc/keepalive" 29 30 "google.golang.org/grpc/status" 30 31 31 32 hsmv1 "github.com/evanjarrett/hsm-secrets-operator/api/proto/hsm/v1" ··· 69 70 return fmt.Errorf("failed to listen on port %d: %w", s.port, err) 70 71 } 71 72 73 + // Configure server with lenient keepalive policy to prevent "too_many_pings" errors 72 74 grpcServer := grpc.NewServer( 73 75 grpc.UnaryInterceptor(s.loggingInterceptor), 76 + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ 77 + MinTime: 15 * time.Second, // Allow pings every 15s minimum 78 + PermitWithoutStream: true, // Allow pings without active streams 79 + }), 80 + grpc.KeepaliveParams(keepalive.ServerParameters{ 81 + Time: 60 * time.Second, // Send pings every 60s if no activity 82 + Timeout: 10 * time.Second, // Wait 10s for ping response 83 + }), 74 84 ) 75 85 76 86 // Register the HSM agent service
+21 -13
internal/agent/manager.go
··· 83 83 ImageResolver ImageResolver 84 84 85 85 // Internal tracking 86 - activeAgents map[string]*AgentInfo // deviceName -> AgentInfo 87 - mu sync.RWMutex 86 + activeAgents map[string]*AgentInfo // deviceName -> AgentInfo 87 + connectionPool *ConnectionPool // Shared connection pool for gRPC clients 88 + mu sync.RWMutex 88 89 89 90 // Test configuration 90 91 TestMode bool // Enable test mode for faster operations ··· 107 108 108 109 // NewManager creates a new agent manager 109 110 func NewManager(k8sClient client.Client, namespace string, imageResolver ImageResolver) *Manager { 111 + // Create logger for the manager 112 + logger := logr.FromContextOrDiscard(context.Background()).WithName("agent-manager") 110 113 111 114 m := &Manager{ 112 115 Client: k8sClient, 113 116 AgentNamespace: namespace, 114 117 ImageResolver: imageResolver, 115 118 activeAgents: make(map[string]*AgentInfo), 119 + connectionPool: NewConnectionPool(logger), 116 120 // Default production timeouts 117 121 WaitTimeout: 60 * time.Second, 118 122 WaitPollInterval: 2 * time.Second, ··· 126 130 127 131 // NewTestManager creates a new agent manager optimized for testing 128 132 func NewTestManager(k8sClient client.Client, namespace string, imageResolver ImageResolver) *Manager { 133 + // Create logger for the test manager 134 + logger := logr.FromContextOrDiscard(context.Background()).WithName("agent-manager-test") 135 + 129 136 m := &Manager{ 130 137 Client: k8sClient, 131 138 AgentNamespace: namespace, 132 139 ImageResolver: imageResolver, 133 140 activeAgents: make(map[string]*AgentInfo), 141 + connectionPool: NewConnectionPool(logger), 134 142 // Fast test timeouts 135 143 TestMode: true, 136 144 WaitTimeout: 5 * time.Second, ··· 980 988 podIP := targetPod.Status.PodIPs[0].IP 981 989 endpoint := fmt.Sprintf("%s:%d", podIP, AgentPort) 982 990 983 - // Create gRPC client 984 - grpcClient, err := NewGRPCClient(endpoint, logger) 991 + // Use connection pool to get or create cached client 992 + // This significantly reduces connection overhead and prevents "too_many_pings" errors 993 + grpcClient, err := m.connectionPool.GetClient(ctx, endpoint, logger) 985 994 if err != nil { 986 - return nil, fmt.Errorf("failed to create gRPC client for %s: %w", endpoint, err) 987 - } 988 - 989 - // Test the connection 990 - if err := grpcClient.Initialize(ctx, hsm.Config{}); err != nil { 991 - if err := grpcClient.Close(); err != nil { 992 - logger.Error(err, "Failed to close gRPC client after failed initialization") 993 - } 994 - return nil, fmt.Errorf("failed to initialize gRPC client for %s: %w", endpoint, err) 995 + return nil, fmt.Errorf("failed to get pooled gRPC client for %s: %w", endpoint, err) 995 996 } 996 997 997 998 return grpcClient, nil ··· 1042 1043 q, _ := resource.ParseQuantity(s) 1043 1044 return q 1044 1045 } 1046 + 1047 + // Close closes the manager and all its resources including the connection pool 1048 + func (m *Manager) Close() { 1049 + if m.connectionPool != nil { 1050 + m.connectionPool.Close() 1051 + } 1052 + }
+2 -25
internal/mirror/manager.go
··· 112 112 ) 113 113 114 114 // buildSecretInventory builds a comprehensive inventory of secrets across all devices 115 + // 116 + //nolint:unparam // Error return preserved for future error handling scenarios 115 117 func (mm *MirrorManager) buildSecretInventory(ctx context.Context, secretPaths []string, devices []hsmv1alpha1.DiscoveredDevice, logger logr.Logger) (map[string]*SecretInventory, error) { 116 118 inventory := make(map[string]*SecretInventory) 117 119 ··· 145 147 } 146 148 continue 147 149 } 148 - 149 - defer func(client hsm.Client, device string) { 150 - if closeErr := client.Close(); closeErr != nil { 151 - logger.V(1).Info("Failed to close gRPC client", "device", device, "error", closeErr) 152 - } 153 - }(grpcClient, deviceId) 154 150 155 151 // Check if device is connected 156 152 if !grpcClient.IsConnected() { ··· 544 540 if err != nil { 545 541 return nil, nil, fmt.Errorf("failed to create gRPC client: %w", err) 546 542 } 547 - defer func() { 548 - if closeErr := grpcClient.Close(); closeErr != nil { 549 - logger.V(1).Info("Failed to close gRPC client", "error", closeErr) 550 - } 551 - }() 552 543 553 544 if !grpcClient.IsConnected() { 554 545 return nil, nil, fmt.Errorf("device not connected") ··· 576 567 if err != nil { 577 568 return fmt.Errorf("failed to create gRPC client: %w", err) 578 569 } 579 - defer func() { 580 - if closeErr := grpcClient.Close(); closeErr != nil { 581 - logger.V(1).Info("Failed to close gRPC client", "error", closeErr) 582 - } 583 - }() 584 - 585 570 if !grpcClient.IsConnected() { 586 571 return fmt.Errorf("device not connected") 587 572 } ··· 600 585 if err != nil { 601 586 return fmt.Errorf("failed to create gRPC client: %w", err) 602 587 } 603 - defer func() { 604 - if closeErr := grpcClient.Close(); closeErr != nil { 605 - logger.V(1).Info("Failed to close gRPC client", "error", closeErr) 606 - } 607 - }() 608 588 609 589 if !grpcClient.IsConnected() { 610 590 return fmt.Errorf("device not connected") ··· 778 758 } 779 759 logger.Info("HSM agents are ready", "readyDevices", len(devices)) 780 760 return true, nil 781 - } 782 - if closeErr := grpcClient.Close(); closeErr != nil { 783 - logger.V(1).Info("Failed to close gRPC client", "error", closeErr) 784 761 } 785 762 } 786 763 }