diff --git a/deployers/commons.go b/deployers/commons.go index b24c723..7b3d5cc 100644 --- a/deployers/commons.go +++ b/deployers/commons.go @@ -1,12 +1,10 @@ package deployers -import "context" - var ErrContextDone = "unable to execute, context done" type IDeployer interface { - Deploy(ctx context.Context) error - Build(ctx context.Context) error - Clear(ctx context.Context) error + Deploy() error + Build() error + Clear() error Done() <-chan struct{} } diff --git a/deployers/nginx.go b/deployers/nginx.go index 17dde4d..d1baee6 100644 --- a/deployers/nginx.go +++ b/deployers/nginx.go @@ -12,15 +12,17 @@ import ( ) type NginxDeployer struct { - conn connection.IConnection - project *models.Project - archivePath string - chDone chan struct{} + ctx context.Context + + conn connection.IConnection + project *models.Project + + chDone chan struct{} } var _ IDeployer = (*NginxDeployer)(nil) -func NewNginxDeployer(netInfo *models.HMNetInfo, project *models.Project) (NginxDeployer, error) { +func NewNginxDeployer(ctx context.Context, netInfo *models.HMNetInfo, project *models.Project) (NginxDeployer, error) { var nd NginxDeployer conn, err := connection.NewSSHConn(netInfo.IP.String(), netInfo.SSH.User, netInfo.SSH.Port, netInfo.SSH.PrivKey) @@ -30,16 +32,17 @@ func NewNginxDeployer(netInfo *models.HMNetInfo, project *models.Project) (Nginx nd.conn = &conn nd.project = project - nd.chDone = make(chan struct{}, 1) + nd.chDone = make(chan struct{}, 5) + nd.ctx = ctx return nd, nil } -func (nd *NginxDeployer) close(ctx context.Context) error { +func (nd *NginxDeployer) close() error { return nd.conn.Close() } -func (nd *NginxDeployer) clean(ctx context.Context) (err error) { +func (nd *NginxDeployer) clean() (err error) { _, err = nd.conn.Execute("rm -f " + nd.project.Name + ".conf") return } @@ -49,17 +52,30 @@ func (nd *NginxDeployer) setDone() { } func (nd *NginxDeployer) Done() <-chan struct{} { - return nd.chDone + chDone := make(chan struct{}) + go func() { + for { + select { + case <-nd.chDone: + chDone <- struct{}{} + return + case <-nd.ctx.Done(): + chDone <- struct{}{} + return + } + } + }() + return chDone } -func (nd *NginxDeployer) Clear(ctx context.Context) error { +func (nd *NginxDeployer) Clear() error { log.Debug().Msg("clearing nginx deployment...") - if err := nd.clean(ctx); err != nil { + if err := nd.clean(); err != nil { log.Err(err).Msg("unable to clean nginx conf remotly") } - if err := nd.close(ctx); err != nil { + if err := nd.close(); err != nil { log.Err(err).Msg("unable to close nginx conn") } @@ -67,11 +83,11 @@ func (nd *NginxDeployer) Clear(ctx context.Context) error { return nil } -func (nd *NginxDeployer) Build(ctx context.Context) error { +func (nd *NginxDeployer) Build() error { select { - case <-ctx.Done(): + case <-nd.ctx.Done(): nd.setDone() - return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone) + return fmt.Errorf("%w, build nginx archive skipped", ErrContextDone) default: } @@ -89,12 +105,12 @@ func (nd *NginxDeployer) Build(ctx context.Context) error { return nil } -func (nd *NginxDeployer) Deploy(ctx context.Context) (err error) { +func (nd *NginxDeployer) Deploy() (err error) { defer nd.setDone() select { - case <-ctx.Done(): - return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone) + case <-nd.ctx.Done(): + return fmt.Errorf("%w, nginx deployment skipped", ErrContextDone) default: } diff --git a/deployers/swarm.go b/deployers/swarm.go index 5de5526..13a34a8 100644 --- a/deployers/swarm.go +++ b/deployers/swarm.go @@ -15,6 +15,8 @@ import ( ) type SwarmDeployer struct { + ctx context.Context + conn connection.IConnection dcli docker.IClient @@ -26,7 +28,7 @@ type SwarmDeployer struct { var _ IDeployer = (*SwarmDeployer)(nil) -func NewSwarmDeployer(dockerClient docker.IClient, netInfo *models.HMNetInfo, project *models.Project) (SwarmDeployer, error) { +func NewSwarmDeployer(ctx context.Context, dockerClient docker.IClient, netInfo *models.HMNetInfo, project *models.Project) (SwarmDeployer, error) { var sd SwarmDeployer conn, err := connection.NewSSHConn(netInfo.IP.String(), netInfo.SSH.User, netInfo.SSH.Port, netInfo.SSH.PrivKey) @@ -34,19 +36,20 @@ func NewSwarmDeployer(dockerClient docker.IClient, netInfo *models.HMNetInfo, pr return sd, err } + sd.ctx = ctx sd.conn = &conn sd.dcli = dockerClient sd.project = project - sd.chDone = make(chan struct{}, 1) + sd.chDone = make(chan struct{}, 5) return sd, nil } -func (sd *SwarmDeployer) close(ctx context.Context) error { +func (sd *SwarmDeployer) close() error { return sd.conn.Close() } -func (sd *SwarmDeployer) clean(ctx context.Context) (err error) { +func (sd *SwarmDeployer) clean() (err error) { defer os.Remove(sd.archivePath) _, err = sd.conn.Execute(fmt.Sprintf("rm -f %s %s *.tar.gz *.tar", models.ComposeFile, models.EnvFile)) return @@ -57,17 +60,30 @@ func (sd *SwarmDeployer) setDone() { } func (sd *SwarmDeployer) Done() <-chan struct{} { - return sd.chDone + chDone := make(chan struct{}) + go func() { + for { + select { + case <-sd.chDone: + chDone <- struct{}{} + return + case <-sd.ctx.Done(): + chDone <- struct{}{} + return + } + } + }() + return chDone } -func (sd *SwarmDeployer) Clear(ctx context.Context) error { +func (sd *SwarmDeployer) Clear() error { log.Debug().Msg("clearing swarm deployment...") - if err := sd.clean(ctx); err != nil { + if err := sd.clean(); err != nil { log.Err(err).Msg("unable to clean swarm conf remotly") } - if err := sd.close(ctx); err != nil { + if err := sd.close(); err != nil { log.Err(err).Msg("unable to close swarm conn") } @@ -76,9 +92,9 @@ func (sd *SwarmDeployer) Clear(ctx context.Context) error { return nil } -func (sd *SwarmDeployer) Build(ctx context.Context) error { +func (sd *SwarmDeployer) Build() error { select { - case <-ctx.Done(): + case <-sd.ctx.Done(): sd.setDone() return fmt.Errorf("%w, swarm project build skipped", ErrContextDone) default: @@ -121,12 +137,12 @@ func (sd *SwarmDeployer) Build(ctx context.Context) error { return nil } -func (sd *SwarmDeployer) Deploy(ctx context.Context) error { +func (sd *SwarmDeployer) Deploy() error { defer sd.setDone() select { - case <-ctx.Done(): - return fmt.Errorf("%w, nginx close ssh conn skipped", ErrContextDone) + case <-sd.ctx.Done(): + return fmt.Errorf("%w, swarm deployment skipped", ErrContextDone) default: } diff --git a/main.go b/main.go index 3feab0f..e28fb7b 100644 --- a/main.go +++ b/main.go @@ -65,7 +65,7 @@ func main() { } dcli := docker.NewClient() - sd, err := deployers.NewSwarmDeployer(&dcli, swarmNet, &project) + sd, err := deployers.NewSwarmDeployer(ctx, &dcli, swarmNet, &project) if err != nil { log.Fatal().Err(err).Msg("unable to init swarm deployer") } @@ -78,7 +78,7 @@ func main() { return } - d, err := deployers.NewNginxDeployer(nginxNet, &project) + d, err := deployers.NewNginxDeployer(ctx, nginxNet, &project) if err != nil { log.Err(err).Msg("unable to nginx conf") return @@ -90,7 +90,7 @@ func main() { deployNginx := scheduler.NewTask("nginx-deploy", nd.Deploy) deploySwarm := scheduler.NewTask("swarm-deploy", sd.Deploy, deployNginx) - s := scheduler.NewScheduler(ctx, 30, 4) + s := scheduler.NewScheduler(context.Background(), 30, 4) s.Submit(scheduler.NewTask("swarm-build", sd.Build, deploySwarm)) s.Submit(scheduler.NewTask("nginx-build", nd.Build)) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1151fd4..22dfc8b 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -9,7 +9,10 @@ import ( "github.com/rs/zerolog/log" ) -var ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached") +var ( + ErrSchedulerMaxCapacityReached = errors.New("unable to add new task, max capacity reached") + ErrSchedulerContextDone = errors.New("context done, scheduler stopped") +) type TaskStatus string @@ -20,7 +23,7 @@ const ( Failed = "failed" ) -type FnJob func(context.Context) error +type FnJob func() error type taskStore struct { l sync.RWMutex @@ -114,7 +117,7 @@ func (s *Scheduler) run() { case t := <-s.chTasks: s.tasks.setStatus(t, Running) - if err := t.Job(s.ctx); err != nil { + if err := t.Job(); err != nil { log.Err(err).Str("task", t.Name).Msg("error executing task") s.tasks.setStatus(t, Failed) continue @@ -139,6 +142,13 @@ func (s *Scheduler) Stop() { } func (s *Scheduler) Submit(task *Task) error { + select { + case <-s.ctx.Done(): + log.Error().Msg("unable to submit new task, scheduler is stopping...") + return ErrSchedulerContextDone + default: + } + cap := s.capacity.Load() if s.tasks.len() >= int(cap) { return ErrSchedulerMaxCapacityReached