309 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			309 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package scheduler
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached")
 | |
| 	ErrSchedulerContextDone        = errors.New("context done, scheduler stopped")
 | |
| 
 | |
| 	ErrTaskDoesNotExist       = errors.New("task does not exist")
 | |
| 	ErrParentTaskDoesNotExist = errors.New("parent task does not exist")
 | |
| 	ErrParentTaskFailed       = errors.New("parent task failed")
 | |
| )
 | |
| 
 | |
| type TaskStatus string
 | |
| 
 | |
| const (
 | |
| 	Pending TaskStatus = "pending"
 | |
| 	Running TaskStatus = "running"
 | |
| 	Success            = "success"
 | |
| 	Failed             = "failed"
 | |
| 	Unknown            = "unknown"
 | |
| )
 | |
| 
 | |
| type FnJob func() error
 | |
| 
 | |
| // taskStore is a thread safe `Task` store.
 | |
| type taskStore struct {
 | |
| 	tasks map[string]*Task
 | |
| 	l     sync.RWMutex
 | |
| }
 | |
| 
 | |
| func newTaskStore() taskStore {
 | |
| 	return taskStore{
 | |
| 		tasks: map[string]*Task{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ts *taskStore) push(task *Task) {
 | |
| 	ts.l.Lock()
 | |
| 	defer ts.l.Unlock()
 | |
| 
 | |
| 	ts.tasks[task.Name] = task
 | |
| }
 | |
| 
 | |
| func (ts *taskStore) setStatus(task *Task, status TaskStatus) {
 | |
| 	ts.l.Lock()
 | |
| 	defer ts.l.Unlock()
 | |
| 
 | |
| 	if _, ok := ts.tasks[task.Name]; !ok {
 | |
| 		log.Debug().Str("name", task.Name).Msg("unable to update task status, does not exist")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ts.tasks[task.Name].Status = status
 | |
| }
 | |
| 
 | |
| func (ts *taskStore) isReady(task *Task) (bool, error) {
 | |
| 	ts.l.RLock()
 | |
| 	defer ts.l.RUnlock()
 | |
| 
 | |
| 	t, ok := ts.tasks[task.Name]
 | |
| 	if !ok {
 | |
| 		log.Debug().Str("name", task.Name).Msg("unable to get task, does not exist")
 | |
| 		return false, ErrTaskDoesNotExist
 | |
| 	}
 | |
| 
 | |
| 	if t.Status != Pending {
 | |
| 		return false, fmt.Errorf("bad status to be run: %s", t.Status)
 | |
| 	}
 | |
| 
 | |
| 	if t.Parents == nil {
 | |
| 		return true, nil
 | |
| 	}
 | |
| 
 | |
| 	isReady := false
 | |
| 	for idx := range t.Parents {
 | |
| 		pt, ok := ts.tasks[t.Parents[idx].Name]
 | |
| 		if !ok {
 | |
| 			log.Debug().
 | |
| 				Str("name", t.Parents[idx].Name).
 | |
| 				Msg("unable to get parent task, does not exist")
 | |
| 			return false, ErrParentTaskDoesNotExist
 | |
| 		}
 | |
| 
 | |
| 		if pt.Status == Failed {
 | |
| 			return false, ErrParentTaskFailed
 | |
| 		}
 | |
| 
 | |
| 		isReady = pt.Status == Success
 | |
| 	}
 | |
| 
 | |
| 	return isReady, nil
 | |
| }
 | |
| 
 | |
| func (ts *taskStore) len() int {
 | |
| 	ts.l.RLock()
 | |
| 	defer ts.l.RUnlock()
 | |
| 
 | |
| 	return len(ts.tasks)
 | |
| }
 | |
| 
 | |
| type Tasks []*Task
 | |
| 
 | |
| type tasksOptions func(*options)
 | |
| 
 | |
| type options struct {
 | |
| 	layer int
 | |
| }
 | |
| 
 | |
| func withLayer(layer int) tasksOptions {
 | |
| 	return func(o *options) {
 | |
| 		o.layer = layer
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Display displays on stdout the Tasks tree execution layers.
 | |
| // Each layer represents the tasks going to be executed by the scheduler.
 | |
| // TODO: display dependencies
 | |
| func (ts Tasks) Display() {
 | |
| 	fmt.Println("> Tasks execution layers")
 | |
| 	ts.display()
 | |
| }
 | |
| 
 | |
| func (ts Tasks) display(opts ...tasksOptions) {
 | |
| 	var opt options
 | |
| 	for _, o := range opts {
 | |
| 		o(&opt)
 | |
| 	}
 | |
| 	if opt.layer == 0 {
 | |
| 		opt.layer = 1
 | |
| 	}
 | |
| 
 | |
| 	if len(ts) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	fmt.Println(fmt.Sprintf("------ layer %d ------", opt.layer))
 | |
| 	nextTasks := Tasks{}
 | |
| 	for idx := range ts {
 | |
| 		fmt.Print(ts[idx].Name + " ")
 | |
| 		nextTasks = append(nextTasks, ts[idx].Next...)
 | |
| 	}
 | |
| 	fmt.Println("")
 | |
| 
 | |
| 	opt.layer += 1
 | |
| 	nextTasks.display(withLayer(opt.layer))
 | |
| }
 | |
| 
 | |
| // Task represents an execution unit handle by the scheduler.
 | |
| //
 | |
| // Next field links to next executable tasks (tree kind).
 | |
| type Task struct {
 | |
| 	Name    string
 | |
| 	Job     FnJob
 | |
| 	Status  TaskStatus
 | |
| 	Next    []*Task
 | |
| 	Parents []*Task
 | |
| }
 | |
| 
 | |
| func NewTask(name string, job FnJob, next ...*Task) *Task {
 | |
| 	return &Task{
 | |
| 		Name:   name,
 | |
| 		Job:    job,
 | |
| 		Next:   next,
 | |
| 		Status: Pending,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (t *Task) AddNext(next ...*Task) {
 | |
| 	t.Next = append(t.Next, next...)
 | |
| }
 | |
| 
 | |
| func (t *Task) AddParent(parents ...*Task) {
 | |
| 	if t.Parents == nil {
 | |
| 		t.Parents = []*Task{}
 | |
| 	}
 | |
| 	t.Parents = append(t.Parents, parents...)
 | |
| }
 | |
| 
 | |
| // Scheduler is a simple scheduler.
 | |
| // Handling tasks and executes them, that's all.
 | |
| type Scheduler struct { //nolint: govet // ll
 | |
| 	capacity atomic.Uint32
 | |
| 	workers  uint8
 | |
| 	chTasks  chan *Task
 | |
| 	wg       sync.WaitGroup
 | |
| 
 | |
| 	ctx      context.Context
 | |
| 	fnCancel context.CancelFunc
 | |
| 
 | |
| 	tasks taskStore
 | |
| }
 | |
| 
 | |
| // NewScheduler instantiates a new `Scheduler`.
 | |
| //
 | |
| // If you want to run tasks immediately after the scheduler creation, you can pass a list of
 | |
| // `Task` with `tasks` argument.
 | |
| func NewScheduler(ctx context.Context, capacity uint32, workers uint8, tasks ...*Task) *Scheduler {
 | |
| 	ctxChild, fnCancel := context.WithCancel(ctx)
 | |
| 	s := Scheduler{
 | |
| 		ctx:      ctxChild,
 | |
| 		fnCancel: fnCancel,
 | |
| 		capacity: atomic.Uint32{},
 | |
| 		workers:  workers,
 | |
| 		chTasks:  make(chan *Task, capacity),
 | |
| 		tasks:    newTaskStore(),
 | |
| 		wg:       sync.WaitGroup{},
 | |
| 	}
 | |
| 	s.capacity.Add(capacity)
 | |
| 	s.run()
 | |
| 
 | |
| 	if tasks != nil {
 | |
| 		for idx := range tasks {
 | |
| 			s.Submit(tasks[idx]) //nolint: errcheck // TODO
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return &s
 | |
| }
 | |
| 
 | |
| func (s *Scheduler) run() {
 | |
| 	for i := 0; i < int(s.workers); i++ {
 | |
| 		s.wg.Add(1)
 | |
| 		go func() {
 | |
| 			defer s.wg.Done()
 | |
| 			for {
 | |
| 				select {
 | |
| 				case t := <-s.chTasks:
 | |
| 					ok, err := s.tasks.isReady(t)
 | |
| 					if err != nil {
 | |
| 						log.Debug().Err(err).Str("task", t.Name).Msg("error checking task status")
 | |
| 						s.tasks.setStatus(t, Failed)
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					if !ok {
 | |
| 						log.Debug().Str("task", t.Name).Msg("task not ready yet, re-scheduling...")
 | |
| 						s.Submit(t)
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					s.tasks.setStatus(t, Running)
 | |
| 
 | |
| 					if err := t.Job(); err != nil {
 | |
| 						log.Debug().Err(err).Str("task", t.Name).Msg("error executing task")
 | |
| 						s.tasks.setStatus(t, Failed)
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					s.tasks.setStatus(t, Success)
 | |
| 
 | |
| 					for _, nt := range t.Next {
 | |
| 						s.Submit(nt) //nolint: errcheck // TODO
 | |
| 					}
 | |
| 				case <-s.ctx.Done():
 | |
| 					log.Debug().Msg("context done, stopping worker...")
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Scheduler) Stop() {
 | |
| 	s.fnCancel()
 | |
| }
 | |
| 
 | |
| func (s *Scheduler) Submit(task *Task) error {
 | |
| 	select {
 | |
| 	case <-s.ctx.Done():
 | |
| 		log.Debug().Msg("unable to submit new task, scheduler is stopping...")
 | |
| 		return ErrSchedulerContextDone
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	cap := s.capacity.Load()
 | |
| 	if s.tasks.len() >= int(cap) {
 | |
| 		return ErrSchedulerMaxCapacityReached
 | |
| 	}
 | |
| 
 | |
| 	s.tasks.push(task)
 | |
| 	s.chTasks <- task
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *Scheduler) Done() <-chan struct{} {
 | |
| 	chDone := make(chan struct{})
 | |
| 	go func() {
 | |
| 		for { //nolint: staticcheck // no
 | |
| 			select {
 | |
| 			case <-s.ctx.Done():
 | |
| 				log.Debug().Msg("waiting for scheduler task completion...")
 | |
| 				s.wg.Wait()
 | |
| 				chDone <- struct{}{}
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	return chDone
 | |
| }
 | 
