博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的AbstractTtlState
阅读量:5836 次
发布时间:2019-06-18

本文共 9955 字,大约阅读时间需要 33 分钟。

  hot3.png

本文主要研究一下flink的AbstractTtlState

InternalKvState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java

/** * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the * {@link State} being the root of the public API state hierarchy. *  * 

The internal state classes give access to the namespace getters and setters and access to * additional functionality, like raw value access or state merging. * *

The public API state hierarchy is intended to be programmed against by Flink applications. * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not * intended to be used by user applications. These internal methods are considered of limited use to users and * only confusing, and are usually not regarded as stable across releases. * *

Each specific type in the internal state hierarchy extends the type from the public * state hierarchy: * *

 *             State *               | *               +-------------------InternalKvState *               |                         | *          MergingState                   | *               |                         | *               +-----------------InternalMergingState *               |                         | *      +--------+------+                  | *      |               |                  | * ReducingState    ListState        +-----+-----------------+ *      |               |            |                       | *      +-----------+   +-----------   -----------------InternalListState *                  |                | *                  +---------InternalReducingState * 
* * @param
The type of key the state is associated to * @param
The type of the namespace * @param
The type of values kept internally in state */public interface InternalKvState
extends State { TypeSerializer
getKeySerializer(); TypeSerializer
getNamespaceSerializer(); TypeSerializer
getValueSerializer(); void setCurrentNamespace(N namespace); byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, final TypeSerializer
safeKeySerializer, final TypeSerializer
safeNamespaceSerializer, final TypeSerializer
safeValueSerializer) throws Exception;}
  • InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue

AbstractTtlState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java

/** * Base class for TTL logic wrappers of state objects. * * @param 
The type of key the state is associated to * @param
The type of the namespace * @param
The type of values kept internally in state without TTL * @param
The type of values kept internally in state with TTL * @param
Type of originally wrapped state object */abstract class AbstractTtlState
> extends AbstractTtlDecorator
implements InternalKvState
{ private final TypeSerializer
valueSerializer; AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer
valueSerializer) { super(original, config, timeProvider); this.valueSerializer = valueSerializer; }
T getWithTtlCheckAndUpdate( SupplierWithException
, SE> getter, ThrowingConsumer
, CE> updater) throws SE, CE { return getWithTtlCheckAndUpdate(getter, updater, original::clear); } @Override public TypeSerializer
getKeySerializer() { return original.getKeySerializer(); } @Override public TypeSerializer
getNamespaceSerializer() { return original.getNamespaceSerializer(); } @Override public TypeSerializer
getValueSerializer() { return valueSerializer; } @Override public void setCurrentNamespace(N namespace) { original.setCurrentNamespace(namespace); } @Override public byte[] getSerializedValue( byte[] serializedKeyAndNamespace, TypeSerializer
safeKeySerializer, TypeSerializer
safeNamespaceSerializer, TypeSerializer
safeValueSerializer) { throw new FlinkRuntimeException("Queryable state is not currently supported with TTL."); } @Override public void clear() { original.clear(); }}
  • AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑

AbstractTtlDecorator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java

/** * Base class for TTL logic wrappers. * * @param 
Type of originally wrapped object */abstract class AbstractTtlDecorator
{ /** Wrapped original state handler. */ final T original; final StateTtlConfig config; final TtlTimeProvider timeProvider; /** Whether to renew expiration timestamp on state read access. */ final boolean updateTsOnRead; /** Whether to renew expiration timestamp on state read access. */ final boolean returnExpired; /** State value time to live in milliseconds. */ final long ttl; AbstractTtlDecorator( T original, StateTtlConfig config, TtlTimeProvider timeProvider) { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); this.original = original; this.config = config; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp; this.ttl = config.getTtl().toMilliseconds(); }
V getUnexpired(TtlValue
ttlValue) { return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); }
boolean expired(TtlValue
ttlValue) { return TtlUtils.expired(ttlValue, ttl, timeProvider); }
TtlValue
wrapWithTs(V value) { return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp()); }
TtlValue
rewrapWithNewTs(TtlValue
ttlValue) { return wrapWithTs(ttlValue.getUserValue()); }
V getWithTtlCheckAndUpdate( SupplierWithException
, SE> getter, ThrowingConsumer
, CE> updater, ThrowingRunnable
stateClear) throws SE, CE, CLE { TtlValue
ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear); return ttlValue == null ? null : ttlValue.getUserValue(); }
TtlValue
getWrappedWithTtlCheckAndUpdate( SupplierWithException
, SE> getter, ThrowingConsumer
, CE> updater, ThrowingRunnable
stateClear) throws SE, CE, CLE { TtlValue
ttlValue = getter.get(); if (ttlValue == null) { return null; } else if (expired(ttlValue)) { stateClear.run(); if (!returnExpired) { return null; } } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } return ttlValue; }}
  • AbstractTtlDecorator对TTL逻辑进行了封装,其主要的逻辑在getWrappedWithTtlCheckAndUpdate方法,它在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue

