Heap 是一种数据结构,其中包含一个特殊的根节点,且每个节点的值都不小于(或不大于)其所有子节点的值。这种数据结构常用于实现优先队列。


Heap 可以通过一个数组来实现,这个数组满足以下条件:















优点和缺点 优点

简单高效:优先级队列的实现较为简单,查找和插入等操作都可以在 O(log(n))O(log(n))O(log(n)) 的时间复杂度内完成,所以在实现简单的情况下,可以极大提高程序性能。





package go_pool_priority import ( "container/heap" "errors" "sync" "github.com/mitchellh/copystructure" ) // ErrEmpty is returned for queues with no items var ErrEmpty = errors.New("queue is empty") // ErrDuplicateItem is returned when the queue attmepts to push an item to a key that // already exists. The queue does not attempt to update, instead returns this // error. If an Item needs to be updated or replaced, pop the item first. var ErrDuplicateItem = errors.New("duplicate item") // New initializes the internal data structures and returns a new // PriorityQueue func NewPriorityQueue() *PriorityQueue { pq := PriorityQueue{ data: make(queue, 0), dataMap: make(map[string]*Item), } heap.Init(&pq.data) return &pq } // PriorityQueue facilitates queue of Items, providing Push, Pop, and // PopByKey convenience methods. The ordering (priority) is an int64 value // with the smallest value is the highest priority. PriorityQueue maintains both // an internal slice for the queue as well as a map of the same items with their // keys as the index. This enables users to find specific items by key. The map // must be kept in sync with the data slice. // See https://golang.org/pkg/container/heap/#example__priorityQueue type PriorityQueue struct { // data is the internal structure that holds the queue, and is operated on by // heap functions data queue // dataMap represents all the items in the queue, with unique indexes, used // for finding specific items. dataMap is kept in sync with the data slice dataMap map[string]*Item // lock is a read/write mutex, and used to facilitate read/write locks on the // data and dataMap fields lock sync.RWMutex } // queue is the internal data structure used to satisfy heap.Interface. This // prevents users from calling Pop and Push heap methods directly type queue []*Item // Item is something managed in the priority queue type Item struct { // Key is a unique string used to identify items in the internal data map Key string // Value is an unspecified type that implementations can use to store // information Value interface{} // Priority determines ordering in the queue, with the lowest value being the // highest priority Priority int64 // index is an internal value used by the heap package, and should not be // modified by any consumer of the priority queue index int } // Len returns the count of items in the Priority Queue func (pq *PriorityQueue) Len() int { pq.lock.RLock() defer pq.lock.RUnlock() return pq.data.Len() } // Pop pops the highest priority item from the queue. This is a // wrapper/convenience method that calls heap.Pop, so consumers do not need to // invoke heap functions directly func (pq *PriorityQueue) Pop() (*Item, error) { pq.lock.Lock() defer pq.lock.Unlock() if pq.data.Len() == 0 { return nil, ErrEmpty } item := heap.Pop(&pq.data).(*Item) delete(pq.dataMap, item.Key) return item, nil } // Push pushes an item on to the queue. This is a wrapper/convenience // method that calls heap.Push, so consumers do not need to invoke heap // functions directly. Items must have unique Keys, and Items in the queue // cannot be updated. To modify an Item, users must first remove it and re-push // it after modifications func (pq *PriorityQueue) Push(i *Item) error { if i == nil || i.Key == "" { return errors.New("error adding item: Item Key is required") } pq.lock.Lock() defer pq.lock.Unlock() if _, ok := pq.dataMap[i.Key]; ok { return ErrDuplicateItem } // Copy the item value(s) so that modifications to the source item does not // affect the item on the queue clone, err := copystructure.Copy(i) if err != nil { return err } pq.dataMap[i.Key] = clone.(*Item) heap.Push(&pq.data, clone) return nil } // PopByKey searches the queue for an item with the given key and removes it // from the queue if found. Returns nil if not found. This method must fix the // queue after removing any key. func (pq *PriorityQueue) PopByKey(key string) (*Item, error) { pq.lock.Lock() defer pq.lock.Unlock() item, ok := pq.dataMap[key] if !ok { return nil, nil } // Remove the item the heap and delete it from the dataMap itemRaw := heap.Remove(&pq.data, item.index) delete(pq.dataMap, key) if itemRaw != nil { if i, ok := itemRaw.(*Item); ok { return i, nil } } return nil, nil } // Len returns the number of items in the queue data structure. Do not use this // method directly on the queue, use PriorityQueue.Len() instead. func (q queue) Len() int { return len(q) } // Less returns whether the Item with index i should sort before the Item with // index j in the queue. This method is used by the queue to determine priority // internally; the Item with the lower value wins. (priority zero is higher // priority than 1). The priority of Items with equal values is undetermined. func (q queue) Less(i, j int) bool { return q[i].Priority < q[j].Priority } // Swap swaps things in-place; part of sort.Interface func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] q[i].index = i q[j].index = j } // Push is used by heap.Interface to push items onto the heap. This method is // invoked by container/heap, and should not be used directly. // See: https://golang.org/pkg/container/heap/#Interface func (q *queue) Push(x interface{}) { n := len(*q) item := x.(*Item) item.index = n *q = append(*q, item) } // Pop is used by heap.Interface to pop items off of the heap. This method is // invoked by container/heap, and should not be used directly. // See: https://golang.org/pkg/container/heap/#Interface func (q *queue) Pop() interface{} { old := *q n := len(old) item := old[n-1] old[n-1] = nil // avoid memory leak item.index = -1 // for safety *q = old[0 : n-1] return item }

