From 94ad3fb92ead72e916180637f017ee47e02928ec Mon Sep 17 00:00:00 2001 From: landrigun Date: Sat, 15 Oct 2022 20:20:17 +0000 Subject: [PATCH] impl sender watcher and queue --- Makefile | 4 ++ README.md | 15 +++++- mail/mail.go | 11 +++-- main.go | 7 +-- runtime/queue.go | 107 ++++++++++++++++++++++++++++++++++++++++++ runtime/queue_test.go | 23 +++++++++ services/sender.go | 86 ++++++++++++++++++++++++++++++--- 7 files changed, 235 insertions(+), 18 deletions(-) create mode 100644 runtime/queue.go create mode 100644 runtime/queue_test.go diff --git a/Makefile b/Makefile index c7955bc..7e2589f 100644 --- a/Makefile +++ b/Makefile @@ -16,3 +16,7 @@ vet: fmt build: vet go build -o mailsrv .PHONY:build + +test: + go test ./... +.PHONY:test diff --git a/README.md b/README.md index fd07ad8..1c5ce35 100644 --- a/README.md +++ b/README.md @@ -23,4 +23,17 @@ outbox_path = "" # directory used to retrieve `.json` e-mail format to ## Run ```bash ./mailsrv myconfig.ini -``` \ No newline at end of file +``` + +## How to send a mail ? +* create a JSON file like: +```json +{ + "sender":"", + "receivers": ["","", ..., ""], + "subject": "", + "content": "" +} +``` +* put the JSON file in the `outbox` directory define in the `.ini` file +**NOTE**: HTML is interpreted for the e-mail content diff --git a/mail/mail.go b/mail/mail.go index 768edef..37e0e93 100644 --- a/mail/mail.go +++ b/mail/mail.go @@ -23,18 +23,19 @@ func NewEmail(sender string, receivers []string, subject, content string) Email } } -func FromJSON(path string) error { +func FromJSON(path string) (Email, error) { + var mail Email + content, err := ioutil.ReadFile(path) if err != nil { - return err + return mail, err } - var mail Email if err := json.Unmarshal(content, &mail); err != nil { - return err + return mail, err } - return nil + return mail, nil } func (e Email) Generate() []byte { diff --git a/main.go b/main.go index 55e3c4e..586f0d9 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( "os" cfg "mailsrv/config" - "mailsrv/mail" srv "mailsrv/services" "github.com/go-kit/kit/log" @@ -100,10 +99,6 @@ func main() { return } - logger.Log("action", "send email test") - content := fmt.Sprintf("Hi!

This is an e-mail test, please do not reply.

