unique_queue.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package sync
  2. import (
  3. "github.com/Unknwon/com"
  4. )
  5. // UniqueQueue is a queue which guarantees only one instance of same
  6. // identity is in the line. Instances with same identity will be
  7. // discarded if there is already one in the line.
  8. //
  9. // This queue is particularly useful for preventing duplicated task
  10. // of same purpose.
  11. type UniqueQueue struct {
  12. table *StatusTable
  13. queue chan string
  14. }
  15. // NewUniqueQueue initializes and returns a new UniqueQueue object.
  16. func NewUniqueQueue(queueLength int) *UniqueQueue {
  17. if queueLength <= 0 {
  18. queueLength = 100
  19. }
  20. return &UniqueQueue{
  21. table: NewStatusTable(),
  22. queue: make(chan string, queueLength),
  23. }
  24. }
  25. // Queue returns channel of queue for retrieving instances.
  26. func (q *UniqueQueue) Queue() <-chan string {
  27. return q.queue
  28. }
  29. // Exist returns true if there is an instance with given indentity
  30. // exists in the queue.
  31. func (q *UniqueQueue) Exist(id interface{}) bool {
  32. return q.table.IsRunning(com.ToStr(id))
  33. }
  34. // AddFunc adds new instance to the queue with a custom runnable function,
  35. // the queue is blocked until the function exits.
  36. func (q *UniqueQueue) AddFunc(id interface{}, fn func()) {
  37. if q.Exist(id) {
  38. return
  39. }
  40. idStr := com.ToStr(id)
  41. q.table.Lock()
  42. q.table.pool[idStr] = true
  43. if fn != nil {
  44. fn()
  45. }
  46. q.table.Unlock()
  47. q.queue <- idStr
  48. }
  49. // Add adds new instance to the queue.
  50. func (q *UniqueQueue) Add(id interface{}) {
  51. q.AddFunc(id, nil)
  52. }
  53. // Remove removes instance from the queue.
  54. func (q *UniqueQueue) Remove(id interface{}) {
  55. q.table.Stop(com.ToStr(id))
  56. }