hmdeploy/scheduler/scheduler.go

241 lines
4.6 KiB
Go

package scheduler
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/rs/zerolog/log"
)
var (
ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached")
ErrSchedulerContextDone = errors.New("context done, scheduler stopped")
)
type TaskStatus string
const (
Pending TaskStatus = "pending"
Running TaskStatus = "running"
Success = "success"
Failed = "failed"
)
type FnJob func() error
// taskStore is a thread safe `Task` store.
type taskStore struct {
tasks map[string]*Task
l sync.RWMutex
}
func newTaskStore() taskStore {
return taskStore{
tasks: map[string]*Task{},
}
}
func (ts *taskStore) push(task *Task) {
ts.l.Lock()
defer ts.l.Unlock()
ts.tasks[task.Name] = task
}
func (ts *taskStore) setStatus(task *Task, status TaskStatus) {
ts.l.Lock()
defer ts.l.Unlock()
if _, ok := ts.tasks[task.Name]; !ok {
log.Debug().Str("name", task.Name).Msg("unable to update task status, does not exist")
return
}
ts.tasks[task.Name].Status = status
}
func (ts *taskStore) len() int {
ts.l.RLock()
defer ts.l.RUnlock()
return len(ts.tasks)
}
type Tasks []*Task
type tasksOptions func(*options)
type options struct {
layer int
}
func withLayer(layer int) tasksOptions {
return func(o *options) {
o.layer = layer
}
}
// Display displays on stdout the Tasks tree execution layers.
// Each layer represents the tasks going to be executed by the scheduler.
// TODO: display dependencies
func (ts Tasks) Display() {
fmt.Println("> Tasks execution layers")
ts.display()
}
func (ts Tasks) display(opts ...tasksOptions) {
var opt options
for _, o := range opts {
o(&opt)
}
if opt.layer == 0 {
opt.layer = 1
}
if len(ts) == 0 {
return
}
fmt.Println(fmt.Sprintf("------ layer %d ------", opt.layer))
nextTasks := Tasks{}
for idx := range ts {
fmt.Print(ts[idx].Name + " ")
nextTasks = append(nextTasks, ts[idx].Next...)
}
fmt.Println("")
opt.layer += 1
nextTasks.display(withLayer(opt.layer))
}
// Task represents an execution unit handle by the scheduler.
//
// Next field links to next executable tasks (tree kind).
type Task struct {
Name string
Job FnJob
Status TaskStatus
Next []*Task
}
func NewTask(name string, job FnJob, next ...*Task) *Task {
return &Task{
Name: name,
Job: job,
Next: next,
Status: Pending,
}
}
// Scheduler is a simple scheduler.
// Handling tasks and executes them, that's all.
type Scheduler struct { //nolint: govet // ll
capacity atomic.Uint32
workers uint8
chTasks chan *Task
wg sync.WaitGroup
ctx context.Context
fnCancel context.CancelFunc
tasks taskStore
}
// NewScheduler instantiates a new `Scheduler`.
//
// If you want to run tasks immediately after the scheduler creation, you can pass a list of
// `Task` with `tasks` argument.
func NewScheduler(ctx context.Context, capacity uint32, workers uint8, tasks ...*Task) *Scheduler {
ctxChild, fnCancel := context.WithCancel(ctx)
s := Scheduler{
ctx: ctxChild,
fnCancel: fnCancel,
capacity: atomic.Uint32{},
workers: workers,
chTasks: make(chan *Task, capacity),
tasks: newTaskStore(),
wg: sync.WaitGroup{},
}
s.capacity.Add(capacity)
s.run()
if tasks != nil {
for idx := range tasks {
s.Submit(tasks[idx]) //nolint: errcheck // TODO
}
}
return &s
}
func (s *Scheduler) run() {
for i := 0; i < int(s.workers); i++ {
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
select {
case t := <-s.chTasks:
s.tasks.setStatus(t, Running)
if err := t.Job(); err != nil {
log.Debug().Err(err).Str("task", t.Name).Msg("error executing task")
s.tasks.setStatus(t, Failed)
continue
}
s.tasks.setStatus(t, Success)
for _, nt := range t.Next {
s.Submit(nt) //nolint: errcheck // TODO
}
case <-s.ctx.Done():
log.Debug().Msg("context done, stopping worker...")
return
}
}
}()
}
}
func (s *Scheduler) Stop() {
s.fnCancel()
}
func (s *Scheduler) Submit(task *Task) error {
select {
case <-s.ctx.Done():
log.Debug().Msg("unable to submit new task, scheduler is stopping...")
return ErrSchedulerContextDone
default:
}
cap := s.capacity.Load()
if s.tasks.len() >= int(cap) {
return ErrSchedulerMaxCapacityReached
}
s.tasks.push(task)
s.chTasks <- task
return nil
}
func (s *Scheduler) Done() <-chan struct{} {
chDone := make(chan struct{})
go func() {
for { //nolint: staticcheck // no
select {
case <-s.ctx.Done():
log.Debug().Msg("waiting for scheduler task completion...")
s.wg.Wait()
chDone <- struct{}{}
return
}
}
}()
return chDone
}