Compare commits
	
		
			13 Commits
		
	
	
		
			feat/slots
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d5519cc064 | ||
|   | f2a0830ca1 | ||
|   | 63210639ea | ||
|   | 4a39ba5547 | ||
|   | 2a0ecf2d4e | ||
|   | 3f1afb63d4 | ||
|   | df9a0ffbbc | ||
|   | ec53ec990a | ||
|   | 152c4f925a | ||
|   | 3cbea438f6 | ||
|   | b6df47ed0c | ||
|   | 0cd4f264a3 | ||
|   | 4da553d156 | 
							
								
								
									
										81
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										81
									
								
								README.md
									
									
									
									
									
								
							| @ -1,48 +1,47 @@ | ||||
| # cycle-scheduler | ||||
| 
 | ||||
| cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. | ||||
| cycle-scheduler is a simple scheduler lib, handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled. | ||||
| 
 | ||||
| Here a simple representation: | ||||
| ```ascii | ||||
| +------------------------------------------------------+ | ||||
| | +---+ +---+ +---+ +---+ +---+                  +---+ | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |   | |   | |   | |   | |   |                  |   | | | ||||
| | |s1 | |s2 | |s3 | |s4 | |   |                  |s60| | | ||||
| | +---+ +---+ +---+ +---+ +---+                  +---+ | | ||||
| +---------------^--------------------------------------+ | ||||
| ``` | ||||
| Jobs are handle in a array of job slices. | ||||
| **NOTE**: this should be not used for long-running tasks, it's more suitable for shorts tasks like polling etc... | ||||
| 
 | ||||
| At each interval (clock), the cursor `^` moves to the next slot (s*). | ||||
| If there are jobs, they are sent to workers to be executed | ||||
| and the slot is cleaned. | ||||
| At the end of the slot (s60), the cursor re-starts a new cycle from s1. | ||||
| 
 | ||||
| If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle. | ||||
| 
 | ||||
| **NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s. | ||||
| Pooling tasks are more suitable for this kind of scheduler. | ||||
| 
 | ||||
| ## Run | ||||
| You can run sample tests from `main.go` to see the scheduler in action: | ||||
| ```bash | ||||
| make run | ||||
| ``` | ||||
| If all goes well, you should see this kind of output in the stdout: | ||||
| ```ascii | ||||
| # cycle-scheduler (slot: 7) | ||||
| _ P _ _ _ _ _ _ _ _ _ _ _ _ | ||||
| - - - - - - ^ - - - - - - -  | ||||
| ``` | ||||
| > **P** means *pending* state | ||||
| 
 | ||||
| You can adjust the clock interval as needed in `main.go`: | ||||
| ## Examples | ||||
| * Init a new scheduler with 4 workers | ||||
| ```go | ||||
| interval := 200 * time.Millisecond | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	scheduler "gitea.thegux.fr/rmanach/cycle-scheduler.git" | ||||
| ) | ||||
| 
 | ||||
| func main() { | ||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||
| 	s := scheduler.NewSchedulerCycle(ctx, 4) | ||||
| 
 | ||||
| 	// add a task with an execution interval of 2 ms (executed every 2 ms) | ||||
| 	// and a maximum duration of 30 second. | ||||
| 	s.Delay( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			// ... | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 		scheduler.WithExecInterval(2*time.Millisecond), | ||||
| 		scheduler.WithMaxDuration(30*time.Second), | ||||
| 	) | ||||
| 
 | ||||
| 	// stop the program after 5 seconds | ||||
| 	go func() { | ||||
| 		time.Sleep(5 * time.Second) | ||||
| 		fnCancel() | ||||
| 	}() | ||||
| 
 | ||||
| 	<-ctx.Done() | ||||
| 	<-s.Done() | ||||
| } | ||||
| ``` | ||||
| 
 | ||||
| **NOTE**: for `Delay` optionals arguments, check the `NewTask` method documentation for more details. | ||||
| 
 | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -1,4 +1,4 @@ | ||||
| module cycle-scheduler | ||||
| module gitea.thegux.fr/rmanach/cycle-scheduler.git | ||||
| 
 | ||||
| go 1.22.4 | ||||
| 
 | ||||
|  | ||||
| @ -1,166 +0,0 @@ | ||||
| package job | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| type State int | ||||
| 
 | ||||
| const ( | ||||
| 	Pending State = iota | ||||
| 	Running | ||||
| 	Success | ||||
| 	Failed | ||||
| 	Abort | ||||
| 	Unknown | ||||
| ) | ||||
| 
 | ||||
| const UnknownState = "unknown" | ||||
| 
 | ||||
| var JobExecTimeout = 10 * time.Second | ||||
| 
 | ||||
