Implementing an LRU Cache in Python

    Tech · 2023-12-19

    Implementing a cache with periodic cleanup using the LRU algorithm

    Definition of LRU

    What is LRU? Taken literally, the English name Least Recently Used means exactly that: the entry that has gone unused for the longest time. It derives from a well-known principle in operating systems theory: pages that have been used recently are likely to be used again in the near future, while pages that have not been used for a long time will probably stay unused for a long time to come. This idea yields a cache eviction policy: whenever new data must be stored, find the entry that has gone unused the longest and swap it out to make room. The primary metric is recency of use; frequency of use is a secondary one. Computers use this mechanism extensively, and its rationale is that it keeps "hot" data, i.e. the data used most recently and most often, in the cache. LRU therefore solves many problems that come up in real-world development and maps well onto common business scenarios.
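
    As a minimal sketch of the policy itself, collections.OrderedDict is enough to model LRU eviction. The capacity of 2, the put helper, and the keys here are illustrative choices, not part of the implementation that follows:

    from collections import OrderedDict

    capacity = 2
    lru = OrderedDict()

    def put(key, value):
        if key in lru:
            lru.move_to_end(key)      # touching a key refreshes its recency
        lru[key] = value
        if len(lru) > capacity:
            lru.popitem(last=False)   # evict the least recently used entry

    put('a', 1)
    put('b', 2)
    put('a', 10)      # 'a' is now the most recently used key
    put('c', 3)       # over capacity: 'b' is evicted, not 'a'
    print(list(lru))  # ['a', 'c']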

    Python implementation
    import time
    import functools
    from collections import OrderedDict

    # 1. An LRU cache with a size limit and per-key expiration
    class LRUCacheDict:
        def __init__(self, max_size=1024, expiration=60):
            """Hold at most max_size keys; each key is valid for expiration seconds."""
            self.max_size = max_size
            self.expiration = expiration
            self._cache = {}
            self._access_records = OrderedDict()  # records each key's last access time
            self._expire_records = OrderedDict()  # records each key's expiry time

        def __setitem__(self, key, value):
            """Set a cache entry."""
            now = int(time.time())
            self.__delete__(key)  # drop any existing bookkeeping for this key
            self._cache[key] = value
            self._expire_records[key] = now + self.expiration
            self._access_records[key] = now
            self.clean_up()

        def __getitem__(self, key):
            """Get a cache entry and refresh its recency (raises KeyError on a miss)."""
            now = int(time.time())
            del self._access_records[key]
            self._access_records[key] = now  # re-insert so the key becomes most recent
            self.clean_up()
            return self._cache[key]  # fixed: the original read self.cache[key]

        def __contains__(self, key):
            self.clean_up()
            return key in self._cache

        def __delete__(self, key):
            # Called explicitly above and below; despite the dunder name,
            # this is an ordinary helper method, not the descriptor protocol.
            if key in self._cache:
                del self._cache[key]
                del self._expire_records[key]
                del self._access_records[key]

        def clean_up(self):
            """Remove invalid entries: expired keys and overflow beyond max_size."""
            if self.expiration is None:
                return None
            pending_delete_keys = []
            now = int(time.time())
            # Delete entries that have expired
            for k, v in self._expire_records.items():
                if v < now:
                    pending_delete_keys.append(k)
            for del_k in pending_delete_keys:
                self.__delete__(del_k)
            # If the cache has grown past max_size, evict the least recently
            # used entries (the first keys in _access_records)
            while len(self._cache) > self.max_size:
                oldest_key = next(iter(self._access_records))
                self.__delete__(oldest_key)

    # 2. Turn the LRU cache into a decorator
    def cache_it(max_size=1024, expiration=60):
        """A caching decorator with a configurable expiration time."""
        CACHE = LRUCacheDict(max_size=max_size, expiration=expiration)

        def wrapper(func):
            @functools.wraps(func)
            def inner(*args, **kwargs):
                # Build a key from the call arguments
                # (fixed: the original repr(*args, **kwargs) raises a TypeError)
                key = repr(args) + repr(sorted(kwargs.items()))
                try:
                    result = CACHE[key]
                except KeyError:
                    result = func(*args, **kwargs)
                    CACHE[key] = result
                return result
            return inner
        return wrapper

    @cache_it(max_size=10, expiration=3)
    def query_it(sql):
        time.sleep(1)
        result = 'execute %s' % sql
        print(result)
        return result

    if __name__ == '__main__':
        # 1. Exercise the cache directly
        cache_dict = LRUCacheDict(max_size=2, expiration=10)
        cache_dict['name'] = 'achjiang'
        cache_dict['age'] = 30
        cache_dict['addr'] = 'jiangsu'
        print('name' in cache_dict)  # False: capacity is 2, so the oldest key was evicted
        print('age' in cache_dict)   # True
        time.sleep(11)
        print('age' in cache_dict)   # False: the entry has expired
        # 2. Exercise the decorator
        query_it(100)

    Output

    False
    True
    False
    execute 100
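
    Though the demo calls query_it only once, repeating the call with the same argument inside the 3-second expiration window is served from the cache and skips time.sleep(1) entirely. A rough timing sketch, assuming the script above has already run (exact figures are machine-dependent):

    start = time.time()
    query_it(100)               # cache miss: runs the simulated query, ~1s
    print(time.time() - start)  # roughly 1.0

    start = time.time()
    query_it(100)               # cache hit inside the 3s expiration window
    print(time.time() - start)  # close to 0.0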

    Appendix: Python 3 ships this functionality as functools.lru_cache. Its source code:

    def lru_cache(maxsize=128, typed=False):
        """Least-recently-used cache decorator.

        If *maxsize* is set to None, the LRU features are disabled and the cache
        can grow without bound.

        If *typed* is True, arguments of different types will be cached separately.
        For example, f(3.0) and f(3) will be treated as distinct calls with
        distinct results.

        Arguments to the cached function must be hashable.

        View the cache statistics named tuple (hits, misses, maxsize, currsize)
        with f.cache_info().  Clear the cache and statistics with f.cache_clear().
        Access the underlying function with f.__wrapped__.

        See:  http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used

        """

        # Users should only access the lru_cache through its public API:
        #       cache_info, cache_clear, and f.__wrapped__
        # The internals of the lru_cache are encapsulated for thread safety and
        # to allow the implementation to change (including a possible C version).

        # Early detection of an erroneous call to @lru_cache without any arguments
        # resulting in the inner function being passed to maxsize instead of an
        # integer or None.
        if maxsize is not None and not isinstance(maxsize, int):
            raise TypeError('Expected maxsize to be an integer or None')

        def decorating_function(user_function):
            wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
            return update_wrapper(wrapper, user_function)

        return decorating_function

    def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
        # Constants shared by all lru cache instances:
        sentinel = object()          # unique object used to signal cache misses
        make_key = _make_key         # build a key from the function arguments
        PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields

        cache = {}
        hits = misses = 0
        full = False
        cache_get = cache.get    # bound method to lookup a key or return None
        cache_len = cache.__len__  # get cache size without calling len()
        lock = RLock()           # because linkedlist updates aren't threadsafe
        root = []                # root of the circular doubly linked list
        root[:] = [root, root, None, None]     # initialize by pointing to self

        if maxsize == 0:

            def wrapper(*args, **kwds):
                # No caching -- just a statistics update after a successful call
                nonlocal misses
                result = user_function(*args, **kwds)
                misses += 1
                return result

        elif maxsize is None:

            def wrapper(*args, **kwds):
                # Simple caching without ordering or size limit
                nonlocal hits, misses
                key = make_key(args, kwds, typed)
                result = cache_get(key, sentinel)
                if result is not sentinel:
                    hits += 1
                    return result
                result = user_function(*args, **kwds)
                cache[key] = result
                misses += 1
                return result

        else:

            def wrapper(*args, **kwds):
                # Size limited caching that tracks accesses by recency
                nonlocal root, hits, misses, full
                key = make_key(args, kwds, typed)
                with lock:
                    link = cache_get(key)
                    if link is not None:
                        # Move the link to the front of the circular queue
                        link_prev, link_next, _key, result = link
                        link_prev[NEXT] = link_next
                        link_next[PREV] = link_prev
                        last = root[PREV]
                        last[NEXT] = root[PREV] = link
                        link[PREV] = last
                        link[NEXT] = root
                        hits += 1
                        return result
                result = user_function(*args, **kwds)
                with lock:
                    if key in cache:
                        # Getting here means that this same key was added to the
                        # cache while the lock was released.  Since the link
                        # update is already done, we need only return the
                        # computed result and update the count of misses.
                        pass
                    elif full:
                        # Use the old root to store the new key and result.
                        oldroot = root
                        oldroot[KEY] = key
                        oldroot[RESULT] = result
                        # Empty the oldest link and make it the new root.
                        # Keep a reference to the old key and old result to
                        # prevent their ref counts from going to zero during the
                        # update. That will prevent potentially arbitrary object
                        # clean-up code (i.e. __del__) from running while we're
                        # still adjusting the links.
                        root = oldroot[NEXT]
                        oldkey = root[KEY]
                        oldresult = root[RESULT]
                        root[KEY] = root[RESULT] = None
                        # Now update the cache dictionary.
                        del cache[oldkey]
                        # Save the potentially reentrant cache[key] assignment
                        # for last, after the root and links have been put in
                        # a consistent state.
                        cache[key] = oldroot
                    else:
                        # Put result in a new link at the front of the queue.
                        last = root[PREV]
                        link = [last, root, key, result]
                        last[NEXT] = root[PREV] = cache[key] = link
                        # Use the cache_len bound method instead of the len() function
                        # which could potentially be wrapped in an lru_cache itself.
                        full = (cache_len() >= maxsize)
                    misses += 1
                return result

        def cache_info():
            """Report cache statistics"""
            with lock:
                return _CacheInfo(hits, misses, maxsize, cache_len())

        def cache_clear():
            """Clear the cache and cache statistics"""
            nonlocal hits, misses, full
            with lock:
                cache.clear()
                root[:] = [root, root, None, None]
                hits = misses = 0
                full = False

        wrapper.cache_info = cache_info
        wrapper.cache_clear = cache_clear
        return wrapper

    try:
        from _functools import _lru_cache_wrapper
    except ImportError:
        pass
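
    In everyday code the decorator is simply applied to a function; note that unlike the LRUCacheDict above, it has a size bound but no expiration mechanism. A small usage sketch (fib and maxsize=128 are arbitrary illustrative choices):

    from functools import lru_cache

    @lru_cache(maxsize=128)
    def fib(n):
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    print(fib(30))           # 832040, computed with memoized recursion
    print(fib.cache_info())  # CacheInfo(hits=28, misses=31, maxsize=128, currsize=31)
    fib.cache_clear()        # reset both the cache and its statistics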