Compare commits
	
		
			13 Commits
		
	
	
		
			feat/slots
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d5519cc064 | ||
|   | f2a0830ca1 | ||
|   | 63210639ea | ||
|   | 4a39ba5547 | ||
|   | 2a0ecf2d4e | ||
|   | 3f1afb63d4 | ||
|   | df9a0ffbbc | ||
|   | ec53ec990a | ||
|   | 152c4f925a | ||
|   | 3cbea438f6 | ||
|   | b6df47ed0c | ||
|   | 0cd4f264a3 | ||
|   | 4da553d156 | 
							
								
								
									
										3
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								Makefile
									
									
									
									
									
								
							| @ -1,6 +1,3 @@ | |||||||
| run: lint |  | ||||||
| 	go run main.go |  | ||||||
| 
 |  | ||||||
| lint: | lint: | ||||||
| 	golangci-lint run ./... | 	golangci-lint run ./... | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										81
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										81
									
								
								README.md
									
									
									
									
									
								
							| @ -1,48 +1,47 @@ | |||||||
| # cycle-scheduler | # cycle-scheduler | ||||||
| 
 | 
 | ||||||
| cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. | cycle-scheduler is a simple scheduler lib, handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled. | ||||||
| 
 | 
 | ||||||
| Here a simple representation: | **NOTE**: this should be not used for long-running tasks, it's more suitable for shorts tasks like polling etc... | ||||||
| ```ascii |  | ||||||
| +------------------------------------------------------+ |  | ||||||
| | +---+ +---+ +---+ +---+ +---+                  +---+ | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |   | |   | |   | |   | |   |                  |   | | |  | ||||||
| | |s1 | |s2 | |s3 | |s4 | |   |                  |s60| | |  | ||||||
| | +---+ +---+ +---+ +---+ +---+                  +---+ | |  | ||||||
| +---------------^--------------------------------------+ |  | ||||||
| ``` |  | ||||||
| Jobs are handle in a array of job slices. |  | ||||||
| 
 | 
 | ||||||
| At each interval (clock), the cursor `^` moves to the next slot (s*). | ## Examples | ||||||
| If there are jobs, they are sent to workers to be executed | * Init a new scheduler with 4 workers | ||||||
| and the slot is cleaned. |  | ||||||
| At the end of the slot (s60), the cursor re-starts a new cycle from s1. |  | ||||||
| 
 |  | ||||||
| If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle. |  | ||||||
| 
 |  | ||||||
| **NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s. |  | ||||||
| Pooling tasks are more suitable for this kind of scheduler. |  | ||||||
| 
 |  | ||||||
| ## Run |  | ||||||
| You can run sample tests from `main.go` to see the scheduler in action: |  | ||||||
| ```bash |  | ||||||
| make run |  | ||||||
| ``` |  | ||||||
| If all goes well, you should see this kind of output in the stdout: |  | ||||||
| ```ascii |  | ||||||
| # cycle-scheduler (slot: 7) |  | ||||||
| _ P _ _ _ _ _ _ _ _ _ _ _ _ |  | ||||||
| - - - - - - ^ - - - - - - -  |  | ||||||
| ``` |  | ||||||
| > **P** means *pending* state |  | ||||||
| 
 |  | ||||||
| You can adjust the clock interval as needed in `main.go`: |  | ||||||
| ```go | ```go | ||||||
| interval := 200 * time.Millisecond | package main | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	scheduler "gitea.thegux.fr/rmanach/cycle-scheduler.git" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func main() { | ||||||
|  | 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||||
|  | 	s := scheduler.NewSchedulerCycle(ctx, 4) | ||||||
|  | 
 | ||||||
|  | 	// add a task with an execution interval of 2 ms (executed every 2 ms) | ||||||
|  | 	// and a maximum duration of 30 second. | ||||||
|  | 	s.Delay( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			// ... | ||||||
|  | 			return nil, nil | ||||||
|  | 		}, | ||||||
|  | 		scheduler.WithExecInterval(2*time.Millisecond), | ||||||
|  | 		scheduler.WithMaxDuration(30*time.Second), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	// stop the program after 5 seconds | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(5 * time.Second) | ||||||
|  | 		fnCancel() | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	<-ctx.Done() | ||||||
|  | 	<-s.Done() | ||||||
|  | } | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
|  | **NOTE**: for `Delay` optionals arguments, check the `NewTask` method documentation for more details. | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -1,4 +1,4 @@ | |||||||
| module cycle-scheduler | module gitea.thegux.fr/rmanach/cycle-scheduler.git | ||||||
| 
 | 
 | ||||||
| go 1.22.4 | go 1.22.4 | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,166 +0,0 @@ | |||||||
| package job |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"errors" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 	"github.com/rs/zerolog/log" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type State int |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	Pending State = iota |  | ||||||
| 	Running |  | ||||||
| 	Success |  | ||||||
| 	Failed |  | ||||||
| 	Abort |  | ||||||
| 	Unknown |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const UnknownState = "unknown" |  | ||||||
| 
 |  | ||||||
| var JobExecTimeout = 10 * time.Second |  | ||||||
| 
 |  | ||||||
