package runtime

import (
	"sync"
)

// Queue is a FIFO work queue that de-duplicates pending items and never hands
// the same item to two consumers at once. All methods lock cond.L, so a Queue
// may be shared between goroutines.
//
// Items are used as map keys, so their dynamic type must be comparable.
type Queue struct {
	// queue is the ordered list of items that are ready to be handed out.
	queue []interface{}
	// waiting is the set of items that require processing. An item can be in
	// waiting without being in queue when it was added while being processed.
	waiting map[interface{}]struct{}
	// processing is the set of items that are currently being processed,
	// i.e. returned by Get and not yet passed to Done.
	processing map[interface{}]struct{}
	// cond guards every other field and wakes goroutines blocked in Get.
	cond *sync.Cond
	// shuttingDown is set by Shutdown and never cleared.
	shuttingDown bool
}

// NewQueue returns an empty, ready-to-use Queue.
func NewQueue() *Queue {
	return &Queue{
		waiting:    make(map[interface{}]struct{}),
		processing: make(map[interface{}]struct{}),
		cond:       sync.NewCond(&sync.Mutex{}),
	}
}

// Len returns the number of items ready to be handed out by Get. Items that
// are being processed, or that are waiting for their processing to end, are
// not counted.
func (q *Queue) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	return len(q.queue)
}

// Get removes and returns the first item in the queue and marks it as being
// processed; the caller must call Done with that item when finished.
//
// Get blocks until an item is available or the queue is shut down. Once the
// queue is shut down it returns (nil, true), even if items are still queued.
func (q *Queue) Get() (interface{}, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// Re-check the predicate after every wake-up: a signal does not guarantee
	// that an item is still available (another consumer may have taken it).
	// Do not wait at all once shutting down, since the Shutdown broadcast may
	// already have happened and nothing would wake us again.
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if q.shuttingDown {
		return nil, true
	}

	// Pop the head; clear the slot so the backing array does not keep the
	// item reachable.
	item := q.queue[0]
	q.queue[0] = nil
	q.queue = q.queue[1:]

	// Move the item from waiting to processing.
	q.processing[item] = struct{}{}
	delete(q.waiting, item)

	return item, false
}

// Add marks item as requiring processing. It is a no-op if the item is
// already waiting or if the queue is shutting down. If the item is currently
// being processed it is only marked as waiting; Done will requeue it.
func (q *Queue) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// A queue that is shutting down no longer accepts new items.
	if q.shuttingDown {
		return
	}
	// Item already pending.
	if _, ok := q.waiting[item]; ok {
		return
	}
	q.waiting[item] = struct{}{}

	// Wait for the end of processing before putting it back in the queue, so
	// that the same item is never processed concurrently.
	if _, ok := q.processing[item]; ok {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// Done marks item as processed. If the item was added again while it was
// being processed, it is put back at the end of the queue.
func (q *Queue) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	delete(q.processing, item)
	if _, ok := q.waiting[item]; ok {
		q.queue = append(q.queue, item)
		// Only wake a consumer when there is actually something to consume.
		q.cond.Signal()
	}
}

// Shutdown stops the queue from accepting new items and wakes every goroutine
// blocked in Get so that workers can stop.
func (q *Queue) Shutdown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.shuttingDown = true
	q.cond.Broadcast()
}