Official introduction
Efficiently thread-safe for inserts (the main point of this container), and wait-free for lookups. That is, inserts are thread-safe and lookups are wait-free.
You can erase from this container, but the cell that held the key is never freed or reclaimed. In other words, erase removes the entry logically, but its memory is not released.
clear() removes all elements and may only be called from a single thread.
AtomicHashMap overview
What does an AtomicHashMap consist of?
```cpp
std::atomic<SubMap*> subMaps_[kNumSubMaps_]; // each SubMap is an AtomicHashArray; kNumSubMaps_ = 16
std::atomic<uint32_t> numMapsAllocated_;
```
As you can see, an AtomicHashMap is made up of as many as 16 arrays, each an AtomicHashArray. An AtomicHashArray contains:
```cpp
const size_t capacity_;
const size_t maxEntries_;
const KeyT kEmptyKey_;   // marks an empty slot
const KeyT kLockedKey_;  // marks a slot that is locked while an operation is in progress
const KeyT kErasedKey_;  // marks a key that has been erased
ThreadCachedInt<uint64_t> numEntries_;        // Successful key inserts
ThreadCachedInt<uint64_t> numPendingEntries_; // Used by insertInternal
std::atomic<int64_t> isFull_;    // Used by insertInternal
std::atomic<int64_t> numErases_; // Successful key erases
value_type cells_[0]; // This must be the last field of this class
```
cells_ is the last member of the class, and &cells_[0] points to the storage where the elements actually live; the initialization flow below shows how this works.
AtomicHashMap initialization
Let's look at the constructor:
```cpp
template <
    typename KeyT,
    typename ValueT,
    typename HashFcn,
    typename EqualFcn,
    typename Allocator,
    typename ProbeFcn,
    typename KeyConvertFcn>
AtomicHashMap<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::
    AtomicHashMap(size_t finalSizeEst, const Config& config)
    : kGrowthFrac_(
          config.growthFactor < 0 ? 1.0f - config.maxLoadFactor
                                  : config.growthFactor) {
  CHECK(config.maxLoadFactor > 0.0f && config.maxLoadFactor < 1.0f);
  subMaps_[0].store(
      SubMap::create(finalSizeEst, config).release(),
      std::memory_order_relaxed);
  auto subMapCount = kNumSubMaps_;
  FOR_EACH_RANGE (i, 1, subMapCount) {
    subMaps_[i].store(nullptr, std::memory_order_relaxed);
  }
  numMapsAllocated_.store(1, std::memory_order_relaxed);
}
```
A SubMap is just an AtomicHashArray. The constructor first calls create to build one array, then sets the remaining subMaps_ slots to nullptr.
Here is the create process:
```cpp
template <
    class KeyT,
    class ValueT,
    class HashFcn,
    class EqualFcn,
    class Allocator,
    class ProbeFcn,
    class KeyConvertFcn>
typename AtomicHashArray<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::SmartPtr
AtomicHashArray<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::
    create(size_t maxSize, const Config& c) {
  CHECK_LE(c.maxLoadFactor, 1.0);
  CHECK_GT(c.maxLoadFactor, 0.0);
  CHECK_NE(c.emptyKey, c.lockedKey);
  size_t capacity = size_t(maxSize / c.maxLoadFactor);
  size_t sz = sizeof(AtomicHashArray) + sizeof(value_type) * capacity;

  auto const mem = Allocator().allocate(sz);
  try {
    new (mem) AtomicHashArray(
        capacity,
        c.emptyKey,
        c.lockedKey,
        c.erasedKey,
        c.maxLoadFactor,
        c.entryCountThreadCacheSize);
  } catch (...) {
    Allocator().deallocate(mem, sz);
    throw;
  }

  SmartPtr map(static_cast<AtomicHashArray*>((void*)mem));
  FOR_EACH_RANGE (i, 0, map->capacity_) {
    cellKeyPtr(map->cells_[i])->store(map->kEmptyKey_, std::memory_order_relaxed);
  }
  return map;
}
```
It allocates a block mem of size sizeof(AtomicHashArray) + sizeof(value_type) * capacity, placement-news the AtomicHashArray onto mem, and then sets every cell's key to kEmptyKey_.
Lookup flow
```cpp
typename AtomicHashMap<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::iterator
AtomicHashMap<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::
    find(LookupKeyT k) {
  SimpleRetT ret = findInternal<LookupKeyT, LookupHashFcn, LookupEqualFcn>(k);
  if (!ret.success) {
    return end();
  }
  SubMap* subMap = subMaps_[ret.i].load(std::memory_order_relaxed);
  return iterator(this, ret.i, subMap->makeIter(ret.j));
}

// findInternal --
```
```cpp
template <
    typename KeyT,
    typename ValueT,
    typename HashFcn,
    typename EqualFcn,
    typename Allocator,
    typename ProbeFcn,
    typename KeyConvertFcn>
template <class LookupKeyT, class LookupHashFcn, class LookupEqualFcn>
typename AtomicHashMap<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::SimpleRetT
AtomicHashMap<KeyT, ValueT, HashFcn, EqualFcn, Allocator, ProbeFcn, KeyConvertFcn>::
    findInternal(const LookupKeyT k) const {
  SubMap* const primaryMap = subMaps_[0].load(std::memory_order_relaxed);
  typename SubMap::SimpleRetT ret =
      primaryMap->template findInternal<LookupKeyT, LookupHashFcn, LookupEqualFcn>(k);
  if (LIKELY(ret.idx != primaryMap->capacity_)) {
    return SimpleRetT(0, ret.idx, ret.success);
  }
  const unsigned int numMaps =
      numMapsAllocated_.load(std::memory_order_acquire);
  FOR_EACH_RANGE (i, 1, numMaps) {
    // Check each map successively. If one succeeds, we're done!
    SubMap* thisMap = subMaps_[i].load(std::memory_order_relaxed);
    ret = thisMap->template findInternal<LookupKeyT, LookupHashFcn, LookupEqualFcn>(k);
    if (LIKELY(ret.idx != thisMap->capacity_)) {
      return SimpleRetT(i, ret.idx, ret.success);
    }
  }
  // Didn't find our key...
  return SimpleRetT(numMaps, 0, false);
}
```
As you can see, it runs each submap's (i.e., each array's) findInternal in turn until the key is found. The array-level findInternal loop looks like this:
```cpp
checkLegalKeyIfKey<LookupKeyT>(key_in);
for (size_t idx = keyToAnchorIdx<LookupKeyT, LookupHashFcn>(key_in),
            numProbes = 0;
     ;
     idx = ProbeFcn()(idx, numProbes, capacity_)) {
  const KeyT key = acquireLoadKey(cells_[idx]);
  if (LIKELY(LookupEqualFcn()(key, key_in))) {
    return SimpleRetT(idx, true);
  }
  if (UNLIKELY(key == kEmptyKey_)) {
    // if we hit an empty element, this key does not exist
    return SimpleRetT(capacity_, false);
  }
  // NOTE: the way we count numProbes must be same in find(), insert(),
  // and erase(). Otherwise it may break probing.
  ++numProbes;
  if (UNLIKELY(numProbes >= capacity_)) {
    // probed every cell...fail
    return SimpleRetT(capacity_, false);
  }
}
```
The array-level findInternal starts probing at hash(key) % capacity_ and walks the array: if it finds an equal key it returns that slot, and if it hits an empty key it returns "not found", since the key cannot be further along the probe sequence.
Note the three customization points here: LookupHashFcn, ProbeFcn, and LookupEqualFcn, all supplied by the user when the hash map is created.
LookupHashFcn is the hash function. ProbeFcn advances the index; there are currently two variants: linear probing, which steps idx += 1, and quadratic probing, which steps idx += numProbes on each probe. LookupEqualFcn tests two keys for equality.
The value_type of AtomicHashMap must be an std::pair:

