Compare commits
	
		
			No commits in common. "main" and "feat/slots" have entirely different histories.
		
	
	
		
			main
			...
			feat/slots
		
	
		
							
								
								
									
										3
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								Makefile
									
									
									
									
									
								
							| @ -1,3 +1,6 @@ | |||||||
|  | run: lint | ||||||
|  | 	go run main.go | ||||||
|  | 
 | ||||||
| lint: | lint: | ||||||
| 	golangci-lint run ./... | 	golangci-lint run ./... | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										81
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										81
									
								
								README.md
									
									
									
									
									
								
							| @ -1,47 +1,48 @@ | |||||||
| # cycle-scheduler | # cycle-scheduler | ||||||
| 
 | 
 | ||||||
| cycle-scheduler is a simple scheduler lib, handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled. | cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. | ||||||
| 
 | 
 | ||||||
| **NOTE**: this should be not used for long-running tasks, it's more suitable for shorts tasks like polling etc... | Here a simple representation: | ||||||
|  | ```ascii | ||||||
|  | +------------------------------------------------------+ | ||||||
|  | | +---+ +---+ +---+ +---+ +---+                  +---+ | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |   | |   | |   | |   | |   |                  |   | | | ||||||
|  | | |s1 | |s2 | |s3 | |s4 | |   |                  |s60| | | ||||||
|  | | +---+ +---+ +---+ +---+ +---+                  +---+ | | ||||||
|  | +---------------^--------------------------------------+ | ||||||
|  | ``` | ||||||
|  | Jobs are handle in a array of job slices. | ||||||
| 
 | 
 | ||||||
| ## Examples | At each interval (clock), the cursor `^` moves to the next slot (s*). | ||||||
| * Init a new scheduler with 4 workers | If there are jobs, they are sent to workers to be executed | ||||||
|  | and the slot is cleaned. | ||||||
|  | At the end of the slot (s60), the cursor re-starts a new cycle from s1. | ||||||
|  | 
 | ||||||
|  | If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle. | ||||||
|  | 
 | ||||||
|  | **NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s. | ||||||
|  | Pooling tasks are more suitable for this kind of scheduler. | ||||||
|  | 
 | ||||||
|  | ## Run | ||||||
|  | You can run sample tests from `main.go` to see the scheduler in action: | ||||||
|  | ```bash | ||||||
|  | make run | ||||||
|  | ``` | ||||||
|  | If all goes well, you should see this kind of output in the stdout: | ||||||
|  | ```ascii | ||||||
|  | # cycle-scheduler (slot: 7) | ||||||
|  | _ P _ _ _ _ _ _ _ _ _ _ _ _ | ||||||
|  | - - - - - - ^ - - - - - - -  | ||||||
|  | ``` | ||||||
|  | > **P** means *pending* state | ||||||
|  | 
 | ||||||
|  | You can adjust the clock interval as needed in `main.go`: | ||||||
| ```go | ```go | ||||||
| package main | interval := 200 * time.Millisecond | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	scheduler "gitea.thegux.fr/rmanach/cycle-scheduler.git" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func main() { |  | ||||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) |  | ||||||
| 	s := scheduler.NewSchedulerCycle(ctx, 4) |  | ||||||
| 
 |  | ||||||
| 	// add a task with an execution interval of 2 ms (executed every 2 ms) |  | ||||||
| 	// and a maximum duration of 30 second. |  | ||||||
| 	s.Delay( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			// ... |  | ||||||
| 			return nil, nil |  | ||||||
| 		}, |  | ||||||
| 		scheduler.WithExecInterval(2*time.Millisecond), |  | ||||||
| 		scheduler.WithMaxDuration(30*time.Second), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	// stop the program after 5 seconds |  | ||||||
| 	go func() { |  | ||||||
| 		time.Sleep(5 * time.Second) |  | ||||||
| 		fnCancel() |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	<-ctx.Done() |  | ||||||
| 	<-s.Done() |  | ||||||
| } |  | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
| **NOTE**: for `Delay` optionals arguments, check the `NewTask` method documentation for more details. |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -1,4 +1,4 @@ | |||||||
| module gitea.thegux.fr/rmanach/cycle-scheduler.git | module cycle-scheduler | ||||||
| 
 | 
 | ||||||
| go 1.22.4 | go 1.22.4 | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										166
									
								
								internal/job/job.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										166
									
								
								internal/job/job.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,166 @@ | |||||||
|  | package job | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/rs/zerolog/log" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type State int | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	Pending State = iota | ||||||
|  | 	Running | ||||||
|  | 	Success | ||||||
|  | 	Failed | ||||||
|  | 	Abort | ||||||
|  | 	Unknown | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const UnknownState = "unknown" | ||||||
|  | 
 | ||||||
|  | var JobExecTimeout = 10 * time.Second | ||||||
|  | 
 | ||||||
