167 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			167 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package job
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
// State identifies the lifecycle stage a job is in.
type State int

// Job lifecycle states. Values are assigned by iota in declaration order,
// so Pending is the zero value of State.
const (
	// Pending is the state a job is created in.
	Pending State = iota
	// Running is set when execution of the job's task begins.
	Running
	// Success is set when the task returns without an error.
	Success
	// Failed is set when the task returns an error.
	Failed
	// Abort is set when the job has been asked to abort.
	Abort
	// Unknown is a placeholder for a state that cannot be determined.
	Unknown
)
 | |
| 
 | |
// UnknownState is the textual form used for the Unknown state and for any
// State value that has no name of its own.
const UnknownState = "unknown"

// JobExecTimeout bounds how long a single task execution may run. It is a
// package-level variable, so it can be reassigned by callers.
var JobExecTimeout = 10 * time.Second
 | |
| 
 | |
| func (s State) String() string {
 | |
| 	switch s {
 | |
| 	case Pending:
 | |
| 		return "pending"
 | |
| 	case Running:
 | |
| 		return "running"
 | |
| 	case Success:
 | |
| 		return "success"
 | |
| 	case Failed:
 | |
| 		return "failed"
 | |
| 	case Abort:
 | |
| 		return "abort"
 | |
| 	case Unknown:
 | |
| 		return UnknownState
 | |
| 	default:
 | |
| 		return UnknownState
 | |
| 	}
 | |
| }
 | |
| 
 | |
// ErrJobAborted is the sentinel error for a job that has been aborted.
var ErrJobAborted = errors.New("job has been aborted")

// ErrJobNotCompletedYet is the sentinel a task returns to signal that it is
// not finished and should be retried; Run puts the job back in Pending when
// it sees this error.
var ErrJobNotCompletedYet = errors.New("job is not right state, retrying")
 | |
| 
 | |
// FnJob is the unit of work a Job executes. It receives the execution
// context and returns nil on success or an error describing the failure.
type FnJob func(ctx context.Context) error
 | |
| 
 | |
| type JobDetails struct {
 | |
| 	ID        uuid.UUID  `json:"id"`
 | |
| 	State     string     `json:"state"`
 | |
| 	CreatedAt time.Time  `json:"createdAt"`
 | |
| 	UpdatedAt *time.Time `json:"updatedAt,omitempty"`
 | |
| 	Err       string     `json:"error"`
 | |
| }
 | |
| 
 | |
| // TODO(rmanach): add priority level
 | |
| type Job struct {
 | |
| 	l         sync.RWMutex
 | |
| 	id        uuid.UUID
 | |
| 	createdAt time.Time
 | |
| 	updatedAt *time.Time
 | |
| 	state     State
 | |
| 	task      FnJob
 | |
| 	err       error
 | |
| 	chAbort   chan struct{}
 | |
| }
 | |
| 
 | |
| func NewJob(task FnJob, row, col int) Job {
 | |
| 	return Job{
 | |
| 		id:        uuid.New(),
 | |
| 		createdAt: time.Now().UTC(),
 | |
| 		state:     Pending,
 | |
| 		task:      task,
 | |
| 		chAbort:   make(chan struct{}, 1),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (j *Job) IntoDetails() JobDetails {
 | |
| 	j.l.RLock()
 | |
| 	defer j.l.RUnlock()
 | |
| 
 | |
| 	jd := JobDetails{
 | |
| 		ID:        j.id,
 | |
| 		CreatedAt: j.createdAt,
 | |
| 		State:     j.state.String(),
 | |
| 	}
 | |
| 
 | |
| 	if err := j.err; err != nil {
 | |
| 		jd.Err = err.Error()
 | |
| 	}
 | |
| 
 | |
| 	if ut := j.updatedAt; ut != nil {
 | |
| 		jd.UpdatedAt = ut
 | |
| 	}
 | |
| 
 | |
| 	return jd
 | |
| }
 | |
| 
 | |
| func (j *Job) GetID() uuid.UUID {
 | |
| 	return j.id
 | |
| }
 | |
| 
 | |
| func (j *Job) GetState() State {
 | |
| 	j.l.RLock()
 | |
| 	defer j.l.RUnlock()
 | |
| 
 | |
| 	return j.state
 | |
| }
 | |
| 
 | |
| func (j *Job) setState(s State) {
 | |
| 	j.l.Lock()
 | |
| 	defer j.l.Unlock()
 | |
| 
 | |
| 	now := time.Now().UTC()
 | |
| 	j.updatedAt = &now
 | |
| 
 | |
| 	j.state = s
 | |
| }
 | |
| 
 | |
| func (j *Job) setFail(err error) {
 | |
| 	j.l.Lock()
 | |
| 	defer j.l.Unlock()
 | |
| 
 | |
| 	now := time.Now().UTC()
 | |
| 	j.updatedAt = &now
 | |
| 
 | |
| 	j.state = Failed
 | |
| 	j.err = err
 | |
| }
 | |
| 
 | |
| func (j *Job) Abort() {
 | |
| 	j.setState(Abort)
 | |
| 	j.chAbort <- struct{}{}
 | |
| }
 | |
| 
 | |
| func (j *Job) Run(ctx context.Context) {
 | |
| 	ctxExec, fnCancel := context.WithTimeout(ctx, JobExecTimeout)
 | |
| 	defer fnCancel()
 | |
| 
 | |
| 	j.setState(Running)
 | |
| 
 | |
| 	log.Info().Str("job", j.GetID().String()).Msg("job running...")
 | |
| 
 | |
| 	go func() {
 | |
| 		for range j.chAbort {
 | |
| 			j.setState(Abort)
 | |
| 			fnCancel()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	if err := j.task(ctxExec); err != nil {
 | |
| 		if errors.Is(err, ErrJobNotCompletedYet) {
 | |
| 			j.setState(Pending)
 | |
| 			return
 | |
| 		}
 | |
| 		j.setFail(err)
 | |
| 		return
 | |
| 	}
 | |
| 	j.setState(Success)
 | |
| }
 |