package services

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/smtp"
	"os"
	"os/signal"
	"path"
	"strings"
	"syscall"
	"time"

	"github.com/rs/zerolog/log"
	cfg "mailsrv/config"
	"mailsrv/mail"
	"mailsrv/runtime"
)

const (
	// TickerInterval is the period between two scans of the outbox directory.
	TickerInterval time.Duration = 10 * time.Second
	// JSONSuffix is the suffix an outbox file must carry to be picked up as an e-mail.
	JSONSuffix string = ".json"
	// ErrorSuffix is appended to the name of an outbox file that could not be parsed.
	ErrorSuffix string = ".err"
)

// Sender collects e-mails from the outbox directory and from the HTTP endpoint,
// stores them in a queue and sends them through SMTP.
type Sender struct {
	smtpConfig cfg.SMTPConfig
	// outboxPath is the directory scanned to collect `.json` formatted e-mails.
	outboxPath string
	queue      *runtime.Queue
}

// NewSender builds a Sender for the given SMTP configuration and outbox
// directory, backed by a freshly created queue.
func NewSender(config cfg.SMTPConfig, outboxPath string) Sender {
	return Sender{
		smtpConfig: config,
		outboxPath: outboxPath,
		queue:      runtime.NewQueue(),
	}
}

// SendMail sends email through the configured SMTP server using plain
// authentication. It returns the error reported by smtp.SendMail, if any.
func (s Sender) SendMail(email mail.Email) error {
	// PlainAuth only builds the credentials: nothing is checked against the
	// server until smtp.SendMail runs, so no success can be reported yet.
	auth := smtp.PlainAuth("", s.smtpConfig.User, s.smtpConfig.Password, s.smtpConfig.Url)
	log.Debug().Msg("SMTP plain authentication prepared")

	if err := smtp.SendMail(s.smtpConfig.GetFullUrl(), auth, email.Sender, email.GetReceivers(), email.Generate()); err != nil {
		return err
	}

	log.Debug().Msg("mail send successfully")
	return nil
}

// mailHandler decodes the request body as a JSON e-mail and adds it to the queue.
// It answers 500 when the body cannot be read, 400 when it is not a valid JSON
// e-mail and 200 once the e-mail is queued.
//
// NOTE(review): the body size is not limited and the HTTP method is not checked.
func (s Sender) mailHandler(w http.ResponseWriter, r *http.Request) {
	content, err := io.ReadAll(r.Body)
	if err != nil {
		log.Err(err).Msg("unable to read request body")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	var email mail.Email
	if err := json.Unmarshal(content, &email); err != nil {
		log.Err(err).Msg("unable to deserialized request body into mail")
		// A payload that cannot be decoded is a client error, not a server failure.
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	s.queue.Add(email)
	w.WriteHeader(http.StatusOK)
}

// runHTTPserver serves the `/mail` endpoint on port 1212 and blocks until the
// server stops listening.
func (s Sender) runHTTPserver() {
	mux := http.NewServeMux()
	mux.HandleFunc("/mail", s.mailHandler)

	server := &http.Server{
		Addr:    ":1212",
		Handler: mux,
		// Bound the time a client may take to send its headers so that idle
		// connections cannot be held open indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	if err := server.ListenAndServe(); err != nil {
		log.Err(err).Msg("http server stops listening")
	}
}

// watchOutbox reads the `outbox` directory every `TickerInterval` and puts JSON formatted e-mails in the queue.
func (s Sender) watchOutbox() { log.Info().Str("outbox", s.outboxPath).Msg("start watching outbox directory") ticker := time.NewTicker(TickerInterval) go func() { for range ticker.C { log.Debug().Str("action", "retrieving json e-mail format...").Str("path", s.outboxPath) files, err := os.ReadDir(s.outboxPath) if err != nil && !os.IsExist(err) { log.Err(err).Msg("outbox directory does not exist") s.queue.Shutdown() } for _, file := range files { filename := file.Name() if !strings.HasSuffix(filename, JSONSuffix) { log.Debug().Str("filename", filename).Msg("incorrect suffix") continue } path := path.Join(s.outboxPath, filename) email, err := mail.FromJSON(path) if err != nil { log.Err(err).Str("path", path).Msg("unable to parse JSON email") // if JSON parsing failed the `path` is renamed with an error suffix to not watch it again newPath := fmt.Sprintf("%s%s", path, ErrorSuffix) if err := os.Rename(path, newPath); err != nil { log.Err(err).Str("path", path).Str("new path", newPath).Msg("unable to rename bad JSON email path") } continue } email.Path = path s.queue.Add(email) } } }() } // processNextEmail iterates over the queue and send email. func (s Sender) processNextEmail() bool { item, quit := s.queue.Get() if quit { return false } defer s.queue.Done(item) email, ok := item.(mail.Email) if !ok { log.Error().Any("item", item).Msg("unable to cast queue item into mail.Email") return true } if err := s.SendMail(email); err != nil { log.Err(err).Msg("unable to send the email") } if path := email.Path; path != "" { if err := os.Remove(path); err != nil { // this is a fatal error, can't send same e-mail indefinitely if !os.IsExist(err) { log.Err(err).Str("path", path).Msg("unable to remove the JSON email") s.queue.Shutdown() } } } return true } // run starts processing the queue. 
func (s Sender) run() <-chan struct{} { chQueue := make(chan struct{}) go func() { for s.processNextEmail() { } chQueue <- struct{}{} }() return chQueue } // Run launches the queue processing and the outbox watcher. // It catches `SIGINT` and `SIGTERM` to properly stopped the queue. func (s Sender) Run() { log.Info().Msg("sender service is running") chSignal := make(chan os.Signal, 1) signal.Notify(chSignal, os.Interrupt, syscall.SIGTERM) s.watchOutbox() chQueue := s.run() go s.runHTTPserver() select { case <-chSignal: log.Warn().Msg("stop signal received, stopping e-mail queue...") s.queue.Shutdown() case <-chQueue: log.Info().Msg("e-mail queue stopped successfully") } log.Info().Msg("sender service stopped successfully") }