Merge pull request 'feat/engine' (#1) from feat/engine into main

Reviewed-on: https://gitea.thegux.fr/rmanach/cycle-scheduler/pulls/1
This commit is contained in:
rmanach 2024-09-24 14:47:21 +00:00
commit 3f1afb63d4
6 changed files with 274 additions and 355 deletions

View File

@ -1,48 +1,18 @@
# cycle-scheduler # cycle-scheduler
cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. cycle-scheduler is a simple scheduler handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled with a backoff.
Here is a simple representation:
```ascii
+------------------------------------------------------+
| +---+ +---+ +---+ +---+ +---+ +---+ |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| |s1 | |s2 | |s3 | |s4 | | | |s60| |
| +---+ +---+ +---+ +---+ +---+ +---+ |
+---------------^--------------------------------------+
```
Jobs are handled in an array of job slices.
At each interval (clock), the cursor `^` moves to the next slot (s*).
If there are jobs, they are sent to workers to be executed
and the slot is cleaned.
At the end of the slot (s60), the cursor re-starts a new cycle from s1.
If a job is not in the desired state, the job is re-scheduled in the current slot to be re-executed in the next cycle.
**NOTE**: This scheduler does not accept long-running tasks. Job executions have a fixed timeout of 10s.
Polling tasks are more suitable for this kind of scheduler.
## Run ## Run
You can run sample tests from `main.go` to see the scheduler in action: You can run sample tests from `main.go` to see the scheduler in action:
```bash ```bash
make run make run
``` ```
If all goes well, you should see this kind of output in the stdout:
```ascii
# cycle-scheduler (slot: 7)
_ P _ _ _ _ _ _ _ _ _ _ _ _
- - - - - - ^ - - - - - - -
```
> **P** means *pending* state
You can adjust the clock interval as needed in `main.go`: You can adjust the clock interval and the number of workers as needed in `main.go` constants section:
```go ```go
interval := 200 * time.Millisecond const (
MaxWorkers = 5
Interval = 2000 * time.Millisecond
)
``` ```

View File

@ -59,7 +59,6 @@ type JobDetails struct {
Err string `json:"error"` Err string `json:"error"`
} }
// TODO(rmanach): add priority level
type Job struct { type Job struct {
l sync.RWMutex l sync.RWMutex
id uuid.UUID id uuid.UUID
@ -71,7 +70,7 @@ type Job struct {
chAbort chan struct{} chAbort chan struct{}
} }
func NewJob(task FnJob, row, col int) Job { func NewJob(task FnJob) Job {
return Job{ return Job{
id: uuid.New(), id: uuid.New(),
createdAt: time.Now().UTC(), createdAt: time.Now().UTC(),
@ -130,7 +129,10 @@ func (j *Job) setFail(err error) {
now := time.Now().UTC() now := time.Now().UTC()
j.updatedAt = &now j.updatedAt = &now
if j.state != Abort {
j.state = Failed j.state = Failed
}
j.err = err j.err = err
} }

View File

@ -3,8 +3,7 @@ package scheduler
import ( import (
"context" "context"
"cycle-scheduler/internal/job" "cycle-scheduler/internal/job"
"fmt" "math"
"strings"
"sync" "sync"
"time" "time"
@ -12,45 +11,23 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
const ( const ExponentialFactor = 1.8
TableTitle = "# cycle-scheduler"
Cursor = "^"
CycleLength = 60
MaxWorkers = 5
)
const MaxSlotsIdx = 59 // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval.
// If a task is not in desired state, the task is re-scheduled with a backoff.
type JobSlot struct {
*job.Job
row int
}
// SchedulerCycle is a dumb scheduler.
// It handle job and executes it at each cycle (60 * interval).
//
// Jobs are handle in a array of job slices.
// At each interval (clock), the cursor moves to the next slot (s*).
// If there are jobs, they are sent to workers to be executed
// and the slot is cleaned.
//
// At the end of the slot (s60), the cursor re-starts a cycle at s1.
type SchedulerCycle struct { type SchedulerCycle struct {
l sync.RWMutex
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context ctx context.Context
fnCancel context.CancelFunc fnCancel context.CancelFunc
interval time.Duration interval time.Duration
currentSlot int tasks tasks
slots [60][]*job.Job
jobs map[uuid.UUID]*job.Job
chJobs chan *JobSlot chTasks chan *task
} }
func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint32) *SchedulerCycle {
ctxChild, fnCancel := context.WithCancel(ctx) ctxChild, fnCancel := context.WithCancel(ctx)
c := SchedulerCycle{ c := SchedulerCycle{
@ -58,17 +35,71 @@ func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCy
ctx: ctxChild, ctx: ctxChild,
fnCancel: fnCancel, fnCancel: fnCancel,
interval: interval, interval: interval,
currentSlot: 0, tasks: newTasks(),
slots: [60][]*job.Job{}, chTasks: make(chan *task),
jobs: make(map[uuid.UUID]*job.Job),
chJobs: make(chan *JobSlot),
} }
c.run() c.run(workers)
return &c return &c
} }
// backoff re-schedules the task t after a delay that grows exponentially with
// the number of attempts already made:
//
//	delay = interval * ExponentialFactor^attempts
//
// When the delay elapses, the task is offered to the workers without blocking.
// If no worker is available, the task is delayed again. Once the scheduler
// context is done, the task is dropped instead of being delayed forever.
func (c *SchedulerCycle) backoff(t *task) {
	// Scale the interval in floating point. Converting the bare factor to a
	// time.Duration would be interpreted as nanoseconds and add no real delay.
	factor := math.Pow(ExponentialFactor, float64(t.attempts.Load()))
	delay := time.Duration(math.MaxInt64)
	if scaled := float64(c.interval) * factor; scaled < math.MaxInt64 {
		delay = time.Duration(scaled)
	}

	t.timer.set(
		time.AfterFunc(delay, func() {
			// The workers exit when the context is done: nobody would ever
			// receive the task, so stop re-arming the timer.
			if c.ctx.Err() != nil {
				return
			}

			select {
			case c.chTasks <- t:
			default:
				log.Error().Str("task id", t.GetID().String()).Msg("unable to execute task to the worker, delayed it")
				c.backoff(t)
			}
		}),
	)
}
// exec hands the task over to a worker immediately when one is ready to
// receive it. Otherwise the task is delayed through backoff.
func (c *SchedulerCycle) exec(t *task) {
	// Non-blocking send: succeeds only if a worker is currently receiving.
	select {
	case c.chTasks <- t:
		return
	default:
	}

	taskID := t.GetID().String()
	log.Error().Str("task id", taskID).Msg("unable to execute the task to a worker now, delayed it")
	c.backoff(t)
}
// getTask looks up a registered task by its id in the task collection.
func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
	found := c.tasks.get(id)
	return found
}
// run starts n worker goroutines, each registered on the scheduler wait group.
//
// A worker receives tasks from the task channel and executes them, using
// backoff as the fallback for tasks still pending after their run. It exits
// when the scheduler context is done.
func (c *SchedulerCycle) run(n uint32) {
	worker := func() {
		defer c.wg.Done()

		for {
			select {
			case <-c.ctx.Done():
				log.Error().Msg("context done, worker is stopping...")
				return
			case t := <-c.chTasks:
				c.execute(t, c.backoff)
			}
		}
	}

	total := int(n)
	for started := 0; started < total; started++ {
		c.wg.Add(1)
		go worker()
	}
}
// execute runs the task with the scheduler context and, if the task is still
// in the pending state afterwards, passes it to fnFallBack.
func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
	t.run(c.ctx)

	if t.GetState() != job.Pending {
		return
	}
	fnFallBack(t)
}
func (c *SchedulerCycle) Stop() { func (c *SchedulerCycle) Stop() {
c.fnCancel() c.fnCancel()
} }
@ -77,295 +108,56 @@ func (c *SchedulerCycle) Done() <-chan struct{} {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
<-c.ctx.Done() <-c.ctx.Done()
c.wg.Done() c.wg.Wait()
done <- struct{}{} done <- struct{}{}
}() }()
return done return done
} }
func (c *SchedulerCycle) Len() int { func (c *SchedulerCycle) Len() int {
c.l.Lock() return c.tasks.len()
defer c.l.Unlock()
return len(c.jobs)
} }
func (c *SchedulerCycle) HasAllJobsDone() bool { // TasksDone checks whether all the tasks has been completed.
c.l.Lock() func (c *SchedulerCycle) TasksDone() bool {
defer c.l.Unlock() return c.tasks.completed()
for _, j := range c.jobs {
if j.GetState() == job.Pending || j.GetState() == job.Running {
return false
}
}
return true
} }
func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
c.l.Lock() return c.tasks.getAllDetails()
defer c.l.Unlock()
details := []job.JobDetails{}
for _, j := range c.jobs {
details = append(details, j.IntoDetails())
}
return details
} }
// Delay builds a job and add it to the scheduler engine. // GetTaskDetails returns the task details by id.
func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
	// The lookup is delegated to the task collection.
	details := c.tasks.getDetails(id)
	return details
}
// Delay builds a task and add it to the scheduler engine.
func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
c.l.Lock() select {
defer c.l.Unlock() case <-c.Done():
log.Error().Msg("context done unable to add new job")
nextSlot := c.currentSlot + 1 default:
if nextSlot > MaxSlotsIdx {
nextSlot = 0
} }
j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) t := newTask(fnJob)
c.slots[nextSlot] = append(c.slots[nextSlot], &j) c.tasks.add(t)
c.jobs[j.GetID()] = &j
log.Info().Str("job", j.GetID().String()).Msg("job added successfully") c.exec(t)
return j.GetID()
log.Info().Str("task", t.GetID().String()).Msg("task added successfully")
return t.GetID()
} }
// Abort aborts the job given by its id if it exists.. // Abort aborts the task given by its id if it exists.
func (c *SchedulerCycle) Abort(id uuid.UUID) bool { func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
if j := c.getJob(id); j != nil { if t := c.getTask(id); t != nil {
j.Abort() t.abort()
log.Info().Str("job", j.GetID().String()).Msg("abort job done") log.Info().Str("task id", t.GetID().String()).Msg("abort task done")
return true return true
} }
return false return false
} }
// GetJobDetails returns the job details by .
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
c.l.Lock()
defer c.l.Unlock()
j, ok := c.jobs[id]
if !ok {
return job.JobDetails{
State: job.Unknown.String(),
}
}
return j.IntoDetails()
}
// Display outputs earch interval the scheduler state.
func (c *SchedulerCycle) Display() {
ticker := time.NewTicker(c.interval)
go func() {
for range ticker.C {
c.display()
}
}()
}
// display writes to stdout the state of the scheduler as a table.
func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
c.l.RLock()
defer c.l.RUnlock()
var maxCols int
for i := range c.slots {
if l := len(c.slots[i]); l > maxCols {
maxCols = l
}
}
table := [][]string{}
title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
table = append(table, []string{title})
for {
if maxCols == 0 {
break
}
row := make([]string, CycleLength)
for i := 0; i <= MaxSlotsIdx; i++ {
row[i] = "_"
}
for i := range c.slots {
if len(c.slots[i]) < maxCols {
continue
}
j := c.slots[i][maxCols-1]
switch j.GetState() {
case job.Pending:
row[i] = "P"
case job.Running:
row[i] = "R"
case job.Failed:
row[i] = "X"
case job.Abort:
row[i] = "A"
case job.Unknown:
row[i] = "?"
case job.Success:
row[i] = "O"
}
}
table = append(table, row)
maxCols--
}
row := make([]string, CycleLength)
for i := 0; i <= MaxSlotsIdx; i++ {
row[i] = "-"
}
table = append(table, row)
if l := len(table); l > 0 {
table[l-1][c.currentSlot] = Cursor
}
tableFormat := ""
for _, r := range table {
tableFormat += strings.Join(r, " ")
tableFormat += "\n"
}
fmt.Println(tableFormat)
}
func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job {
c.l.RLock()
defer c.l.RUnlock()
j, ok := c.jobs[id]
if !ok {
return nil
}
return j
}
// getCurrentSlotJobs collects all the current slot jobs
// and clean the slot.
func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) {
c.l.Lock()
defer c.l.Unlock()
jobs := c.slots[c.currentSlot]
c.slots[c.currentSlot] = []*job.Job{}
return c.currentSlot, jobs
}
// updateSlot add a job to the slot where it was before.
func (c *SchedulerCycle) updateSlot(row int, j *job.Job) {
c.l.Lock()
defer c.l.Unlock()
c.slots[row] = append(c.slots[row], j)
}
// updateCurrentSlot add a job to the current slot.
func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) {
c.l.Lock()
defer c.l.Unlock()
c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j)
}
// incr increments the slot cursor.
// It the cursor reaches `MaxSlotsIdx`, it goes back to 0.
func (c *SchedulerCycle) incr() {
c.l.Lock()
defer c.l.Unlock()
nextSlot := c.currentSlot + 1
if nextSlot > MaxSlotsIdx {
nextSlot = 0
}
c.currentSlot = nextSlot
}
// dispatch gets jobs from the current slot, resets the slot
// and dispatch all jobs to the workers.
//
// It all the workers are busy, the jobs are re-schedule in the same slot
// to be executed in the next cycle.
func (c *SchedulerCycle) dispatch() {
row, jobs := c.getCurrentSlotJobs()
for _, j := range jobs {
if j.GetState() == job.Abort {
continue
}
select {
case c.chJobs <- &JobSlot{row: row, Job: j}:
default:
log.Warn().Msg("unable to put job in workers, trying next cycle")
c.updateSlot(row, j)
}
}
}
// run launches the workers and the ticker.
func (c *SchedulerCycle) run() {
c.workers()
c.tick()
}
// workers launches `MaxWorkers` number of worker to execute job.
// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot.
func (c *SchedulerCycle) workers() {
for i := 0; i < MaxWorkers; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case j := <-c.chJobs:
c.executeJob(j.Job, c.updateCurrentSlot)
case <-c.ctx.Done():
log.Error().Msg("context done, worker is stopping...")
return
}
}
}()
}
}
func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) {
j.Run(c.ctx)
if j.GetState() == job.Pending {
fnFallBack(j)
}
}
// tick is a simple ticker incrementing at each scheduler interval,
// the slot cursor and dispatch jobs to the workers.
func (c *SchedulerCycle) tick() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
log.Error().Msg("context done, ticker is stopping...")
return
default:
time.Sleep(c.interval)
c.incr()
c.dispatch()
}
}
}()
}

