mailsrv/runtime/queue.go

108 lines
2.0 KiB
Go

package runtime
import (
"sync"
)
// Queue is a first-in, first-out work queue that tracks, per item, whether it
// is waiting to be processed, currently being processed, or both.
// Every method in this file accesses the fields while holding cond.L.
type Queue struct {
	// cond provides the lock guarding the fields below and lets Get block
	// until it is signalled.
	cond *sync.Cond

	// shuttingDown is set by Shutdown.
	shuttingDown bool

	// queue holds the items ready to be handed out, oldest first.
	queue []interface{}

	// waiting is the set of items that have been added and not yet handed
	// out by Get. An item can be in waiting and in processing at once.
	waiting map[interface{}]struct{}

	// processing is the set of items handed out by Get and not yet released
	// through Done.
	processing map[interface{}]struct{}
}
// NewQueue returns an empty Queue whose waiting and processing sets are
// allocated and whose condition variable is backed by a fresh mutex.
func NewQueue() *Queue {
	var mu sync.Mutex

	q := new(Queue)
	q.cond = sync.NewCond(&mu)
	q.waiting = map[interface{}]struct{}{}
	q.processing = map[interface{}]struct{}{}
	return q
}
// Len reports how many items are currently in the ordered queue, i.e. ready
// to be returned by Get. Items that are only being processed are not counted.
func (q *Queue) Len() int {
	q.cond.L.Lock()
	n := len(q.queue)
	q.cond.L.Unlock()
	return n
}
// Get removes and returns the first item of the queue.
//
// It blocks while the queue is empty and the queue is not shutting down.
// The second result reports shutdown: once the queue is shutting down Get
// returns (nil, true) without blocking, even if items remain queued.
// Otherwise it returns (item, false), and the item is moved from the waiting
// set to the processing set until Done is called for it.
func (q *Queue) Get() (interface{}, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// Re-check the predicate after every wake-up: a signal does not guarantee
	// that an item is available (another consumer may have taken it, or the
	// signal may not have been about a new item at all).
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if q.shuttingDown {
		return nil, true
	}

	// Pop the head; clear the slot so the backing array does not keep the
	// item reachable.
	item := q.queue[0]
	q.queue[0] = nil
	q.queue = q.queue[1:]

	// The item is now being processed and no longer waiting.
	q.processing[item] = struct{}{}
	delete(q.waiting, item)

	return item, false
}
// Add marks item as waiting and, unless it is currently being processed,
// appends it to the queue and signals one goroutine blocked on the queue.
//
// The call is a no-op when the queue is shutting down or when item is already
// marked as waiting. An item that is being processed is only marked as
// waiting here; Done appends it to the queue once processing has finished.
//
// item is used as a map key, so its dynamic type must be comparable.
func (q *Queue) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// Once shutting down, Get stops returning items, so accepting more would
	// only grow the queue with work nobody will pick up.
	if q.shuttingDown {
		return
	}

	// Already pending: keep a single entry per item.
	if _, ok := q.waiting[item]; ok {
		return
	}
	q.waiting[item] = struct{}{}

	// Being processed right now: defer queuing until Done is called.
	if _, ok := q.processing[item]; ok {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}
// Done marks item as no longer being processed.
//
// If item was marked as waiting while it was being processed, it is appended
// to the queue and one goroutine blocked on the queue is signalled. When
// nothing is appended no signal is sent, so blocked consumers are not woken
// up to find an empty queue.
func (q *Queue) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	delete(q.processing, item)

	// Not re-added during processing: nothing to queue, nobody to wake.
	if _, again := q.waiting[item]; !again {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}
// Shutdown flags the queue as shutting down and wakes every goroutine that
// is blocked on the queue's condition variable so it can observe the flag.
func (q *Queue) Shutdown() {
	mu := q.cond.L

	mu.Lock()
	q.shuttingDown = true
	// Broadcast rather than Signal: all blocked goroutines have to be woken.
	q.cond.Broadcast()
	mu.Unlock()
}