|  | func (s State) String() string { | ||||||
|  | 	switch s { | ||||||
|  | 	case Pending: | ||||||
|  | 		return "pending" | ||||||
|  | 	case Running: | ||||||
|  | 		return "running" | ||||||
|  | 	case Success: | ||||||
|  | 		return "success" | ||||||
|  | 	case Failed: | ||||||
|  | 		return "failed" | ||||||
|  | 	case Abort: | ||||||
|  | 		return "abort" | ||||||
|  | 	case Unknown: | ||||||
|  | 		return UnknownState | ||||||
|  | 	default: | ||||||
|  | 		return UnknownState | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	ErrJobAborted         = errors.New("job has been aborted") | ||||||
|  | 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type FnJob func(ctx context.Context) error | ||||||
|  | 
 | ||||||
|  | type JobDetails struct { | ||||||
|  | 	ID        uuid.UUID  `json:"id"` | ||||||
|  | 	State     string     `json:"state"` | ||||||
|  | 	CreatedAt time.Time  `json:"createdAt"` | ||||||
|  | 	UpdatedAt *time.Time `json:"updatedAt,omitempty"` | ||||||
|  | 	Err       string     `json:"error"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TODO(rmanach): add priority level | ||||||
|  | type Job struct { | ||||||
|  | 	l         sync.RWMutex | ||||||
|  | 	id        uuid.UUID | ||||||
|  | 	createdAt time.Time | ||||||
|  | 	updatedAt *time.Time | ||||||
|  | 	state     State | ||||||
|  | 	task      FnJob | ||||||
|  | 	err       error | ||||||
|  | 	chAbort   chan struct{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewJob(task FnJob, row, col int) Job { | ||||||
|  | 	return Job{ | ||||||
|  | 		id:        uuid.New(), | ||||||
|  | 		createdAt: time.Now().UTC(), | ||||||
|  | 		state:     Pending, | ||||||
|  | 		task:      task, | ||||||
|  | 		chAbort:   make(chan struct{}, 1), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) IntoDetails() JobDetails { | ||||||
|  | 	j.l.RLock() | ||||||
|  | 	defer j.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	jd := JobDetails{ | ||||||
|  | 		ID:        j.id, | ||||||
|  | 		CreatedAt: j.createdAt, | ||||||
|  | 		State:     j.state.String(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := j.err; err != nil { | ||||||
|  | 		jd.Err = err.Error() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if ut := j.updatedAt; ut != nil { | ||||||
|  | 		jd.UpdatedAt = ut | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return jd | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) GetID() uuid.UUID { | ||||||
|  | 	return j.id | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) GetState() State { | ||||||
|  | 	j.l.RLock() | ||||||
|  | 	defer j.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return j.state | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) setState(s State) { | ||||||
|  | 	j.l.Lock() | ||||||
|  | 	defer j.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	now := time.Now().UTC() | ||||||
|  | 	j.updatedAt = &now | ||||||
|  | 
 | ||||||
|  | 	j.state = s | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) setFail(err error) { | ||||||
|  | 	j.l.Lock() | ||||||
|  | 	defer j.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	now := time.Now().UTC() | ||||||
|  | 	j.updatedAt = &now | ||||||
|  | 
 | ||||||
|  | 	j.state = Failed | ||||||
|  | 	j.err = err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) Abort() { | ||||||
|  | 	j.setState(Abort) | ||||||
|  | 	j.chAbort <- struct{}{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (j *Job) Run(ctx context.Context) { | ||||||
|  | 	ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout) | ||||||
|  | 	defer fnCancel() | ||||||
|  | 
 | ||||||
|  | 	j.setState(Running) | ||||||
|  | 
 | ||||||
|  | 	log.Info().Str("job", j.GetID().String()).Msg("job running...") | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		for range j.chAbort { | ||||||
|  | 			j.setState(Abort) | ||||||
|  | 			fnCancel() | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	if err := j.task(ctxExec); err != nil { | ||||||
|  | 		if errors.Is(err, ErrJobNotCompletedYet) { | ||||||
|  | 			j.setState(Pending) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		j.setFail(err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	j.setState(Success) | ||||||
|  | } | ||||||
							
								
								
									
										371
									
								
								internal/scheduler/scheduler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										371
									
								
								internal/scheduler/scheduler.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,371 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"cycle-scheduler/internal/job" | ||||||
|  | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/rs/zerolog/log" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	TableTitle  = "# cycle-scheduler" | ||||||
|  | 	Cursor      = "^" | ||||||
|  | 	CycleLength = 60 | ||||||
|  | 	MaxWorkers  = 5 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const MaxSlotsIdx = 59 | ||||||
|  | 
 | ||||||
|  | type JobSlot struct { | ||||||
|  | 	*job.Job | ||||||
|  | 	row int | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // SchedulerCycle is a dumb scheduler. | ||||||
|  | // It handle job and executes it at each cycle (60 * interval). | ||||||
|  | // | ||||||
|  | // Jobs are handle in a array of job slices. | ||||||
|  | // At each interval (clock), the cursor moves to the next slot (s*). | ||||||
|  | // If there are jobs, they are sent to workers to be executed | ||||||
|  | // and the slot is cleaned. | ||||||
|  | // | ||||||
|  | // At the end of the slot (s60), the cursor re-starts a cycle at s1. | ||||||
|  | type SchedulerCycle struct { | ||||||
|  | 	l  sync.RWMutex | ||||||
|  | 	wg sync.WaitGroup | ||||||
|  | 
 | ||||||
|  | 	ctx      context.Context | ||||||
|  | 	fnCancel context.CancelFunc | ||||||
|  | 
 | ||||||
|  | 	interval    time.Duration | ||||||
|  | 	currentSlot int | ||||||
|  | 	slots       [60][]*job.Job | ||||||
|  | 	jobs        map[uuid.UUID]*job.Job | ||||||
|  | 
 | ||||||
|  | 	chJobs chan *JobSlot | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { | ||||||
|  | 	ctxChild, fnCancel := context.WithCancel(ctx) | ||||||
|  | 
 | ||||||
|  | 	c := SchedulerCycle{ | ||||||
|  | 		wg:          sync.WaitGroup{}, | ||||||
|  | 		ctx:         ctxChild, | ||||||
|  | 		fnCancel:    fnCancel, | ||||||
|  | 		interval:    interval, | ||||||
|  | 		currentSlot: 0, | ||||||
|  | 		slots:       [60][]*job.Job{}, | ||||||
|  | 		jobs:        make(map[uuid.UUID]*job.Job), | ||||||
|  | 		chJobs:      make(chan *JobSlot), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.run() | ||||||
|  | 
 | ||||||
|  | 	return &c | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) Stop() { | ||||||
|  | 	c.fnCancel() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) Done() <-chan struct{} { | ||||||
|  | 	done := make(chan struct{}) | ||||||
|  | 	go func() { | ||||||
|  | 		<-c.ctx.Done() | ||||||
|  | 		c.wg.Done() | ||||||
|  | 		done <- struct{}{} | ||||||
|  | 	}() | ||||||
|  | 	return done | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) Len() int { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	return len(c.jobs) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) HasAllJobsDone() bool { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	for _, j := range c.jobs { | ||||||
|  | 		if j.GetState() == job.Pending || j.GetState() == job.Running { | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	details := []job.JobDetails{} | ||||||
|  | 	for _, j := range c.jobs { | ||||||
|  | 		details = append(details, j.IntoDetails()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return details | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Delay builds a job and add it to the scheduler engine. | ||||||
|  | func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	nextSlot := c.currentSlot + 1 | ||||||
|  | 	if nextSlot > MaxSlotsIdx { | ||||||
|  | 		nextSlot = 0 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) | ||||||
|  | 
 | ||||||
|  | 	c.slots[nextSlot] = append(c.slots[nextSlot], &j) | ||||||
|  | 	c.jobs[j.GetID()] = &j | ||||||
|  | 
 | ||||||
|  | 	log.Info().Str("job", j.GetID().String()).Msg("job added successfully") | ||||||
|  | 	return j.GetID() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Abort aborts the job given by its id if it exists.. | ||||||
|  | func (c *SchedulerCycle) Abort(id uuid.UUID) bool { | ||||||
|  | 	if j := c.getJob(id); j != nil { | ||||||
|  | 		j.Abort() | ||||||
|  | 
 | ||||||
|  | 		log.Info().Str("job", j.GetID().String()).Msg("abort job done") | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetJobDetails returns the job details by . | ||||||
|  | func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	j, ok := c.jobs[id] | ||||||
|  | 	if !ok { | ||||||
|  | 		return job.JobDetails{ | ||||||
|  | 			State: job.Unknown.String(), | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return j.IntoDetails() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Display outputs earch interval the scheduler state. | ||||||
|  | func (c *SchedulerCycle) Display() { | ||||||
|  | 	ticker := time.NewTicker(c.interval) | ||||||
|  | 	go func() { | ||||||
|  | 		for range ticker.C { | ||||||
|  | 			c.display() | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // display writes to stdout the state of the scheduler as a table. | ||||||
|  | func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex | ||||||
|  | 	c.l.RLock() | ||||||
|  | 	defer c.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	var maxCols int | ||||||
|  | 	for i := range c.slots { | ||||||
|  | 		if l := len(c.slots[i]); l > maxCols { | ||||||
|  | 			maxCols = l | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	table := [][]string{} | ||||||
|  | 	title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1) | ||||||
|  | 	table = append(table, []string{title}) | ||||||
|  | 	for { | ||||||
|  | 		if maxCols == 0 { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		row := make([]string, CycleLength) | ||||||
|  | 		for i := 0; i <= MaxSlotsIdx; i++ { | ||||||
|  | 			row[i] = "_" | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		for i := range c.slots { | ||||||
|  | 			if len(c.slots[i]) < maxCols { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			j := c.slots[i][maxCols-1] | ||||||
|  | 			switch j.GetState() { | ||||||
|  | 			case job.Pending: | ||||||
|  | 				row[i] = "P" | ||||||
|  | 			case job.Running: | ||||||
|  | 				row[i] = "R" | ||||||
|  | 			case job.Failed: | ||||||
|  | 				row[i] = "X" | ||||||
|  | 			case job.Abort: | ||||||
|  | 				row[i] = "A" | ||||||
|  | 			case job.Unknown: | ||||||
|  | 				row[i] = "?" | ||||||
|  | 			case job.Success: | ||||||
|  | 				row[i] = "O" | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		table = append(table, row) | ||||||
|  | 		maxCols-- | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	row := make([]string, CycleLength) | ||||||
|  | 	for i := 0; i <= MaxSlotsIdx; i++ { | ||||||
|  | 		row[i] = "-" | ||||||
|  | 	} | ||||||
|  | 	table = append(table, row) | ||||||
|  | 
 | ||||||
|  | 	if l := len(table); l > 0 { | ||||||
|  | 		table[l-1][c.currentSlot] = Cursor | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	tableFormat := "" | ||||||
|  | 	for _, r := range table { | ||||||
|  | 		tableFormat += strings.Join(r, " ") | ||||||
|  | 		tableFormat += "\n" | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fmt.Println(tableFormat) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { | ||||||
|  | 	c.l.RLock() | ||||||
|  | 	defer c.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	j, ok := c.jobs[id] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return j | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // getCurrentSlotJobs collects all the current slot jobs | ||||||
|  | // and clean the slot. | ||||||
|  | func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	jobs := c.slots[c.currentSlot] | ||||||
|  | 
 | ||||||
|  | 	c.slots[c.currentSlot] = []*job.Job{} | ||||||
|  | 
 | ||||||
|  | 	return c.currentSlot, jobs | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // updateSlot add a job to the slot where it was before. | ||||||
|  | func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	c.slots[row] = append(c.slots[row], j) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // updateCurrentSlot add a job to the current slot. | ||||||
|  | func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // incr increments the slot cursor. | ||||||
|  | // It the cursor reaches `MaxSlotsIdx`, it goes back to 0. | ||||||
|  | func (c *SchedulerCycle) incr() { | ||||||
|  | 	c.l.Lock() | ||||||
|  | 	defer c.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	nextSlot := c.currentSlot + 1 | ||||||
|  | 	if nextSlot > MaxSlotsIdx { | ||||||
|  | 		nextSlot = 0 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.currentSlot = nextSlot | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // dispatch gets jobs from the current slot, resets the slot | ||||||
|  | // and dispatch all jobs to the workers. | ||||||
|  | // | ||||||
|  | // It all the workers are busy, the jobs are re-schedule in the same slot | ||||||
|  | // to be executed in the next cycle. | ||||||
|  | func (c *SchedulerCycle) dispatch() { | ||||||
|  | 	row, jobs := c.getCurrentSlotJobs() | ||||||
|  | 	for _, j := range jobs { | ||||||
|  | 		if j.GetState() == job.Abort { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		select { | ||||||
|  | 		case c.chJobs <- &JobSlot{row: row, Job: j}: | ||||||
|  | 		default: | ||||||
|  | 			log.Warn().Msg("unable to put job in workers, trying next cycle") | ||||||
|  | 			c.updateSlot(row, j) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // run launches the workers and the ticker. | ||||||
|  | func (c *SchedulerCycle) run() { | ||||||
|  | 	c.workers() | ||||||
|  | 	c.tick() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // workers launches `MaxWorkers` number of worker to execute job. | ||||||
|  | // If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. | ||||||
|  | func (c *SchedulerCycle) workers() { | ||||||
|  | 	for i := 0; i < MaxWorkers; i++ { | ||||||
|  | 		c.wg.Add(1) | ||||||
|  | 		go func() { | ||||||
|  | 			defer c.wg.Done() | ||||||
|  | 			for { | ||||||
|  | 				select { | ||||||
|  | 				case j := <-c.chJobs: | ||||||
|  | 					c.executeJob(j.Job, c.updateCurrentSlot) | ||||||
|  | 				case <-c.ctx.Done(): | ||||||
|  | 					log.Error().Msg("context done, worker is stopping...") | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { | ||||||
|  | 	j.Run(c.ctx) | ||||||
|  | 	if j.GetState() == job.Pending { | ||||||
|  | 		fnFallBack(j) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // tick is a simple ticker incrementing at each scheduler interval, | ||||||
|  | // the slot cursor and dispatch jobs to the workers. | ||||||
|  | func (c *SchedulerCycle) tick() { | ||||||
|  | 	c.wg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer c.wg.Done() | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-c.ctx.Done(): | ||||||
|  | 				log.Error().Msg("context done, ticker is stopping...") | ||||||
|  | 				return | ||||||
|  | 			default: | ||||||
|  | 				time.Sleep(c.interval) | ||||||
|  | 				c.incr() | ||||||
|  | 				c.dispatch() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
							
								
								
									
										33
									
								
								internal/scheduler/scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								internal/scheduler/scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,33 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"cycle-scheduler/internal/job" | ||||||
|  | 	"errors" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestSlot(t *testing.T) { | ||||||
|  | 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||||
|  | 	defer fnCancel() | ||||||
|  | 
 | ||||||
|  | 	s := NewSchedulerCycle(ctx, 1*time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	s.Delay(func(ctx context.Context) error { | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	s.Delay(func(ctx context.Context) error { | ||||||
|  | 		return job.ErrJobNotCompletedYet | ||||||
|  | 	}) | ||||||
|  | 	j3 := s.Delay(func(ctx context.Context) error { | ||||||
|  | 		return errors.New("errors") | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(2 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, 3, s.Len()) | ||||||
|  | 	assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) | ||||||
|  | } | ||||||
							
								
								
									
										122
									
								
								main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										122
									
								
								main.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,122 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"cycle-scheduler/internal/job" | ||||||
|  | 	"cycle-scheduler/internal/scheduler" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"errors" | ||||||
|  | 	"fmt" | ||||||
|  | 	"math/rand/v2" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/rs/zerolog" | ||||||
|  | 	"github.com/rs/zerolog/log" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func initLogger() { | ||||||
|  | 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix | ||||||
|  | 	log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func main() { | ||||||
|  | 	initLogger() | ||||||
|  | 
 | ||||||
|  | 	ctx, stop := signal.NotifyContext( | ||||||
|  | 		context.Background(), | ||||||
|  | 		os.Interrupt, | ||||||
|  | 		os.Kill, | ||||||
|  | 	) | ||||||
|  | 	defer stop() | ||||||
|  | 
 | ||||||
|  | 	interval := 200 * time.Millisecond | ||||||
|  | 	s := scheduler.NewSchedulerCycle(ctx, interval) | ||||||
|  | 	s.Display() | ||||||
|  | 
 | ||||||
|  | 	// pending test | ||||||
|  | 	for i := 0; i < 20; i++ { | ||||||
|  | 		go func(i int) { | ||||||
|  | 			time.Sleep(time.Duration(i) * time.Second) | ||||||
|  | 			s.Delay(func(ctx context.Context) error { | ||||||
|  | 				time.Sleep(4 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 				if rand.IntN(10)%2 == 0 {   //nolint:gosec,mnd // test prupose | ||||||
|  | 					return job.ErrJobNotCompletedYet | ||||||
|  | 				} | ||||||
|  | 				return nil | ||||||
|  | 			}) | ||||||
|  | 		}(i) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// abort test | ||||||
|  | 	j := s.Delay(func(ctx context.Context) error { | ||||||
|  | 		time.Sleep(4 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 
 | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			return ctx.Err() | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return job.ErrJobNotCompletedYet | ||||||
|  | 	}) | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(2 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 		s.Abort(j) | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// abort test 2 | ||||||
|  | 	j2 := s.Delay(func(ctx context.Context) error { | ||||||
|  | 		time.Sleep(time.Second) | ||||||
|  | 
 | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			return ctx.Err() | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return job.ErrJobNotCompletedYet | ||||||
|  | 	}) | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 		s.Abort(j2) | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// error test | ||||||
|  | 	s.Delay(func(ctx context.Context) error { | ||||||
|  | 		time.Sleep(5 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 		return errors.New("err") | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	// success test | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 		s.Delay(func(ctx context.Context) error { | ||||||
|  | 			time.Sleep(5 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 			return nil | ||||||
|  | 		}) | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			time.Sleep(2 * time.Second) //nolint:mnd // test purpose | ||||||
|  | 			if s.HasAllJobsDone() { | ||||||
|  | 				s.Stop() | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	<-s.Done() | ||||||
|  | 
 | ||||||
|  | 	jds := s.GetJobsDetails() | ||||||
|  | 	for _, jd := range jds { | ||||||
|  | 		c, err := json.Marshal(&jd) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		fmt.Println(string(c)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										213
									
								
								scheduler.go
									
									
									
									
									
								
							
							
						
						
									
										213
									
								
								scheduler.go
									
									
									
									
									
								
							| @ -1,213 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 
 |  | ||||||
| 	"github.com/rs/zerolog/log" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	ChanLength          = 500 |  | ||||||
| 	DefaultExecInterval = 30 * time.Second |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type IScheduler interface { |  | ||||||
| 	Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval. |  | ||||||
| // If a task is not in desired state, the task is re-scheduled. |  | ||||||
| type SchedulerCycle struct { |  | ||||||
| 	wg sync.WaitGroup |  | ||||||
| 
 |  | ||||||
| 	ctx      context.Context |  | ||||||
| 	fnCancel context.CancelFunc |  | ||||||
| 
 |  | ||||||
| 	tasks tasks |  | ||||||
| 
 |  | ||||||
| 	chTasks chan *task |  | ||||||
| 	chDone  chan struct{} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle { |  | ||||||
| 	ctxChild, fnCancel := context.WithCancel(ctx) |  | ||||||
| 
 |  | ||||||
| 	c := SchedulerCycle{ |  | ||||||
| 		wg:       sync.WaitGroup{}, |  | ||||||
| 		ctx:      ctxChild, |  | ||||||
| 		fnCancel: fnCancel, |  | ||||||
| 		tasks:    newTasks(), |  | ||||||
| 		chTasks:  make(chan *task, ChanLength), |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	done := make(chan struct{}) |  | ||||||
| 	go func() { |  | ||||||
| 		<-c.ctx.Done() |  | ||||||
| 		defer c.fnCancel() |  | ||||||
| 		c.wg.Wait() |  | ||||||
| 		c.stop() |  | ||||||
| 		done <- struct{}{} |  | ||||||
| 	}() |  | ||||||
| 	c.chDone = done |  | ||||||
| 
 |  | ||||||
| 	c.run(workers) |  | ||||||
| 
 |  | ||||||
| 	return &c |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // delay sets the task timer when the task should be scheduled. |  | ||||||
| func (c *SchedulerCycle) delay(t *task) { |  | ||||||
| 	interval := DefaultExecInterval |  | ||||||
| 	if t.execInterval != nil { |  | ||||||
| 		interval = *t.execInterval |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	t.setTimer( |  | ||||||
| 		time.AfterFunc(interval, func() { |  | ||||||
| 			select { |  | ||||||
| 			case c.chTasks <- t: |  | ||||||
| 			default: |  | ||||||
| 				log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") |  | ||||||
| 				c.delay(t) |  | ||||||
| 			} |  | ||||||
| 		}), |  | ||||||
| 	) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // exec runs the task now or if all the workers are in use, delayed it. |  | ||||||
| func (c *SchedulerCycle) exec(t *task) { |  | ||||||
| 	select { |  | ||||||
| 	case c.chTasks <- t: |  | ||||||
| 	default: |  | ||||||
| 		log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") |  | ||||||
| 		c.delay(t) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) getTask(id uuid.UUID) *task { |  | ||||||
| 	return c.tasks.get(id) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // run launches a number of worker to execute tasks. |  | ||||||
| func (c *SchedulerCycle) run(n uint32) { |  | ||||||
| 	for i := 0; i < int(n); i++ { |  | ||||||
| 		c.wg.Add(1) |  | ||||||
| 		go func() { |  | ||||||
| 			defer c.wg.Done() |  | ||||||
| 			for { |  | ||||||
| 				select { |  | ||||||
| 				case t := <-c.chTasks: |  | ||||||
| 					c.execute(t, c.delay) |  | ||||||
| 				case <-c.ctx.Done(): |  | ||||||
| 					log.Error().Msg("context done, worker is stopping...") |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		}() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // execute executes the task. |  | ||||||
| // |  | ||||||
| // It does not handle task error, it's up to the task to implement its own callbacks. |  | ||||||
| // In case of pending state, a callback is executed for actions. For the others states, |  | ||||||
| // the task is deleted from the scheduler. |  | ||||||
| func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) { |  | ||||||
| 	t.Run(c.ctx) |  | ||||||
| 
 |  | ||||||
| 	switch t.GetState() { |  | ||||||
| 	case Pending: |  | ||||||
| 		fnFallBack(t) |  | ||||||
| 	case Success, Failed, Abort, Unknown: |  | ||||||
| 		c.tasks.delete(t) |  | ||||||
| 	case Running: |  | ||||||
| 		c.tasks.delete(t) |  | ||||||
| 		log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // stop aborts all tasks and waits until tasks are stopped. |  | ||||||
| // If the process can't be stopped within 10s, too bad... |  | ||||||
| func (c *SchedulerCycle) stop() { |  | ||||||
| 	c.tasks.abort() |  | ||||||
| 
 |  | ||||||
| 	if c.TasksDone() { |  | ||||||
| 		log.Info().Msg("all tasks has been stopped gracefully") |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	ctxTimeout := 10 * time.Second |  | ||||||
| 	ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout) |  | ||||||
| 	defer fnCancel() |  | ||||||
| 
 |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-ctx.Done(): |  | ||||||
| 			log.Error().Msg("stop context done, tasks has been stopped gracefully") |  | ||||||
| 			return |  | ||||||
| 		default: |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if c.TasksDone() { |  | ||||||
| 			log.Info().Msg("all tasks has been stopped gracefully") |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		time.Sleep(time.Second) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) Done() <-chan struct{} { |  | ||||||
| 	return c.chDone |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) Len() int { |  | ||||||
| 	return c.tasks.len() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // TasksDone checks whether all the tasks has been completed. |  | ||||||
| func (c *SchedulerCycle) TasksDone() bool { |  | ||||||
| 	return c.tasks.completed() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) GetTasksDetails() []TaskDetails { |  | ||||||
| 	return c.tasks.getAllDetails() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // GetTaskDetails returns the task details by id. |  | ||||||
| func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails { |  | ||||||
| 	return c.tasks.getDetails(id) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Delay builds a task and adds it to the scheduler engine. |  | ||||||
| func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID { |  | ||||||
| 	select { |  | ||||||
| 	case <-c.Done(): |  | ||||||
| 		log.Error().Msg("context done unable to add new job") |  | ||||||
| 	default: |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	t := NewTask(fnJob, opts...) |  | ||||||
| 
 |  | ||||||
| 	c.tasks.add(t) |  | ||||||
| 	c.exec(t) |  | ||||||
| 
 |  | ||||||
| 	log.Info().Str("id", t.GetID().String()).Msg("task added successfully") |  | ||||||
| 	return t.GetID() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Abort aborts the task given by its id if it exists. |  | ||||||
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool { |  | ||||||
| 	if t := c.getTask(id); t != nil { |  | ||||||
| 		t.Abort() |  | ||||||
| 
 |  | ||||||
| 		log.Info().Str("id", t.GetID().String()).Msg("abort task done") |  | ||||||
| 		return true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
| @ -1,93 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"testing" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/stretchr/testify/assert" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func TestScheduler(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	var buf int |  | ||||||
| 
 |  | ||||||
| 	s := NewSchedulerCycle(ctx, 1) |  | ||||||
| 	taskID := s.Delay(func(ctx context.Context) (any, error) { |  | ||||||
| 		time.Sleep(50 * time.Millisecond) |  | ||||||
| 		buf += 1 |  | ||||||
| 		return nil, nil |  | ||||||
| 	}, WithExecInterval(2*time.Millisecond)) |  | ||||||
| 
 |  | ||||||
| 	assert.NotEmpty(t, taskID) |  | ||||||
| 	assert.False(t, s.TasksDone()) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(2 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	details := s.GetTaskDetails(taskID) |  | ||||||
| 	assert.Equal(t, "running", details.State) |  | ||||||
| 	assert.LessOrEqual(t, details.ElapsedTime, 50*time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(50 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	assert.True(t, s.TasksDone()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestSchedulerLoad(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	s := NewSchedulerCycle(ctx, 1) |  | ||||||
| 
 |  | ||||||
| 	for i := 0; i < 500; i++ { |  | ||||||
| 		s.Delay(func(ctx context.Context) (any, error) { |  | ||||||
| 			time.Sleep(1 * time.Millisecond) |  | ||||||
| 			return nil, nil |  | ||||||
| 		}, WithExecInterval(1*time.Millisecond)) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	assert.Eventually(t, func() bool { |  | ||||||
| 		return s.TasksDone() |  | ||||||
| 	}, time.Second, 250*time.Millisecond) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestSchedulerExecInterval(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	s := NewSchedulerCycle(ctx, 1) |  | ||||||
| 
 |  | ||||||
| 	s.Delay( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			return nil, ErrJobNotCompletedYet |  | ||||||
| 		}, |  | ||||||
| 		WithMaxDuration(50*time.Millisecond), |  | ||||||
| 		WithExecInterval(2*time.Millisecond), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(100 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	assert.True(t, s.TasksDone()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestSchedulerContextDone(t *testing.T) { |  | ||||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) |  | ||||||
| 
 |  | ||||||
| 	s := NewSchedulerCycle(ctx, 1) |  | ||||||
| 
 |  | ||||||
| 	for i := 0; i < 250; i++ { |  | ||||||
| 		s.Delay( |  | ||||||
| 			func(ctx context.Context) (any, error) { |  | ||||||
| 				return nil, ErrJobNotCompletedYet |  | ||||||
| 			}, |  | ||||||
| 			WithMaxDuration(100*time.Millisecond), |  | ||||||
| 			WithExecInterval(2*time.Millisecond), |  | ||||||
| 		) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	go func() { |  | ||||||
| 		time.Sleep(50 * time.Millisecond) |  | ||||||
| 		fnCancel() |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	<-s.Done() |  | ||||||
| } |  | ||||||
							
								
								
									
										451
									
								
								task.go
									
									
									
									
									
								
							
							
						
						
									
										451
									
								
								task.go
									
									
									
									
									
								
							| @ -1,451 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"errors" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 
 |  | ||||||
| 	"github.com/rs/zerolog/log" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type State int |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	Pending State = iota |  | ||||||
| 	Running |  | ||||||
| 	Success |  | ||||||
| 	Failed |  | ||||||
| 	Abort |  | ||||||
| 	Unknown |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const UnknownState = "unknown" |  | ||||||
| 
 |  | ||||||
| func (s State) String() string { |  | ||||||
| 	return [...]string{"pending", "running", "success", "failed", "abort", "unknown"}[s] |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| var ( |  | ||||||
| 	ErrJobAborted         = errors.New("job has been aborted") |  | ||||||
| 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") |  | ||||||
| 	ErrTimeExceeded       = errors.New("time exceeded") |  | ||||||
| 	ErrExecutionEngine    = errors.New("execution engine") |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type FnJob func(ctx context.Context) (any, error) |  | ||||||
| 
 |  | ||||||
| type FnResult func(ctx context.Context, res any) |  | ||||||
| type FnError func(ctx context.Context, err error) |  | ||||||
| 
 |  | ||||||
| type TaskDetails struct { |  | ||||||
| 	State          string            `json:"state"` |  | ||||||
| 	CreatedAt      time.Time         `json:"createdAt"` |  | ||||||
| 	UpdatedAt      *time.Time        `json:"updatedAt,omitempty"` |  | ||||||
| 	MaxDuration    *time.Duration    `json:"maxDuration,omitempty"` |  | ||||||
| 	Attempts       uint32            `json:"attempts"` |  | ||||||
| 	Err            string            `json:"error"` |  | ||||||
| 	ElapsedTime    time.Duration     `json:"elapsedTime"` |  | ||||||
| 	AdditionalInfo map[string]string `json:"additionalInfos"` |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (td *TaskDetails) log() { |  | ||||||
| 	args := []any{} |  | ||||||
| 	args = append(args, |  | ||||||
| 		"state", td.State, |  | ||||||
| 		"createdAt", td.CreatedAt.Format("2006-01-02T15:04:05Z"), |  | ||||||
| 		"updatedAt", td.UpdatedAt.Format("2006-01-02T15:04:05Z"), |  | ||||||
| 		"elapsedTime", td.ElapsedTime.String(), |  | ||||||
| 		"attempts", td.Attempts, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	if td.AdditionalInfo != nil { |  | ||||||
| 		for k, v := range td.AdditionalInfo { |  | ||||||
| 			args = append(args, k, v) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if td.Err != "" { |  | ||||||
| 		args = append(args, "error", td.Err) |  | ||||||
| 		log.Error().Any("args", args).Msg("task failed") |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	log.Info().Any("args", args).Msg("task execution done") |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type task struct { |  | ||||||
| 	l sync.RWMutex |  | ||||||
| 
 |  | ||||||
| 	id        uuid.UUID |  | ||||||
| 	createdAt time.Time |  | ||||||
| 	updatedAt *time.Time |  | ||||||
| 	state     State |  | ||||||
| 
 |  | ||||||
| 	fnJob     FnJob |  | ||||||
| 	fnSuccess FnResult |  | ||||||
| 	fnError   FnError |  | ||||||
| 
 |  | ||||||
| 	attempts     uint32 |  | ||||||
| 	timer        *time.Timer |  | ||||||
| 	maxDuration  *time.Duration |  | ||||||
| 	execTimeout  *time.Duration |  | ||||||
| 	execInterval *time.Duration |  | ||||||
| 
 |  | ||||||
| 	res any |  | ||||||
| 	err error |  | ||||||
| 
 |  | ||||||
| 	additionalInfos map[string]string |  | ||||||
| 
 |  | ||||||
| 	chAbort chan struct{} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type TaskOption func(t *task) |  | ||||||
| 
 |  | ||||||
| func WithMaxDuration(d time.Duration) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		t.maxDuration = &d |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func WithFnSuccess(f FnResult) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		t.fnSuccess = f |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func WithFnError(f FnError) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		t.fnError = f |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func WithExecTimeout(d time.Duration) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		t.execTimeout = &d |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func WithExecInterval(d time.Duration) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		t.execInterval = &d |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func WithAdditionalInfos(k, v string) TaskOption { |  | ||||||
| 	return func(t *task) { |  | ||||||
| 		if k == "" || v == "" { |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if t.additionalInfos == nil { |  | ||||||
| 			t.additionalInfos = map[string]string{} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		t.additionalInfos[k] = v |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // NewTask builds a task. |  | ||||||
| // Here the options details that can be set to the task: |  | ||||||
| //   - WithMaxDuration(time.Duration): the task will stop executing if the duration is exceeded (raise ErrTimeExceeded) |  | ||||||
| //   - WithFnSuccess(FnResult): call a function after a success execution |  | ||||||
| //   - WithFnError(FnError): call a function if an error occurred |  | ||||||
| //   - WithExecTimeout(time.Duration): sets a timeout for the task execution |  | ||||||
| //   - WithAdditionalInfos(k, v string): key-value additional informations (does not inferred with the task execution, log purpose) |  | ||||||
| // |  | ||||||
| // Scheduler options: |  | ||||||
| //   - WithExecInterval(time.Duration): specify the execution interval |  | ||||||
| func NewTask(f FnJob, opts ...TaskOption) *task { |  | ||||||
| 	t := task{ |  | ||||||
| 		id:        uuid.New(), |  | ||||||
| 		createdAt: time.Now().UTC(), |  | ||||||
| 		state:     Pending, |  | ||||||
| 		fnJob:     f, |  | ||||||
| 		chAbort:   make(chan struct{}, 1), |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&t) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return &t |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) setState(s State) { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	now := time.Now().UTC() |  | ||||||
| 	t.updatedAt = &now |  | ||||||
| 
 |  | ||||||
| 	t.state = s |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) setSuccess(ctx context.Context, res any) { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	now := time.Now().UTC() |  | ||||||
| 	t.updatedAt = &now |  | ||||||
| 
 |  | ||||||
| 	t.state = Success |  | ||||||
| 	t.res = res |  | ||||||
| 
 |  | ||||||
| 	if t.fnSuccess != nil { |  | ||||||
| 		t.fnSuccess(ctx, res) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) setFail(ctx context.Context, err error) { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	now := time.Now().UTC() |  | ||||||
| 	t.updatedAt = &now |  | ||||||
| 
 |  | ||||||
| 	if t.state != Abort { |  | ||||||
| 		t.state = Failed |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	t.err = err |  | ||||||
| 
 |  | ||||||
| 	if t.fnError != nil { |  | ||||||
| 		t.fnError(ctx, err) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) setTimer(tm *time.Timer) { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	t.timer = tm |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) stopTimer() { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	if t.timer != nil { |  | ||||||
| 		t.timer.Stop() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) incr() { |  | ||||||
| 	t.l.Lock() |  | ||||||
| 	defer t.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	t.attempts += 1 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) log() { |  | ||||||
| 	td := t.IntoDetails() |  | ||||||
| 	td.log() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) GetID() uuid.UUID { |  | ||||||
| 	t.l.RLock() |  | ||||||
| 	defer t.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	return t.id |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) GetAttempts() uint32 { |  | ||||||
| 	t.l.RLock() |  | ||||||
| 	defer t.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	return t.attempts |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) GetState() State { |  | ||||||
| 	t.l.RLock() |  | ||||||
| 	defer t.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	return t.state |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // TimeExceeded checks if the task does not reached its max duration execution (maxDuration). |  | ||||||
| func (t *task) TimeExceeded() bool { |  | ||||||
| 	t.l.RLock() |  | ||||||
| 	defer t.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	if md := t.maxDuration; md != nil { |  | ||||||
| 		return time.Since(t.createdAt) >= *md |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) Abort() { |  | ||||||
| 	t.stopTimer() |  | ||||||
| 	t.chAbort <- struct{}{} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) Run(ctx context.Context) { |  | ||||||
| 	if t.TimeExceeded() { |  | ||||||
| 		t.setFail(ctx, ErrTimeExceeded) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if s := t.GetState(); s != Pending { |  | ||||||
| 		log.Error().Msg("unable to launch a task that not in pending state") |  | ||||||
| 		t.setFail(ctx, ErrExecutionEngine) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	var ctxExec context.Context |  | ||||||
| 	var fnCancel context.CancelFunc |  | ||||||
| 	if t.execTimeout != nil { |  | ||||||
| 		ctxExec, fnCancel = context.WithTimeout(ctx, *t.execTimeout) |  | ||||||
| 	} else { |  | ||||||
| 		ctxExec, fnCancel = context.WithCancel(ctx) |  | ||||||
| 	} |  | ||||||
| 	defer fnCancel() |  | ||||||
| 
 |  | ||||||
| 	t.incr() |  | ||||||
| 	t.setState(Running) |  | ||||||
| 
 |  | ||||||
| 	log.Info().Str("id", t.GetID().String()).Msg("task is running...") |  | ||||||
| 
 |  | ||||||
| 	go func() { |  | ||||||
| 		for range t.chAbort { |  | ||||||
| 			t.setState(Abort) |  | ||||||
| 			fnCancel() |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	defer t.log() |  | ||||||
| 	res, err := t.fnJob(ctxExec) |  | ||||||
| 	if err != nil { |  | ||||||
| 		if errors.Is(err, ErrJobNotCompletedYet) { |  | ||||||
| 			t.setState(Pending) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		t.setFail(ctx, err) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	t.setSuccess(ctx, res) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (t *task) IntoDetails() TaskDetails { |  | ||||||
| 	t.l.RLock() |  | ||||||
| 	defer t.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	td := TaskDetails{ |  | ||||||
| 		CreatedAt:   t.createdAt, |  | ||||||
| 		UpdatedAt:   t.updatedAt, |  | ||||||
| 		State:       t.state.String(), |  | ||||||
| 		Attempts:    t.attempts, |  | ||||||
| 		MaxDuration: t.maxDuration, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if t.state == Pending || t.state == Running { |  | ||||||
| 		td.ElapsedTime = time.Since(t.createdAt) |  | ||||||
| 	} else { |  | ||||||
| 		td.ElapsedTime = t.updatedAt.Sub(t.createdAt) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if err := t.err; err != nil { |  | ||||||
| 		td.Err = err.Error() |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if t.additionalInfos != nil { |  | ||||||
| 		td.AdditionalInfo = t.additionalInfos |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return td |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type tasks struct { |  | ||||||
| 	l sync.RWMutex |  | ||||||
| 	s map[uuid.UUID]*task |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func newTasks() tasks { |  | ||||||
| 	return tasks{ |  | ||||||
| 		s: make(map[uuid.UUID]*task), |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) add(t *task) { |  | ||||||
| 	ts.l.Lock() |  | ||||||
| 	defer ts.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	ts.s[t.GetID()] = t |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) delete(t *task) { |  | ||||||
| 	ts.l.Lock() |  | ||||||
| 	defer ts.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	delete(ts.s, t.GetID()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) get(id uuid.UUID) *task { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	t, ok := ts.s[id] |  | ||||||
| 	if !ok { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return t |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) len() int { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	return len(ts.s) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) completed() bool { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	for _, t := range ts.s { |  | ||||||
| 		if t.GetState() == Pending || t.GetState() == Running { |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return true |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) getAllDetails() []TaskDetails { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	details := []TaskDetails{} |  | ||||||
| 	for _, t := range ts.s { |  | ||||||
| 		details = append(details, t.IntoDetails()) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return details |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) getDetails(id uuid.UUID) TaskDetails { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	t, ok := ts.s[id] |  | ||||||
| 	if !ok { |  | ||||||
| 		return TaskDetails{State: UnknownState} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return t.IntoDetails() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (ts *tasks) abort() { |  | ||||||
| 	ts.l.RLock() |  | ||||||
| 	defer ts.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	for _, t := range ts.s { |  | ||||||
| 		t.Abort() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
							
								
								
									
										243
									
								
								task_test.go
									
									
									
									
									
								
							
							
						
						
									
										243
									
								
								task_test.go
									
									
									
									
									
								
							| @ -1,243 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"errors" |  | ||||||
| 	"testing" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 	"github.com/stretchr/testify/assert" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func TestTask(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 	var i int |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			i += 1 |  | ||||||
| 			return nil, nil |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, nil, task.res) |  | ||||||
| 	assert.NotEmpty(t, task.updatedAt) |  | ||||||
| 	assert.Equal(t, 1, i) |  | ||||||
| 	assert.Equal(t, 1, int(task.GetAttempts())) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestAbortTask(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			<-ctx.Done() |  | ||||||
| 			return nil, ctx.Err() |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	timer := time.NewTicker(200 * time.Millisecond) |  | ||||||
| 	go func() { |  | ||||||
| 		<-timer.C |  | ||||||
| 		task.Abort() |  | ||||||
| 	}() |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, Abort, task.GetState()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskContextDone(t *testing.T) { |  | ||||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			<-ctx.Done() |  | ||||||
| 			return nil, ctx.Err() |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	timer := time.NewTicker(200 * time.Millisecond) |  | ||||||
| 	go func() { |  | ||||||
| 		<-timer.C |  | ||||||
| 		fnCancel() |  | ||||||
| 	}() |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, Failed, task.GetState()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskFnSuccess(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 	var result int |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			return 3, nil |  | ||||||
| 		}, |  | ||||||
| 		WithFnSuccess(func(ctx context.Context, res any) { |  | ||||||
| 			t, ok := res.(int) |  | ||||||
| 			if !ok { |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			result = t * 2 |  | ||||||
| 		}), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, Success, task.GetState()) |  | ||||||
| 	assert.Equal(t, 6, result) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskFnError(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 	var result error |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			return 3, errors.New("error occurred...") |  | ||||||
| 		}, |  | ||||||
| 		WithFnError(func(ctx context.Context, err error) { |  | ||||||
| 			result = err |  | ||||||
| 		}), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, Failed, task.GetState()) |  | ||||||
| 	assert.Equal(t, "error occurred...", result.Error()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskWithErrJobNotCompletedYet(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 	var attempts int |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			if attempts < 2 { |  | ||||||
| 				attempts += 1 |  | ||||||
| 				return nil, ErrJobNotCompletedYet |  | ||||||
| 			} |  | ||||||
| 			return "ok", nil |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	for i := 0; i < 2; i++ { |  | ||||||
| 		task.Run(ctx) |  | ||||||
| 		assert.Equal(t, Pending, task.GetState()) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 	assert.Equal(t, Success, task.GetState()) |  | ||||||
| 	assert.Equal(t, 3, int(task.GetAttempts())) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskTimeExceeded(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			return "ko", nil |  | ||||||
| 		}, |  | ||||||
| 		WithMaxDuration(5*time.Millisecond), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(10 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 	assert.Equal(t, Failed, task.GetState()) |  | ||||||
| 	assert.Equal(t, 0, int(task.GetAttempts())) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskExecTimeout(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			<-ctx.Done() |  | ||||||
| 			return nil, ctx.Err() |  | ||||||
| 		}, |  | ||||||
| 		WithExecTimeout(5*time.Millisecond), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(10 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 	assert.Equal(t, Failed, task.GetState()) |  | ||||||
| 	assert.Equal(t, 1, int(task.GetAttempts())) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskDetails(t *testing.T) { |  | ||||||
| 	ctx := context.Background() |  | ||||||
| 
 |  | ||||||
| 	task := NewTask( |  | ||||||
| 		func(ctx context.Context) (any, error) { |  | ||||||
| 			return "coucou", nil |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	details := task.IntoDetails() |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, 0, int(details.Attempts)) |  | ||||||
| 	assert.Equal(t, "pending", details.State) |  | ||||||
| 	assert.False(t, details.CreatedAt.IsZero()) |  | ||||||
| 	assert.Empty(t, details.UpdatedAt) |  | ||||||
| 	assert.Nil(t, details.MaxDuration) |  | ||||||
| 	assert.Empty(t, details.Err) |  | ||||||
| 	assert.NotEmpty(t, details.ElapsedTime) |  | ||||||
| 
 |  | ||||||
| 	task.Run(ctx) |  | ||||||
| 
 |  | ||||||
| 	details = task.IntoDetails() |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, 1, int(details.Attempts)) |  | ||||||
| 	assert.Equal(t, "success", details.State) |  | ||||||
| 	assert.False(t, details.CreatedAt.IsZero()) |  | ||||||
| 	assert.NotEmpty(t, details.UpdatedAt) |  | ||||||
| 	assert.Nil(t, details.MaxDuration) |  | ||||||
| 	assert.Empty(t, details.Err) |  | ||||||
| 	assert.NotEmpty(t, details.ElapsedTime) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func TestTaskAdditionalInfos(t *testing.T) { |  | ||||||
| 	t.Run("with key value", func(t *testing.T) { |  | ||||||
| 		elementID := uuid.NewString() |  | ||||||
| 		task := NewTask( |  | ||||||
| 			func(ctx context.Context) (any, error) { |  | ||||||
| 				return "yo", nil |  | ||||||
| 			}, |  | ||||||
| 			WithAdditionalInfos("transportId", elementID), |  | ||||||
| 			WithAdditionalInfos("element", "transport"), |  | ||||||
| 		) |  | ||||||
| 
 |  | ||||||
| 		assert.Equal(t, elementID, task.additionalInfos["transportId"]) |  | ||||||
| 		assert.Equal(t, "transport", task.additionalInfos["element"]) |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	t.Run("with empty key", func(t *testing.T) { |  | ||||||
| 		elementID := uuid.NewString() |  | ||||||
| 		task := NewTask( |  | ||||||
| 			func(ctx context.Context) (any, error) { |  | ||||||
| 				return "hello", nil |  | ||||||
| 			}, |  | ||||||
| 			WithAdditionalInfos("", elementID), |  | ||||||
| 			WithAdditionalInfos("element", "transport"), |  | ||||||
| 		) |  | ||||||
| 
 |  | ||||||
| 		assert.Equal(t, "transport", task.additionalInfos["element"]) |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	t.Run("with empty infos", func(t *testing.T) { |  | ||||||
| 		task := NewTask( |  | ||||||
| 			func(ctx context.Context) (any, error) { |  | ||||||
| 				return "hey", nil |  | ||||||
| 			}, |  | ||||||
| 		) |  | ||||||
| 
 |  | ||||||
| 		assert.Nil(t, task.additionalInfos) |  | ||||||
| 	}) |  | ||||||
| } |  | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user