View File

@ -14,7 +14,7 @@ func TestSlot(t *testing.T) {
ctx, fnCancel := context.WithCancel(context.Background()) ctx, fnCancel := context.WithCancel(context.Background())
defer fnCancel() defer fnCancel()
s := NewSchedulerCycle(ctx, 1*time.Millisecond) s := NewSchedulerCycle(ctx, 1*time.Millisecond, 5)
s.Delay(func(ctx context.Context) error { s.Delay(func(ctx context.Context) error {
return nil return nil
@ -29,5 +29,5 @@ func TestSlot(t *testing.T) {
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
assert.Equal(t, 3, s.Len()) assert.Equal(t, 3, s.Len())
assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) assert.Equal(t, job.Failed.String(), s.GetTaskDetails(j3).State)
} }

152
internal/scheduler/task.go Normal file
View File

@ -0,0 +1,152 @@
package scheduler
import (
"context"
"cycle-scheduler/internal/job"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
)
// atomicTimer wraps a `time.Timer` pointer so that the timer can be replaced
// and stopped through atomic pointer operations.
type atomicTimer struct {
	atomic.Pointer[time.Timer]
}

// stop stops the timer currently held, if any.
// The pointer itself is left in place.
func (at *atomicTimer) stop() {
	if timer := at.Load(); timer != nil {
		timer.Stop()
	}
}

// set replaces the current timer with t and stops the timer it replaces.
//
// The exchange is a single Swap, so each caller stops exactly the timer it
// replaced. A separate Load followed by Swap would let two concurrent callers
// stop the same old timer and leave one new timer running but unreferenced.
func (at *atomicTimer) set(t *time.Timer) {
	if previous := at.Swap(t); previous != nil {
		previous.Stop()
	}
}
// TaskDetails is the exported view of a task: the details of the underlying
// job plus the number of attempts.
type TaskDetails struct {
job.JobDetails
// Attempts is the value of the task attempts counter, which is
// incremented every time the task is run.
Attempts int `json:"attempts"`
}
// task is a job handled by the scheduler, together with its scheduling state.
type task struct {
*job.Job
// attempts counts how many times the task has been run.
attempts atomic.Uint32
// timer holds the timer set by the scheduler backoff; it is stopped on abort.
timer atomicTimer
}
// newTask builds a task around a fresh job executing f.
// The attempts counter and the timer start from their zero values.
func newTask(f job.FnJob) *task {
	j := job.NewJob(f)

	return &task{Job: &j}
}
// abort stops the task timer, if one is set, and then aborts the underlying job.
func (t *task) abort() {
	if pending := t.timer.Load(); pending != nil {
		pending.Stop()
	}

	t.Abort()
}
// run increments the attempts counter and then runs the underlying job with ctx.
func (t *task) run(ctx context.Context) {
	t.attempts.Add(1)
	t.Run(ctx)
}
// getDetails returns the job details of the task together with its current
// number of attempts.
func (t *task) getDetails() TaskDetails {
	var details TaskDetails

	details.JobDetails = t.IntoDetails()
	details.Attempts = int(t.attempts.Load())

	return details
}
// tasks is a collection of tasks indexed by id.
type tasks struct {
// l guards every access to s.
l sync.RWMutex
// s maps a task id to its task.
s map[uuid.UUID]*task
}
// newTasks returns an empty task collection with its map initialised.
func newTasks() tasks {
	return tasks{s: map[uuid.UUID]*task{}}
}
// add registers the task t under its id, replacing any task already stored
// with the same id.
func (ts *tasks) add(t *task) {
	ts.l.Lock()
	defer ts.l.Unlock()

	id := t.GetID()
	ts.s[id] = t
}
// get returns the task registered under id, or nil when there is none.
func (ts *tasks) get(id uuid.UUID) *task {
	ts.l.RLock()
	defer ts.l.RUnlock()

	// A missing key yields the zero value of *task, which is nil.
	return ts.s[id]
}
// len returns the number of registered tasks.
func (ts *tasks) len() int {
	ts.l.RLock()
	count := len(ts.s)
	ts.l.RUnlock()

	return count
}
// completed reports whether no registered task is pending or running.
// An empty collection is considered completed.
func (ts *tasks) completed() bool {
	ts.l.RLock()
	defer ts.l.RUnlock()

	for _, t := range ts.s {
		// Read the state once. With two separate reads, a task moving from
		// running back to pending in between would match neither comparison
		// and be counted as completed.
		switch t.GetState() {
		case job.Pending, job.Running:
			return false
		}
	}

	return true
}
// getAllDetails returns the details of every registered task.
// The result is never nil; its order follows map iteration and is not stable.
func (ts *tasks) getAllDetails() []TaskDetails {
	ts.l.RLock()
	defer ts.l.RUnlock()

	all := make([]TaskDetails, 0, len(ts.s))
	for id := range ts.s {
		all = append(all, ts.s[id].getDetails())
	}

	return all
}
// getDetails returns the details of the task registered under id.
// When there is no such task, only the state is filled in, with `job.UnknownState`.
func (ts *tasks) getDetails(id uuid.UUID) TaskDetails {
	ts.l.RLock()
	defer ts.l.RUnlock()

	if found, ok := ts.s[id]; ok {
		return found.getDetails()
	}

	var unknown TaskDetails
	unknown.State = job.UnknownState

	return unknown
}

19
main.go
View File

@ -16,6 +16,11 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
// Settings of the sample scheduler started in `main`.
const (
// MaxWorkers is the number of workers given to the scheduler.
MaxWorkers = 5
// Interval is the interval given to the scheduler.
Interval = 2000 * time.Millisecond
)
func initLogger() { func initLogger() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr})
@ -31,9 +36,7 @@ func main() {
) )
defer stop() defer stop()
interval := 200 * time.Millisecond s := scheduler.NewSchedulerCycle(ctx, Interval, MaxWorkers)
s := scheduler.NewSchedulerCycle(ctx, interval)
s.Display()
// pending test // pending test
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
@ -101,7 +104,7 @@ func main() {
go func() { go func() {
for { for {
time.Sleep(2 * time.Second) //nolint:mnd // test purpose time.Sleep(2 * time.Second) //nolint:mnd // test purpose
if s.HasAllJobsDone() { if s.TasksDone() {
s.Stop() s.Stop()
return return
} }
@ -110,11 +113,11 @@ func main() {
<-s.Done() <-s.Done()
jds := s.GetJobsDetails() ts := s.GetTasksDetails()
for _, jd := range jds { for _, t := range ts {
c, err := json.Marshal(&jd) c, err := json.Marshal(&t)
if err != nil { if err != nil {
log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") log.Err(err).Str("task", t.ID.String()).Msg("unable to parse task details into JSON")
continue continue
} }
fmt.Println(string(c)) fmt.Println(string(c))