package scheduler

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/rs/zerolog/log"
)

var (
	ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached")
	ErrSchedulerContextDone        = errors.New("context done, scheduler stopped")
	ErrTaskDoesNotExist            = errors.New("task does not exist")
	ErrParentTaskDoesNotExist      = errors.New("parent task does not exist")
	ErrParentTaskFailed            = errors.New("parent task failed")
)

type TaskStatus string

const (
	Pending TaskStatus = "pending"
	Running TaskStatus = "running"
	Success TaskStatus = "success"
	Failed  TaskStatus = "failed"
	Unknown TaskStatus = "unknown"
)

// FnJob is the function executed by a `Task`.
type FnJob func() error

// taskStore is a thread-safe `Task` store.
type taskStore struct {
	tasks map[string]*Task
	l     sync.RWMutex
}

func newTaskStore() taskStore {
	return taskStore{
		tasks: map[string]*Task{},
	}
}

func (ts *taskStore) push(task *Task) {
	ts.l.Lock()
	defer ts.l.Unlock()

	ts.tasks[task.Name] = task
}

func (ts *taskStore) setStatus(task *Task, status TaskStatus) {
	ts.l.Lock()
	defer ts.l.Unlock()

	if _, ok := ts.tasks[task.Name]; !ok {
		log.Debug().Str("name", task.Name).Msg("unable to update task status, does not exist")
		return
	}

	ts.tasks[task.Name].Status = status
}

// isReady reports whether a task can be run: it must be pending and all of its
// parents (if any) must have completed successfully.
func (ts *taskStore) isReady(task *Task) (bool, error) {
	ts.l.RLock()
	defer ts.l.RUnlock()

	t, ok := ts.tasks[task.Name]
	if !ok {
		log.Debug().Str("name", task.Name).Msg("unable to get task, does not exist")
		return false, ErrTaskDoesNotExist
	}

	if t.Status != Pending {
		return false, fmt.Errorf("bad status to be run: %s", t.Status)
	}

	if t.Parents == nil {
		return true, nil
	}

	for idx := range t.Parents {
		pt, ok := ts.tasks[t.Parents[idx].Name]
		if !ok {
			log.Debug().
				Str("name", t.Parents[idx].Name).
				Msg("unable to get parent task, does not exist")
			return false, ErrParentTaskDoesNotExist
		}

		if pt.Status == Failed {
			return false, ErrParentTaskFailed
		}

		// Every parent must have succeeded; a single pending or running parent
		// keeps the task not ready.
		if pt.Status != Success {
			return false, nil
		}
	}

	return true, nil
}

func (ts *taskStore) len() int {
	ts.l.RLock()
	defer ts.l.RUnlock()

	return len(ts.tasks)
}

type Tasks []*Task

type tasksOptions func(*options)

type options struct {
	layer int
}

func withLayer(layer int) tasksOptions {
	return func(o *options) {
		o.layer = layer
	}
}

// Display prints the `Tasks` tree execution layers on stdout.
// Each layer represents the tasks that will be executed together by the scheduler.
// TODO: display dependencies
func (ts Tasks) Display() {
	fmt.Println("> Tasks execution layers")
	ts.display()
}

func (ts Tasks) display(opts ...tasksOptions) {
	var opt options
	for _, o := range opts {
		o(&opt)
	}

	if opt.layer == 0 {
		opt.layer = 1
	}

	if len(ts) == 0 {
		return
	}

	fmt.Printf("------ layer %d ------\n", opt.layer)

	nextTasks := Tasks{}
	for idx := range ts {
		fmt.Print(ts[idx].Name + " ")
		nextTasks = append(nextTasks, ts[idx].Next...)
	}
	fmt.Println("")

	opt.layer++
	nextTasks.display(withLayer(opt.layer))
}

// Task represents an execution unit handled by the scheduler.
//
// The Next field links to the next executable tasks (tree-like structure).
type Task struct {
	Name    string
	Job     FnJob
	Status  TaskStatus
	Next    []*Task
	Parents []*Task
}

func NewTask(name string, job FnJob, next ...*Task) *Task {
	return &Task{
		Name:   name,
		Job:    job,
		Next:   next,
		Status: Pending,
	}
}

func (t *Task) AddNext(next ...*Task) {
	t.Next = append(t.Next, next...)
}

func (t *Task) AddParent(parents ...*Task) {
	if t.Parents == nil {
		t.Parents = []*Task{}
	}

	t.Parents = append(t.Parents, parents...)
}
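// exampleTaskGraph is a minimal, illustrative sketch and not part of the package
// API: it shows one way a caller could wire a small dependency graph with
// `NewTask`, `AddNext` and `AddParent`. The task names and no-op jobs below are
// assumptions made up for the example.
//
//nolint: unused // illustrative sketch
func exampleTaskGraph() Tasks {
	noop := func() error { return nil } // placeholder job

	fetch := NewTask("fetch", noop)
	build := NewTask("build", noop)
	test := NewTask("test", noop)

	// fetch runs before build, which runs before test; parents are what the
	// scheduler checks in isReady before executing a task.
	fetch.AddNext(build)
	build.AddParent(fetch)
	build.AddNext(test)
	test.AddParent(build)

	// Display prints the execution layers: "fetch", then "build", then "test".
	roots := Tasks{fetch}
	roots.Display()

	return roots
}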
// Scheduler is a simple scheduler: it stores tasks and executes them, that's all.
type Scheduler struct { //nolint: govet // ll
	capacity atomic.Uint32
	workers  uint8
	chTasks  chan *Task
	wg       sync.WaitGroup
	ctx      context.Context
	fnCancel context.CancelFunc
	tasks    taskStore
}

// NewScheduler instantiates a new `Scheduler`.
//
// If you want to run tasks immediately after the scheduler creation, you can pass a list of
// `Task` with the `tasks` argument.
func NewScheduler(ctx context.Context, capacity uint32, workers uint8, tasks ...*Task) *Scheduler {
	ctxChild, fnCancel := context.WithCancel(ctx)

	s := Scheduler{
		ctx:      ctxChild,
		fnCancel: fnCancel,
		capacity: atomic.Uint32{},
		workers:  workers,
		chTasks:  make(chan *Task, capacity),
		tasks:    newTaskStore(),
		wg:       sync.WaitGroup{},
	}
	s.capacity.Add(capacity)

	s.run()

	for idx := range tasks {
		s.Submit(tasks[idx]) //nolint: errcheck // TODO
	}

	return &s
}

// run starts the worker goroutines. Each worker pops tasks from the channel,
// re-schedules tasks whose parents are not done yet, runs the ready ones and
// submits their Next tasks on success.
func (s *Scheduler) run() {
	for i := 0; i < int(s.workers); i++ {
		s.wg.Add(1)

		go func() {
			defer s.wg.Done()

			for {
				select {
				case t := <-s.chTasks:
					ok, err := s.tasks.isReady(t)
					if err != nil {
						log.Debug().Err(err).Str("task", t.Name).Msg("error checking task status")
						s.tasks.setStatus(t, Failed)
						continue
					}
					if !ok {
						log.Debug().Str("task", t.Name).Msg("task not ready yet, re-scheduling...")
						s.Submit(t) //nolint: errcheck // TODO
						continue
					}

					s.tasks.setStatus(t, Running)

					if err := t.Job(); err != nil {
						log.Debug().Err(err).Str("task", t.Name).Msg("error executing task")
						s.tasks.setStatus(t, Failed)
						continue
					}

					s.tasks.setStatus(t, Success)

					for _, nt := range t.Next {
						s.Submit(nt) //nolint: errcheck // TODO
					}
				case <-s.ctx.Done():
					log.Debug().Msg("context done, stopping worker...")
					return
				}
			}
		}()
	}
}

// Stop cancels the scheduler context, asking all workers to stop.
func (s *Scheduler) Stop() {
	s.fnCancel()
}

// Submit adds a task to the scheduler. It fails if the scheduler is stopping or
// if the maximum capacity is reached.
func (s *Scheduler) Submit(task *Task) error {
	select {
	case <-s.ctx.Done():
		log.Debug().Msg("unable to submit new task, scheduler is stopping...")
		return ErrSchedulerContextDone
	default:
	}

	if s.tasks.len() >= int(s.capacity.Load()) {
		return ErrSchedulerMaxCapacityReached
	}

	s.tasks.push(task)
	s.chTasks <- task

	return nil
}

// Done returns a channel that is signaled once the scheduler context is done and
// all workers have returned.
func (s *Scheduler) Done() <-chan struct{} {
	chDone := make(chan struct{})

	go func() {
		<-s.ctx.Done()
		log.Debug().Msg("waiting for scheduler task completion...")
		s.wg.Wait()
		chDone <- struct{}{}
	}()

	return chDone
}
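// exampleSchedulerUsage is a minimal, illustrative sketch and not part of the
// package API: it shows a plausible end-to-end flow with assumed values (capacity
// of 8 task slots, 2 workers, made-up task names and jobs).
//
//nolint: unused // illustrative sketch
func exampleSchedulerUsage(ctx context.Context) error {
	root := NewTask("root", func() error {
		fmt.Println("running root task")
		return nil
	})

	// The root task is submitted immediately by NewScheduler and picked up by one
	// of the two workers.
	s := NewScheduler(ctx, 8, 2, root)

	// More tasks can be submitted while the scheduler is running.
	if err := s.Submit(NewTask("follow-up", func() error { return nil })); err != nil {
		return err
	}

	// Stop cancels the internal context: workers return once they finish their
	// current task, and Done is signaled after all of them have exited.
	s.Stop()
	<-s.Done()

	return nil
}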