TtlUtils.expired

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java

/** Common functions related to State TTL. */class TtlUtils {	static 
boolean expired(@Nullable TtlValue
ttlValue, long ttl, TtlTimeProvider timeProvider) { return expired(ttlValue, ttl, timeProvider.currentTimestamp()); } static
boolean expired(@Nullable TtlValue
ttlValue, long ttl, long currentTimestamp) { return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } private static boolean expired(long ts, long ttl, long currentTimestamp) { return getExpirationTimestamp(ts, ttl) <= currentTimestamp; } private static long getExpirationTimestamp(long ts, long ttl) { long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl; return ts + ttlWithoutOverflow; } //......}
  • TtlUtils的expired方法主要是通过getExpirationTimestamp获取过期时间,然后跟currentTimestamp进行比较;而getExpirationTimestamp这里是根据ttlValue.getLastAccessTimestamp()及ttl值进行判断,这里利用Long.MAX_VALUE处理了overflow的情况,防止最后的值超出long类型的最大范围

ThrowingRunnable

flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java

/** * Similar to a {@link Runnable}, this interface is used to capture a block of code * to be executed. In contrast to {@code Runnable}, this interface allows throwing * checked exceptions. */@PublicEvolving@FunctionalInterfacepublic interface ThrowingRunnable
{ /** * The work method. * * @throws E Exceptions may be thrown. */ void run() throws E; /** * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions * as unchecked. * * @param throwingRunnable to convert into a {@link Runnable} * @return {@link Runnable} which throws all checked exceptions as unchecked. */ static Runnable unchecked(ThrowingRunnable
throwingRunnable) { return () -> { try { throwingRunnable.run(); } catch (Throwable t) { ExceptionUtils.rethrow(t); } }; }}
  • stateClear是ThrowingRunnable类型,它与Runnable不同,ThrowingRunnable允许抛出checked exceptions,它提供了一个unchecked的静态方法,用于将非Error及非RuntimeException的转为RuntimeException抛出来,从而将ThrowingRunnable转换为Runnable

小结

  • InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue
  • AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑
  • AbstractTtlDecorator的getWrappedWithTtlCheckAndUpdate方法,在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue

doc

转载于:https://my.oschina.net/go4it/blog/2992848

你可能感兴趣的文章
log4j日志归档
查看>>
Java笔记01——IO流
查看>>
mysql遇见error,1049
查看>>
NYOJ311 完全背包
查看>>
shp格式数据发布服务:postGIS + postgresql + geoserver
查看>>
codevs——2822 爱在心中
查看>>
Python基础班---第一部分(基础)---Python基础知识---认识Python
查看>>
JAVA MAC 配置
查看>>
1134 最长上升子序列 (序列型 DP)
查看>>
js冒泡排序
查看>>
第一次作业 4班卢炳武
查看>>
const int * 与 int *const
查看>>
抽象类的调用
查看>>
libjpeg.a exists or that its path is correct
查看>>
使用硬盘,安装双系统,Win7+CentOS
查看>>
Javascript学习总结
查看>>
快速安装infobright
查看>>
JS 操作Excel格式
查看>>
php 用正则替换中文字符一系列问题解决
查看>>
ActiveMQ应用笔记一:基本概念&安装
查看>>