序
本文主要研究一下flink的AbstractTtlState
InternalKvState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java
/** * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the * {@link State} being the root of the public API state hierarchy. * *The internal state classes give access to the namespace getters and setters and access to * additional functionality, like raw value access or state merging. * *
The public API state hierarchy is intended to be programmed against by Flink applications. * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not * intended to be used by user applications. These internal methods are considered of limited use to users and * only confusing, and are usually not regarded as stable across releases. * *
Each specific type in the internal state hierarchy extends the type from the public * state hierarchy: * *
* State * | * +-------------------InternalKvState * | | * MergingState | * | | * +-----------------InternalMergingState * | | * +--------+------+ | * | | | * ReducingState ListState +-----+-----------------+ * | | | | * +-----------+ +----------- -----------------InternalListState * | | * +---------InternalReducingState ** * @paramThe type of key the state is associated to * @param The type of the namespace * @param The type of values kept internally in state */public interface InternalKvState extends State { TypeSerializer getKeySerializer(); TypeSerializer getNamespaceSerializer(); TypeSerializer getValueSerializer(); void setCurrentNamespace(N namespace); byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, final TypeSerializer safeKeySerializer, final TypeSerializer safeNamespaceSerializer, final TypeSerializer safeValueSerializer) throws Exception;}
- InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue
AbstractTtlState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
/** * Base class for TTL logic wrappers of state objects. * * @paramThe type of key the state is associated to * @param The type of the namespace * @param The type of values kept internally in state without TTL * @param The type of values kept internally in state with TTL * @param Type of originally wrapped state object */abstract class AbstractTtlState> extends AbstractTtlDecorator implements InternalKvState{ private final TypeSerializer valueSerializer; AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer valueSerializer) { super(original, config, timeProvider); this.valueSerializer = valueSerializer; } T getWithTtlCheckAndUpdate( SupplierWithException , SE> getter, ThrowingConsumer , CE> updater) throws SE, CE { return getWithTtlCheckAndUpdate(getter, updater, original::clear); } @Override public TypeSerializer getKeySerializer() { return original.getKeySerializer(); } @Override public TypeSerializer getNamespaceSerializer() { return original.getNamespaceSerializer(); } @Override public TypeSerializer getValueSerializer() { return valueSerializer; } @Override public void setCurrentNamespace(N namespace) { original.setCurrentNamespace(namespace); } @Override public byte[] getSerializedValue( byte[] serializedKeyAndNamespace, TypeSerializer safeKeySerializer, TypeSerializer safeNamespaceSerializer, TypeSerializer safeValueSerializer) { throw new FlinkRuntimeException("Queryable state is not currently supported with TTL."); } @Override public void clear() { original.clear(); }}
- AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑
AbstractTtlDecorator
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
/** * Base class for TTL logic wrappers. * * @paramType of originally wrapped object */abstract class AbstractTtlDecorator { /** Wrapped original state handler. */ final T original; final StateTtlConfig config; final TtlTimeProvider timeProvider; /** Whether to renew expiration timestamp on state read access. */ final boolean updateTsOnRead; /** Whether to renew expiration timestamp on state read access. */ final boolean returnExpired; /** State value time to live in milliseconds. */ final long ttl; AbstractTtlDecorator( T original, StateTtlConfig config, TtlTimeProvider timeProvider) { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); this.original = original; this.config = config; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp; this.ttl = config.getTtl().toMilliseconds(); } V getUnexpired(TtlValue ttlValue) { return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); } boolean expired(TtlValue ttlValue) { return TtlUtils.expired(ttlValue, ttl, timeProvider); } TtlValue wrapWithTs(V value) { return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp()); } TtlValue rewrapWithNewTs(TtlValue ttlValue) { return wrapWithTs(ttlValue.getUserValue()); } V getWithTtlCheckAndUpdate( SupplierWithException , SE> getter, ThrowingConsumer , CE> updater, ThrowingRunnable stateClear) throws SE, CE, CLE { TtlValue ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear); return ttlValue == null ? null : ttlValue.getUserValue(); } TtlValue getWrappedWithTtlCheckAndUpdate( SupplierWithException , SE> getter, ThrowingConsumer , CE> updater, ThrowingRunnable stateClear) throws SE, CE, CLE { TtlValue ttlValue = getter.get(); if (ttlValue == null) { return null; } else if (expired(ttlValue)) { stateClear.run(); if (!returnExpired) { return null; } } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } return ttlValue; }}
- AbstractTtlDecorator对TTL逻辑进行了封装,其主要的逻辑在getWrappedWithTtlCheckAndUpdate方法,它在每次访问的时候对于非null的value会先判断下是否expired(
TtlUtils.expired(ttlValue, ttl, timeProvider)
),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear
),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue
TtlUtils.expired
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java
/** Common functions related to State TTL. */class TtlUtils { staticboolean expired(@Nullable TtlValue ttlValue, long ttl, TtlTimeProvider timeProvider) { return expired(ttlValue, ttl, timeProvider.currentTimestamp()); } static boolean expired(@Nullable TtlValue ttlValue, long ttl, long currentTimestamp) { return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } private static boolean expired(long ts, long ttl, long currentTimestamp) { return getExpirationTimestamp(ts, ttl) <= currentTimestamp; } private static long getExpirationTimestamp(long ts, long ttl) { long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl; return ts + ttlWithoutOverflow; } //......}
- TtlUtils的expired方法主要是通过getExpirationTimestamp获取过期时间,然后跟currentTimestamp进行比较;而getExpirationTimestamp这里是根据ttlValue.getLastAccessTimestamp()及ttl值进行判断,这里利用Long.MAX_VALUE处理了overflow的情况,防止最后的值超出long类型的最大范围
ThrowingRunnable
flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java
/** * Similar to a {@link Runnable}, this interface is used to capture a block of code * to be executed. In contrast to {@code Runnable}, this interface allows throwing * checked exceptions. */@PublicEvolving@FunctionalInterfacepublic interface ThrowingRunnable{ /** * The work method. * * @throws E Exceptions may be thrown. */ void run() throws E; /** * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions * as unchecked. * * @param throwingRunnable to convert into a {@link Runnable} * @return {@link Runnable} which throws all checked exceptions as unchecked. */ static Runnable unchecked(ThrowingRunnable throwingRunnable) { return () -> { try { throwingRunnable.run(); } catch (Throwable t) { ExceptionUtils.rethrow(t); } }; }}
- stateClear是ThrowingRunnable类型,它与Runnable不同,ThrowingRunnable允许抛出checked exceptions,它提供了一个unchecked的静态方法,用于将非Error及非RuntimeException的转为RuntimeException抛出来,从而将ThrowingRunnable转换为Runnable
小结
- InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue
- AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑
- AbstractTtlDecorator的getWrappedWithTtlCheckAndUpdate方法,在每次访问的时候对于非null的value会先判断下是否expired(
TtlUtils.expired(ttlValue, ttl, timeProvider)
),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear
),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue