commit 052958eb7e460563b070dfa98918caa28f04bcb5 Author: rmanach Date: Sat Sep 14 10:38:14 2024 +0200 init repo diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..857b5df --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,133 @@ +linters-settings: + # depguard: // Specific for golangci repository + # list-type: blacklist + # packages: + # # logging is allowed only by logutils.Log, logrus + # # is allowed to use only in logutils package + # - github.com/sirupsen/logrus + # packages-with-error-message: + # - github.com/sirupsen/logrus: 'logging is allowed only by logutils.Log' + dupl: + threshold: 100 + funlen: + lines: 100 + statements: 50 + gci: + sections: + prefix(fetchsysd) + goconst: + min-len: 2 + min-occurrences: 2 + gocritic: + enabled-tags: + - diagnostic + - experimental + - opinionated + - performance + - style + disabled-checks: + - dupImport # https://github.com/go-critic/go-critic/issues/845 + - ifElseChain + - octalLiteral + # - whyNoLint + - wrapperFunc + gocyclo: + min-complexity: 15 + goimports: + local-prefixes: localenv + mnd: + # don't include the "operation" and "assign" + checks: + - argument + - case + - condition + - return + govet: + shadow: true + # settings: // Specific for golangci repository + # printf: + # funcs: + # - (github.com/golangci/golangci-lint/pkg/logutils.Log).Infof + # - (github.com/golangci/golangci-lint/pkg/logutils.Log).Warnf + # - (github.com/golangci/golangci-lint/pkg/logutils.Log).Errorf + # - (github.com/golangci/golangci-lint/pkg/logutils.Log).Fatalf + lll: + line-length: 200 + maligned: + suggest-new: true + misspell: + locale: US + nolintlint: + allow-leading-space: true # don't require machine-readable nolint directives (i.e. with no leading space) + allow-unused: false # report any unused nolint directives + require-explanation: false # don't require an explanation for nolint directives + require-specific: false # don't require nolint directives to be specific about which linter is being skipped + errcheck: + check-blank: true + exclude-functions: + - '(*github.com/gin-gonic/gin.Error).SetType' + - '(*github.com/gin-gonic/gin.Context).Error' + +linters: + disable-all: true + enable: + - bodyclose + # - deadcode # deprecated (since v1.49.0) + # - depguard + - dogsled + - dupl + - errcheck + - copyloopvar + - exhaustive + - funlen + - gochecknoinits + - goconst + - gocritic + - gocyclo + - gofmt + - goimports + - mnd + - goprintffuncname + - gosec + - gosimple + - govet + - ineffassign + - lll + - misspell + - nakedret + - noctx + - nolintlint + # - rowserrcheck # https://github.com/golangci/golangci-lint/issues/2649 + - staticcheck + # - structcheck # https://github.com/golangci/golangci-lint/issues/2649 + - stylecheck + - typecheck + - unconvert + - unparam + - unused + # - varcheck # deprecated (since v1.49.0) + - whitespace + # - gochecknoglobals # too many global in ds9 + + # don't enable: + # - asciicheck + # - scopelint + # - gocognit + # - godot + # - godox + # - goerr113 + # - interfacer + # - maligned + # - nestif + # - prealloc + # - testpackage + # - revive + # - wsl + +# issues: +# Excluding configuration per-path, per-linter, per-text and per-source +# fix: true + +run: + timeout: 5m + skip-dirs: [] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2b837a3 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +run: lint + go run main.go + +lint: + golangci-lint run ./... + +test: + go test ./... -race \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b890fe6 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# cycle-scheduler + +cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. + +Here a simple representation: +```ascii ++------------------------------------------------------+ +| +---+ +---+ +---+ +---+ +---+ +---+ | +| | | | | | | | | | | | | | +| | | | | | | | | | | | | | +| | | | | | | | | | | | | | +| | | | | | | | | | | | | | +| | | | | | | | | | | | | | +| | | | | | | | | | | | | | +| |s1 | |s2 | |s3 | |s4 | | | |s60| | +| +---+ +---+ +---+ +---+ +---+ +---+ | ++---------------^--------------------------------------+ +``` +Jobs are handle in a array of job slices. + +At each interval (clock), the cursor `^` moves to the next slot (s*). +If there are jobs, they are sent to workers to be executed +and the slot is cleaned. +At the end of the slot (s60), the cursor re-starts a new cycle from s1. + +If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle. + +**NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s. +Pooling tasks are more suitable for this kind of scheduler. + +## Run +You can run sample tests from `main.go` to see the scheduler in action: +```bash +make run +``` +If all goes well, you should see this kind of output in the stdout: +```ascii +# cycle-scheduler (slot: 7) +_ P _ _ _ _ _ _ _ _ _ _ _ _ +- - - - - - ^ - - - - - - - +``` +> **P** means *pending* state + +You can adjust the clock interval as needed in `main.go`: +```go +interval := 200 * time.Millisecond +``` + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..be15dc4 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module cycle-scheduler + +go 1.22.4 + +require ( + github.com/google/uuid v1.6.0 + github.com/rs/zerolog v1.33.0 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.12.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..de74cd7 --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/job/job.go b/internal/job/job.go new file mode 100644 index 0000000..617735f --- /dev/null +++ b/internal/job/job.go @@ -0,0 +1,166 @@ +package job + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +type State int + +const ( + Pending State = iota + Running + Success + Failed + Abort + Unknown +) + +const UnknownState = "unknown" + +var JobExecTimeout = 10 * time.Second + +func (s State) String() string { + switch s { + case Pending: + return "pending" + case Running: + return "running" + case Success: + return "success" + case Failed: + return "failed" + case Abort: + return "abort" + case Unknown: + return UnknownState + default: + return UnknownState + } +} + +var ( + ErrJobAborted = errors.New("job has been aborted") + ErrJobNotCompletedYet = errors.New("job is not right state, retrying") +) + +type FnJob func(ctx context.Context) error + +type JobDetails struct { + ID uuid.UUID `json:"id"` + State string `json:"state"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt *time.Time `json:"updatedAt,omitempty"` + Err string `json:"error"` +} + +// TODO(rmanach): add priority level +type Job struct { + l sync.RWMutex + id uuid.UUID + createdAt time.Time + updatedAt *time.Time + state State + task FnJob + err error + chAbort chan struct{} +} + +func NewJob(task FnJob, row, col int) Job { + return Job{ + id: uuid.New(), + createdAt: time.Now().UTC(), + state: Pending, + task: task, + chAbort: make(chan struct{}, 1), + } +} + +func (j *Job) IntoDetails() JobDetails { + j.l.RLock() + defer j.l.RUnlock() + + jd := JobDetails{ + ID: j.id, + CreatedAt: j.createdAt, + State: j.state.String(), + } + + if err := j.err; err != nil { + jd.Err = err.Error() + } + + if ut := j.updatedAt; ut != nil { + jd.UpdatedAt = ut + } + + return jd +} + +func (j *Job) GetID() uuid.UUID { + return j.id +} + +func (j *Job) GetState() State { + j.l.RLock() + defer j.l.RUnlock() + + return j.state +} + +func (j *Job) setState(s State) { + j.l.Lock() + defer j.l.Unlock() + + now := time.Now().UTC() + j.updatedAt = &now + + j.state = s +} + +func (j *Job) setFail(err error) { + j.l.Lock() + defer j.l.Unlock() + + now := time.Now().UTC() + j.updatedAt = &now + + j.state = Failed + j.err = err +} + +func (j *Job) Abort() { + j.setState(Abort) + j.chAbort <- struct{}{} +} + +func (j *Job) Run(ctx context.Context) { + ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout) + defer fnCancel() + + j.setState(Running) + + log.Info().Str("job", j.GetID().String()).Msg("job running...") + + go func() { + for range j.chAbort { + j.setState(Abort) + fnCancel() + } + }() + + if err := j.task(ctxExec); err != nil { + if errors.Is(err, ErrJobNotCompletedYet) { + j.setState(Pending) + return + } + j.setFail(err) + return + } + j.setState(Success) +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..d5e7351 --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,371 @@ +package scheduler + +import ( + "context" + "cycle-scheduler/internal/job" + "fmt" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +const ( + TableTitle = "# cycle-scheduler" + Cursor = "^" + CycleLength = 60 + MaxWorkers = 5 +) + +const MaxSlotsIdx = 59 + +type JobSlot struct { + *job.Job + row int +} + +// SchedulerCycle is a dumb scheduler. +// It handle job and executes it at each cycle (60 * interval). +// +// Jobs are handle in a array of job slices. +// At each interval (clock), the cursor moves to the next slot (s*). +// If there are jobs, they are sent to workers to be executed +// and the slot is cleaned. +// +// At the end of the slot (s60), the cursor re-starts a cycle at s1. +type SchedulerCycle struct { + l sync.RWMutex + wg sync.WaitGroup + + ctx context.Context + fnCancel context.CancelFunc + + interval time.Duration + currentSlot int + slots [60][]*job.Job + jobs map[uuid.UUID]*job.Job + + chJobs chan *JobSlot +} + +func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { + ctxChild, fnCancel := context.WithCancel(ctx) + + c := SchedulerCycle{ + wg: sync.WaitGroup{}, + ctx: ctxChild, + fnCancel: fnCancel, + interval: interval, + currentSlot: 0, + slots: [60][]*job.Job{}, + jobs: make(map[uuid.UUID]*job.Job), + chJobs: make(chan *JobSlot), + } + + c.run() + + return &c +} + +func (c *SchedulerCycle) Stop() { + c.fnCancel() +} + +func (c *SchedulerCycle) Done() <-chan struct{} { + done := make(chan struct{}) + go func() { + <-c.ctx.Done() + c.wg.Done() + done <- struct{}{} + }() + return done +} + +func (c *SchedulerCycle) Len() int { + c.l.Lock() + defer c.l.Unlock() + + return len(c.jobs) +} + +func (c *SchedulerCycle) HasAllJobsDone() bool { + c.l.Lock() + defer c.l.Unlock() + + for _, j := range c.jobs { + if j.GetState() == job.Pending || j.GetState() == job.Running { + return false + } + } + + return true +} + +func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { + c.l.Lock() + defer c.l.Unlock() + + details := []job.JobDetails{} + for _, j := range c.jobs { + details = append(details, j.IntoDetails()) + } + + return details +} + +// Delay builds a job and add it to the scheduler engine. +func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { + c.l.Lock() + defer c.l.Unlock() + + nextSlot := c.currentSlot + 1 + if nextSlot > MaxSlotsIdx { + nextSlot = 0 + } + + j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) + + c.slots[nextSlot] = append(c.slots[nextSlot], &j) + c.jobs[j.GetID()] = &j + + log.Info().Str("job", j.GetID().String()).Msg("job added successfully") + return j.GetID() +} + +// Abort aborts the job given by its id if it exists.. +func (c *SchedulerCycle) Abort(id uuid.UUID) bool { + if j := c.getJob(id); j != nil { + j.Abort() + + log.Info().Str("job", j.GetID().String()).Msg("abort job done") + return true + } + + return false +} + +// GetJobDetails returns the job details by . +func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { + c.l.Lock() + defer c.l.Unlock() + + j, ok := c.jobs[id] + if !ok { + return job.JobDetails{ + State: job.Unknown.String(), + } + } + + return j.IntoDetails() +} + +// Display outputs earch interval the scheduler state. +func (c *SchedulerCycle) Display() { + ticker := time.NewTicker(c.interval) + go func() { + for range ticker.C { + c.display() + } + }() +} + +// display writes to stdout the state of the scheduler as a table. +func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex + c.l.RLock() + defer c.l.RUnlock() + + var maxCols int + for i := range c.slots { + if l := len(c.slots[i]); l > maxCols { + maxCols = l + } + } + + table := [][]string{} + title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1) + table = append(table, []string{title}) + for { + if maxCols == 0 { + break + } + + row := make([]string, CycleLength) + for i := 0; i <= MaxSlotsIdx; i++ { + row[i] = "_" + } + + for i := range c.slots { + if len(c.slots[i]) < maxCols { + continue + } + + j := c.slots[i][maxCols-1] + switch j.GetState() { + case job.Pending: + row[i] = "P" + case job.Running: + row[i] = "R" + case job.Failed: + row[i] = "X" + case job.Abort: + row[i] = "A" + case job.Unknown: + row[i] = "?" + case job.Success: + row[i] = "O" + } + } + + table = append(table, row) + maxCols-- + } + + row := make([]string, CycleLength) + for i := 0; i <= MaxSlotsIdx; i++ { + row[i] = "-" + } + table = append(table, row) + + if l := len(table); l > 0 { + table[l-1][c.currentSlot] = Cursor + } + + tableFormat := "" + for _, r := range table { + tableFormat += strings.Join(r, " ") + tableFormat += "\n" + } + + fmt.Println(tableFormat) +} + +func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { + c.l.RLock() + defer c.l.RUnlock() + + j, ok := c.jobs[id] + if !ok { + return nil + } + + return j +} + +// getCurrentSlotJobs collects all the current slot jobs +// and clean the slot. +func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { + c.l.Lock() + defer c.l.Unlock() + + jobs := c.slots[c.currentSlot] + + c.slots[c.currentSlot] = []*job.Job{} + + return c.currentSlot, jobs +} + +// updateSlot add a job to the slot where it was before. +func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { + c.l.Lock() + defer c.l.Unlock() + + c.slots[row] = append(c.slots[row], j) +} + +// updateCurrentSlot add a job to the current slot. +func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { + c.l.Lock() + defer c.l.Unlock() + + c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) +} + +// incr increments the slot cursor. +// It the cursor reaches `MaxSlotsIdx`, it goes back to 0. +func (c *SchedulerCycle) incr() { + c.l.Lock() + defer c.l.Unlock() + + nextSlot := c.currentSlot + 1 + if nextSlot > MaxSlotsIdx { + nextSlot = 0 + } + + c.currentSlot = nextSlot +} + +// dispatch gets jobs from the current slot, resets the slot +// and dispatch all jobs to the workers. +// +// It all the workers are busy, the jobs are re-schedule in the same slot +// to be executed in the next cycle. +func (c *SchedulerCycle) dispatch() { + row, jobs := c.getCurrentSlotJobs() + for _, j := range jobs { + if j.GetState() == job.Abort { + continue + } + + select { + case c.chJobs <- &JobSlot{row: row, Job: j}: + default: + log.Warn().Msg("unable to put job in workers, trying next cycle") + c.updateSlot(row, j) + } + } +} + +// run launches the workers and the ticker. +func (c *SchedulerCycle) run() { + c.workers() + c.tick() +} + +// workers launches `MaxWorkers` number of worker to execute job. +// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. +func (c *SchedulerCycle) workers() { + for i := 0; i < MaxWorkers; i++ { + c.wg.Add(1) + go func() { + defer c.wg.Done() + for { + select { + case j := <-c.chJobs: + c.executeJob(j.Job, c.updateCurrentSlot) + case <-c.ctx.Done(): + log.Error().Msg("context done, worker is stopping...") + return + } + } + }() + } +} + +func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { + j.Run(c.ctx) + if j.GetState() == job.Pending { + fnFallBack(j) + } +} + +// tick is a simple ticker incrementing at each scheduler interval, +// the slot cursor and dispatch jobs to the workers. +func (c *SchedulerCycle) tick() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + for { + select { + case <-c.ctx.Done(): + log.Error().Msg("context done, ticker is stopping...") + return + default: + time.Sleep(c.interval) + c.incr() + c.dispatch() + } + } + }() +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..f5af0b8 --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,33 @@ +package scheduler + +import ( + "context" + "cycle-scheduler/internal/job" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSlot(t *testing.T) { + ctx, fnCancel := context.WithCancel(context.Background()) + defer fnCancel() + + s := NewSchedulerCycle(ctx, 1*time.Millisecond) + + s.Delay(func(ctx context.Context) error { + return nil + }) + s.Delay(func(ctx context.Context) error { + return job.ErrJobNotCompletedYet + }) + j3 := s.Delay(func(ctx context.Context) error { + return errors.New("errors") + }) + + time.Sleep(2 * time.Millisecond) + + assert.Equal(t, 3, s.Len()) + assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..894d704 --- /dev/null +++ b/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "cycle-scheduler/internal/job" + "cycle-scheduler/internal/scheduler" + "encoding/json" + "errors" + "fmt" + "math/rand/v2" + "os" + "os/signal" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +func initLogger() { + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) +} + +func main() { + initLogger() + + ctx, stop := signal.NotifyContext( + context.Background(), + os.Interrupt, + os.Kill, + ) + defer stop() + + interval := 200 * time.Millisecond + s := scheduler.NewSchedulerCycle(ctx, interval) + s.Display() + + // pending test + for i := 0; i < 20; i++ { + go func(i int) { + time.Sleep(time.Duration(i) * time.Second) + s.Delay(func(ctx context.Context) error { + time.Sleep(4 * time.Second) //nolint:mnd // test purpose + if rand.IntN(10)%2 == 0 { //nolint:gosec,mnd // test prupose + return job.ErrJobNotCompletedYet + } + return nil + }) + }(i) + } + + // abort test + j := s.Delay(func(ctx context.Context) error { + time.Sleep(4 * time.Second) //nolint:mnd // test purpose + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + return job.ErrJobNotCompletedYet + }) + go func() { + time.Sleep(2 * time.Second) //nolint:mnd // test purpose + s.Abort(j) + }() + + // abort test 2 + j2 := s.Delay(func(ctx context.Context) error { + time.Sleep(time.Second) + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + return job.ErrJobNotCompletedYet + }) + go func() { + time.Sleep(10 * time.Second) //nolint:mnd // test purpose + s.Abort(j2) + }() + + // error test + s.Delay(func(ctx context.Context) error { + time.Sleep(5 * time.Second) //nolint:mnd // test purpose + return errors.New("err") + }) + + // success test + go func() { + time.Sleep(10 * time.Second) //nolint:mnd // test purpose + s.Delay(func(ctx context.Context) error { + time.Sleep(5 * time.Second) //nolint:mnd // test purpose + return nil + }) + }() + + go func() { + for { + time.Sleep(2 * time.Second) //nolint:mnd // test purpose + if s.HasAllJobsDone() { + s.Stop() + return + } + } + }() + + <-s.Done() + + jds := s.GetJobsDetails() + for _, jd := range jds { + c, err := json.Marshal(&jd) + if err != nil { + log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") + continue + } + fmt.Println(string(c)) + } +}