memory.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package cache
  2. import (
  3. "container/heap"
  4. "container/list"
  5. "errors"
  6. "reflect"
  7. "sync"
  8. "time"
  9. )
  10. type MemoryCache struct {
  11. data map[string]*CacheItem
  12. ll *list.List // 用于实现LRU
  13. pq PriorityQueue // 用于实现TTL
  14. quit chan struct{}
  15. mu sync.Mutex
  16. maxBytes int64
  17. usedBytes int64
  18. }
  // CacheItem is one cache entry together with its bookkeeping handles.
  type CacheItem struct {
  	// Key is the cache key of the entry.
  	Key string
  	// Value is the encoded value as stored by the cache.
  	Value string
  	// Expiration is the expiry instant in Unix nanoseconds.
  	Expiration int64
  	// Index is the entry's position in the PriorityQueue; -1 once popped.
  	Index int
  	// ListEle is the entry's element in the LRU list.
  	ListEle *list.Element
  }
  26. type PriorityQueue []*CacheItem
  27. func (pq PriorityQueue) Len() int { return len(pq) }
  28. func (pq PriorityQueue) Less(i, j int) bool {
  29. return pq[i].Expiration < pq[j].Expiration
  30. }
  31. func (pq PriorityQueue) Swap(i, j int) {
  32. pq[i], pq[j] = pq[j], pq[i]
  33. pq[i].Index = i
  34. pq[j].Index = j
  35. }
  36. func (pq *PriorityQueue) Push(x interface{}) {
  37. item := x.(*CacheItem)
  38. item.Index = len(*pq)
  39. *pq = append(*pq, item)
  40. }
  41. func (pq *PriorityQueue) Pop() interface{} {
  42. old := *pq
  43. n := len(old)
  44. item := old[n-1]
  45. old[n-1] = nil // avoid memory leak
  46. item.Index = -1 // for safety
  47. *pq = old[0 : n-1]
  48. return item
  49. }
  50. func (m *MemoryCache) Get(key string, value interface{}) error {
  51. // 使用反射将存储的值设置到传入的指针变量中
  52. val := reflect.ValueOf(value)
  53. if val.Kind() != reflect.Ptr {
  54. return errors.New("value must be a pointer")
  55. }
  56. //设为空值
  57. val.Elem().Set(reflect.Zero(val.Elem().Type()))
  58. m.mu.Lock()
  59. defer m.mu.Unlock()
  60. if m.data == nil {
  61. return nil
  62. }
  63. if item, ok := m.data[key]; ok {
  64. if item.Expiration < time.Now().UnixNano() {
  65. m.deleteItem(item)
  66. return nil
  67. }
  68. //移动到队列尾部
  69. m.ll.MoveToBack(item.ListEle)
  70. err := DecodeValue(item.Value, value)
  71. if err != nil {
  72. return err
  73. }
  74. }
  75. return nil
  76. }
  77. func (m *MemoryCache) Set(key string, value interface{}, exp int) error {
  78. m.mu.Lock()
  79. defer m.mu.Unlock()
  80. v, err := EncodeValue(value)
  81. if err != nil {
  82. return err
  83. }
  84. //key 所占用的内存
  85. keyBytes := int64(len(key))
  86. //value所占用的内存空间大小
  87. valueBytes := int64(len(v))
  88. //判断是否超过最大内存限制
  89. if m.maxBytes != 0 && m.maxBytes < keyBytes+valueBytes {
  90. return errors.New("exceed maxBytes")
  91. }
  92. m.usedBytes += keyBytes + valueBytes
  93. if m.maxBytes != 0 && m.usedBytes > m.maxBytes {
  94. m.RemoveOldest()
  95. }
  96. if exp <= 0 {
  97. exp = MaxTimeOut
  98. }
  99. expiration := time.Now().Add(time.Duration(exp) * time.Second).UnixNano()
  100. item, exists := m.data[key]
  101. if exists {
  102. item.Value = v
  103. item.Expiration = expiration
  104. heap.Fix(&m.pq, item.Index)
  105. m.ll.MoveToBack(item.ListEle)
  106. } else {
  107. ele := m.ll.PushBack(key)
  108. item = &CacheItem{
  109. Key: key,
  110. Value: v,
  111. Expiration: expiration,
  112. ListEle: ele,
  113. }
  114. m.data[key] = item
  115. heap.Push(&m.pq, item)
  116. }
  117. return nil
  118. }
  119. func (m *MemoryCache) RemoveOldest() {
  120. for m.maxBytes != 0 && m.usedBytes > m.maxBytes {
  121. elem := m.ll.Front()
  122. if elem != nil {
  123. key := elem.Value.(string)
  124. item := m.data[key]
  125. m.deleteItem(item)
  126. }
  127. }
  128. }
  129. // evictExpiredItems removes all expired items from the cache.
  130. func (m *MemoryCache) evictExpiredItems() {
  131. m.mu.Lock()
  132. defer m.mu.Unlock()
  133. now := time.Now().UnixNano()
  134. for m.pq.Len() > 0 {
  135. item := m.pq[0]
  136. if item.Expiration > now {
  137. break
  138. }
  139. m.deleteItem(item)
  140. }
  141. }
  142. // startEviction starts a goroutine that evicts expired items from the cache.
  143. func (m *MemoryCache) startEviction() {
  144. ticker := time.NewTicker(1 * time.Second)
  145. go func() {
  146. for {
  147. select {
  148. case <-ticker.C:
  149. m.evictExpiredItems()
  150. case <-m.quit:
  151. ticker.Stop()
  152. return
  153. }
  154. }
  155. }()
  156. }
  157. // stopEviction 停止定时清理
  158. func (m *MemoryCache) stopEviction() {
  159. close(m.quit)
  160. }
  161. // deleteItem removes a key from the cache.
  162. func (m *MemoryCache) deleteItem(item *CacheItem) {
  163. m.ll.Remove(item.ListEle)
  164. m.usedBytes -= int64(len(item.Key)) + int64(len(item.Value))
  165. heap.Remove(&m.pq, item.Index)
  166. delete(m.data, item.Key)
  167. }
  168. func (m *MemoryCache) Gc() error {
  169. m.mu.Lock()
  170. defer m.mu.Unlock()
  171. m.data = make(map[string]*CacheItem)
  172. m.ll = list.New()
  173. m.pq = make(PriorityQueue, 0)
  174. heap.Init(&m.pq)
  175. m.usedBytes = 0
  176. return nil
  177. }
  178. // NewMemoryCache creates a new MemoryCache.default maxBytes is 0, means no limit.
  179. func NewMemoryCache(maxBytes int64) *MemoryCache {
  180. cache := &MemoryCache{
  181. data: make(map[string]*CacheItem),
  182. pq: make(PriorityQueue, 0),
  183. quit: make(chan struct{}),
  184. ll: list.New(),
  185. maxBytes: maxBytes,
  186. }
  187. heap.Init(&cache.pq)
  188. cache.startEviction()
  189. return cache
  190. }