Compare commits
No commits in common. "main" and "feat/slots" have entirely different histories.
main
...
feat/slots
3
Makefile
3
Makefile
@ -1,3 +1,6 @@
|
|||||||
|
run: lint
|
||||||
|
go run main.go
|
||||||
|
|
||||||
lint:
|
lint:
|
||||||
golangci-lint run ./...
|
golangci-lint run ./...
|
||||||
|
|
||||||
|
|||||||
81
README.md
81
README.md
@ -1,47 +1,48 @@
|
|||||||
# cycle-scheduler
|
# cycle-scheduler
|
||||||
|
|
||||||
cycle-scheduler is a simple scheduler lib, handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled.
|
cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval.
|
||||||
|
|
||||||
**NOTE**: this should be not used for long-running tasks, it's more suitable for shorts tasks like polling etc...
|
Here a simple representation:
|
||||||
|
```ascii
|
||||||
|
+------------------------------------------------------+
|
||||||
|
| +---+ +---+ +---+ +---+ +---+ +---+ |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| | | | | | | | | | | | | |
|
||||||
|
| |s1 | |s2 | |s3 | |s4 | | | |s60| |
|
||||||
|
| +---+ +---+ +---+ +---+ +---+ +---+ |
|
||||||
|
+---------------^--------------------------------------+
|
||||||
|
```
|
||||||
|
Jobs are handle in a array of job slices.
|
||||||
|
|
||||||
## Examples
|
At each interval (clock), the cursor `^` moves to the next slot (s*).
|
||||||
* Init a new scheduler with 4 workers
|
If there are jobs, they are sent to workers to be executed
|
||||||
|
and the slot is cleaned.
|
||||||
|
At the end of the slot (s60), the cursor re-starts a new cycle from s1.
|
||||||
|
|
||||||
|
If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle.
|
||||||
|
|
||||||
|
**NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s.
|
||||||
|
Pooling tasks are more suitable for this kind of scheduler.
|
||||||
|
|
||||||
|
## Run
|
||||||
|
You can run sample tests from `main.go` to see the scheduler in action:
|
||||||
|
```bash
|
||||||
|
make run
|
||||||
|
```
|
||||||
|
If all goes well, you should see this kind of output in the stdout:
|
||||||
|
```ascii
|
||||||
|
# cycle-scheduler (slot: 7)
|
||||||
|
_ P _ _ _ _ _ _ _ _ _ _ _ _
|
||||||
|
- - - - - - ^ - - - - - - -
|
||||||
|
```
|
||||||
|
> **P** means *pending* state
|
||||||
|
|
||||||
|
You can adjust the clock interval as needed in `main.go`:
|
||||||
```go
|
```go
|
||||||
package main
|
interval := 200 * time.Millisecond
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
scheduler "gitea.thegux.fr/rmanach/cycle-scheduler.git"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
ctx, fnCancel := context.WithCancel(context.Background())
|
|
||||||
s := scheduler.NewSchedulerCycle(ctx, 4)
|
|
||||||
|
|
||||||
// add a task with an execution interval of 2 ms (executed every 2 ms)
|
|
||||||
// and a maximum duration of 30 second.
|
|
||||||
s.Delay(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
// ...
|
|
||||||
return nil, nil
|
|
||||||
},
|
|
||||||
scheduler.WithExecInterval(2*time.Millisecond),
|
|
||||||
scheduler.WithMaxDuration(30*time.Second),
|
|
||||||
)
|
|
||||||
|
|
||||||
// stop the program after 5 seconds
|
|
||||||
go func() {
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
fnCancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
<-s.Done()
|
|
||||||
}
|
|
||||||
```
|
```
|
||||||
|
|
||||||
**NOTE**: for `Delay` optionals arguments, check the `NewTask` method documentation for more details.
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -1,4 +1,4 @@
|
|||||||
module gitea.thegux.fr/rmanach/cycle-scheduler.git
|
module cycle-scheduler
|
||||||
|
|
||||||
go 1.22.4
|
go 1.22.4
|
||||||
|
|
||||||
|
|||||||
166
internal/job/job.go
Normal file
166
internal/job/job.go
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
package job
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type State int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Pending State = iota
|
||||||
|
Running
|
||||||
|
Success
|
||||||
|
Failed
|
||||||
|
Abort
|
||||||
|
Unknown
|
||||||
|
)
|
||||||
|
|
||||||
|
const UnknownState = "unknown"
|
||||||
|
|
||||||
|
var JobExecTimeout = 10 * time.Second
|
||||||
|
|
||||||
|
func (s State) String() string {
|
||||||
|
switch s {
|
||||||
|
case Pending:
|
||||||
|
return "pending"
|
||||||
|
case Running:
|
||||||
|
return "running"
|
||||||
|
case Success:
|
||||||
|
return "success"
|
||||||
|
case Failed:
|
||||||
|
return "failed"
|
||||||
|
case Abort:
|
||||||
|
return "abort"
|
||||||
|
case Unknown:
|
||||||
|
return UnknownState
|
||||||
|
default:
|
||||||
|
return UnknownState
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrJobAborted = errors.New("job has been aborted")
|
||||||
|
ErrJobNotCompletedYet = errors.New("job is not right state, retrying")
|
||||||
|
)
|
||||||
|
|
||||||
|
type FnJob func(ctx context.Context) error
|
||||||
|
|
||||||
|
type JobDetails struct {
|
||||||
|
ID uuid.UUID `json:"id"`
|
||||||
|
State string `json:"state"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
|
||||||
|
Err string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rmanach): add priority level
|
||||||
|
type Job struct {
|
||||||
|
l sync.RWMutex
|
||||||
|
id uuid.UUID
|
||||||
|
createdAt time.Time
|
||||||
|
updatedAt *time.Time
|
||||||
|
state State
|
||||||
|
task FnJob
|
||||||
|
err error
|
||||||
|
chAbort chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJob(task FnJob, row, col int) Job {
|
||||||
|
return Job{
|
||||||
|
id: uuid.New(),
|
||||||
|
createdAt: time.Now().UTC(),
|
||||||
|
state: Pending,
|
||||||
|
task: task,
|
||||||
|
chAbort: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) IntoDetails() JobDetails {
|
||||||
|
j.l.RLock()
|
||||||
|
defer j.l.RUnlock()
|
||||||
|
|
||||||
|
jd := JobDetails{
|
||||||
|
ID: j.id,
|
||||||
|
CreatedAt: j.createdAt,
|
||||||
|
State: j.state.String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := j.err; err != nil {
|
||||||
|
jd.Err = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
if ut := j.updatedAt; ut != nil {
|
||||||
|
jd.UpdatedAt = ut
|
||||||
|
}
|
||||||
|
|
||||||
|
return jd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) GetID() uuid.UUID {
|
||||||
|
return j.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) GetState() State {
|
||||||
|
j.l.RLock()
|
||||||
|
defer j.l.RUnlock()
|
||||||
|
|
||||||
|
return j.state
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) setState(s State) {
|
||||||
|
j.l.Lock()
|
||||||
|
defer j.l.Unlock()
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
j.updatedAt = &now
|
||||||
|
|
||||||
|
j.state = s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) setFail(err error) {
|
||||||
|
j.l.Lock()
|
||||||
|
defer j.l.Unlock()
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
j.updatedAt = &now
|
||||||
|
|
||||||
|
j.state = Failed
|
||||||
|
j.err = err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Abort() {
|
||||||
|
j.setState(Abort)
|
||||||
|
j.chAbort <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Run(ctx context.Context) {
|
||||||
|
ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout)
|
||||||
|
defer fnCancel()
|
||||||
|
|
||||||
|
j.setState(Running)
|
||||||
|
|
||||||
|
log.Info().Str("job", j.GetID().String()).Msg("job running...")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for range j.chAbort {
|
||||||
|
j.setState(Abort)
|
||||||
|
fnCancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := j.task(ctxExec); err != nil {
|
||||||
|
if errors.Is(err, ErrJobNotCompletedYet) {
|
||||||
|
j.setState(Pending)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
j.setFail(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
j.setState(Success)
|
||||||
|
}
|
||||||
371
internal/scheduler/scheduler.go
Normal file
371
internal/scheduler/scheduler.go
Normal file
@ -0,0 +1,371 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"cycle-scheduler/internal/job"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
TableTitle = "# cycle-scheduler"
|
||||||
|
Cursor = "^"
|
||||||
|
CycleLength = 60
|
||||||
|
MaxWorkers = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
const MaxSlotsIdx = 59
|
||||||
|
|
||||||
|
type JobSlot struct {
|
||||||
|
*job.Job
|
||||||
|
row int
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulerCycle is a dumb scheduler.
|
||||||
|
// It handle job and executes it at each cycle (60 * interval).
|
||||||
|
//
|
||||||
|
// Jobs are handle in a array of job slices.
|
||||||
|
// At each interval (clock), the cursor moves to the next slot (s*).
|
||||||
|
// If there are jobs, they are sent to workers to be executed
|
||||||
|
// and the slot is cleaned.
|
||||||
|
//
|
||||||
|
// At the end of the slot (s60), the cursor re-starts a cycle at s1.
|
||||||
|
type SchedulerCycle struct {
|
||||||
|
l sync.RWMutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
fnCancel context.CancelFunc
|
||||||
|
|
||||||
|
interval time.Duration
|
||||||
|
currentSlot int
|
||||||
|
slots [60][]*job.Job
|
||||||
|
jobs map[uuid.UUID]*job.Job
|
||||||
|
|
||||||
|
chJobs chan *JobSlot
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
|
||||||
|
ctxChild, fnCancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
c := SchedulerCycle{
|
||||||
|
wg: sync.WaitGroup{},
|
||||||
|
ctx: ctxChild,
|
||||||
|
fnCancel: fnCancel,
|
||||||
|
interval: interval,
|
||||||
|
currentSlot: 0,
|
||||||
|
slots: [60][]*job.Job{},
|
||||||
|
jobs: make(map[uuid.UUID]*job.Job),
|
||||||
|
chJobs: make(chan *JobSlot),
|
||||||
|
}
|
||||||
|
|
||||||
|
c.run()
|
||||||
|
|
||||||
|
return &c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) Stop() {
|
||||||
|
c.fnCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) Done() <-chan struct{} {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
<-c.ctx.Done()
|
||||||
|
c.wg.Done()
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
return done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) Len() int {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
return len(c.jobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) HasAllJobsDone() bool {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
for _, j := range c.jobs {
|
||||||
|
if j.GetState() == job.Pending || j.GetState() == job.Running {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
details := []job.JobDetails{}
|
||||||
|
for _, j := range c.jobs {
|
||||||
|
details = append(details, j.IntoDetails())
|
||||||
|
}
|
||||||
|
|
||||||
|
return details
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delay builds a job and add it to the scheduler engine.
|
||||||
|
func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
nextSlot := c.currentSlot + 1
|
||||||
|
if nextSlot > MaxSlotsIdx {
|
||||||
|
nextSlot = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot]))
|
||||||
|
|
||||||
|
c.slots[nextSlot] = append(c.slots[nextSlot], &j)
|
||||||
|
c.jobs[j.GetID()] = &j
|
||||||
|
|
||||||
|
log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
|
||||||
|
return j.GetID()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort aborts the job given by its id if it exists..
|
||||||
|
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
|
||||||
|
if j := c.getJob(id); j != nil {
|
||||||
|
j.Abort()
|
||||||
|
|
||||||
|
log.Info().Str("job", j.GetID().String()).Msg("abort job done")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJobDetails returns the job details by .
|
||||||
|
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
j, ok := c.jobs[id]
|
||||||
|
if !ok {
|
||||||
|
return job.JobDetails{
|
||||||
|
State: job.Unknown.String(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return j.IntoDetails()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Display outputs earch interval the scheduler state.
|
||||||
|
func (c *SchedulerCycle) Display() {
|
||||||
|
ticker := time.NewTicker(c.interval)
|
||||||
|
go func() {
|
||||||
|
for range ticker.C {
|
||||||
|
c.display()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// display writes to stdout the state of the scheduler as a table.
|
||||||
|
func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
|
||||||
|
c.l.RLock()
|
||||||
|
defer c.l.RUnlock()
|
||||||
|
|
||||||
|
var maxCols int
|
||||||
|
for i := range c.slots {
|
||||||
|
if l := len(c.slots[i]); l > maxCols {
|
||||||
|
maxCols = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
table := [][]string{}
|
||||||
|
title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
|
||||||
|
table = append(table, []string{title})
|
||||||
|
for {
|
||||||
|
if maxCols == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
row := make([]string, CycleLength)
|
||||||
|
for i := 0; i <= MaxSlotsIdx; i++ {
|
||||||
|
row[i] = "_"
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range c.slots {
|
||||||
|
if len(c.slots[i]) < maxCols {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
j := c.slots[i][maxCols-1]
|
||||||
|
switch j.GetState() {
|
||||||
|
case job.Pending:
|
||||||
|
row[i] = "P"
|
||||||
|
case job.Running:
|
||||||
|
row[i] = "R"
|
||||||
|
case job.Failed:
|
||||||
|
row[i] = "X"
|
||||||
|
case job.Abort:
|
||||||
|
row[i] = "A"
|
||||||
|
case job.Unknown:
|
||||||
|
row[i] = "?"
|
||||||
|
case job.Success:
|
||||||
|
row[i] = "O"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
table = append(table, row)
|
||||||
|
maxCols--
|
||||||
|
}
|
||||||
|
|
||||||
|
row := make([]string, CycleLength)
|
||||||
|
for i := 0; i <= MaxSlotsIdx; i++ {
|
||||||
|
row[i] = "-"
|
||||||
|
}
|
||||||
|
table = append(table, row)
|
||||||
|
|
||||||
|
if l := len(table); l > 0 {
|
||||||
|
table[l-1][c.currentSlot] = Cursor
|
||||||
|
}
|
||||||
|
|
||||||
|
tableFormat := ""
|
||||||
|
for _, r := range table {
|
||||||
|
tableFormat += strings.Join(r, " ")
|
||||||
|
tableFormat += "\n"
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(tableFormat)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job {
|
||||||
|
c.l.RLock()
|
||||||
|
defer c.l.RUnlock()
|
||||||
|
|
||||||
|
j, ok := c.jobs[id]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// getCurrentSlotJobs collects all the current slot jobs
|
||||||
|
// and clean the slot.
|
||||||
|
func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
jobs := c.slots[c.currentSlot]
|
||||||
|
|
||||||
|
c.slots[c.currentSlot] = []*job.Job{}
|
||||||
|
|
||||||
|
return c.currentSlot, jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateSlot add a job to the slot where it was before.
|
||||||
|
func (c *SchedulerCycle) updateSlot(row int, j *job.Job) {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
c.slots[row] = append(c.slots[row], j)
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateCurrentSlot add a job to the current slot.
|
||||||
|
func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j)
|
||||||
|
}
|
||||||
|
|
||||||
|
// incr increments the slot cursor.
|
||||||
|
// It the cursor reaches `MaxSlotsIdx`, it goes back to 0.
|
||||||
|
func (c *SchedulerCycle) incr() {
|
||||||
|
c.l.Lock()
|
||||||
|
defer c.l.Unlock()
|
||||||
|
|
||||||
|
nextSlot := c.currentSlot + 1
|
||||||
|
if nextSlot > MaxSlotsIdx {
|
||||||
|
nextSlot = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
c.currentSlot = nextSlot
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispatch gets jobs from the current slot, resets the slot
|
||||||
|
// and dispatch all jobs to the workers.
|
||||||
|
//
|
||||||
|
// It all the workers are busy, the jobs are re-schedule in the same slot
|
||||||
|
// to be executed in the next cycle.
|
||||||
|
func (c *SchedulerCycle) dispatch() {
|
||||||
|
row, jobs := c.getCurrentSlotJobs()
|
||||||
|
for _, j := range jobs {
|
||||||
|
if j.GetState() == job.Abort {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.chJobs <- &JobSlot{row: row, Job: j}:
|
||||||
|
default:
|
||||||
|
log.Warn().Msg("unable to put job in workers, trying next cycle")
|
||||||
|
c.updateSlot(row, j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run launches the workers and the ticker.
|
||||||
|
func (c *SchedulerCycle) run() {
|
||||||
|
c.workers()
|
||||||
|
c.tick()
|
||||||
|
}
|
||||||
|
|
||||||
|
// workers launches `MaxWorkers` number of worker to execute job.
|
||||||
|
// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot.
|
||||||
|
func (c *SchedulerCycle) workers() {
|
||||||
|
for i := 0; i < MaxWorkers; i++ {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case j := <-c.chJobs:
|
||||||
|
c.executeJob(j.Job, c.updateCurrentSlot)
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
log.Error().Msg("context done, worker is stopping...")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) {
|
||||||
|
j.Run(c.ctx)
|
||||||
|
if j.GetState() == job.Pending {
|
||||||
|
fnFallBack(j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tick is a simple ticker incrementing at each scheduler interval,
|
||||||
|
// the slot cursor and dispatch jobs to the workers.
|
||||||
|
func (c *SchedulerCycle) tick() {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
log.Error().Msg("context done, ticker is stopping...")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
time.Sleep(c.interval)
|
||||||
|
c.incr()
|
||||||
|
c.dispatch()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
33
internal/scheduler/scheduler_test.go
Normal file
33
internal/scheduler/scheduler_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"cycle-scheduler/internal/job"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSlot(t *testing.T) {
|
||||||
|
ctx, fnCancel := context.WithCancel(context.Background())
|
||||||
|
defer fnCancel()
|
||||||
|
|
||||||
|
s := NewSchedulerCycle(ctx, 1*time.Millisecond)
|
||||||
|
|
||||||
|
s.Delay(func(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
s.Delay(func(ctx context.Context) error {
|
||||||
|
return job.ErrJobNotCompletedYet
|
||||||
|
})
|
||||||
|
j3 := s.Delay(func(ctx context.Context) error {
|
||||||
|
return errors.New("errors")
|
||||||
|
})
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.Equal(t, 3, s.Len())
|
||||||
|
assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State)
|
||||||
|
}
|
||||||
122
main.go
Normal file
122
main.go
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"cycle-scheduler/internal/job"
|
||||||
|
"cycle-scheduler/internal/scheduler"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/rand/v2"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func initLogger() {
|
||||||
|
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
|
||||||
|
log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
initLogger()
|
||||||
|
|
||||||
|
ctx, stop := signal.NotifyContext(
|
||||||
|
context.Background(),
|
||||||
|
os.Interrupt,
|
||||||
|
os.Kill,
|
||||||
|
)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
interval := 200 * time.Millisecond
|
||||||
|
s := scheduler.NewSchedulerCycle(ctx, interval)
|
||||||
|
s.Display()
|
||||||
|
|
||||||
|
// pending test
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
time.Sleep(time.Duration(i) * time.Second)
|
||||||
|
s.Delay(func(ctx context.Context) error {
|
||||||
|
time.Sleep(4 * time.Second) //nolint:mnd // test purpose
|
||||||
|
if rand.IntN(10)%2 == 0 { //nolint:gosec,mnd // test prupose
|
||||||
|
return job.ErrJobNotCompletedYet
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// abort test
|
||||||
|
j := s.Delay(func(ctx context.Context) error {
|
||||||
|
time.Sleep(4 * time.Second) //nolint:mnd // test purpose
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return job.ErrJobNotCompletedYet
|
||||||
|
})
|
||||||
|
go func() {
|
||||||
|
time.Sleep(2 * time.Second) //nolint:mnd // test purpose
|
||||||
|
s.Abort(j)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// abort test 2
|
||||||
|
j2 := s.Delay(func(ctx context.Context) error {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return job.ErrJobNotCompletedYet
|
||||||
|
})
|
||||||
|
go func() {
|
||||||
|
time.Sleep(10 * time.Second) //nolint:mnd // test purpose
|
||||||
|
s.Abort(j2)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// error test
|
||||||
|
s.Delay(func(ctx context.Context) error {
|
||||||
|
time.Sleep(5 * time.Second) //nolint:mnd // test purpose
|
||||||
|
return errors.New("err")
|
||||||
|
})
|
||||||
|
|
||||||
|
// success test
|
||||||
|
go func() {
|
||||||
|
time.Sleep(10 * time.Second) //nolint:mnd // test purpose
|
||||||
|
s.Delay(func(ctx context.Context) error {
|
||||||
|
time.Sleep(5 * time.Second) //nolint:mnd // test purpose
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
time.Sleep(2 * time.Second) //nolint:mnd // test purpose
|
||||||
|
if s.HasAllJobsDone() {
|
||||||
|
s.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-s.Done()
|
||||||
|
|
||||||
|
jds := s.GetJobsDetails()
|
||||||
|
for _, jd := range jds {
|
||||||
|
c, err := json.Marshal(&jd)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Println(string(c))
|
||||||
|
}
|
||||||
|
}
|
||||||
213
scheduler.go
213
scheduler.go
@ -1,213 +0,0 @@
|
|||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
ChanLength = 500
|
|
||||||
DefaultExecInterval = 30 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type IScheduler interface {
|
|
||||||
Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID
|
|
||||||
}
|
|
||||||
|
|
||||||
// SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval.
|
|
||||||
// If a task is not in desired state, the task is re-scheduled.
|
|
||||||
type SchedulerCycle struct {
|
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
ctx context.Context
|
|
||||||
fnCancel context.CancelFunc
|
|
||||||
|
|
||||||
tasks tasks
|
|
||||||
|
|
||||||
chTasks chan *task
|
|
||||||
chDone chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle {
|
|
||||||
ctxChild, fnCancel := context.WithCancel(ctx)
|
|
||||||
|
|
||||||
c := SchedulerCycle{
|
|
||||||
wg: sync.WaitGroup{},
|
|
||||||
ctx: ctxChild,
|
|
||||||
fnCancel: fnCancel,
|
|
||||||
tasks: newTasks(),
|
|
||||||
chTasks: make(chan *task, ChanLength),
|
|
||||||
}
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
<-c.ctx.Done()
|
|
||||||
defer c.fnCancel()
|
|
||||||
c.wg.Wait()
|
|
||||||
c.stop()
|
|
||||||
done <- struct{}{}
|
|
||||||
}()
|
|
||||||
c.chDone = done
|
|
||||||
|
|
||||||
c.run(workers)
|
|
||||||
|
|
||||||
return &c
|
|
||||||
}
|
|
||||||
|
|
||||||
// delay sets the task timer when the task should be scheduled.
|
|
||||||
func (c *SchedulerCycle) delay(t *task) {
|
|
||||||
interval := DefaultExecInterval
|
|
||||||
if t.execInterval != nil {
|
|
||||||
interval = *t.execInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
t.setTimer(
|
|
||||||
time.AfterFunc(interval, func() {
|
|
||||||
select {
|
|
||||||
case c.chTasks <- t:
|
|
||||||
default:
|
|
||||||
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
|
|
||||||
c.delay(t)
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// exec runs the task now or if all the workers are in use, delayed it.
|
|
||||||
func (c *SchedulerCycle) exec(t *task) {
|
|
||||||
select {
|
|
||||||
case c.chTasks <- t:
|
|
||||||
default:
|
|
||||||
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
|
|
||||||
c.delay(t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
|
|
||||||
return c.tasks.get(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// run launches a number of worker to execute tasks.
|
|
||||||
func (c *SchedulerCycle) run(n uint32) {
|
|
||||||
for i := 0; i < int(n); i++ {
|
|
||||||
c.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer c.wg.Done()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case t := <-c.chTasks:
|
|
||||||
c.execute(t, c.delay)
|
|
||||||
case <-c.ctx.Done():
|
|
||||||
log.Error().Msg("context done, worker is stopping...")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// execute executes the task.
|
|
||||||
//
|
|
||||||
// It does not handle task error, it's up to the task to implement its own callbacks.
|
|
||||||
// In case of pending state, a callback is executed for actions. For the others states,
|
|
||||||
// the task is deleted from the scheduler.
|
|
||||||
func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
|
|
||||||
t.Run(c.ctx)
|
|
||||||
|
|
||||||
switch t.GetState() {
|
|
||||||
case Pending:
|
|
||||||
fnFallBack(t)
|
|
||||||
case Success, Failed, Abort, Unknown:
|
|
||||||
c.tasks.delete(t)
|
|
||||||
case Running:
|
|
||||||
c.tasks.delete(t)
|
|
||||||
log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// stop aborts all tasks and waits until tasks are stopped.
|
|
||||||
// If the process can't be stopped within 10s, too bad...
|
|
||||||
func (c *SchedulerCycle) stop() {
|
|
||||||
c.tasks.abort()
|
|
||||||
|
|
||||||
if c.TasksDone() {
|
|
||||||
log.Info().Msg("all tasks has been stopped gracefully")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctxTimeout := 10 * time.Second
|
|
||||||
ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout)
|
|
||||||
defer fnCancel()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Error().Msg("stop context done, tasks has been stopped gracefully")
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.TasksDone() {
|
|
||||||
log.Info().Msg("all tasks has been stopped gracefully")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SchedulerCycle) Done() <-chan struct{} {
|
|
||||||
return c.chDone
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SchedulerCycle) Len() int {
|
|
||||||
return c.tasks.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TasksDone checks whether all the tasks has been completed.
|
|
||||||
func (c *SchedulerCycle) TasksDone() bool {
|
|
||||||
return c.tasks.completed()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
|
|
||||||
return c.tasks.getAllDetails()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTaskDetails returns the task details by id.
|
|
||||||
func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
|
|
||||||
return c.tasks.getDetails(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delay builds a task and adds it to the scheduler engine.
|
|
||||||
func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID {
|
|
||||||
select {
|
|
||||||
case <-c.Done():
|
|
||||||
log.Error().Msg("context done unable to add new job")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
t := NewTask(fnJob, opts...)
|
|
||||||
|
|
||||||
c.tasks.add(t)
|
|
||||||
c.exec(t)
|
|
||||||
|
|
||||||
log.Info().Str("id", t.GetID().String()).Msg("task added successfully")
|
|
||||||
return t.GetID()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Abort aborts the task given by its id if it exists.
|
|
||||||
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
|
|
||||||
if t := c.getTask(id); t != nil {
|
|
||||||
t.Abort()
|
|
||||||
|
|
||||||
log.Info().Str("id", t.GetID().String()).Msg("abort task done")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
@ -1,93 +0,0 @@
|
|||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestScheduler(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
var buf int
|
|
||||||
|
|
||||||
s := NewSchedulerCycle(ctx, 1)
|
|
||||||
taskID := s.Delay(func(ctx context.Context) (any, error) {
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
buf += 1
|
|
||||||
return nil, nil
|
|
||||||
}, WithExecInterval(2*time.Millisecond))
|
|
||||||
|
|
||||||
assert.NotEmpty(t, taskID)
|
|
||||||
assert.False(t, s.TasksDone())
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Millisecond)
|
|
||||||
|
|
||||||
details := s.GetTaskDetails(taskID)
|
|
||||||
assert.Equal(t, "running", details.State)
|
|
||||||
assert.LessOrEqual(t, details.ElapsedTime, 50*time.Millisecond)
|
|
||||||
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
assert.True(t, s.TasksDone())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSchedulerLoad(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
s := NewSchedulerCycle(ctx, 1)
|
|
||||||
|
|
||||||
for i := 0; i < 500; i++ {
|
|
||||||
s.Delay(func(ctx context.Context) (any, error) {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
return nil, nil
|
|
||||||
}, WithExecInterval(1*time.Millisecond))
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Eventually(t, func() bool {
|
|
||||||
return s.TasksDone()
|
|
||||||
}, time.Second, 250*time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSchedulerExecInterval(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
s := NewSchedulerCycle(ctx, 1)
|
|
||||||
|
|
||||||
s.Delay(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return nil, ErrJobNotCompletedYet
|
|
||||||
},
|
|
||||||
WithMaxDuration(50*time.Millisecond),
|
|
||||||
WithExecInterval(2*time.Millisecond),
|
|
||||||
)
|
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
assert.True(t, s.TasksDone())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSchedulerContextDone(t *testing.T) {
|
|
||||||
ctx, fnCancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
s := NewSchedulerCycle(ctx, 1)
|
|
||||||
|
|
||||||
for i := 0; i < 250; i++ {
|
|
||||||
s.Delay(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return nil, ErrJobNotCompletedYet
|
|
||||||
},
|
|
||||||
WithMaxDuration(100*time.Millisecond),
|
|
||||||
WithExecInterval(2*time.Millisecond),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
fnCancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-s.Done()
|
|
||||||
}
|
|
||||||
451
task.go
451
task.go
@ -1,451 +0,0 @@
|
|||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type State int
|
|
||||||
|
|
||||||
const (
|
|
||||||
Pending State = iota
|
|
||||||
Running
|
|
||||||
Success
|
|
||||||
Failed
|
|
||||||
Abort
|
|
||||||
Unknown
|
|
||||||
)
|
|
||||||
|
|
||||||
const UnknownState = "unknown"
|
|
||||||
|
|
||||||
func (s State) String() string {
|
|
||||||
return [...]string{"pending", "running", "success", "failed", "abort", "unknown"}[s]
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrJobAborted = errors.New("job has been aborted")
|
|
||||||
ErrJobNotCompletedYet = errors.New("job is not right state, retrying")
|
|
||||||
ErrTimeExceeded = errors.New("time exceeded")
|
|
||||||
ErrExecutionEngine = errors.New("execution engine")
|
|
||||||
)
|
|
||||||
|
|
||||||
type FnJob func(ctx context.Context) (any, error)
|
|
||||||
|
|
||||||
type FnResult func(ctx context.Context, res any)
|
|
||||||
type FnError func(ctx context.Context, err error)
|
|
||||||
|
|
||||||
type TaskDetails struct {
|
|
||||||
State string `json:"state"`
|
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
|
||||||
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
|
|
||||||
MaxDuration *time.Duration `json:"maxDuration,omitempty"`
|
|
||||||
Attempts uint32 `json:"attempts"`
|
|
||||||
Err string `json:"error"`
|
|
||||||
ElapsedTime time.Duration `json:"elapsedTime"`
|
|
||||||
AdditionalInfo map[string]string `json:"additionalInfos"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (td *TaskDetails) log() {
|
|
||||||
args := []any{}
|
|
||||||
args = append(args,
|
|
||||||
"state", td.State,
|
|
||||||
"createdAt", td.CreatedAt.Format("2006-01-02T15:04:05Z"),
|
|
||||||
"updatedAt", td.UpdatedAt.Format("2006-01-02T15:04:05Z"),
|
|
||||||
"elapsedTime", td.ElapsedTime.String(),
|
|
||||||
"attempts", td.Attempts,
|
|
||||||
)
|
|
||||||
|
|
||||||
if td.AdditionalInfo != nil {
|
|
||||||
for k, v := range td.AdditionalInfo {
|
|
||||||
args = append(args, k, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if td.Err != "" {
|
|
||||||
args = append(args, "error", td.Err)
|
|
||||||
log.Error().Any("args", args).Msg("task failed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Any("args", args).Msg("task execution done")
|
|
||||||
}
|
|
||||||
|
|
||||||
type task struct {
|
|
||||||
l sync.RWMutex
|
|
||||||
|
|
||||||
id uuid.UUID
|
|
||||||
createdAt time.Time
|
|
||||||
updatedAt *time.Time
|
|
||||||
state State
|
|
||||||
|
|
||||||
fnJob FnJob
|
|
||||||
fnSuccess FnResult
|
|
||||||
fnError FnError
|
|
||||||
|
|
||||||
attempts uint32
|
|
||||||
timer *time.Timer
|
|
||||||
maxDuration *time.Duration
|
|
||||||
execTimeout *time.Duration
|
|
||||||
execInterval *time.Duration
|
|
||||||
|
|
||||||
res any
|
|
||||||
err error
|
|
||||||
|
|
||||||
additionalInfos map[string]string
|
|
||||||
|
|
||||||
chAbort chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
type TaskOption func(t *task)
|
|
||||||
|
|
||||||
func WithMaxDuration(d time.Duration) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
t.maxDuration = &d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithFnSuccess(f FnResult) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
t.fnSuccess = f
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithFnError(f FnError) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
t.fnError = f
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithExecTimeout(d time.Duration) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
t.execTimeout = &d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithExecInterval(d time.Duration) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
t.execInterval = &d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithAdditionalInfos(k, v string) TaskOption {
|
|
||||||
return func(t *task) {
|
|
||||||
if k == "" || v == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.additionalInfos == nil {
|
|
||||||
t.additionalInfos = map[string]string{}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.additionalInfos[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTask builds a task.
|
|
||||||
// Here the options details that can be set to the task:
|
|
||||||
// - WithMaxDuration(time.Duration): the task will stop executing if the duration is exceeded (raise ErrTimeExceeded)
|
|
||||||
// - WithFnSuccess(FnResult): call a function after a success execution
|
|
||||||
// - WithFnError(FnError): call a function if an error occurred
|
|
||||||
// - WithExecTimeout(time.Duration): sets a timeout for the task execution
|
|
||||||
// - WithAdditionalInfos(k, v string): key-value additional informations (does not inferred with the task execution, log purpose)
|
|
||||||
//
|
|
||||||
// Scheduler options:
|
|
||||||
// - WithExecInterval(time.Duration): specify the execution interval
|
|
||||||
func NewTask(f FnJob, opts ...TaskOption) *task {
|
|
||||||
t := task{
|
|
||||||
id: uuid.New(),
|
|
||||||
createdAt: time.Now().UTC(),
|
|
||||||
state: Pending,
|
|
||||||
fnJob: f,
|
|
||||||
chAbort: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&t)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) setState(s State) {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
now := time.Now().UTC()
|
|
||||||
t.updatedAt = &now
|
|
||||||
|
|
||||||
t.state = s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) setSuccess(ctx context.Context, res any) {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
now := time.Now().UTC()
|
|
||||||
t.updatedAt = &now
|
|
||||||
|
|
||||||
t.state = Success
|
|
||||||
t.res = res
|
|
||||||
|
|
||||||
if t.fnSuccess != nil {
|
|
||||||
t.fnSuccess(ctx, res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) setFail(ctx context.Context, err error) {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
now := time.Now().UTC()
|
|
||||||
t.updatedAt = &now
|
|
||||||
|
|
||||||
if t.state != Abort {
|
|
||||||
t.state = Failed
|
|
||||||
}
|
|
||||||
|
|
||||||
t.err = err
|
|
||||||
|
|
||||||
if t.fnError != nil {
|
|
||||||
t.fnError(ctx, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) setTimer(tm *time.Timer) {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
t.timer = tm
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) stopTimer() {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
if t.timer != nil {
|
|
||||||
t.timer.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) incr() {
|
|
||||||
t.l.Lock()
|
|
||||||
defer t.l.Unlock()
|
|
||||||
|
|
||||||
t.attempts += 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) log() {
|
|
||||||
td := t.IntoDetails()
|
|
||||||
td.log()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) GetID() uuid.UUID {
|
|
||||||
t.l.RLock()
|
|
||||||
defer t.l.RUnlock()
|
|
||||||
|
|
||||||
return t.id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) GetAttempts() uint32 {
|
|
||||||
t.l.RLock()
|
|
||||||
defer t.l.RUnlock()
|
|
||||||
|
|
||||||
return t.attempts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) GetState() State {
|
|
||||||
t.l.RLock()
|
|
||||||
defer t.l.RUnlock()
|
|
||||||
|
|
||||||
return t.state
|
|
||||||
}
|
|
||||||
|
|
||||||
// TimeExceeded checks if the task does not reached its max duration execution (maxDuration).
|
|
||||||
func (t *task) TimeExceeded() bool {
|
|
||||||
t.l.RLock()
|
|
||||||
defer t.l.RUnlock()
|
|
||||||
|
|
||||||
if md := t.maxDuration; md != nil {
|
|
||||||
return time.Since(t.createdAt) >= *md
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) Abort() {
|
|
||||||
t.stopTimer()
|
|
||||||
t.chAbort <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) Run(ctx context.Context) {
|
|
||||||
if t.TimeExceeded() {
|
|
||||||
t.setFail(ctx, ErrTimeExceeded)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s := t.GetState(); s != Pending {
|
|
||||||
log.Error().Msg("unable to launch a task that not in pending state")
|
|
||||||
t.setFail(ctx, ErrExecutionEngine)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var ctxExec context.Context
|
|
||||||
var fnCancel context.CancelFunc
|
|
||||||
if t.execTimeout != nil {
|
|
||||||
ctxExec, fnCancel = context.WithTimeout(ctx, *t.execTimeout)
|
|
||||||
} else {
|
|
||||||
ctxExec, fnCancel = context.WithCancel(ctx)
|
|
||||||
}
|
|
||||||
defer fnCancel()
|
|
||||||
|
|
||||||
t.incr()
|
|
||||||
t.setState(Running)
|
|
||||||
|
|
||||||
log.Info().Str("id", t.GetID().String()).Msg("task is running...")
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for range t.chAbort {
|
|
||||||
t.setState(Abort)
|
|
||||||
fnCancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
defer t.log()
|
|
||||||
res, err := t.fnJob(ctxExec)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, ErrJobNotCompletedYet) {
|
|
||||||
t.setState(Pending)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.setFail(ctx, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.setSuccess(ctx, res)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *task) IntoDetails() TaskDetails {
|
|
||||||
t.l.RLock()
|
|
||||||
defer t.l.RUnlock()
|
|
||||||
|
|
||||||
td := TaskDetails{
|
|
||||||
CreatedAt: t.createdAt,
|
|
||||||
UpdatedAt: t.updatedAt,
|
|
||||||
State: t.state.String(),
|
|
||||||
Attempts: t.attempts,
|
|
||||||
MaxDuration: t.maxDuration,
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.state == Pending || t.state == Running {
|
|
||||||
td.ElapsedTime = time.Since(t.createdAt)
|
|
||||||
} else {
|
|
||||||
td.ElapsedTime = t.updatedAt.Sub(t.createdAt)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := t.err; err != nil {
|
|
||||||
td.Err = err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.additionalInfos != nil {
|
|
||||||
td.AdditionalInfo = t.additionalInfos
|
|
||||||
}
|
|
||||||
|
|
||||||
return td
|
|
||||||
}
|
|
||||||
|
|
||||||
type tasks struct {
|
|
||||||
l sync.RWMutex
|
|
||||||
s map[uuid.UUID]*task
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTasks() tasks {
|
|
||||||
return tasks{
|
|
||||||
s: make(map[uuid.UUID]*task),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) add(t *task) {
|
|
||||||
ts.l.Lock()
|
|
||||||
defer ts.l.Unlock()
|
|
||||||
|
|
||||||
ts.s[t.GetID()] = t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) delete(t *task) {
|
|
||||||
ts.l.Lock()
|
|
||||||
defer ts.l.Unlock()
|
|
||||||
|
|
||||||
delete(ts.s, t.GetID())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) get(id uuid.UUID) *task {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
t, ok := ts.s[id]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) len() int {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
return len(ts.s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) completed() bool {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
for _, t := range ts.s {
|
|
||||||
if t.GetState() == Pending || t.GetState() == Running {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) getAllDetails() []TaskDetails {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
details := []TaskDetails{}
|
|
||||||
for _, t := range ts.s {
|
|
||||||
details = append(details, t.IntoDetails())
|
|
||||||
}
|
|
||||||
|
|
||||||
return details
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) getDetails(id uuid.UUID) TaskDetails {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
t, ok := ts.s[id]
|
|
||||||
if !ok {
|
|
||||||
return TaskDetails{State: UnknownState}
|
|
||||||
}
|
|
||||||
|
|
||||||
return t.IntoDetails()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *tasks) abort() {
|
|
||||||
ts.l.RLock()
|
|
||||||
defer ts.l.RUnlock()
|
|
||||||
|
|
||||||
for _, t := range ts.s {
|
|
||||||
t.Abort()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
243
task_test.go
243
task_test.go
@ -1,243 +0,0 @@
|
|||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestTask(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
var i int
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
i += 1
|
|
||||||
return nil, nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
assert.Equal(t, nil, task.res)
|
|
||||||
assert.NotEmpty(t, task.updatedAt)
|
|
||||||
assert.Equal(t, 1, i)
|
|
||||||
assert.Equal(t, 1, int(task.GetAttempts()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAbortTask(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
<-ctx.Done()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
timer := time.NewTicker(200 * time.Millisecond)
|
|
||||||
go func() {
|
|
||||||
<-timer.C
|
|
||||||
task.Abort()
|
|
||||||
}()
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
assert.Equal(t, Abort, task.GetState())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskContextDone(t *testing.T) {
|
|
||||||
ctx, fnCancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
<-ctx.Done()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
timer := time.NewTicker(200 * time.Millisecond)
|
|
||||||
go func() {
|
|
||||||
<-timer.C
|
|
||||||
fnCancel()
|
|
||||||
}()
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
assert.Equal(t, Failed, task.GetState())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskFnSuccess(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
var result int
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return 3, nil
|
|
||||||
},
|
|
||||||
WithFnSuccess(func(ctx context.Context, res any) {
|
|
||||||
t, ok := res.(int)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result = t * 2
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
assert.Equal(t, Success, task.GetState())
|
|
||||||
assert.Equal(t, 6, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskFnError(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
var result error
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return 3, errors.New("error occurred...")
|
|
||||||
},
|
|
||||||
WithFnError(func(ctx context.Context, err error) {
|
|
||||||
result = err
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
assert.Equal(t, Failed, task.GetState())
|
|
||||||
assert.Equal(t, "error occurred...", result.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskWithErrJobNotCompletedYet(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
var attempts int
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
if attempts < 2 {
|
|
||||||
attempts += 1
|
|
||||||
return nil, ErrJobNotCompletedYet
|
|
||||||
}
|
|
||||||
return "ok", nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
for i := 0; i < 2; i++ {
|
|
||||||
task.Run(ctx)
|
|
||||||
assert.Equal(t, Pending, task.GetState())
|
|
||||||
}
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
assert.Equal(t, Success, task.GetState())
|
|
||||||
assert.Equal(t, 3, int(task.GetAttempts()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskTimeExceeded(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return "ko", nil
|
|
||||||
},
|
|
||||||
WithMaxDuration(5*time.Millisecond),
|
|
||||||
)
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
assert.Equal(t, Failed, task.GetState())
|
|
||||||
assert.Equal(t, 0, int(task.GetAttempts()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskExecTimeout(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
<-ctx.Done()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
},
|
|
||||||
WithExecTimeout(5*time.Millisecond),
|
|
||||||
)
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
assert.Equal(t, Failed, task.GetState())
|
|
||||||
assert.Equal(t, 1, int(task.GetAttempts()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskDetails(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return "coucou", nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
details := task.IntoDetails()
|
|
||||||
|
|
||||||
assert.Equal(t, 0, int(details.Attempts))
|
|
||||||
assert.Equal(t, "pending", details.State)
|
|
||||||
assert.False(t, details.CreatedAt.IsZero())
|
|
||||||
assert.Empty(t, details.UpdatedAt)
|
|
||||||
assert.Nil(t, details.MaxDuration)
|
|
||||||
assert.Empty(t, details.Err)
|
|
||||||
assert.NotEmpty(t, details.ElapsedTime)
|
|
||||||
|
|
||||||
task.Run(ctx)
|
|
||||||
|
|
||||||
details = task.IntoDetails()
|
|
||||||
|
|
||||||
assert.Equal(t, 1, int(details.Attempts))
|
|
||||||
assert.Equal(t, "success", details.State)
|
|
||||||
assert.False(t, details.CreatedAt.IsZero())
|
|
||||||
assert.NotEmpty(t, details.UpdatedAt)
|
|
||||||
assert.Nil(t, details.MaxDuration)
|
|
||||||
assert.Empty(t, details.Err)
|
|
||||||
assert.NotEmpty(t, details.ElapsedTime)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskAdditionalInfos(t *testing.T) {
|
|
||||||
t.Run("with key value", func(t *testing.T) {
|
|
||||||
elementID := uuid.NewString()
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return "yo", nil
|
|
||||||
},
|
|
||||||
WithAdditionalInfos("transportId", elementID),
|
|
||||||
WithAdditionalInfos("element", "transport"),
|
|
||||||
)
|
|
||||||
|
|
||||||
assert.Equal(t, elementID, task.additionalInfos["transportId"])
|
|
||||||
assert.Equal(t, "transport", task.additionalInfos["element"])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("with empty key", func(t *testing.T) {
|
|
||||||
elementID := uuid.NewString()
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return "hello", nil
|
|
||||||
},
|
|
||||||
WithAdditionalInfos("", elementID),
|
|
||||||
WithAdditionalInfos("element", "transport"),
|
|
||||||
)
|
|
||||||
|
|
||||||
assert.Equal(t, "transport", task.additionalInfos["element"])
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("with empty infos", func(t *testing.T) {
|
|
||||||
task := NewTask(
|
|
||||||
func(ctx context.Context) (any, error) {
|
|
||||||
return "hey", nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert.Nil(t, task.additionalInfos)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user