A Kubernetes operator that bridges Hardware Security Module (HSM) data storage with Kubernetes Secrets, providing true secret portability through… [description truncated]
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Try to keep connections open

+43 -110
+43 -110
internal/agent/connection_pool.go
··· 20 20 "context" 21 21 "fmt" 22 22 "sync" 23 + "sync/atomic" 23 24 "time" 24 25 25 26 "github.com/go-logr/logr" ··· 76 77 } 77 78 78 79 func (cw *ClientWrapper) Close() error { 79 - // Mark client as no longer in use when closed 80 + // Decrease reference count when closed 80 81 cw.pool.mutex.Lock() 81 82 if pooled, exists := cw.pool.clients[cw.endpoint]; exists { 82 - pooled.InUse = false 83 - cw.pool.logger.V(1).Info("Client marked as not in use", "endpoint", cw.endpoint) 83 + newRefCount := atomic.AddInt32(&pooled.RefCount, -1) 84 + if newRefCount <= 0 { 85 + pooled.InUse = false 86 + } 87 + cw.pool.logger.V(1).Info("Client reference decreased", "endpoint", cw.endpoint, "refCount", newRefCount) 84 88 } 85 89 cw.pool.mutex.Unlock() 86 90 ··· 90 94 91 95 // PooledClient represents a cached gRPC client with metadata 92 96 type PooledClient struct { 93 - Client hsm.Client 94 - Endpoint string 95 - CreatedAt time.Time 96 - LastUsed time.Time 97 - UsageCount int64 // Track how many times this client has been used 98 - InUse bool // Track if client is currently being used 97 + Client hsm.Client 98 + Endpoint string 99 + CreatedAt time.Time 100 + LastUsed time.Time 101 + UsageCount int64 // Track how many times this client has been used 102 + InUse bool // Track if client is currently being used 103 + RefCount int32 // Active reference count (atomic operations) 104 + LastOpStart time.Time // When the last operation started 99 105 } 100 106 101 107 // ConnectionPoolMetrics tracks connection pool performance ··· 112 118 113 119 // ConnectionPool manages a pool of gRPC connections to HSM agents 114 120 type ConnectionPool struct { 115 - clients map[string]*PooledClient // endpoint -> client 116 - mutex sync.RWMutex 117 - logger logr.Logger 118 - maxAge time.Duration // Maximum age before client is recreated 119 - cleanupInterval time.Duration 120 - stopChan chan struct{} 121 - stopOnce sync.Once 122 - metrics ConnectionPoolMetrics 121 + clients map[string]*PooledClient 
// endpoint -> client 122 + mutex sync.RWMutex 123 + logger logr.Logger 124 + stopChan chan struct{} 125 + stopOnce sync.Once 126 + metrics ConnectionPoolMetrics 123 127 } 124 128 125 129 // NewConnectionPool creates a new connection pool 126 130 func NewConnectionPool(logger logr.Logger) *ConnectionPool { 127 131 pool := &ConnectionPool{ 128 - clients: make(map[string]*PooledClient), 129 - logger: logger.WithName("connection-pool"), 130 - maxAge: 5 * time.Minute, // Recreate connections every 5 minutes 131 - cleanupInterval: 1 * time.Minute, // Cleanup every minute 132 - stopChan: make(chan struct{}), 132 + clients: make(map[string]*PooledClient), 133 + logger: logger.WithName("connection-pool"), 134 + stopChan: make(chan struct{}), 133 135 } 134 136 135 - // Start background cleanup goroutine 136 - go pool.cleanupLoop() 137 - 137 + pool.logger.Info("Connection pool created - connections will be kept alive until pod termination") 138 138 return pool 139 139 } 140 140 ··· 192 192 193 193 // Check if we have a cached client 194 194 if pooled, exists := cp.clients[endpoint]; exists { 195 - // Check if client is still valid and not too old 196 - if now.Sub(pooled.CreatedAt) < cp.maxAge { 197 - // Check if client is currently in use - avoid closing active connections 198 - if pooled.InUse { 199 - cp.logger.Info("Client is in use, extending max age", "endpoint", endpoint, 200 - "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 201 - // Extend the client's life while it's in use 202 - pooled.CreatedAt = now.Add(-cp.maxAge / 2) 203 - } 204 - 195 + // Check if client is still connected 196 + if pooled.Client.IsConnected() { 205 197 // Update usage tracking 206 198 pooled.LastUsed = now 207 199 pooled.UsageCount++ 208 200 pooled.InUse = true 201 + pooled.LastOpStart = now 202 + atomic.AddInt32(&pooled.RefCount, 1) 209 203 210 - cp.logger.Info("Reusing cached gRPC client", "endpoint", endpoint, 211 - "age", now.Sub(pooled.CreatedAt).String(), 
"usage_count", pooled.UsageCount) 204 + cp.logger.V(1).Info("Reusing cached gRPC client", "endpoint", endpoint, 205 + "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount, "refCount", pooled.RefCount) 212 206 cp.metrics.ConnectionReuses++ 213 207 return &ClientWrapper{client: pooled.Client, pool: cp, endpoint: endpoint}, nil 214 208 } else { 215 - // Client is too old, but check if it's in use 216 - if pooled.InUse { 217 - cp.logger.Info("Client expired but in use, extending life", "endpoint", endpoint, 218 - "age", now.Sub(pooled.CreatedAt).String()) 219 - pooled.CreatedAt = now.Add(-cp.maxAge / 2) 220 - pooled.LastUsed = now 221 - pooled.UsageCount++ 222 - return &ClientWrapper{client: pooled.Client, pool: cp, endpoint: endpoint}, nil 223 - } 224 - 225 - // Client is too old and not in use, close it and remove from cache 226 - cp.logger.Info("gRPC client expired, closing", "endpoint", endpoint, 209 + // Client is disconnected, remove it and create a new one 210 + cp.logger.Info("Cached client is disconnected, removing", "endpoint", endpoint, 227 211 "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 228 212 if err := pooled.Client.Close(); err != nil { 229 - cp.logger.V(1).Info("Error closing expired client", "endpoint", endpoint, "error", err) 213 + cp.logger.V(1).Info("Error closing disconnected client", "endpoint", endpoint, "error", err) 230 214 } 231 215 delete(cp.clients, endpoint) 232 216 } ··· 253 237 254 238 // Cache the client 255 239 cp.clients[endpoint] = &PooledClient{ 256 - Client: client, 257 - Endpoint: endpoint, 258 - CreatedAt: now, 259 - LastUsed: now, 260 - UsageCount: 1, 261 - InUse: true, 240 + Client: client, 241 + Endpoint: endpoint, 242 + CreatedAt: now, 243 + LastUsed: now, 244 + UsageCount: 1, 245 + InUse: true, 246 + RefCount: 1, 247 + LastOpStart: now, 262 248 } 263 249 264 250 cp.logger.Info("Created and cached new gRPC client", "endpoint", endpoint) ··· 298 284 }) 299 285 } 300 286 
301 - // cleanupLoop periodically removes unused/expired connections 302 - func (cp *ConnectionPool) cleanupLoop() { 303 - ticker := time.NewTicker(cp.cleanupInterval) 304 - defer ticker.Stop() 305 - 306 - for { 307 - select { 308 - case <-ticker.C: 309 - cp.cleanup() 310 - case <-cp.stopChan: 311 - return 312 - } 313 - } 314 - } 315 - 316 - // cleanup removes expired connections 317 - func (cp *ConnectionPool) cleanup() { 318 - cp.mutex.Lock() 319 - defer cp.mutex.Unlock() 320 - 321 - now := time.Now() 322 - var toRemove []string 323 - 324 - for endpoint, pooled := range cp.clients { 325 - // Remove if too old or unused for too long, but not if currently in use 326 - shouldRemove := (now.Sub(pooled.CreatedAt) > cp.maxAge || now.Sub(pooled.LastUsed) > cp.maxAge) && !pooled.InUse 327 - if shouldRemove { 328 - cp.logger.V(1).Info("Marking client for cleanup", "endpoint", endpoint, 329 - "age", now.Sub(pooled.CreatedAt).String(), 330 - "last_used_ago", now.Sub(pooled.LastUsed).String(), 331 - "usage_count", pooled.UsageCount, "in_use", pooled.InUse) 332 - toRemove = append(toRemove, endpoint) 333 - } else if pooled.InUse && (now.Sub(pooled.CreatedAt) > cp.maxAge) { 334 - cp.logger.Info("Client is old but still in use, keeping alive", "endpoint", endpoint, 335 - "age", now.Sub(pooled.CreatedAt).String(), "usage_count", pooled.UsageCount) 336 - } 337 - } 338 - 339 - if len(toRemove) > 0 { 340 - cp.logger.V(1).Info("Cleaning up expired connections", "count", len(toRemove)) 341 - for _, endpoint := range toRemove { 342 - if pooled, exists := cp.clients[endpoint]; exists { 343 - if err := pooled.Client.Close(); err != nil { 344 - cp.logger.V(1).Info("Error closing expired client during cleanup", 345 - "endpoint", endpoint, "error", err) 346 - } 347 - delete(cp.clients, endpoint) 348 - } 349 - } 350 - } 351 - } 352 - 353 287 // GetStats returns pool statistics 354 288 func (cp *ConnectionPool) GetStats() map[string]interface{} { 355 289 cp.mutex.RLock() ··· 358 292 now := 
time.Now() 359 293 stats := make(map[string]interface{}) 360 294 stats["active_connections"] = len(cp.clients) 361 - stats["max_age_seconds"] = cp.maxAge.Seconds() 362 - stats["cleanup_interval_seconds"] = cp.cleanupInterval.Seconds() 295 + stats["connection_lifetime"] = "permanent" 363 296 364 297 var totalUsage int64 365 298 inUseCount := 0