372 lines
7.2 KiB
Go
372 lines
7.2 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"cycle-scheduler/internal/job"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
const (
|
|
TableTitle = "# cycle-scheduler"
|
|
Cursor = "^"
|
|
CycleLength = 60
|
|
MaxWorkers = 5
|
|
)
|
|
|
|
const MaxSlotsIdx = 59
|
|
|
|
type JobSlot struct {
|
|
*job.Job
|
|
row int
|
|
}
|
|
|
|
// SchedulerCycle is a dumb scheduler.
|
|
// It handle job and executes it at each cycle (60 * interval).
|
|
//
|
|
// Jobs are handle in a array of job slices.
|
|
// At each interval (clock), the cursor moves to the next slot (s*).
|
|
// If there are jobs, they are sent to workers to be executed
|
|
// and the slot is cleaned.
|
|
//
|
|
// At the end of the slot (s60), the cursor re-starts a cycle at s1.
|
|
type SchedulerCycle struct {
|
|
l sync.RWMutex
|
|
wg sync.WaitGroup
|
|
|
|
ctx context.Context
|
|
fnCancel context.CancelFunc
|
|
|
|
interval time.Duration
|
|
currentSlot int
|
|
slots [60][]*job.Job
|
|
jobs map[uuid.UUID]*job.Job
|
|
|
|
chJobs chan *JobSlot
|
|
}
|
|
|
|
func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
|
|
ctxChild, fnCancel := context.WithCancel(ctx)
|
|
|
|
c := SchedulerCycle{
|
|
wg: sync.WaitGroup{},
|
|
ctx: ctxChild,
|
|
fnCancel: fnCancel,
|
|
interval: interval,
|
|
currentSlot: 0,
|
|
slots: [60][]*job.Job{},
|
|
jobs: make(map[uuid.UUID]*job.Job),
|
|
chJobs: make(chan *JobSlot),
|
|
}
|
|
|
|
c.run()
|
|
|
|
return &c
|
|
}
|
|
|
|
func (c *SchedulerCycle) Stop() {
|
|
c.fnCancel()
|
|
}
|
|
|
|
func (c *SchedulerCycle) Done() <-chan struct{} {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
<-c.ctx.Done()
|
|
c.wg.Done()
|
|
done <- struct{}{}
|
|
}()
|
|
return done
|
|
}
|
|
|
|
func (c *SchedulerCycle) Len() int {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
return len(c.jobs)
|
|
}
|
|
|
|
func (c *SchedulerCycle) HasAllJobsDone() bool {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
for _, j := range c.jobs {
|
|
if j.GetState() == job.Pending || j.GetState() == job.Running {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
details := []job.JobDetails{}
|
|
for _, j := range c.jobs {
|
|
details = append(details, j.IntoDetails())
|
|
}
|
|
|
|
return details
|
|
}
|
|
|
|
// Delay builds a job and add it to the scheduler engine.
|
|
func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
nextSlot := c.currentSlot + 1
|
|
if nextSlot > MaxSlotsIdx {
|
|
nextSlot = 0
|
|
}
|
|
|
|
j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot]))
|
|
|
|
c.slots[nextSlot] = append(c.slots[nextSlot], &j)
|
|
c.jobs[j.GetID()] = &j
|
|
|
|
log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
|
|
return j.GetID()
|
|
}
|
|
|
|
// Abort aborts the job given by its id if it exists..
|
|
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
|
|
if j := c.getJob(id); j != nil {
|
|
j.Abort()
|
|
|
|
log.Info().Str("job", j.GetID().String()).Msg("abort job done")
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetJobDetails returns the job details by .
|
|
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
j, ok := c.jobs[id]
|
|
if !ok {
|
|
return job.JobDetails{
|
|
State: job.Unknown.String(),
|
|
}
|
|
}
|
|
|
|
return j.IntoDetails()
|
|
}
|
|
|
|
// Display outputs earch interval the scheduler state.
|
|
func (c *SchedulerCycle) Display() {
|
|
ticker := time.NewTicker(c.interval)
|
|
go func() {
|
|
for range ticker.C {
|
|
c.display()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// display writes to stdout the state of the scheduler as a table.
|
|
func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
|
|
c.l.RLock()
|
|
defer c.l.RUnlock()
|
|
|
|
var maxCols int
|
|
for i := range c.slots {
|
|
if l := len(c.slots[i]); l > maxCols {
|
|
maxCols = l
|
|
}
|
|
}
|
|
|
|
table := [][]string{}
|
|
title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
|
|
table = append(table, []string{title})
|
|
for {
|
|
if maxCols == 0 {
|
|
break
|
|
}
|
|
|
|
row := make([]string, CycleLength)
|
|
for i := 0; i <= MaxSlotsIdx; i++ {
|
|
row[i] = "_"
|
|
}
|
|
|
|
for i := range c.slots {
|
|
if len(c.slots[i]) < maxCols {
|
|
continue
|
|
}
|
|
|
|
j := c.slots[i][maxCols-1]
|
|
switch j.GetState() {
|
|
case job.Pending:
|
|
row[i] = "P"
|
|
case job.Running:
|
|
row[i] = "R"
|
|
case job.Failed:
|
|
row[i] = "X"
|
|
case job.Abort:
|
|
row[i] = "A"
|
|
case job.Unknown:
|
|
row[i] = "?"
|
|
case job.Success:
|
|
row[i] = "O"
|
|
}
|
|
}
|
|
|
|
table = append(table, row)
|
|
maxCols--
|
|
}
|
|
|
|
row := make([]string, CycleLength)
|
|
for i := 0; i <= MaxSlotsIdx; i++ {
|
|
row[i] = "-"
|
|
}
|
|
table = append(table, row)
|
|
|
|
if l := len(table); l > 0 {
|
|
table[l-1][c.currentSlot] = Cursor
|
|
}
|
|
|
|
tableFormat := ""
|
|
for _, r := range table {
|
|
tableFormat += strings.Join(r, " ")
|
|
tableFormat += "\n"
|
|
}
|
|
|
|
fmt.Println(tableFormat)
|
|
}
|
|
|
|
func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job {
|
|
c.l.RLock()
|
|
defer c.l.RUnlock()
|
|
|
|
j, ok := c.jobs[id]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return j
|
|
}
|
|
|
|
// getCurrentSlotJobs collects all the current slot jobs
|
|
// and clean the slot.
|
|
func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
jobs := c.slots[c.currentSlot]
|
|
|
|
c.slots[c.currentSlot] = []*job.Job{}
|
|
|
|
return c.currentSlot, jobs
|
|
}
|
|
|
|
// updateSlot add a job to the slot where it was before.
|
|
func (c *SchedulerCycle) updateSlot(row int, j *job.Job) {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
c.slots[row] = append(c.slots[row], j)
|
|
}
|
|
|
|
// updateCurrentSlot add a job to the current slot.
|
|
func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j)
|
|
}
|
|
|
|
// incr increments the slot cursor.
|
|
// It the cursor reaches `MaxSlotsIdx`, it goes back to 0.
|
|
func (c *SchedulerCycle) incr() {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
nextSlot := c.currentSlot + 1
|
|
if nextSlot > MaxSlotsIdx {
|
|
nextSlot = 0
|
|
}
|
|
|
|
c.currentSlot = nextSlot
|
|
}
|
|
|
|
// dispatch gets jobs from the current slot, resets the slot
|
|
// and dispatch all jobs to the workers.
|
|
//
|
|
// It all the workers are busy, the jobs are re-schedule in the same slot
|
|
// to be executed in the next cycle.
|
|
func (c *SchedulerCycle) dispatch() {
|
|
row, jobs := c.getCurrentSlotJobs()
|
|
for _, j := range jobs {
|
|
if j.GetState() == job.Abort {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case c.chJobs <- &JobSlot{row: row, Job: j}:
|
|
default:
|
|
log.Warn().Msg("unable to put job in workers, trying next cycle")
|
|
c.updateSlot(row, j)
|
|
}
|
|
}
|
|
}
|
|
|
|
// run launches the workers and the ticker.
|
|
func (c *SchedulerCycle) run() {
|
|
c.workers()
|
|
c.tick()
|
|
}
|
|
|
|
// workers launches `MaxWorkers` number of worker to execute job.
|
|
// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot.
|
|
func (c *SchedulerCycle) workers() {
|
|
for i := 0; i < MaxWorkers; i++ {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case j := <-c.chJobs:
|
|
c.executeJob(j.Job, c.updateCurrentSlot)
|
|
case <-c.ctx.Done():
|
|
log.Error().Msg("context done, worker is stopping...")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) {
|
|
j.Run(c.ctx)
|
|
if j.GetState() == job.Pending {
|
|
fnFallBack(j)
|
|
}
|
|
}
|
|
|
|
// tick is a simple ticker incrementing at each scheduler interval,
|
|
// the slot cursor and dispatch jobs to the workers.
|
|
func (c *SchedulerCycle) tick() {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
log.Error().Msg("context done, ticker is stopping...")
|
|
return
|
|
default:
|
|
time.Sleep(c.interval)
|
|
c.incr()
|
|
c.dispatch()
|
|
}
|
|
}
|
|
}()
|
|
}
|