clean scheduler file + move tasks map

This commit is contained in:
rmanach 2024-09-24 16:45:09 +02:00
parent ec53ec990a
commit df9a0ffbbc
5 changed files with 129 additions and 92 deletions

View File

@ -1,6 +1,6 @@
# cycle-scheduler # cycle-scheduler
cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. If a task is not in desired state, the task is re-scheduled with a backoff. cycle-scheduler is a simple scheduler handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled with a backoff.
## Run ## Run
You can run sample tests from `main.go` to see the scheduler in action: You can run sample tests from `main.go` to see the scheduler in action:

View File

@ -16,14 +16,13 @@ const ExponentialFactor = 1.8
// SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval. // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval.
// If a task is not in desired state, the task is re-scheduled with a backoff. // If a task is not in desired state, the task is re-scheduled with a backoff.
type SchedulerCycle struct { type SchedulerCycle struct {
l sync.RWMutex
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context ctx context.Context
fnCancel context.CancelFunc fnCancel context.CancelFunc
interval time.Duration interval time.Duration
tasks map[uuid.UUID]*task tasks tasks
chTasks chan *task chTasks chan *task
} }
@ -36,7 +35,7 @@ func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint
ctx: ctxChild, ctx: ctxChild,
fnCancel: fnCancel, fnCancel: fnCancel,
interval: interval, interval: interval,
tasks: make(map[uuid.UUID]*task), tasks: newTasks(),
chTasks: make(chan *task), chTasks: make(chan *task),
} }
@ -45,45 +44,37 @@ func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint
return &c return &c
} }
func (c *SchedulerCycle) backoff(j *task) { func (c *SchedulerCycle) backoff(t *task) {
backoff := c.interval + time.Duration(math.Pow(ExponentialFactor, float64(j.attempts.Load()))) backoff := c.interval + time.Duration(math.Pow(ExponentialFactor, float64(t.attempts.Load())))
j.timer.set( t.timer.set(
time.AfterFunc(backoff, func() { time.AfterFunc(backoff, func() {
select { select {
case c.chTasks <- j: case c.chTasks <- t:
default: default:
log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker") log.Error().Str("task id", t.GetID().String()).Msg("unable to execute task to the worker, delayed it")
c.backoff(j) c.backoff(t)
} }
}), }),
) )
} }
// exec runs the job now or if all the workers are in use, delayed it. // exec runs the task now or if all the workers are in use, delayed it.
func (c *SchedulerCycle) exec(j *task) { func (c *SchedulerCycle) exec(t *task) {
select { select {
case c.chTasks <- j: case c.chTasks <- t:
default: default:
log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker now, delayed it") log.Error().Str("task id", t.GetID().String()).Msg("unable to execute the task to a worker now, delayed it")
c.backoff(j) c.backoff(t)
} }
} }
func (c *SchedulerCycle) getTask(id uuid.UUID) *task { func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
c.l.RLock() return c.tasks.get(id)
defer c.l.RUnlock()
j, ok := c.tasks[id]
if !ok {
return nil
}
return j
} }
// run launches a number of worker to execute job. // run launches a number of worker to execute tasks.
// If job returns `ErrJobNotCompletedYet`, it re-schedules with a backoff. // If a task returns `ErrJobNotCompletedYet`, it re-schedules with a backoff.
func (c *SchedulerCycle) run(n uint32) { func (c *SchedulerCycle) run(n uint32) {
for i := 0; i < int(n); i++ { for i := 0; i < int(n); i++ {
c.wg.Add(1) c.wg.Add(1)
@ -91,8 +82,8 @@ func (c *SchedulerCycle) run(n uint32) {
defer c.wg.Done() defer c.wg.Done()
for { for {
select { select {
case j := <-c.chTasks: case t := <-c.chTasks:
c.executeJob(j, c.backoff) c.execute(t, c.backoff)
case <-c.ctx.Done(): case <-c.ctx.Done():
log.Error().Msg("context done, worker is stopping...") log.Error().Msg("context done, worker is stopping...")
return return
@ -102,10 +93,10 @@ func (c *SchedulerCycle) run(n uint32) {
} }
} }
func (c *SchedulerCycle) executeJob(j *task, fnFallBack func(*task)) { func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
j.run(c.ctx) t.run(c.ctx)
if j.GetState() == job.Pending { if t.GetState() == job.Pending {
fnFallBack(j) fnFallBack(t)
} }
} }
@ -124,36 +115,21 @@ func (c *SchedulerCycle) Done() <-chan struct{} {
} }
func (c *SchedulerCycle) Len() int { func (c *SchedulerCycle) Len() int {
c.l.RLock() return c.tasks.len()
defer c.l.RUnlock()
return len(c.tasks)
} }
// HasAllJobsDone checks whether all the jobs has been executed. // TasksDone checks whether all the tasks has been completed.
func (c *SchedulerCycle) HasAllJobsDone() bool { func (c *SchedulerCycle) TasksDone() bool {
c.l.RLock() return c.tasks.completed()
defer c.l.RUnlock()
for _, t := range c.tasks {
if t.GetState() == job.Pending || t.GetState() == job.Running {
return false
}
}
return true
} }
func (c *SchedulerCycle) GetJobsDetails() []TaskDetails { func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
c.l.RLock() return c.tasks.getAllDetails()
defer c.l.RUnlock() }
details := []TaskDetails{} // GetTaskDetails returns the task details by id.
for _, t := range c.tasks { func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
details = append(details, t.getDetails()) return c.tasks.getDetails(id)
}
return details
} }
// Delay builds a task and add it to the scheduler engine. // Delay builds a task and add it to the scheduler engine.
@ -164,44 +140,24 @@ func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
default: default:
} }
j := newTask(fnJob) t := newTask(fnJob)
c.l.Lock() c.tasks.add(t)
defer c.l.Unlock()
c.tasks[j.GetID()] = j c.exec(t)
c.exec(j) log.Info().Str("task", t.GetID().String()).Msg("task added successfully")
return t.GetID()
log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
return j.GetID()
} }
// Abort aborts the job given by its id if it exists. // Abort aborts the task given by its id if it exists.
func (c *SchedulerCycle) Abort(id uuid.UUID) bool { func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
if j := c.getTask(id); j != nil { if t := c.getTask(id); t != nil {
j.abort() t.abort()
log.Info().Str("job", j.GetID().String()).Msg("abort job done") log.Info().Str("task id", t.GetID().String()).Msg("abort task done")
return true return true
} }
return false return false
} }
// GetJobDetails returns the task details by id.
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) TaskDetails {
c.l.RLock()
defer c.l.RUnlock()
j, ok := c.tasks[id]
if !ok {
return TaskDetails{
JobDetails: job.JobDetails{
State: job.UnknownState,
},
}
}
return j.getDetails()
}