| func (s State) String() string { |  | ||||||
| 	switch s { |  | ||||||
| 	case Pending: |  | ||||||
| 		return "pending" |  | ||||||
| 	case Running: |  | ||||||
| 		return "running" |  | ||||||
| 	case Success: |  | ||||||
| 		return "success" |  | ||||||
| 	case Failed: |  | ||||||
| 		return "failed" |  | ||||||
| 	case Abort: |  | ||||||
| 		return "abort" |  | ||||||
| 	case Unknown: |  | ||||||
| 		return UnknownState |  | ||||||
| 	default: |  | ||||||
| 		return UnknownState |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| var ( |  | ||||||
| 	ErrJobAborted         = errors.New("job has been aborted") |  | ||||||
| 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type FnJob func(ctx context.Context) error |  | ||||||
| 
 |  | ||||||
| type JobDetails struct { |  | ||||||
| 	ID        uuid.UUID  `json:"id"` |  | ||||||
| 	State     string     `json:"state"` |  | ||||||
| 	CreatedAt time.Time  `json:"createdAt"` |  | ||||||
| 	UpdatedAt *time.Time `json:"updatedAt,omitempty"` |  | ||||||
| 	Err       string     `json:"error"` |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // TODO(rmanach): add priority level |  | ||||||
| type Job struct { |  | ||||||
| 	l         sync.RWMutex |  | ||||||
| 	id        uuid.UUID |  | ||||||
| 	createdAt time.Time |  | ||||||
| 	updatedAt *time.Time |  | ||||||
| 	state     State |  | ||||||
| 	task      FnJob |  | ||||||
| 	err       error |  | ||||||
| 	chAbort   chan struct{} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewJob(task FnJob, row, col int) Job { |  | ||||||
| 	return Job{ |  | ||||||
| 		id:        uuid.New(), |  | ||||||
| 		createdAt: time.Now().UTC(), |  | ||||||
| 		state:     Pending, |  | ||||||
| 		task:      task, |  | ||||||
| 		chAbort:   make(chan struct{}, 1), |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) IntoDetails() JobDetails { |  | ||||||
| 	j.l.RLock() |  | ||||||
| 	defer j.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	jd := JobDetails{ |  | ||||||
| 		ID:        j.id, |  | ||||||
| 		CreatedAt: j.createdAt, |  | ||||||
| 		State:     j.state.String(), |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if err := j.err; err != nil { |  | ||||||
| 		jd.Err = err.Error() |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if ut := j.updatedAt; ut != nil { |  | ||||||
| 		jd.UpdatedAt = ut |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return jd |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) GetID() uuid.UUID { |  | ||||||
| 	return j.id |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) GetState() State { |  | ||||||
| 	j.l.RLock() |  | ||||||
| 	defer j.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	return j.state |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) setState(s State) { |  | ||||||
| 	j.l.Lock() |  | ||||||
| 	defer j.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	now := time.Now().UTC() |  | ||||||
| 	j.updatedAt = &now |  | ||||||
| 
 |  | ||||||
| 	j.state = s |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) setFail(err error) { |  | ||||||
| 	j.l.Lock() |  | ||||||
| 	defer j.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	now := time.Now().UTC() |  | ||||||
| 	j.updatedAt = &now |  | ||||||
| 
 |  | ||||||
| 	j.state = Failed |  | ||||||
| 	j.err = err |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) Abort() { |  | ||||||
| 	j.setState(Abort) |  | ||||||
| 	j.chAbort <- struct{}{} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (j *Job) Run(ctx context.Context) { |  | ||||||
| 	ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout) |  | ||||||
| 	defer fnCancel() |  | ||||||
| 
 |  | ||||||
| 	j.setState(Running) |  | ||||||
| 
 |  | ||||||
| 	log.Info().Str("job", j.GetID().String()).Msg("job running...") |  | ||||||
| 
 |  | ||||||
| 	go func() { |  | ||||||
| 		for range j.chAbort { |  | ||||||
| 			j.setState(Abort) |  | ||||||
| 			fnCancel() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	if err := j.task(ctxExec); err != nil { |  | ||||||
| 		if errors.Is(err, ErrJobNotCompletedYet) { |  | ||||||
| 			j.setState(Pending) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		j.setFail(err) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	j.setState(Success) |  | ||||||
| } |  | ||||||
| @ -1,371 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"cycle-scheduler/internal/job" |  | ||||||
| 	"fmt" |  | ||||||
| 	"strings" |  | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 	"github.com/rs/zerolog/log" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	TableTitle  = "# cycle-scheduler" |  | ||||||
| 	Cursor      = "^" |  | ||||||
| 	CycleLength = 60 |  | ||||||
| 	MaxWorkers  = 5 |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| const MaxSlotsIdx = 59 |  | ||||||
| 
 |  | ||||||
| type JobSlot struct { |  | ||||||
| 	*job.Job |  | ||||||
| 	row int |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // SchedulerCycle is a dumb scheduler. |  | ||||||
| // It handle job and executes it at each cycle (60 * interval). |  | ||||||
| // |  | ||||||
| // Jobs are handle in a array of job slices. |  | ||||||
| // At each interval (clock), the cursor moves to the next slot (s*). |  | ||||||
| // If there are jobs, they are sent to workers to be executed |  | ||||||
| // and the slot is cleaned. |  | ||||||
| // |  | ||||||
| // At the end of the slot (s60), the cursor re-starts a cycle at s1. |  | ||||||
| type SchedulerCycle struct { |  | ||||||
| 	l  sync.RWMutex |  | ||||||
| 	wg sync.WaitGroup |  | ||||||
| 
 |  | ||||||
| 	ctx      context.Context |  | ||||||
| 	fnCancel context.CancelFunc |  | ||||||
| 
 |  | ||||||
| 	interval    time.Duration |  | ||||||
| 	currentSlot int |  | ||||||
| 	slots       [60][]*job.Job |  | ||||||
| 	jobs        map[uuid.UUID]*job.Job |  | ||||||
| 
 |  | ||||||
| 	chJobs chan *JobSlot |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle { |  | ||||||
| 	ctxChild, fnCancel := context.WithCancel(ctx) |  | ||||||
| 
 |  | ||||||
| 	c := SchedulerCycle{ |  | ||||||
| 		wg:          sync.WaitGroup{}, |  | ||||||
| 		ctx:         ctxChild, |  | ||||||
| 		fnCancel:    fnCancel, |  | ||||||
| 		interval:    interval, |  | ||||||
| 		currentSlot: 0, |  | ||||||
| 		slots:       [60][]*job.Job{}, |  | ||||||
| 		jobs:        make(map[uuid.UUID]*job.Job), |  | ||||||
| 		chJobs:      make(chan *JobSlot), |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	c.run() |  | ||||||
| 
 |  | ||||||
| 	return &c |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) Stop() { |  | ||||||
| 	c.fnCancel() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) Done() <-chan struct{} { |  | ||||||
| 	done := make(chan struct{}) |  | ||||||
| 	go func() { |  | ||||||
| 		<-c.ctx.Done() |  | ||||||
| 		c.wg.Done() |  | ||||||
| 		done <- struct{}{} |  | ||||||
| 	}() |  | ||||||
| 	return done |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) Len() int { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	return len(c.jobs) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) HasAllJobsDone() bool { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	for _, j := range c.jobs { |  | ||||||
| 		if j.GetState() == job.Pending || j.GetState() == job.Running { |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return true |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	details := []job.JobDetails{} |  | ||||||
| 	for _, j := range c.jobs { |  | ||||||
| 		details = append(details, j.IntoDetails()) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return details |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Delay builds a job and add it to the scheduler engine. |  | ||||||
| func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	nextSlot := c.currentSlot + 1 |  | ||||||
| 	if nextSlot > MaxSlotsIdx { |  | ||||||
| 		nextSlot = 0 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot])) |  | ||||||
| 
 |  | ||||||
| 	c.slots[nextSlot] = append(c.slots[nextSlot], &j) |  | ||||||
| 	c.jobs[j.GetID()] = &j |  | ||||||
| 
 |  | ||||||
| 	log.Info().Str("job", j.GetID().String()).Msg("job added successfully") |  | ||||||
| 	return j.GetID() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Abort aborts the job given by its id if it exists.. |  | ||||||
| func (c *SchedulerCycle) Abort(id uuid.UUID) bool { |  | ||||||
| 	if j := c.getJob(id); j != nil { |  | ||||||
| 		j.Abort() |  | ||||||
| 
 |  | ||||||
| 		log.Info().Str("job", j.GetID().String()).Msg("abort job done") |  | ||||||
| 		return true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // GetJobDetails returns the job details by . |  | ||||||
| func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	j, ok := c.jobs[id] |  | ||||||
| 	if !ok { |  | ||||||
| 		return job.JobDetails{ |  | ||||||
| 			State: job.Unknown.String(), |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return j.IntoDetails() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Display outputs earch interval the scheduler state. |  | ||||||
| func (c *SchedulerCycle) Display() { |  | ||||||
| 	ticker := time.NewTicker(c.interval) |  | ||||||
| 	go func() { |  | ||||||
| 		for range ticker.C { |  | ||||||
| 			c.display() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // display writes to stdout the state of the scheduler as a table. |  | ||||||
| func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex |  | ||||||
| 	c.l.RLock() |  | ||||||
| 	defer c.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	var maxCols int |  | ||||||
| 	for i := range c.slots { |  | ||||||
| 		if l := len(c.slots[i]); l > maxCols { |  | ||||||
| 			maxCols = l |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	table := [][]string{} |  | ||||||
| 	title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1) |  | ||||||
| 	table = append(table, []string{title}) |  | ||||||
| 	for { |  | ||||||
| 		if maxCols == 0 { |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		row := make([]string, CycleLength) |  | ||||||
| 		for i := 0; i <= MaxSlotsIdx; i++ { |  | ||||||
| 			row[i] = "_" |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		for i := range c.slots { |  | ||||||
| 			if len(c.slots[i]) < maxCols { |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			j := c.slots[i][maxCols-1] |  | ||||||
| 			switch j.GetState() { |  | ||||||
| 			case job.Pending: |  | ||||||
| 				row[i] = "P" |  | ||||||
| 			case job.Running: |  | ||||||
| 				row[i] = "R" |  | ||||||
| 			case job.Failed: |  | ||||||
| 				row[i] = "X" |  | ||||||
| 			case job.Abort: |  | ||||||
| 				row[i] = "A" |  | ||||||
| 			case job.Unknown: |  | ||||||
| 				row[i] = "?" |  | ||||||
| 			case job.Success: |  | ||||||
| 				row[i] = "O" |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		table = append(table, row) |  | ||||||
| 		maxCols-- |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	row := make([]string, CycleLength) |  | ||||||
| 	for i := 0; i <= MaxSlotsIdx; i++ { |  | ||||||
| 		row[i] = "-" |  | ||||||
| 	} |  | ||||||
| 	table = append(table, row) |  | ||||||
| 
 |  | ||||||
| 	if l := len(table); l > 0 { |  | ||||||
| 		table[l-1][c.currentSlot] = Cursor |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	tableFormat := "" |  | ||||||
| 	for _, r := range table { |  | ||||||
| 		tableFormat += strings.Join(r, " ") |  | ||||||
| 		tableFormat += "\n" |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	fmt.Println(tableFormat) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job { |  | ||||||
| 	c.l.RLock() |  | ||||||
| 	defer c.l.RUnlock() |  | ||||||
| 
 |  | ||||||
| 	j, ok := c.jobs[id] |  | ||||||
| 	if !ok { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return j |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // getCurrentSlotJobs collects all the current slot jobs |  | ||||||
| // and clean the slot. |  | ||||||
| func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	jobs := c.slots[c.currentSlot] |  | ||||||
| 
 |  | ||||||
| 	c.slots[c.currentSlot] = []*job.Job{} |  | ||||||
| 
 |  | ||||||
| 	return c.currentSlot, jobs |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // updateSlot add a job to the slot where it was before. |  | ||||||
| func (c *SchedulerCycle) updateSlot(row int, j *job.Job) { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	c.slots[row] = append(c.slots[row], j) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // updateCurrentSlot add a job to the current slot. |  | ||||||
| func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // incr increments the slot cursor. |  | ||||||
| // It the cursor reaches `MaxSlotsIdx`, it goes back to 0. |  | ||||||
| func (c *SchedulerCycle) incr() { |  | ||||||
| 	c.l.Lock() |  | ||||||
| 	defer c.l.Unlock() |  | ||||||
| 
 |  | ||||||
| 	nextSlot := c.currentSlot + 1 |  | ||||||
| 	if nextSlot > MaxSlotsIdx { |  | ||||||
| 		nextSlot = 0 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	c.currentSlot = nextSlot |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // dispatch gets jobs from the current slot, resets the slot |  | ||||||
| // and dispatch all jobs to the workers. |  | ||||||
| // |  | ||||||
| // It all the workers are busy, the jobs are re-schedule in the same slot |  | ||||||
| // to be executed in the next cycle. |  | ||||||
| func (c *SchedulerCycle) dispatch() { |  | ||||||
| 	row, jobs := c.getCurrentSlotJobs() |  | ||||||
| 	for _, j := range jobs { |  | ||||||
| 		if j.GetState() == job.Abort { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		select { |  | ||||||
| 		case c.chJobs <- &JobSlot{row: row, Job: j}: |  | ||||||
| 		default: |  | ||||||
| 			log.Warn().Msg("unable to put job in workers, trying next cycle") |  | ||||||
| 			c.updateSlot(row, j) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // run launches the workers and the ticker. |  | ||||||
| func (c *SchedulerCycle) run() { |  | ||||||
| 	c.workers() |  | ||||||
| 	c.tick() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // workers launches `MaxWorkers` number of worker to execute job. |  | ||||||
| // If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot. |  | ||||||
| func (c *SchedulerCycle) workers() { |  | ||||||
| 	for i := 0; i < MaxWorkers; i++ { |  | ||||||
| 		c.wg.Add(1) |  | ||||||
| 		go func() { |  | ||||||
| 			defer c.wg.Done() |  | ||||||
| 			for { |  | ||||||
| 				select { |  | ||||||
| 				case j := <-c.chJobs: |  | ||||||
| 					c.executeJob(j.Job, c.updateCurrentSlot) |  | ||||||
| 				case <-c.ctx.Done(): |  | ||||||
| 					log.Error().Msg("context done, worker is stopping...") |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		}() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) { |  | ||||||
| 	j.Run(c.ctx) |  | ||||||
| 	if j.GetState() == job.Pending { |  | ||||||
| 		fnFallBack(j) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // tick is a simple ticker incrementing at each scheduler interval, |  | ||||||
| // the slot cursor and dispatch jobs to the workers. |  | ||||||
| func (c *SchedulerCycle) tick() { |  | ||||||
| 	c.wg.Add(1) |  | ||||||
| 	go func() { |  | ||||||
| 		defer c.wg.Done() |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			case <-c.ctx.Done(): |  | ||||||
| 				log.Error().Msg("context done, ticker is stopping...") |  | ||||||
| 				return |  | ||||||
| 			default: |  | ||||||
| 				time.Sleep(c.interval) |  | ||||||
| 				c.incr() |  | ||||||
| 				c.dispatch() |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| } |  | ||||||
| @ -1,33 +0,0 @@ | |||||||
| package scheduler |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"cycle-scheduler/internal/job" |  | ||||||
| 	"errors" |  | ||||||
| 	"testing" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/stretchr/testify/assert" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func TestSlot(t *testing.T) { |  | ||||||
| 	ctx, fnCancel := context.WithCancel(context.Background()) |  | ||||||
| 	defer fnCancel() |  | ||||||
| 
 |  | ||||||
| 	s := NewSchedulerCycle(ctx, 1*time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	s.Delay(func(ctx context.Context) error { |  | ||||||
| 		return nil |  | ||||||
| 	}) |  | ||||||
| 	s.Delay(func(ctx context.Context) error { |  | ||||||
| 		return job.ErrJobNotCompletedYet |  | ||||||
| 	}) |  | ||||||
| 	j3 := s.Delay(func(ctx context.Context) error { |  | ||||||
| 		return errors.New("errors") |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	time.Sleep(2 * time.Millisecond) |  | ||||||
| 
 |  | ||||||
| 	assert.Equal(t, 3, s.Len()) |  | ||||||
| 	assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State) |  | ||||||
| } |  | ||||||
							
								
								
									
										122
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										122
									
								
								main.go
									
									
									
									
									
								
							| @ -1,122 +0,0 @@ | |||||||
| package main |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"cycle-scheduler/internal/job" |  | ||||||
| 	"cycle-scheduler/internal/scheduler" |  | ||||||
| 	"encoding/json" |  | ||||||
| 	"errors" |  | ||||||
| 	"fmt" |  | ||||||
| 	"math/rand/v2" |  | ||||||
| 	"os" |  | ||||||
| 	"os/signal" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"github.com/rs/zerolog" |  | ||||||
| 	"github.com/rs/zerolog/log" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func initLogger() { |  | ||||||
| 	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix |  | ||||||
| 	log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func main() { |  | ||||||
| 	initLogger() |  | ||||||
| 
 |  | ||||||
| 	ctx, stop := signal.NotifyContext( |  | ||||||
| 		context.Background(), |  | ||||||
| 		os.Interrupt, |  | ||||||
| 		os.Kill, |  | ||||||
| 	) |  | ||||||
| 	defer stop() |  | ||||||
| 
 |  | ||||||
| 	interval := 200 * time.Millisecond |  | ||||||
| 	s := scheduler.NewSchedulerCycle(ctx, interval) |  | ||||||
| 	s.Display() |  | ||||||
| 
 |  | ||||||
| 	// pending test |  | ||||||
| 	for i := 0; i < 20; i++ { |  | ||||||
| 		go func(i int) { |  | ||||||
| 			time.Sleep(time.Duration(i) * time.Second) |  | ||||||
| 			s.Delay(func(ctx context.Context) error { |  | ||||||
| 				time.Sleep(4 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 				if rand.IntN(10)%2 == 0 {   //nolint:gosec,mnd // test prupose |  | ||||||
| 					return job.ErrJobNotCompletedYet |  | ||||||
| 				} |  | ||||||
| 				return nil |  | ||||||
| 			}) |  | ||||||
| 		}(i) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// abort test |  | ||||||
| 	j := s.Delay(func(ctx context.Context) error { |  | ||||||
| 		time.Sleep(4 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 
 |  | ||||||
| 		select { |  | ||||||
| 		case <-ctx.Done(): |  | ||||||
| 			return ctx.Err() |  | ||||||
| 		default: |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return job.ErrJobNotCompletedYet |  | ||||||
| 	}) |  | ||||||
| 	go func() { |  | ||||||
| 		time.Sleep(2 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 		s.Abort(j) |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	// abort test 2 |  | ||||||
| 	j2 := s.Delay(func(ctx context.Context) error { |  | ||||||
| 		time.Sleep(time.Second) |  | ||||||
| 
 |  | ||||||
| 		select { |  | ||||||
| 		case <-ctx.Done(): |  | ||||||
| 			return ctx.Err() |  | ||||||
| 		default: |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return job.ErrJobNotCompletedYet |  | ||||||
| 	}) |  | ||||||
| 	go func() { |  | ||||||
| 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 		s.Abort(j2) |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	// error test |  | ||||||
| 	s.Delay(func(ctx context.Context) error { |  | ||||||
| 		time.Sleep(5 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 		return errors.New("err") |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	// success test |  | ||||||
| 	go func() { |  | ||||||
| 		time.Sleep(10 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 		s.Delay(func(ctx context.Context) error { |  | ||||||
| 			time.Sleep(5 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 			return nil |  | ||||||
| 		}) |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			time.Sleep(2 * time.Second) //nolint:mnd // test purpose |  | ||||||
| 			if s.HasAllJobsDone() { |  | ||||||
| 				s.Stop() |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	<-s.Done() |  | ||||||
| 
 |  | ||||||
| 	jds := s.GetJobsDetails() |  | ||||||
| 	for _, jd := range jds { |  | ||||||
| 		c, err := json.Marshal(&jd) |  | ||||||
| 		if err != nil { |  | ||||||
| 			log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON") |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		fmt.Println(string(c)) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
							
								
								
									
										213
									
								
								scheduler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										213
									
								
								scheduler.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,213 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 
 | ||||||
|  | 	"github.com/rs/zerolog/log" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	ChanLength          = 500 | ||||||
|  | 	DefaultExecInterval = 30 * time.Second | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type IScheduler interface { | ||||||
|  | 	Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval. | ||||||
|  | // If a task is not in desired state, the task is re-scheduled. | ||||||
|  | type SchedulerCycle struct { | ||||||
|  | 	wg sync.WaitGroup | ||||||
|  | 
 | ||||||
|  | 	ctx      context.Context | ||||||
|  | 	fnCancel context.CancelFunc | ||||||
|  | 
 | ||||||
|  | 	tasks tasks | ||||||
|  | 
 | ||||||
|  | 	chTasks chan *task | ||||||
|  | 	chDone  chan struct{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle { | ||||||
|  | 	ctxChild, fnCancel := context.WithCancel(ctx) | ||||||
|  | 
 | ||||||
|  | 	c := SchedulerCycle{ | ||||||
|  | 		wg:       sync.WaitGroup{}, | ||||||
|  | 		ctx:      ctxChild, | ||||||
|  | 		fnCancel: fnCancel, | ||||||
|  | 		tasks:    newTasks(), | ||||||
|  | 		chTasks:  make(chan *task, ChanLength), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	done := make(chan struct{}) | ||||||
|  | 	go func() { | ||||||
|  | 		<-c.ctx.Done() | ||||||
|  | 		defer c.fnCancel() | ||||||
|  | 		c.wg.Wait() | ||||||
|  | 		c.stop() | ||||||
|  | 		done <- struct{}{} | ||||||
|  | 	}() | ||||||
|  | 	c.chDone = done | ||||||
|  | 
 | ||||||
|  | 	c.run(workers) | ||||||
|  | 
 | ||||||
|  | 	return &c | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // delay sets the task timer when the task should be scheduled. | ||||||
|  | func (c *SchedulerCycle) delay(t *task) { | ||||||
|  | 	interval := DefaultExecInterval | ||||||
|  | 	if t.execInterval != nil { | ||||||
|  | 		interval = *t.execInterval | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	t.setTimer( | ||||||
|  | 		time.AfterFunc(interval, func() { | ||||||
|  | 			select { | ||||||
|  | 			case c.chTasks <- t: | ||||||
|  | 			default: | ||||||
|  | 				log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") | ||||||
|  | 				c.delay(t) | ||||||
|  | 			} | ||||||
|  | 		}), | ||||||
|  | 	) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // exec runs the task now or if all the workers are in use, delayed it. | ||||||
|  | func (c *SchedulerCycle) exec(t *task) { | ||||||
|  | 	select { | ||||||
|  | 	case c.chTasks <- t: | ||||||
|  | 	default: | ||||||
|  | 		log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it") | ||||||
|  | 		c.delay(t) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) getTask(id uuid.UUID) *task { | ||||||
|  | 	return c.tasks.get(id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // run launches a number of worker to execute tasks. | ||||||
|  | func (c *SchedulerCycle) run(n uint32) { | ||||||
|  | 	for i := 0; i < int(n); i++ { | ||||||
|  | 		c.wg.Add(1) | ||||||
|  | 		go func() { | ||||||
|  | 			defer c.wg.Done() | ||||||
|  | 			for { | ||||||
|  | 				select { | ||||||
|  | 				case t := <-c.chTasks: | ||||||
|  | 					c.execute(t, c.delay) | ||||||
|  | 				case <-c.ctx.Done(): | ||||||
|  | 					log.Error().Msg("context done, worker is stopping...") | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // execute executes the task. | ||||||
|  | // | ||||||
|  | // It does not handle task error, it's up to the task to implement its own callbacks. | ||||||
|  | // In case of pending state, a callback is executed for actions. For the others states, | ||||||
|  | // the task is deleted from the scheduler. | ||||||
|  | func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) { | ||||||
|  | 	t.Run(c.ctx) | ||||||
|  | 
 | ||||||
|  | 	switch t.GetState() { | ||||||
|  | 	case Pending: | ||||||
|  | 		fnFallBack(t) | ||||||
|  | 	case Success, Failed, Abort, Unknown: | ||||||
|  | 		c.tasks.delete(t) | ||||||
|  | 	case Running: | ||||||
|  | 		c.tasks.delete(t) | ||||||
|  | 		log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // stop aborts all tasks and waits until tasks are stopped. | ||||||
|  | // If the process can't be stopped within 10s, too bad... | ||||||
|  | func (c *SchedulerCycle) stop() { | ||||||
|  | 	c.tasks.abort() | ||||||
|  | 
 | ||||||
|  | 	if c.TasksDone() { | ||||||
|  | 		log.Info().Msg("all tasks has been stopped gracefully") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ctxTimeout := 10 * time.Second | ||||||
|  | 	ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout) | ||||||
|  | 	defer fnCancel() | ||||||
|  | 
 | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-ctx.Done(): | ||||||
|  | 			log.Error().Msg("stop context done, tasks has been stopped gracefully") | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if c.TasksDone() { | ||||||
|  | 			log.Info().Msg("all tasks has been stopped gracefully") | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		time.Sleep(time.Second) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) Done() <-chan struct{} { | ||||||
|  | 	return c.chDone | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) Len() int { | ||||||
|  | 	return c.tasks.len() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TasksDone checks whether all the tasks has been completed. | ||||||
|  | func (c *SchedulerCycle) TasksDone() bool { | ||||||
|  | 	return c.tasks.completed() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *SchedulerCycle) GetTasksDetails() []TaskDetails { | ||||||
|  | 	return c.tasks.getAllDetails() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetTaskDetails returns the task details by id. | ||||||
|  | func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails { | ||||||
|  | 	return c.tasks.getDetails(id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Delay builds a task and adds it to the scheduler engine. | ||||||
|  | func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID { | ||||||
|  | 	select { | ||||||
|  | 	case <-c.Done(): | ||||||
|  | 		log.Error().Msg("context done unable to add new job") | ||||||
|  | 	default: | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	t := NewTask(fnJob, opts...) | ||||||
|  | 
 | ||||||
|  | 	c.tasks.add(t) | ||||||
|  | 	c.exec(t) | ||||||
|  | 
 | ||||||
|  | 	log.Info().Str("id", t.GetID().String()).Msg("task added successfully") | ||||||
|  | 	return t.GetID() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Abort aborts the task given by its id if it exists. | ||||||
|  | func (c *SchedulerCycle) Abort(id uuid.UUID) bool { | ||||||
|  | 	if t := c.getTask(id); t != nil { | ||||||
|  | 		t.Abort() | ||||||
|  | 
 | ||||||
|  | 		log.Info().Str("id", t.GetID().String()).Msg("abort task done") | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return false | ||||||
|  | } | ||||||
							
								
								
									
										93
									
								
								scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								scheduler_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,93 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestScheduler(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	var buf int | ||||||
|  | 
 | ||||||
|  | 	s := NewSchedulerCycle(ctx, 1) | ||||||
|  | 	taskID := s.Delay(func(ctx context.Context) (any, error) { | ||||||
|  | 		time.Sleep(50 * time.Millisecond) | ||||||
|  | 		buf += 1 | ||||||
|  | 		return nil, nil | ||||||
|  | 	}, WithExecInterval(2*time.Millisecond)) | ||||||
|  | 
 | ||||||
|  | 	assert.NotEmpty(t, taskID) | ||||||
|  | 	assert.False(t, s.TasksDone()) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(2 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	details := s.GetTaskDetails(taskID) | ||||||
|  | 	assert.Equal(t, "running", details.State) | ||||||
|  | 	assert.LessOrEqual(t, details.ElapsedTime, 50*time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(50 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	assert.True(t, s.TasksDone()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestSchedulerLoad(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	s := NewSchedulerCycle(ctx, 1) | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < 500; i++ { | ||||||
|  | 		s.Delay(func(ctx context.Context) (any, error) { | ||||||
|  | 			time.Sleep(1 * time.Millisecond) | ||||||
|  | 			return nil, nil | ||||||
|  | 		}, WithExecInterval(1*time.Millisecond)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	assert.Eventually(t, func() bool { | ||||||
|  | 		return s.TasksDone() | ||||||
|  | 	}, time.Second, 250*time.Millisecond) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestSchedulerExecInterval(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	s := NewSchedulerCycle(ctx, 1) | ||||||
|  | 
 | ||||||
|  | 	s.Delay( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			return nil, ErrJobNotCompletedYet | ||||||
|  | 		}, | ||||||
|  | 		WithMaxDuration(50*time.Millisecond), | ||||||
|  | 		WithExecInterval(2*time.Millisecond), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(100 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	assert.True(t, s.TasksDone()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestSchedulerContextDone(t *testing.T) { | ||||||
|  | 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||||
|  | 
 | ||||||
|  | 	s := NewSchedulerCycle(ctx, 1) | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < 250; i++ { | ||||||
|  | 		s.Delay( | ||||||
|  | 			func(ctx context.Context) (any, error) { | ||||||
|  | 				return nil, ErrJobNotCompletedYet | ||||||
|  | 			}, | ||||||
|  | 			WithMaxDuration(100*time.Millisecond), | ||||||
|  | 			WithExecInterval(2*time.Millisecond), | ||||||
|  | 		) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(50 * time.Millisecond) | ||||||
|  | 		fnCancel() | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	<-s.Done() | ||||||
|  | } | ||||||
							
								
								
									
										451
									
								
								task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										451
									
								
								task.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,451 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 
 | ||||||
|  | 	"github.com/rs/zerolog/log" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type State int | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	Pending State = iota | ||||||
|  | 	Running | ||||||
|  | 	Success | ||||||
|  | 	Failed | ||||||
|  | 	Abort | ||||||
|  | 	Unknown | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const UnknownState = "unknown" | ||||||
|  | 
 | ||||||
|  | func (s State) String() string { | ||||||
|  | 	return [...]string{"pending", "running", "success", "failed", "abort", "unknown"}[s] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	ErrJobAborted         = errors.New("job has been aborted") | ||||||
|  | 	ErrJobNotCompletedYet = errors.New("job is not right state, retrying") | ||||||
|  | 	ErrTimeExceeded       = errors.New("time exceeded") | ||||||
|  | 	ErrExecutionEngine    = errors.New("execution engine") | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type FnJob func(ctx context.Context) (any, error) | ||||||
|  | 
 | ||||||
|  | type FnResult func(ctx context.Context, res any) | ||||||
|  | type FnError func(ctx context.Context, err error) | ||||||
|  | 
 | ||||||
|  | type TaskDetails struct { | ||||||
|  | 	State          string            `json:"state"` | ||||||
|  | 	CreatedAt      time.Time         `json:"createdAt"` | ||||||
|  | 	UpdatedAt      *time.Time        `json:"updatedAt,omitempty"` | ||||||
|  | 	MaxDuration    *time.Duration    `json:"maxDuration,omitempty"` | ||||||
|  | 	Attempts       uint32            `json:"attempts"` | ||||||
|  | 	Err            string            `json:"error"` | ||||||
|  | 	ElapsedTime    time.Duration     `json:"elapsedTime"` | ||||||
|  | 	AdditionalInfo map[string]string `json:"additionalInfos"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (td *TaskDetails) log() { | ||||||
|  | 	args := []any{} | ||||||
|  | 	args = append(args, | ||||||
|  | 		"state", td.State, | ||||||
|  | 		"createdAt", td.CreatedAt.Format("2006-01-02T15:04:05Z"), | ||||||
|  | 		"updatedAt", td.UpdatedAt.Format("2006-01-02T15:04:05Z"), | ||||||
|  | 		"elapsedTime", td.ElapsedTime.String(), | ||||||
|  | 		"attempts", td.Attempts, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	if td.AdditionalInfo != nil { | ||||||
|  | 		for k, v := range td.AdditionalInfo { | ||||||
|  | 			args = append(args, k, v) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if td.Err != "" { | ||||||
|  | 		args = append(args, "error", td.Err) | ||||||
|  | 		log.Error().Any("args", args).Msg("task failed") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	log.Info().Any("args", args).Msg("task execution done") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type task struct { | ||||||
|  | 	l sync.RWMutex | ||||||
|  | 
 | ||||||
|  | 	id        uuid.UUID | ||||||
|  | 	createdAt time.Time | ||||||
|  | 	updatedAt *time.Time | ||||||
|  | 	state     State | ||||||
|  | 
 | ||||||
|  | 	fnJob     FnJob | ||||||
|  | 	fnSuccess FnResult | ||||||
|  | 	fnError   FnError | ||||||
|  | 
 | ||||||
|  | 	attempts     uint32 | ||||||
|  | 	timer        *time.Timer | ||||||
|  | 	maxDuration  *time.Duration | ||||||
|  | 	execTimeout  *time.Duration | ||||||
|  | 	execInterval *time.Duration | ||||||
|  | 
 | ||||||
|  | 	res any | ||||||
|  | 	err error | ||||||
|  | 
 | ||||||
|  | 	additionalInfos map[string]string | ||||||
|  | 
 | ||||||
|  | 	chAbort chan struct{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type TaskOption func(t *task) | ||||||
|  | 
 | ||||||
|  | func WithMaxDuration(d time.Duration) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		t.maxDuration = &d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WithFnSuccess(f FnResult) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		t.fnSuccess = f | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WithFnError(f FnError) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		t.fnError = f | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WithExecTimeout(d time.Duration) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		t.execTimeout = &d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WithExecInterval(d time.Duration) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		t.execInterval = &d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WithAdditionalInfos(k, v string) TaskOption { | ||||||
|  | 	return func(t *task) { | ||||||
|  | 		if k == "" || v == "" { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if t.additionalInfos == nil { | ||||||
|  | 			t.additionalInfos = map[string]string{} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		t.additionalInfos[k] = v | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // NewTask builds a task. | ||||||
|  | // Here the options details that can be set to the task: | ||||||
|  | //   - WithMaxDuration(time.Duration): the task will stop executing if the duration is exceeded (raise ErrTimeExceeded) | ||||||
|  | //   - WithFnSuccess(FnResult): call a function after a success execution | ||||||
|  | //   - WithFnError(FnError): call a function if an error occurred | ||||||
|  | //   - WithExecTimeout(time.Duration): sets a timeout for the task execution | ||||||
|  | //   - WithAdditionalInfos(k, v string): key-value additional informations (does not inferred with the task execution, log purpose) | ||||||
|  | // | ||||||
|  | // Scheduler options: | ||||||
|  | //   - WithExecInterval(time.Duration): specify the execution interval | ||||||
|  | func NewTask(f FnJob, opts ...TaskOption) *task { | ||||||
|  | 	t := task{ | ||||||
|  | 		id:        uuid.New(), | ||||||
|  | 		createdAt: time.Now().UTC(), | ||||||
|  | 		state:     Pending, | ||||||
|  | 		fnJob:     f, | ||||||
|  | 		chAbort:   make(chan struct{}, 1), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&t) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return &t | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) setState(s State) { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	now := time.Now().UTC() | ||||||
|  | 	t.updatedAt = &now | ||||||
|  | 
 | ||||||
|  | 	t.state = s | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) setSuccess(ctx context.Context, res any) { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	now := time.Now().UTC() | ||||||
|  | 	t.updatedAt = &now | ||||||
|  | 
 | ||||||
|  | 	t.state = Success | ||||||
|  | 	t.res = res | ||||||
|  | 
 | ||||||
|  | 	if t.fnSuccess != nil { | ||||||
|  | 		t.fnSuccess(ctx, res) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) setFail(ctx context.Context, err error) { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	now := time.Now().UTC() | ||||||
|  | 	t.updatedAt = &now | ||||||
|  | 
 | ||||||
|  | 	if t.state != Abort { | ||||||
|  | 		t.state = Failed | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	t.err = err | ||||||
|  | 
 | ||||||
|  | 	if t.fnError != nil { | ||||||
|  | 		t.fnError(ctx, err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) setTimer(tm *time.Timer) { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	t.timer = tm | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) stopTimer() { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if t.timer != nil { | ||||||
|  | 		t.timer.Stop() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) incr() { | ||||||
|  | 	t.l.Lock() | ||||||
|  | 	defer t.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	t.attempts += 1 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) log() { | ||||||
|  | 	td := t.IntoDetails() | ||||||
|  | 	td.log() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) GetID() uuid.UUID { | ||||||
|  | 	t.l.RLock() | ||||||
|  | 	defer t.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return t.id | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) GetAttempts() uint32 { | ||||||
|  | 	t.l.RLock() | ||||||
|  | 	defer t.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return t.attempts | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) GetState() State { | ||||||
|  | 	t.l.RLock() | ||||||
|  | 	defer t.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return t.state | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TimeExceeded checks if the task does not reached its max duration execution (maxDuration). | ||||||
|  | func (t *task) TimeExceeded() bool { | ||||||
|  | 	t.l.RLock() | ||||||
|  | 	defer t.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	if md := t.maxDuration; md != nil { | ||||||
|  | 		return time.Since(t.createdAt) >= *md | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) Abort() { | ||||||
|  | 	t.stopTimer() | ||||||
|  | 	t.chAbort <- struct{}{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) Run(ctx context.Context) { | ||||||
|  | 	if t.TimeExceeded() { | ||||||
|  | 		t.setFail(ctx, ErrTimeExceeded) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if s := t.GetState(); s != Pending { | ||||||
|  | 		log.Error().Msg("unable to launch a task that not in pending state") | ||||||
|  | 		t.setFail(ctx, ErrExecutionEngine) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var ctxExec context.Context | ||||||
|  | 	var fnCancel context.CancelFunc | ||||||
|  | 	if t.execTimeout != nil { | ||||||
|  | 		ctxExec, fnCancel = context.WithTimeout(ctx, *t.execTimeout) | ||||||
|  | 	} else { | ||||||
|  | 		ctxExec, fnCancel = context.WithCancel(ctx) | ||||||
|  | 	} | ||||||
|  | 	defer fnCancel() | ||||||
|  | 
 | ||||||
|  | 	t.incr() | ||||||
|  | 	t.setState(Running) | ||||||
|  | 
 | ||||||
|  | 	log.Info().Str("id", t.GetID().String()).Msg("task is running...") | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		for range t.chAbort { | ||||||
|  | 			t.setState(Abort) | ||||||
|  | 			fnCancel() | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	defer t.log() | ||||||
|  | 	res, err := t.fnJob(ctxExec) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if errors.Is(err, ErrJobNotCompletedYet) { | ||||||
|  | 			t.setState(Pending) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		t.setFail(ctx, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	t.setSuccess(ctx, res) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (t *task) IntoDetails() TaskDetails { | ||||||
|  | 	t.l.RLock() | ||||||
|  | 	defer t.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	td := TaskDetails{ | ||||||
|  | 		CreatedAt:   t.createdAt, | ||||||
|  | 		UpdatedAt:   t.updatedAt, | ||||||
|  | 		State:       t.state.String(), | ||||||
|  | 		Attempts:    t.attempts, | ||||||
|  | 		MaxDuration: t.maxDuration, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if t.state == Pending || t.state == Running { | ||||||
|  | 		td.ElapsedTime = time.Since(t.createdAt) | ||||||
|  | 	} else { | ||||||
|  | 		td.ElapsedTime = t.updatedAt.Sub(t.createdAt) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := t.err; err != nil { | ||||||
|  | 		td.Err = err.Error() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if t.additionalInfos != nil { | ||||||
|  | 		td.AdditionalInfo = t.additionalInfos | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return td | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type tasks struct { | ||||||
|  | 	l sync.RWMutex | ||||||
|  | 	s map[uuid.UUID]*task | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newTasks() tasks { | ||||||
|  | 	return tasks{ | ||||||
|  | 		s: make(map[uuid.UUID]*task), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) add(t *task) { | ||||||
|  | 	ts.l.Lock() | ||||||
|  | 	defer ts.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	ts.s[t.GetID()] = t | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) delete(t *task) { | ||||||
|  | 	ts.l.Lock() | ||||||
|  | 	defer ts.l.Unlock() | ||||||
|  | 
 | ||||||
|  | 	delete(ts.s, t.GetID()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) get(id uuid.UUID) *task { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	t, ok := ts.s[id] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) len() int { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return len(ts.s) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) completed() bool { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	for _, t := range ts.s { | ||||||
|  | 		if t.GetState() == Pending || t.GetState() == Running { | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) getAllDetails() []TaskDetails { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	details := []TaskDetails{} | ||||||
|  | 	for _, t := range ts.s { | ||||||
|  | 		details = append(details, t.IntoDetails()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return details | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) getDetails(id uuid.UUID) TaskDetails { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	t, ok := ts.s[id] | ||||||
|  | 	if !ok { | ||||||
|  | 		return TaskDetails{State: UnknownState} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return t.IntoDetails() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ts *tasks) abort() { | ||||||
|  | 	ts.l.RLock() | ||||||
|  | 	defer ts.l.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	for _, t := range ts.s { | ||||||
|  | 		t.Abort() | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										243
									
								
								task_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										243
									
								
								task_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,243 @@ | |||||||
|  | package scheduler | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestTask(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 	var i int | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			i += 1 | ||||||
|  | 			return nil, nil | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, nil, task.res) | ||||||
|  | 	assert.NotEmpty(t, task.updatedAt) | ||||||
|  | 	assert.Equal(t, 1, i) | ||||||
|  | 	assert.Equal(t, 1, int(task.GetAttempts())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestAbortTask(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			<-ctx.Done() | ||||||
|  | 			return nil, ctx.Err() | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	timer := time.NewTicker(200 * time.Millisecond) | ||||||
|  | 	go func() { | ||||||
|  | 		<-timer.C | ||||||
|  | 		task.Abort() | ||||||
|  | 	}() | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, Abort, task.GetState()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskContextDone(t *testing.T) { | ||||||
|  | 	ctx, fnCancel := context.WithCancel(context.Background()) | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			<-ctx.Done() | ||||||
|  | 			return nil, ctx.Err() | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	timer := time.NewTicker(200 * time.Millisecond) | ||||||
|  | 	go func() { | ||||||
|  | 		<-timer.C | ||||||
|  | 		fnCancel() | ||||||
|  | 	}() | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, Failed, task.GetState()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskFnSuccess(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 	var result int | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			return 3, nil | ||||||
|  | 		}, | ||||||
|  | 		WithFnSuccess(func(ctx context.Context, res any) { | ||||||
|  | 			t, ok := res.(int) | ||||||
|  | 			if !ok { | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			result = t * 2 | ||||||
|  | 		}), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, Success, task.GetState()) | ||||||
|  | 	assert.Equal(t, 6, result) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskFnError(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 	var result error | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			return 3, errors.New("error occurred...") | ||||||
|  | 		}, | ||||||
|  | 		WithFnError(func(ctx context.Context, err error) { | ||||||
|  | 			result = err | ||||||
|  | 		}), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, Failed, task.GetState()) | ||||||
|  | 	assert.Equal(t, "error occurred...", result.Error()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskWithErrJobNotCompletedYet(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 	var attempts int | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			if attempts < 2 { | ||||||
|  | 				attempts += 1 | ||||||
|  | 				return nil, ErrJobNotCompletedYet | ||||||
|  | 			} | ||||||
|  | 			return "ok", nil | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < 2; i++ { | ||||||
|  | 		task.Run(ctx) | ||||||
|  | 		assert.Equal(t, Pending, task.GetState()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 	assert.Equal(t, Success, task.GetState()) | ||||||
|  | 	assert.Equal(t, 3, int(task.GetAttempts())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskTimeExceeded(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			return "ko", nil | ||||||
|  | 		}, | ||||||
|  | 		WithMaxDuration(5*time.Millisecond), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 	assert.Equal(t, Failed, task.GetState()) | ||||||
|  | 	assert.Equal(t, 0, int(task.GetAttempts())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskExecTimeout(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			<-ctx.Done() | ||||||
|  | 			return nil, ctx.Err() | ||||||
|  | 		}, | ||||||
|  | 		WithExecTimeout(5*time.Millisecond), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	time.Sleep(10 * time.Millisecond) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 	assert.Equal(t, Failed, task.GetState()) | ||||||
|  | 	assert.Equal(t, 1, int(task.GetAttempts())) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskDetails(t *testing.T) { | ||||||
|  | 	ctx := context.Background() | ||||||
|  | 
 | ||||||
|  | 	task := NewTask( | ||||||
|  | 		func(ctx context.Context) (any, error) { | ||||||
|  | 			return "coucou", nil | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	details := task.IntoDetails() | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, 0, int(details.Attempts)) | ||||||
|  | 	assert.Equal(t, "pending", details.State) | ||||||
|  | 	assert.False(t, details.CreatedAt.IsZero()) | ||||||
|  | 	assert.Empty(t, details.UpdatedAt) | ||||||
|  | 	assert.Nil(t, details.MaxDuration) | ||||||
|  | 	assert.Empty(t, details.Err) | ||||||
|  | 	assert.NotEmpty(t, details.ElapsedTime) | ||||||
|  | 
 | ||||||
|  | 	task.Run(ctx) | ||||||
|  | 
 | ||||||
|  | 	details = task.IntoDetails() | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, 1, int(details.Attempts)) | ||||||
|  | 	assert.Equal(t, "success", details.State) | ||||||
|  | 	assert.False(t, details.CreatedAt.IsZero()) | ||||||
|  | 	assert.NotEmpty(t, details.UpdatedAt) | ||||||
|  | 	assert.Nil(t, details.MaxDuration) | ||||||
|  | 	assert.Empty(t, details.Err) | ||||||
|  | 	assert.NotEmpty(t, details.ElapsedTime) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTaskAdditionalInfos(t *testing.T) { | ||||||
|  | 	t.Run("with key value", func(t *testing.T) { | ||||||
|  | 		elementID := uuid.NewString() | ||||||
|  | 		task := NewTask( | ||||||
|  | 			func(ctx context.Context) (any, error) { | ||||||
|  | 				return "yo", nil | ||||||
|  | 			}, | ||||||
|  | 			WithAdditionalInfos("transportId", elementID), | ||||||
|  | 			WithAdditionalInfos("element", "transport"), | ||||||
|  | 		) | ||||||
|  | 
 | ||||||
|  | 		assert.Equal(t, elementID, task.additionalInfos["transportId"]) | ||||||
|  | 		assert.Equal(t, "transport", task.additionalInfos["element"]) | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	t.Run("with empty key", func(t *testing.T) { | ||||||
|  | 		elementID := uuid.NewString() | ||||||
|  | 		task := NewTask( | ||||||
|  | 			func(ctx context.Context) (any, error) { | ||||||
|  | 				return "hello", nil | ||||||
|  | 			}, | ||||||
|  | 			WithAdditionalInfos("", elementID), | ||||||
|  | 			WithAdditionalInfos("element", "transport"), | ||||||
|  | 		) | ||||||
|  | 
 | ||||||
|  | 		assert.Equal(t, "transport", task.additionalInfos["element"]) | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	t.Run("with empty infos", func(t *testing.T) { | ||||||
|  | 		task := NewTask( | ||||||
|  | 			func(ctx context.Context) (any, error) { | ||||||
|  | 				return "hey", nil | ||||||
|  | 			}, | ||||||
|  | 		) | ||||||
|  | 
 | ||||||
|  | 		assert.Nil(t, task.additionalInfos) | ||||||
|  | 	}) | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user