214 lines
4.5 KiB
Go
214 lines
4.5 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
const (
|
|
ChanLength = 500
|
|
DefaultExecInterval = 30 * time.Second
|
|
)
|
|
|
|
type IScheduler interface {
|
|
Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID
|
|
}
|
|
|
|
// SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval.
|
|
// If a task is not in desired state, the task is re-scheduled.
|
|
type SchedulerCycle struct {
|
|
wg sync.WaitGroup
|
|
|
|
ctx context.Context
|
|
fnCancel context.CancelFunc
|
|
|
|
tasks tasks
|
|
|
|
chTasks chan *task
|
|
chDone chan struct{}
|
|
}
|
|
|
|
func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle {
|
|
ctxChild, fnCancel := context.WithCancel(ctx)
|
|
|
|
c := SchedulerCycle{
|
|
wg: sync.WaitGroup{},
|
|
ctx: ctxChild,
|
|
fnCancel: fnCancel,
|
|
tasks: newTasks(),
|
|
chTasks: make(chan *task, ChanLength),
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
<-c.ctx.Done()
|
|
defer c.fnCancel()
|
|
c.wg.Wait()
|
|
c.stop()
|
|
done <- struct{}{}
|
|
}()
|
|
c.chDone = done
|
|
|
|
c.run(workers)
|
|
|
|
return &c
|
|
}
|
|
|
|
// delay sets the task timer when the task should be scheduled.
|
|
func (c *SchedulerCycle) delay(t *task) {
|
|
interval := DefaultExecInterval
|
|
if t.execInterval != nil {
|
|
interval = *t.execInterval
|
|
}
|
|
|
|
t.setTimer(
|
|
time.AfterFunc(interval, func() {
|
|
select {
|
|
case c.chTasks <- t:
|
|
default:
|
|
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
|
|
c.delay(t)
|
|
}
|
|
}),
|
|
)
|
|
}
|
|
|
|
// exec runs the task now or if all the workers are in use, delayed it.
|
|
func (c *SchedulerCycle) exec(t *task) {
|
|
select {
|
|
case c.chTasks <- t:
|
|
default:
|
|
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
|
|
c.delay(t)
|
|
}
|
|
}
|
|
|
|
func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
|
|
return c.tasks.get(id)
|
|
}
|
|
|
|
// run launches a number of worker to execute tasks.
|
|
func (c *SchedulerCycle) run(n uint32) {
|
|
for i := 0; i < int(n); i++ {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case t := <-c.chTasks:
|
|
c.execute(t, c.delay)
|
|
case <-c.ctx.Done():
|
|
log.Error().Msg("context done, worker is stopping...")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// execute executes the task.
|
|
//
|
|
// It does not handle task error, it's up to the task to implement its own callbacks.
|
|
// In case of pending state, a callback is executed for actions. For the others states,
|
|
// the task is deleted from the scheduler.
|
|
func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
|
|
t.Run(c.ctx)
|
|
|
|
switch t.GetState() {
|
|
case Pending:
|
|
fnFallBack(t)
|
|
case Success, Failed, Abort, Unknown:
|
|
c.tasks.delete(t)
|
|
case Running:
|
|
c.tasks.delete(t)
|
|
log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...")
|
|
}
|
|
}
|
|
|
|
// stop aborts all tasks and waits until tasks are stopped.
|
|
// If the process can't be stopped within 10s, too bad...
|
|
func (c *SchedulerCycle) stop() {
|
|
c.tasks.abort()
|
|
|
|
if c.TasksDone() {
|
|
log.Info().Msg("all tasks has been stopped gracefully")
|
|
return
|
|
}
|
|
|
|
ctxTimeout := 10 * time.Second
|
|
ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout)
|
|
defer fnCancel()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Error().Msg("stop context done, tasks has been stopped gracefully")
|
|
return
|
|
default:
|
|
}
|
|
|
|
if c.TasksDone() {
|
|
log.Info().Msg("all tasks has been stopped gracefully")
|
|
return
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
func (c *SchedulerCycle) Done() <-chan struct{} {
|
|
return c.chDone
|
|
}
|
|
|
|
func (c *SchedulerCycle) Len() int {
|
|
return c.tasks.len()
|
|
}
|
|
|
|
// TasksDone checks whether all the tasks has been completed.
|
|
func (c *SchedulerCycle) TasksDone() bool {
|
|
return c.tasks.completed()
|
|
}
|
|
|
|
func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
|
|
return c.tasks.getAllDetails()
|
|
}
|
|
|
|
// GetTaskDetails returns the task details by id.
|
|
func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
|
|
return c.tasks.getDetails(id)
|
|
}
|
|
|
|
// Delay builds a task and adds it to the scheduler engine.
|
|
func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID {
|
|
select {
|
|
case <-c.Done():
|
|
log.Error().Msg("context done unable to add new job")
|
|
default:
|
|
}
|
|
|
|
t := NewTask(fnJob, opts...)
|
|
|
|
c.tasks.add(t)
|
|
c.exec(t)
|
|
|
|
log.Info().Str("id", t.GetID().String()).Msg("task added successfully")
|
|
return t.GetID()
|
|
}
|
|
|
|
// Abort aborts the task given by its id if it exists.
|
|
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
|
|
if t := c.getTask(id); t != nil {
|
|
t.Abort()
|
|
|
|
log.Info().Str("id", t.GetID().String()).Msg("abort task done")
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|