View File

@ -29,5 +29,5 @@ func TestSlot(t *testing.T) {
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
assert.Equal(t, 3, s.Len()) assert.Equal(t, 3, s.Len())
assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) assert.Equal(t, job.Failed.String(), s.GetTaskDetails(j3).State)
} }

View File

@ -3,8 +3,11 @@ package scheduler
import ( import (
"context" "context"
"cycle-scheduler/internal/job" "cycle-scheduler/internal/job"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/google/uuid"
) )
// atomicTimer wraps a `time.Timer`. // atomicTimer wraps a `time.Timer`.
@ -69,3 +72,81 @@ func (t *task) getDetails() TaskDetails {
Attempts: int(t.attempts.Load()), Attempts: int(t.attempts.Load()),
} }
} }
type tasks struct {
l sync.RWMutex
s map[uuid.UUID]*task
}
func newTasks() tasks {
return tasks{
s: make(map[uuid.UUID]*task),
}
}
func (ts *tasks) add(t *task) {
ts.l.Lock()
defer ts.l.Unlock()
ts.s[t.GetID()] = t
}
func (ts *tasks) get(id uuid.UUID) *task {
ts.l.RLock()
defer ts.l.RUnlock()
j, ok := ts.s[id]
if !ok {
return nil
}
return j
}
func (ts *tasks) len() int {
ts.l.RLock()
defer ts.l.RUnlock()
return len(ts.s)
}
func (ts *tasks) completed() bool {
ts.l.RLock()
defer ts.l.RUnlock()
for _, t := range ts.s {
if t.GetState() == job.Pending || t.GetState() == job.Running {
return false
}
}
return true
}
func (ts *tasks) getAllDetails() []TaskDetails {
ts.l.RLock()
defer ts.l.RUnlock()
details := []TaskDetails{}
for _, t := range ts.s {
details = append(details, t.getDetails())
}
return details
}
func (ts *tasks) getDetails(id uuid.UUID) TaskDetails {
ts.l.RLock()
defer ts.l.RUnlock()
t, ok := ts.s[id]
if !ok {
return TaskDetails{
JobDetails: job.JobDetails{
State: job.UnknownState,
},
}
}
return t.getDetails()
}

10
main.go
View File

@ -104,7 +104,7 @@ func main() {
go func() { go func() {
for { for {
time.Sleep(2 * time.Second) //nolint:mnd // test purpose time.Sleep(2 * time.Second) //nolint:mnd // test purpose
if s.HasAllJobsDone() { if s.TasksDone() {
s.Stop() s.Stop()
return return
} }
@ -113,11 +113,11 @@ func main() {
<-s.Done() <-s.Done()
jds := s.GetJobsDetails() ts := s.GetTasksDetails()
for _, jd := range jds { for _, t := range ts {
c, err := json.Marshal(&jd) c, err := json.Marshal(&t)
if err != nil { if err != nil {
log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") log.Err(err).Str("task", t.ID.String()).Msg("unable to parse task details into JSON")
continue continue
} }
fmt.Println(string(c)) fmt.Println(string(c))