372 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			372 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package scheduler
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"cycle-scheduler/internal/job"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	TableTitle  = "# cycle-scheduler"
 | |
| 	Cursor      = "^"
 | |
| 	CycleLength = 60
 | |
| 	MaxWorkers  = 5
 | |
| )
 | |
| 
 | |
| const MaxSlotsIdx = 59
 | |
| 
 | |
| type JobSlot struct {
 | |
| 	*job.Job
 | |
| 	row int
 | |
| }
 | |
| 
 | |
| // SchedulerCycle is a dumb scheduler.
 | |
| // It handle job and executes it at each cycle (60 * interval).
 | |
| //
 | |
| // Jobs are handle in a array of job slices.
 | |
| // At each interval (clock), the cursor moves to the next slot (s*).
 | |
| // If there are jobs, they are sent to workers to be executed
 | |
| // and the slot is cleaned.
 | |
| //
 | |
| // At the end of the slot (s60), the cursor re-starts a cycle at s1.
 | |
| type SchedulerCycle struct {
 | |
| 	l  sync.RWMutex
 | |
| 	wg sync.WaitGroup
 | |
| 
 | |
| 	ctx      context.Context
 | |
| 	fnCancel context.CancelFunc
 | |
| 
 | |
| 	interval    time.Duration
 | |
| 	currentSlot int
 | |
| 	slots       [60][]*job.Job
 | |
| 	jobs        map[uuid.UUID]*job.Job
 | |
| 
 | |
| 	chJobs chan *JobSlot
 | |
| }
 | |
| 
 | |
| func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
 | |
| 	ctxChild, fnCancel := context.WithCancel(ctx)
 | |
| 
 | |
| 	c := SchedulerCycle{
 | |
| 		wg:          sync.WaitGroup{},
 | |
| 		ctx:         ctxChild,
 | |
| 		fnCancel:    fnCancel,
 | |
| 		interval:    interval,
 | |
| 		currentSlot: 0,
 | |
| 		slots:       [60][]*job.Job{},
 | |
| 		jobs:        make(map[uuid.UUID]*job.Job),
 | |
| 		chJobs:      make(chan *JobSlot),
 | |
| 	}
 | |
| 
 | |
| 	c.run()
 | |
| 
 | |
| 	return &c
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Stop() {
 | |
| 	c.fnCancel()
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Done() <-chan struct{} {
 | |
| 	done := make(chan struct{})
 | |
| 	go func() {
 | |
| 		<-c.ctx.Done()
 | |
| 		c.wg.Done()
 | |
| 		done <- struct{}{}
 | |
| 	}()
 | |
| 	return done
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Len() int {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	return len(c.jobs)
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) HasAllJobsDone() bool {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	for _, j := range c.jobs {
 | |
| 		if j.GetState() == job.Pending || j.GetState() == job.Running {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	details := []job.JobDetails{}
 | |
| 	for _, j := range c.jobs {
 | |
| 		details = append(details, j.IntoDetails())
 | |
| 	}
 | |
| 
 | |
| 	return details
 | |
| }
 | |
| 
 | |
| // Delay builds a job and add it to the scheduler engine.
 | |
| func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	nextSlot := c.currentSlot + 1
 | |
| 	if nextSlot > MaxSlotsIdx {
 | |
| 		nextSlot = 0
 | |
| 	}
 | |
| 
 | |
| 	j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot]))
 | |
| 
 | |
| 	c.slots[nextSlot] = append(c.slots[nextSlot], &j)
 | |
| 	c.jobs[j.GetID()] = &j
 | |
| 
 | |
| 	log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
 | |
| 	return j.GetID()
 | |
| }
 | |
| 
 | |
| // Abort aborts the job given by its id if it exists..
 | |
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
 | |