| func (s State) String() string { | ||||
| 	switch s { | ||||
| 	case Pending: | ||||
| 		return "pending" | ||||
| 	case Running: | ||||
| 		return "running" | ||||
| 	case Success: | ||||
| 		return "success" | ||||
| 	case Failed: | ||||
| 		return "failed" | ||||
| 	case Abort: | ||||
| 		return "abort" | ||||
| 	case Unknown: | ||||
| 		return UnknownState | ||||
| 	default: | ||||
| 		return UnknownState | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	ErrJobAborted         = errors.New("job has been aborted") | ||||
| 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") | ||||
| ) | ||||
| 
 | ||||
| type FnJob func(ctx context.Context) error | ||||
| 
 | ||||
| type JobDetails struct { | ||||
| 	ID        uuid.UUID  `json:"id"` | ||||
| 	State     string     `json:"state"` | ||||
| 	CreatedAt time.Time  `json:"createdAt"` | ||||
| 	UpdatedAt *time.Time `json:"updatedAt,omitempty"` | ||||
| 	Err       string     `json:"error"` | ||||
| } | ||||
| 
 | ||||
| // TODO(rmanach): add priority level | ||||
| type Job struct { | ||||
| 	l         sync.RWMutex | ||||
| 	id        uuid.UUID | ||||
| 	createdAt time.Time | ||||
| 	updatedAt *time.Time | ||||
| 	state     State | ||||
| 	task      FnJob | ||||
| 	err       error | ||||
| 	chAbort   chan struct{} | ||||
| } | ||||
| 
 | ||||
| func NewJob(task FnJob, row, col int) Job { | ||||
| 	return Job{ | ||||
| 		id:        uuid.New(), | ||||
| 		createdAt: time.Now().UTC(), | ||||
| 		state:     Pending, | ||||
| 		task:      task, | ||||
| 		chAbort:   make(chan struct{}, 1), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (j *Job) IntoDetails() JobDetails { | ||||
| 	j.l.RLock() | ||||
| 	defer j.l.RUnlock() | ||||
| 
 | ||||
| 	jd := JobDetails{ | ||||
| 		ID:        j.id, | ||||
| 		CreatedAt: j.createdAt, | ||||
| 		State:     j.state.String(), | ||||
| 	} | ||||
| 
 | ||||
| 	if err := j.err; err != nil { | ||||
| 		jd.Err = err.Error() | ||||
| 	} | ||||
| 
 | ||||
| 	if ut := j.updatedAt; ut != nil { | ||||
| 		jd.UpdatedAt = ut | ||||
| 	} | ||||
| 
 | ||||
| 	return jd | ||||
| } | ||||
| 
 | ||||
| func (j *Job) GetID() uuid.UUID { | ||||
| 	return j.id | ||||
| } | ||||
| 
 | ||||
| func (j *Job) GetState() State { | ||||
| 	j.l.RLock() | ||||
| 	defer j.l.RUnlock() | ||||
| 
 | ||||
| 	return j.state | ||||
| } | ||||
| 
 | ||||
| func (j *Job) setState(s State) { | ||||
| 	j.l.Lock() | ||||
| 	defer j.l.Unlock() | ||||
| 
 | ||||
| 	now := time.Now().UTC() | ||||
| 	j.updatedAt = &now | ||||
| 
 | ||||
| 	j.state = s | ||||
| } | ||||
| 
 | ||||
| func (j *Job) setFail(err error) { | ||||
| 	j.l.Lock() | ||||
| 	defer j.l.Unlock() | ||||
| 
 | ||||
| 	now := time.Now().UTC() | ||||
| 	j.updatedAt = &now | ||||
| 
 | ||||
| 	j.state = Failed | ||||
| 	j.err = err | ||||
| } | ||||
| 
 | ||||
| func (j *Job) Abort() { | ||||
| 	j.setState(Abort) | ||||
| 	j.chAbort <- struct{}{} | ||||
| } | ||||
| 
 | ||||
| func (j *Job) Run(ctx context.Context) { | ||||
| 	ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout) | ||||
| 	defer fnCancel() | ||||
| 
 | ||||
| 	j.setState(Running) | ||||
| 
 | ||||
| 	log.Info().Str("job", j.GetID().String()).Msg("job running...") | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for range j.chAbort { | ||||
| 			j.setState(Abort) | ||||
| 			fnCancel() | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	if err := j.task(ctxExec); err != nil { | ||||
| 		if errors.Is(err, ErrJobNotCompletedYet) { | ||||
| 			j.setState(Pending) | ||||
| 			return | ||||
| 		} | ||||
| 		j.setFail(err) | ||||
| 		return | ||||
| 	} | ||||
| 	j.setState(Success) | ||||
| } | ||||
| @ -1,371 +0,0 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"cycle-scheduler/internal/job" | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	TableTitle  = "# cycle-scheduler" | ||||
| 	Cursor      = "^" | ||||
| 	CycleLength = 60 | ||||
| 	MaxWorkers  = 5 | ||||
| ) | ||||
| 
 | ||||
| const MaxSlotsIdx = 59 | ||||
| 
 | ||||
| type JobSlot struct { | ||||
| 	*job.Job | ||||
| 	row int | ||||
| } | ||||
| 
 | ||||
| // SchedulerCycle is a dumb scheduler. | ||||
| // It handle job and executes it at each cycle (60 * interval). | ||||
| // | ||||
| // Jobs are handle in a array of job slices. | ||||
| // At each interval (clock), the cursor moves to the next slot (s*). | ||||
| // If there are jobs, they are sent to workers to be executed | ||||
| // and the slot is cleaned. | ||||
| // | ||||
| // At the end of the slot (s60), the cursor re-starts a cycle at s1. | ||||
| type SchedulerCycle struct { | ||||
| 	l  sync.RWMutex | ||||
| 	wg sync.WaitGroup | ||||
| 
 | ||||
| 	ctx      context.Context | ||||
| 	fnCancel context.CancelFunc | ||||
| 
 | ||||
| 	interval    time.Duration | ||||
| 	currentSlot int | ||||
| 	slots       [60][]*job.Job | ||||
| 	jobs        map[uuid.UUID]*job.Job | ||||
| 
 | ||||
| 	chJobs chan *JobSlot | ||||
| } | ||||
| 
 | ||||
| func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { | ||||
| 	ctxChild, fnCancel := context.WithCancel(ctx) | ||||
| 
 | ||||
| 	c := SchedulerCycle{ | ||||
| 		wg:          sync.WaitGroup{}, | ||||
| 		ctx:         ctxChild, | ||||
| 		fnCancel:    fnCancel, | ||||
| 		interval:    interval, | ||||
| 		currentSlot: 0, | ||||
| 		slots:       [60][]*job.Job{}, | ||||
| 		jobs:        make(map[uuid.UUID]*job.Job), | ||||
| 		chJobs:      make(chan *JobSlot), | ||||
| 	} | ||||
| 
 | ||||
| 	c.run() | ||||
| 
 | ||||
| 	return &c | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) Stop() { | ||||
| 	c.fnCancel() | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) Done() <-chan struct{} { | ||||
| 	done := make(chan struct{}) | ||||
| 	go func() { | ||||
| 		<-c.ctx.Done() | ||||
| 		c.wg.Done() | ||||
| 		done <- struct{}{} | ||||
| 	}() | ||||
| 	return done | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) Len() int { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	return len(c.jobs) | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) HasAllJobsDone() bool { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	for _, j := range c.jobs { | ||||
| 		if j.GetState() == job.Pending || j.GetState() == job.Running { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	details := []job.JobDetails{} | ||||
| 	for _, j := range c.jobs { | ||||
| 		details = append(details, j.IntoDetails()) | ||||
| 	} | ||||
| 
 | ||||
| 	return details | ||||
| } | ||||
| 
 | ||||
| // Delay builds a job and add it to the scheduler engine. | ||||
| func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	nextSlot := c.currentSlot + 1 | ||||
| 	if nextSlot > MaxSlotsIdx { | ||||
| 		nextSlot = 0 | ||||
| 	} | ||||
| 
 | ||||
| 	j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) | ||||
| 
 | ||||
| 	c.slots[nextSlot] = append(c.slots[nextSlot], &j) | ||||
| 	c.jobs[j.GetID()] = &j | ||||
| 
 | ||||
| 	log.Info().Str("job", j.GetID().String()).Msg("job added successfully") | ||||
| 	return j.GetID() | ||||
| } | ||||
| 
 | ||||
| // Abort aborts the job given by its id if it exists.. | ||||
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool { | ||||
| 	if j := c.getJob(id); j != nil { | ||||
| 		j.Abort() | ||||
| 
 | ||||
| 		log.Info().Str("job", j.GetID().String()).Msg("abort job done") | ||||
| 		return true | ||||
| 	} | ||||
| 
 | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| // GetJobDetails returns the job details by . | ||||
| func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	j, ok := c.jobs[id] | ||||
| 	if !ok { | ||||
| 		return job.JobDetails{ | ||||
| 			State: job.Unknown.String(), | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return j.IntoDetails() | ||||
| } | ||||
| 
 | ||||
| // Display outputs earch interval the scheduler state. | ||||
| func (c *SchedulerCycle) Display() { | ||||
| 	ticker := time.NewTicker(c.interval) | ||||
| 	go func() { | ||||
| 		for range ticker.C { | ||||
| 			c.display() | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| // display writes to stdout the state of the scheduler as a table. | ||||
| func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex | ||||
| 	c.l.RLock() | ||||
| 	defer c.l.RUnlock() | ||||
| 
 | ||||
| 	var maxCols int | ||||
| 	for i := range c.slots { | ||||
| 		if l := len(c.slots[i]); l > maxCols { | ||||
| 			maxCols = l | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	table := [][]string{} | ||||
| 	title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1) | ||||
| 	table = append(table, []string{title}) | ||||
| 	for { | ||||
| 		if maxCols == 0 { | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		row := make([]string, CycleLength) | ||||
| 		for i := 0; i <= MaxSlotsIdx; i++ { | ||||
| 			row[i] = "_" | ||||
| 		} | ||||
| 
 | ||||
| 		for i := range c.slots { | ||||
| 			if len(c.slots[i]) < maxCols { | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			j := c.slots[i][maxCols-1] | ||||
| 			switch j.GetState() { | ||||
| 			case job.Pending: | ||||
| 				row[i] = "P" | ||||
| 			case job.Running: | ||||
| 				row[i] = "R" | ||||
| 			case job.Failed: | ||||
| 				row[i] = "X" | ||||
| 			case job.Abort: | ||||
| 				row[i] = "A" | ||||
| 			case job.Unknown: | ||||
| 				row[i] = "?" | ||||
| 			case job.Success: | ||||
| 				row[i] = "O" | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		table = append(table, row) | ||||
| 		maxCols-- | ||||
| 	} | ||||
| 
 | ||||
| 	row := make([]string, CycleLength) | ||||
| 	for i := 0; i <= MaxSlotsIdx; i++ { | ||||
| 		row[i] = "-" | ||||
| 	} | ||||
| 	table = append(table, row) | ||||
| 
 | ||||
| 	if l := len(table); l > 0 { | ||||
| 		table[l-1][c.currentSlot] = Cursor | ||||
| 	} | ||||
| 
 | ||||
| 	tableFormat := "" | ||||
| 	for _, r := range table { | ||||
| 		tableFormat += strings.Join(r, " ") | ||||
| 		tableFormat += "\n" | ||||
| 	} | ||||
| 
 | ||||
| 	fmt.Println(tableFormat) | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { | ||||
| 	c.l.RLock() | ||||
| 	defer c.l.RUnlock() | ||||
| 
 | ||||
| 	j, ok := c.jobs[id] | ||||
| 	if !ok { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	return j | ||||
| } | ||||
| 
 | ||||
| // getCurrentSlotJobs collects all the current slot jobs | ||||
| // and clean the slot. | ||||
| func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	jobs := c.slots[c.currentSlot] | ||||
| 
 | ||||
| 	c.slots[c.currentSlot] = []*job.Job{} | ||||
| 
 | ||||
| 	return c.currentSlot, jobs | ||||
| } | ||||
| 
 | ||||
| // updateSlot add a job to the slot where it was before. | ||||
| func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	c.slots[row] = append(c.slots[row], j) | ||||
| } | ||||
| 
 | ||||
| // updateCurrentSlot add a job to the current slot. | ||||
| func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) | ||||
| } | ||||
| 
 | ||||
| // incr increments the slot cursor. | ||||
| // It the cursor reaches `MaxSlotsIdx`, it goes back to 0. | ||||
| func (c *SchedulerCycle) incr() { | ||||
| 	c.l.Lock() | ||||
| 	defer c.l.Unlock() | ||||
| 
 | ||||
| 	nextSlot := c.currentSlot + 1 | ||||
| 	if nextSlot > MaxSlotsIdx { | ||||
| 		nextSlot = 0 | ||||
| 	} | ||||
| 
 | ||||
| 	c.currentSlot = nextSlot | ||||
| } | ||||
| 
 | ||||
| // dispatch gets jobs from the current slot, resets the slot | ||||
| // and dispatch all jobs to the workers. | ||||
| // | ||||
| // It all the workers are busy, the jobs are re-schedule in the same slot | ||||
| // to be executed in the next cycle. | ||||
| func (c *SchedulerCycle) dispatch() { | ||||
| 	row, jobs := c.getCurrentSlotJobs() | ||||
| 	for _, j := range jobs { | ||||
| 		if j.GetState() == job.Abort { | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		select { | ||||
| 		case c.chJobs <- &JobSlot{row: row, Job: j}: | ||||
| 		default: | ||||
| 			log.Warn().Msg("unable to put job in workers, trying next cycle") | ||||
| 			c.updateSlot(row, j) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // run launches the workers and the ticker. | ||||
| func (c *SchedulerCycle) run() { | ||||
| 	c.workers() | ||||
| 	c.tick() | ||||
| } | ||||
| 
 | ||||
| // workers launches `MaxWorkers` number of worker to execute job. | ||||
| // If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. | ||||
| func (c *SchedulerCycle) workers() { | ||||
| 	for i := 0; i < MaxWorkers; i++ { | ||||
| 		c.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer c.wg.Done() | ||||
| 			for { | ||||
| 				select { | ||||
| 				case j := <-c.chJobs: | ||||
| 					c.executeJob(j.Job, c.updateCurrentSlot) | ||||
| 				case <-c.ctx.Done(): | ||||
| 					log.Error().Msg("context done, worker is stopping...") | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { | ||||
| 	j.Run(c.ctx) | ||||
| 	if j.GetState() == job.Pending { | ||||
| 		fnFallBack(j) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // tick is a simple ticker incrementing at each scheduler interval, | ||||
| // the slot cursor and dispatch jobs to the workers. | ||||
| func (c *SchedulerCycle) tick() { | ||||
| 	c.wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer c.wg.Done() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-c.ctx.Done(): | ||||
| 				log.Error().Msg("context done, ticker is stopping...") | ||||
| 				return | ||||
| 			default: | ||||
| 				time.Sleep(c.interval) | ||||
| 				c.incr() | ||||
| 				c.dispatch() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
| @ -1,33 +0,0 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"cycle-scheduler/internal/job" | ||||
| 	"errors" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
| 
 | ||||
| func TestSlot(t *testing.T) { | ||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||
| 	defer fnCancel() | ||||
| 
 | ||||
| 	s := NewSchedulerCycle(ctx, 1*time.Millisecond) | ||||
| 
 | ||||
| 	s.Delay(func(ctx context.Context) error { | ||||
| 		return nil | ||||
| 	}) | ||||
| 	s.Delay(func(ctx context.Context) error { | ||||
| 		return job.ErrJobNotCompletedYet | ||||
| 	}) | ||||
| 	j3 := s.Delay(func(ctx context.Context) error { | ||||
| 		return errors.New("errors") | ||||
| 	}) | ||||
| 
 | ||||
| 	time.Sleep(2 * time.Millisecond) | ||||
| 
 | ||||
| 	assert.Equal(t, 3, s.Len()) | ||||
| 	assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) | ||||
| } | ||||
							
								
								
									
										122
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										122
									
								
								main.go
									
									
									
									
									
								
							| @ -1,122 +0,0 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"cycle-scheduler/internal/job" | ||||
| 	"cycle-scheduler/internal/scheduler" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math/rand/v2" | ||||
| 	"os" | ||||
| 	"os/signal" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/rs/zerolog" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| func initLogger() { | ||||
| 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix | ||||
| 	log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) | ||||
| } | ||||
| 
 | ||||
| func main() { | ||||
| 	initLogger() | ||||
| 
 | ||||
| 	ctx, stop := signal.NotifyContext( | ||||
| 		context.Background(), | ||||
| 		os.Interrupt, | ||||
| 		os.Kill, | ||||
| 	) | ||||
| 	defer stop() | ||||
| 
 | ||||
| 	interval := 200 * time.Millisecond | ||||
| 	s := scheduler.NewSchedulerCycle(ctx, interval) | ||||
| 	s.Display() | ||||
| 
 | ||||
| 	// pending test | ||||
| 	for i := 0; i < 20; i++ { | ||||
| 		go func(i int) { | ||||
| 			time.Sleep(time.Duration(i) * time.Second) | ||||
| 			s.Delay(func(ctx context.Context) error { | ||||
| 				time.Sleep(4 * time.Second) //nolint:mnd // test purpose | ||||
| 				if rand.IntN(10)%2 == 0 {   //nolint:gosec,mnd // test prupose | ||||
| 					return job.ErrJobNotCompletedYet | ||||
| 				} | ||||
| 				return nil | ||||
| 			}) | ||||
| 		}(i) | ||||
| 	} | ||||
| 
 | ||||
| 	// abort test | ||||
| 	j := s.Delay(func(ctx context.Context) error { | ||||
| 		time.Sleep(4 * time.Second) //nolint:mnd // test purpose | ||||
| 
 | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
| 
 | ||||
| 		return job.ErrJobNotCompletedYet | ||||
| 	}) | ||||
| 	go func() { | ||||
| 		time.Sleep(2 * time.Second) //nolint:mnd // test purpose | ||||
| 		s.Abort(j) | ||||
| 	}() | ||||
| 
 | ||||
| 	// abort test 2 | ||||
| 	j2 := s.Delay(func(ctx context.Context) error { | ||||
| 		time.Sleep(time.Second) | ||||
| 
 | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
| 
 | ||||
| 		return job.ErrJobNotCompletedYet | ||||
| 	}) | ||||
| 	go func() { | ||||
| 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose | ||||
| 		s.Abort(j2) | ||||
| 	}() | ||||
| 
 | ||||
| 	// error test | ||||
| 	s.Delay(func(ctx context.Context) error { | ||||
| 		time.Sleep(5 * time.Second) //nolint:mnd // test purpose | ||||
| 		return errors.New("err") | ||||
| 	}) | ||||
| 
 | ||||
| 	// success test | ||||
| 	go func() { | ||||
| 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose | ||||
| 		s.Delay(func(ctx context.Context) error { | ||||
| 			time.Sleep(5 * time.Second) //nolint:mnd // test purpose | ||||
| 			return nil | ||||
| 		}) | ||||
| 	}() | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			time.Sleep(2 * time.Second) //nolint:mnd // test purpose | ||||
| 			if s.HasAllJobsDone() { | ||||
| 				s.Stop() | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	<-s.Done() | ||||
| 
 | ||||
| 	jds := s.GetJobsDetails() | ||||
| 	for _, jd := range jds { | ||||
| 		c, err := json.Marshal(&jd) | ||||
| 		if err != nil { | ||||
| 			log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") | ||||
| 			continue | ||||
| 		} | ||||
| 		fmt.Println(string(c)) | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										213
									
								
								scheduler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										213
									
								
								scheduler.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,213 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 
 | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	ChanLength          = 500 | ||||
| 	DefaultExecInterval = 30 * time.Second | ||||
| ) | ||||
| 
 | ||||
| type IScheduler interface { | ||||
| 	Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID | ||||
| } | ||||
| 
 | ||||
| // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval. | ||||
| // If a task is not in desired state, the task is re-scheduled. | ||||
| type SchedulerCycle struct { | ||||
| 	wg sync.WaitGroup | ||||
| 
 | ||||
| 	ctx      context.Context | ||||
| 	fnCancel context.CancelFunc | ||||
| 
 | ||||
| 	tasks tasks | ||||
| 
 | ||||
| 	chTasks chan *task | ||||
| 	chDone  chan struct{} | ||||
| } | ||||
| 
 | ||||
| func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle { | ||||
| 	ctxChild, fnCancel := context.WithCancel(ctx) | ||||
| 
 | ||||
| 	c := SchedulerCycle{ | ||||
| 		wg:       sync.WaitGroup{}, | ||||
| 		ctx:      ctxChild, | ||||
| 		fnCancel: fnCancel, | ||||
| 		tasks:    newTasks(), | ||||
| 		chTasks:  make(chan *task, ChanLength), | ||||
| 	} | ||||
| 
 | ||||
| 	done := make(chan struct{}) | ||||
| 	go func() { | ||||
| 		<-c.ctx.Done() | ||||
| 		defer c.fnCancel() | ||||
| 		c.wg.Wait() | ||||
| 		c.stop() | ||||
| 		done <- struct{}{} | ||||
| 	}() | ||||
| 	c.chDone = done | ||||
| 
 | ||||
| 	c.run(workers) | ||||
| 
 | ||||
| 	return &c | ||||
| } | ||||
| 
 | ||||
| // delay sets the task timer when the task should be scheduled. | ||||
| func (c *SchedulerCycle) delay(t *task) { | ||||
| 	interval := DefaultExecInterval | ||||
| 	if t.execInterval != nil { | ||||
| 		interval = *t.execInterval | ||||
| 	} | ||||
| 
 | ||||
| 	t.setTimer( | ||||
| 		time.AfterFunc(interval, func() { | ||||
| 			select { | ||||
| 			case c.chTasks <- t: | ||||
| 			default: | ||||
| 				log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") | ||||
| 				c.delay(t) | ||||
| 			} | ||||
| 		}), | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| // exec runs the task now or if all the workers are in use, delayed it. | ||||
| func (c *SchedulerCycle) exec(t *task) { | ||||
| 	select { | ||||
| 	case c.chTasks <- t: | ||||
| 	default: | ||||
| 		log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") | ||||
| 		c.delay(t) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) getTask(id uuid.UUID) *task { | ||||
| 	return c.tasks.get(id) | ||||
| } | ||||
| 
 | ||||
| // run launches a number of worker to execute tasks. | ||||
| func (c *SchedulerCycle) run(n uint32) { | ||||
| 	for i := 0; i < int(n); i++ { | ||||
| 		c.wg.Add(1) | ||||
| 		go func() { | ||||
| 			defer c.wg.Done() | ||||
| 			for { | ||||
| 				select { | ||||
| 				case t := <-c.chTasks: | ||||
| 					c.execute(t, c.delay) | ||||
| 				case <-c.ctx.Done(): | ||||
| 					log.Error().Msg("context done, worker is stopping...") | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // execute executes the task. | ||||
| // | ||||
| // It does not handle task error, it's up to the task to implement its own callbacks. | ||||
| // In case of pending state, a callback is executed for actions. For the others states, | ||||
| // the task is deleted from the scheduler. | ||||
| func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) { | ||||
| 	t.Run(c.ctx) | ||||
| 
 | ||||
| 	switch t.GetState() { | ||||
| 	case Pending: | ||||
| 		fnFallBack(t) | ||||
| 	case Success, Failed, Abort, Unknown: | ||||
| 		c.tasks.delete(t) | ||||
| 	case Running: | ||||
| 		c.tasks.delete(t) | ||||
| 		log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // stop aborts all tasks and waits until tasks are stopped. | ||||
| // If the process can't be stopped within 10s, too bad... | ||||
| func (c *SchedulerCycle) stop() { | ||||
| 	c.tasks.abort() | ||||
| 
 | ||||
| 	if c.TasksDone() { | ||||
| 		log.Info().Msg("all tasks has been stopped gracefully") | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	ctxTimeout := 10 * time.Second | ||||
| 	ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout) | ||||
| 	defer fnCancel() | ||||
| 
 | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			log.Error().Msg("stop context done, tasks has been stopped gracefully") | ||||
| 			return | ||||
| 		default: | ||||
| 		} | ||||
| 
 | ||||
| 		if c.TasksDone() { | ||||
| 			log.Info().Msg("all tasks has been stopped gracefully") | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		time.Sleep(time.Second) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) Done() <-chan struct{} { | ||||
| 	return c.chDone | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) Len() int { | ||||
| 	return c.tasks.len() | ||||
| } | ||||
| 
 | ||||
| // TasksDone checks whether all the tasks has been completed. | ||||
| func (c *SchedulerCycle) TasksDone() bool { | ||||
| 	return c.tasks.completed() | ||||
| } | ||||
| 
 | ||||
| func (c *SchedulerCycle) GetTasksDetails() []TaskDetails { | ||||
| 	return c.tasks.getAllDetails() | ||||
| } | ||||
| 
 | ||||
| // GetTaskDetails returns the task details by id. | ||||
| func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails { | ||||
| 	return c.tasks.getDetails(id) | ||||
| } | ||||
| 
 | ||||
| // Delay builds a task and adds it to the scheduler engine. | ||||
| func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID { | ||||
| 	select { | ||||
| 	case <-c.Done(): | ||||
| 		log.Error().Msg("context done unable to add new job") | ||||
| 	default: | ||||
| 	} | ||||
| 
 | ||||
| 	t := NewTask(fnJob, opts...) | ||||
| 
 | ||||
| 	c.tasks.add(t) | ||||
| 	c.exec(t) | ||||
| 
 | ||||
| 	log.Info().Str("id", t.GetID().String()).Msg("task added successfully") | ||||
| 	return t.GetID() | ||||
| } | ||||
| 
 | ||||
| // Abort aborts the task given by its id if it exists. | ||||
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool { | ||||
| 	if t := c.getTask(id); t != nil { | ||||
| 		t.Abort() | ||||
| 
 | ||||
| 		log.Info().Str("id", t.GetID().String()).Msg("abort task done") | ||||
| 		return true | ||||
| 	} | ||||
| 
 | ||||
| 	return false | ||||
| } | ||||
							
								
								
									
										93
									
								
								scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,93 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
| 
 | ||||
| func TestScheduler(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	var buf int | ||||
| 
 | ||||
| 	s := NewSchedulerCycle(ctx, 1) | ||||
| 	taskID := s.Delay(func(ctx context.Context) (any, error) { | ||||
| 		time.Sleep(50 * time.Millisecond) | ||||
| 		buf += 1 | ||||
| 		return nil, nil | ||||
| 	}, WithExecInterval(2*time.Millisecond)) | ||||
| 
 | ||||
| 	assert.NotEmpty(t, taskID) | ||||
| 	assert.False(t, s.TasksDone()) | ||||
| 
 | ||||
| 	time.Sleep(2 * time.Millisecond) | ||||
| 
 | ||||
| 	details := s.GetTaskDetails(taskID) | ||||
| 	assert.Equal(t, "running", details.State) | ||||
| 	assert.LessOrEqual(t, details.ElapsedTime, 50*time.Millisecond) | ||||
| 
 | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
| 
 | ||||
| 	assert.True(t, s.TasksDone()) | ||||
| } | ||||
| 
 | ||||
| func TestSchedulerLoad(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	s := NewSchedulerCycle(ctx, 1) | ||||
| 
 | ||||
| 	for i := 0; i < 500; i++ { | ||||
| 		s.Delay(func(ctx context.Context) (any, error) { | ||||
| 			time.Sleep(1 * time.Millisecond) | ||||
| 			return nil, nil | ||||
| 		}, WithExecInterval(1*time.Millisecond)) | ||||
| 	} | ||||
| 
 | ||||
| 	assert.Eventually(t, func() bool { | ||||
| 		return s.TasksDone() | ||||
| 	}, time.Second, 250*time.Millisecond) | ||||
| } | ||||
| 
 | ||||
| func TestSchedulerExecInterval(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	s := NewSchedulerCycle(ctx, 1) | ||||
| 
 | ||||
| 	s.Delay( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			return nil, ErrJobNotCompletedYet | ||||
| 		}, | ||||
| 		WithMaxDuration(50*time.Millisecond), | ||||
| 		WithExecInterval(2*time.Millisecond), | ||||
| 	) | ||||
| 
 | ||||
| 	time.Sleep(100 * time.Millisecond) | ||||
| 
 | ||||
| 	assert.True(t, s.TasksDone()) | ||||
| } | ||||
| 
 | ||||
| func TestSchedulerContextDone(t *testing.T) { | ||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||
| 
 | ||||
| 	s := NewSchedulerCycle(ctx, 1) | ||||
| 
 | ||||
| 	for i := 0; i < 250; i++ { | ||||
| 		s.Delay( | ||||
| 			func(ctx context.Context) (any, error) { | ||||
| 				return nil, ErrJobNotCompletedYet | ||||
| 			}, | ||||
| 			WithMaxDuration(100*time.Millisecond), | ||||
| 			WithExecInterval(2*time.Millisecond), | ||||
| 		) | ||||
| 	} | ||||
| 
 | ||||
| 	go func() { | ||||
| 		time.Sleep(50 * time.Millisecond) | ||||
| 		fnCancel() | ||||
| 	}() | ||||
| 
 | ||||
| 	<-s.Done() | ||||
| } | ||||
							
								
								
									
										451
									
								
								task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										451
									
								
								task.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,451 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 
 | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| type State int | ||||
| 
 | ||||
| const ( | ||||
| 	Pending State = iota | ||||
| 	Running | ||||
| 	Success | ||||
| 	Failed | ||||
| 	Abort | ||||
| 	Unknown | ||||
| ) | ||||
| 
 | ||||
| const UnknownState = "unknown" | ||||
| 
 | ||||
| func (s State) String() string { | ||||
| 	return [...]string{"pending", "running", "success", "failed", "abort", "unknown"}[s] | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	ErrJobAborted         = errors.New("job has been aborted") | ||||
| 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") | ||||
| 	ErrTimeExceeded       = errors.New("time exceeded") | ||||
| 	ErrExecutionEngine    = errors.New("execution engine") | ||||
| ) | ||||
| 
 | ||||
| type FnJob func(ctx context.Context) (any, error) | ||||
| 
 | ||||
| type FnResult func(ctx context.Context, res any) | ||||
| type FnError func(ctx context.Context, err error) | ||||
| 
 | ||||
| type TaskDetails struct { | ||||
| 	State          string            `json:"state"` | ||||
| 	CreatedAt      time.Time         `json:"createdAt"` | ||||
| 	UpdatedAt      *time.Time        `json:"updatedAt,omitempty"` | ||||
| 	MaxDuration    *time.Duration    `json:"maxDuration,omitempty"` | ||||
| 	Attempts       uint32            `json:"attempts"` | ||||
| 	Err            string            `json:"error"` | ||||
| 	ElapsedTime    time.Duration     `json:"elapsedTime"` | ||||
| 	AdditionalInfo map[string]string `json:"additionalInfos"` | ||||
| } | ||||
| 
 | ||||
| func (td *TaskDetails) log() { | ||||
| 	args := []any{} | ||||
| 	args = append(args, | ||||
| 		"state", td.State, | ||||
| 		"createdAt", td.CreatedAt.Format("2006-01-02T15:04:05Z"), | ||||
| 		"updatedAt", td.UpdatedAt.Format("2006-01-02T15:04:05Z"), | ||||
| 		"elapsedTime", td.ElapsedTime.String(), | ||||
| 		"attempts", td.Attempts, | ||||
| 	) | ||||
| 
 | ||||
| 	if td.AdditionalInfo != nil { | ||||
| 		for k, v := range td.AdditionalInfo { | ||||
| 			args = append(args, k, v) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if td.Err != "" { | ||||
| 		args = append(args, "error", td.Err) | ||||
| 		log.Error().Any("args", args).Msg("task failed") | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	log.Info().Any("args", args).Msg("task execution done") | ||||
| } | ||||
| 
 | ||||
| type task struct { | ||||
| 	l sync.RWMutex | ||||
| 
 | ||||
| 	id        uuid.UUID | ||||
| 	createdAt time.Time | ||||
| 	updatedAt *time.Time | ||||
| 	state     State | ||||
| 
 | ||||
| 	fnJob     FnJob | ||||
| 	fnSuccess FnResult | ||||
| 	fnError   FnError | ||||
| 
 | ||||
| 	attempts     uint32 | ||||
| 	timer        *time.Timer | ||||
| 	maxDuration  *time.Duration | ||||
| 	execTimeout  *time.Duration | ||||
| 	execInterval *time.Duration | ||||
| 
 | ||||
| 	res any | ||||
| 	err error | ||||
| 
 | ||||
| 	additionalInfos map[string]string | ||||
| 
 | ||||
| 	chAbort chan struct{} | ||||
| } | ||||
| 
 | ||||
| type TaskOption func(t *task) | ||||
| 
 | ||||
| func WithMaxDuration(d time.Duration) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		t.maxDuration = &d | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func WithFnSuccess(f FnResult) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		t.fnSuccess = f | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func WithFnError(f FnError) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		t.fnError = f | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func WithExecTimeout(d time.Duration) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		t.execTimeout = &d | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func WithExecInterval(d time.Duration) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		t.execInterval = &d | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func WithAdditionalInfos(k, v string) TaskOption { | ||||
| 	return func(t *task) { | ||||
| 		if k == "" || v == "" { | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		if t.additionalInfos == nil { | ||||
| 			t.additionalInfos = map[string]string{} | ||||
| 		} | ||||
| 
 | ||||
| 		t.additionalInfos[k] = v | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // NewTask builds a task. | ||||
| // Here the options details that can be set to the task: | ||||
| //   - WithMaxDuration(time.Duration): the task will stop executing if the duration is exceeded (raise ErrTimeExceeded) | ||||
| //   - WithFnSuccess(FnResult): call a function after a success execution | ||||
| //   - WithFnError(FnError): call a function if an error occurred | ||||
| //   - WithExecTimeout(time.Duration): sets a timeout for the task execution | ||||
| //   - WithAdditionalInfos(k, v string): key-value additional informations (does not inferred with the task execution, log purpose) | ||||
| // | ||||
| // Scheduler options: | ||||
| //   - WithExecInterval(time.Duration): specify the execution interval | ||||
| func NewTask(f FnJob, opts ...TaskOption) *task { | ||||
| 	t := task{ | ||||
| 		id:        uuid.New(), | ||||
| 		createdAt: time.Now().UTC(), | ||||
| 		state:     Pending, | ||||
| 		fnJob:     f, | ||||
| 		chAbort:   make(chan struct{}, 1), | ||||
| 	} | ||||
| 
 | ||||
| 	for _, o := range opts { | ||||
| 		o(&t) | ||||
| 	} | ||||
| 
 | ||||
| 	return &t | ||||
| } | ||||
| 
 | ||||
| func (t *task) setState(s State) { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	now := time.Now().UTC() | ||||
| 	t.updatedAt = &now | ||||
| 
 | ||||
| 	t.state = s | ||||
| } | ||||
| 
 | ||||
| func (t *task) setSuccess(ctx context.Context, res any) { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	now := time.Now().UTC() | ||||
| 	t.updatedAt = &now | ||||
| 
 | ||||
| 	t.state = Success | ||||
| 	t.res = res | ||||
| 
 | ||||
| 	if t.fnSuccess != nil { | ||||
| 		t.fnSuccess(ctx, res) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (t *task) setFail(ctx context.Context, err error) { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	now := time.Now().UTC() | ||||
| 	t.updatedAt = &now | ||||
| 
 | ||||
| 	if t.state != Abort { | ||||
| 		t.state = Failed | ||||
| 	} | ||||
| 
 | ||||
| 	t.err = err | ||||
| 
 | ||||
| 	if t.fnError != nil { | ||||
| 		t.fnError(ctx, err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (t *task) setTimer(tm *time.Timer) { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	t.timer = tm | ||||
| } | ||||
| 
 | ||||
| func (t *task) stopTimer() { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	if t.timer != nil { | ||||
| 		t.timer.Stop() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (t *task) incr() { | ||||
| 	t.l.Lock() | ||||
| 	defer t.l.Unlock() | ||||
| 
 | ||||
| 	t.attempts += 1 | ||||
| } | ||||
| 
 | ||||
| func (t *task) log() { | ||||
| 	td := t.IntoDetails() | ||||
| 	td.log() | ||||
| } | ||||
| 
 | ||||
| func (t *task) GetID() uuid.UUID { | ||||
| 	t.l.RLock() | ||||
| 	defer t.l.RUnlock() | ||||
| 
 | ||||
| 	return t.id | ||||
| } | ||||
| 
 | ||||
| func (t *task) GetAttempts() uint32 { | ||||
| 	t.l.RLock() | ||||
| 	defer t.l.RUnlock() | ||||
| 
 | ||||
| 	return t.attempts | ||||
| } | ||||
| 
 | ||||
| func (t *task) GetState() State { | ||||
| 	t.l.RLock() | ||||
| 	defer t.l.RUnlock() | ||||
| 
 | ||||
| 	return t.state | ||||
| } | ||||
| 
 | ||||
| // TimeExceeded checks if the task does not reached its max duration execution (maxDuration). | ||||
| func (t *task) TimeExceeded() bool { | ||||
| 	t.l.RLock() | ||||
| 	defer t.l.RUnlock() | ||||
| 
 | ||||
| 	if md := t.maxDuration; md != nil { | ||||
| 		return time.Since(t.createdAt) >= *md | ||||
| 	} | ||||
| 
 | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (t *task) Abort() { | ||||
| 	t.stopTimer() | ||||
| 	t.chAbort <- struct{}{} | ||||
| } | ||||
| 
 | ||||
| func (t *task) Run(ctx context.Context) { | ||||
| 	if t.TimeExceeded() { | ||||
| 		t.setFail(ctx, ErrTimeExceeded) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if s := t.GetState(); s != Pending { | ||||
| 		log.Error().Msg("unable to launch a task that not in pending state") | ||||
| 		t.setFail(ctx, ErrExecutionEngine) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	var ctxExec context.Context | ||||
| 	var fnCancel context.CancelFunc | ||||
| 	if t.execTimeout != nil { | ||||
| 		ctxExec, fnCancel = context.WithTimeout(ctx, *t.execTimeout) | ||||
| 	} else { | ||||
| 		ctxExec, fnCancel = context.WithCancel(ctx) | ||||
| 	} | ||||
| 	defer fnCancel() | ||||
| 
 | ||||
| 	t.incr() | ||||
| 	t.setState(Running) | ||||
| 
 | ||||
| 	log.Info().Str("id", t.GetID().String()).Msg("task is running...") | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for range t.chAbort { | ||||
| 			t.setState(Abort) | ||||
| 			fnCancel() | ||||
| 			return | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	defer t.log() | ||||
| 	res, err := t.fnJob(ctxExec) | ||||
| 	if err != nil { | ||||
| 		if errors.Is(err, ErrJobNotCompletedYet) { | ||||
| 			t.setState(Pending) | ||||
| 			return | ||||
| 		} | ||||
| 		t.setFail(ctx, err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	t.setSuccess(ctx, res) | ||||
| } | ||||
| 
 | ||||
| func (t *task) IntoDetails() TaskDetails { | ||||
| 	t.l.RLock() | ||||
| 	defer t.l.RUnlock() | ||||
| 
 | ||||
| 	td := TaskDetails{ | ||||
| 		CreatedAt:   t.createdAt, | ||||
| 		UpdatedAt:   t.updatedAt, | ||||
| 		State:       t.state.String(), | ||||
| 		Attempts:    t.attempts, | ||||
| 		MaxDuration: t.maxDuration, | ||||
| 	} | ||||
| 
 | ||||
| 	if t.state == Pending || t.state == Running { | ||||
| 		td.ElapsedTime = time.Since(t.createdAt) | ||||
| 	} else { | ||||
| 		td.ElapsedTime = t.updatedAt.Sub(t.createdAt) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := t.err; err != nil { | ||||
| 		td.Err = err.Error() | ||||
| 	} | ||||
| 
 | ||||
| 	if t.additionalInfos != nil { | ||||
| 		td.AdditionalInfo = t.additionalInfos | ||||
| 	} | ||||
| 
 | ||||
| 	return td | ||||
| } | ||||
| 
 | ||||
| type tasks struct { | ||||
| 	l sync.RWMutex | ||||
| 	s map[uuid.UUID]*task | ||||
| } | ||||
| 
 | ||||
| func newTasks() tasks { | ||||
| 	return tasks{ | ||||
| 		s: make(map[uuid.UUID]*task), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) add(t *task) { | ||||
| 	ts.l.Lock() | ||||
| 	defer ts.l.Unlock() | ||||
| 
 | ||||
| 	ts.s[t.GetID()] = t | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) delete(t *task) { | ||||
| 	ts.l.Lock() | ||||
| 	defer ts.l.Unlock() | ||||
| 
 | ||||
| 	delete(ts.s, t.GetID()) | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) get(id uuid.UUID) *task { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	t, ok := ts.s[id] | ||||
| 	if !ok { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	return t | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) len() int { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	return len(ts.s) | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) completed() bool { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	for _, t := range ts.s { | ||||
| 		if t.GetState() == Pending || t.GetState() == Running { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) getAllDetails() []TaskDetails { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	details := []TaskDetails{} | ||||
| 	for _, t := range ts.s { | ||||
| 		details = append(details, t.IntoDetails()) | ||||
| 	} | ||||
| 
 | ||||
| 	return details | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) getDetails(id uuid.UUID) TaskDetails { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	t, ok := ts.s[id] | ||||
| 	if !ok { | ||||
| 		return TaskDetails{State: UnknownState} | ||||
| 	} | ||||
| 
 | ||||
| 	return t.IntoDetails() | ||||
| } | ||||
| 
 | ||||
| func (ts *tasks) abort() { | ||||
| 	ts.l.RLock() | ||||
| 	defer ts.l.RUnlock() | ||||
| 
 | ||||
| 	for _, t := range ts.s { | ||||
| 		t.Abort() | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										243
									
								
								task_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										243
									
								
								task_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,243 @@ | ||||
| package scheduler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
| 
 | ||||
| func TestTask(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 	var i int | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			i += 1 | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	assert.Equal(t, nil, task.res) | ||||
| 	assert.NotEmpty(t, task.updatedAt) | ||||
| 	assert.Equal(t, 1, i) | ||||
| 	assert.Equal(t, 1, int(task.GetAttempts())) | ||||
| } | ||||
| 
 | ||||
| func TestAbortTask(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			<-ctx.Done() | ||||
| 			return nil, ctx.Err() | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	timer := time.NewTicker(200 * time.Millisecond) | ||||
| 	go func() { | ||||
| 		<-timer.C | ||||
| 		task.Abort() | ||||
| 	}() | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	assert.Equal(t, Abort, task.GetState()) | ||||
| } | ||||
| 
 | ||||
| func TestTaskContextDone(t *testing.T) { | ||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			<-ctx.Done() | ||||
| 			return nil, ctx.Err() | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	timer := time.NewTicker(200 * time.Millisecond) | ||||
| 	go func() { | ||||
| 		<-timer.C | ||||
| 		fnCancel() | ||||
| 	}() | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	assert.Equal(t, Failed, task.GetState()) | ||||
| } | ||||
| 
 | ||||
| func TestTaskFnSuccess(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 	var result int | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			return 3, nil | ||||
| 		}, | ||||
| 		WithFnSuccess(func(ctx context.Context, res any) { | ||||
| 			t, ok := res.(int) | ||||
| 			if !ok { | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			result = t * 2 | ||||
| 		}), | ||||
| 	) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	assert.Equal(t, Success, task.GetState()) | ||||
| 	assert.Equal(t, 6, result) | ||||
| } | ||||
| 
 | ||||
| func TestTaskFnError(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 	var result error | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			return 3, errors.New("error occurred...") | ||||
| 		}, | ||||
| 		WithFnError(func(ctx context.Context, err error) { | ||||
| 			result = err | ||||
| 		}), | ||||
| 	) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	assert.Equal(t, Failed, task.GetState()) | ||||
| 	assert.Equal(t, "error occurred...", result.Error()) | ||||
| } | ||||
| 
 | ||||
| func TestTaskWithErrJobNotCompletedYet(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 	var attempts int | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			if attempts < 2 { | ||||
| 				attempts += 1 | ||||
| 				return nil, ErrJobNotCompletedYet | ||||
| 			} | ||||
| 			return "ok", nil | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	for i := 0; i < 2; i++ { | ||||
| 		task.Run(ctx) | ||||
| 		assert.Equal(t, Pending, task.GetState()) | ||||
| 	} | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 	assert.Equal(t, Success, task.GetState()) | ||||
| 	assert.Equal(t, 3, int(task.GetAttempts())) | ||||
| } | ||||
| 
 | ||||
| func TestTaskTimeExceeded(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			return "ko", nil | ||||
| 		}, | ||||
| 		WithMaxDuration(5*time.Millisecond), | ||||
| 	) | ||||
| 
 | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 	assert.Equal(t, Failed, task.GetState()) | ||||
| 	assert.Equal(t, 0, int(task.GetAttempts())) | ||||
| } | ||||
| 
 | ||||
| func TestTaskExecTimeout(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			<-ctx.Done() | ||||
| 			return nil, ctx.Err() | ||||
| 		}, | ||||
| 		WithExecTimeout(5*time.Millisecond), | ||||
| 	) | ||||
| 
 | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 	assert.Equal(t, Failed, task.GetState()) | ||||
| 	assert.Equal(t, 1, int(task.GetAttempts())) | ||||
| } | ||||
| 
 | ||||
| func TestTaskDetails(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	task := NewTask( | ||||
| 		func(ctx context.Context) (any, error) { | ||||
| 			return "coucou", nil | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	details := task.IntoDetails() | ||||
| 
 | ||||
| 	assert.Equal(t, 0, int(details.Attempts)) | ||||
| 	assert.Equal(t, "pending", details.State) | ||||
| 	assert.False(t, details.CreatedAt.IsZero()) | ||||
| 	assert.Empty(t, details.UpdatedAt) | ||||
| 	assert.Nil(t, details.MaxDuration) | ||||
| 	assert.Empty(t, details.Err) | ||||
| 	assert.NotEmpty(t, details.ElapsedTime) | ||||
| 
 | ||||
| 	task.Run(ctx) | ||||
| 
 | ||||
| 	details = task.IntoDetails() | ||||
| 
 | ||||
| 	assert.Equal(t, 1, int(details.Attempts)) | ||||
| 	assert.Equal(t, "success", details.State) | ||||
| 	assert.False(t, details.CreatedAt.IsZero()) | ||||
| 	assert.NotEmpty(t, details.UpdatedAt) | ||||
| 	assert.Nil(t, details.MaxDuration) | ||||
| 	assert.Empty(t, details.Err) | ||||
| 	assert.NotEmpty(t, details.ElapsedTime) | ||||
| } | ||||
| 
 | ||||
| func TestTaskAdditionalInfos(t *testing.T) { | ||||
| 	t.Run("with key value", func(t *testing.T) { | ||||
| 		elementID := uuid.NewString() | ||||
| 		task := NewTask( | ||||
| 			func(ctx context.Context) (any, error) { | ||||
| 				return "yo", nil | ||||
| 			}, | ||||
| 			WithAdditionalInfos("transportId", elementID), | ||||
| 			WithAdditionalInfos("element", "transport"), | ||||
| 		) | ||||
| 
 | ||||
| 		assert.Equal(t, elementID, task.additionalInfos["transportId"]) | ||||
| 		assert.Equal(t, "transport", task.additionalInfos["element"]) | ||||
| 	}) | ||||
| 
 | ||||
| 	t.Run("with empty key", func(t *testing.T) { | ||||
| 		elementID := uuid.NewString() | ||||
| 		task := NewTask( | ||||
| 			func(ctx context.Context) (any, error) { | ||||
| 				return "hello", nil | ||||
| 			}, | ||||
| 			WithAdditionalInfos("", elementID), | ||||
| 			WithAdditionalInfos("element", "transport"), | ||||
| 		) | ||||
| 
 | ||||
| 		assert.Equal(t, "transport", task.additionalInfos["element"]) | ||||
| 	}) | ||||
| 
 | ||||
| 	t.Run("with empty infos", func(t *testing.T) { | ||||
| 		task := NewTask( | ||||
| 			func(ctx context.Context) (any, error) { | ||||
| 				return "hey", nil | ||||
| 			}, | ||||
| 		) | ||||
| 
 | ||||
| 		assert.Nil(t, task.additionalInfos) | ||||
| 	}) | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user