localenv/watchers/watcher.go
2023-08-05 22:10:48 +02:00

187 lines
4.8 KiB
Go

package watchers
import (
"context"
"errors"
"fmt"
"strings"
"localenv/collector"
"localenv/services"
"localenv/utils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/client"
"github.com/rs/zerolog/log"
)
// Sentinel errors of the watchers package. Compare with errors.Is.
var (
// ErrUninitializeEvents is returned when an events channel (or its error
// channel) is nil at the time the watcher tries to consume it.
ErrUninitializeEvents = errors.New("uninitialize events channel")
// ErrClientConnection reports that the docker daemon is unreachable.
// NOTE(review): not referenced in the code visible here; presumably used by callers, confirm.
ErrClientConnection = errors.New("unable to connect to the docker daemon")
// ErrParseServiceName reports that a service name could not be derived.
// NOTE(review): not referenced in the code visible here; confirm usage.
ErrParseServiceName = errors.New("unable to parse service name")
// ErrServiceEmptyImage is returned when a swarm service has no container spec,
// hence no image to remove and redeploy.
ErrServiceEmptyImage = errors.New("no image found in service")
)
// parseImageName derives the swarm service name from an image reference.
//
// Only the last path segment of the reference is kept (registry and
// namespace are dropped). A leading "localenv-", a trailing ":latest" and
// then a trailing "-local" are stripped, in that order, and the result is
// prefixed with "localenv-". Tags other than ":latest" are left untouched.
// An empty reference yields an empty name.
func parseImageName(imageName string) string {
	if len(imageName) == 0 {
		return ""
	}

	// LastIndex returns -1 when there is no "/", so the slice starts at 0
	// and the whole reference is kept.
	base := imageName[strings.LastIndex(imageName, "/")+1:]

	base = strings.TrimPrefix(base, "localenv-")
	for _, suffix := range []string{":latest", "-local"} {
		base = strings.TrimSuffix(base, suffix)
	}

	return fmt.Sprintf("localenv-%s", base)
}
// Watcher listens to docker events and redeploys swarm services when an
// image is tagged on the host daemon.
type Watcher struct {
// ctx is the watcher's own cancellable context; its cancellation ends Watch
// and the host events goroutine.
ctx context.Context
// fnCancel cancels ctx, optionally recording a cause.
fnCancel context.CancelCauseFunc
// hostCLI is the docker client used to inspect images on the host daemon.
hostCLI *client.Client
// swarmCLI is the docker client used to manage services and images on the swarm.
swarmCLI *client.Client
// collector deploys images once the old service and image are removed.
collector collector.Collector
// chHostEvents and chHostEventsErr are the host daemon events stream and
// its error stream, consumed by watchHostEvents.
chHostEvents <-chan events.Message
chHostEventsErr <-chan error
// chSwarmEvents and chSwarmEventsErr are the swarm daemon streams.
// NOTE(review): not consumed in the code visible here; confirm.
chSwarmEvents <-chan events.Message
chSwarmEventsErr <-chan error
// chDone is allocated by NewWatcher.
// NOTE(review): never read or written in the code visible here, presumably
// reserved for the graceful stop; confirm.
chDone chan struct{}
}
// NewWatcher builds a Watcher subscribed to the events of the host and swarm
// docker daemons.
//
// ctx is the parent context: the watcher derives its own cancellable context
// from it. hostCLI and swarmCLI must be non-nil, initialized docker clients.
// The returned error is currently always nil.
func NewWatcher(ctx context.Context, hostCLI, swarmCLI *client.Client) (Watcher, error) {
	childCtx, cancel := context.WithCancelCause(ctx)

	// Subscribe with the child context, not the parent one: the docker client
	// only stops streaming when the context it was given is cancelled, so
	// using the parent would keep both subscriptions alive after Stop.
	chHostEvents, chHostEventsErr := hostCLI.Events(childCtx, types.EventsOptions{})
	chSwarmEvents, chSwarmEventsErr := swarmCLI.Events(childCtx, types.EventsOptions{})

	return Watcher{
		ctx:              childCtx,
		fnCancel:         cancel,
		hostCLI:          hostCLI,
		swarmCLI:         swarmCLI,
		collector:        collector.NewCollector(hostCLI, swarmCLI),
		chHostEvents:     chHostEvents,
		chHostEventsErr:  chHostEventsErr,
		chSwarmEvents:    chSwarmEvents,
		chSwarmEventsErr: chSwarmEventsErr,
		chDone:           make(chan struct{}),
	}, nil
}
// watchHostEvents starts a goroutine consuming the host daemon events and
// returns immediately.
//
// Only image "tag" events are handled; each one triggers handleImageEvent
// synchronously, so the next event is read once the redeploy has finished.
// The goroutine exits when the watcher context is done or when the events
// stream stops (an error is received or a channel is closed). In the latter
// case the watcher context is cancelled with the stream error as cause, so
// Watch returns instead of waiting on a dead stream.
//
// It returns ErrUninitializeEvents (wrapped) if one of the host channels is nil.
//
// TODO(rmanach): in order to watch image built, force the image tag:
// @docker image tag my-image:latest my-image:latest
func (w *Watcher) watchHostEvents() error {
	if w.chHostEvents == nil {
		return fmt.Errorf("%w: host events", ErrUninitializeEvents)
	}
	if w.chHostEventsErr == nil {
		return fmt.Errorf("%w: host events err", ErrUninitializeEvents)
	}

	go func() {
		for {
			select {
			case evt, ok := <-w.chHostEvents:
				if !ok {
					// A closed channel yields zero values forever: stop here
					// rather than spin on empty events.
					err := errors.New("host events channel closed")
					log.Err(err).Msg("error occurred in host stream events")
					w.fnCancel(err)
					return
				}

				log.Info().Str("event", evt.Actor.ID).Str("action", evt.Action).Str("type", evt.Type).Msg("host event received")

				// handle only this action, inf loop with the collector
				if evt.Type != events.ImageEventType || evt.Action != "tag" {
					continue
				}
				if err := w.handleImageEvent(w.ctx, evt.Actor.ID); err != nil {
					log.Err(err).Str("image", evt.Actor.ID).Msg("unable to handle image")
				}
			case err, ok := <-w.chHostEventsErr:
				// The docker client stops streaming after sending an error and
				// then closes this channel; looping again would busy-spin on
				// the closed channel.
				if !ok || err == nil {
					err = errors.New("host events stream closed")
				}
				log.Err(err).Msg("error occurred in host stream events")
				// No-op if the context is already cancelled (first cause wins).
				w.fnCancel(err)
				return
			case <-w.ctx.Done():
				log.Error().Str("events", "host").Msg("context done")
				return
			}
		}
	}()

	return nil
}
// handleImageEvent handles image tag events.
// It will build the service name from the first repo tag of the image, delete
// the service and the image on the swarm and thanks to the `collector`
// redeploys the image and restart the service. The nginx service is updated
// afterwards.
//
// ctx bounds every docker call; imageID is the ID carried by the event.
// It returns an error if the image has no repo tag, ErrServiceEmptyImage if
// the service has no container spec, or the wrapped error of the failing step.
// Every validation is done before the service is removed, so a service that
// can not be redeployed is left untouched.
//
// TODO(rmanach): brute update, should be nice to compare image sha before update.
// TODO(rmanach): deploy with migrations and data
func (w *Watcher) handleImageEvent(ctx context.Context, imageID string) error {
	inspect, _, err := w.hostCLI.ImageInspectWithRaw(ctx, imageID)
	if err != nil {
		return fmt.Errorf("inspecting image %q: %w", imageID, err)
	}

	// An image may have lost all its tags by the time it is inspected.
	if len(inspect.RepoTags) == 0 {
		return fmt.Errorf("image %q has no repo tag", imageID)
	}
	imageNameWithTag := inspect.RepoTags[0]
	serviceName := parseImageName(imageNameWithTag)

	srv, err := utils.GetServiceByName(ctx, w.swarmCLI, serviceName)
	if err != nil {
		return fmt.Errorf("getting service %q: %w", serviceName, err)
	}

	// Check the spec before any destructive call: bailing out after the
	// removal would leave the service deleted and never recreated.
	if srv.Spec.TaskTemplate.ContainerSpec == nil {
		return ErrServiceEmptyImage
	}

	if err := w.swarmCLI.ServiceRemove(ctx, srv.ID); err != nil {
		return fmt.Errorf("removing service %q: %w", serviceName, err)
	}

	if err := utils.RemoveImage(ctx, w.swarmCLI, srv.Spec.TaskTemplate.ContainerSpec.Image); err != nil {
		return fmt.Errorf("removing image of service %q: %w", serviceName, err)
	}

	if err := w.collector.DeployImages(ctx, []string{imageNameWithTag}); err != nil {
		return fmt.Errorf("deploying image %q: %w", imageNameWithTag, err)
	}

	if err := utils.CreateService(ctx, w.swarmCLI, &srv.Spec); err != nil {
		return fmt.Errorf("creating service %q: %w", serviceName, err)
	}

	if err := utils.CheckServiceHealthWithRetry(ctx, w.swarmCLI, srv.Spec.Name); err != nil {
		return fmt.Errorf("checking health of service %q: %w", srv.Spec.Name, err)
	}

	log.Info().Str("service", serviceName).Str("image", imageNameWithTag).Msg("service update")

	// nginx needs update too to avoid 502 error
	nginxName := fmt.Sprintf("localenv-%s", services.NginxServiceName)
	if err := utils.UpdateServiceByName(ctx, w.swarmCLI, nginxName); err != nil {
		return fmt.Errorf("updating service %q: %w", nginxName, err)
	}

	return nil
}
// Stop cancels the watcher context with a nil cause, which unblocks Watch and
// signals the host events goroutine to exit.
// It returns immediately, without waiting for that goroutine to finish.
//
// TODO(rmanach): impl a graceful stop
func (w *Watcher) Stop() {
w.fnCancel(nil)
}
// Watch starts the host events listener and blocks until the watcher context
// is done (Stop is called or the parent context ends).
// If the listener can not be started, the error is logged, the watcher
// context is cancelled with that error as cause and Watch returns at once.
func (w *Watcher) Watch() {
	err := w.watchHostEvents()
	if err == nil {
		log.Info().Msg("watcher is listening events...")
		<-w.ctx.Done()
		return
	}

	log.Err(err).Msg("while watching host events")
	w.fnCancel(err)
}