package scheduler

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
)

const (
	// ChanLength is the capacity of the buffered channel feeding the workers.
	ChanLength = 500
	// DefaultExecInterval is the delay used before (re)queuing a task that
	// does not define its own execution interval.
	DefaultExecInterval = 30 * time.Second
)

// IScheduler is the contract exposed to callers that only need to submit jobs.
// Delay registers the job and returns the identifier of the created task.
type IScheduler interface {
	Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID
}

// SchedulerCycle is a simple scheduler handling jobs and executing them at a
// regular interval. If a task is not in the desired state after a run, the
// task is re-scheduled.
type SchedulerCycle struct {
	// wg tracks the worker goroutines started by run.
	wg sync.WaitGroup
	// ctx is the scheduler context, derived from the one given to the constructor.
	ctx context.Context
	// fnCancel cancels ctx.
	fnCancel context.CancelFunc
	// tasks is the registry of tasks known to the scheduler.
	tasks tasks
	// chTasks is the queue the workers read tasks from.
	chTasks chan *task
	// chDone is closed once the shutdown sequence has finished.
	chDone chan struct{}
}

// NewSchedulerCycle builds a scheduler bound to ctx and starts the given
// number of workers.
//
// When ctx is cancelled, a supervisor goroutine waits for the workers to
// return, runs stop, and then closes the channel returned by Done.
func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle {
	ctxChild, fnCancel := context.WithCancel(ctx)
	c := &SchedulerCycle{
		ctx:      ctxChild,
		fnCancel: fnCancel,
		tasks:    newTasks(),
		chTasks:  make(chan *task, ChanLength),
		chDone:   make(chan struct{}),
	}

	// Start the workers before the supervisor: wg.Add must not run
	// concurrently with wg.Wait, which could happen if ctx is already
	// cancelled when the constructor is called.
	c.run(workers)

	go func() {
		<-c.ctx.Done()
		defer c.fnCancel()
		c.wg.Wait()
		c.stop()
		// Closing (rather than sending) never blocks and notifies every
		// receiver, so the goroutine cannot leak when nobody reads Done.
		close(c.chDone)
	}()

	return c
}

// delay sets the task timer when the task should be scheduled.
//
// The task's own execInterval is used when set, DefaultExecInterval otherwise.
// When the timer fires the task is handed to exec, which queues it or delays
// it again if the queue is full.
func (c *SchedulerCycle) delay(t *task) {
	interval := DefaultExecInterval
	if t.execInterval != nil {
		interval = *t.execInterval
	}

	t.setTimer(time.AfterFunc(interval, func() {
		// Once the scheduler context is done the workers have stopped
		// reading the queue; re-arming would only keep the timer alive.
		select {
		case <-c.ctx.Done():
			return
		default:
		}
		c.exec(t)
	}))
}

// exec queues the task for immediate execution or, if the queue is full,
// delays it.
func (c *SchedulerCycle) exec(t *task) {
	select {
	case c.chTasks <- t:
	default:
		log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
		c.delay(t)
	}
}

// getTask looks the task up in the registry by its id.
func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
	return c.tasks.get(id)
}

// run launches a number of workers to execute tasks.
func (c *SchedulerCycle) run(n uint32) { for i := 0; i < int(n); i++ { c.wg.Add(1) go func() { defer c.wg.Done() for { select { case t := <-c.chTasks: c.execute(t, c.delay) case <-c.ctx.Done(): log.Error().Msg("context done, worker is stopping...") return } } }() } } // execute executes the task. // // It does not handle task error, it's up to the task to implement its own callbacks. // In case of pending state, a callback is executed for actions. For the others states, // the task is deleted from the scheduler. func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) { t.Run(c.ctx) switch t.GetState() { case Pending: fnFallBack(t) case Success, Failed, Abort, Unknown: c.tasks.delete(t) case Running: c.tasks.delete(t) log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...") } } // stop aborts all tasks and waits until tasks are stopped. // If the process can't be stopped within 10s, too bad... func (c *SchedulerCycle) stop() { c.tasks.abort() if c.TasksDone() { log.Info().Msg("all tasks has been stopped gracefully") return } ctxTimeout := 10 * time.Second ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout) defer fnCancel() for { select { case <-ctx.Done(): log.Error().Msg("stop context done, tasks has been stopped gracefully") return default: } if c.TasksDone() { log.Info().Msg("all tasks has been stopped gracefully") return } time.Sleep(time.Second) } } func (c *SchedulerCycle) Done() <-chan struct{} { return c.chDone } func (c *SchedulerCycle) Len() int { return c.tasks.len() } // TasksDone checks whether all the tasks has been completed. func (c *SchedulerCycle) TasksDone() bool { return c.tasks.completed() } func (c *SchedulerCycle) GetTasksDetails() []TaskDetails { return c.tasks.getAllDetails() } // GetTaskDetails returns the task details by id. func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails { return c.tasks.getDetails(id) } // Delay builds a task and adds it to the scheduler engine. 
func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID { select { case <-c.Done(): log.Error().Msg("context done unable to add new job") default: } t := NewTask(fnJob, opts...) c.tasks.add(t) c.exec(t) log.Info().Str("id", t.GetID().String()).Msg("task added successfully") return t.GetID() } // Abort aborts the task given by its id if it exists. func (c *SchedulerCycle) Abort(id uuid.UUID) bool { if t := c.getTask(id); t != nil { t.Abort() log.Info().Str("id", t.GetID().String()).Msg("abort task done") return true } return false }