```cpp
typedef std::pair<const KeyT, ValueT> value_type;
```
Insert flow
insert calls emplace, which in turn calls insertInternal:
```cpp
emplace(LookupKeyT k, ArgTs&&... vCtorArgs) {
  SimpleRetT ret = insertInternal<
      LookupKeyT,
      LookupHashFcn,
      LookupEqualFcn,
      LookupKeyToKeyFcn>(k, std::forward<ArgTs>(vCtorArgs)...);
  SubMap* subMap = subMaps_[ret.i].load(std::memory_order_relaxed);
  return std::make_pair(
      iterator(this, ret.i, subMap->makeIter(ret.j)), ret.success);
}
```
It tries the insert in each array that has already been allocated. If the last array turns out to be full, it checks whether a new array can still be allocated.
If it can, tryLockMap locks the slot for the next map: tryLockMap is a CAS on the array pointer that compares against nullptr and installs kLockedPtr_. Only one thread wins the CAS; the other threads enter a spin wait until the new array has been fully allocated, i.e. until numMapsAllocated_.load() has grown past nextMapIdx.
```cpp
insertInternal(LookupKeyT key, ArgTs&&... vCtorArgs) {
beginInsertInternal:
  auto nextMapIdx = // this maintains our state
      numMapsAllocated_.load(std::memory_order_acquire);
  typename SubMap::SimpleRetT ret;
  FOR_EACH_RANGE (i, 0, nextMapIdx) {
    // insert in each map successively. If one succeeds, we're done!
    SubMap* subMap = subMaps_[i].load(std::memory_order_relaxed);
    ret = subMap->template insertInternal<
        LookupKeyT,
        LookupHashFcn,
        LookupEqualFcn,
        LookupKeyToKeyFcn>(key, std::forward<ArgTs>(vCtorArgs)...);
    if (ret.idx == subMap->capacity_) {
      continue; // map is full, so try the next one
    }
    // Either collision or success - insert in either case
    return SimpleRetT(i, ret.idx, ret.success);
  }

  SubMap* primarySubMap = subMaps_[0].load(std::memory_order_relaxed);
  if (nextMapIdx >= kNumSubMaps_ ||
      primarySubMap->capacity_ * kGrowthFrac_ < 1.0) {
    // Can't allocate any more sub maps.
    throw AtomicHashMapFullError();
  }

  if (tryLockMap(nextMapIdx)) {
    // Alloc a new map and shove it in. We can change whatever
    // we want because other threads are waiting on us...
    size_t numCellsAllocated = (size_t)(
        primarySubMap->capacity_ *
        std::pow(1.0 + kGrowthFrac_, nextMapIdx - 1));
    size_t newSize = size_t(numCellsAllocated * kGrowthFrac_);
    DCHECK(
        subMaps_[nextMapIdx].load(std::memory_order_relaxed) ==
        (SubMap*)kLockedPtr_);
    // create a new map using the settings stored in the first map
    Config config;
    config.emptyKey = primarySubMap->kEmptyKey_;
    config.lockedKey = primarySubMap->kLockedKey_;
    config.erasedKey = primarySubMap->kErasedKey_;
    config.maxLoadFactor = primarySubMap->maxLoadFactor();
    config.entryCountThreadCacheSize =
        primarySubMap->getEntryCountThreadCacheSize();
    subMaps_[nextMapIdx].store(
        SubMap::create(newSize, config).release(), std::memory_order_relaxed);

    // Publish the map to other threads.
    numMapsAllocated_.fetch_add(1, std::memory_order_release);
    DCHECK_EQ(
        nextMapIdx + 1, numMapsAllocated_.load(std::memory_order_relaxed));
  } else {
    // If we lost the race, we'll have to wait for the next map to get
    // allocated before doing any insertion here.
    detail::atomic_hash_spin_wait([&] {
      return nextMapIdx >= numMapsAllocated_.load(std::memory_order_acquire);
    });
  }

  // Relaxed is ok here because either we just created this map, or we
  // just did a spin wait with an acquire load on numMapsAllocated_.
  SubMap* loadedMap = subMaps_[nextMapIdx].load(std::memory_order_relaxed);
  DCHECK(loadedMap && loadedMap != (SubMap*)kLockedPtr_);
  ret = loadedMap->insertInternal(key, std::forward<ArgTs>(vCtorArgs)...);
  if (ret.idx != loadedMap->capacity_) {
    return SimpleRetT(nextMapIdx, ret.idx, ret.success);
  }
  // We took way too long and the new map is already full...try again from
  // the top (this should pretty much never happen).
  goto beginInsertInternal;
}
```
Below is the array-level insertInternal:
```cpp
insertInternal(LookupKeyT key_in, ArgTs&&... vCtorArgs) {
  const short NO_NEW_INSERTS = 1;
  const short NO_PENDING_INSERTS = 2;
  checkLegalKeyIfKey<LookupKeyT>(key_in);

  size_t idx = keyToAnchorIdx<LookupKeyT, LookupHashFcn>(key_in);
  size_t numProbes = 0;
  for (;;) {
    DCHECK_LT(idx, capacity_);
    value_type* cell = &cells_[idx];
    if (relaxedLoadKey(*cell) == kEmptyKey_) {
      // NOTE: isFull_ is set based on numEntries_.readFast(), so it's
      // possible to insert more than maxEntries_ entries. However, it's not
      // possible to insert past capacity_.
      ++numPendingEntries_;
      if (isFull_.load(std::memory_order_acquire)) {
        --numPendingEntries_;

        // Before deciding whether this insert succeeded, this thread needs to
        // wait until no other thread can add a new entry.

        // Correctness assumes isFull_ is true at this point. If
        // another thread now does ++numPendingEntries_, we expect it
        // to pass the isFull_.load() test above. (It shouldn't insert
        // a new entry.)
        detail::atomic_hash_spin_wait([&] {
          return (isFull_.load(std::memory_order_acquire) !=
                  NO_PENDING_INSERTS) &&
              (numPendingEntries_.readFull() != 0);
        });
        isFull_.store(NO_PENDING_INSERTS, std::memory_order_release);

        if (relaxedLoadKey(*cell) == kEmptyKey_) {
          // Don't insert past max load factor
          return SimpleRetT(capacity_, false);
        }
      } else {
        // An unallocated cell. Try once to lock it. If we succeed, insert here.
        // If we fail, fall through to comparison below; maybe the insert that
        // just beat us was for this very key....
        if (tryLockCell(cell)) {
          KeyT key_new;
          // Write the value - done before unlocking
          try {
            key_new = LookupKeyToKeyFcn()(key_in);
            typedef
                typename std::remove_const<LookupKeyT>::type LookupKeyTNoConst;
            constexpr bool kAlreadyChecked =
                std::is_same<KeyT, LookupKeyTNoConst>::value;
            if (!kAlreadyChecked) {
              checkLegalKeyIfKey(key_new);
            }
            DCHECK(relaxedLoadKey(*cell) == kLockedKey_);
            // A const mapped_type is only constant once constructed, so cast
            // away any const for the placement new here.
            using mapped = typename std::remove_const<mapped_type>::type;
            new (const_cast<mapped*>(&cell->second))
                ValueT(std::forward<ArgTs>(vCtorArgs)...);
            unlockCell(cell, key_new); // Sets the new key
          } catch (...) {
            // Transition back to empty key---requires handling
            // locked->empty below.
            unlockCell(cell, kEmptyKey_);
            --numPendingEntries_;
            throw;
          }
          // An erase() can race here and delete right after our insertion
          // Direct comparison rather than EqualFcn ok here
          // (we just inserted it)
          DCHECK(
              relaxedLoadKey(*cell) == key_new ||
              relaxedLoadKey(*cell) == kErasedKey_);
          --numPendingEntries_;
          ++numEntries_; // This is a thread cached atomic increment :)
          if (numEntries_.readFast() >= maxEntries_) {
            isFull_.store(NO_NEW_INSERTS, std::memory_order_relaxed);
          }
          return SimpleRetT(idx, true);
        }
        --numPendingEntries_;
      }
    }
    DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
    if (kLockedKey_ == acquireLoadKey(*cell)) {
      detail::atomic_hash_spin_wait(
          [&] { return kLockedKey_ == acquireLoadKey(*cell); });
    }

    const KeyT thisKey = acquireLoadKey(*cell);
    if (LookupEqualFcn()(thisKey, key_in)) {
      // Found an existing entry for our key, but we don't overwrite the
      // previous value.
      return SimpleRetT(idx, false);
    } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) {
      // We need to try again (i.e., don't increment numProbes or
      // advance idx): this case can happen if the constructor for
      // ValueT threw for this very cell (the rethrow block above).
      continue;
    }

    // NOTE: the way we count numProbes must be same in find(),
    // insert(), and erase(). Otherwise it may break probing.
    ++numProbes;
    if (UNLIKELY(numProbes >= capacity_)) {
      // probed every cell...fail
      return SimpleRetT(capacity_, false);
    }

    idx = ProbeFcn()(idx, numProbes, capacity_);
  }
}
```
Collisions are resolved by probing (linear by default). Why not separate chaining? It is more complex to implement, and linear probing has much better cache locality.