| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package cache
- import (
- "container/heap"
- "container/list"
- "errors"
- "reflect"
- "sync"
- "time"
- )
- type MemoryCache struct {
- data map[string]*CacheItem
- ll *list.List // 用于实现LRU
- pq PriorityQueue // 用于实现TTL
- quit chan struct{}
- mu sync.Mutex
- maxBytes int64
- usedBytes int64
- }
- type CacheItem struct {
- Key string
- Value string
- Expiration int64
- Index int
- ListEle *list.Element
- }
- type PriorityQueue []*CacheItem
- func (pq PriorityQueue) Len() int { return len(pq) }
- func (pq PriorityQueue) Less(i, j int) bool {
- return pq[i].Expiration < pq[j].Expiration
- }
- func (pq PriorityQueue) Swap(i, j int) {
- pq[i], pq[j] = pq[j], pq[i]
- pq[i].Index = i
- pq[j].Index = j
- }
- func (pq *PriorityQueue) Push(x interface{}) {
- item := x.(*CacheItem)
- item.Index = len(*pq)
- *pq = append(*pq, item)
- }
- func (pq *PriorityQueue) Pop() interface{} {
- old := *pq
- n := len(old)
- item := old[n-1]
- old[n-1] = nil // avoid memory leak
- item.Index = -1 // for safety
- *pq = old[0 : n-1]
- return item
- }
- func (m *MemoryCache) Get(key string, value interface{}) error {
- // 使用反射将存储的值设置到传入的指针变量中
- val := reflect.ValueOf(value)
- if val.Kind() != reflect.Ptr {
- return errors.New("value must be a pointer")
- }
- //设为空值
- val.Elem().Set(reflect.Zero(val.Elem().Type()))
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.data == nil {
- return nil
- }
- if item, ok := m.data[key]; ok {
- if item.Expiration < time.Now().UnixNano() {
- m.deleteItem(item)
- return nil
- }
- //移动到队列尾部
- m.ll.MoveToBack(item.ListEle)
- err := DecodeValue(item.Value, value)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (m *MemoryCache) Set(key string, value interface{}, exp int) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- v, err := EncodeValue(value)
- if err != nil {
- return err
- }
- //key 所占用的内存
- keyBytes := int64(len(key))
- //value所占用的内存空间大小
- valueBytes := int64(len(v))
- //判断是否超过最大内存限制
- if m.maxBytes != 0 && m.maxBytes < keyBytes+valueBytes {
- return errors.New("exceed maxBytes")
- }
- m.usedBytes += keyBytes + valueBytes
- if m.maxBytes != 0 && m.usedBytes > m.maxBytes {
- m.RemoveOldest()
- }
- if exp <= 0 {
- exp = MaxTimeOut
- }
- expiration := time.Now().Add(time.Duration(exp) * time.Second).UnixNano()
- item, exists := m.data[key]
- if exists {
- item.Value = v
- item.Expiration = expiration
- heap.Fix(&m.pq, item.Index)
- m.ll.MoveToBack(item.ListEle)
- } else {
- ele := m.ll.PushBack(key)
- item = &CacheItem{
- Key: key,
- Value: v,
- Expiration: expiration,
- ListEle: ele,
- }
- m.data[key] = item
- heap.Push(&m.pq, item)
- }
- return nil
- }
- func (m *MemoryCache) RemoveOldest() {
- for m.maxBytes != 0 && m.usedBytes > m.maxBytes {
- elem := m.ll.Front()
- if elem != nil {
- key := elem.Value.(string)
- item := m.data[key]
- m.deleteItem(item)
- }
- }
- }
- // evictExpiredItems removes all expired items from the cache.
- func (m *MemoryCache) evictExpiredItems() {
- m.mu.Lock()
- defer m.mu.Unlock()
- now := time.Now().UnixNano()
- for m.pq.Len() > 0 {
- item := m.pq[0]
- if item.Expiration > now {
- break
- }
- m.deleteItem(item)
- }
- }
- // startEviction starts a goroutine that evicts expired items from the cache.
- func (m *MemoryCache) startEviction() {
- ticker := time.NewTicker(1 * time.Second)
- go func() {
- for {
- select {
- case <-ticker.C:
- m.evictExpiredItems()
- case <-m.quit:
- ticker.Stop()
- return
- }
- }
- }()
- }
- // stopEviction 停止定时清理
- func (m *MemoryCache) stopEviction() {
- close(m.quit)
- }
- // deleteItem removes a key from the cache.
- func (m *MemoryCache) deleteItem(item *CacheItem) {
- m.ll.Remove(item.ListEle)
- m.usedBytes -= int64(len(item.Key)) + int64(len(item.Value))
- heap.Remove(&m.pq, item.Index)
- delete(m.data, item.Key)
- }
- func (m *MemoryCache) Gc() error {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.data = make(map[string]*CacheItem)
- m.ll = list.New()
- m.pq = make(PriorityQueue, 0)
- heap.Init(&m.pq)
- m.usedBytes = 0
- return nil
- }
- // NewMemoryCache creates a new MemoryCache.default maxBytes is 0, means no limit.
- func NewMemoryCache(maxBytes int64) *MemoryCache {
- cache := &MemoryCache{
- data: make(map[string]*CacheItem),
- pq: make(PriorityQueue, 0),
- quit: make(chan struct{}),
- ll: list.New(),
- maxBytes: maxBytes,
- }
- heap.Init(&cache.pq)
- cache.startEviction()
- return cache
- }
|