mailsrv/services/sender.go

138 lines
3.3 KiB
Go

package services
import (
cfg "mailsrv/config"
"mailsrv/mail"
"mailsrv/runtime"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"net/smtp"
)
// TickerInterval is the delay between two scans of the outbox directory.
const TickerInterval time.Duration = 10 * time.Second

// JSONSuffix is the file name suffix an outbox entry must carry to be queued.
const JSONSuffix string = ".json"
// Sender picks up JSON e-mail files from an outbox directory, queues them
// and delivers them over SMTP.
type Sender struct {
// smtpConfig provides the SMTP user, password and server address used by SendMail.
smtpConfig cfg.SMTPConfig
// logger is the structured logger used by every method of the sender.
logger log.Logger
// outboxPath is the directory scanned for `.json` e-mail files.
outboxPath string
// queue holds the paths of the e-mail files waiting to be processed.
queue *runtime.Queue
}
// NewSender builds a Sender that reads e-mail files from outboxPath and
// delivers them with the given SMTP configuration.
//
// The supplied logger is decorated with the "actor"="sender" key/value pair
// and a new queue is created for the returned Sender.
func NewSender(logger log.Logger, config cfg.SMTPConfig, outboxPath string) Sender {
	var sender Sender
	sender.smtpConfig = config
	sender.logger = log.With(logger, "actor", "sender")
	sender.outboxPath = outboxPath
	sender.queue = runtime.NewQueue()
	return sender
}
// SendMail delivers email through the configured SMTP server using PLAIN
// authentication.
//
// The credentials and server address come from the sender's SMTP
// configuration; the envelope sender, the recipients and the message body are
// taken from email. A delivery error is logged at error level and returned.
func (s Sender) SendMail(email mail.Email) error {
	// smtp.PlainAuth only builds the credentials: the server is not contacted
	// until smtp.SendMail runs, so no authentication result exists yet and the
	// log entry must not claim that authentication succeeded.
	auth := smtp.PlainAuth("", s.smtpConfig.User, s.smtpConfig.Password, s.smtpConfig.Url)
	s.logger.Log("action", "smtp plain authentication prepared")

	if err := smtp.SendMail(s.smtpConfig.GetFullUrl(), auth, email.Sender, email.Receivers, email.Generate()); err != nil {
		level.Error(s.logger).Log("msg", "error while sending email", "err", err)
		return err
	}

	s.logger.Log("msg", "mail send successfully")
	return nil
}
// watchOutbox starts a goroutine that scans the outbox directory every
// TickerInterval and adds the path of each `.json` file to the queue.
// Entries without the JSON suffix are reported with a warning and skipped.
//
// When the directory cannot be read, the error is logged, the queue is shut
// down and the watcher terminates: it stops its ticker instead of retrying
// and shutting the queue down again on every tick.
func (s Sender) watchOutbox() {
	ticker := time.NewTicker(TickerInterval)
	go func() {
		// Release the ticker once the watcher gives up.
		defer ticker.Stop()

		for range ticker.C {
			s.logger.Log("action", "retrieving json e-mail format...", "path", s.outboxPath)

			files, err := os.ReadDir(s.outboxPath)
			if err != nil {
				// Only claim the directory is missing when that is really the
				// cause; other failures (e.g. permissions) get a generic message.
				msg := "unable to read outbox directory"
				if os.IsNotExist(err) {
					msg = "outbox directory does not exist"
				}
				level.Error(s.logger).Log("msg", msg, "err", err)
				s.queue.Shutdown()
				return
			}

			for _, file := range files {
				filename := file.Name()
				if strings.HasSuffix(filename, JSONSuffix) {
					s.queue.Add(path.Join(s.outboxPath, filename))
					continue
				}
				level.Warn(s.logger).Log("msg", "incorrect suffix", "filename", filename)
			}
		}
	}()
}
// processNextEmail takes the next item off the queue, parses it as a JSON
// e-mail file and sends it.
//
// It returns false when the queue signals quit and true otherwise, so the
// caller keeps polling. The JSON file is removed only after a successful
// delivery: a file that could not be parsed or sent stays in the outbox, where
// the next outbox scan will queue it again.
func (s Sender) processNextEmail() bool {
	item, quit := s.queue.Get()
	if quit {
		return false
	}
	// Always tell the queue this item has been handled, whatever the outcome.
	defer s.queue.Done(item)

	// Named filePath so the imported `path` package is not shadowed.
	filePath, ok := item.(string)
	if !ok {
		level.Error(s.logger).Log("msg", "unable to cast queue item into string")
		return true
	}

	email, err := mail.FromJSON(filePath)
	if err != nil {
		level.Error(s.logger).Log("msg", "unable to parse JSON email", "err", err)
		return true
	}

	if err := s.SendMail(email); err != nil {
		// SendMail has already logged the failure. Keep the file so the
		// e-mail is not lost and can be retried.
		return true
	}

	if err := os.Remove(filePath); err != nil {
		level.Error(s.logger).Log("msg", "unable to remove the JSON email", "path", filePath, "err", err)
	}
	return true
}
// run starts a goroutine that processes queue items until processNextEmail
// returns false.
//
// The returned channel is closed when that goroutine is done. A receive on it
// therefore unblocks exactly as it did with the previous single send, but the
// worker goroutine can no longer be left parked forever when nobody receives.
func (s Sender) run() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		// Closing never blocks, unlike a send on an unbuffered channel.
		defer close(done)
		for s.processNextEmail() {
		}
	}()
	return done
}
// Run launches the outbox watcher and the queue processing, then blocks until
// the service stops.
//
// On SIGINT or SIGTERM the queue is shut down and Run waits for the worker
// goroutine to finish before returning. Run also returns when the worker
// finishes on its own (for example after the watcher shut the queue down
// because the outbox could not be read), instead of idling until a signal
// arrives.
func (s Sender) Run() {
	s.logger.Log("msg", "sender service is running")

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	// Stop relaying signals to sigCh once Run returns.
	defer signal.Stop(sigCh)

	s.watchOutbox()
	queueCh := s.run()

	select {
	case <-sigCh:
		s.logger.Log("msg", "stop signal received, stopping e-mail queue...")
		s.queue.Shutdown()
		<-queueCh
		s.logger.Log("msg", "sender service stopped successfully")
	case <-queueCh:
		// The worker ended without a stop signal: nothing is left to process.
		level.Error(s.logger).Log("msg", "e-mail queue stopped unexpectedly, sender service stopped")
	}
}