Compare commits

..

13 Commits

Author SHA1 Message Date
rmanach
d5519cc064 improve README example 2024-11-16 11:23:22 +01:00
rmanach
f2a0830ca1 Merge tag 'v0.1.0' into develop
v0.1.0
2024-11-16 11:16:39 +01:00
rmanach
63210639ea Merge branch 'release/v0.1.0' 2024-11-16 11:15:45 +01:00
rmanach
4a39ba5547 fix go.mod + doc 2024-11-16 11:15:30 +01:00
rmanach
2a0ecf2d4e turn into lib 2024-11-16 11:00:46 +01:00
rmanach
3f1afb63d4 Merge pull request 'feat/engine' (#1) from feat/engine into main
Reviewed-on: https://gitea.thegux.fr/rmanach/cycle-scheduler/pulls/1
2024-09-24 14:47:21 +00:00
rmanach
df9a0ffbbc clean scheduler file + move tasks map 2024-09-24 16:45:09 +02:00
rmanach
ec53ec990a replace lock with atomic 2024-09-24 16:21:42 +02:00
rmanach
152c4f925a rework all the engine 2024-09-24 16:10:43 +02:00
rmanach
3cbea438f6 replace slot attempts int with atomics 2024-09-24 12:29:41 +02:00
rmanach
b6df47ed0c fix doc workers 2024-09-24 11:57:40 +02:00
rmanach
0cd4f264a3 rework locker + fix job state + fix graceful stop 2024-09-24 11:55:33 +02:00
rmanach
4da553d156 wrap job in job slot 2024-09-24 10:44:33 +02:00
11 changed files with 1041 additions and 737 deletions

View File

@ -1,6 +1,3 @@
run: lint
go run main.go
lint: lint:
golangci-lint run ./... golangci-lint run ./...

View File

@ -1,48 +1,47 @@
# cycle-scheduler # cycle-scheduler
cycle-scheduler is a simple scheduler handling jobs and executes them at regular interval. cycle-scheduler is a simple scheduler lib, handling tasks and executes them at regular interval. If a task is not in desired state, the task is re-scheduled.
Here a simple representation: **NOTE**: this should be not used for long-running tasks, it's more suitable for shorts tasks like polling etc...
```ascii
+------------------------------------------------------+
| +---+ +---+ +---+ +---+ +---+ +---+ |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| | | | | | | | | | | | | |
| |s1 | |s2 | |s3 | |s4 | | | |s60| |
| +---+ +---+ +---+ +---+ +---+ +---+ |
+---------------^--------------------------------------+
```
Jobs are handle in a array of job slices.
At each interval (clock), the cursor `^` moves to the next slot (s*). ## Examples
If there are jobs, they are sent to workers to be executed * Init a new scheduler with 4 workers
and the slot is cleaned.
At the end of the slot (s60), the cursor re-starts a new cycle from s1.
If a job is not in a desire state, the job is re-scheduled in the current slot to be re-executed in the next cycle.
**NOTE**: This scheduler does not accept long running tasks. Job execution have a fixed timeout of 10s.
Pooling tasks are more suitable for this kind of scheduler.
## Run
You can run sample tests from `main.go` to see the scheduler in action:
```bash
make run
```
If all goes well, you should see this kind of output in the stdout:
```ascii
# cycle-scheduler (slot: 7)
_ P _ _ _ _ _ _ _ _ _ _ _ _
- - - - - - ^ - - - - - - -
```
> **P** means *pending* state
You can adjust the clock interval as needed in `main.go`:
```go ```go
interval := 200 * time.Millisecond package main
import (
"context"
"time"
scheduler "gitea.thegux.fr/rmanach/cycle-scheduler.git"
)
func main() {
ctx, fnCancel := context.WithCancel(context.Background())
s := scheduler.NewSchedulerCycle(ctx, 4)
// add a task with an execution interval of 2 ms (executed every 2 ms)
// and a maximum duration of 30 second.
s.Delay(
func(ctx context.Context) (any, error) {
// ...
return nil, nil
},
scheduler.WithExecInterval(2*time.Millisecond),
scheduler.WithMaxDuration(30*time.Second),
)
// stop the program after 5 seconds
go func() {
time.Sleep(5 * time.Second)
fnCancel()
}()
<-ctx.Done()
<-s.Done()
}
``` ```
**NOTE**: for `Delay` optionals arguments, check the `NewTask` method documentation for more details.

2
go.mod
View File

@ -1,4 +1,4 @@
module cycle-scheduler module gitea.thegux.fr/rmanach/cycle-scheduler.git
go 1.22.4 go 1.22.4

View File

@ -1,166 +0,0 @@
package job
import (
"context"
"errors"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
type State int
const (
Pending State = iota
Running
Success
Failed
Abort
Unknown
)
const UnknownState = "unknown"
var JobExecTimeout = 10 * time.Second
func (s State) String() string {
switch s {
case Pending:
return "pending"
case Running:
return "running"
case Success:
return "success"
case Failed:
return "failed"
case Abort:
return "abort"
case Unknown:
return UnknownState
default:
return UnknownState
}
}
var (
ErrJobAborted = errors.New("job has been aborted")
ErrJobNotCompletedYet = errors.New("job is not right state, retrying")
)
type FnJob func(ctx context.Context) error
type JobDetails struct {
ID uuid.UUID `json:"id"`
State string `json:"state"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
Err string `json:"error"`
}
// TODO(rmanach): add priority level
type Job struct {
l sync.RWMutex
id uuid.UUID
createdAt time.Time
updatedAt *time.Time
state State
task FnJob
err error
chAbort chan struct{}
}
func NewJob(task FnJob, row, col int) Job {
return Job{
id: uuid.New(),
createdAt: time.Now().UTC(),
state: Pending,
task: task,
chAbort: make(chan struct{}, 1),
}
}
func (j *Job) IntoDetails() JobDetails {
j.l.RLock()
defer j.l.RUnlock()
jd := JobDetails{
ID: j.id,
CreatedAt: j.createdAt,
State: j.state.String(),
}
if err := j.err; err != nil {
jd.Err = err.Error()
}
if ut := j.updatedAt; ut != nil {
jd.UpdatedAt = ut
}
return jd
}
func (j *Job) GetID() uuid.UUID {
return j.id
}
func (j *Job) GetState() State {
j.l.RLock()
defer j.l.RUnlock()
return j.state
}
func (j *Job) setState(s State) {
j.l.Lock()
defer j.l.Unlock()
now := time.Now().UTC()
j.updatedAt = &now
j.state = s
}
func (j *Job) setFail(err error) {
j.l.Lock()
defer j.l.Unlock()
now := time.Now().UTC()
j.updatedAt = &now
j.state = Failed
j.err = err
}
func (j *Job) Abort() {
j.setState(Abort)
j.chAbort <- struct{}{}
}
func (j *Job) Run(ctx context.Context) {
ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout)
defer fnCancel()
j.setState(Running)
log.Info().Str("job", j.GetID().String()).Msg("job running...")
go func() {
for range j.chAbort {
j.setState(Abort)
fnCancel()
}
}()
if err := j.task(ctxExec); err != nil {
if errors.Is(err, ErrJobNotCompletedYet) {
j.setState(Pending)
return
}
j.setFail(err)
return
}
j.setState(Success)
}

View File

@ -1,371 +0,0 @@
package scheduler
import (
"context"
"cycle-scheduler/internal/job"
"fmt"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
const (
TableTitle = "# cycle-scheduler"
Cursor = "^"
CycleLength = 60
MaxWorkers = 5
)
const MaxSlotsIdx = 59
type JobSlot struct {
*job.Job
row int
}
// SchedulerCycle is a dumb scheduler.
// It handle job and executes it at each cycle (60 * interval).
//
// Jobs are handle in a array of job slices.
// At each interval (clock), the cursor moves to the next slot (s*).
// If there are jobs, they are sent to workers to be executed
// and the slot is cleaned.
//
// At the end of the slot (s60), the cursor re-starts a cycle at s1.
type SchedulerCycle struct {
l sync.RWMutex
wg sync.WaitGroup
ctx context.Context
fnCancel context.CancelFunc
interval time.Duration
currentSlot int
slots [60][]*job.Job
jobs map[uuid.UUID]*job.Job
chJobs chan *JobSlot
}
func NewSchedulerCycle(ctx context.Context, interval time.Duration) *SchedulerCycle {
ctxChild, fnCancel := context.WithCancel(ctx)
c := SchedulerCycle{
wg: sync.WaitGroup{},
ctx: ctxChild,
fnCancel: fnCancel,
interval: interval,
currentSlot: 0,
slots: [60][]*job.Job{},
jobs: make(map[uuid.UUID]*job.Job),
chJobs: make(chan *JobSlot),
}
c.run()
return &c
}
func (c *SchedulerCycle) Stop() {
c.fnCancel()
}
func (c *SchedulerCycle) Done() <-chan struct{} {
done := make(chan struct{})
go func() {
<-c.ctx.Done()
c.wg.Done()
done <- struct{}{}
}()
return done
}
func (c *SchedulerCycle) Len() int {
c.l.Lock()
defer c.l.Unlock()
return len(c.jobs)
}
func (c *SchedulerCycle) HasAllJobsDone() bool {
c.l.Lock()
defer c.l.Unlock()
for _, j := range c.jobs {
if j.GetState() == job.Pending || j.GetState() == job.Running {
return false
}
}
return true
}
func (c *SchedulerCycle) GetJobsDetails() []job.JobDetails {
c.l.Lock()
defer c.l.Unlock()
details := []job.JobDetails{}
for _, j := range c.jobs {
details = append(details, j.IntoDetails())
}
return details
}
// Delay builds a job and add it to the scheduler engine.
func (c *SchedulerCycle) Delay(fnJob job.FnJob) uuid.UUID {
c.l.Lock()
defer c.l.Unlock()
nextSlot := c.currentSlot + 1
if nextSlot > MaxSlotsIdx {
nextSlot = 0
}
j := job.NewJob(fnJob, nextSlot, len(c.slots[nextSlot]))
c.slots[nextSlot] = append(c.slots[nextSlot], &j)
c.jobs[j.GetID()] = &j
log.Info().Str("job", j.GetID().String()).Msg("job added successfully")
return j.GetID()
}
// Abort aborts the job given by its id if it exists..
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
if j := c.getJob(id); j != nil {
j.Abort()
log.Info().Str("job", j.GetID().String()).Msg("abort job done")
return true
}
return false
}
// GetJobDetails returns the job details by .
func (c *SchedulerCycle) GetJobDetails(id uuid.UUID) job.JobDetails {
c.l.Lock()
defer c.l.Unlock()
j, ok := c.jobs[id]
if !ok {
return job.JobDetails{
State: job.Unknown.String(),
}
}
return j.IntoDetails()
}
// Display outputs earch interval the scheduler state.
func (c *SchedulerCycle) Display() {
ticker := time.NewTicker(c.interval)
go func() {
for range ticker.C {
c.display()
}
}()
}
// display writes to stdout the state of the scheduler as a table.
func (c *SchedulerCycle) display() { //nolint:gocyclo // not complex
c.l.RLock()
defer c.l.RUnlock()
var maxCols int
for i := range c.slots {
if l := len(c.slots[i]); l > maxCols {
maxCols = l
}
}
table := [][]string{}
title := fmt.Sprintf("%s (slot: %d)", TableTitle, c.currentSlot+1)
table = append(table, []string{title})
for {
if maxCols == 0 {
break
}
row := make([]string, CycleLength)
for i := 0; i <= MaxSlotsIdx; i++ {
row[i] = "_"
}
for i := range c.slots {
if len(c.slots[i]) < maxCols {
continue
}
j := c.slots[i][maxCols-1]
switch j.GetState() {
case job.Pending:
row[i] = "P"
case job.Running:
row[i] = "R"
case job.Failed:
row[i] = "X"
case job.Abort:
row[i] = "A"
case job.Unknown:
row[i] = "?"
case job.Success:
row[i] = "O"
}
}
table = append(table, row)
maxCols--
}
row := make([]string, CycleLength)
for i := 0; i <= MaxSlotsIdx; i++ {
row[i] = "-"
}
table = append(table, row)
if l := len(table); l > 0 {
table[l-1][c.currentSlot] = Cursor
}
tableFormat := ""
for _, r := range table {
tableFormat += strings.Join(r, " ")
tableFormat += "\n"
}
fmt.Println(tableFormat)
}
func (c *SchedulerCycle) getJob(id uuid.UUID) *job.Job {
c.l.RLock()
defer c.l.RUnlock()
j, ok := c.jobs[id]
if !ok {
return nil
}
return j
}
// getCurrentSlotJobs collects all the current slot jobs
// and clean the slot.
func (c *SchedulerCycle) getCurrentSlotJobs() (int, []*job.Job) {
c.l.Lock()
defer c.l.Unlock()
jobs := c.slots[c.currentSlot]
c.slots[c.currentSlot] = []*job.Job{}
return c.currentSlot, jobs
}
// updateSlot add a job to the slot where it was before.
func (c *SchedulerCycle) updateSlot(row int, j *job.Job) {
c.l.Lock()
defer c.l.Unlock()
c.slots[row] = append(c.slots[row], j)
}
// updateCurrentSlot add a job to the current slot.
func (c *SchedulerCycle) updateCurrentSlot(j *job.Job) {
c.l.Lock()
defer c.l.Unlock()
c.slots[c.currentSlot] = append(c.slots[c.currentSlot], j)
}
// incr increments the slot cursor.
// It the cursor reaches `MaxSlotsIdx`, it goes back to 0.
func (c *SchedulerCycle) incr() {
c.l.Lock()
defer c.l.Unlock()
nextSlot := c.currentSlot + 1
if nextSlot > MaxSlotsIdx {
nextSlot = 0
}
c.currentSlot = nextSlot
}
// dispatch gets jobs from the current slot, resets the slot
// and dispatch all jobs to the workers.
//
// It all the workers are busy, the jobs are re-schedule in the same slot
// to be executed in the next cycle.
func (c *SchedulerCycle) dispatch() {
row, jobs := c.getCurrentSlotJobs()
for _, j := range jobs {
if j.GetState() == job.Abort {
continue
}
select {
case c.chJobs <- &JobSlot{row: row, Job: j}:
default:
log.Warn().Msg("unable to put job in workers, trying next cycle")
c.updateSlot(row, j)
}
}
}
// run launches the workers and the ticker.
func (c *SchedulerCycle) run() {
c.workers()
c.tick()
}
// workers launches `MaxWorkers` number of worker to execute job.
// If job returns `ErrJobNotCompletedYet`, it re-schedules in the same slot.
func (c *SchedulerCycle) workers() {
for i := 0; i < MaxWorkers; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case j := <-c.chJobs:
c.executeJob(j.Job, c.updateCurrentSlot)
case <-c.ctx.Done():
log.Error().Msg("context done, worker is stopping...")
return
}
}
}()
}
}
func (c *SchedulerCycle) executeJob(j *job.Job, fnFallBack func(*job.Job)) {
j.Run(c.ctx)
if j.GetState() == job.Pending {
fnFallBack(j)
}
}
// tick is a simple ticker incrementing at each scheduler interval,
// the slot cursor and dispatch jobs to the workers.
func (c *SchedulerCycle) tick() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
log.Error().Msg("context done, ticker is stopping...")
return
default:
time.Sleep(c.interval)
c.incr()
c.dispatch()
}
}
}()
}

View File

@ -1,33 +0,0 @@
package scheduler
import (
"context"
"cycle-scheduler/internal/job"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSlot(t *testing.T) {
ctx, fnCancel := context.WithCancel(context.Background())
defer fnCancel()
s := NewSchedulerCycle(ctx, 1*time.Millisecond)
s.Delay(func(ctx context.Context) error {
return nil
})
s.Delay(func(ctx context.Context) error {
return job.ErrJobNotCompletedYet
})
j3 := s.Delay(func(ctx context.Context) error {
return errors.New("errors")
})
time.Sleep(2 * time.Millisecond)
assert.Equal(t, 3, s.Len())
assert.Equal(t, job.Failed.String(), s.GetJobDetails(j3).State)
}

122
main.go
View File

@ -1,122 +0,0 @@
package main
import (
"context"
"cycle-scheduler/internal/job"
"cycle-scheduler/internal/scheduler"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"os"
"os/signal"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
func initLogger() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
func main() {
initLogger()
ctx, stop := signal.NotifyContext(
context.Background(),
os.Interrupt,
os.Kill,
)
defer stop()
interval := 200 * time.Millisecond
s := scheduler.NewSchedulerCycle(ctx, interval)
s.Display()
// pending test
for i := 0; i < 20; i++ {
go func(i int) {
time.Sleep(time.Duration(i) * time.Second)
s.Delay(func(ctx context.Context) error {
time.Sleep(4 * time.Second) //nolint:mnd // test purpose
if rand.IntN(10)%2 == 0 { //nolint:gosec,mnd // test prupose
return job.ErrJobNotCompletedYet
}
return nil
})
}(i)
}
// abort test
j := s.Delay(func(ctx context.Context) error {
time.Sleep(4 * time.Second) //nolint:mnd // test purpose
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return job.ErrJobNotCompletedYet
})
go func() {
time.Sleep(2 * time.Second) //nolint:mnd // test purpose
s.Abort(j)
}()
// abort test 2
j2 := s.Delay(func(ctx context.Context) error {
time.Sleep(time.Second)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return job.ErrJobNotCompletedYet
})
go func() {
time.Sleep(10 * time.Second) //nolint:mnd // test purpose
s.Abort(j2)
}()
// error test
s.Delay(func(ctx context.Context) error {
time.Sleep(5 * time.Second) //nolint:mnd // test purpose
return errors.New("err")
})
// success test
go func() {
time.Sleep(10 * time.Second) //nolint:mnd // test purpose
s.Delay(func(ctx context.Context) error {
time.Sleep(5 * time.Second) //nolint:mnd // test purpose
return nil
})
}()
go func() {
for {
time.Sleep(2 * time.Second) //nolint:mnd // test purpose
if s.HasAllJobsDone() {
s.Stop()
return
}
}
}()
<-s.Done()
jds := s.GetJobsDetails()
for _, jd := range jds {
c, err := json.Marshal(&jd)
if err != nil {
log.Err(err).Str("job", jd.ID.String()).Msg("unable to parse job details into JSON")
continue
}
fmt.Println(string(c))
}
}

213
scheduler.go Normal file
View File

@ -0,0 +1,213 @@
package scheduler
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
const (
ChanLength = 500
DefaultExecInterval = 30 * time.Second
)
type IScheduler interface {
Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID
}
// SchedulerCycle is a simple scheduler handling jobs and executes them at regular interval.
// If a task is not in desired state, the task is re-scheduled.
type SchedulerCycle struct {
wg sync.WaitGroup
ctx context.Context
fnCancel context.CancelFunc
tasks tasks
chTasks chan *task
chDone chan struct{}
}
func NewSchedulerCycle(ctx context.Context, workers uint32) *SchedulerCycle {
ctxChild, fnCancel := context.WithCancel(ctx)
c := SchedulerCycle{
wg: sync.WaitGroup{},
ctx: ctxChild,
fnCancel: fnCancel,
tasks: newTasks(),
chTasks: make(chan *task, ChanLength),
}
done := make(chan struct{})
go func() {
<-c.ctx.Done()
defer c.fnCancel()
c.wg.Wait()
c.stop()
done <- struct{}{}
}()
c.chDone = done
c.run(workers)
return &c
}
// delay sets the task timer when the task should be scheduled.
func (c *SchedulerCycle) delay(t *task) {
interval := DefaultExecInterval
if t.execInterval != nil {
interval = *t.execInterval
}
t.setTimer(
time.AfterFunc(interval, func() {
select {
case c.chTasks <- t:
default:
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
c.delay(t)
}
}),
)
}
// exec runs the task now or if all the workers are in use, delayed it.
func (c *SchedulerCycle) exec(t *task) {
select {
case c.chTasks <- t:
default:
log.Warn().Str("id", t.GetID().String()).Msg("queue is full, can't accept new task, delayed it")
c.delay(t)
}
}
func (c *SchedulerCycle) getTask(id uuid.UUID) *task {
return c.tasks.get(id)
}
// run launches a number of worker to execute tasks.
func (c *SchedulerCycle) run(n uint32) {
for i := 0; i < int(n); i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case t := <-c.chTasks:
c.execute(t, c.delay)
case <-c.ctx.Done():
log.Error().Msg("context done, worker is stopping...")
return
}
}
}()
}
}
// execute executes the task.
//
// It does not handle task error, it's up to the task to implement its own callbacks.
// In case of pending state, a callback is executed for actions. For the others states,
// the task is deleted from the scheduler.
func (c *SchedulerCycle) execute(t *task, fnFallBack func(*task)) {
t.Run(c.ctx)
switch t.GetState() {
case Pending:
fnFallBack(t)
case Success, Failed, Abort, Unknown:
c.tasks.delete(t)
case Running:
c.tasks.delete(t)
log.Debug().Str("id", t.GetID().String()).Msg("weird state (running) after job execution...")
}
}
// stop aborts all tasks and waits until tasks are stopped.
// If the process can't be stopped within 10s, too bad...
func (c *SchedulerCycle) stop() {
c.tasks.abort()
if c.TasksDone() {
log.Info().Msg("all tasks has been stopped gracefully")
return
}
ctxTimeout := 10 * time.Second
ctx, fnCancel := context.WithTimeout(c.ctx, ctxTimeout)
defer fnCancel()
for {
select {
case <-ctx.Done():
log.Error().Msg("stop context done, tasks has been stopped gracefully")
return
default:
}
if c.TasksDone() {
log.Info().Msg("all tasks has been stopped gracefully")
return
}
time.Sleep(time.Second)
}
}
func (c *SchedulerCycle) Done() <-chan struct{} {
return c.chDone
}
func (c *SchedulerCycle) Len() int {
return c.tasks.len()
}
// TasksDone checks whether all the tasks has been completed.
func (c *SchedulerCycle) TasksDone() bool {
return c.tasks.completed()
}
func (c *SchedulerCycle) GetTasksDetails() []TaskDetails {
return c.tasks.getAllDetails()
}
// GetTaskDetails returns the task details by id.
func (c *SchedulerCycle) GetTaskDetails(id uuid.UUID) TaskDetails {
return c.tasks.getDetails(id)
}
// Delay builds a task and adds it to the scheduler engine.
func (c *SchedulerCycle) Delay(fnJob FnJob, opts ...TaskOption) uuid.UUID {
select {
case <-c.Done():
log.Error().Msg("context done unable to add new job")
default:
}
t := NewTask(fnJob, opts...)
c.tasks.add(t)
c.exec(t)
log.Info().Str("id", t.GetID().String()).Msg("task added successfully")
return t.GetID()
}
// Abort aborts the task given by its id if it exists.
func (c *SchedulerCycle) Abort(id uuid.UUID) bool {
if t := c.getTask(id); t != nil {
t.Abort()
log.Info().Str("id", t.GetID().String()).Msg("abort task done")
return true
}
return false
}

93
scheduler_test.go Normal file
View File

@ -0,0 +1,93 @@
package scheduler
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestScheduler(t *testing.T) {
ctx := context.Background()
var buf int
s := NewSchedulerCycle(ctx, 1)
taskID := s.Delay(func(ctx context.Context) (any, error) {
time.Sleep(50 * time.Millisecond)
buf += 1
return nil, nil
}, WithExecInterval(2*time.Millisecond))
assert.NotEmpty(t, taskID)
assert.False(t, s.TasksDone())
time.Sleep(2 * time.Millisecond)
details := s.GetTaskDetails(taskID)
assert.Equal(t, "running", details.State)
assert.LessOrEqual(t, details.ElapsedTime, 50*time.Millisecond)
time.Sleep(50 * time.Millisecond)
assert.True(t, s.TasksDone())
}
func TestSchedulerLoad(t *testing.T) {
ctx := context.Background()
s := NewSchedulerCycle(ctx, 1)
for i := 0; i < 500; i++ {
s.Delay(func(ctx context.Context) (any, error) {
time.Sleep(1 * time.Millisecond)
return nil, nil
}, WithExecInterval(1*time.Millisecond))
}
assert.Eventually(t, func() bool {
return s.TasksDone()
}, time.Second, 250*time.Millisecond)
}
func TestSchedulerExecInterval(t *testing.T) {
ctx := context.Background()
s := NewSchedulerCycle(ctx, 1)
s.Delay(
func(ctx context.Context) (any, error) {
return nil, ErrJobNotCompletedYet
},
WithMaxDuration(50*time.Millisecond),
WithExecInterval(2*time.Millisecond),
)
time.Sleep(100 * time.Millisecond)
assert.True(t, s.TasksDone())
}
func TestSchedulerContextDone(t *testing.T) {
ctx, fnCancel := context.WithCancel(context.Background())
s := NewSchedulerCycle(ctx, 1)
for i := 0; i < 250; i++ {
s.Delay(
func(ctx context.Context) (any, error) {
return nil, ErrJobNotCompletedYet
},
WithMaxDuration(100*time.Millisecond),
WithExecInterval(2*time.Millisecond),
)
}
go func() {
time.Sleep(50 * time.Millisecond)
fnCancel()
}()
<-s.Done()
}

451
task.go Normal file
View File

@ -0,0 +1,451 @@
package scheduler
import (
"context"
"errors"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
type State int
const (
Pending State = iota
Running
Success
Failed
Abort
Unknown
)
const UnknownState = "unknown"
func (s State) String() string {
return [...]string{"pending", "running", "success", "failed", "abort", "unknown"}[s]
}
var (
ErrJobAborted = errors.New("job has been aborted")
ErrJobNotCompletedYet = errors.New("job is not right state, retrying")
ErrTimeExceeded = errors.New("time exceeded")
ErrExecutionEngine = errors.New("execution engine")
)
type FnJob func(ctx context.Context) (any, error)
type FnResult func(ctx context.Context, res any)
type FnError func(ctx context.Context, err error)
type TaskDetails struct {
State string `json:"state"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
MaxDuration *time.Duration `json:"maxDuration,omitempty"`
Attempts uint32 `json:"attempts"`
Err string `json:"error"`
ElapsedTime time.Duration `json:"elapsedTime"`
AdditionalInfo map[string]string `json:"additionalInfos"`
}
func (td *TaskDetails) log() {
args := []any{}
args = append(args,
"state", td.State,
"createdAt", td.CreatedAt.Format("2006-01-02T15:04:05Z"),
"updatedAt", td.UpdatedAt.Format("2006-01-02T15:04:05Z"),
"elapsedTime", td.ElapsedTime.String(),
"attempts", td.Attempts,
)
if td.AdditionalInfo != nil {
for k, v := range td.AdditionalInfo {
args = append(args, k, v)
}
}
if td.Err != "" {
args = append(args, "error", td.Err)
log.Error().Any("args", args).Msg("task failed")
return
}
log.Info().Any("args", args).Msg("task execution done")
}
type task struct {
l sync.RWMutex
id uuid.UUID
createdAt time.Time
updatedAt *time.Time
state State
fnJob FnJob
fnSuccess FnResult
fnError FnError
attempts uint32
timer *time.Timer
maxDuration *time.Duration
execTimeout *time.Duration
execInterval *time.Duration
res any
err error
additionalInfos map[string]string
chAbort chan struct{}
}
type TaskOption func(t *task)
func WithMaxDuration(d time.Duration) TaskOption {
return func(t *task) {
t.maxDuration = &d
}
}
func WithFnSuccess(f FnResult) TaskOption {
return func(t *task) {
t.fnSuccess = f
}
}
func WithFnError(f FnError) TaskOption {
return func(t *task) {
t.fnError = f
}
}
func WithExecTimeout(d time.Duration) TaskOption {
return func(t *task) {
t.execTimeout = &d
}
}
func WithExecInterval(d time.Duration) TaskOption {
return func(t *task) {
t.execInterval = &d
}
}
func WithAdditionalInfos(k, v string) TaskOption {
return func(t *task) {
if k == "" || v == "" {
return
}
if t.additionalInfos == nil {
t.additionalInfos = map[string]string{}
}
t.additionalInfos[k] = v
}
}
// NewTask builds a task.
// Here the options details that can be set to the task:
// - WithMaxDuration(time.Duration): the task will stop executing if the duration is exceeded (raise ErrTimeExceeded)
// - WithFnSuccess(FnResult): call a function after a success execution
// - WithFnError(FnError): call a function if an error occurred
// - WithExecTimeout(time.Duration): sets a timeout for the task execution
// - WithAdditionalInfos(k, v string): key-value additional informations (does not inferred with the task execution, log purpose)
//
// Scheduler options:
// - WithExecInterval(time.Duration): specify the execution interval
func NewTask(f FnJob, opts ...TaskOption) *task {
t := task{
id: uuid.New(),
createdAt: time.Now().UTC(),
state: Pending,
fnJob: f,
chAbort: make(chan struct{}, 1),
}
for _, o := range opts {
o(&t)
}
return &t
}
func (t *task) setState(s State) {
t.l.Lock()
defer t.l.Unlock()
now := time.Now().UTC()
t.updatedAt = &now
t.state = s
}
func (t *task) setSuccess(ctx context.Context, res any) {
t.l.Lock()
defer t.l.Unlock()
now := time.Now().UTC()
t.updatedAt = &now
t.state = Success
t.res = res
if t.fnSuccess != nil {
t.fnSuccess(ctx, res)
}
}
func (t *task) setFail(ctx context.Context, err error) {
t.l.Lock()
defer t.l.Unlock()
now := time.Now().UTC()
t.updatedAt = &now
if t.state != Abort {
t.state = Failed
}
t.err = err
if t.fnError != nil {
t.fnError(ctx, err)
}
}
func (t *task) setTimer(tm *time.Timer) {
t.l.Lock()
defer t.l.Unlock()
t.timer = tm
}
func (t *task) stopTimer() {
t.l.Lock()
defer t.l.Unlock()
if t.timer != nil {
t.timer.Stop()
}
}
func (t *task) incr() {
t.l.Lock()
defer t.l.Unlock()
t.attempts += 1
}
func (t *task) log() {
td := t.IntoDetails()
td.log()
}
func (t *task) GetID() uuid.UUID {
t.l.RLock()
defer t.l.RUnlock()
return t.id
}
func (t *task) GetAttempts() uint32 {
t.l.RLock()
defer t.l.RUnlock()
return t.attempts
}
func (t *task) GetState() State {
t.l.RLock()
defer t.l.RUnlock()
return t.state
}
// TimeExceeded checks if the task does not reached its max duration execution (maxDuration).
func (t *task) TimeExceeded() bool {
t.l.RLock()
defer t.l.RUnlock()
if md := t.maxDuration; md != nil {
return time.Since(t.createdAt) >= *md
}
return false
}
func (t *task) Abort() {
t.stopTimer()
t.chAbort <- struct{}{}
}
func (t *task) Run(ctx context.Context) {
if t.TimeExceeded() {
t.setFail(ctx, ErrTimeExceeded)
return
}
if s := t.GetState(); s != Pending {
log.Error().Msg("unable to launch a task that not in pending state")
t.setFail(ctx, ErrExecutionEngine)
return
}
var ctxExec context.Context
var fnCancel context.CancelFunc
if t.execTimeout != nil {
ctxExec, fnCancel = context.WithTimeout(ctx, *t.execTimeout)
} else {
ctxExec, fnCancel = context.WithCancel(ctx)
}
defer fnCancel()
t.incr()
t.setState(Running)
log.Info().Str("id", t.GetID().String()).Msg("task is running...")
go func() {
for range t.chAbort {
t.setState(Abort)
fnCancel()
return
}
}()
defer t.log()
res, err := t.fnJob(ctxExec)
if err != nil {
if errors.Is(err, ErrJobNotCompletedYet) {
t.setState(Pending)
return
}
t.setFail(ctx, err)
return
}
t.setSuccess(ctx, res)
}
func (t *task) IntoDetails() TaskDetails {
t.l.RLock()
defer t.l.RUnlock()
td := TaskDetails{
CreatedAt: t.createdAt,
UpdatedAt: t.updatedAt,
State: t.state.String(),
Attempts: t.attempts,
MaxDuration: t.maxDuration,
}
if t.state == Pending || t.state == Running {
td.ElapsedTime = time.Since(t.createdAt)
} else {
td.ElapsedTime = t.updatedAt.Sub(t.createdAt)
}
if err := t.err; err != nil {
td.Err = err.Error()
}
if t.additionalInfos != nil {
td.AdditionalInfo = t.additionalInfos
}
return td
}
type tasks struct {
l sync.RWMutex
s map[uuid.UUID]*task
}
func newTasks() tasks {
return tasks{
s: make(map[uuid.UUID]*task),
}
}
func (ts *tasks) add(t *task) {
ts.l.Lock()
defer ts.l.Unlock()
ts.s[t.GetID()] = t
}
func (ts *tasks) delete(t *task) {
ts.l.Lock()
defer ts.l.Unlock()
delete(ts.s, t.GetID())
}
func (ts *tasks) get(id uuid.UUID) *task {
ts.l.RLock()
defer ts.l.RUnlock()
t, ok := ts.s[id]
if !ok {
return nil
}
return t
}
func (ts *tasks) len() int {
ts.l.RLock()
defer ts.l.RUnlock()
return len(ts.s)
}
func (ts *tasks) completed() bool {
ts.l.RLock()
defer ts.l.RUnlock()
for _, t := range ts.s {
if t.GetState() == Pending || t.GetState() == Running {
return false
}
}
return true
}
func (ts *tasks) getAllDetails() []TaskDetails {
ts.l.RLock()
defer ts.l.RUnlock()
details := []TaskDetails{}
for _, t := range ts.s {
details = append(details, t.IntoDetails())
}
return details
}
func (ts *tasks) getDetails(id uuid.UUID) TaskDetails {
ts.l.RLock()
defer ts.l.RUnlock()
t, ok := ts.s[id]
if !ok {
return TaskDetails{State: UnknownState}
}
return t.IntoDetails()
}
func (ts *tasks) abort() {
ts.l.RLock()
defer ts.l.RUnlock()
for _, t := range ts.s {
t.Abort()
}
}

243
task_test.go Normal file
View File

@ -0,0 +1,243 @@
package scheduler
import (
"context"
"errors"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestTask(t *testing.T) {
ctx := context.Background()
var i int
task := NewTask(
func(ctx context.Context) (any, error) {
i += 1
return nil, nil
},
)
task.Run(ctx)
assert.Equal(t, nil, task.res)
assert.NotEmpty(t, task.updatedAt)
assert.Equal(t, 1, i)
assert.Equal(t, 1, int(task.GetAttempts()))
}
func TestAbortTask(t *testing.T) {
ctx := context.Background()
task := NewTask(
func(ctx context.Context) (any, error) {
<-ctx.Done()
return nil, ctx.Err()
},
)
timer := time.NewTicker(200 * time.Millisecond)
go func() {
<-timer.C
task.Abort()
}()
task.Run(ctx)
assert.Equal(t, Abort, task.GetState())
}
func TestTaskContextDone(t *testing.T) {
ctx, fnCancel := context.WithCancel(context.Background())
task := NewTask(
func(ctx context.Context) (any, error) {
<-ctx.Done()
return nil, ctx.Err()
},
)
timer := time.NewTicker(200 * time.Millisecond)
go func() {
<-timer.C
fnCancel()
}()
task.Run(ctx)
assert.Equal(t, Failed, task.GetState())
}
func TestTaskFnSuccess(t *testing.T) {
ctx := context.Background()
var result int
task := NewTask(
func(ctx context.Context) (any, error) {
return 3, nil
},
WithFnSuccess(func(ctx context.Context, res any) {
t, ok := res.(int)
if !ok {
return
}
result = t * 2
}),
)
task.Run(ctx)
assert.Equal(t, Success, task.GetState())
assert.Equal(t, 6, result)
}
func TestTaskFnError(t *testing.T) {
ctx := context.Background()
var result error
task := NewTask(
func(ctx context.Context) (any, error) {
return 3, errors.New("error occurred...")
},
WithFnError(func(ctx context.Context, err error) {
result = err
}),
)
task.Run(ctx)
assert.Equal(t, Failed, task.GetState())
assert.Equal(t, "error occurred...", result.Error())
}
func TestTaskWithErrJobNotCompletedYet(t *testing.T) {
ctx := context.Background()
var attempts int
task := NewTask(
func(ctx context.Context) (any, error) {
if attempts < 2 {
attempts += 1
return nil, ErrJobNotCompletedYet
}
return "ok", nil
},
)
for i := 0; i < 2; i++ {
task.Run(ctx)
assert.Equal(t, Pending, task.GetState())
}
task.Run(ctx)
assert.Equal(t, Success, task.GetState())
assert.Equal(t, 3, int(task.GetAttempts()))
}
func TestTaskTimeExceeded(t *testing.T) {
ctx := context.Background()
task := NewTask(
func(ctx context.Context) (any, error) {
return "ko", nil
},
WithMaxDuration(5*time.Millisecond),
)
time.Sleep(10 * time.Millisecond)
task.Run(ctx)
assert.Equal(t, Failed, task.GetState())
assert.Equal(t, 0, int(task.GetAttempts()))
}
func TestTaskExecTimeout(t *testing.T) {
ctx := context.Background()
task := NewTask(
func(ctx context.Context) (any, error) {
<-ctx.Done()
return nil, ctx.Err()
},
WithExecTimeout(5*time.Millisecond),
)
time.Sleep(10 * time.Millisecond)
task.Run(ctx)
assert.Equal(t, Failed, task.GetState())
assert.Equal(t, 1, int(task.GetAttempts()))
}
func TestTaskDetails(t *testing.T) {
ctx := context.Background()
task := NewTask(
func(ctx context.Context) (any, error) {
return "coucou", nil
},
)
details := task.IntoDetails()
assert.Equal(t, 0, int(details.Attempts))
assert.Equal(t, "pending", details.State)
assert.False(t, details.CreatedAt.IsZero())
assert.Empty(t, details.UpdatedAt)
assert.Nil(t, details.MaxDuration)
assert.Empty(t, details.Err)
assert.NotEmpty(t, details.ElapsedTime)
task.Run(ctx)
details = task.IntoDetails()
assert.Equal(t, 1, int(details.Attempts))
assert.Equal(t, "success", details.State)
assert.False(t, details.CreatedAt.IsZero())
assert.NotEmpty(t, details.UpdatedAt)
assert.Nil(t, details.MaxDuration)
assert.Empty(t, details.Err)
assert.NotEmpty(t, details.ElapsedTime)
}
func TestTaskAdditionalInfos(t *testing.T) {
t.Run("with key value", func(t *testing.T) {
elementID := uuid.NewString()
task := NewTask(
func(ctx context.Context) (any, error) {
return "yo", nil
},
WithAdditionalInfos("transportId", elementID),
WithAdditionalInfos("element", "transport"),
)
assert.Equal(t, elementID, task.additionalInfos["transportId"])
assert.Equal(t, "transport", task.additionalInfos["element"])
})
t.Run("with empty key", func(t *testing.T) {
elementID := uuid.NewString()
task := NewTask(
func(ctx context.Context) (any, error) {
return "hello", nil
},
WithAdditionalInfos("", elementID),
WithAdditionalInfos("element", "transport"),
)
assert.Equal(t, "transport", task.additionalInfos["element"])
})
t.Run("with empty infos", func(t *testing.T) {
task := NewTask(
func(ctx context.Context) (any, error) {
return "hey", nil
},
)
assert.Nil(t, task.additionalInfos)
})
}