Contents
State-backend based
1. MemoryStateBackend
2. FsStateBackend
3. RocksDBStateBackend
HyperLogLog based
Bloom filter (BloomFilter) based
BitMap based
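State-backend based
Flink ships with three state backends.
1. MemoryStateBackend
(The default; state is kept in memory.)
2. FsStateBackend
(In-flight state is kept in TaskManager memory; on a checkpoint, a state snapshot is written to the checkpoint files.) Suitable for production.
3. RocksDBStateBackend
(The only backend that supports incremental checkpoints. In-flight state is kept in a RocksDB database, which by default stores its data in the data directory of the node where the TaskManager runs.) Suitable for production.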
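Example code
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MapStateDistinctFunction
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Give the ValueState a 24-hour TTL; expired state is cleared automatically.
        final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(24 * 60))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // Describe the state (name and value type) and attach the TTL to it.
        final ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("skuNum", Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context context,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
        // Start at 1 when the key has no live state, otherwise add 1.
        final Integer current = counts.value();
        final int updated = (current == null) ? 1 : current + 1;
        counts.update(updated);
        out.collect(Tuple2.of(value.f0, updated));
    }
}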
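The TTL settings in the example above (refresh on create and write, never return expired values) can be hard to picture. The following is a minimal plain-Java sketch of that behavior, with no Flink dependency. The class name `TtlCounterSketch`, the injected clock, and the lazy cleanup on read are assumptions made for illustration; Flink's own cleanup strategy is more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a keyed counter state with a write-refreshed TTL. */
final class TtlCounterSketch {
    private static final class Entry {
        int count;
        long lastWriteMillis;
    }

    private final Map<String, Entry> state = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so the behavior can be tested

    TtlCounterSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the live count, or null when the key is absent or expired. */
    Integer value(String key) {
        Entry entry = state.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.lastWriteMillis >= ttlMillis) {
            state.remove(key); // expired values are never returned
            return null;
        }
        return entry.count; // reading does not refresh the timestamp
    }

    /** Adds 1 to the key's count (starting from 1) and refreshes its TTL. */
    int increment(String key) {
        Integer current = value(key);
        Entry entry = new Entry();
        entry.count = (current == null) ? 1 : current + 1;
        entry.lastWriteMillis = clock.getAsLong();
        state.put(key, entry);
        return entry.count;
    }

    public static void main(String[] args) {
        TtlCounterSketch counts = new TtlCounterSketch(24L * 60 * 60 * 1000, System::currentTimeMillis);
        counts.increment("sku-1");
        System.out.println("sku-1 seen " + counts.increment("sku-1") + " times");
    }
}
```

Only writes move the expiry forward, which mirrors `OnCreateAndWrite`: a key that is merely read still expires 24 hours after its last update.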
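HyperLogLog based
HyperLogLog is an estimation algorithm for counting the number of distinct elements in a set. It is a good fit when the business case does not need a 100% exact count.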
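To make the idea concrete, here is a minimal, dependency-free sketch of a HyperLogLog counter. It is not the `net.agkn.hll` implementation: the class name `TinyHyperLogLog`, the choice of hash mixer, and the simple small-range correction are assumptions made for illustration. Each hashed value selects one register and records the longest run of leading zero bits seen there; the estimate is derived from the harmonic mean of the registers.

```java
/** Minimal HyperLogLog sketch for illustration only. */
final class TinyHyperLogLog {
    private final int indexBits;    // p: number of hash bits that select a register
    private final byte[] registers; // m = 2^p registers, each stores a maximum rank

    TinyHyperLogLog(int indexBits) {
        if (indexBits < 7 || indexBits > 16) {
            throw new IllegalArgumentException("indexBits must be in [7, 16]");
        }
        this.indexBits = indexBits;
        this.registers = new byte[1 << indexBits];
    }

    /** Spreads the input over all 64 bits (the SplitMix64 finalizer). */
    private static long mix64(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    void add(long value) {
        long hash = mix64(value);
        int index = (int) (hash >>> (64 - indexBits)); // top p bits pick the register
        long rest = hash << indexBits;                 // remaining bits, left-aligned
        // Rank = position of the first 1 bit in the remaining bits (capped).
        int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - indexBits + 1);
        if (rank > registers[index]) {
            registers[index] = (byte) rank;
        }
    }

    /** Union of two sketches: keep the larger rank in every register. */
    void merge(TinyHyperLogLog other) {
        if (other.indexBits != indexBits) {
            throw new IllegalArgumentException("register counts differ");
        }
        for (int i = 0; i < registers.length; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    long cardinality() {
        int m = registers.length;
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.scalb(1.0, -r); // 2^(-r)
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1 + 1.079 / m); // bias constant for m >= 128
        double estimate = alpha * m * m / sum;
        if (estimate <= 2.5 * m && zeros > 0) {
            estimate = m * Math.log((double) m / zeros); // linear counting for small sets
        }
        return Math.round(estimate);
    }

    public static void main(String[] args) {
        TinyHyperLogLog hll = new TinyHyperLogLog(14);
        for (long id = 0; id < 100_000; id++) {
            hll.add(id);
            hll.add(id); // duplicates do not change the registers
        }
        System.out.println("estimated distinct ids: " + hll.cardinality());
    }
}
```

With 2^14 registers the sketch uses 16 KB regardless of how many ids it sees, at the price of a relative error that is typically around one percent. In a real job a tested library is the better choice.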
<dependency>
    <groupId>net.agkn</groupId>
    <artifactId>hll</artifactId>
    <version>1.6.0</version>
</dependency>
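Example code
import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String, Long>, HLL, Long> {

    @Override
    public HLL createAccumulator() {
        // log2m = 14 (2^14 registers), regwidth = 5 bits per register
        return new HLL(14, 5);
    }

    @Override
    public HLL add(Tuple2<String, Long> value, HLL accumulator) {
        // value is an access record <item, id>.
        // Insert the id into the HyperLogLog. addRaw expects a value that is
        // already well hashed; hash f1 first if the ids are not uniformly spread.
        accumulator.addRaw(value.f1);
        return accumulator;
    }

    @Override
    public Long getResult(HLL accumulator) {
        // Estimate the number of distinct elements in the HyperLogLog.
        return accumulator.cardinality();
    }

    @Override
    public HLL merge(HLL a, HLL b) {
        a.union(b);
        return a;
    }
}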
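Bloom filter (BloomFilter) based
A Bloom filter works much like a HashSet: it quickly tests whether an element is in a set. Like HyperLogLog, it is not 100% accurate (a lookup can report an element as present when it was never added, but never the reverse), while inserts and lookups are both very fast.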
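The mechanics can be sketched in plain Java with a `BitSet`. This is a minimal illustration, not Guava's implementation: the class names `TinyBloomFilter` and `BloomDistinctCounter`, the FNV-1a-style string hash, and the double-hashing scheme are all assumptions chosen for brevity. Each element sets a handful of bit positions; an element counts as new only if at least one of its positions is still unset.

```java
import java.util.BitSet;

/** Minimal Bloom filter for illustration only. */
final class TinyBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    TinyBloomFilter(int expectedInsertions, double falsePositiveRate) {
        if (expectedInsertions <= 0 || falsePositiveRate <= 0 || falsePositiveRate >= 1) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double ln2 = Math.log(2);
        // Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        this.numBits = (int) Math.ceil(-expectedInsertions * Math.log(falsePositiveRate) / (ln2 * ln2));
        this.numHashes = Math.max(1, (int) Math.round((double) numBits / expectedInsertions * ln2));
        this.bits = new BitSet(numBits);
    }

    /** FNV-1a-style hash over the chars, followed by one extra mixing step. */
    private static long hash64(String s) {
        long h = 0xcbf29ce484222325L;
        for (int i = 0; i < s.length(); i++) {
            h ^= s.charAt(i);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        return h;
    }

    /** Double hashing: derive the i-th bit position from the two 32-bit halves. */
    private int bitIndex(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void put(String value) {
        long hash = hash64(value);
        for (int i = 0; i < numHashes; i++) {
            bits.set(bitIndex(hash, i));
        }
    }

    boolean mightContain(String value) {
        long hash = hash64(value);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(bitIndex(hash, i))) {
                return false; // definitely never added
            }
        }
        return true; // probably added (false positives are possible)
    }
}

/** Counts an element only when the filter has definitely not seen it. */
final class BloomDistinctCounter {
    private final TinyBloomFilter filter;
    private long count;

    BloomDistinctCounter(int expectedInsertions, double falsePositiveRate) {
        this.filter = new TinyBloomFilter(expectedInsertions, falsePositiveRate);
    }

    long add(String value) {
        if (!filter.mightContain(value)) {
            filter.put(value);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        BloomDistinctCounter counter = new BloomDistinctCounter(10_000, 0.01);
        counter.add("user-1");
        counter.add("user-2");
        System.out.println("distinct so far: " + counter.add("user-1"));
    }
}
```

Because a false positive makes a new element look like a repeat, a counter built this way can only undercount, never overcount.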
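Example code
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {

    // Assumed sizing values; tune both to the expected data volume.
    private static final int EXPECTED_INSERTIONS = 1_000_000;
    private static final double FALSE_POSITIVE_RATE = 0.01;

    private transient ValueState<BloomFilter<String>> bloomState;
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Register the keyed state (the state names are illustrative).
        bloomState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "bloomFilter", TypeInformation.of(new TypeHint<BloomFilter<String>>() {})));
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("skuCount", Long.class));
    }

    @Override
    public void processElement(String value, Context context, Collector<Long> out) throws Exception {
        BloomFilter<String> bloomFilter = bloomState.value();
        if (bloomFilter == null) {
            // First element for this key: create an empty filter.
            bloomFilter = BloomFilter.create(
                    Funnels.unencodedCharsFunnel(), EXPECTED_INSERTIONS, FALSE_POSITIVE_RATE);
        }
        Long skuCount = countState.value();
        if (skuCount == null) {
            skuCount = 0L;
        }
        // Count the element only if the filter has definitely not seen it.
        if (!bloomFilter.mightContain(value)) {
            bloomFilter.put(value);
            skuCount = skuCount + 1;
        }
        bloomState.update(bloomFilter);
        countState.update(skuCount);
        out.collect(skuCount);
    }
}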
BitMap based
A bitmap not only reduces storage, it also gives a fully exact result.
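The principle can be shown with the JDK's own `java.util.BitSet`: every id maps to exactly one bit, so duplicates collapse and the number of set bits is the exact distinct count. This is a minimal sketch under the assumption that ids are non-negative ints; the class name `BitSetDistinctCounter` is hypothetical. Unlike RoaringBitmap, a plain `BitSet` is not compressed, so its size grows with the largest id.

```java
import java.util.BitSet;

/** Exact distinct counter over non-negative int ids, for illustration only. */
final class BitSetDistinctCounter {
    private final BitSet seen = new BitSet();

    void add(int id) {
        if (id < 0) {
            throw new IllegalArgumentException("id must be non-negative");
        }
        seen.set(id); // setting an already-set bit is a no-op
    }

    /** Union with another counter: a bitwise OR of the two bitmaps. */
    void merge(BitSetDistinctCounter other) {
        seen.or(other.seen);
    }

    int distinctCount() {
        return seen.cardinality(); // number of bits set
    }

    public static void main(String[] args) {
        BitSetDistinctCounter counter = new BitSetDistinctCounter();
        for (int id : new int[] {7, 42, 7, 1_000_000, 42}) {
            counter.add(id);
        }
        System.out.println("distinct ids: " + counter.distinctCount());
    }
}
```

Merging two partial results is a bitwise OR, which is the operation an aggregate function's `merge` step needs.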
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.0</version>
</dependency>
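Example code
import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap, Long> {

    @Override
    public Roaring64NavigableMap createAccumulator() {
        return new Roaring64NavigableMap();
    }

    @Override
    public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
        // Adding an id that is already present leaves the bitmap unchanged.
        accumulator.addLong(value);
        return accumulator;
    }

    @Override
    public Long getResult(Roaring64NavigableMap accumulator) {
        // Exact number of distinct ids.
        return accumulator.getLongCardinality();
    }

    @Override
    public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
        // Union the two bitmaps; returning null here would fail when accumulators are merged.
        a.or(b);
        return a;
    }
}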