// Command cron-checker periodically lists the CronJobs of a Kubernetes
// cluster and posts a Slack message for every non-suspended CronJob whose
// next expected run is overdue.
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/multiplay/go-slack/chat"
	"github.com/multiplay/go-slack/webhook"
	"github.com/robfig/cron"
	"gopkg.in/alecthomas/kingpin.v2"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/typed/batch/v1beta1"
	"k8s.io/client-go/rest"
)

const (
	// checkInterval is the pause between two consecutive checks.
	checkInterval = 60 * time.Second

	// scheduleGracePeriod is how far in the past the next expected run of a
	// CronJob must lie before the CronJob is reported as not scheduled.
	scheduleGracePeriod = 120 * time.Second
)

// checkFunc and exitFunc are package-level indirections for doCheck and
// os.Exit; main and doMain call through them instead of calling the
// functions directly.
var (
	checkFunc = doCheck
	exitFunc  = os.Exit
)

// main parses the command line (the Slack webhook URL is required and may
// also be supplied through the SLACK_URL environment variable) and exits
// with the code returned by doMain.
func main() {
	slackUrl := kingpin.Flag("slack-url", "The Slack Webhook URL").Envar("SLACK_URL").Required().String()
	kingpin.Parse()

	exitFunc(doMain(*slackUrl, &DefaultProvider{&InClusterProvider{}}))
}

// doMain obtains a client from provider, registers for os.Interrupt and
// SIGTERM, and runs the check loop through checkFunc.
//
// It returns 0 when the loop ends without error and 1 when either the client
// could not be created or the loop returned an error.
func doMain(slackUrl string, provider ClientProvider) int {
	client, err := provider.Provide()
	if err != nil {
		fmt.Printf("Unable to connect to K8S: %s\n", err)
		return 1
	}

	ic := make(chan os.Signal, 1)
	signal.Notify(ic, os.Interrupt, syscall.SIGTERM)
	// Unregister the channel once the loop is over so no signal is relayed
	// to a channel nobody reads from anymore.
	defer signal.Stop(ic)

	if err := checkFunc(client, slackUrl, ic, checkInterval, os.Stdout); err != nil {
		fmt.Printf("Error checking jobs: %s\n", err)
		return 1
	}
	return 0
}

// doCheck runs the check loop until a value is received on ic or an error
// occurs.
//
// Each iteration lists the CronJobs of all namespaces through client. For
// every CronJob that is not suspended it computes the run that should follow
// the last schedule time (or the creation time when the CronJob was never
// scheduled); when that run lies more than scheduleGracePeriod in the past a
// message is sent to the Slack webhook at slackUrl. Progress is written to
// out. Between iterations the loop waits for sleepTime.
//
// It returns nil after a value was received on ic, and a wrapped error when
// listing the CronJobs fails or a schedule cannot be parsed. A failure to
// send the Slack message is only reported on out.
func doCheck(client Client, slackUrl string, ic chan os.Signal, sleepTime time.Duration, out io.Writer) error {
	slack := webhook.New(slackUrl)
	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

	for {
		// Non-blocking check: a signal that is already pending stops the
		// loop before another request is issued.
		select {
		case <-ic:
			_, _ = fmt.Fprintf(out, "Got SIGTERM signal, exiting\n")
			return nil
		default:
		}

		cronJobs, err := client.BatchV1beta1().CronJobs("").List(context.Background(), v1.ListOptions{})
		if err != nil {
			return fmt.Errorf("error getting cronjobs: %w", err)
		}

		limit := time.Now().Add(-scheduleGracePeriod)
		for i := range cronJobs.Items {
			c := &cronJobs.Items[i]
			if c.Spec.Suspend != nil && *c.Spec.Suspend {
				continue
			}

			// Fall back to the creation time for a CronJob that has never
			// been scheduled.
			since := c.CreationTimestamp
			if c.Status.LastScheduleTime != nil {
				since = *c.Status.LastScheduleTime
			}

			schedule, err := parser.Parse(c.Spec.Schedule)
			if err != nil {
				return fmt.Errorf("error parsing schedule of %s/%s (%s): %w", c.Namespace, c.Name, c.Spec.Schedule, err)
			}

			next := schedule.Next(since.Time)
			_, _ = fmt.Fprintf(out, "Checking %s/%s since %s, next schedule %s, limit %s.\n", c.Namespace, c.Name, since.Format(time.RFC3339), next.Format(time.RFC3339), limit.Format(time.RFC3339))
			if !next.Before(limit) {
				continue
			}

			_, _ = fmt.Fprintf(out, "%s/%s was not scheduled. Sending Slack notification.\n", c.Namespace, c.Name)
			m := &chat.Message{
				Text:     fmt.Sprintf("Cronjob %s/%s is not running according to schedule (%s). Last scheduled: %s", c.Namespace, c.Name, c.Spec.Schedule, since.Format(time.RFC3339)),
				Username: "cron-checker",
			}
			// A failed notification is reported but does not stop the loop.
			if _, err := m.Send(slack); err != nil {
				_, _ = fmt.Fprintf(out, "Unable to send Slack notification: %s\n", err)
			}
		}

		// Wait for the next iteration, but wake up immediately when a
		// signal arrives instead of only noticing it after the full
		// sleepTime has elapsed.
		timer := time.NewTimer(sleepTime)
		select {
		case <-ic:
			timer.Stop()
			_, _ = fmt.Fprintf(out, "Got SIGTERM signal, exiting\n")
			return nil
		case <-timer.C:
		}
	}
}

// Client is the part of the Kubernetes client used by the checker: access to
// the batch/v1beta1 API group.
type Client interface {
	BatchV1beta1() v1beta1.BatchV1beta1Interface
}

// ClientProvider creates a Client.
type ClientProvider interface {
	Provide() (Client, error)
}

// DefaultProvider is a ClientProvider that builds a Kubernetes clientset
// from the configuration returned by its ConfigProvider.
type DefaultProvider struct {
	provider ConfigProvider
}

// Provide fetches the configuration from the wrapped ConfigProvider and
// creates a clientset from it. It returns the first error encountered.
func (d DefaultProvider) Provide() (Client, error) {
	config, err := d.provider.Provide()
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		// Return a literal nil so the Client interface is nil as well,
		// rather than an interface wrapping a nil pointer.
		return nil, err
	}
	return clientset, nil
}

var _ ClientProvider = &DefaultProvider{}

// ConfigProvider supplies the REST configuration used to reach the cluster.
type ConfigProvider interface {
	Provide() (*rest.Config, error)
}

// InClusterProvider is a ConfigProvider backed by rest.InClusterConfig.
type InClusterProvider struct{}

// Provide returns the result of rest.InClusterConfig.
func (i InClusterProvider) Provide() (*rest.Config, error) {
	return rest.InClusterConfig()
}

var _ ConfigProvider = &InClusterProvider{}