diff --git a/deployers/commons.go b/deployers/commons.go index ffec0f0..0c52089 100644 --- a/deployers/commons.go +++ b/deployers/commons.go @@ -1,11 +1,108 @@ package deployers -import "errors" +import ( + "context" + "errors" + "sync/atomic" + "time" + + "gitea.thegux.fr/hmdeploy/models" + "github.com/rs/zerolog/log" +) var ErrContextDone = errors.New("unable to execute, context done") type IDeployer interface { + Type() DeployerType Deploy() error Build() error Clear() error + Error() error + Done() <-chan struct{} +} + +type DeployerType string + +const ( + Nginx DeployerType = "nginx" + Swarm DeployerType = "swarm" + + GracefulTimeout = 10 * time.Second +) + +type deployer struct { + ctx context.Context + + type_ DeployerType + project *models.Project + + processing atomic.Bool + chDone chan struct{} + errFlag error +} + +func newDeployer(ctx context.Context, type_ DeployerType, project *models.Project) *deployer { + d := &deployer{ + ctx: ctx, + type_: type_, + project: project, + processing: atomic.Bool{}, + chDone: make(chan struct{}, 1), + } + + d.processing.Store(false) + return d +} + +func (d *deployer) setDone(err error) { + d.chDone <- struct{}{} + d.errFlag = err +} + +func (d *deployer) Type() DeployerType { + return d.type_ +} + +func (d *deployer) Error() error { + return d.errFlag +} + +func (d *deployer) Done() <-chan struct{} { + chDone := make(chan struct{}) + go func() { + defer func() { + close(chDone) + }() + + for { + select { + case <-d.ctx.Done(): + log.Warn().Str("deployer", string(d.type_)).Msg("context done catch") + + timeout := time.NewTicker(GracefulTimeout) + tick := time.NewTicker(time.Second) + for { + select { + case <-timeout.C: + log.Error(). + Str("deployer", string(d.type_)). + Msg("timeout while waiting for graceful shutdown") + chDone <- struct{}{} + return + case <-tick.C: + if !d.processing.Load() { + chDone <- struct{}{} + return + } + tick.Reset(1 * time.Second) + } + } + case <-d.chDone: + log.Info().Str("deployer", string(d.type_)).Msg("terminated") + chDone <- struct{}{} + return + } + } + }() + return chDone } diff --git a/deployers/nginx.go b/deployers/nginx.go index 027d3de..7cd2911 100644 --- a/deployers/nginx.go +++ b/deployers/nginx.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "path/filepath" - "sync/atomic" - "time" "gitea.thegux.fr/hmdeploy/connection" "gitea.thegux.fr/hmdeploy/models" @@ -13,23 +11,17 @@ import ( ) type NginxDeployer struct { - ctx context.Context - - conn connection.IConnection - project *models.Project - - processing atomic.Bool - chDone chan struct{} - errFlag error + *deployer + conn connection.IConnection } var _ IDeployer = (*NginxDeployer)(nil) func NewNginxDeployer( ctx context.Context, - netInfo *models.HMNetInfo, project *models.Project, -) (*NginxDeployer, error) { + netInfo *models.HMNetInfo, +) (NginxDeployer, error) { var nd NginxDeployer conn, err := connection.NewSSHConn( @@ -39,17 +31,13 @@ func NewNginxDeployer( netInfo.SSH.PrivKey, ) if err != nil { - return &nd, err + return nd, err } nd.conn = &conn - nd.project = project - nd.ctx = ctx - nd.processing = atomic.Bool{} - nd.processing.Store(false) - nd.chDone = make(chan struct{}, 1) + nd.deployer = newDeployer(ctx, Nginx, project) - return &nd, nil + return nd, nil } func (nd *NginxDeployer) close() error { @@ -61,54 +49,6 @@ func (nd *NginxDeployer) clean() (err error) { return } -func (nd *NginxDeployer) setDone(err error) { - nd.chDone <- struct{}{} - nd.errFlag = err -} - -func (nd *NginxDeployer) Error() error { - return nd.errFlag -} - -func (nd *NginxDeployer) Done() <-chan struct{} { - chDone := make(chan struct{}) - go func() { - defer func() { - close(chDone) - }() - - for { - select { - case <-nd.ctx.Done(): - log.Warn().Str("deployer", "swarm").Msg("context done catch") - - timeout := time.NewTicker(10 * time.Second) //nolint:mnd //TODO: to refactor - tick := time.NewTicker(time.Second) - for { - select { - case <-timeout.C: - log.Error(). - Msg("timeout while waiting for graceful swarm deployer shutdown") - chDone <- struct{}{} - return - case <-tick.C: - if !nd.processing.Load() { - chDone <- struct{}{} - return - } - tick.Reset(1 * time.Second) - } - } - case <-nd.chDone: - log.Info().Str("deployer", "nginx").Msg("terminated") - chDone <- struct{}{} - return - } - } - }() - return chDone -} - func (nd *NginxDeployer) Clear() error { log.Debug().Msg("clearing nginx deployment...") diff --git a/deployers/swarm.go b/deployers/swarm.go index 12521f1..98ffd2a 100644 --- a/deployers/swarm.go +++ b/deployers/swarm.go @@ -6,8 +6,6 @@ import ( "fmt" "os" "path/filepath" - "sync/atomic" - "time" "gitea.thegux.fr/hmdeploy/connection" "gitea.thegux.fr/hmdeploy/docker" @@ -19,27 +17,20 @@ import ( var ErrSwarmDeployerNoArchive = errors.New("no archive found to be deployed") type SwarmDeployer struct { - ctx context.Context - - conn connection.IConnection - dcli docker.IClient - - project *models.Project + *deployer + conn connection.IConnection + dcli docker.IClient archivePath string - - processing atomic.Bool - chDone chan struct{} - errFlag error } var _ IDeployer = (*SwarmDeployer)(nil) func NewSwarmDeployer( ctx context.Context, - dockerClient docker.IClient, - netInfo *models.HMNetInfo, project *models.Project, -) (*SwarmDeployer, error) { + netInfo *models.HMNetInfo, + dockerClient docker.IClient, +) (SwarmDeployer, error) { var sd SwarmDeployer conn, err := connection.NewSSHConn( @@ -49,18 +40,14 @@ func NewSwarmDeployer( netInfo.SSH.PrivKey, ) if err != nil { - return &sd, err + return sd, err } - sd.ctx = ctx sd.conn = &conn sd.dcli = dockerClient - sd.project = project - sd.processing = atomic.Bool{} - sd.processing.Store(false) - sd.chDone = make(chan struct{}, 1) + sd.deployer = newDeployer(ctx, Swarm, project) - return &sd, nil + return sd, nil } func (sd *SwarmDeployer) close() error { @@ -75,53 +62,6 @@ func (sd *SwarmDeployer) clean() (err error) { return } -func (sd *SwarmDeployer) setDone(err error) { - sd.chDone <- struct{}{} - sd.errFlag = err -} - -func (sd *SwarmDeployer) Error() error { - return sd.errFlag -} - -func (sd *SwarmDeployer) Done() <-chan struct{} { - chDone := make(chan struct{}) - go func() { - defer func() { - close(chDone) - }() - for { - select { - case <-sd.ctx.Done(): - log.Warn().Str("deployer", "swarm").Msg("context done catch") - - timeout := time.NewTicker(10 * time.Second) //nolint:mnd //TODO: to refactor - tick := time.NewTicker(time.Second) - for { - select { - case <-timeout.C: - log.Error(). - Msg("timeout while waiting for graceful swarm deployer shutdown") - chDone <- struct{}{} - return - case <-tick.C: - if !sd.processing.Load() { - chDone <- struct{}{} - return - } - tick.Reset(1 * time.Second) - } - } - case <-sd.chDone: - log.Info().Str("deployer", "swarm").Msg("terminated") - chDone <- struct{}{} - return - } - } - }() - return chDone -} - func (sd *SwarmDeployer) Clear() error { log.Debug().Msg("clearing swarm deployment...") diff --git a/main.go b/main.go index 3ff58b2..04e367e 100644 --- a/main.go +++ b/main.go @@ -3,10 +3,13 @@ package main import ( "context" "encoding/json" + "errors" "flag" + "fmt" "os" "os/signal" "path" + "sync" "gitea.thegux.fr/hmdeploy/deployers" "gitea.thegux.fr/hmdeploy/docker" @@ -22,15 +25,138 @@ const ( SchedulerNbWorkers uint8 = 4 SchedulerQueueCapacity uint32 = 30 + + MaxDeployers int = 2 ) var HOME_PATH = os.Getenv("HOME") +var ( + ErrNetInfoNotFound = errors.New("unable to get net info") + ErrDeployerInit = errors.New("unable to initialize deployer") + ErrGenerateTasksTree = errors.New("unable to generate tasks tree") +) + func initLogger() { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{Out: os.Stderr}) } +func loadHMMap() (models.HMMap, error) { + var hmmap models.HMMap + + hmmap_path := path.Join(HOME_PATH, HMDeployDirname, NetworkFilename) + c, err := os.ReadFile(hmmap_path) + if err != nil { + return hmmap, fmt.Errorf( + "unable to load configuration from src=%s, err=%v", + hmmap_path, + err, + ) + } + + if err := json.Unmarshal(c, &hmmap); err != nil { + return hmmap, fmt.Errorf( + "unable to parse configuration from src=%s, err=%v", + hmmap_path, + err, + ) + } + + log.Info().Str("conf", hmmap_path).Msg("hmmap load successfully") + return hmmap, nil +} + +func initDeployers( + ctx context.Context, + hmmap *models.HMMap, + project *models.Project, +) ([]deployers.IDeployer, error) { + swarmNet := hmmap.GetSwarmNetInfo() + if swarmNet == nil { + return nil, fmt.Errorf("%w, swarm net info does not exist", ErrNetInfoNotFound) + } + + dcli := docker.NewClient() + sd, err := deployers.NewSwarmDeployer(ctx, project, swarmNet, &dcli) + if err != nil { + return nil, fmt.Errorf("%w, unable to init swarm deployer, err=%v", ErrDeployerInit, err) + } + + var nd deployers.IDeployer + if project.Deps.NginxFile != "" { + nginxNet := hmmap.GetNginxNetInfo() + if nginxNet == nil { + return nil, fmt.Errorf("%w, nginx net info does not exist", ErrNetInfoNotFound) + } + + d, err := deployers.NewNginxDeployer(ctx, project, nginxNet) + if err != nil { + return nil, fmt.Errorf( + "%w, unable to init nginx deployer, err=%v", + ErrDeployerInit, + err, + ) + } + + nd = &d + } + + return []deployers.IDeployer{&sd, nd}, nil +} + +func generateTasksTree(deployers []deployers.IDeployer) ([]*scheduler.Task, error) { + if len(deployers) != MaxDeployers { + return nil, fmt.Errorf("%w, deployers len should be equals to 2", ErrGenerateTasksTree) + } + + sd := deployers[0] + nd := deployers[1] + + tasks := []*scheduler.Task{} + + var swarmTask *scheduler.Task + if nd != nil { + deployNginx := scheduler.NewTask("nginx-deploy", nd.Deploy) + swarmTask = scheduler.NewTask("swarm-deploy", sd.Deploy, deployNginx) + } else { + swarmTask = scheduler.NewTask("swarm-deploy", sd.Deploy) + } + + swarmTask = scheduler.NewTask("swarm-build", sd.Build, swarmTask) + tasks = append(tasks, swarmTask, scheduler.NewTask("nginx-build", nd.Build)) + return tasks, nil +} + +func waitForCompletion(deployers []deployers.IDeployer, s *scheduler.Scheduler) error { + var wg sync.WaitGroup + + for idx := range deployers { + if d := deployers[idx]; d != nil { + wg.Add(1) + go func() { + defer wg.Done() + <-d.Done() + }() + } + } + + wg.Wait() + + var errs []error + for idx := range deployers { + if d := deployers[idx]; d != nil { + errs = append(errs, d.Error()) + s.Submit(scheduler.NewTask(string(d.Type()), d.Clear)) //nolint: errcheck // TODO + } + } + + s.Stop() + <-s.Done() + + return errors.Join(errs...) +} + func main() { //nolint: funlen // TODO: to rework ctx, _ := signal.NotifyContext( context.Background(), @@ -44,18 +170,11 @@ func main() { //nolint: funlen // TODO: to rework projectDir := flag.String("path", ".", "define the .homeserver project root dir") flag.Parse() - hmmap_path := path.Join(HOME_PATH, HMDeployDirname, NetworkFilename) - c, err := os.ReadFile(hmmap_path) + hmmap, err := loadHMMap() if err != nil { - log.Fatal().Err(err).Str("conf", hmmap_path).Msg("unable to load configuration") + log.Fatal().Err(err).Msg("failed to load conf") } - var hmmap models.HMMap - if err := json.Unmarshal(c, &hmmap); err != nil { - log.Fatal().Err(err).Str("conf", hmmap_path).Msg("unable to parse configuration") - } - log.Info().Str("conf", hmmap_path).Msg("hmmap load successfully") - project, err := models.ProjectFromDir(*projectDir) if err != nil { log.Fatal().Str("dir", *projectDir).Err(err).Msg("unable to init project from directory") @@ -65,64 +184,28 @@ func main() { //nolint: funlen // TODO: to rework Str("name", project.Name). Msg("project initialized with success") - swarmNet := hmmap.GetSwarmNetInfo() - if swarmNet == nil { - log.Fatal().Err(err).Msg("unable to get swarm net info, does not exist") - } - dcli := docker.NewClient() - - sd, err := deployers.NewSwarmDeployer(ctx, &dcli, swarmNet, &project) + deployers, err := initDeployers(ctx, &hmmap, &project) if err != nil { - log.Fatal().Err(err).Msg("unable to init swarm deployer") + log.Fatal().Err(err).Msg("unable to init deployers") } - var nd *deployers.NginxDeployer - if project.Deps.NginxFile != "" { - nginxNet := hmmap.GetNginxNetInfo() - if nginxNet == nil { - log.Err(err).Msg("unable to get nginx net info, does not exist") - return - } - - d, err := deployers.NewNginxDeployer(ctx, nginxNet, &project) - if err != nil { - log.Err(err).Msg("unable to nginx conf") - return - } - - nd = d - } - - var deploySwarm *scheduler.Task - if nd != nil { - deployNginx := scheduler.NewTask("nginx-deploy", nd.Deploy) - deploySwarm = scheduler.NewTask("swarm-deploy", sd.Deploy, deployNginx) - } else { - deploySwarm = scheduler.NewTask("swarm-deploy", sd.Deploy) + tasks, err := generateTasksTree(deployers) + if err != nil { + log.Fatal().Err(err).Msg("unable to generate tasks tree") } s := scheduler.NewScheduler( context.Background(), SchedulerQueueCapacity, SchedulerNbWorkers, + tasks..., ) - s.Submit(scheduler.NewTask("swarm-build", sd.Build, deploySwarm)) //nolint: errcheck // TODO - if nd != nil { - s.Submit(scheduler.NewTask("nginx-build", nd.Build)) //nolint: errcheck // TODO - } - <-sd.Done() - <-nd.Done() - - s.Submit(scheduler.NewTask("nginx-clear", nd.Clear)) //nolint: errcheck // TODO - s.Submit(scheduler.NewTask("swarm-clear", sd.Clear)) //nolint: errcheck // TODO - - s.Stop() - <-s.Done() - - if sd.Error() != nil || nd.Error() != nil { - log.Error().Str("name", project.Name).Msg("unable to deploy project, see logs for details") - return + if err := waitForCompletion(deployers, s); err != nil { + log.Fatal(). + Err(err). + Str("name", project.Name). + Msg("unable to deploy project, see logs for details") } log.Info().Str("name", project.Name).Msg("project deployed successfully") diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7cc551f..659e72a 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -90,7 +90,7 @@ type Scheduler struct { tasks taskStore } -func NewScheduler(ctx context.Context, capacity uint32, workers uint8) *Scheduler { +func NewScheduler(ctx context.Context, capacity uint32, workers uint8, tasks ...*Task) *Scheduler { ctxChild, fnCancel := context.WithCancel(ctx) s := Scheduler{ ctx: ctxChild, @@ -104,6 +104,12 @@ func NewScheduler(ctx context.Context, capacity uint32, workers uint8) *Schedule s.capacity.Add(capacity) s.run() + if tasks != nil { + for idx := range tasks { + s.Submit(tasks[idx]) //nolint: errcheck // TODO + } + } + return &s }