Thegux Administrator
%s", `
visit the website: thegux.fr`) - email := mail.NewEmail(config.User, []string{"example@example.com"}, "e-mail test", content) - sender := srv.NewSender(logger, config, outboxPath) - sender.SendMail(email) + sender.Run() } diff --git a/runtime/queue.go b/runtime/queue.go new file mode 100644 index 0000000..a0351e1 --- /dev/null +++ b/runtime/queue.go @@ -0,0 +1,107 @@ +package runtime + +import ( + "sync" +) + +type Queue struct { + // queue is an ordered list of items that require processing + queue []interface{} + + // waiting are items that require processing + waiting map[interface{}]struct{} + + // processing are items that are curently processed + processing map[interface{}]struct{} + + cond *sync.Cond + shuttingDown bool +} + +func NewQueue() *Queue { + return &Queue{ + waiting: make(map[interface{}]struct{}), + processing: make(map[interface{}]struct{}), + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Len returns the length of the queue +func (q *Queue) Len() int { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return len(q.queue) +} + +// Get returns the first item in the queue. +// Blocks until there is some items in queue. +func (q *Queue) Get() (interface{}, bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + // wait for items in the queue + if len(q.queue) == 0 || q.shuttingDown { + q.cond.Wait() + } + + if q.shuttingDown { + return nil, true + } + + // removes and get first item + item := q.queue[0] + q.queue[0] = nil + q.queue = q.queue[1:] + + // put item in processing + q.processing[item] = struct{}{} + // remove from waiting + delete(q.waiting, item) + + return item, false +} + +// Add adds an item in the queue +func (q *Queue) Add(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + // item already in queue + if _, ok := q.waiting[item]; ok { + return + } + + q.waiting[item] = struct{}{} + + // wait end of processing to add in queue + if _, ok := q.processing[item]; ok { + return + } + q.queue = append(q.queue, item) + + q.cond.Signal() +} + +// Done marks an item as processed +// Requeue it if marked as waiting +func (q *Queue) Done(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + delete(q.processing, item) + + if _, ok := q.waiting[item]; ok { + q.queue = append(q.queue, item) + } + q.cond.Signal() +} + +// Shutdown will prevent accepting new items in the queue +// and makes workers to stop +func (q *Queue) Shutdown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.shuttingDown = true + q.cond.Broadcast() +} diff --git a/runtime/queue_test.go b/runtime/queue_test.go new file mode 100644 index 0000000..e421fe3 --- /dev/null +++ b/runtime/queue_test.go @@ -0,0 +1,23 @@ +package runtime + +import ( + "testing" +) + +func TestQueue(t *testing.T) { + queue := NewQueue() + + paths := []string{ + "outbox/tutu.json", + "outbox/popo.json", + "outbox/tutu.json", + } + + for _, path := range paths { + queue.Add(path) + } + + if queue.Len() == 3 { + t.Fail() + } +} diff --git a/services/sender.go b/services/sender.go index d1bb035..5029ead 100644 --- a/services/sender.go +++ b/services/sender.go @@ -3,33 +3,45 @@ package services import ( cfg "mailsrv/config" "mailsrv/mail" + "mailsrv/runtime" + "os" + "path" + "strings" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "net/smtp" ) +const ( + TickerInterval time.Duration = 10 * time.Second + JSONSuffix string = ".json" +) + type Sender struct { - SMTPConfig cfg.SMTPConfig + smtpConfig cfg.SMTPConfig logger log.Logger // fetch this directory to collect `.json` e-mail format - OutboxPath string + outboxPath string + queue *runtime.Queue } func NewSender(logger log.Logger, config cfg.SMTPConfig, outboxPath string) Sender { logger = log.With(logger, "actor", "sender") return Sender{ - SMTPConfig: config, + smtpConfig: config, logger: logger, - OutboxPath: outboxPath, + outboxPath: outboxPath, + queue: runtime.NewQueue(), } } func (s Sender) SendMail(email mail.Email) error { - auth := smtp.PlainAuth("", s.SMTPConfig.User, s.SMTPConfig.Password, s.SMTPConfig.Url) + auth := smtp.PlainAuth("", s.smtpConfig.User, s.smtpConfig.Password, s.smtpConfig.Url) s.logger.Log("action", "authentication succeed") - if err := smtp.SendMail(s.SMTPConfig.GetFullUrl(), auth, email.Sender, email.Receivers, email.Generate()); err != nil { + if err := smtp.SendMail(s.smtpConfig.GetFullUrl(), auth, email.Sender, email.Receivers, email.Generate()); err != nil { level.Error(s.logger).Log("msg", "error while sending email", "err", err) return err } @@ -37,3 +49,65 @@ func (s Sender) SendMail(email mail.Email) error { s.logger.Log("msg", "mail send successfully") return nil } + +// every `TickerInterval` reads the `outbox` directory and put JSON format e-mail in the queue +func (s Sender) watchOutbox() { + ticker := time.NewTicker(TickerInterval) + go func() { + for _ = range ticker.C { + s.logger.Log("action", "retrieving json e-mail format...", "path", s.outboxPath) + + files, err := os.ReadDir(s.outboxPath) + if err != nil && !os.IsExist(err) { + level.Error(s.logger).Log("msg", "outbox directory does not exist", "err", err) + s.queue.Shutdown() + } + + for _, file := range files { + filename := file.Name() + if strings.HasSuffix(filename, JSONSuffix) { + s.queue.Add(path.Join(s.outboxPath, filename)) + continue + } + level.Warn(s.logger).Log("msg", "incorrect suffix", "filename", filename) + } + } + }() +} + +// loops over the queue and send email +func (s Sender) processNextMail() bool { + item, quit := s.queue.Get() + if quit { + return false + } + defer s.queue.Done(item) + + path, ok := item.(string) + if !ok { + level.Error(s.logger).Log("msg", "unable to cast queue item into mail.Email") + return true + } + + email, err := mail.FromJSON(path) + if err != nil { + level.Error(s.logger).Log("msg", "unable to parse JSON email", "err", err) + return true + } + + s.SendMail(email) + + if err := os.Remove(path); err != nil { + level.Error(s.logger).Log("msg", "unable to remove the JSON email", "path", path, "err", err) + } + + return true +} + +func (s Sender) Run() { + s.logger.Log("msg", "sender service is running") + + s.watchOutbox() + for s.processNextMail() { + } +}