195 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			195 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package services
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	cfg "mailsrv/config"
 | |
| 	"mailsrv/mail"
 | |
| 	"mailsrv/runtime"
 | |
| 	"os"
 | |
| 	"os/signal"
 | |
| 	"path"
 | |
| 	"strings"
 | |
| 	"syscall"
 | |
| 	"time"
 | |
| 
 | |
| 	"net/http"
 | |
| 	"net/smtp"
 | |
| 
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
const (
	// TickerInterval is the period of the ticker that drives the outbox scan.
	TickerInterval = 10 * time.Second
	// JSONSuffix is the file name suffix an outbox file must carry to be parsed as an e-mail.
	JSONSuffix string = ".json"
	// ErrorSuffix is appended to the path of an outbox file whose JSON could not be parsed.
	ErrorSuffix string = ".err"
)
 | |
| 
 | |
| type Sender struct {
 | |
| 	smtpConfig cfg.SMTPConfig
 | |
| 	// fetch this directory to collect `.json` e-mail format
 | |
| 	outboxPath string
 | |
| 	queue      *runtime.Queue
 | |
| }
 | |
| 
 | |
| func NewSender(config cfg.SMTPConfig, outboxPath string) Sender {
 | |
| 	return Sender{
 | |
| 		smtpConfig: config,
 | |
| 		outboxPath: outboxPath,
 | |
| 		queue:      runtime.NewQueue(),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s Sender) SendMail(email mail.Email) error {
 | |
| 	auth := smtp.PlainAuth("", s.smtpConfig.User, s.smtpConfig.Password, s.smtpConfig.Url)
 | |
| 	log.Debug().Msg("SMTP authentication succeed")
 | |
| 
 | |
| 	if err := smtp.SendMail(s.smtpConfig.GetFullUrl(), auth, email.Sender, email.GetReceivers(), email.Generate()); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	log.Debug().Msg("mail send successfully")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s Sender) mailHandler(w http.ResponseWriter, r *http.Request) {
 | |
| 	content, err := io.ReadAll(r.Body)
 | |
| 	if err != nil {
 | |
| 		log.Err(err).Msg("unable to read request body")
 | |
| 		w.WriteHeader(http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var mail mail.Email
 | |
| 	if err := json.Unmarshal(content, &mail); err != nil {
 | |
| 		log.Err(err).Msg("unable to deserialized request body into mail")
 | |
| 		w.WriteHeader(http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	s.queue.Add(mail)
 | |
| 	w.WriteHeader(http.StatusOK)
 | |
| }
 | |
| 
 | |
| func (s Sender) runHTTPserver() {
 | |
| 
 | |
| 	mux := http.NewServeMux()
 | |
| 	mux.HandleFunc("/mail", s.mailHandler)
 | |
| 
 | |
| 	if err := http.ListenAndServe(":1212", mux); err != nil {
 | |
| 		log.Err(err).Msg("http server stops listening")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // watchOutbox reads the `outbox` directory every `TickInterval` and put JSON format e-mail in the queue.
 | |
| func (s Sender) watchOutbox() {
 | |
| 	log.Info().Str("outbox", s.outboxPath).Msg("start watching outbox directory")
 | |
| 
 | |
| 	ticker := time.NewTicker(TickerInterval)
 | |
| 
 | |
| 	go func() {
 | |
| 		for range ticker.C {
 | |
| 			log.Debug().Str("action", "retrieving json e-mail format...").Str("path", s.outboxPath)
 | |
| 
 | |
| 			files, err := os.ReadDir(s.outboxPath)
 | |
| 			if err != nil && !os.IsExist(err) {
 | |
| 				log.Err(err).Msg("outbox directory does not exist")
 | |
| 				s.queue.Shutdown()
 | |
| 			}
 | |
| 
 | |
| 			for _, file := range files {
 | |
| 				filename := file.Name()
 | |
| 				if !strings.HasSuffix(filename, JSONSuffix) {
 | |
| 					log.Debug().Str("filename", filename).Msg("incorrect suffix")
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				path := path.Join(s.outboxPath, filename)
 | |
| 				email, err := mail.FromJSON(path)
 | |
| 
 | |
| 				if err != nil {
 | |
| 					log.Err(err).Str("path", path).Msg("unable to parse JSON email")
 | |
| 
 | |
| 					// if JSON parsing failed the `path` is renamed with an error suffix to not watch it again
 | |
| 					newPath := fmt.Sprintf("%s%s", path, ErrorSuffix)
 | |
| 					if err := os.Rename(path, newPath); err != nil {
 | |
| 						log.Err(err).Str("path", path).Str("new path", newPath).Msg("unable to rename bad JSON email path")
 | |
| 					}
 | |
| 
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				email.Path = path
 | |
| 				s.queue.Add(email)
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| // processNextEmail iterates over the queue and send email.
 | |
| func (s Sender) processNextEmail() bool {
 | |
| 	item, quit := s.queue.Get()
 | |
| 	if quit {
 | |
| 		return false
 | |
| 	}
 | |
| 	defer s.queue.Done(item)
 | |
| 
 | |
| 	email, ok := item.(mail.Email)
 | |
| 	if !ok {
 | |
| 		log.Error().Any("item", item).Msg("unable to cast queue item into mail.Email")
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	if err := s.SendMail(email); err != nil {
 | |
| 		log.Err(err).Msg("unable to send the email")
 | |
| 	}
 | |
| 
 | |
| 	if path := email.Path; path != "" {
 | |
| 		if err := os.Remove(path); err != nil {
 | |
| 			// this is a fatal error, can't send same e-mail indefinitely
 | |
| 			if !os.IsExist(err) {
 | |
| 				log.Err(err).Str("path", path).Msg("unable to remove the JSON email")
 | |
| 				s.queue.Shutdown()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // run starts processing the queue.
 | |
| func (s Sender) run() <-chan struct{} {
 | |
| 	chQueue := make(chan struct{})
 | |
| 	go func() {
 | |
| 		for s.processNextEmail() {
 | |
| 		}
 | |
| 		chQueue <- struct{}{}
 | |
| 	}()
 | |
| 	return chQueue
 | |
| }
 | |
| 
 | |
| // Run launches the queue processing and the outbox watcher.
 | |
| // It catches `SIGINT` and `SIGTERM` to properly stopped the queue.
 | |
| func (s Sender) Run() {
 | |
| 	log.Info().Msg("sender service is running")
 | |
| 
 | |
| 	chSignal := make(chan os.Signal, 1)
 | |
| 	signal.Notify(chSignal, os.Interrupt, syscall.SIGTERM)
 | |
| 
 | |
| 	s.watchOutbox()
 | |
| 	chQueue := s.run()
 | |
| 
 | |
| 	go s.runHTTPserver()
 | |
| 
 | |
| 	select {
 | |
| 	case <-chSignal:
 | |
| 		log.Warn().Msg("stop signal received, stopping e-mail queue...")
 | |
| 		s.queue.Shutdown()
 | |
| 	case <-chQueue:
 | |
| 		log.Info().Msg("e-mail queue stopped successfully")
 | |
| 	}
 | |
| 
 | |
| 	log.Info().Msg("sender service stopped successfully")
 | |
| }
 |