422 lines
7.9 KiB
Go

package scheduler
import (
"context"
"cycle-scheduler/internal/job"
"fmt"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
// Rendering and sizing parameters of the scheduler.
const (
	// TableTitle is the heading used on the first line of the rendered table.
	TableTitle = "# cycle-scheduler"
	// Cursor is the glyph that marks the current slot in the rendered table.
	Cursor = "^"
	// CycleLength is the number of slots in one cycle.
	CycleLength = 60
	// MaxSlotsIdx is the index of the last slot of a cycle.
	MaxSlotsIdx = CycleLength - 1
	// MaxWorkers is the number of worker goroutines the scheduler starts.
	MaxWorkers = 5
)
// Priority is an ordered level: Low < Medium < High.
// NOTE(review): no code in this section reads it; the only mention is a
// commented-out field on jobSlot.
type Priority int

// Priority levels, numbered from 0 in ascending order.
const (
	Low = Priority(iota)
	Medium
	High
)
// JobSlotDetails is a snapshot of a scheduled job: the embedded
// job.JobDetails plus the scheduler's attempt counter.
type JobSlotDetails struct {
job.JobDetails
// Attempts is the number of times the scheduler has run the job.
Attempts int `json:"attempts"`
}
// jobSlot wraps a job.Job with the bookkeeping the scheduler keeps for it.
type jobSlot struct {
// l is write-locked by run and read-locked by getDetails.
l sync.RWMutex
job.Job
// slot is the slot index the job was given when it was created.
slot int
// attempts counts the calls to run.
attempts int
// priority Priority
}
// newJobSlot builds a job from task and records the slot index it is
// assigned to. The attempt counter starts at zero.
func newJobSlot(task job.FnJob, slot int) jobSlot {
	return jobSlot{slot: slot, Job: job.NewJob(task)}
}
// run counts one more attempt and executes the wrapped job with ctx.
//
// The write lock is held until Job.Run returns, so getDetails blocks for
// the whole duration of the run.
func (j *jobSlot) run(ctx context.Context) {
	j.l.Lock()
	defer j.l.Unlock()

	j.attempts++
	j.Job.Run(ctx)
}
// getDetails returns a snapshot of the job details together with the
// attempt counter, taken under the read lock.
func (j *jobSlot) getDetails() JobSlotDetails {
	j.l.RLock()
	defer j.l.RUnlock()

	var snapshot JobSlotDetails
	snapshot.JobDetails = j.IntoDetails()
	snapshot.Attempts = j.attempts
	return snapshot
}
// SchedulerCycle is a deliberately simple, cycle-based scheduler.
// It holds jobs and runs them as the cursor passes over their slot; one
// full cycle lasts 60 * interval.
//
// Jobs are kept in an array of 60 slots, each slot holding a slice of jobs.
// At each interval (clock tick), the cursor moves to the next slot.
// If the slot holds jobs, they are handed to the workers to be executed
// and the slot is emptied.
//
// After the last slot, the cursor wraps around and a new cycle starts at
// the first slot.
type SchedulerCycle struct {
// l guards currentSlot, slots and jobs.
l sync.RWMutex
// wg tracks the worker and ticker goroutines.
wg sync.WaitGroup
// ctx is the scheduler's own cancellable context; fnCancel cancels it.
ctx context.Context
fnCancel context.CancelFunc
// interval is the pause between two cursor moves.
interval time.Duration
// currentSlot is the index of the slot under the cursor.
currentSlot int
// slots holds, per slot index, the jobs waiting to be dispatched.
slots [60][]*jobSlot
// jobs indexes every job added through Delay by its ID.
jobs map[uuid.UUID]*jobSlot
// chJobs is the unbuffered channel used to hand jobs to the workers.
chJobs chan *jobSlot
}
// NewSchedulerCycle creates a scheduler whose cursor advances once per
// interval and starts it immediately (workers and ticker).
//
// The scheduler runs on a child of ctx: it stops when ctx is cancelled or
// when Stop is called.
func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
	child, cancel := context.WithCancel(ctx)

	// The wait group, the cursor and the slot array start at their zero values.
	sc := &SchedulerCycle{
		ctx:      child,
		fnCancel: cancel,
		interval: interval,
		jobs:     make(map[uuid.UUID]*jobSlot),
		chJobs:   make(chan *jobSlot),
	}
	sc.run()
	return sc
}
// Stop cancels the scheduler context, which makes the workers and the
// ticker return. It does not wait for them; use Done for that.
func (c *SchedulerCycle) Stop() {
	c.fnCancel()
}
// Done returns a channel that is closed once the scheduler context has been
// cancelled and every goroutine tracked by the scheduler's wait group has
// returned.
//
// Each call returns a new channel served by its own helper goroutine. The
// channel is closed rather than sent on, so the helper always terminates
// even when the caller never reads the channel, and any number of receivers
// are released.
func (c *SchedulerCycle) Done() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-c.ctx.Done()
		c.wg.Wait()
	}()
	return done
}
// Len returns the number of jobs registered in the scheduler.
func (c *SchedulerCycle) Len() int {
	c.l.RLock()
	count := len(c.jobs)
	c.l.RUnlock()
	return count
}
// HasAllJobsDone reports whether no registered job is pending or running.
// It returns true when the scheduler holds no job at all.
func (c *SchedulerCycle) HasAllJobsDone() bool {
	c.l.RLock()
	defer c.l.RUnlock()

	for _, j := range c.jobs {
		// Read the state exactly once: with two separate reads, a job moving
		// from Running to Pending between them would match neither test and
		// be counted as done.
		switch j.GetState() {
		case job.Pending, job.Running:
			return false
		}
	}
	return true
}
// GetJobsDetails returns a snapshot of every registered job. The order of
// the result follows map iteration and is therefore unspecified. The result
// is never nil.
func (c *SchedulerCycle) GetJobsDetails() []JobSlotDetails {
	c.l.RLock()
	defer c.l.RUnlock()

	snapshots := make([]JobSlotDetails, 0, len(c.jobs))
	for _, entry := range c.jobs {
		snapshots = append(snapshots, entry.getDetails())
	}
	return snapshots
}
// Delay builds a job from fnJob, schedules it in the slot that follows the
// current cursor position (wrapping to slot 0 after the last slot) and
// returns the job ID.
//
// If the scheduler context is already done, the job is not added: an error
// is logged and uuid.Nil is returned.
func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
	// Test the context itself. Going through c.Done() would start one helper
	// goroutine per call, and a channel that was just created is practically
	// never ready for a non-blocking receive, so the check would not fire.
	select {
	case <-c.ctx.Done():
		log.Error().Msg("context done unable to add new job")
		return uuid.Nil
	default:
	}

	c.l.Lock()
	defer c.l.Unlock()

	nextSlot := c.currentSlot + 1
	if nextSlot > MaxSlotsIdx {
		nextSlot = 0
	}

	j := newJobSlot(fnJob, nextSlot)
	id := j.GetID()
	c.slots[nextSlot] = append(c.slots[nextSlot], &j)
	c.jobs[id] = &j
	log.Info().Str("job", id.String()).Msg("job added successfully")
	return id
}
// Abort aborts the job identified by id.
// It returns false when no job with that ID is registered.
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
	target := c.getJob(id)
	if target == nil {
		return false
	}

	target.Abort()
	log.Info().Str("job", target.GetID().String()).Msg("abort job done")
	return true
}
// GetJobDetails returns the details of the job identified by id.
// When no such job is registered, the result carries job.UnknownState and
// zero values everywhere else.
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) JobSlotDetails {
	c.l.RLock()
	defer c.l.RUnlock()

	if found, ok := c.jobs[id]; ok {
		return found.getDetails()
	}

	var unknown JobSlotDetails
	unknown.JobDetails.State = job.UnknownState
	return unknown
}
// Display starts a goroutine that prints the scheduler state to stdout at
// every interval.
//
// The goroutine stops, and its ticker is released, as soon as the scheduler
// context is cancelled, so calling Display does not leak a ticker or a
// goroutine after Stop.
func (c *SchedulerCycle) Display() {
	ticker := time.NewTicker(c.interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-c.ctx.Done():
				return
			case <-ticker.C:
				c.display()
			}
		}
	}()
}
// display renders the scheduler state as a text table and prints it to
// stdout.
//
// Layout, top to bottom:
//   - a title line with the 1-based number of the current slot;
//   - one line per job depth, deepest first: column i shows the state letter
//     of the job at that depth in slot i, or "_" when the slot holds fewer
//     jobs or the state has no letter;
//   - a ruler of "-" with the cursor glyph in the current slot's column.
//
// State letters: P pending, R running, X failed, A aborted, ? unknown,
// O success.
func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
	c.l.RLock()
	defer c.l.RUnlock()

	// The fullest slot decides how many job lines are drawn.
	depth := 0
	for i := range c.slots {
		if n := len(c.slots[i]); n > depth {
			depth = n
		}
	}

	// filled returns a line of CycleLength cells, all set to fill.
	filled := func(fill string) []string {
		cells := make([]string, CycleLength)
		for i := range cells {
			cells[i] = fill
		}
		return cells
	}

	var out strings.Builder
	// emit appends one table line: cells separated by spaces, then a newline.
	emit := func(cells []string) {
		out.WriteString(strings.Join(cells, " "))
		out.WriteString("\n")
	}

	emit([]string{fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)})

	for level := depth; level > 0; level-- {
		cells := filled("_")
		for i := range c.slots {
			if len(c.slots[i]) < level {
				continue
			}
			switch c.slots[i][level-1].GetState() {
			case job.Pending:
				cells[i] = "P"
			case job.Running:
				cells[i] = "R"
			case job.Failed:
				cells[i] = "X"
			case job.Abort:
				cells[i] = "A"
			case job.Unknown:
				cells[i] = "?"
			case job.Success:
				cells[i] = "O"
			}
		}
		emit(cells)
	}

	ruler := filled("-")
	ruler[c.currentSlot] = Cursor
	emit(ruler)

	fmt.Println(out.String())
}
// getJob returns the job registered under id, or nil when there is none.
func (c *SchedulerCycle) getJob(id uuid.UUID) *jobSlot {
	c.l.RLock()
	defer c.l.RUnlock()

	// A missing key yields the zero value of *jobSlot, which is nil.
	return c.jobs[id]
}
// getCurrentSlotJobs takes every job out of the current slot, leaving the
// slot empty, and returns the slot index together with the jobs taken.
func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*jobSlot) {
	c.l.Lock()
	defer c.l.Unlock()

	idx := c.currentSlot
	taken := c.slots[idx]
	c.slots[idx] = []*jobSlot{}
	return idx, taken
}
// updateSlot appends j to the slot with the given index.
// dispatch uses it to put a job back into the slot it was taken from.
func (c *SchedulerCycle) updateSlot(slot int, j *jobSlot) {
	c.l.Lock()
	defer c.l.Unlock()

	queue := &c.slots[slot]
	*queue = append(*queue, j)
}
// updateCurrentSlot appends j to the slot that is under the cursor at the
// time of the call.
func (c *SchedulerCycle) updateCurrentSlot(j *jobSlot) {
	c.l.Lock()
	defer c.l.Unlock()

	cur := c.currentSlot
	c.slots[cur] = append(c.slots[cur], j)
}
// incr moves the slot cursor forward by one.
// Once the cursor has reached `MaxSlotsIdx`, it wraps back to 0.
func (c *SchedulerCycle) incr() {
	c.l.Lock()
	defer c.l.Unlock()

	if c.currentSlot >= MaxSlotsIdx {
		c.currentSlot = 0
		return
	}
	c.currentSlot++
}
// dispatch empties the current slot and offers each of its jobs to the
// workers. Aborted jobs are skipped and not put back.
//
// The hand-off never blocks: if no worker is ready to receive a job, the
// job is put back into the slot it came from, so it is tried again when the
// cursor reaches that slot on the next cycle.
func (c *SchedulerCycle) dispatch() {
	slot, batch := c.getCurrentSlotJobs()
	for i := range batch {
		candidate := batch[i]
		if candidate.GetState() == job.Abort {
			continue
		}

		select {
		case c.chJobs <- candidate:
		default:
			log.Warn().Msg("unable to put job in workers, trying next cycle")
			c.updateSlot(slot, candidate)
		}
	}
}
// run starts the scheduler: first the worker goroutines, then the ticker
// goroutine. It returns as soon as both have been started.
func (c *SchedulerCycle) run() {
	c.workers()
	c.tick()
}
// workers starts `MaxWorkers` worker goroutines, all tracked by the wait
// group. Each worker executes the jobs it receives until the scheduler
// context is cancelled.
//
// A job that is still pending after a run is appended to the slot that is
// under the cursor at that moment.
func (c *SchedulerCycle) workers() {
	c.wg.Add(MaxWorkers)
	for remaining := MaxWorkers; remaining > 0; remaining-- {
		go func() {
			defer c.wg.Done()
			for {
				select {
				case next := <-c.chJobs:
					c.executeJob(next, c.updateCurrentSlot)
				case <-c.ctx.Done():
					log.Error().Msg("context done, worker is stopping...")
					return
				}
			}
		}()
	}
}
// executeJob runs j with the scheduler context. If the job is still in the
// pending state afterwards, it is handed to fnFallBack.
func (c *SchedulerCycle) executeJob(j *jobSlot, fnFallBack func(*jobSlot)) {
	j.run(c.ctx)
	if j.GetState() != job.Pending {
		return
	}
	fnFallBack(j)
}
// tick starts the ticker goroutine, tracked by the wait group. After each
// scheduler interval it advances the slot cursor and dispatches the jobs of
// the new current slot to the workers, until the scheduler context is
// cancelled.
//
// The wait uses a timer inside the select, so cancellation is observed
// right away instead of after the current sleep and one more incr/dispatch
// pass. The timer is re-armed only once dispatch has returned, which keeps
// a full interval between the end of one pass and the start of the next.
func (c *SchedulerCycle) tick() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		timer := time.NewTimer(c.interval)
		defer timer.Stop()

		for {
			select {
			case <-c.ctx.Done():
				log.Error().Msg("context done, ticker is stopping...")
				return
			case <-timer.C:
				c.incr()
				c.dispatch()
				// The timer has fired and its channel was drained by the
				// receive above, so Reset is safe here.
				timer.Reset(c.interval)
			}
		}
	}()
}