From 3cbea438f652a260d541fe185e6b1cdf26498561 Mon Sep 17 00:00:00 2001 From: rmanach Date: Tue, 24 Sep 2024 12:29:41 +0200 Subject: [PATCH] replace slot attempts int with atomics --- internal/scheduler/scheduler.go | 36 ++++++++++++++++----------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 32deccd..6d8304a 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -35,36 +36,32 @@ type JobSlotDetails struct { } type jobSlot struct { - l sync.RWMutex - job.Job - slot int - attempts int + *job.Job + slot atomic.Uint32 + attempts atomic.Uint32 // priority Priority } -func newJobSlot(task job.FnJob, slot int) jobSlot { - return jobSlot{ - Job: job.NewJob(task), - slot: slot, +func newJobSlot(task job.FnJob, slot int) *jobSlot { + j := job.NewJob(task) + js := jobSlot{ + Job: &j, } + + js.slot.Add(uint32(slot)) + + return &js } func (j *jobSlot) run(ctx context.Context) { - j.l.Lock() - defer j.l.Unlock() - - j.attempts += 1 - + j.attempts.Add(1) j.Job.Run(ctx) } func (j *jobSlot) getDetails() JobSlotDetails { - j.l.RLock() - defer j.l.RUnlock() - return JobSlotDetails{ JobDetails: j.IntoDetails(), - Attempts: j.attempts, + Attempts: int(j.attempts.Load()), } } @@ -175,8 +172,8 @@ func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { j := newJobSlot(fnJob, nextSlot) - c.slots[nextSlot] = append(c.slots[nextSlot], &j) - c.jobs[j.GetID()] = &j + c.slots[nextSlot] = append(c.slots[nextSlot], j) + c.jobs[j.GetID()] = j log.Info().Str("job", j.GetID().String()).Msg("job added successfully") return j.GetID() @@ -329,6 +326,7 @@ func (c *SchedulerCycle) updateCurrentSlot(j *jobSlot) { c.l.Lock() defer c.l.Unlock() + j.slot.CompareAndSwap(j.slot.Load(), uint32(c.currentSlot)) c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) }