add nginx deployers + scheduler
This commit is contained in:
parent
1ddc49ae15
commit
a266685a29
149
.golangci.yml
149
.golangci.yml
@ -1,146 +1,33 @@
|
||||
linters-settings:
|
||||
depguard:
|
||||
rules:
|
||||
Main:
|
||||
files:
|
||||
- $all
|
||||
- '!$test'
|
||||
allow:
|
||||
- $gostd
|
||||
- gitea.thegux.fr
|
||||
- github.com
|
||||
Test:
|
||||
files:
|
||||
- $test
|
||||
allow:
|
||||
- $gostd
|
||||
- gitea.thegux.fr
|
||||
- github.com
|
||||
dupl:
|
||||
threshold: 100
|
||||
funlen:
|
||||
lines: 100
|
||||
statements: 50
|
||||
gci:
|
||||
sections:
|
||||
- "standard"
|
||||
- "default"
|
||||
- "blank"
|
||||
- "dot"
|
||||
# - "alias"
|
||||
- "prefix(gitea.thegux.fr)"
|
||||
goconst:
|
||||
min-len: 2
|
||||
min-occurrences: 2
|
||||
gocritic:
|
||||
enabled-tags:
|
||||
- diagnostic
|
||||
- experimental
|
||||
- opinionated
|
||||
- performance
|
||||
- style
|
||||
disabled-checks:
|
||||
- dupImport # https://github.com/go-critic/go-critic/issues/845
|
||||
- ifElseChain
|
||||
- octalLiteral
|
||||
# - whyNoLint
|
||||
- wrapperFunc
|
||||
gocyclo:
|
||||
min-complexity: 15
|
||||
goimports:
|
||||
local-prefixes: gitea.thegux.fr
|
||||
mnd:
|
||||
checks:
|
||||
- argument
|
||||
- case
|
||||
- condition
|
||||
- return
|
||||
govet:
|
||||
disable:
|
||||
- fieldalignment
|
||||
lll:
|
||||
line-length: 200
|
||||
misspell:
|
||||
locale: US
|
||||
nolintlint:
|
||||
allow-unused: false # report any unused nolint directives
|
||||
require-explanation: false # don't require an explanation for nolint directives
|
||||
require-specific: false # don't require nolint directives to be specific about which linter is being skipped
|
||||
errcheck:
|
||||
check-blank: true
|
||||
exclude-functions:
|
||||
- '(*github.com/gin-gonic/gin.Error).SetType'
|
||||
- '(*github.com/gin-gonic/gin.Context).Error'
|
||||
run:
|
||||
timeout: 5m
|
||||
modules-download-mode: readonly
|
||||
|
||||
linters:
|
||||
disable-all: true
|
||||
enable:
|
||||
- bodyclose
|
||||
- depguard
|
||||
- dogsled
|
||||
- dupl
|
||||
- errcheck
|
||||
- exhaustive
|
||||
- exportloopref
|
||||
- funlen
|
||||
- gochecknoinits
|
||||
- goconst
|
||||
- gocritic
|
||||
- gocyclo
|
||||
- gofmt
|
||||
- goimports
|
||||
# - mnd
|
||||
- goprintffuncname
|
||||
- gosec
|
||||
- gosimple
|
||||
- govet
|
||||
# - inamedparam
|
||||
- ineffassign
|
||||
- lll
|
||||
- misspell
|
||||
- nakedret
|
||||
- noctx
|
||||
- nolintlint
|
||||
# - perfsprint
|
||||
- rowserrcheck
|
||||
# - sloglint
|
||||
- goimports
|
||||
- errcheck
|
||||
- staticcheck
|
||||
- stylecheck
|
||||
- typecheck
|
||||
- unconvert
|
||||
- unparam
|
||||
- unused
|
||||
- whitespace
|
||||
# - gochecknoglobals # too many global in ds9
|
||||
- gosimple
|
||||
- ineffassign
|
||||
- typecheck
|
||||
|
||||
# don't enable:
|
||||
# - asciicheck
|
||||
# - scopelint
|
||||
# - gocognit
|
||||
# - godot
|
||||
# - godox
|
||||
# - goerr113
|
||||
# - interfacer
|
||||
# - maligned
|
||||
# - nestif
|
||||
# - prealloc
|
||||
# - testpackage
|
||||
# - revive
|
||||
# - wsl
|
||||
|
||||
# issues:
|
||||
# Excluding configuration per-path, per-linter, per-text and per-source
|
||||
# fix: true
|
||||
linters-settings:
|
||||
gofmt:
|
||||
simplify: true
|
||||
goimports:
|
||||
local-prefixes: gitea.thegux.fr
|
||||
|
||||
issues:
|
||||
exclude-rules:
|
||||
- path: '(.+)_test\.go'
|
||||
- path: _test\.go
|
||||
linters:
|
||||
- funlen
|
||||
- goconst
|
||||
- dupl
|
||||
- errcheck
|
||||
- staticcheck
|
||||
exclude-dirs:
|
||||
- ..
|
||||
|
||||
run:
|
||||
timeout: 5m
|
||||
service:
|
||||
golangci-lint-version: 1.54.x
|
||||
@ -1,6 +1,12 @@
|
||||
package deployers
|
||||
|
||||
import "context"
|
||||
|
||||
var ErrContextDone = "unable to execute, context done"
|
||||
|
||||
type IDeployer interface {
|
||||
Deploy() error
|
||||
Close() error
|
||||
Deploy(ctx context.Context) error
|
||||
Build(ctx context.Context) error
|
||||
Clear(ctx context.Context) error
|
||||
Done() <-chan struct{}
|
||||
}
|
||||
|
||||
114
deployers/nginx.go
Normal file
114
deployers/nginx.go
Normal file
@ -0,0 +1,114 @@
|
||||
package deployers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"gitea.thegux.fr/hmdeploy/connection"
|
||||
"gitea.thegux.fr/hmdeploy/models"
|
||||
)
|
||||
|
||||
type NginxDeployer struct {
|
||||
conn connection.IConnection
|
||||
project *models.Project
|
||||
archivePath string
|
||||
chDone chan struct{}
|
||||
}
|
||||
|
||||
var _ IDeployer = (*NginxDeployer)(nil)
|
||||
|
||||
func NewNginxDeployer(netInfo *models.HMNetInfo, project *models.Project) (NginxDeployer, error) {
|
||||
var nd NginxDeployer
|
||||
|
||||
conn, err := connection.NewSSHConn(netInfo.IP.String(), netInfo.SSH.User, netInfo.SSH.Port, netInfo.SSH.PrivKey)
|
||||
if err != nil {
|
||||
return nd, err
|
||||
}
|
||||
|
||||
nd.conn = &conn
|
||||
nd.project = project
|
||||
nd.chDone = make(chan struct{}, 1)
|
||||
|
||||
return nd, nil
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) close(ctx context.Context) error {
|
||||
return nd.conn.Close()
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) clean(ctx context.Context) (err error) {
|
||||
_, err = nd.conn.Execute("rm -f " + nd.project.Name + ".conf")
|
||||
return
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) setDone() {
|
||||
nd.chDone <- struct{}{}
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) Done() <-chan struct{} {
|
||||
return nd.chDone
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) Clear(ctx context.Context) error {
|
||||
log.Debug().Msg("clearing nginx deployment...")
|
||||
|
||||
if err := nd.clean(ctx); err != nil {
|
||||
log.Err(err).Msg("unable to clean nginx conf remotly")
|
||||
}
|
||||
|
||||
if err := nd.close(ctx); err != nil {
|
||||
log.Err(err).Msg("unable to close nginx conn")
|
||||
}
|
||||
|
||||
log.Debug().Msg("clear nginx deployment done")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) Build(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
nd.setDone()
|
||||
return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone)
|
||||
default:
|
||||
}
|
||||
|
||||
nginxPath := filepath.Join(nd.project.Dir, filepath.Base(nd.project.Deps.NginxFile))
|
||||
nginxConf := nd.project.Name + ".conf"
|
||||
|
||||
log.Info().Str("nginx", nginxConf).Msg("transfering nginx conf...")
|
||||
|
||||
if err := nd.conn.CopyFile(nginxPath, nginxConf); err != nil {
|
||||
nd.setDone()
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Str("nginx", nginxConf).Msg("nginx conf transfered with success")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nd *NginxDeployer) Deploy(ctx context.Context) (err error) {
|
||||
defer nd.setDone()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone)
|
||||
default:
|
||||
}
|
||||
|
||||
nginxConf := nd.project.Name + ".conf"
|
||||
|
||||
log.Info().Str("nginx", nginxConf).Msg("deploying nginx conf...")
|
||||
|
||||
_, err = nd.conn.Execute(
|
||||
fmt.Sprintf(
|
||||
"cp %s /etc/nginx/sites-available && ln -sf /etc/nginx/sites-available/%s /etc/nginx/sites-enabled/%s",
|
||||
nginxConf, nginxConf, nginxConf,
|
||||
),
|
||||
)
|
||||
|
||||
log.Info().Str("nginx", nginxConf).Msg("nginx conf successfully deployed")
|
||||
return err
|
||||
}
|
||||
@ -1,137 +1,104 @@
|
||||
package deployers
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"gitea.thegux.fr/hmdeploy/connection"
|
||||
"gitea.thegux.fr/hmdeploy/docker"
|
||||
"gitea.thegux.fr/hmdeploy/models"
|
||||
)
|
||||
|
||||
func addToArchive(tw *tar.Writer, filename string) error {
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
header, err := tar.FileInfoHeader(info, info.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
header.Name = filepath.Base(file.Name())
|
||||
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(tw, file)
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
ErrSwarmDeployerArchive = errors.New("unable to generate archive")
|
||||
"gitea.thegux.fr/hmdeploy/utils"
|
||||
)
|
||||
|
||||
type SwarmDeployer struct {
|
||||
ctx context.Context
|
||||
fnCancel context.CancelFunc
|
||||
|
||||
conn connection.IConnection
|
||||
dcli docker.IClient
|
||||
|
||||
project *models.Project
|
||||
project *models.Project
|
||||
archivePath string
|
||||
|
||||
chDone chan struct{}
|
||||
}
|
||||
|
||||
var _ IDeployer = (*SwarmDeployer)(nil)
|
||||
|
||||
func NewSwarmDeployer(ctx context.Context, dockerClient docker.IClient, netInfo *models.HMNetInfo, project *models.Project) (SwarmDeployer, error) {
|
||||
var sm SwarmDeployer
|
||||
func NewSwarmDeployer(dockerClient docker.IClient, netInfo *models.HMNetInfo, project *models.Project) (SwarmDeployer, error) {
|
||||
var sd SwarmDeployer
|
||||
|
||||
conn, err := connection.NewSSHConn(netInfo.IP.String(), netInfo.SSH.User, netInfo.SSH.Port, netInfo.SSH.PrivKey)
|
||||
if err != nil {
|
||||
return sm, err
|
||||
return sd, err
|
||||
}
|
||||
|
||||
ctxChild, fnCancel := context.WithCancel(ctx)
|
||||
sd.conn = &conn
|
||||
sd.dcli = dockerClient
|
||||
sd.project = project
|
||||
sd.chDone = make(chan struct{}, 1)
|
||||
|
||||
sm.ctx = ctxChild
|
||||
sm.fnCancel = fnCancel
|
||||
|
||||
sm.conn = &conn
|
||||
sm.dcli = dockerClient
|
||||
sm.project = project
|
||||
|
||||
return sm, nil
|
||||
return sd, nil
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) Close() error {
|
||||
func (sd *SwarmDeployer) close(ctx context.Context) error {
|
||||
return sd.conn.Close()
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) clean() (err error) {
|
||||
func (sd *SwarmDeployer) clean(ctx context.Context) (err error) {
|
||||
defer os.Remove(sd.archivePath)
|
||||
_, err = sd.conn.Execute(fmt.Sprintf("rm -f %s %s *.tar.gz *.tar", models.ComposeFile, models.EnvFile))
|
||||
return
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) createArchive(files ...string) (string, error) {
|
||||
now := time.Now().UTC()
|
||||
archivePath := filepath.Join(sd.project.Dir, fmt.Sprintf("%s-%s.tar.gz", sd.project.Name, strings.Replace(now.Format(time.RFC3339), ":", "-", -1)))
|
||||
|
||||
file, err := os.Create(archivePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("%w, unable to create archive=%s, err=%v", ErrSwarmDeployerArchive, archivePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
gw := gzip.NewWriter(file)
|
||||
defer gw.Close()
|
||||
|
||||
tw := tar.NewWriter(gw)
|
||||
defer tw.Close()
|
||||
|
||||
for _, f := range files {
|
||||
if err := addToArchive(tw, f); err != nil {
|
||||
return "", fmt.Errorf("%w, unable to add file=%s to archive=%s, err=%v", ErrSwarmDeployerArchive, f, archivePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return archivePath, nil
|
||||
|
||||
func (sd *SwarmDeployer) setDone() {
|
||||
sd.chDone <- struct{}{}
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) Deploy() error {
|
||||
defer sd.clean()
|
||||
func (sd *SwarmDeployer) Done() <-chan struct{} {
|
||||
return sd.chDone
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) Clear(ctx context.Context) error {
|
||||
log.Debug().Msg("clearing swarm deployment...")
|
||||
|
||||
if err := sd.clean(ctx); err != nil {
|
||||
log.Err(err).Msg("unable to clean swarm conf remotly")
|
||||
}
|
||||
|
||||
if err := sd.close(ctx); err != nil {
|
||||
log.Err(err).Msg("unable to close swarm conn")
|
||||
}
|
||||
|
||||
log.Debug().Msg("clear swarm deployment done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) Build(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
sd.setDone()
|
||||
return fmt.Errorf("%w, swarm project build skipped", ErrContextDone)
|
||||
default:
|
||||
}
|
||||
|
||||
log.Info().Msg("building swarm archive for deployment...")
|
||||
|
||||
filesToArchive := []string{}
|
||||
if imageName := sd.project.ImageName; imageName != "" {
|
||||
log.Info().Str("image", imageName).Msg("saving image for transfert...")
|
||||
|
||||
tarFile, err := sd.dcli.Save(imageName, sd.project.Dir)
|
||||
if err != nil {
|
||||
sd.setDone()
|
||||
return err
|
||||
}
|
||||
|
||||
defer os.Remove(tarFile)
|
||||
|
||||
log.Info().Str("image", imageName).Str("dir", sd.project.Dir).Msg("image saved successfully")
|
||||
filesToArchive = append(filesToArchive, tarFile)
|
||||
|
||||
log.Info().Str("image", imageName).Msg("image added to archive")
|
||||
}
|
||||
|
||||
if envFilePath := sd.project.Deps.EnvFile; envFilePath != "" {
|
||||
@ -142,15 +109,36 @@ func (sd *SwarmDeployer) Deploy() error {
|
||||
composeFileBase := filepath.Base(sd.project.Deps.ComposeFile)
|
||||
filesToArchive = append(filesToArchive, filepath.Join(sd.project.Dir, composeFileBase))
|
||||
|
||||
archivePath, err := sd.createArchive(filesToArchive...)
|
||||
archivePath, err := utils.CreateArchive(sd.project.Dir, fmt.Sprintf("%s-%s", sd.project.Name, "swarm"), filesToArchive...)
|
||||
if err != nil {
|
||||
sd.setDone()
|
||||
return err
|
||||
}
|
||||
defer os.Remove(archivePath)
|
||||
|
||||
archiveDestPath := filepath.Base(archivePath)
|
||||
log.Info().Str("archive", archivePath).Msg("archive built with success, tranfering to swarm for deployment...")
|
||||
if err := sd.conn.CopyFile(archivePath, archiveDestPath); err != nil {
|
||||
sd.archivePath = archivePath
|
||||
|
||||
log.Info().Str("archive", archivePath).Msg("swarm archive built")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sd *SwarmDeployer) Deploy(ctx context.Context) error {
|
||||
defer sd.setDone()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone)
|
||||
default:
|
||||
}
|
||||
|
||||
if sd.archivePath == "" {
|
||||
return fmt.Errorf("unable to deploy, no archive to deploy")
|
||||
}
|
||||
|
||||
log.Info().Str("archive", sd.archivePath).Msg("deploying archive to swarm...")
|
||||
|
||||
archiveDestPath := filepath.Base(sd.archivePath)
|
||||
log.Info().Str("archive", sd.archivePath).Msg("archive built with success, tranfering to swarm for deployment...")
|
||||
if err := sd.conn.CopyFile(sd.archivePath, archiveDestPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -158,10 +146,12 @@ func (sd *SwarmDeployer) Deploy() error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Str("project", sd.project.Name).Msg("deploying project...")
|
||||
log.Info().Str("project", sd.project.Name).Msg("deploying swarm project...")
|
||||
composeFileBase := filepath.Base(sd.project.Deps.ComposeFile)
|
||||
if _, err := sd.conn.Execute(fmt.Sprintf("docker stack deploy -c %s %s", composeFileBase, sd.project.Name)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msg("swarm deployment done with success")
|
||||
return nil
|
||||
}
|
||||
|
||||
45
main.go
45
main.go
@ -14,6 +14,7 @@ import (
|
||||
"gitea.thegux.fr/hmdeploy/deployers"
|
||||
"gitea.thegux.fr/hmdeploy/docker"
|
||||
"gitea.thegux.fr/hmdeploy/models"
|
||||
"gitea.thegux.fr/hmdeploy/scheduler"
|
||||
)
|
||||
|
||||
const HMDEPLOY_DIRNAME = ".homeserver"
|
||||
@ -52,27 +53,55 @@ func main() {
|
||||
}
|
||||
log.Info().Str("conf", hmmap_path).Msg("hmmap load successfully")
|
||||
|
||||
swarmNet := hmmap.GetSwarmNetInfo()
|
||||
if swarmNet == nil {
|
||||
log.Fatal().Err(err).Msg("unable to get swarm net info, does not exist")
|
||||
}
|
||||
|
||||
project, err := models.ProjectFromDir(*projectDir)
|
||||
if err != nil {
|
||||
log.Fatal().Str("dir", *projectDir).Err(err).Msg("unable to init project from directory")
|
||||
}
|
||||
log.Info().Str("dir", project.Dir).Str("name", project.Name).Msg("project initialized with success")
|
||||
|
||||
swarmNet := hmmap.GetSwarmNetInfo()
|
||||
if swarmNet == nil {
|
||||
log.Fatal().Err(err).Msg("unable to get swarm net info, does not exist")
|
||||
}
|
||||
dcli := docker.NewClient()
|
||||
|
||||
sd, err := deployers.NewSwarmDeployer(ctx, &dcli, swarmNet, &project)
|
||||
sd, err := deployers.NewSwarmDeployer(&dcli, swarmNet, &project)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("unable to init swarm deployer")
|
||||
}
|
||||
|
||||
if err := sd.Deploy(); err != nil {
|
||||
log.Fatal().Err(err).Msg("unable to deploy project")
|
||||
var nd deployers.IDeployer
|
||||
if project.Deps.NginxFile != "" {
|
||||
nginxNet := hmmap.GetNginxNetInfo()
|
||||
if nginxNet == nil {
|
||||
log.Err(err).Msg("unable to get nginx net info, does not exist")
|
||||
return
|
||||
}
|
||||
|
||||
d, err := deployers.NewNginxDeployer(nginxNet, &project)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("unable to nginx conf")
|
||||
return
|
||||
}
|
||||
|
||||
nd = &d
|
||||
}
|
||||
|
||||
deployNginx := scheduler.NewTask("nginx-deploy", nd.Deploy)
|
||||
deploySwarm := scheduler.NewTask("swarm-deploy", sd.Deploy, deployNginx)
|
||||
|
||||
s := scheduler.NewScheduler(ctx, 30, 4)
|
||||
s.Submit(scheduler.NewTask("swarm-build", sd.Build, deploySwarm))
|
||||
s.Submit(scheduler.NewTask("nginx-build", nd.Build))
|
||||
|
||||
<-nd.Done()
|
||||
<-sd.Done()
|
||||
|
||||
s.Submit(scheduler.NewTask("nginx-clear", nd.Clear))
|
||||
s.Submit(scheduler.NewTask("swarm-clear", sd.Clear))
|
||||
|
||||
s.Stop()
|
||||
<-s.Done()
|
||||
|
||||
log.Info().Str("name", project.Name).Msg("project deployed successfully")
|
||||
}
|
||||
|
||||
@ -31,3 +31,12 @@ func (hm *HMMap) GetSwarmNetInfo() *HMNetInfo {
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
func (hm *HMMap) GetNginxNetInfo() *HMNetInfo {
|
||||
data, ok := hm.LXC["nginx"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
166
scheduler/scheduler.go
Normal file
166
scheduler/scheduler.go
Normal file
@ -0,0 +1,166 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached")
|
||||
|
||||
type TaskStatus string
|
||||
|
||||
const (
|
||||
Pending TaskStatus = "pending"
|
||||
Running TaskStatus = "running"
|
||||
Success = "success"
|
||||
Failed = "failed"
|
||||
)
|
||||
|
||||
type FnJob func(context.Context) error
|
||||
|
||||
type taskStore struct {
|
||||
l sync.RWMutex
|
||||
tasks map[string]*Task
|
||||
}
|
||||
|
||||
func newTaskStore() taskStore {
|
||||
return taskStore{
|
||||
tasks: map[string]*Task{},
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *taskStore) push(task *Task) {
|
||||
ts.l.Lock()
|
||||
defer ts.l.Unlock()
|
||||
|
||||
ts.tasks[task.Name] = task
|
||||
}
|
||||
|
||||
func (ts *taskStore) setStatus(task *Task, status TaskStatus) {
|
||||
ts.l.Lock()
|
||||
defer ts.l.Unlock()
|
||||
|
||||
if _, ok := ts.tasks[task.Name]; !ok {
|
||||
log.Warn().Str("name", task.Name).Msg("unable to update task status, does not exist")
|
||||
return
|
||||
}
|
||||
|
||||
ts.tasks[task.Name].Status = status
|
||||
}
|
||||
|
||||
func (ts *taskStore) len() int {
|
||||
ts.l.RLock()
|
||||
defer ts.l.RUnlock()
|
||||
|
||||
return len(ts.tasks)
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Name string
|
||||
Job FnJob
|
||||
Status TaskStatus
|
||||
Next []*Task
|
||||
}
|
||||
|
||||
func NewTask(name string, job FnJob, next ...*Task) *Task {
|
||||
return &Task{
|
||||
Name: name,
|
||||
Job: job,
|
||||
Next: next,
|
||||
Status: Pending,
|
||||
}
|
||||
}
|
||||
|
||||
type Scheduler struct {
|
||||
ctx context.Context
|
||||
fnCancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
capacity atomic.Uint32
|
||||
workers int
|
||||
|
||||
chTasks chan *Task
|
||||
tasks taskStore
|
||||
}
|
||||
|
||||
func NewScheduler(ctx context.Context, capacity, workers int) *Scheduler {
|
||||
ctxChild, fnCancel := context.WithCancel(ctx)
|
||||
s := Scheduler{
|
||||
ctx: ctxChild,
|
||||
fnCancel: fnCancel,
|
||||
capacity: atomic.Uint32{},
|
||||
workers: workers,
|
||||
chTasks: make(chan *Task, capacity),
|
||||
tasks: newTaskStore(),
|
||||
wg: sync.WaitGroup{},
|
||||
}
|
||||
s.capacity.Add(uint32(capacity))
|
||||
s.run()
|
||||
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s *Scheduler) run() {
|
||||
for i := 0; i < s.workers; i++ {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case t := <-s.chTasks:
|
||||
s.tasks.setStatus(t, Running)
|
||||
|
||||
if err := t.Job(s.ctx); err != nil {
|
||||
log.Err(err).Str("task", t.Name).Msg("error executing task")
|
||||
s.tasks.setStatus(t, Failed)
|
||||
continue
|
||||
}
|
||||
|
||||
s.tasks.setStatus(t, Success)
|
||||
|
||||
for _, nt := range t.Next {
|
||||
s.Submit(nt)
|
||||
}
|
||||
case <-s.ctx.Done():
|
||||
log.Warn().Msg("context done, stopping worker...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) Stop() {
|
||||
s.fnCancel()
|
||||
}
|
||||
|
||||
func (s *Scheduler) Submit(task *Task) error {
|
||||
cap := s.capacity.Load()
|
||||
if s.tasks.len() >= int(cap) {
|
||||
return ErrSchedulerMaxCapacityReached
|
||||
}
|
||||
|
||||
s.tasks.push(task)
|
||||
s.chTasks <- task
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Scheduler) Done() <-chan struct{} {
|
||||
chDone := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
log.Info().Msg("waiting for scheduler task completion...")
|
||||
s.wg.Wait()
|
||||
chDone <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return chDone
|
||||
}
|
||||
63
utils/utils.go
Normal file
63
utils/utils.go
Normal file
@ -0,0 +1,63 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func addToArchive(tw *tar.Writer, filename string) error {
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
header, err := tar.FileInfoHeader(info, info.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
header.Name = filepath.Base(file.Name())
|
||||
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(tw, file)
|
||||
return err
|
||||
}
|
||||
|
||||
func CreateArchive(destDir, name string, files ...string) (string, error) {
|
||||
now := time.Now().UTC()
|
||||
archivePath := filepath.Join(destDir, fmt.Sprintf("%s-%s.tar.gz", name, strings.Replace(now.Format(time.RFC3339), ":", "-", -1)))
|
||||
|
||||
file, err := os.Create(archivePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("unable to create archive=%s, err=%v", archivePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
gw := gzip.NewWriter(file)
|
||||
defer gw.Close()
|
||||
|
||||
tw := tar.NewWriter(gw)
|
||||
defer tw.Close()
|
||||
|
||||
for _, f := range files {
|
||||
if err := addToArchive(tw, f); err != nil {
|
||||
return "", fmt.Errorf("unable to add file=%s to archive=%s, err=%v", f, archivePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return archivePath, nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user