diff --git a/internal/job/job.go b/internal/job/job.go index 944f282..781ffde 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -59,7 +59,6 @@ type JobDetails struct { Err string `json:"error"` } -// TODO(rmanach): add priority level type Job struct { l sync.RWMutex id uuid.UUID @@ -130,7 +129,10 @@ func (j *Job) setFail(err error) { now := time.Now().UTC() j.updatedAt = &now - j.state = Failed + if j.state != Abort { + j.state = Failed + } + j.err = err } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 7f5dc33..a168489 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -29,18 +29,42 @@ const ( High ) -type jobSlot struct { - *job.Job - row int - col int +type JobSlotDetails struct { + job.JobDetails + Attempts int `json:"attempts"` } -func newJobSlot(task job.FnJob, row, col int) jobSlot { - j := job.NewJob(task) +type jobSlot struct { + l sync.RWMutex + job.Job + slot int + attempts int + // priority Priority +} + +func newJobSlot(task job.FnJob, slot int) jobSlot { return jobSlot{ - Job: &j, - row: row, - col: col, + Job: job.NewJob(task), + slot: slot, + } +} + +func (j *jobSlot) run(ctx context.Context) { + j.l.Lock() + defer j.l.Unlock() + + j.attempts += 1 + + j.Job.Run(ctx) +} + +func (j *jobSlot) getDetails() JobSlotDetails { + j.l.RLock() + defer j.l.RUnlock() + + return JobSlotDetails{ + JobDetails: j.IntoDetails(), + Attempts: j.attempts, } } @@ -95,22 +119,22 @@ func (c *SchedulerCycle) Done() <-chan struct{} { done := make(chan struct{}) go func() { <-c.ctx.Done() - c.wg.Done() + c.wg.Wait() done <- struct{}{} }() return done } func (c *SchedulerCycle) Len() int { - c.l.Lock() - defer c.l.Unlock() + c.l.RLock() + defer c.l.RUnlock() return len(c.jobs) } func (c *SchedulerCycle) HasAllJobsDone() bool { - c.l.Lock() - defer c.l.Unlock() + c.l.RLock() + defer c.l.RUnlock() for _, j := range c.jobs { if j.GetState() == job.Pending || j.GetState() == job.Running { @@ -121,13 +145,13 @@ func (c *SchedulerCycle) HasAllJobsDone() bool { return true } -func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { - c.l.Lock() - defer c.l.Unlock() +func (c *SchedulerCycle) GetJobsDetails() []JobSlotDetails { + c.l.RLock() + defer c.l.RUnlock() - details := []job.JobDetails{} + details := []JobSlotDetails{} for _, j := range c.jobs { - details = append(details, j.IntoDetails()) + details = append(details, j.getDetails()) } return details @@ -135,6 +159,12 @@ func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { // Delay builds a job and add it to the scheduler engine. func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { + select { + case <-c.Done(): + log.Error().Msg("context done unable to add new job") + default: + } + c.l.Lock() defer c.l.Unlock() @@ -143,7 +173,7 @@ func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { nextSlot = 0 } - j := newJobSlot(fnJob, nextSlot, len(c.slots[nextSlot])) + j := newJobSlot(fnJob, nextSlot) c.slots[nextSlot] = append(c.slots[nextSlot], &j) c.jobs[j.GetID()] = &j @@ -165,18 +195,20 @@ func (c *SchedulerCycle) Abort(id uuid.UUID) bool { } // GetJobDetails returns the job details by . -func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { - c.l.Lock() - defer c.l.Unlock() +func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) JobSlotDetails { + c.l.RLock() + defer c.l.RUnlock() j, ok := c.jobs[id] if !ok { - return job.JobDetails{ - State: job.Unknown.String(), + return JobSlotDetails{ + JobDetails: job.JobDetails{ + State: job.UnknownState, + }, } } - return j.IntoDetails() + return j.getDetails() } // Display outputs earch interval the scheduler state. @@ -285,11 +317,11 @@ func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*jobSlot) { } // updateSlot add a job to the slot where it was before. -func (c *SchedulerCycle) updateSlot(row int, j *jobSlot) { +func (c *SchedulerCycle) updateSlot(slot int, j *jobSlot) { c.l.Lock() defer c.l.Unlock() - c.slots[row] = append(c.slots[row], j) + c.slots[slot] = append(c.slots[slot], j) } // updateCurrentSlot add a job to the current slot. @@ -320,7 +352,7 @@ func (c *SchedulerCycle) incr() { // It all the workers are busy, the jobs are re-schedule in the same slot // to be executed in the next cycle. func (c *SchedulerCycle) dispatch() { - row, jobs := c.getCurrentSlotJobs() + slot, jobs := c.getCurrentSlotJobs() for _, j := range jobs { if j.GetState() == job.Abort { continue @@ -330,7 +362,7 @@ func (c *SchedulerCycle) dispatch() { case c.chJobs <- j: default: log.Warn().Msg("unable to put job in workers, trying next cycle") - c.updateSlot(row, j) + c.updateSlot(slot, j) } } } @@ -362,7 +394,7 @@ func (c *SchedulerCycle) workers() { } func (c *SchedulerCycle) executeJob(j *jobSlot, fnFallBack func(*jobSlot)) { - j.Run(c.ctx) + j.run(c.ctx) if j.GetState() == job.Pending { fnFallBack(j) }