Openstatus
www.openstatus.dev
1package scheduler_test
2
3import (
4 "context"
5 "sync/atomic"
6
7 "sync"
8 "testing"
9 "time"
10
11 "connectrpc.com/connect"
12 "github.com/madflojo/tasks"
13 "github.com/openstatushq/openstatus/apps/checker/pkg/job"
14 "github.com/openstatushq/openstatus/apps/checker/pkg/scheduler"
15 v1 "github.com/openstatushq/openstatus/apps/checker/proto/private_location/v1"
16)
17
18// mockJobRunner implements job.JobRunner for testing
19type mockJobRunner struct {
20 HTTPJobCalled atomic.Bool
21 TCPJobCalled atomic.Bool
22 DNSJobCalled atomic.Bool
23 mu sync.Mutex
24}
25
26func (m *mockJobRunner) HTTPJob(ctx context.Context, monitor *v1.HTTPMonitor) (*job.HttpPrivateRegionData, error) {
27 m.HTTPJobCalled.Store(true)
28 return &job.HttpPrivateRegionData{}, nil
29}
30func (m *mockJobRunner) TCPJob(ctx context.Context, monitor *v1.TCPMonitor) (*job.TCPPrivateRegionData, error) {
31
32 m.TCPJobCalled.Store(true)
33 return &job.TCPPrivateRegionData{}, nil
34}
35
36func (m *mockJobRunner) DNSJob(ctx context.Context, monitor *v1.DNSMonitor) (*job.DNSPrivateRegionData, error) {
37
38 m.TCPJobCalled.Store(true)
39 return &job.DNSPrivateRegionData{}, nil
40}
41// mockClient implements v1.PrivateLocationServiceClient for testing
42type mockClient struct {
43 MonitorsFunc func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error)
44 IngestHTTPFunc func(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error)
45 IngestTCPFunc func(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error)
46 IngestDNSFunc func(ctx context.Context, req *connect.Request[v1.IngestDNSRequest]) (*connect.Response[v1.IngestDNSResponse], error)
47}
48
49func (m *mockClient) Monitors(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) {
50 return m.MonitorsFunc(ctx, req)
51}
52func (m *mockClient) IngestHTTP(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error) {
53 return m.IngestHTTPFunc(ctx, req)
54}
55func (m *mockClient) IngestTCP(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error) {
56 return m.IngestTCPFunc(ctx, req)
57}
58func (m *mockClient) IngestDNS(ctx context.Context, req *connect.Request[v1.IngestDNSRequest]) (*connect.Response[v1.IngestDNSResponse], error) {
59 return m.IngestDNSFunc(ctx, req)
60}
61
62func TestMonitorManager_StartAndStopJobs_WithJobRunner(t *testing.T) {
63 ctx := t.Context()
64
65 httpMonitor := &v1.HTTPMonitor{Id: "http1", Url: "http://openstat.us", Periodicity: "10s"}
66 tcpMonitor := &v1.TCPMonitor{Id: "tcp1", Uri: "openstatus:80", Periodicity: "10s"}
67
68 client := &mockClient{
69 MonitorsFunc: func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) {
70 return connect.NewResponse(&v1.MonitorsResponse{
71 HttpMonitors: []*v1.HTTPMonitor{httpMonitor},
72 TcpMonitors: []*v1.TCPMonitor{tcpMonitor},
73 }), nil
74 },
75 IngestHTTPFunc: func(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error) {
76 return connect.NewResponse(&v1.IngestHTTPResponse{}), nil
77 },
78 IngestTCPFunc: func(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error) {
79 return connect.NewResponse(&v1.IngestTCPResponse{}), nil
80 },
81
82 }
83 jobRunner := &mockJobRunner{}
84
85 s := tasks.New()
86 defer s.Stop()
87
88 mm := &scheduler.MonitorManager{
89
90 Client: client,
91 JobRunner: jobRunner,
92 Scheduler: s,
93 }
94
95 mm.UpdateMonitors(ctx)
96 time.Sleep(12 * time.Second) // allow jobs to run
97
98 if !jobRunner.HTTPJobCalled.Load() == true {
99 t.Errorf("expected HTTPJob to be called")
100 }
101 if !jobRunner.TCPJobCalled.Load() == true {
102 t.Errorf("expected TCPJob to be called")
103 }
104
105 // Remove monitors and ensure jobs are stopped
106 client.MonitorsFunc = func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) {
107 return connect.NewResponse(&v1.MonitorsResponse{
108 HttpMonitors: []*v1.HTTPMonitor{},
109 TcpMonitors: []*v1.TCPMonitor{},
110 }), nil
111 }
112 mm.UpdateMonitors(ctx)
113 time.Sleep(1 * time.Second)
114
115 if _, err := mm.Scheduler.Lookup("http1"); err == nil {
116 t.Errorf("expected HTTP job to be removed")
117 }
118 if _, err := mm.Scheduler.Lookup("tcp1"); err == nil {
119 t.Errorf("expected TCP job to be removed")
120 }
121
122}