package scheduler import ( "context" "errors" "fmt" "sync" "sync/atomic" "github.com/rs/zerolog/log" ) var ( ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached") ErrSchedulerContextDone = errors.New("context done, scheduler stopped") ) type TaskStatus string const ( Pending TaskStatus = "pending" Running TaskStatus = "running" Success = "success" Failed = "failed" ) type FnJob func() error // taskStore is a thread safe `Task` store. type taskStore struct { tasks map[string]*Task l sync.RWMutex } func newTaskStore() taskStore { return taskStore{ tasks: map[string]*Task{}, } } func (ts *taskStore) push(task *Task) { ts.l.Lock() defer ts.l.Unlock() ts.tasks[task.Name] = task } func (ts *taskStore) setStatus(task *Task, status TaskStatus) { ts.l.Lock() defer ts.l.Unlock() if _, ok := ts.tasks[task.Name]; !ok { log.Debug().Str("name", task.Name).Msg("unable to update task status, does not exist") return } ts.tasks[task.Name].Status = status } func (ts *taskStore) len() int { ts.l.RLock() defer ts.l.RUnlock() return len(ts.tasks) } type Tasks []*Task type tasksOptions func(*options) type options struct { layer int } func withLayer(layer int) tasksOptions { return func(o *options) { o.layer = layer } } // Display displays on stdout the Tasks tree execution layers. // Each layer represents the tasks going to be executed by the scheduler. // TODO: display dependencies func (ts Tasks) Display() { fmt.Println("> Tasks execution layers") ts.display() } func (ts Tasks) display(opts ...tasksOptions) { var opt options for _, o := range opts { o(&opt) } if opt.layer == 0 { opt.layer = 1 } if len(ts) == 0 { return } fmt.Println(fmt.Sprintf("------ layer %d ------", opt.layer)) nextTasks := Tasks{} for idx := range ts { fmt.Print(ts[idx].Name + " ") nextTasks = append(nextTasks, ts[idx].Next...) } fmt.Println("") opt.layer += 1 nextTasks.display(withLayer(opt.layer)) } // Task represents an execution unit handle by the scheduler. // // Next field links to next executable tasks (tree kind). type Task struct { Name string Job FnJob Status TaskStatus Next []*Task } func NewTask(name string, job FnJob, next ...*Task) *Task { return &Task{ Name: name, Job: job, Next: next, Status: Pending, } } // Scheduler is a simple scheduler. // Handling tasks and executes them, that's all. type Scheduler struct { //nolint: govet // ll capacity atomic.Uint32 workers uint8 chTasks chan *Task wg sync.WaitGroup ctx context.Context fnCancel context.CancelFunc tasks taskStore } // NewScheduler instantiates a new `Scheduler`. // // If you want to run tasks immediately after the scheduler creation, you can pass a list of // `Task` with `tasks` argument. func NewScheduler(ctx context.Context, capacity uint32, workers uint8, tasks ...*Task) *Scheduler { ctxChild, fnCancel := context.WithCancel(ctx) s := Scheduler{ ctx: ctxChild, fnCancel: fnCancel, capacity: atomic.Uint32{}, workers: workers, chTasks: make(chan *Task, capacity), tasks: newTaskStore(), wg: sync.WaitGroup{}, } s.capacity.Add(capacity) s.run() if tasks != nil { for idx := range tasks { s.Submit(tasks[idx]) //nolint: errcheck // TODO } } return &s } func (s *Scheduler) run() { for i := 0; i < int(s.workers); i++ { s.wg.Add(1) go func() { defer s.wg.Done() for { select { case t := <-s.chTasks: s.tasks.setStatus(t, Running) if err := t.Job(); err != nil { log.Debug().Err(err).Str("task", t.Name).Msg("error executing task") s.tasks.setStatus(t, Failed) continue } s.tasks.setStatus(t, Success) for _, nt := range t.Next { s.Submit(nt) //nolint: errcheck // TODO } case <-s.ctx.Done(): log.Debug().Msg("context done, stopping worker...") return } } }() } } func (s *Scheduler) Stop() { s.fnCancel() } func (s *Scheduler) Submit(task *Task) error { select { case <-s.ctx.Done(): log.Debug().Msg("unable to submit new task, scheduler is stopping...") return ErrSchedulerContextDone default: } cap := s.capacity.Load() if s.tasks.len() >= int(cap) { return ErrSchedulerMaxCapacityReached } s.tasks.push(task) s.chTasks <- task return nil } func (s *Scheduler) Done() <-chan struct{} { chDone := make(chan struct{}) go func() { for { //nolint: staticcheck // no select { case <-s.ctx.Done(): log.Debug().Msg("waiting for scheduler task completion...") s.wg.Wait() chDone <- struct{}{} return } } }() return chDone }