缓存
缓存是高并发场景下提高热点数据访问性能的一个有效手段,在开发项目时会经常使用到。缓存的类型分为:本地缓存、分布式缓存和多级缓存
- 本地缓存:也就是在进程的内存中进行缓存,比如最简单的Map来实现,也可以使用Guava Cache和Ehcache这样的工具来实现;本地缓存是内存访问,没有远程交互开销,性能最好,但是受限于单机容量,一般缓存较小且无法扩展。
- 分布式缓存: 一般具有良好的水平扩展能力,可以很好的解决以上问题,对较大数据量的场景也能应付自如。缺点就是需要进行远程请求,性能不如本地缓存
- 多级缓存:为了平衡本地缓存和分布式缓存的问题,实际业务中一般采用多级缓存,本地缓存只保存访问频率最高的部分热点数据,其他的热点数据放在分布式缓存中
今天聊聊Guava Cache实现的本地缓存
选用Guava Cache
版本
- 与master分支同步的HEAD-jre-SNAPSHOT版本,据此最近的一个发行版本是14/04/2020发布的29.0-jre
Guava Cache
用Java Map做缓存功能简单,缺乏淘汰策略,缓存统计等信息,使用Guava Cache可以弥补这类问题
- 提供LRU缓存淘汰策略
- 清理过期等数据时 一是可以通过另开线程进行维护,二是惰性清理 即 访问时清理,Guava Cache采用的是后者
- 支持强引用、弱引用和软引用的key/value,及时清理掉key/value被GC的entry
- 提供CacheStats记录缓存击中率等信息
使用
Guava Cache使用非常方便,通过CacheBuilder定义一个cache并设置相应参数即可
@Testpublic void test() throws Exception {// 创建一个Cache// - 写入数据后10s过期// - 2s刷新一次// - 添加删除Listener,当entry被删除时可以做相关操作// - 定义CacheLoader,当cache中不存在Cache<String, CacheObject> cache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).refreshAfterWrite(2, TimeUnit.SECONDS).removalListener((notification) -> {System.out.println("removed key:" + notification.getKey()+ " value:" + notification.getValue());}).build(new CacheLoader<String, CacheObject>() {@Overridepublic CacheObject load(String key) throws Exception {System.out.println(key + " loaded");CacheObject o = new CacheObject();o.setKey(key);o.setValue(key + ":value:" + Thread.currentThread().getId());return o;}});CacheObject o1 = cache.get("key1");assertNotNull(o1);Thread.sleep(3000);CacheObject o2 = cache.get("key1");assertNotNull(o2);}
应用及原理
加载 - 提供两种方式进行加载
-
CacheLoader
Cache<String, CacheObject> cache = CacheBuilder.newBuilder().build(new CacheLoader<String, CacheObject>() {@Overridepublic CacheObject load(String key) throws Exception {// load your datareturn o;}});
-
Callable
// 如果cache中没有 就根据loader进行加载V get(K key, Callable<? extends V> loader)
两者区别呢, CacheLoader针对全局的key进行load, 而Callable则是指定key, 使用上更为灵活
淘汰策略
-
通过参数expireAfterAccessNanos访问多久后过期, expireAfterWriteNanos写入多久后过期
/** Returns true if the entry has expired. */ boolean isExpired(ReferenceEntry<K, V> entry, long now) {checkNotNull(entry);// 设置了参数&已过期if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {return true;}if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {return true;}return false; }
-
根据参数maximumSize最大entry数量,maximumWeight最大占用内存。两者只能设置一个
// 构造函数中初始化 maxWeight = builder.getMaximumWeight();long getMaximumWeight() {if (expireAfterWriteNanos == 0 || expireAfterAccessNanos == 0) {return 0;}return (weigher == null) ? maximumSize : maximumWeight; }// 超过限制则尝试淘汰部分entryvoid evictEntries(ReferenceEntry<K, V> newest) {if (!map.evictsBySize()) {return;}// 将recencyQueue的访问信息维护到LRU队列accessQueue中drainRecencyQueue();// newest.getValueReference().getWeight()这个值单位既可以是条数也可以是大小, 具体看子类实现// 如果这个entry的weight大小已经大于整个段(segment)的最大限制,直接清除掉if (newest.getValueReference().getWeight() > maxSegmentWeight) {if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {throw new AssertionError();}}// 此Segment中存活的weight总大小 大于segment的最大限制,从LRU队列(accessQueue) 清除,直到满足限制while (totalWeight > maxSegmentWeight) {ReferenceEntry<K, V> e = getNextEvictable();if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {throw new AssertionError();}}}
-
软引用、弱引用回收
/*** Drain the key and value reference queues, cleaning up internal entries containing garbage* collected keys or values.*/void drainReferenceQueues() {// 首先得使用了软引用or弱引用if (map.usesKeyReferences()) {// 清除key被GC的entrydrainKeyReferenceQueue();}if (map.usesValueReferences()) {// 清除value被GC的entrydrainValueReferenceQueue();}}boolean usesKeyReferences() {return keyStrength != Strength.STRONG;}void drainKeyReferenceQueue() {Reference<? extends K> ref;int i = 0;// 如果已经进入了引用队列 那就是被GC了while ((ref = keyReferenceQueue.poll()) != null) {@SuppressWarnings("unchecked")ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;// 清除这条entrymap.reclaimKey(entry);if (++i == DRAIN_MAX) {break;}}}void drainValueReferenceQueue() {Reference<? extends V> ref;int i = 0;while ((ref = valueReferenceQueue.poll()) != null) {@SuppressWarnings("unchecked")ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;map.reclaimValue(valueReference);if (++i == DRAIN_MAX) {break;}}}
手动清除
-
指定key清除
/** Discards any cached value for key {@code key}. */ void invalidate(@CompatibleWith("K") Object key);
-
清除多个key
/*** Discards any cached values for keys {@code keys}.** @since 11.0*/void invalidateAll(Iterable<?> keys);
-
清除所有entry
/** Discards all entries in the cache. */void invalidateAll();
删除监听器
-
整个cache中每移除一条entry时都会调用一个方法来触发删除监听器
void enqueueNotification(@Nullable K key, int hash, @Nullable V value, int weight, RemovalCause cause) {totalWeight -= weight;if (cause.wasEvicted()) {statsCounter.recordEviction();}// 如果设置了监听器就将删除信息放在队列中等待 删除监听器处理if (map.removalNotificationQueue != DISCARDING_QUEUE) {RemovalNotification<K, V> notification = RemovalNotification.create(key, value, cause);map.removalNotificationQueue.offer(notification);}}// get or put都会触发此操作,无须加锁场景void processPendingNotifications() {RemovalNotification<K, V> notification;while ((notification = removalNotificationQueue.poll()) != null) {try {removalListener.onRemoval(notification);} catch (Throwable e) {logger.log(Level.WARNING, "Exception thrown by removal listener", e);}}}
CacheStats统计信息
public static final class SimpleStatsCounter implements StatsCounter {// 缓存击中次数private final LongAddable hitCount = LongAddables.create();// 未击中次数private final LongAddable missCount = LongAddables.create();// 数据加载成功次数private final LongAddable loadSuccessCount = LongAddables.create();// 加载异常次数private final LongAddable loadExceptionCount = LongAddables.create();// 总的加载时间private final LongAddable totalLoadTime = LongAddables.create();// 淘汰次数private final LongAddable evictionCount = LongAddables.create();}
源码分析
CacheBuilder
使用Cache的入口类, 构造者模式 按需设置参数,常用有两种
-
不带loader,底层通过LocalCache.LocalManualCache构建
public <K1 extends K, V1 extends V> Cache<K1, V1> build() {checkWeightWithWeigher();checkNonLoadingCache();return new LocalCache.LocalManualCache<>(this);}其中LocalManualCache定义static class LocalManualCache<K, V> implements Cache<K, V>, Serializable {final LocalCache<K, V> localCache;LocalManualCache(CacheBuilder<? super K, ? super V> builder) {this(new LocalCache<K, V>(builder, null));}
-
带loader, 底层通过LocalCache.LoadingCache构建
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) {checkWeightWithWeigher();return new LocalCache.LocalLoadingCache<>(this, loader);}// 其中LocalLoadingCache定义static class LocalLoadingCache<K, V> extends LocalManualCache<K, V>implements LoadingCache<K, V> {LocalLoadingCache(CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {super(new LocalCache<K, V>(builder, checkNotNull(loader)));}
-
这种方式在get(key)的时候,cache中有则返回,否则根据loader进行加载
public V get(K key) throws ExecutionException {return localCache.getOrLoad(key);}
-
LocalCache - 缓存核心实现
LocalCache是Guava Cache缓存的核心实现,整个组建都以它为中心扩展,常用的两个扩展LocalManualCache, LoadingCache
-
LocalCache 从设计上来看
class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {// 最大容量static final int MAXIMUM_CAPACITY = 1 << 30;// 最大段数量static final int MAX_SEGMENTS = 1 << 16; // slightly conservative// 通过contains判断某个值是否存在时,因为在未加锁的情况下,通过多次尝试来判断static final int CONTAINS_VALUE_RETRIES = 3;// 简单的说就是 在无锁的情况下,读操作达到了这个次数才进行一次清理static final int DRAIN_THRESHOLD = 0x3F;// 每次清理的最大entry数量(避免一次性清理过多 耗时太长)static final int DRAIN_MAX = 16;// 这两参数是在根据hash确定具体的segment时用的final int segmentMask;final int segmentShift;// 每个localCache的段信息,所有数据也都在这里final Segment<K, V>[] segments;// 并发数,影响段数量(segments)final int concurrencyLevel;// key/value判等器final Equivalence<Object> keyEquivalence;final Equivalence<Object> valueEquivalence;// 这俩值确定来key/value的引用类型(强/软/弱)final Strength keyStrength;final Strength valueStrength;// 允许的最大条目数or最大占用内存final long maxWeight;// 访问后多久过期final long expireAfterAccessNanos;// 写入后多久过期final long expireAfterWriteNanos;// 刷新间隔final long refreshNanos;// entry移除后,等待removalListener处理的队列final Queue<RemovalNotification<K, V>> removalNotificationQueue;// 删除监听器,可做一些删除后处理final RemovalListener<K, V> removalListener;// 计时相关的final Ticker ticker;// Entry创建工厂, 枚举类final EntryFactory entryFactory;// 统计器final StatsCounter globalStatsCounter;// 默认加载final @Nullable CacheLoader<? super K, V> defaultLoader;.......
设计思路上和JDK1.7 ConcurrentHashMap相似,采用分段锁的思想保证线程安全性,而segments的数量在构造时就已经确定好了,
因此实际的操作已委托给segment处理了,Segment可以看作HashTable的缩小版(将一个大的Table首次hash分散到不同Segment上,在segment中再次hash将数据分散到不同table上,
也就是后续的增/删/查/扩容都只和segment有关了),不同segment之间互不影响,可以并发操作看看LocalCache和Segment的关联
public @Nullable V get(@Nullable Object key) {if (key == null) {return null;}int hash = hash(key);// 首次hash找到相关的段return segmentFor(hash).get(key, hash); }V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {int hash = hash(checkNotNull(key));return segmentFor(hash).get(key, hash, loader); }public @Nullable V getIfPresent(Object key) {int hash = hash(checkNotNull(key));V value = segmentFor(hash).get(key, hash);if (value == null) {globalStatsCounter.recordMisses(1);} else {globalStatsCounter.recordHits(1);}return value; }Segment<K, V> segmentFor(int hash) {return segments[(hash >>> segmentShift) & segmentMask]; }
来看看LocalCache构造方法
LocalCache(CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {// 并发数concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);// key/value引用类型keyStrength = builder.getKeyStrength();valueStrength = builder.getValueStrength();// key/value判等器keyEquivalence = builder.getKeyEquivalence();valueEquivalence = builder.getValueEquivalence();maxWeight = builder.getMaximumWeight();weigher = builder.getWeigher();expireAfterAccessNanos = builder.getExpireAfterAccessNanos();expireAfterWriteNanos = builder.getExpireAfterWriteNanos();refreshNanos = builder.getRefreshNanos();removalListener = builder.getRemovalListener();removalNotificationQueue =(removalListener == NullListener.INSTANCE)? LocalCache.<RemovalNotification<K, V>>discardingQueue(): new ConcurrentLinkedQueue<RemovalNotification<K, V>>();ticker = builder.getTicker(recordsTime());entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());globalStatsCounter = builder.getStatsCounterSupplier().get();defaultLoader = loader;int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);if (evictsBySize() && !customWeigher()) {initialCapacity = (int) Math.min(initialCapacity, maxWeight);}int segmentShift = 0;int segmentCount = 1;// 根据concurrencyLevel将segment数量调成2的指数大小,想想HashMap// 在没有指定maxWeight的情况下,segment数量只和concurrencyLevel相关; 否则segment数量至少要满足有20条entrywhile (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {++segmentShift;segmentCount <<= 1;}this.segmentShift = 32 - segmentShift;segmentMask = segmentCount - 1;// 创建segments数组并初始化this.segments = newSegmentArray(segmentCount);// 初始化容量均分到segment上int segmentCapacity = initialCapacity / segmentCount;if (segmentCapacity * segmentCount < initialCapacity) {++segmentCapacity;}int segmentSize = 1;while (segmentSize < segmentCapacity) {segmentSize <<= 1;}// 如果设置了条目数量(or大小),需要分配到每个segment上if (evictsBySize()) {// Ensure sum of segment max weights = overall max weightslong maxSegmentWeight = maxWeight / segmentCount + 1;long remainder = maxWeight % segmentCount;for (int i = 0; i < this.segments.length; ++i) {if (i == remainder) {maxSegmentWeight--;}this.segments[i] =createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());}} else {for (int i = 0; i < this.segments.length; ++i) {this.segments[i] =createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());}}}
OK, LocalCache要做的工作就差不多了,下面看看核心操作Segment,接着上面创建createSegment来看看
Segment<K, V> createSegment(int initialCapacity, long maxSegmentWeight, StatsCounter statsCounter) {return new Segment<>(this, initialCapacity, maxSegmentWeight, statsCounter);}Segment(LocalCache<K, V> map,int initialCapacity,long maxSegmentWeight,StatsCounter statsCounter) {this.map = map;this.maxSegmentWeight = maxSegmentWeight;this.statsCounter = checkNotNull(statsCounter);initTable(newEntryArray(initialCapacity));// key/value引用队列,软/弱引用可用,key or value被GC后的entry都会被放进此队列// 之后便可根据此队列清除entry(也就是之前说的 引用回收)keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null;valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null;// 注意这里默认用的支持并发访问的ConcurrentLinkedQueue// 本来accessQueue就可以支持LRU策略,那recencyQueue的具体作用是什么?// - accessQueue在加锁的情况下记录read信息,而recencyQueue在不加锁的情况下使用// - recencyQueue最终信息也也同步到accessQueue中(按顺序往队尾加),这样的设计在高并发读的情况下 确实不错recencyQueue =map.usesAccessQueue()? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>(): LocalCache.<ReferenceEntry<K, V>>discardingQueue();// 和"写"淘汰策略相关,其实每次写包含了读(影响accessQueue),反之不然.writeQueue =map.usesWriteQueue()? new WriteQueue<K, V>(): LocalCache.<ReferenceEntry<K, V>>discardingQueue();// 和"读"策略相关accessQueue =map.usesAccessQueue()? new AccessQueue<K, V>(): LocalCache.<ReferenceEntry<K, V>>discardingQueue();}
以上便是LocalCache、Segment的创建过程,下面看看具体的操作实现
GET操作
-
V get(Object key, int hash)
V get(Object key, int hash) {try {// count表示此segment中entry的数量, 大于0则尝试读取if (count != 0) { // read-volatilelong now = map.ticker.read();// 根据key找到是否存活状态的entry,主要操作:// - 如果key为null, 则可能被GC了,触发清理key/value引用队列// - 如果此entry已过期,尝试清除过期的entriesReferenceEntry<K, V> e = getLiveEntry(key, hash, now);if (e == null) {return null;}V value = e.getValueReference().get();if (value != null) {// 记录读信息recordRead(e, now);// 如果设置了刷新参数refreshNanos,并且到了刷新时间点了则进行刷新,等待返回结果return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);}// 清理一次引用队列tryDrainReferenceQueues();}return null;} finally {// 后置读清除,之前提到过 要达到一定读次数才会清除postReadCleanup();} }
-
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {checkNotNull(key);checkNotNull(loader);try {if (count != 0) { // read-volatile// don't call getLiveEntry, which would ignore loading valuesReferenceEntry<K, V> e = getEntry(key, hash);if (e != null) {long now = map.ticker.read();V value = getLiveValue(e, now);if (value != null) {recordRead(e, now);statsCounter.recordHits(1);return scheduleRefresh(e, key, hash, value, now, loader);}ValueReference<K, V> valueReference = e.getValueReference();if (valueReference.isLoading()) {// 多线程的情况下,此value可能正在加载,等待加载完成返回return waitForLoadingValue(e, key, valueReference);}}}// at this point e is either null or expired;// 如果上面操作没有成功返回,则锁定并用loader加载数据return lockedGetOrLoad(key, hash, loader);} catch (ExecutionException ee) {Throwable cause = ee.getCause();if (cause instanceof Error) {throw new ExecutionError((Error) cause);} else if (cause instanceof RuntimeException) {throw new UncheckedExecutionException(cause);}throw ee;} finally {// 后置读清理postReadCleanup();}}
-
scheduleRefresh 如果必要的话,刷新数据
V scheduleRefresh(ReferenceEntry<K, V> entry,K key,int hash,V oldValue,long now,CacheLoader<? super K, V> loader) {// 设置了刷新时间// 到了刷新时间点// 此entry的value没有在加载中 if (map.refreshes()&& (now - entry.getWriteTime() > map.refreshNanos)&& !entry.getValueReference().isLoading()) { // todoV newValue = refresh(key, hash, loader, true);if (newValue != null) {return newValue;}}return oldValue;}
-
refresh
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {// 这步操作主要目的是 为当前key对应的entry(没有则创建) 的valueReference设置这样一个值LoadingValueReference,// 表示此值正在加载,多线程环境下,其他线程只需等待结果返回即可// 全程加锁场景下final LoadingValueReference<K, V> loadingValueReference =insertLoadingValueReference(key, hash, checkTime);if (loadingValueReference == null) {return null;}// 异步加载数据,实际上这里的异步主要是返回值和保存值的异步操作ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);if (result.isDone()) {try {// 从Future中拿到值return Uninterruptibles.getUninterruptibly(result);} catch (Throwable t) {// don't let refresh exceptions propagate; error was already logged}}return null;}
-
loadAsync 来看看真正执行数据"异步"加载的方法
ListenableFuture<V> loadAsync(final K key,final int hash,final LoadingValueReference<K, V> loadingValueReference,CacheLoader<? super K, V> loader) {// 真正会去加载数据的方法// 会调用用户自定义的loader进行加载,并将值以Future包装返回 final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);loadingFuture.addListener(new Runnable() {@Overridepublic void run() {try {// 获取最新的值,更新entry中的value,记录stats等信息getAndRecordStats(key, hash, loadingValueReference, loadingFuture);} catch (Throwable t) {logger.log(Level.WARNING, "Exception thrown during refresh", t);loadingValueReference.setException(t);}}},// 目前此方法directExecutor的作用是同步执行,当然也可以传一个支持异步的Executor// 也就是getAndRecordStats方法将会和 return loadingFuture异步执行,// 这样外层loadingFuture.get()取值和保存record信息就实现了异步执行,提高效率directExecutor());return loadingFuture;}
如果先处理getAndRecordStats操作,在返回value值就变成来同步加载数据了,如下:
-
loadSync 同步加载数据
V loadSync(K key,int hash,LoadingValueReference<K, V> loadingValueReference,CacheLoader<? super K, V> loader)throws ExecutionException {ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);}
-
loadFuture 真正加载数据的方法,此方法比较巧妙,会先将值设置回LoadingValueReference#futureValue中,再进行返回;这样的好处是 多线程环境下,读线程可以拿到该新值然后直接返回
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {try {stopwatch.start();V previousValue = oldValue.get();if (previousValue == null) {V newValue = loader.load(key);// 尝试将值先设置到futureValue中(并发场景下,读线程可以立即取到值),成功了立即返回futureValue,// 失败了也没关系,包装一个ImmediateFuture返回return set(newValue) ? futureValue : Futures.immediateFuture(newValue);}ListenableFuture<V> newValue = loader.reload(key, previousValue);if (newValue == null) {return Futures.immediateFuture(null);}// To avoid a race, make sure the refreshed value is set into loadingValueReference// *before* returning newValue from the cache query.// 为避免竞争等待,将加载的值在返回存储到cache之前,提前设置回LoadingValueReference#futureValue中// 多线程读操作情况下 在Segment#waitForLoadingValue中可以直接拿到这个值返回return transform(newValue,new com.google.common.base.Function<V, V>() {@Overridepublic V apply(V newValue) {LoadingValueReference.this.set(newValue);return newValue;}},// com.google.common.util.concurrent.DirectExecutor枚举实例,实际上也是串行操作directExecutor());} catch (Throwable t) {ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);if (t instanceof InterruptedException) {Thread.currentThread().interrupt();}return result;} }
-
最后便是保存最新value和记录stats信息 getAndRecordStats, storeLoadedValue
/** Waits uninterruptibly for {@code newValue} to be loaded, and then records loading stats. */V getAndRecordStats(K key,int hash,LoadingValueReference<K, V> loadingValueReference,ListenableFuture<V> newValue)throws ExecutionException {V value = null;try {// 自旋的方式从future中取值value = getUninterruptibly(newValue);if (value == null) {throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");}// 记录stats信息statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());// 保存加载的新值storeLoadedValue(key, hash, loadingValueReference, value);return value;} finally {if (value == null) {statsCounter.recordLoadException(loadingValueReference.elapsedNanos());removeLoadingValue(key, hash, loadingValueReference);}}}// 将加载好的值设置到entry value中,也就是将LoadingValueReference类型的值设置为真实的ValueReference类型的值boolean storeLoadedValue(K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) {lock();try {long now = map.ticker.read();// 引用清理,过期清理preWriteCleanup(now);int newCount = this.count + 1;if (newCount > this.threshold) { // ensure capacityexpand();newCount = this.count + 1;}AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;int index = hash & (table.length() - 1);ReferenceEntry<K, V> first = table.get(index);for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {K entryKey = e.getKey();if (e.getHash() == hash&& entryKey != null&& map.keyEquivalence.equivalent(key, entryKey)) {ValueReference<K, V> valueReference = e.getValueReference();V entryValue = valueReference.get();// replace the old LoadingValueReference if it's live, otherwise// perform a putIfAbsentif (oldValueReference == valueReference|| (entryValue == null && valueReference != UNSET)) {++modCount;if (oldValueReference.isActive()) {RemovalCause cause =(entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;enqueueNotification(key, hash, entryValue, oldValueReference.getWeight(), cause);newCount--;}setValue(e, key, newValue, now);this.count = newCount; // write-volatileevictEntries(e);return true;}// the loaded value was already clobberedenqueueNotification(key, hash, newValue, 0, RemovalCause.REPLACED);return false;}}++modCount;ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);// 保存新值setValue(newEntry, key, newValue, now);table.set(index, newEntry);this.count = newCount; // write-volatile// 如果达到了限制则清理entryevictEntries(newEntry);return true;} finally {unlock();postWriteCleanup();}}/** Sets a new value of an entry. Adds newly created entries at the end of the access queue. */@GuardedBy("this")void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {ValueReference<K, V> previous = entry.getValueReference();int weight = map.weigher.weigh(key, value);checkState(weight >= 0, "Weights must be non-negative");ValueReference<K, V> valueReference =map.valueStrength.referenceValue(this, entry, value, weight);// 这一步就是用真实的ValueReference替换临时的LoadingValueReferenceentry.setValueReference(valueReference);recordWrite(entry, weight, now);previous.notifyNewValue(value);}
来看看get操作涉及到的方法,这些方法比较公用,很多操作都会用到
-
getLiveEntry 根据key拿到 "存活"的entry
ReferenceEntry<K, V> getLiveEntry(Object key, int hash, long now) {ReferenceEntry<K, V> e = getEntry(key, hash);if (e == null) {return null;} else if (map.isExpired(e, now)) { // entry是否过期// 尝试清除过期的entrytryExpireEntries(now);return null;}return e;}
-
getEntry 执行获取entry操作
ReferenceEntry<K, V> getEntry(Object key, int hash) {for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {if (e.getHash() != hash) {continue;}K entryKey = e.getKey();if (entryKey == null) {// 可能被GC了,尝试清除引用队列tryDrainReferenceQueues();continue;}// key是否相等if (map.keyEquivalence.equivalent(key, entryKey)) {return e;}}return null;}
-
tryDrainReferenceQueues 尝试清除引用队列
void tryDrainReferenceQueues() {// 加锁清除if (tryLock()) {try {drainReferenceQueues();} finally {unlock();}} }// 最终又到这里了... void drainReferenceQueues() {if (map.usesKeyReferences()) {drainKeyReferenceQueue();}if (map.usesValueReferences()) {drainValueReferenceQueue();} }
-
isExpired 是否已过期了
/** Returns true if the entry has expired. */boolean isExpired(ReferenceEntry<K, V> entry, long now) {checkNotNull(entry);// 如果设置了此参数&过期了if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {return true;}if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {return true;}return false;}
-
tryExpireEntries
void tryExpireEntries(long now) {// 加锁清理if (tryLock()) {try {expireEntries(now);} finally {unlock();// don't call postWriteCleanup as we're in a read}}}
-
expireEntries
void expireEntries(long now) {drainRecencyQueue();ReferenceEntry<K, V> e;while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {throw new AssertionError();}}while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {throw new AssertionError();}}}
PUT操作
-
LocalCache#put(K key, V value);
public V put(K key, V value) {checkNotNull(key);checkNotNull(value);int hash = hash(key);return segmentFor(hash).put(key, hash, value, false);}
真正的实现在Segment中 LocalCache.Segment#put(K key, int hash, V value, boolean onlyIfAbsent)
V put(K key, int hash, V value, boolean onlyIfAbsent) {lock();try {long now = map.ticker.read();// 添加前先清理被GC的entry和已过期的entrypreWriteCleanup(now);int newCount = this.count + 1;if (newCount > this.threshold) { // ensure capacity// 在put操作之前进行扩容// storeLoadedValue也是在存值之前进行扩容expand();newCount = this.count + 1;}AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;int index = hash & (table.length() - 1);ReferenceEntry<K, V> first = table.get(index);// Look for an existing entry.for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {K entryKey = e.getKey();if (e.getHash() == hash&& entryKey != null&& map.keyEquivalence.equivalent(key, entryKey)) {// We found an existing entry.ValueReference<K, V> valueReference = e.getValueReference();V entryValue = valueReference.get();if (entryValue == null) {++modCount;// 说明value已经被GC了if (valueReference.isActive()) {// 放进删除队列 待删除监听器处理enqueueNotification(key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);// 赋予新值setValue(e, key, value, now);newCount = this.count; // count remains unchanged} else {setValue(e, key, value, now);newCount = this.count + 1;}this.count = newCount; // write-volatileevictEntries(e);return null;} else if (onlyIfAbsent) { // onlyIfAbsent表示仅在不存在的情况下put, 存在的情况下直接返回// Mimic// "if (!map.containsKey(key)) ...// else return map.get(key);recordLockedRead(e, now);return entryValue;} else { // 否则的话便是新值取代旧值// clobber existing entry, count remains unchanged++modCount;enqueueNotification(key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);setValue(e, key, value, now);evictEntries(e);return entryValue;}}}// Create a new entry.++modCount;ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);setValue(newEntry, key, value, now);table.set(index, newEntry);newCount = this.count + 1;this.count = newCount; // write-volatile// put之后根据maxWeight进行清理超出的部分evictEntries(newEntry);return null;} finally {unlock();// 这一步便是在无锁的情况下通过 删除监听器处理被删除的entrypostWriteCleanup();}}
可以看到put操作之前有一步是否扩容判断
-
expand 执行扩容
void expand() {AtomicReferenceArray<ReferenceEntry<K, V>> oldTable = table;int oldCapacity = oldTable.length();if (oldCapacity >= MAXIMUM_CAPACITY) {return;}int newCount = count;AtomicReferenceArray<ReferenceEntry<K, V>> newTable = newEntryArray(oldCapacity << 1);// 0.75的扩容因子threshold = newTable.length() * 3 / 4;int newMask = newTable.length() - 1;for (int oldIndex = 0; oldIndex < oldCapacity; ++oldIndex) {// We need to guarantee that any existing reads of old Map can// proceed. So we cannot yet null out each bin.ReferenceEntry<K, V> head = oldTable.get(oldIndex);if (head != null) {ReferenceEntry<K, V> next = head.getNext();int headIndex = head.getHash() & newMask;// Single node on listif (next == null) {newTable.set(headIndex, head);} else {// Reuse the consecutive sequence of nodes with the same target// index from the end of the list. tail points to the first// entry in the reusable list.// 这步操作主要是找到最后newIndex相同的list, 然后一次性放在新table index上ReferenceEntry<K, V> tail = head;int tailIndex = headIndex;for (ReferenceEntry<K, V> e = next; e != null; e = e.getNext()) {int newIndex = e.getHash() & newMask;if (newIndex != tailIndex) {// The index changed. We'll need to copy the previous entry.tailIndex = newIndex;tail = e;}}// 因为采用的是2倍扩容机制,所以要么stay at same index, 要么是oldIndex + oldCapacity上// 因此这里直接设置table[tailIndex], 无须考虑此位置是否有值newTable.set(tailIndex, tail);// Clone nodes leading up to the tail.// 挨个遍历到新table上for (ReferenceEntry<K, V> e = head; e != tail; e = e.getNext()) {int newIndex = e.getHash() & newMask;ReferenceEntry<K, V> newNext = newTable.get(newIndex);ReferenceEntry<K, V> newFirst = copyEntry(e, newNext);if (newFirst != null) {newTable.set(newIndex, newFirst);} else {// 遇到key/value被GC掉的entry直接清除removeCollectedEntry(e);newCount--;}}}}}table = newTable;this.count = newCount;}
以上便是GET和PUT操作的流程,其他操作如compute, reclaimKey等都大同小异,不在赘述
以上是个人理解,如有问题请指出,谢谢!