127 lines
2.7 KiB
Go
127 lines
2.7 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
cfg "mailsrv/config"
|
|
"mailsrv/mail"
|
|
"mailsrv/runtime"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
"net/smtp"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type Sender struct {
|
|
auth smtp.Auth
|
|
smtpURL string
|
|
|
|
outboxPath string
|
|
|
|
queue *runtime.Queue
|
|
}
|
|
|
|
func NewSender(config cfg.SMTPConfig, outboxPath string) Sender {
|
|
return Sender{
|
|
auth: smtp.PlainAuth("", config.User, config.Password, config.URL),
|
|
smtpURL: config.GetFullURL(),
|
|
outboxPath: outboxPath,
|
|
queue: runtime.NewQueue(),
|
|
}
|
|
}
|
|
|
|
func (s Sender) SendMail(email *mail.Email) error {
|
|
if err := smtp.SendMail(s.smtpURL, s.auth, email.Sender, email.GetReceivers(), email.Generate()); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Debug().Msg("mail send successfully")
|
|
return nil
|
|
}
|
|
|
|
// processNextEmail iterates over the queue and send email.
|
|
func (s Sender) processNextEmail() bool {
|
|
item, quit := s.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer s.queue.Done(item)
|
|
|
|
email, ok := item.(mail.Email)
|
|
if !ok {
|
|
log.Error().Any("item", item).Msg("unable to cast queue item into mail.Email")
|
|
return true
|
|
}
|
|
|
|
if err := s.SendMail(&email); err != nil {
|
|
log.Err(err).Msg("unable to send the email")
|
|
}
|
|
|
|
if path := email.Path; path != "" {
|
|
if err := os.Remove(path); err != nil {
|
|
// this is a fatal error, can't send same e-mail indefinitely
|
|
if !os.IsExist(err) {
|
|
log.Err(err).Str("path", path).Msg("unable to remove the JSON email")
|
|
s.queue.Shutdown()
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// run starts processing the queue.
|
|
func (s Sender) run() <-chan struct{} {
|
|
chQueue := make(chan struct{})
|
|
go func() {
|
|
for s.processNextEmail() {
|
|
}
|
|
chQueue <- struct{}{}
|
|
}()
|
|
return chQueue
|
|
}
|
|
|
|
// Run launches the queue processing, the outbox watcher and the HTTP server.
|
|
// It catches `SIGINT` and `SIGTERM` to properly stopped the queue and the services.
|
|
func (s Sender) Run() {
|
|
ctx, fnCancel := context.WithCancel(context.Background())
|
|
|
|
chSignal := make(chan os.Signal, 1)
|
|
signal.Notify(chSignal, os.Interrupt, syscall.SIGTERM)
|
|
|
|
chQueue := s.run()
|
|
|
|
server := NewServer(ctx, "1212", s.queue)
|
|
server.Serve()
|
|
|
|
watcher := NewDirectoryWatch(ctx, s.outboxPath, s.queue)
|
|
watcher.Watch()
|
|
|
|
log.Info().Msg("sender service is running...")
|
|
|
|
select {
|
|
case <-chSignal:
|
|
log.Warn().Msg("stop signal received, stopping...")
|
|
fnCancel()
|
|
case <-watcher.Done():
|
|
log.Warn().Msg("watcher is done, stopping...")
|
|
fnCancel()
|
|
case <-server.Done():
|
|
log.Warn().Msg("server is done, stopping...")
|
|
fnCancel()
|
|
}
|
|
|
|
<-server.Done()
|
|
log.Info().Msg("http server stopped successfully")
|
|
|
|
<-watcher.Done()
|
|
log.Info().Msg("watcher stopped successfully")
|
|
|
|
s.queue.Shutdown()
|
|
<-chQueue
|
|
|
|
log.Info().Msg("mailsrv stopped gracefully")
|
|
}
|