package scheduler import ( "context" "cycle-scheduler/internal/job" "math" "sync" "time" "github.com/google/uuid" "github.com/rs/zerolog/log" ) const ExponentialFactor = 1.8 // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval. // If a task is not in desired state, the task is re-scheduled with a backoff. type SchedulerCycle struct { l sync.RWMutex wg sync.WaitGroup ctx context.Context fnCancel context.CancelFunc interval time.Duration tasks map[uuid.UUID]*task chTasks chan *task } func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint32) *SchedulerCycle { ctxChild, fnCancel := context.WithCancel(ctx) c := SchedulerCycle{ wg: sync.WaitGroup{}, ctx: ctxChild, fnCancel: fnCancel, interval: interval, tasks: make(map[uuid.UUID]*task), chTasks: make(chan *task), } c.run(workers) return &c } func (c *SchedulerCycle) backoff(j *task) { backoff := c.interval + time.Duration(math.Pow(ExponentialFactor, float64(j.attempts.Load()))) j.timer.set( time.AfterFunc(backoff, func() { select { case c.chTasks <- j: default: log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker") c.backoff(j) } }), ) } // exec runs the job now or if all the workers are in use, delayed it. func (c *SchedulerCycle) exec(j *task) { select { case c.chTasks <- j: default: log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker now, delayed it") c.backoff(j) } } func (c *SchedulerCycle) getTask(id uuid.UUID) *task { c.l.RLock() defer c.l.RUnlock() j, ok := c.tasks[id] if !ok { return nil } return j } // run launches a number of worker to execute job. // If job returns `ErrJobNotCompletedYet`, it re-schedules with a backoff. func (c *SchedulerCycle) run(n uint32) { for i := 0; i < int(n); i++ { c.wg.Add(1) go func() { defer c.wg.Done() for { select { case j := <-c.chTasks: c.executeJob(j, c.backoff) case <-c.ctx.Done(): log.Error().Msg("context done, worker is stopping...") return } } }() } } func (c *SchedulerCycle) executeJob(j *task, fnFallBack func(*task)) { j.run(c.ctx) if j.GetState() == job.Pending { fnFallBack(j) } } func (c *SchedulerCycle) Stop() { c.fnCancel() } func (c *SchedulerCycle) Done() <-chan struct{} { done := make(chan struct{}) go func() { <-c.ctx.Done() c.wg.Wait() done <- struct{}{} }() return done } func (c *SchedulerCycle) Len() int { c.l.RLock() defer c.l.RUnlock() return len(c.tasks) } // HasAllJobsDone checks whether all the jobs has been executed. func (c *SchedulerCycle) HasAllJobsDone() bool { c.l.RLock() defer c.l.RUnlock() for _, t := range c.tasks { if t.GetState() == job.Pending || t.GetState() == job.Running { return false } } return true } func (c *SchedulerCycle) GetJobsDetails() []TaskDetails { c.l.RLock() defer c.l.RUnlock() details := []TaskDetails{} for _, t := range c.tasks { details = append(details, t.getDetails()) } return details } // Delay builds a task and add it to the scheduler engine. func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { select { case <-c.Done(): log.Error().Msg("context done unable to add new job") default: } j := newTask(fnJob) c.l.Lock() defer c.l.Unlock() c.tasks[j.GetID()] = j c.exec(j) log.Info().Str("job", j.GetID().String()).Msg("job added successfully") return j.GetID() } // Abort aborts the job given by its id if it exists. func (c *SchedulerCycle) Abort(id uuid.UUID) bool { if j := c.getTask(id); j != nil { j.abort() log.Info().Str("job", j.GetID().String()).Msg("abort job done") return true } return false } // GetJobDetails returns the task details by id. func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) TaskDetails { c.l.RLock() defer c.l.RUnlock() j, ok := c.tasks[id] if !ok { return TaskDetails{ JobDetails: job.JobDetails{ State: job.UnknownState, }, } } return j.getDetails() }