Openstatus www.openstatus.dev
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 122 lines 4.5 kB view raw
1package scheduler_test 2 3import ( 4 "context" 5 "sync/atomic" 6 7 "sync" 8 "testing" 9 "time" 10 11 "connectrpc.com/connect" 12 "github.com/madflojo/tasks" 13 "github.com/openstatushq/openstatus/apps/checker/pkg/job" 14 "github.com/openstatushq/openstatus/apps/checker/pkg/scheduler" 15 v1 "github.com/openstatushq/openstatus/apps/checker/proto/private_location/v1" 16) 17 18// mockJobRunner implements job.JobRunner for testing 19type mockJobRunner struct { 20 HTTPJobCalled atomic.Bool 21 TCPJobCalled atomic.Bool 22 DNSJobCalled atomic.Bool 23 mu sync.Mutex 24} 25 26func (m *mockJobRunner) HTTPJob(ctx context.Context, monitor *v1.HTTPMonitor) (*job.HttpPrivateRegionData, error) { 27 m.HTTPJobCalled.Store(true) 28 return &job.HttpPrivateRegionData{}, nil 29} 30func (m *mockJobRunner) TCPJob(ctx context.Context, monitor *v1.TCPMonitor) (*job.TCPPrivateRegionData, error) { 31 32 m.TCPJobCalled.Store(true) 33 return &job.TCPPrivateRegionData{}, nil 34} 35 36func (m *mockJobRunner) DNSJob(ctx context.Context, monitor *v1.DNSMonitor) (*job.DNSPrivateRegionData, error) { 37 38 m.TCPJobCalled.Store(true) 39 return &job.DNSPrivateRegionData{}, nil 40} 41// mockClient implements v1.PrivateLocationServiceClient for testing 42type mockClient struct { 43 MonitorsFunc func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) 44 IngestHTTPFunc func(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error) 45 IngestTCPFunc func(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error) 46 IngestDNSFunc func(ctx context.Context, req *connect.Request[v1.IngestDNSRequest]) (*connect.Response[v1.IngestDNSResponse], error) 47} 48 49func (m *mockClient) Monitors(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) { 50 return m.MonitorsFunc(ctx, req) 51} 52func (m *mockClient) IngestHTTP(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error) { 53 return m.IngestHTTPFunc(ctx, req) 54} 55func (m *mockClient) IngestTCP(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error) { 56 return m.IngestTCPFunc(ctx, req) 57} 58func (m *mockClient) IngestDNS(ctx context.Context, req *connect.Request[v1.IngestDNSRequest]) (*connect.Response[v1.IngestDNSResponse], error) { 59 return m.IngestDNSFunc(ctx, req) 60} 61 62func TestMonitorManager_StartAndStopJobs_WithJobRunner(t *testing.T) { 63 ctx := t.Context() 64 65 httpMonitor := &v1.HTTPMonitor{Id: "http1", Url: "http://openstat.us", Periodicity: "10s"} 66 tcpMonitor := &v1.TCPMonitor{Id: "tcp1", Uri: "openstatus:80", Periodicity: "10s"} 67 68 client := &mockClient{ 69 MonitorsFunc: func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) { 70 return connect.NewResponse(&v1.MonitorsResponse{ 71 HttpMonitors: []*v1.HTTPMonitor{httpMonitor}, 72 TcpMonitors: []*v1.TCPMonitor{tcpMonitor}, 73 }), nil 74 }, 75 IngestHTTPFunc: func(ctx context.Context, req *connect.Request[v1.IngestHTTPRequest]) (*connect.Response[v1.IngestHTTPResponse], error) { 76 return connect.NewResponse(&v1.IngestHTTPResponse{}), nil 77 }, 78 IngestTCPFunc: func(ctx context.Context, req *connect.Request[v1.IngestTCPRequest]) (*connect.Response[v1.IngestTCPResponse], error) { 79 return connect.NewResponse(&v1.IngestTCPResponse{}), nil 80 }, 81 82 } 83 jobRunner := &mockJobRunner{} 84 85 s := tasks.New() 86 defer s.Stop() 87 88 mm := &scheduler.MonitorManager{ 89 90 Client: client, 91 JobRunner: jobRunner, 92 Scheduler: s, 93 } 94 95 mm.UpdateMonitors(ctx) 96 time.Sleep(12 * time.Second) // allow jobs to run 97 98 if !jobRunner.HTTPJobCalled.Load() == true { 99 t.Errorf("expected HTTPJob to be called") 100 } 101 if !jobRunner.TCPJobCalled.Load() == true { 102 t.Errorf("expected TCPJob to be called") 103 } 104 105 // Remove monitors and ensure jobs are stopped 106 client.MonitorsFunc = func(ctx context.Context, req *connect.Request[v1.MonitorsRequest]) (*connect.Response[v1.MonitorsResponse], error) { 107 return connect.NewResponse(&v1.MonitorsResponse{ 108 HttpMonitors: []*v1.HTTPMonitor{}, 109 TcpMonitors: []*v1.TCPMonitor{}, 110 }), nil 111 } 112 mm.UpdateMonitors(ctx) 113 time.Sleep(1 * time.Second) 114 115 if _, err := mm.Scheduler.Lookup("http1"); err == nil { 116 t.Errorf("expected HTTP job to be removed") 117 } 118 if _, err := mm.Scheduler.Lookup("tcp1"); err == nil { 119 t.Errorf("expected TCP job to be removed") 120 } 121 122}