diff --git a/README.md b/README.md
index b890fe6..98db783 100644
--- a/README.md
+++ b/README.md
@@ -1,48 +1,18 @@
 # cycle-scheduler
-cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval.
-
-Here a simple representation:
-```ascii
-+------------------------------------------------------+
-| +---+  +---+  +---+  +---+           +---+  +---+    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |   |  |   |  |   |  |   |           |   |  |   |    |
-| |s1 |  |s2 |  |s3 |  |s4 |           |   |  |s60|    |
-| +---+  +---+  +---+  +---+           +---+  +---+    |
-+---------------^--------------------------------------+
-```
-Jobs are handle in a array of job slices.
-
-At each interval (clock), the cursor `^` moves to the next slot (s*).
-If there are jobs, they are sent to workers to be executed
-and the slot is cleaned.
-At the end of the slot (s60), the cursor re-starts a new cycle from s1.
-
-If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle.
-
-**NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s.
-Pooling tasks are more suitable for this kind of scheduler.
+cycle-scheduler is a simple scheduler that handles tasks and executes them at a regular interval. If a task does not reach its desired state, it is re-scheduled with a backoff.
 
 ## Run
 
 You can run sample tests from `main.go` to see the scheduler in action:
 ```bash
 make run
 ```
-If all goes well, you should see this kind of output in the stdout:
-```ascii
-# cycle-scheduler (slot: 7)
-_ P _ _ _ _ _ _ _ _ _ _ _ _
-- - - - - - ^ - - - - - - -
-```
-> **P** means *pending* state
 
-You can adjust the clock interval as needed in `main.go`:
+You can adjust the clock interval and the number of workers as needed in the constants section of `main.go`:
 ```go
-interval := 200 * time.Millisecond
+const (
+	MaxWorkers = 5
+	Interval   = 2000 * time.Millisecond
+)
 ```
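+
+A minimal usage sketch (API names as introduced by this change; `ctx`, imports, and error handling elided):
+```go
+s := scheduler.NewSchedulerCycle(ctx, Interval, MaxWorkers)
+
+id := s.Delay(func(ctx context.Context) error {
+	// Return nil once the task reaches its desired state; returning
+	// job.ErrJobNotCompletedYet re-schedules it with a backoff.
+	return nil
+})
+
+for !s.TasksDone() {
+	time.Sleep(Interval)
+}
+s.Stop()
+<-s.Done()
+fmt.Println(s.GetTaskDetails(id).State)
+```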
diff --git a/internal/job/job.go b/internal/job/job.go
index 617735f..781ffde 100644
--- a/internal/job/job.go
+++ b/internal/job/job.go
@@ -59,7 +59,6 @@ type JobDetails struct {
 	Err string `json:"error"`
 }
 
-// TODO(rmanach): add priority level
 type Job struct {
 	l  sync.RWMutex
 	id uuid.UUID
@@ -71,7 +70,7 @@ type Job struct {
 	chAbort chan struct{}
 }
 
-func NewJob(task FnJob, row, col int) Job {
+func NewJob(task FnJob) Job {
 	return Job{
 		id:        uuid.New(),
 		createdAt: time.Now().UTC(),
@@ -130,7 +129,10 @@ func (j *Job) setFail(err error) {
 	now := time.Now().UTC()
 	j.updatedAt = &now
 
-	j.state = Failed
+	if j.state != Abort {
+		j.state = Failed
+	}
+
 	j.err = err
 }
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index d5e7351..dfcd78a 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -3,8 +3,7 @@ package scheduler
 import (
 	"context"
 	"cycle-scheduler/internal/job"
-	"fmt"
-	"strings"
+	"math"
 	"sync"
 	"time"
 
@@ -12,63 +11,95 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
-const (
-	TableTitle  = "# cycle-scheduler"
-	Cursor      = "^"
-	CycleLength = 60
-	MaxWorkers  = 5
-)
+const ExponentialFactor = 1.8
 
-const MaxSlotsIdx = 59
-
-type JobSlot struct {
-	*job.Job
-	row int
-}
-
-// SchedulerCycle is a dumb scheduler.
-// It handle job and executes it at each cycle (60 * interval).
-//
-// Jobs are handle in a array of job slices.
-// At each interval (clock), the cursor moves to the next slot (s*).
-// If there are jobs, they are sent to workers to be executed
-// and the slot is cleaned.
-//
-// At the end of the slot (s60), the cursor re-starts a cycle at s1.
+// SchedulerCycle is a simple scheduler that handles jobs and executes them at a regular interval.
+// If a task is not in its desired state, it is re-scheduled with a backoff.
 type SchedulerCycle struct {
-	l        sync.RWMutex
 	wg       sync.WaitGroup
 	ctx      context.Context
 	fnCancel context.CancelFunc
 
-	interval    time.Duration
-	currentSlot int
-	slots       [60][]*job.Job
-	jobs        map[uuid.UUID]*job.Job
+	interval time.Duration
+	tasks    tasks
 
-	chJobs chan *JobSlot
+	chTasks chan *task
 }
 
-func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
+func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint32) *SchedulerCycle {
 	ctxChild, fnCancel := context.WithCancel(ctx)
 	c := SchedulerCycle{
-		wg:          sync.WaitGroup{},
-		ctx:         ctxChild,
-		fnCancel:    fnCancel,
-		interval:    interval,
-		currentSlot: 0,
-		slots:       [60][]*job.Job{},
-		jobs:        make(map[uuid.UUID]*job.Job),
-		chJobs:      make(chan *JobSlot),
+		wg:       sync.WaitGroup{},
+		ctx:      ctxChild,
+		fnCancel: fnCancel,
+		interval: interval,
+		tasks:    newTasks(),
+		chTasks:  make(chan *task),
 	}
 
-	c.run()
+	c.run(workers)
 
 	return &c
 }
 
+// backoff re-schedules the task to run after interval * ExponentialFactor^attempts.
+func (c *SchedulerCycle) backoff(t *task) {
+	backoff := time.Duration(float64(c.interval) * math.Pow(ExponentialFactor, float64(t.attempts.Load())))
+
+	t.timer.set(
+		time.AfterFunc(backoff, func() {
+			select {
+			case c.chTasks <- t:
+			default:
+				log.Error().Str("task id", t.GetID().String()).Msg("unable to hand the task to a worker, delaying it again")
+				c.backoff(t)
+			}
+		}),
+	)
+}
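+
+// A worked example of the backoff growth, assuming Interval = 2s and
+// ExponentialFactor = 1.8 (the values used in this change):
+//
+//	attempt 1: 2s * 1.8^1 = 3.6s
+//	attempt 2: 2s * 1.8^2 = 6.48s
+//	attempt 3: 2s * 1.8^3 ≈ 11.7s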
+
+// exec runs the task now or, if all the workers are busy, delays it.
+func (c *SchedulerCycle) exec(t *task) {
+	select {
+	case c.chTasks <- t:
+	default:
+		log.Error().Str("task id", t.GetID().String()).Msg("unable to hand the task to a worker now, delaying it")
+		c.backoff(t)
+	}
+}
+
+func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
+	return c.tasks.get(id)
+}
+
+// run launches a number of workers to execute tasks.
+// If a task returns `ErrJobNotCompletedYet`, it is re-scheduled with a backoff.
+func (c *SchedulerCycle) run(n uint32) {
+	for i := 0; i < int(n); i++ {
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+			for {
+				select {
+				case t := <-c.chTasks:
+					c.execute(t, c.backoff)
+				case <-c.ctx.Done():
+					log.Error().Msg("context done, worker is stopping...")
+					return
+				}
+			}
+		}()
+	}
+}
+
+func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
+	t.run(c.ctx)
+	if t.GetState() == job.Pending {
+		fnFallBack(t)
+	}
+}
+
 func (c *SchedulerCycle) Stop() {
 	c.fnCancel()
 }
@@ -77,295 +108,56 @@ func (c *SchedulerCycle) Done() <-chan struct{} {
 	done := make(chan struct{})
 	go func() {
 		<-c.ctx.Done()
-		c.wg.Done()
+		c.wg.Wait()
 		done <- struct{}{}
 	}()
 	return done
 }
 
 func (c *SchedulerCycle) Len() int {
-	c.l.Lock()
-	defer c.l.Unlock()
-
-	return len(c.jobs)
+	return c.tasks.len()
 }
 
-func (c *SchedulerCycle) HasAllJobsDone() bool {
-	c.l.Lock()
-	defer c.l.Unlock()
-
-	for _, j := range c.jobs {
-		if j.GetState() == job.Pending || j.GetState() == job.Running {
-			return false
-		}
-	}
-
-	return true
+// TasksDone reports whether all the tasks have been completed.
+func (c *SchedulerCycle) TasksDone() bool {
+	return c.tasks.completed()
 }
 
-func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails {
-	c.l.Lock()
-	defer c.l.Unlock()
-
-	details := []job.JobDetails{}
-	for _, j := range c.jobs {
-		details = append(details, j.IntoDetails())
-	}
-
-	return details
+func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
+	return c.tasks.getAllDetails()
 }
 
-// Delay builds a job and add it to the scheduler engine.
+// GetTaskDetails returns the task details by id.
+func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
+	return c.tasks.getDetails(id)
+}
+
+// Delay builds a task and adds it to the scheduler engine.
 func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
-	c.l.Lock()
-	defer c.l.Unlock()
-
-	nextSlot := c.currentSlot + 1
-	if nextSlot > MaxSlotsIdx {
-		nextSlot = 0
+	// best-effort warning when the scheduler context is already done
+	select {
+	case <-c.ctx.Done():
+		log.Error().Msg("context done, unable to add a new task")
+	default:
 	}
 
-	j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot]))
+	t := newTask(fnJob)
 
-	c.slots[nextSlot] = append(c.slots[nextSlot], &j)
-	c.jobs[j.GetID()] = &j
+	c.tasks.add(t)
 
-	log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
-	return j.GetID()
+	c.exec(t)
+
+	log.Info().Str("task", t.GetID().String()).Msg("task added successfully")
+	return t.GetID()
 }
 
-// Abort aborts the job given by its id if it exists..
+// Abort aborts the task given by its id if it exists.
 func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
-	if j := c.getJob(id); j != nil {
-		j.Abort()
+	if t := c.getTask(id); t != nil {
+		t.abort()
 
-		log.Info().Str("job", j.GetID().String()).Msg("abort job done")
+		log.Info().Str("task id", t.GetID().String()).Msg("task aborted")
 		return true
 	}
 
 	return false
 }
-
-// GetJobDetails returns the job details by .
-func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
-	c.l.Lock()
-	defer c.l.Unlock()
-
-	j, ok := c.jobs[id]
-	if !ok {
-		return job.JobDetails{
-			State: job.Unknown.String(),
-		}
-	}
-
-	return j.IntoDetails()
-}
-
-// Display outputs earch interval the scheduler state.
-func (c *SchedulerCycle) Display() {
-	ticker := time.NewTicker(c.interval)
-	go func() {
-		for range ticker.C {
-			c.display()
-		}
-	}()
-}
-
-// display writes to stdout the state of the scheduler as a table.
-func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
-	c.l.RLock()
-	defer c.l.RUnlock()
-
-	var maxCols int
-	for i := range c.slots {
-		if l := len(c.slots[i]); l > maxCols {
-			maxCols = l
-		}
-	}
-
-	table := [][]string{}
-	title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
-	table = append(table, []string{title})
-	for {
-		if maxCols == 0 {
-			break
-		}
-
-		row := make([]string, CycleLength)
-		for i := 0; i <= MaxSlotsIdx; i++ {
-			row[i] = "_"
-		}
-
-		for i := range c.slots {
-			if len(c.slots[i]) < maxCols {
-				continue
-			}
-
-			j := c.slots[i][maxCols-1]
-			switch j.GetState() {
-			case job.Pending:
-				row[i] = "P"
-			case job.Running:
-				row[i] = "R"
-			case job.Failed:
-				row[i] = "X"
-			case job.Abort:
-				row[i] = "A"
-			case job.Unknown:
-				row[i] = "?"
- case job.Success: - row[i] = "O" - } - } - - table = append(table, row) - maxCols-- - } - - row := make([]string, CycleLength) - for i := 0; i <= MaxSlotsIdx; i++ { - row[i] = "-" - } - table = append(table, row) - - if l := len(table); l > 0 { - table[l-1][c.currentSlot] = Cursor - } - - tableFormat := "" - for _, r := range table { - tableFormat += strings.Join(r, " ") - tableFormat += "\n" - } - - fmt.Println(tableFormat) -} - -func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { - c.l.RLock() - defer c.l.RUnlock() - - j, ok := c.jobs[id] - if !ok { - return nil - } - - return j -} - -// getCurrentSlotJobs collects all the current slot jobs -// and clean the slot. -func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { - c.l.Lock() - defer c.l.Unlock() - - jobs := c.slots[c.currentSlot] - - c.slots[c.currentSlot] = []*job.Job{} - - return c.currentSlot, jobs -} - -// updateSlot add a job to the slot where it was before. -func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { - c.l.Lock() - defer c.l.Unlock() - - c.slots[row] = append(c.slots[row], j) -} - -// updateCurrentSlot add a job to the current slot. -func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { - c.l.Lock() - defer c.l.Unlock() - - c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) -} - -// incr increments the slot cursor. -// It the cursor reaches `MaxSlotsIdx`, it goes back to 0. -func (c *SchedulerCycle) incr() { - c.l.Lock() - defer c.l.Unlock() - - nextSlot := c.currentSlot + 1 - if nextSlot > MaxSlotsIdx { - nextSlot = 0 - } - - c.currentSlot = nextSlot -} - -// dispatch gets jobs from the current slot, resets the slot -// and dispatch all jobs to the workers. -// -// It all the workers are busy, the jobs are re-schedule in the same slot -// to be executed in the next cycle. -func (c *SchedulerCycle) dispatch() { - row, jobs := c.getCurrentSlotJobs() - for _, j := range jobs { - if j.GetState() == job.Abort { - continue - } - - select { - case c.chJobs <- &JobSlot{row: row, Job: j}: - default: - log.Warn().Msg("unable to put job in workers, trying next cycle") - c.updateSlot(row, j) - } - } -} - -// run launches the workers and the ticker. -func (c *SchedulerCycle) run() { - c.workers() - c.tick() -} - -// workers launches `MaxWorkers` number of worker to execute job. -// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. -func (c *SchedulerCycle) workers() { - for i := 0; i < MaxWorkers; i++ { - c.wg.Add(1) - go func() { - defer c.wg.Done() - for { - select { - case j := <-c.chJobs: - c.executeJob(j.Job, c.updateCurrentSlot) - case <-c.ctx.Done(): - log.Error().Msg("context done, worker is stopping...") - return - } - } - }() - } -} - -func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { - j.Run(c.ctx) - if j.GetState() == job.Pending { - fnFallBack(j) - } -} - -// tick is a simple ticker incrementing at each scheduler interval, -// the slot cursor and dispatch jobs to the workers. 
-func (c *SchedulerCycle) tick() {
-	c.wg.Add(1)
-	go func() {
-		defer c.wg.Done()
-		for {
-			select {
-			case <-c.ctx.Done():
-				log.Error().Msg("context done, ticker is stopping...")
-				return
-			default:
-				time.Sleep(c.interval)
-				c.incr()
-				c.dispatch()
-			}
-		}
-	}()
-}
diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go
index f5af0b8..faa8c15 100644
--- a/internal/scheduler/scheduler_test.go
+++ b/internal/scheduler/scheduler_test.go
@@ -14,7 +14,7 @@ func TestSlot(t *testing.T) {
 	ctx, fnCancel := context.WithCancel(context.Background())
 	defer fnCancel()
 
-	s := NewSchedulerCycle(ctx, 1*time.Millisecond)
+	s := NewSchedulerCycle(ctx, 1*time.Millisecond, 5)
 
 	s.Delay(func(ctx context.Context) error {
 		return nil
@@ -29,5 +29,5 @@
 	time.Sleep(2 * time.Millisecond)
 
 	assert.Equal(t, 3, s.Len())
-	assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State)
+	assert.Equal(t, job.Failed.String(), s.GetTaskDetails(j3).State)
 }
diff --git a/internal/scheduler/task.go b/internal/scheduler/task.go
new file mode 100644
index 0000000..026ae32
--- /dev/null
+++ b/internal/scheduler/task.go
@@ -0,0 +1,152 @@
+package scheduler
+
+import (
+	"context"
+	"cycle-scheduler/internal/job"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+// atomicTimer wraps a `time.Timer` behind an atomic pointer so the
+// backoff timer can be replaced or stopped concurrently.
+type atomicTimer struct {
+	atomic.Pointer[time.Timer]
+}
+
+func (at *atomicTimer) stop() {
+	timer := at.Load()
+	if timer != nil {
+		timer.Stop()
+	}
+}
+
+// set replaces the current timer, stopping the previous one if present.
+func (at *atomicTimer) set(t *time.Timer) {
+	if old := at.Swap(t); old != nil {
+		old.Stop()
+	}
+}
+
+type TaskDetails struct {
+	job.JobDetails
+	Attempts int `json:"attempts"`
+}
+
+type task struct {
+	*job.Job
+	attempts atomic.Uint32
+	timer    atomicTimer
+}
+
+func newTask(f job.FnJob) *task {
+	j := job.NewJob(f)
+	t := task{
+		Job:   &j,
+		timer: atomicTimer{},
+	}
+
+	return &t
+}
+
+func (t *task) abort() {
+	t.timer.stop()
+	t.Job.Abort()
+}
+
+func (t *task) run(ctx context.Context) {
+	t.attempts.Add(1)
+	t.Job.Run(ctx)
+}
+
+func (t *task) getDetails() TaskDetails {
+	return TaskDetails{
+		JobDetails: t.IntoDetails(),
+		Attempts:   int(t.attempts.Load()),
+	}
+}
+
+type tasks struct {
+	l sync.RWMutex
+	s map[uuid.UUID]*task
+}
+
+func newTasks() tasks {
+	return tasks{
+		s: make(map[uuid.UUID]*task),
+	}
+}
+
+func (ts *tasks) add(t *task) {
+	ts.l.Lock()
+	defer ts.l.Unlock()
+
+	ts.s[t.GetID()] = t
+}
+
+func (ts *tasks) get(id uuid.UUID) *task {
+	ts.l.RLock()
+	defer ts.l.RUnlock()
+
+	t, ok := ts.s[id]
+	if !ok {
+		return nil
+	}
+
+	return t
+}
+
+func (ts *tasks) len() int {
+	ts.l.RLock()
+	defer ts.l.RUnlock()
+
+	return len(ts.s)
+}
+
+// completed reports whether no task is still pending or running.
+func (ts *tasks) completed() bool {
+	ts.l.RLock()
+	defer ts.l.RUnlock()
+
+	for _, t := range ts.s {
+		if t.GetState() == job.Pending || t.GetState() == job.Running {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (ts *tasks) getAllDetails() []TaskDetails {
+	ts.l.RLock()
+	defer ts.l.RUnlock()
+
+	details := []TaskDetails{}
+	for _, t := range ts.s {
+		details = append(details, t.getDetails())
+	}
+
+	return details
+}
+
+func (ts *tasks) getDetails(id uuid.UUID) TaskDetails {
+	ts.l.RLock()
+	defer ts.l.RUnlock()
+
+	t, ok := ts.s[id]
+	if !ok {
+		return TaskDetails{
+			JobDetails: job.JobDetails{
+				State: job.Unknown.String(),
+			},
+		}
+	}
+
+	return t.getDetails()
+}
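The `atomicTimer` above is what keeps re-scheduling safe: replacing the timer always stops the pending one first, so an aborted or re-delayed task cannot fire twice. A standalone sketch of that contract (the type is re-declared here for illustration, since the scheduler's version is unexported):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Mirrors internal/scheduler/task.go: a slot holding the current timer,
// where setting a new timer stops the previous one.
type atomicTimer struct {
	atomic.Pointer[time.Timer]
}

func (at *atomicTimer) set(t *time.Timer) {
	if old := at.Swap(t); old != nil {
		old.Stop()
	}
}

func main() {
	var at atomicTimer
	at.set(time.AfterFunc(100*time.Millisecond, func() { fmt.Println("first: never fires") }))
	// Replacing the timer stops the pending one, so only "second" prints.
	at.set(time.AfterFunc(100*time.Millisecond, func() { fmt.Println("second") }))
	time.Sleep(200 * time.Millisecond)
}
```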
diff --git a/main.go b/main.go
index 894d704..58b6ceb 100644
--- a/main.go
+++ b/main.go
@@ -16,6 +16,11 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+const (
+	MaxWorkers = 5
+	Interval   = 2000 * time.Millisecond
+)
+
 func initLogger() {
 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
 	log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr})
@@ -31,9 +36,7 @@ func main() {
 	)
 	defer stop()
 
-	interval := 200 * time.Millisecond
-	s := scheduler.NewSchedulerCycle(ctx, interval)
-	s.Display()
+	s := scheduler.NewSchedulerCycle(ctx, Interval, MaxWorkers)
 
 	// pending test
 	for i := 0; i < 20; i++ {
@@ -101,7 +104,7 @@
 	go func() {
 		for {
 			time.Sleep(2 * time.Second) //nolint:mnd // test purpose
-			if s.HasAllJobsDone() {
+			if s.TasksDone() {
 				s.Stop()
 				return
 			}
@@ -110,11 +113,11 @@
 
 	<-s.Done()
 
-	jds := s.GetJobsDetails()
-	for _, jd := range jds {
-		c, err := json.Marshal(&jd)
+	ts := s.GetTasksDetails()
+	for _, t := range ts {
+		c, err := json.Marshal(&t)
 		if err != nil {
-			log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON")
+			log.Err(err).Str("task", t.ID.String()).Msg("unable to marshal task details into JSON")
 			continue
 		}
 		fmt.Println(string(c))