| 	if j := c.getJob(id); j != nil {
 | |
| 		j.Abort()
 | |
| 
 | |
| 		log.Info().Str("job", j.GetID().String()).Msg("abort job done")
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // GetJobDetails returns the job details by .
 | |
| func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	j, ok := c.jobs[id]
 | |
| 	if !ok {
 | |
| 		return job.JobDetails{
 | |
| 			State: job.Unknown.String(),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return j.IntoDetails()
 | |
| }
 | |
| 
 | |
| // Display outputs earch interval the scheduler state.
 | |
| func (c *SchedulerCycle) Display() {
 | |
| 	ticker := time.NewTicker(c.interval)
 | |
| 	go func() {
 | |
| 		for range ticker.C {
 | |
| 			c.display()
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // display writes to stdout the state of the scheduler as a table.
 | |
| func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	var maxCols int
 | |
| 	for i := range c.slots {
 | |
| 		if l := len(c.slots[i]); l > maxCols {
 | |
| 			maxCols = l
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	table := [][]string{}
 | |
| 	title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
 | |
| 	table = append(table, []string{title})
 | |
| 	for {
 | |
| 		if maxCols == 0 {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		row := make([]string, CycleLength)
 | |
| 		for i := 0; i <= MaxSlotsIdx; i++ {
 | |
| 			row[i] = "_"
 | |
| 		}
 | |
| 
 | |
| 		for i := range c.slots {
 | |
| 			if len(c.slots[i]) < maxCols {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			j := c.slots[i][maxCols-1]
 | |
| 			switch j.GetState() {
 | |
| 			case job.Pending:
 | |
| 				row[i] = "P"
 | |
| 			case job.Running:
 | |
| 				row[i] = "R"
 | |
| 			case job.Failed:
 | |
| 				row[i] = "X"
 | |
| 			case job.Abort:
 | |
| 				row[i] = "A"
 | |
| 			case job.Unknown:
 | |
| 				row[i] = "?"
 | |
| 			case job.Success:
 | |
| 				row[i] = "O"
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		table = append(table, row)
 | |
| 		maxCols--
 | |
| 	}
 | |
| 
 | |
| 	row := make([]string, CycleLength)
 | |
| 	for i := 0; i <= MaxSlotsIdx; i++ {
 | |
| 		row[i] = "-"
 | |
| 	}
 | |
| 	table = append(table, row)
 | |
| 
 | |
| 	if l := len(table); l > 0 {
 | |
| 		table[l-1][c.currentSlot] = Cursor
 | |
| 	}
 | |
| 
 | |
| 	tableFormat := ""
 | |
| 	for _, r := range table {
 | |
| 		tableFormat += strings.Join(r, " ")
 | |
| 		tableFormat += "\n"
 | |
| 	}
 | |
| 
 | |
| 	fmt.Println(tableFormat)
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	j, ok := c.jobs[id]
 | |
| 	if !ok {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return j
 | |
| }
 | |
| 
 | |
| // getCurrentSlotJobs collects all the current slot jobs
 | |
| // and clean the slot.
 | |
| func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	jobs := c.slots[c.currentSlot]
 | |
| 
 | |
| 	c.slots[c.currentSlot] = []*job.Job{}
 | |
| 
 | |
| 	return c.currentSlot, jobs
 | |
| }
 | |
| 
 | |
| // updateSlot add a job to the slot where it was before.
 | |
| func (c *SchedulerCycle) updateSlot(row int, j *job.Job) {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	c.slots[row] = append(c.slots[row], j)
 | |
| }
 | |
| 
 | |
| // updateCurrentSlot add a job to the current slot.
 | |
| func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j)
 | |
| }
 | |
| 
 | |
| // incr increments the slot cursor.
 | |
| // It the cursor reaches `MaxSlotsIdx`, it goes back to 0.
 | |
| func (c *SchedulerCycle) incr() {
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	nextSlot := c.currentSlot + 1
 | |
| 	if nextSlot > MaxSlotsIdx {
 | |
| 		nextSlot = 0
 | |
| 	}
 | |
| 
 | |
| 	c.currentSlot = nextSlot
 | |
| }
 | |
| 
 | |
| // dispatch gets jobs from the current slot, resets the slot
 | |
| // and dispatch all jobs to the workers.
 | |
| //
 | |
| // It all the workers are busy, the jobs are re-schedule in the same slot
 | |
| // to be executed in the next cycle.
 | |
| func (c *SchedulerCycle) dispatch() {
 | |
| 	row, jobs := c.getCurrentSlotJobs()
 | |
| 	for _, j := range jobs {
 | |
| 		if j.GetState() == job.Abort {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case c.chJobs <- &JobSlot{row: row, Job: j}:
 | |
| 		default:
 | |
| 			log.Warn().Msg("unable to put job in workers, trying next cycle")
 | |
| 			c.updateSlot(row, j)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // run launches the workers and the ticker.
 | |
| func (c *SchedulerCycle) run() {
 | |
| 	c.workers()
 | |
| 	c.tick()
 | |
| }
 | |
| 
 | |
| // workers launches `MaxWorkers` number of worker to execute job.
 | |
| // If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot.
 | |
| func (c *SchedulerCycle) workers() {
 | |
| 	for i := 0; i < MaxWorkers; i++ {
 | |
| 		c.wg.Add(1)
 | |
| 		go func() {
 | |
| 			defer c.wg.Done()
 | |
| 			for {
 | |
| 				select {
 | |
| 				case j := <-c.chJobs:
 | |
| 					c.executeJob(j.Job, c.updateCurrentSlot)
 | |
| 				case <-c.ctx.Done():
 | |
| 					log.Error().Msg("context done, worker is stopping...")
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) {
 | |
| 	j.Run(c.ctx)
 | |
| 	if j.GetState() == job.Pending {
 | |
| 		fnFallBack(j)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // tick is a simple ticker incrementing at each scheduler interval,
 | |
| // the slot cursor and dispatch jobs to the workers.
 | |
| func (c *SchedulerCycle) tick() {
 | |
| 	c.wg.Add(1)
 | |
| 	go func() {
 | |
| 		defer c.wg.Done()
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-c.ctx.Done():
 | |
| 				log.Error().Msg("context done, ticker is stopping...")
 | |
| 				return
 | |
| 			default:
 | |
| 				time.Sleep(c.interval)
 | |
| 				c.incr()
 | |
| 				c.dispatch()
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| }
 |