package watchers import ( "context" "errors" "fmt" "strings" "localenv/collector" "localenv/services" "localenv/utils" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" "github.com/rs/zerolog/log" ) var ( ErrUninitializeEvents = errors.New("uninitialize events channel") ErrClientConnection = errors.New("unable to connect to the docker daemon") ErrParseServiceName = errors.New("unable to parse service name") ErrServiceEmptyImage = errors.New("no image found in service") ) func parseImageName(imageName string) string { if imageName == "" { return "" } values := strings.Split(imageName, "/") r := values[len(values)-1:][0] r = strings.TrimPrefix(r, "localenv-") r = strings.TrimSuffix(r, ":latest") r = strings.TrimSuffix(r, "-local") return fmt.Sprintf("localenv-%s", r) } type Watcher struct { ctx context.Context fnCancel context.CancelCauseFunc hostCLI *client.Client swarmCLI *client.Client collector collector.Collector chHostEvents <-chan events.Message chHostEventsErr <-chan error chSwarmEvents <-chan events.Message chSwarmEventsErr <-chan error chDone chan struct{} } func NewWatcher(ctx context.Context, hostCLI, swarmCLI *client.Client) (Watcher, error) { chidlCtx, cancel := context.WithCancelCause(ctx) chHostEvents, chHostEventsErr := hostCLI.Events(ctx, types.EventsOptions{}) chSwarmEvents, chSwarmEventsErr := swarmCLI.Events(ctx, types.EventsOptions{}) c := collector.NewCollector(hostCLI, swarmCLI) return Watcher{ ctx: chidlCtx, fnCancel: cancel, hostCLI: hostCLI, swarmCLI: swarmCLI, collector: c, chHostEvents: chHostEvents, chHostEventsErr: chHostEventsErr, chSwarmEvents: chSwarmEvents, chSwarmEventsErr: chSwarmEventsErr, chDone: make(chan struct{}), }, nil } // TODO(rmanach): in order to watch image built, force the image tag: // @docker image tag my-image:latest my-image:latest func (w *Watcher) watchHostEvents() error { if w.chHostEvents == nil { return fmt.Errorf("%w: host events", ErrUninitializeEvents) } if w.chHostEventsErr == nil { return fmt.Errorf("%w: host events err", ErrUninitializeEvents) } go func() { for { select { case evt := <-w.chHostEvents: log.Info().Str("event", evt.Actor.ID).Str("action", evt.Action).Str("type", evt.Type).Msg("host event received") switch evt.Type { case events.ImageEventType: // handle only this action, inf loop with the collector if evt.Action == "tag" { if err := w.handleImageEvent(w.ctx, evt.Actor.ID); err != nil { log.Err(err).Str("image", evt.Actor.ID).Msg("unable to handle image") } } default: } case err := <-w.chHostEventsErr: log.Err(err).Msg("error occurred in host stream events") case <-w.ctx.Done(): log.Error().Str("events", "host").Msg("context done") return } } }() return nil } // handleImageEvent handles image tag events. // It will build the service name from the tag, delete the service and the image on the swarm // and thanks to the `collector` redeploys the image and restart the service. // // TODO(rmanach): brute update, should be nice to compare image sha before update. // TODO(rmanach): deploy with migrations and data func (w *Watcher) handleImageEvent(ctx context.Context, imageID string) error { inspect, _, err := w.hostCLI.ImageInspectWithRaw(ctx, imageID) if err != nil { return err } imageNameWithTag := inspect.RepoTags[0] serviceName := parseImageName(imageNameWithTag) srv, err := utils.GetServiceByName(ctx, w.swarmCLI, serviceName) if err != nil { return err } if err := w.swarmCLI.ServiceRemove(ctx, srv.ID); err != nil { return err } if srv.Spec.TaskTemplate.ContainerSpec == nil { return ErrServiceEmptyImage } if err := utils.RemoveImage(ctx, w.swarmCLI, srv.Spec.TaskTemplate.ContainerSpec.Image); err != nil { return err } if err := w.collector.DeployImages(ctx, []string{imageNameWithTag}); err != nil { return err } if err := utils.CreateService(ctx, w.swarmCLI, &srv.Spec); err != nil { return err } if err := utils.CheckServiceHealthWithRetry(ctx, w.swarmCLI, srv.Spec.Name); err != nil { return err } log.Info().Str("service", serviceName).Str("image", imageNameWithTag).Msg("service update") // nginx needs update too to avoid 502 error if err := utils.UpdateServiceByName(ctx, w.swarmCLI, fmt.Sprintf("localenv-%s", services.NginxServiceName)); err != nil { return err } return nil } // TODO(rmanach): impl a graceful stop func (w *Watcher) Stop() { w.fnCancel(nil) } func (w *Watcher) Watch() { if err := w.watchHostEvents(); err != nil { log.Err(err).Msg("while watching host events") w.fnCancel(err) return } log.Info().Msg("watcher is listening events...") <-w.ctx.Done() }