package scheduler import ( "context" "cycle-scheduler/internal/job" "fmt" "strings" "sync" "time" "github.com/google/uuid" "github.com/rs/zerolog/log" ) const ( TableTitle = "# cycle-scheduler" Cursor = "^" CycleLength = 60 MaxWorkers = 5 ) const MaxSlotsIdx = 59 type Priority int const ( Low Priority = iota Medium High ) type jobSlot struct { *job.Job row int col int } func newJobSlot(task job.FnJob, row, col int) jobSlot { j := job.NewJob(task) return jobSlot{ Job: &j, row: row, col: col, } } // SchedulerCycle is a dumb scheduler. // It handle job and executes it at each cycle (60 * interval). // // Jobs are handle in a array of job slices. // At each interval (clock), the cursor moves to the next slot (s*). // If there are jobs, they are sent to workers to be executed // and the slot is cleaned. // // At the end of the slot (s60), the cursor re-starts a cycle at s1. type SchedulerCycle struct { l sync.RWMutex wg sync.WaitGroup ctx context.Context fnCancel context.CancelFunc interval time.Duration currentSlot int slots [60][]*jobSlot jobs map[uuid.UUID]*jobSlot chJobs chan *jobSlot } func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { ctxChild, fnCancel := context.WithCancel(ctx) c := SchedulerCycle{ wg: sync.WaitGroup{}, ctx: ctxChild, fnCancel: fnCancel, interval: interval, currentSlot: 0, slots: [60][]*jobSlot{}, jobs: make(map[uuid.UUID]*jobSlot), chJobs: make(chan *jobSlot), } c.run() return &c } func (c *SchedulerCycle) Stop() { c.fnCancel() } func (c *SchedulerCycle) Done() <-chan struct{} { done := make(chan struct{}) go func() { <-c.ctx.Done() c.wg.Done() done <- struct{}{} }() return done } func (c *SchedulerCycle) Len() int { c.l.Lock() defer c.l.Unlock() return len(c.jobs) } func (c *SchedulerCycle) HasAllJobsDone() bool { c.l.Lock() defer c.l.Unlock() for _, j := range c.jobs { if j.GetState() == job.Pending || j.GetState() == job.Running { return false } } return true } func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { c.l.Lock() defer c.l.Unlock() details := []job.JobDetails{} for _, j := range c.jobs { details = append(details, j.IntoDetails()) } return details } // Delay builds a job and add it to the scheduler engine. func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { c.l.Lock() defer c.l.Unlock() nextSlot := c.currentSlot + 1 if nextSlot > MaxSlotsIdx { nextSlot = 0 } j := newJobSlot(fnJob, nextSlot, len(c.slots[nextSlot])) c.slots[nextSlot] = append(c.slots[nextSlot], &j) c.jobs[j.GetID()] = &j log.Info().Str("job", j.GetID().String()).Msg("job added successfully") return j.GetID() } // Abort aborts the job given by its id if it exists.. func (c *SchedulerCycle) Abort(id uuid.UUID) bool { if j := c.getJob(id); j != nil { j.Abort() log.Info().Str("job", j.GetID().String()).Msg("abort job done") return true } return false } // GetJobDetails returns the job details by . func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { c.l.Lock() defer c.l.Unlock() j, ok := c.jobs[id] if !ok { return job.JobDetails{ State: job.Unknown.String(), } } return j.IntoDetails() } // Display outputs earch interval the scheduler state. func (c *SchedulerCycle) Display() { ticker := time.NewTicker(c.interval) go func() { for range ticker.C { c.display() } }() } // display writes to stdout the state of the scheduler as a table. func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex c.l.RLock() defer c.l.RUnlock() var maxCols int for i := range c.slots { if l := len(c.slots[i]); l > maxCols { maxCols = l } } table := [][]string{} title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1) table = append(table, []string{title}) for { if maxCols == 0 { break } row := make([]string, CycleLength) for i := 0; i <= MaxSlotsIdx; i++ { row[i] = "_" } for i := range c.slots { if len(c.slots[i]) < maxCols { continue } j := c.slots[i][maxCols-1] switch j.GetState() { case job.Pending: row[i] = "P" case job.Running: row[i] = "R" case job.Failed: row[i] = "X" case job.Abort: row[i] = "A" case job.Unknown: row[i] = "?" case job.Success: row[i] = "O" } } table = append(table, row) maxCols-- } row := make([]string, CycleLength) for i := 0; i <= MaxSlotsIdx; i++ { row[i] = "-" } table = append(table, row) if l := len(table); l > 0 { table[l-1][c.currentSlot] = Cursor } tableFormat := "" for _, r := range table { tableFormat += strings.Join(r, " ") tableFormat += "\n" } fmt.Println(tableFormat) } func (c *SchedulerCycle) getJob(id uuid.UUID) *jobSlot { c.l.RLock() defer c.l.RUnlock() j, ok := c.jobs[id] if !ok { return nil } return j } // getCurrentSlotJobs collects all the current slot jobs // and clean the slot. func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*jobSlot) { c.l.Lock() defer c.l.Unlock() jobs := c.slots[c.currentSlot] c.slots[c.currentSlot] = []*jobSlot{} return c.currentSlot, jobs } // updateSlot add a job to the slot where it was before. func (c *SchedulerCycle) updateSlot(row int, j *jobSlot) { c.l.Lock() defer c.l.Unlock() c.slots[row] = append(c.slots[row], j) } // updateCurrentSlot add a job to the current slot. func (c *SchedulerCycle) updateCurrentSlot(j *jobSlot) { c.l.Lock() defer c.l.Unlock() c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) } // incr increments the slot cursor. // It the cursor reaches `MaxSlotsIdx`, it goes back to 0. func (c *SchedulerCycle) incr() { c.l.Lock() defer c.l.Unlock() nextSlot := c.currentSlot + 1 if nextSlot > MaxSlotsIdx { nextSlot = 0 } c.currentSlot = nextSlot } // dispatch gets jobs from the current slot, resets the slot // and dispatch all jobs to the workers. // // It all the workers are busy, the jobs are re-schedule in the same slot // to be executed in the next cycle. func (c *SchedulerCycle) dispatch() { row, jobs := c.getCurrentSlotJobs() for _, j := range jobs { if j.GetState() == job.Abort { continue } select { case c.chJobs <- j: default: log.Warn().Msg("unable to put job in workers, trying next cycle") c.updateSlot(row, j) } } } // run launches the workers and the ticker. func (c *SchedulerCycle) run() { c.workers() c.tick() } // workers launches `MaxWorkers` number of worker to execute job. // If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. func (c *SchedulerCycle) workers() { for i := 0; i < MaxWorkers; i++ { c.wg.Add(1) go func() { defer c.wg.Done() for { select { case j := <-c.chJobs: c.executeJob(j, c.updateCurrentSlot) case <-c.ctx.Done(): log.Error().Msg("context done, worker is stopping...") return } } }() } } func (c *SchedulerCycle) executeJob(j *jobSlot, fnFallBack func(*jobSlot)) { j.Run(c.ctx) if j.GetState() == job.Pending { fnFallBack(j) } } // tick is a simple ticker incrementing at each scheduler interval, // the slot cursor and dispatch jobs to the workers. func (c *SchedulerCycle) tick() { c.wg.Add(1) go func() { defer c.wg.Done() for { select { case <-c.ctx.Done(): log.Error().Msg("context done, ticker is stopping...") return default: time.Sleep(c.interval) c.incr() c.dispatch() } } }() }