Preface
This article takes a look at Flink's SourceFunction.
Example
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo");
- Here the custom SourceFunction is added via the addSource method
SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * CheckpointedFunction Sources
 *
 * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * {@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<Long> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 *  }
 * }
 *
 * Timestamps and watermarks:
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * Gracefully Stopping Functions
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * {@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<Long> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     *  }
     * }
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {
        //......
    }
}
- SourceFunction is the base interface for stream data sources in Flink. It defines the run and cancel methods, as well as the nested SourceContext interface
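The run/cancel contract can be tried out without Flink on the classpath. The following is a minimal plain-Java sketch: CancellableLoop and its Consumer<Long> sink are hypothetical stand-ins for a SourceFunction and its SourceContext, not Flink API. It only illustrates why the flag is volatile and how cancel() from another thread ends the loop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction<Long>; not part of Flink.
class CancellableLoop {
    // volatile so that a cancel() issued by another thread becomes visible to the loop
    private volatile boolean isRunning = true;

    // Emits increasing numbers to the sink until cancel() is called.
    void run(Consumer<Long> sink) {
        long next = 0L;
        while (isRunning) {
            sink.accept(next++);
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the loop on its own thread, cancels it, and returns what was emitted.
    static List<Long> runBriefly() {
        CancellableLoop source = new CancellableLoop();
        List<Long> emitted = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> source.run(emitted::add));
        worker.start();
        try {
            while (emitted.isEmpty()) {
                Thread.sleep(1); // wait until at least one element was emitted
            }
            source.cancel();
            worker.join(); // returns only because the loop observed the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }
}
```

Calling CancellableLoop.runBriefly() returns a non-empty list that starts at 0; how many elements it contains depends on timing.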
SourceContext
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         *   - On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.
         *   - On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *     current time as the timestamp.
         *   - On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *     It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *     operation (like time windows).
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         *   - On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *     because processing time never works with element timestamps.
         *   - On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *     system's current time, to realize proper ingestion time semantics.
         *   - On {@link TimeCharacteristic#EventTime}, the timestamp will be used.
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered late.
         *
         * This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
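- SourceContext mainly defines the interface through which a source emits data. The basic method is collect, which emits an element without attaching a timestamp: under TimeCharacteristic.EventTime the element must be given a timestamp by a TimestampAssigner before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be assigned, because the system uses its current time as the timestamp; under TimeCharacteristic.ProcessingTime the element has no timestamp. Besides collect there is collectWithTimestamp, which emits an element together with an explicit timestamp (intended for TimeCharacteristic.EventTime)
- It also defines emitWatermark, which is used when handling out-of-order data: a Watermark of value t declares that no more elements with a timestamp t' <= t will arrive, and any that still do are considered late. This only takes effect with TimeCharacteristic.EventTime; with TimeCharacteristic.ProcessingTime the watermark is ignored, and with TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion time watermarks
- The markAsTemporarilyIdle method tells the system that the source will stop emitting data for a while; it is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. Once SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called again, the system considers the source active again
- The getCheckpointLock method returns the checkpoint lock, which the source synchronizes on so that state updates and element emission do not run concurrently with a checkpoint
- The close method is called by the system to shut down the context and release its resources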
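To make the purpose of the checkpoint lock concrete, here is a small plain-Java sketch. LockedCounterSource is a hypothetical class, not Flink API; its checkpointLock field plays the role of ctx.getCheckpointLock(), and snapshot() plays the role of a checkpoint that must never observe an emitted element without the matching state update.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpointed source; not part of Flink.
class LockedCounterSource {
    private final Object checkpointLock = new Object(); // plays the role of ctx.getCheckpointLock()
    private final List<Long> emitted = new ArrayList<>();
    private long count = 0L;

    // Emits one element and advances the state as a single atomic step.
    void emitOne() {
        synchronized (checkpointLock) {
            emitted.add(count);
            count++;
        }
    }

    // Taken under the same lock, so it never sees a half-finished step.
    // Returns {count, number of emitted elements}.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {count, emitted.size()};
        }
    }

    // Emits on one thread while snapshotting on another; false on any inconsistency.
    static boolean snapshotsAreConsistent(int elements) {
        LockedCounterSource source = new LockedCounterSource();
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                source.emitOne();
            }
        });
        emitter.start();
        boolean consistent = true;
        while (emitter.isAlive()) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long[] last = source.snapshot();
        return consistent && last[0] == elements && last[1] == elements;
    }
}
```

If the synchronized block in emitOne were removed, a snapshot could land between the emission and the counter update, which is exactly the inconsistency the Javadoc warns about.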
Task.run (upstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
- Task's run method calls invokable.invoke(); here the invokable is a StreamTask
StreamTask.invoke
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * The life cycle of the task is set up as follows:
 * {@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }
 *
 * The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
- StreamTask's invoke method calls the run method of its subclass; here the subclass is SourceStreamTask
SourceStreamTask.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
- SourceStreamTask's run method mainly calls the run method of headOperator; here the headOperator is a StreamSource
StreamSource.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
- StreamSource's run method first builds a SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls userFunction.run(ctx); here the userFunction is RandomWordSource, i.e. the user-defined SourceFunction (note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0)
RandomWordSource.run
public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
- RandomWordSource's run method keeps emitting data in a loop until cancel sets isRunning to false
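One detail of the demo source is worth a side note: with 7 words, the index expression (int) (Math.random() * 10 % words.length) maps both [0, 0.1) and [0.7, 0.8) to index 0, so the first three words come up about twice as often as the rest. The sketch below reproduces that expression next to a uniform alternative; WordPicker and its method names are hypothetical helpers for illustration only.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper that isolates the index computation of the demo source.
class WordPicker {
    static final String[] WORDS = {"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    // Same arithmetic as in RandomWordSource, with the random value passed in
    // so the mapping can be inspected deterministically.
    static int articleIndex(double random) {
        return (int) (random * 10 % WORDS.length);
    }

    // Uniform alternative: every index in [0, WORDS.length) is equally likely.
    static int uniformIndex() {
        return ThreadLocalRandom.current().nextInt(WORDS.length);
    }
}
```

For the demo the skew is harmless; it only matters if the word distribution is supposed to be even.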
StreamSource.LatencyMarksEmitter
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
    private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
- LatencyMarksEmitter is created in StreamSource's run method before userFunction.run is called (only if latencyTrackingInterval > 0). To determine latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() is checked first: if the value is configured on the ExecutionConfig, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L (this example uses the latter, i.e. 2000)
- The constructor of LatencyMarksEmitter calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval
- The work of the timer task is in ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker; here processingTimeService is a SystemProcessingTimeService and output is an AbstractStreamOperator.CountingOutput
SystemProcessingTimeService.scheduleAtFixedRate
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.scheduleAtFixedRate(
                new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(initialDelay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
- SystemProcessingTimeService's scheduleAtFixedRate method delegates to timerService.scheduleAtFixedRate; the timerService here is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask
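The scheduling mechanics can be reproduced with the JDK alone. The sketch below is not Flink code: FixedRateMarkerDemo is a hypothetical class that uses a ScheduledThreadPoolExecutor with one core thread, as described above, to fire a callback at a fixed rate and then cancels the timer the same way LatencyMarksEmitter.close() does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a fixed-rate "marker" timer on a single-threaded scheduler.
class FixedRateMarkerDemo {

    // Fires a callback every periodMillis and returns the first `count` timestamps it recorded.
    static List<Long> collectMarkers(int count, long periodMillis) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        List<Long> markers = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);

        ScheduledFuture<?> timer = timerService.scheduleAtFixedRate(() -> {
            // runs on the single timer thread, so invocations never overlap
            if (done.getCount() > 0) {
                markers.add(System.currentTimeMillis());
                done.countDown();
            }
        }, 0L, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel(true);         // same call as in LatencyMarksEmitter.close()
            timerService.shutdownNow(); // release the timer thread
        }
        return markers;
    }
}
```

Because the pool has a single thread, a slow callback delays later invocations instead of running in parallel with them.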
RepeatedTriggerTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
    /**
     * Internal task which is repeatedly called by the processing time service.
     */
    private static final class RepeatedTriggerTask implements Runnable {

        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        private long nextTimestamp;

        private RepeatedTriggerTask(
                final AtomicInteger serviceStatus,
                final AsyncExceptionHandler exceptionHandler,
                final Object lock,
                final ProcessingTimeCallback target,
                final long nextTimestamp,
                final long period) {

            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }
- RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime while serviceStatus is STATUS_ALIVE; the nextTimestamp passed in initially is computed as getCurrentProcessingTime() + initialDelay, and period is added to it after every run
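A consequence of this bookkeeping is that the callback receives the scheduled timestamp rather than the wall-clock time at which it happens to run. A minimal sketch of that behavior, assuming a simplified task without the status check and exception handling (RepeatingTask is hypothetical, not the Flink class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical, simplified analogue of RepeatedTriggerTask; not the Flink class.
class RepeatingTask implements Runnable {
    private final Object lock;
    private final LongConsumer callback;
    private final long period;
    private long nextTimestamp;

    RepeatingTask(Object lock, LongConsumer callback, long firstTimestamp, long period) {
        this.lock = lock;
        this.callback = callback;
        this.nextTimestamp = firstTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        synchronized (lock) {
            callback.accept(nextTimestamp); // scheduled time, not System.currentTimeMillis()
            nextTimestamp += period;        // advance for the next invocation
        }
    }

    // Runs the task `runs` times back to back and returns the timestamps the callback saw.
    static List<Long> timestampsOf(int runs, long firstTimestamp, long period) {
        List<Long> seen = new ArrayList<>();
        RepeatingTask task = new RepeatingTask(new Object(), ts -> seen.add(ts), firstTimestamp, period);
        for (int i = 0; i < runs; i++) {
            task.run();
        }
        return seen;
    }
}
```

With a first timestamp of 1000 and a period of 2000, three runs hand 1000, 3000 and 5000 to the callback, regardless of how quickly the runs follow each other.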
AbstractStreamOperator.CountingOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
    /**
     * Wrapping {@link Output} that updates metrics on the number of emitted elements.
     */
    public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            numRecordsOut.inc();
            output.collect(outputTag, record);
        }

        @Override
        public void close() {
            output.close();
        }
    }
- What it actually wraps is a RecordWriterOutput
RecordWriterOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
- emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit (which is actually implemented by its parent class RecordWriter) to send the LatencyMarker
RecordWriter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
    /**
     * This is used to send LatencyMarks to a random target channel.
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
- RecordWriter's randomEmit simply picks a random targetChannel and sends the record to it
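In other words, each latency marker goes to exactly one output channel instead of being broadcast. The selection itself is just Random.nextInt(numChannels); the following sketch (RandomChannelDemo is a hypothetical class, and the fixed seed is only there to make runs repeatable) counts how many markers each channel would receive.

```java
import java.util.Random;

// Hypothetical demo of the channel selection used for latency markers.
class RandomChannelDemo {

    // Sends `markers` markers, each to one randomly chosen channel, and
    // returns how many markers each channel index received.
    static int[] pickChannels(int numChannels, int markers, long seed) {
        Random rng = new Random(seed);
        int[] hits = new int[numChannels];
        for (int i = 0; i < markers; i++) {
            int targetChannel = rng.nextInt(numChannels); // same call shape as in randomEmit
            hits[targetChannel]++;
        }
        return hits;
    }
}
```

The counts always add up to the number of markers sent, which is the point: a downstream subtask sees only a sample of the markers from a given source subtask.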
Task.run (downstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
- The downstream Task's run method also calls invokable.invoke(); here the invokable is a OneInputStreamTask
OneInputStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
- Task's run method calls StreamTask's invoke method, which in turn calls OneInputStreamTask's run method. That method mainly calls inputProcessor.processInput() in a loop; here the inputProcessor is a StreamInputProcessor
StreamInputProcessor
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
- The processInput method first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and only processes it when result.isFullRecord() is true
- Processing depends on the type of the StreamElement; the cases are watermark, streamStatus, latencyMarker and regular records
- For a regular record it calls streamOperator.processElement(record); here the streamOperator is a StreamMap
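The branching above can be summarized in a few lines of plain Java. This is only a sketch of the control flow, with a hypothetical Kind enum standing in for the StreamElement type checks; it shows that control elements are handled and skipped over with continue, while the first regular record ends the processInput call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the dispatch in StreamInputProcessor.processInput; not Flink code.
class ElementDispatchDemo {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    // Names the handler that processInput would invoke for each kind of element.
    static String handle(Kind kind) {
        switch (kind) {
            case WATERMARK:
                return "inputWatermark";
            case STREAM_STATUS:
                return "inputStreamStatus";
            case LATENCY_MARKER:
                return "processLatencyMarker";
            default:
                return "processElement";
        }
    }

    // Consumes elements until the first record was processed, like a single processInput() call.
    static List<String> processUntilRecord(List<Kind> input) {
        List<String> calls = new ArrayList<>();
        for (Kind kind : input) {
            calls.add(handle(kind));
            if (kind == Kind.RECORD) {
                break; // processInput returns true after one record
            }
        }
        return calls;
    }
}
```

Given the input WATERMARK, LATENCY_MARKER, RECORD, RECORD, one call handles the first three elements and leaves the second record for the next loop iteration in OneInputStreamTask.run.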
StreamMap.processElement
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
- This calls userFunction.map(element.getValue()) to perform the map operation; here the userFunction is UpperCaseMapFunc
Summary
- SourceFunction is the base interface for stream data sources in Flink. It defines the run and cancel methods as well as the SourceContext interface; SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting Watermarks
- For data emission, the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so, it creates a LatencyMarksEmitter, which registers a timer task that periodically calls ProcessingTimeCallback's onProcessingTime method to trigger output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))
- As a result, the downstream receives both the user data sent by userFunction.run and the LatencyMarkers sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, i.e. StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map)