208 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			208 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package scheduler
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"cycle-scheduler/internal/job"
 | |
| 	"math"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
// ExponentialFactor is the base that is raised to a task's attempt count
// when computing the extra delay added to the scheduler interval on
// re-scheduling.
const ExponentialFactor = 1.8
 | |
| 
 | |
// SchedulerCycle is a simple scheduler that hands jobs to a pool of workers.
// If a task is still pending after it has run, the task is re-scheduled with
// a backoff.
type SchedulerCycle struct {
	// l guards tasks.
	l sync.RWMutex
	// wg tracks the worker goroutines started by run.
	wg sync.WaitGroup

	// ctx is the scheduler's own context; fnCancel cancels it.
	ctx      context.Context
	fnCancel context.CancelFunc

	// interval is the base delay used when a task is re-scheduled.
	interval time.Duration
	// tasks holds every task added through Delay, keyed by task id.
	tasks map[uuid.UUID]*task

	// chTasks is the unbuffered channel on which tasks are handed to workers.
	chTasks chan *task
}
 | |
| 
 | |
| func NewSchedulerCycle(ctx context.Context, interval time.Duration, workers uint32) *SchedulerCycle {
 | |
| 	ctxChild, fnCancel := context.WithCancel(ctx)
 | |
| 
 | |
| 	c := SchedulerCycle{
 | |
| 		wg:       sync.WaitGroup{},
 | |
| 		ctx:      ctxChild,
 | |
| 		fnCancel: fnCancel,
 | |
| 		interval: interval,
 | |
| 		tasks:    make(map[uuid.UUID]*task),
 | |
| 		chTasks:  make(chan *task),
 | |
| 	}
 | |
| 
 | |
| 	c.run(workers)
 | |
| 
 | |
| 	return &c
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) backoff(j *task) {
 | |
| 	backoff := c.interval + time.Duration(math.Pow(ExponentialFactor, float64(j.attempts.Load())))
 | |
| 
 | |
| 	j.timer.set(
 | |
| 		time.AfterFunc(backoff, func() {
 | |
| 			select {
 | |
| 			case c.chTasks <- j:
 | |
| 			default:
 | |
| 				log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker")
 | |
| 				c.backoff(j)
 | |
| 			}
 | |
| 		}),
 | |
| 	)
 | |
| }
 | |
| 
 | |
| // exec runs the job now or if all the workers are in use, delayed it.
 | |
| func (c *SchedulerCycle) exec(j *task) {
 | |
| 	select {
 | |
| 	case c.chTasks <- j:
 | |
| 	default:
 | |
| 		log.Error().Str("job id", j.GetID().String()).Msg("unable to execute job to a worker now, delayed it")
 | |
| 		c.backoff(j)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	j, ok := c.tasks[id]
 | |
| 	if !ok {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return j
 | |
| }
 | |
| 
 | |
| // run launches a number of worker to execute job.
 | |
| // If job returns `ErrJobNotCompletedYet`, it re-schedules with a backoff.
 | |
| func (c *SchedulerCycle) run(n uint32) {
 | |
| 	for i := 0; i < int(n); i++ {
 | |
| 		c.wg.Add(1)
 | |
| 		go func() {
 | |
| 			defer c.wg.Done()
 | |
| 			for {
 | |
| 				select {
 | |
| 				case j := <-c.chTasks:
 | |
| 					c.executeJob(j, c.backoff)
 | |
| 				case <-c.ctx.Done():
 | |
| 					log.Error().Msg("context done, worker is stopping...")
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) executeJob(j *task, fnFallBack func(*task)) {
 | |
| 	j.run(c.ctx)
 | |
| 	if j.GetState() == job.Pending {
 | |
| 		fnFallBack(j)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Stop() {
 | |
| 	c.fnCancel()
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Done() <-chan struct{} {
 | |
| 	done := make(chan struct{})
 | |
| 	go func() {
 | |
| 		<-c.ctx.Done()
 | |
| 		c.wg.Wait()
 | |
| 		done <- struct{}{}
 | |
| 	}()
 | |
| 	return done
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) Len() int {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	return len(c.tasks)
 | |
| }
 | |
| 
 | |
| // HasAllJobsDone checks whether all the jobs has been executed.
 | |
| func (c *SchedulerCycle) HasAllJobsDone() bool {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	for _, t := range c.tasks {
 | |
| 		if t.GetState() == job.Pending || t.GetState() == job.Running {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (c *SchedulerCycle) GetJobsDetails() []TaskDetails {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	details := []TaskDetails{}
 | |
| 	for _, t := range c.tasks {
 | |
| 		details = append(details, t.getDetails())
 | |
| 	}
 | |
| 
 | |
| 	return details
 | |
| }
 | |
| 
 | |
| // Delay builds a task and add it to the scheduler engine.
 | |
| func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
 | |
| 	select {
 | |
| 	case <-c.Done():
 | |
| 		log.Error().Msg("context done unable to add new job")
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	j := newTask(fnJob)
 | |
| 
 | |
| 	c.l.Lock()
 | |
| 	defer c.l.Unlock()
 | |
| 
 | |
| 	c.tasks[j.GetID()] = j
 | |
| 
 | |
| 	c.exec(j)
 | |
| 
 | |
| 	log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
 | |
| 	return j.GetID()
 | |
| }
 | |
| 
 | |
| // Abort aborts the job given by its id if it exists.
 | |
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
 | |
| 	if j := c.getTask(id); j != nil {
 | |
| 		j.abort()
 | |
| 
 | |
| 		log.Info().Str("job", j.GetID().String()).Msg("abort job done")
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // GetJobDetails returns the task details by id.
 | |
| func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) TaskDetails {
 | |
| 	c.l.RLock()
 | |
| 	defer c.l.RUnlock()
 | |
| 
 | |
| 	j, ok := c.tasks[id]
 | |
| 	if !ok {
 | |
| 		return TaskDetails{
 | |
| 			JobDetails: job.JobDetails{
 | |
| 				State: job.UnknownState,
 | |
| 			},
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return j.getDetails()
 | |
| }
 | 