- 内部使用container/heap中的Interface接口实现堆结构;

- 提供了Push、Pop和PopByKey等一系列方法;

- 使用一个内部slice和一个以Key为索引的映射map来维护队列元素;

- 根据元素的Priority值进行优先级排序,Priority值越小表示优先级越高;

- 在Push时需要保证Key值唯一;

- PopByKey方法可以根据Key查找并移除对应的元素。








对于消费者来说,他需要获取优先级最高的任务进行消费。使用heap pop 取出优先级最高的任务即可



type PriorityQueueTask struct { mLock sync.Mutex // 互斥锁,queues和priorities并发操作时使用,当然针对当前读多写少的场景,也可以使用读写锁 pushChan chan *task // 推送任务管道 pq *PriorityQueue }


type task struct { priority int64 // 任务的优先级 value interface{} key string } 初始化优先级队列对象

在初始化对象时,需要先通过 NewPriorityQueue() 函数创建一个空的 PriorityQueue,然后再创建一个 PriorityQueueTask 对象,并将刚刚创建的 PriorityQueue 赋值给该对象的 pq 属性。同时,还要创建一个用于接收推送任务的管道,用于在生产者推送任务时,将新任务添加到队列中。

func NewPriorityQueueTask() *PriorityQueueTask { pq := &PriorityQueueTask{ pushChan: make(chan *task, 100), pq: NewPriorityQueue(), } // 监听pushChan go pq.listenPushChan() return pq } func (pq *PriorityQueueTask) listenPushChan() { for { select { case taskEle := <-pq.pushChan: pq.mLock.Lock() pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value}) pq.mLock.Unlock() } } } 生产者推送任务

生产者向推送任务管道中推送新任务时,实际上是将一个 task 结构体实例发送到了管道中。在 task 结构体中,priority 属性表示这个任务的优先级,value 属性表示这个任务的值,key 属性表示这个任务的键。

// 插入work func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {     pq.pushChan <- &task{         value:    value,         priority: priority,         key:      key,     } } 消费者消费队列


// Consume 消费者轮询获取最高优先级的任务 func (pq *PriorityQueueTask) Consume() {     for {         task := pq.Pop()         if task == nil {             // 未获取到任务,则继续轮询             time.Sleep(time.Millisecond)             continue         }         // 获取到了任务,就执行任务         fmt.Println("推送任务的编号为:", task.Value)         fmt.Println("推送的任务优先级为:", task.Priority)         fmt.Println("============")     } } 完整代码 package go_pool_priority import ( "fmt" "sync" "time" ) type PriorityQueueTask struct { mLock sync.Mutex // 互斥锁,queues和priorities并发操作时使用,当然针对当前读多写少的场景,也可以使用读写锁 pushChan chan *task // 推送任务管道 pq *PriorityQueue } type task struct { priority int64 // 任务的优先级 value interface{} key string } func NewPriorityQueueTask() *PriorityQueueTask { pq := &PriorityQueueTask{ pushChan: make(chan *task, 100), pq: NewPriorityQueue(), } // 监听pushChan go pq.listenPushChan() return pq } func (pq *PriorityQueueTask) listenPushChan() { for { select { case taskEle := <-pq.pushChan: pq.mLock.Lock() pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value}) pq.mLock.Unlock() } } } // 插入work func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) { pq.pushChan <- &task{ value: value, priority: priority, key: key, } } // Pop 取出最高优先级队列中的一个任务 func (pq *PriorityQueueTask) Pop() *Item { pq.mLock.Lock() defer pq.mLock.Unlock() item, err := pq.pq.Pop() if err != nil { return nil } // 如果所有队列都没有任务,则返回null return item } // Consume 消费者轮询获取最高优先级的任务 func (pq *PriorityQueueTask) Consume() { for { task := pq.Pop() if task == nil { // 未获取到任务,则继续轮询 time.Sleep(time.Millisecond) continue } // 获取到了任务,就执行任务 fmt.Println("推送任务的编号为:", task.Value) fmt.Println("推送的任务优先级为:", task.Priority) fmt.Println("============") } } 测试用例 func TestQueue(t *testing.T) { defer func() { if err := recover(); err != nil { fmt.Println(err) } }() pq := NewPriorityQueueTask() // 我们在这里,随机生成一些优先级任务 for i := 0; i < 100; i++ { a := rand.Intn(1000) go func(a int64) { pq.Push(a, a, strconv.Itoa(int(a))) }(int64(a)) } // 这里会阻塞,消费者会轮询查询任务队列 pq.Consume() }


