package services import ( "fmt" cfg "mailsrv/config" "mailsrv/mail" "mailsrv/runtime" "os" "os/signal" "path" "strings" "syscall" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "net/smtp" ) const ( TickerInterval time.Duration = 10 * time.Second JSONSuffix string = ".json" ErrorSuffix string = ".err" ) type Sender struct { smtpConfig cfg.SMTPConfig logger log.Logger // fetch this directory to collect `.json` e-mail format outboxPath string queue *runtime.Queue } func NewSender(logger log.Logger, config cfg.SMTPConfig, outboxPath string) Sender { logger = log.With(logger, "actor", "sender") return Sender{ smtpConfig: config, logger: logger, outboxPath: outboxPath, queue: runtime.NewQueue(), } } func (s Sender) SendMail(email mail.Email) error { auth := smtp.PlainAuth("", s.smtpConfig.User, s.smtpConfig.Password, s.smtpConfig.Url) level.Debug(s.logger).Log("msg", "SMTP authentication succeed") if err := smtp.SendMail(s.smtpConfig.GetFullUrl(), auth, email.Sender, email.Receivers, email.Generate()); err != nil { level.Error(s.logger).Log("msg", "error while sending email", "err", err) return err } level.Debug(s.logger).Log("msg", "mail send successfully") return nil } // watchOutbox reads the `outbox` directory every `TickInterval` and put JSON format e-mail in the queue func (s Sender) watchOutbox() { s.logger.Log("msg", "start watching outbox directory", "outbox", s.outboxPath) ticker := time.NewTicker(TickerInterval) go func() { for _ = range ticker.C { level.Debug(s.logger).Log("action", "retrieving json e-mail format...", "path", s.outboxPath) files, err := os.ReadDir(s.outboxPath) if err != nil && !os.IsExist(err) { level.Error(s.logger).Log("msg", "outbox directory does not exist", "err", err) s.queue.Shutdown() } for _, file := range files { filename := file.Name() if strings.HasSuffix(filename, JSONSuffix) { s.queue.Add(path.Join(s.outboxPath, filename)) continue } level.Debug(s.logger).Log("msg", "incorrect suffix", "filename", filename) } } }() } // processNextEmail loops over the queue and send email func (s Sender) processNextEmail() bool { item, quit := s.queue.Get() if quit { return false } defer s.queue.Done(item) path, ok := item.(string) if !ok { level.Error(s.logger).Log("msg", "unable to cast queue item into mail.Email", "item", item) return true } email, err := mail.FromJSON(path) if err != nil { level.Error(s.logger).Log("msg", "unable to parse JSON email", "path", path, "err", err) // if JSON parsing failed the `path` is renamed with an error suffix to avoid enqueued it again newPath := fmt.Sprintf("%s%s", path, ErrorSuffix) if err := os.Rename(path, newPath); err != nil { level.Error(s.logger).Log("msg", "unable to rename bad JSON email path", "path", path, "newPath", newPath) s.queue.Shutdown() } return true } // whatever the return, the email will be not enqueued again s.SendMail(email) if err := os.Remove(path); err != nil { // this is a fatal error, can't send same e-mail indefinitely if !os.IsExist(err) { level.Error(s.logger).Log("msg", "unable to remove the JSON email", "path", path, "err", err) s.queue.Shutdown() } } return true } // run starts processing the queue func (s Sender) run() <-chan struct{} { queueCh := make(chan struct{}) go func() { for s.processNextEmail() { } queueCh <- struct{}{} }() return queueCh } // Run launches the queue processing and the outbox watcher // catches `SIGINT` and `SIGTERM` to properly stopped the queue func (s Sender) Run() { s.logger.Log("msg", "sender service is running") sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) s.watchOutbox() queueCh := s.run() select { case <-sigCh: s.logger.Log("msg", "stop signal received, stopping e-mail queue...") s.queue.Shutdown() case <-queueCh: s.logger.Log("msg", "e-mail queue stopped successfully") } s.logger.Log("msg", "sender service stopped successfully") }