108 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package runtime
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
// Queue is a FIFO work queue that tracks, for every item, whether it is
// waiting to be processed and/or currently being processed.
//
// Every method locks cond.L before touching the other fields.
type Queue struct {
	// cond carries the lock guarding the fields below and is used to wake
	// goroutines blocked in Get.
	cond *sync.Cond

	// shuttingDown is set by Shutdown; once set, Get reports shutdown to
	// its callers.
	shuttingDown bool

	// queue holds the items awaiting processing, in the order they will be
	// handed out by Get.
	queue []interface{}

	// waiting is the set of items that still require processing.
	waiting map[interface{}]struct{}

	// processing is the set of items handed out by Get and not yet passed
	// to Done.
	processing map[interface{}]struct{}
}
 | |
| 
 | |
| func NewQueue() *Queue {
 | |
| 	return &Queue{
 | |
| 		waiting:    make(map[interface{}]struct{}),
 | |
| 		processing: make(map[interface{}]struct{}),
 | |
| 		cond:       sync.NewCond(&sync.Mutex{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Len returns the length of the queue
 | |
| func (q *Queue) Len() int {
 | |
| 	q.cond.L.Lock()
 | |
| 	defer q.cond.L.Unlock()
 | |
| 	return len(q.queue)
 | |
| }
 | |
| 
 | |
| // Get returns the first item in the queue.
 | |
| // Blocks until there is some items in queue.
 | |
| func (q *Queue) Get() (interface{}, bool) {
 | |
| 	q.cond.L.Lock()
 | |
| 	defer q.cond.L.Unlock()
 | |
| 
 | |
| 	// wait for items in the queue
 | |
| 	if len(q.queue) == 0 || q.shuttingDown {
 | |
| 		q.cond.Wait()
 | |
| 	}
 | |
| 
 | |
| 	if q.shuttingDown {
 | |
| 		return nil, true
 | |
| 	}
 | |
| 
 | |
| 	// removes and get first item
 | |
| 	item := q.queue[0]
 | |
| 	q.queue[0] = nil
 | |
| 	q.queue = q.queue[1:]
 | |
| 
 | |
| 	// put item in processing
 | |
| 	q.processing[item] = struct{}{}
 | |
| 	// remove from waiting
 | |
| 	delete(q.waiting, item)
 | |
| 
 | |
| 	return item, false
 | |
| }
 | |
| 
 | |
| // Add adds an item in the queue
 | |
| func (q *Queue) Add(item interface{}) {
 | |
| 	q.cond.L.Lock()
 | |
| 	defer q.cond.L.Unlock()
 | |
| 
 | |
| 	// item already in queue
 | |
| 	if _, ok := q.waiting[item]; ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	q.waiting[item] = struct{}{}
 | |
| 
 | |
| 	// wait end of processing to add in queue
 | |
| 	if _, ok := q.processing[item]; ok {
 | |
| 		return
 | |
| 	}
 | |
| 	q.queue = append(q.queue, item)
 | |
| 
 | |
| 	q.cond.Signal()
 | |
| }
 | |
| 
 | |
| // Done marks an item as processed
 | |
| // Requeue it if marked as waiting
 | |
| func (q *Queue) Done(item interface{}) {
 | |
| 	q.cond.L.Lock()
 | |
| 	defer q.cond.L.Unlock()
 | |
| 
 | |
| 	delete(q.processing, item)
 | |
| 
 | |
| 	if _, ok := q.waiting[item]; ok {
 | |
| 		q.queue = append(q.queue, item)
 | |
| 	}
 | |
| 	q.cond.Signal()
 | |
| }
 | |
| 
 | |
| // Shutdown will prevent accepting new items in the queue
 | |
| // and makes workers to stop
 | |
| func (q *Queue) Shutdown() {
 | |
| 	q.cond.L.Lock()
 | |
| 	defer q.cond.L.Unlock()
 | |
| 
 | |
| 	q.shuttingDown = true
 | |
| 	q.cond.Broadcast()
 | |
| }
 |