A Kubernetes operator that bridges Hardware Security Module (HSM) data storage with Kubernetes Secrets, providing true secret portability th
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 416 lines 14 kB view raw
1/* 2Copyright 2025. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package controller 18 19import ( 20 "context" 21 "encoding/json" 22 "fmt" 23 "time" 24 25 appsv1 "k8s.io/api/apps/v1" 26 corev1 "k8s.io/api/core/v1" 27 apierrors "k8s.io/apimachinery/pkg/api/errors" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/labels" 30 "k8s.io/apimachinery/pkg/runtime" 31 ctrl "sigs.k8s.io/controller-runtime" 32 "sigs.k8s.io/controller-runtime/pkg/builder" 33 "sigs.k8s.io/controller-runtime/pkg/client" 34 "sigs.k8s.io/controller-runtime/pkg/handler" 35 "sigs.k8s.io/controller-runtime/pkg/log" 36 "sigs.k8s.io/controller-runtime/pkg/predicate" 37 38 hsmv1alpha1 "github.com/evanjarrett/hsm-secrets-operator/api/v1alpha1" 39) 40 41const ( 42 // deviceReportAnnotation is the annotation key used by discovery pods 43 deviceReportAnnotation = "hsm.j5t.io/device-report" 44 // DefaultGracePeriod is the default grace period for considering pod reports stale 45 DefaultGracePeriod = 5 * time.Minute 46 // DefaultAggregationInterval is the default interval for checking pod annotations 47 DefaultAggregationInterval = 30 * time.Second 48) 49 50// PodDiscoveryReport represents the structure of discovery data in pod annotations 51type PodDiscoveryReport struct { 52 HSMDeviceName string `json:"hsmDeviceName"` 53 ReportingNode string `json:"reportingNode"` 54 DiscoveredDevices []hsmv1alpha1.DiscoveredDevice `json:"discoveredDevices"` 55 LastReportTime metav1.Time `json:"lastReportTime"` 56 DiscoveryStatus string `json:"discoveryStatus"` // "discovering", "completed", "error" 57 Error string `json:"error,omitempty"` 58} 59 60// HSMPoolReconciler reconciles a HSMPool object 61type HSMPoolReconciler struct { 62 client.Client 63 Scheme *runtime.Scheme 64} 65 66// +kubebuilder:rbac:groups=hsm.j5t.io,resources=hsmpools,verbs=get;list;watch;create;update;patch;delete 67// +kubebuilder:rbac:groups=hsm.j5t.io,resources=hsmpools/status,verbs=get;update;patch 68// +kubebuilder:rbac:groups=hsm.j5t.io,resources=hsmpools/finalizers,verbs=update 69// +kubebuilder:rbac:groups=hsm.j5t.io,resources=hsmdevices,verbs=get;list;watch 70// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;patch 71// +kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch 72 73// Reconcile handles HSMPool reconciliation - aggregates device discovery from pod annotations 74func (r *HSMPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { 75 logger := log.FromContext(ctx) 76 77 // Fetch the HSMPool instance 78 var hsmPool hsmv1alpha1.HSMPool 79 if err := r.Get(ctx, req.NamespacedName, &hsmPool); err != nil { 80 logger.Error(err, "Unable to fetch HSMPool") 81 return ctrl.Result{}, client.IgnoreNotFound(err) 82 } 83 84 // Validate that the referenced HSMDevice exists (from ownerReferences) 85 if len(hsmPool.OwnerReferences) == 0 { 86 return r.updatePoolStatus(ctx, &hsmPool, hsmv1alpha1.HSMPoolPhaseError, nil, nil, 0, "HSMPool has no owner references") 87 } 88 89 deviceRef := hsmPool.OwnerReferences[0].Name 90 hsmDevices := make([]*hsmv1alpha1.HSMDevice, 0, 1) 91 hsmDevice := &hsmv1alpha1.HSMDevice{} 92 if err := r.Get(ctx, client.ObjectKey{ 93 Name: deviceRef, 94 Namespace: hsmPool.Namespace, 95 }, hsmDevice); err != nil { 96 logger.Error(err, "Unable to fetch referenced HSMDevice", "hsmDevice", deviceRef) 97 return r.updatePoolStatus(ctx, &hsmPool, hsmv1alpha1.HSMPoolPhaseError, nil, nil, 0, fmt.Sprintf("HSMDevice %s not found", deviceRef)) 98 } 99 hsmDevices = append(hsmDevices, hsmDevice) 100 101 // Find discovery pods and their annotations 102 podReports, aggregatedDevices, expectedPods, err := r.collectPodReports(ctx, hsmDevices) 103 if err != nil { 104 logger.Error(err, "Failed to collect pod reports") 105 return r.updatePoolStatus(ctx, &hsmPool, hsmv1alpha1.HSMPoolPhaseError, nil, nil, expectedPods, err.Error()) 106 } 107 108 // Aggregate devices from all pod reports 109 phase := r.aggregateDevices(podReports, expectedPods) 110 111 return r.updatePoolStatus(ctx, &hsmPool, phase, aggregatedDevices, podReports, expectedPods, "") 112} 113 114// collectPodReports finds discovery DaemonSet pods owned by HSMDevices and queries their status 115func (r *HSMPoolReconciler) collectPodReports(ctx context.Context, hsmDevices []*hsmv1alpha1.HSMDevice) ([]hsmv1alpha1.PodReport, []hsmv1alpha1.DiscoveredDevice, int32, error) { 116 logger := log.FromContext(ctx) 117 118 podReports := make([]hsmv1alpha1.PodReport, 0) 119 var allDevices []hsmv1alpha1.DiscoveredDevice 120 totalExpectedPods := int32(0) 121 122 // For each HSMDevice referenced by this pool, find its DaemonSet and pods 123 for _, hsmDevice := range hsmDevices { 124 daemonSetName := fmt.Sprintf("%s-discovery", hsmDevice.Name) 125 126 // Get the DaemonSet owned by this HSMDevice 127 daemonSet := &appsv1.DaemonSet{} 128 err := r.Get(ctx, client.ObjectKey{ 129 Name: daemonSetName, 130 Namespace: hsmDevice.Namespace, 131 }, daemonSet) 132 133 if apierrors.IsNotFound(err) { 134 logger.Info("Discovery DaemonSet not found", "device", hsmDevice.Name, "daemonset", daemonSetName) 135 continue 136 } else if err != nil { 137 logger.Error(err, "Failed to get discovery DaemonSet", "device", hsmDevice.Name, "daemonset", daemonSetName) 138 continue 139 } 140 141 // Add expected pods from this DaemonSet 142 totalExpectedPods += daemonSet.Status.DesiredNumberScheduled 143 144 // List pods owned by this DaemonSet 145 pods := &corev1.PodList{} 146 labelSelector := labels.SelectorFromSet(daemonSet.Spec.Selector.MatchLabels) 147 148 listOpts := &client.ListOptions{ 149 LabelSelector: labelSelector, 150 Namespace: hsmDevice.Namespace, 151 } 152 153 if err := r.List(ctx, pods, listOpts); err != nil { 154 return nil, nil, totalExpectedPods, fmt.Errorf("failed to list DaemonSet pods for device %s: %w", hsmDevice.Name, err) 155 } 156 157 // Create pod reports from pod annotations 158 for _, pod := range pods.Items { 159 podReport := hsmv1alpha1.PodReport{ 160 PodName: pod.Name, 161 NodeName: pod.Spec.NodeName, 162 LastReportTime: metav1.Now(), 163 DiscoveryStatus: r.getPodDiscoveryStatus(&pod), 164 Fresh: r.isPodFresh(&pod), 165 } 166 167 podReport.DevicesFound = 0 168 // Parse device count from pod annotation if available 169 if devicesFound, status, reportTime := r.parseDeviceReportAnnotation(&pod); devicesFound >= 0 { 170 podReport.DevicesFound = devicesFound 171 if status != "" { 172 podReport.DiscoveryStatus = status 173 } 174 if !reportTime.IsZero() { 175 podReport.LastReportTime = reportTime 176 } 177 178 // Also collect the actual discovered devices from annotation 179 if pod.Annotations != nil { 180 if reportJSON, exists := pod.Annotations[deviceReportAnnotation]; exists { 181 var discoveryReport PodDiscoveryReport 182 if err := json.Unmarshal([]byte(reportJSON), &discoveryReport); err == nil { 183 allDevices = append(allDevices, discoveryReport.DiscoveredDevices...) 184 } 185 } 186 } 187 } 188 189 podReports = append(podReports, podReport) 190 } 191 } 192 193 return podReports, allDevices, totalExpectedPods, nil 194} 195 196// getPodDiscoveryStatus determines the discovery status based on pod phase and conditions 197func (r *HSMPoolReconciler) getPodDiscoveryStatus(pod *corev1.Pod) string { 198 switch pod.Status.Phase { 199 case corev1.PodRunning: 200 return "completed" 201 case corev1.PodPending: 202 return "pending" 203 case corev1.PodFailed: 204 return "failed" 205 default: 206 return "unknown" 207 } 208} 209 210// isPodFresh checks if the pod is recently updated (simple implementation) 211func (r *HSMPoolReconciler) isPodFresh(pod *corev1.Pod) bool { 212 // Consider pod fresh if it's been ready for less than grace period 213 if pod.Status.Phase != corev1.PodRunning { 214 return false 215 } 216 217 // For now, consider all running pods as fresh 218 // TODO: Could check pod start time or last transition time 219 return true 220} 221 222// parseDeviceReportAnnotation parses the device discovery report from pod annotation 223// Returns (devicesFound, discoveryStatus, lastReportTime) or (-1, "", time.Time{}) if not found/invalid 224func (r *HSMPoolReconciler) parseDeviceReportAnnotation(pod *corev1.Pod) (int32, string, metav1.Time) { 225 if pod.Annotations == nil { 226 return -1, "", metav1.Time{} 227 } 228 229 reportJSON, exists := pod.Annotations[deviceReportAnnotation] 230 if !exists { 231 return -1, "", metav1.Time{} 232 } 233 234 var report PodDiscoveryReport 235 if err := json.Unmarshal([]byte(reportJSON), &report); err != nil { 236 // Log error but don't fail - return fallback values 237 return -1, "", metav1.Time{} 238 } 239 240 return int32(len(report.DiscoveredDevices)), report.DiscoveryStatus, report.LastReportTime 241} 242 243// aggregateDevices determines the pool phase based on pod reports 244func (r *HSMPoolReconciler) aggregateDevices(podReports []hsmv1alpha1.PodReport, expectedPods int32) hsmv1alpha1.HSMPoolPhase { 245 freshReports := 0 246 completedReports := 0 247 248 // Count fresh and completed reports 249 for _, report := range podReports { 250 if report.Fresh { 251 freshReports++ 252 } 253 if report.DiscoveryStatus == "completed" && report.Fresh { 254 completedReports++ 255 } 256 } 257 258 // Determine phase based on reporting status 259 var phase hsmv1alpha1.HSMPoolPhase 260 261 if len(podReports) == 0 { 262 phase = hsmv1alpha1.HSMPoolPhasePending 263 } else if int32(completedReports) >= expectedPods { 264 // All expected pods have completed reporting 265 phase = hsmv1alpha1.HSMPoolPhaseReady 266 } else if int32(freshReports) < expectedPods { 267 // Some pods are not reporting within grace period 268 phase = hsmv1alpha1.HSMPoolPhasePartial 269 } else { 270 // Still collecting reports 271 phase = hsmv1alpha1.HSMPoolPhaseAggregating 272 } 273 274 return phase 275} 276 277// updatePoolStatus updates the HSMPool status 278func (r *HSMPoolReconciler) updatePoolStatus(ctx context.Context, hsmPool *hsmv1alpha1.HSMPool, phase hsmv1alpha1.HSMPoolPhase, devices []hsmv1alpha1.DiscoveredDevice, podReports []hsmv1alpha1.PodReport, expectedPods int32, errorMsg string) (ctrl.Result, error) { 279 now := metav1.Now() 280 281 // Update basic status fields 282 hsmPool.Status.Phase = phase 283 hsmPool.Status.AggregatedDevices = devices 284 hsmPool.Status.TotalDevices = int32(len(devices)) 285 hsmPool.Status.ReportingPods = podReports 286 hsmPool.Status.ExpectedPods = expectedPods 287 hsmPool.Status.LastAggregationTime = &now 288 289 // Count available devices 290 availableCount := int32(0) 291 for _, device := range devices { 292 if device.Available { 293 availableCount++ 294 } 295 } 296 hsmPool.Status.AvailableDevices = availableCount 297 298 // Update conditions 299 conditionType := "DeviceAggregation" 300 conditionStatus := metav1.ConditionTrue 301 reason := string(phase) 302 message := fmt.Sprintf("Aggregated %d devices from %d pods", len(devices), expectedPods) 303 304 if errorMsg != "" { 305 conditionStatus = metav1.ConditionFalse 306 message = errorMsg 307 reason = "Error" 308 } 309 310 // Find or create condition 311 found := false 312 for i, cond := range hsmPool.Status.Conditions { 313 if cond.Type == conditionType { 314 lastTransitionTime := cond.LastTransitionTime 315 if cond.Status != conditionStatus { 316 lastTransitionTime = now 317 } 318 319 hsmPool.Status.Conditions[i] = metav1.Condition{ 320 Type: conditionType, 321 Status: conditionStatus, 322 LastTransitionTime: lastTransitionTime, 323 Reason: reason, 324 Message: message, 325 } 326 found = true 327 break 328 } 329 } 330 331 if !found { 332 hsmPool.Status.Conditions = append(hsmPool.Status.Conditions, metav1.Condition{ 333 Type: conditionType, 334 Status: conditionStatus, 335 LastTransitionTime: now, 336 Reason: reason, 337 Message: message, 338 }) 339 } 340 341 // Update status 342 if err := r.Status().Update(ctx, hsmPool); err != nil { 343 if apierrors.IsConflict(err) { 344 return ctrl.Result{RequeueAfter: DefaultAggregationInterval}, nil 345 } 346 return ctrl.Result{}, err 347 } 348 349 // Requeue based on phase 350 requeueInterval := DefaultAggregationInterval 351 if phase == hsmv1alpha1.HSMPoolPhaseReady { 352 requeueInterval = time.Minute // Less frequent when ready 353 } 354 355 return ctrl.Result{RequeueAfter: requeueInterval}, nil 356} 357 358// SetupWithManager sets up the controller with the Manager 359func (r *HSMPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { 360 return ctrl.NewControllerManagedBy(mgr). 361 For(&hsmv1alpha1.HSMPool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). 362 // Watch for pod annotation changes 363 Watches( 364 &corev1.Pod{}, 365 handler.EnqueueRequestsFromMapFunc(r.findPoolsForPod), 366 builder.WithPredicates(predicate.AnnotationChangedPredicate{}), 367 ). 368 Named("hsmpool"). 369 Complete(r) 370} 371 372// findPoolsForPod finds HSMPools that should be updated when a pod's annotations change 373func (r *HSMPoolReconciler) findPoolsForPod(ctx context.Context, obj client.Object) []ctrl.Request { 374 pod := obj.(*corev1.Pod) 375 376 // Only watch discovery pods 377 if pod.Labels == nil { 378 return nil 379 } 380 381 if pod.Labels["app.kubernetes.io/component"] != "discovery" { 382 return nil 383 } 384 385 // Check if pod has device reports 386 if pod.Annotations == nil || pod.Annotations[deviceReportAnnotation] == "" { 387 return nil 388 } 389 390 // Parse the report to find which HSMDevice it's for 391 var discoveryReport PodDiscoveryReport 392 if err := json.Unmarshal([]byte(pod.Annotations[deviceReportAnnotation]), &discoveryReport); err != nil { 393 return nil 394 } 395 396 // Find HSMPools that reference this HSMDevice 397 pools := &hsmv1alpha1.HSMPoolList{} 398 if err := r.List(ctx, pools, &client.ListOptions{Namespace: pod.Namespace}); err != nil { 399 return nil 400 } 401 402 var requests []ctrl.Request 403 for _, pool := range pools.Items { 404 // Check if this pool references the HSMDevice in the report (from ownerReferences) 405 if len(pool.OwnerReferences) > 0 && pool.OwnerReferences[0].Name == discoveryReport.HSMDeviceName { 406 requests = append(requests, ctrl.Request{ 407 NamespacedName: client.ObjectKey{ 408 Name: pool.Name, 409 Namespace: pool.Namespace, 410 }, 411 }) 412 } 413 } 414 415 return requests 416}