From 4da553d156ba16f20bb12b9c34ad82f682339456 Mon Sep 17 00:00:00 2001 From: rmanach Date: Tue, 24 Sep 2024 10:44:33 +0200 Subject: [PATCH] wrap job in job slot --- internal/job/job.go | 2 +- internal/scheduler/scheduler.go | 50 ++++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/internal/job/job.go b/internal/job/job.go index 617735f..944f282 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -71,7 +71,7 @@ type Job struct { chAbort chan struct{} } -func NewJob(task FnJob, row, col int) Job { +func NewJob(task FnJob) Job { return Job{ id: uuid.New(), createdAt: time.Now().UTC(), diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index d5e7351..7f5dc33 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -21,9 +21,27 @@ const ( const MaxSlotsIdx = 59 -type JobSlot struct { +type Priority int + +const ( + Low Priority = iota + Medium + High +) + +type jobSlot struct { *job.Job row int + col int +} + +func newJobSlot(task job.FnJob, row, col int) jobSlot { + j := job.NewJob(task) + return jobSlot{ + Job: &j, + row: row, + col: col, + } } // SchedulerCycle is a dumb scheduler. @@ -44,10 +62,10 @@ type SchedulerCycle struct { interval time.Duration currentSlot int - slots [60][]*job.Job - jobs map[uuid.UUID]*job.Job + slots [60][]*jobSlot + jobs map[uuid.UUID]*jobSlot - chJobs chan *JobSlot + chJobs chan *jobSlot } func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { @@ -59,9 +77,9 @@ func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCy fnCancel: fnCancel, interval: interval, currentSlot: 0, - slots: [60][]*job.Job{}, - jobs: make(map[uuid.UUID]*job.Job), - chJobs: make(chan *JobSlot), + slots: [60][]*jobSlot{}, + jobs: make(map[uuid.UUID]*jobSlot), + chJobs: make(chan *jobSlot), } c.run() @@ -125,7 +143,7 @@ func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { nextSlot = 0 } - j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) + j := newJobSlot(fnJob, nextSlot, len(c.slots[nextSlot])) c.slots[nextSlot] = append(c.slots[nextSlot], &j) c.jobs[j.GetID()] = &j @@ -241,7 +259,7 @@ func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex fmt.Println(tableFormat) } -func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { +func (c *SchedulerCycle) getJob(id uuid.UUID) *jobSlot { c.l.RLock() defer c.l.RUnlock() @@ -255,19 +273,19 @@ func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { // getCurrentSlotJobs collects all the current slot jobs // and clean the slot. -func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { +func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*jobSlot) { c.l.Lock() defer c.l.Unlock() jobs := c.slots[c.currentSlot] - c.slots[c.currentSlot] = []*job.Job{} + c.slots[c.currentSlot] = []*jobSlot{} return c.currentSlot, jobs } // updateSlot add a job to the slot where it was before. -func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { +func (c *SchedulerCycle) updateSlot(row int, j *jobSlot) { c.l.Lock() defer c.l.Unlock() @@ -275,7 +293,7 @@ func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { } // updateCurrentSlot add a job to the current slot. -func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { +func (c *SchedulerCycle) updateCurrentSlot(j *jobSlot) { c.l.Lock() defer c.l.Unlock() @@ -309,7 +327,7 @@ func (c *SchedulerCycle) dispatch() { } select { - case c.chJobs <- &JobSlot{row: row, Job: j}: + case c.chJobs <- j: default: log.Warn().Msg("unable to put job in workers, trying next cycle") c.updateSlot(row, j) @@ -333,7 +351,7 @@ func (c *SchedulerCycle) workers() { for { select { case j := <-c.chJobs: - c.executeJob(j.Job, c.updateCurrentSlot) + c.executeJob(j, c.updateCurrentSlot) case <-c.ctx.Done(): log.Error().Msg("context done, worker is stopping...") return @@ -343,7 +361,7 @@ func (c *SchedulerCycle) workers() { } } -func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { +func (c *SchedulerCycle) executeJob(j *jobSlot, fnFallBack func(*jobSlot)) { j.Run(c.ctx) if j.GetState() == job.Pending { fnFallBack(j)