A look at Flink's SourceFunction
Published: 2019-06-21


This article takes a look at Flink's SourceFunction.

Example

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo");
  • Here the addSource method is used to register a custom SourceFunction

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<Long> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 *  }
 * }</pre>
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<Long> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     *  }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {
        //......
    }
}

  • SourceFunction is the base interface for Flink's stream data sources. It defines the run and cancel methods, as well as the nested SourceContext interface
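
The cancel contract described in the Javadoc above can be tried out without Flink. The following is a minimal sketch in plain Java, not Flink code: CountingSource, CancelFlagDemo and runAndCancel are hypothetical names, and a java.util.function.Consumer stands in for SourceContext. It shows a volatile isRunning flag being flipped by cancel() from another thread, followed by the interrupt that, according to the Javadoc, is sent strictly after cancel() has been called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction<Long>: run() loops until cancel() flips the flag.
class CountingSource {
    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 0L;

    void run(Consumer<Long> collector) throws InterruptedException {
        while (isRunning) {
            collector.accept(count++);
            Thread.sleep(10);
        }
    }

    void cancel() {
        isRunning = false;
    }
}

class CancelFlagDemo {
    // Runs the source on its own thread, cancels it once three elements were emitted,
    // and returns everything that was emitted.
    static List<Long> runAndCancel() {
        final CountingSource source = new CountingSource();
        final List<Long> emitted = new ArrayList<>();
        final CountDownLatch firstThree = new CountDownLatch(3);

        Thread worker = new Thread(() -> {
            try {
                source.run(value -> {
                    emitted.add(value);
                    firstThree.countDown();
                });
            } catch (InterruptedException e) {
                // the interrupt is only sent after cancel(), so leaving run() here is expected
            }
        });
        worker.start();

        try {
            firstThree.await();  // wait for the first three elements
            source.cancel();     // first flip the flag ...
            worker.interrupt();  // ... then interrupt, which wakes the thread from sleep()
            worker.join();       // join() makes the worker's writes to 'emitted' visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("emitted before cancel took effect: " + runAndCancel());
    }
}
```

How many elements are emitted beyond the first three depends on thread timing, which is why the sketch only waits for a lower bound.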

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Interface that source functions use to emit elements, and possibly watermarks.
 *
 * @param <T> The type of the elements produced by the source.
 */
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {

    /**
     * Emits one element from the source, without attaching a timestamp. In most cases,
     * this is the default way of emitting elements.
     *
     * <p>The timestamp that the element will get assigned depends on the time characteristic of
     * the streaming program:
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
     *         current time as the timestamp.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
     *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
     *         operation (like time windows).</li>
     * </ul>
     *
     * @param element The element to emit
     */
    void collect(T element);

    /**
     * Emits one element from the source, and attaches the given timestamp. This method
     * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
     * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
     * on the stream.
     *
     * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
     * This allows programs to switch between the different time characteristics and behaviors
     * without changing the code of the source functions.
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
     *         because processing time never works with element timestamps.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
     *         system's current time, to realize proper ingestion time semantics.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
     * </ul>
     *
     * @param element The element to emit
     * @param timestamp The timestamp in milliseconds since the Epoch
     */
    @PublicEvolving
    void collectWithTimestamp(T element, long timestamp);

    /**
     * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
     * elements with a timestamp {@code t' <= t} will occur any more. If further such
     * elements will be emitted, those elements are considered late.
     *
     * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
     * On {@link TimeCharacteristic#ProcessingTime}, Watermarks will be ignored. On
     * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
     * automatic ingestion time watermarks.
     *
     * @param mark The Watermark to emit
     */
    @PublicEvolving
    void emitWatermark(Watermark mark);

    /**
     * Marks the source to be temporarily idle. This tells the system that this source will
     * temporarily stop emitting records and watermarks for an indefinite amount of time. This
     * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
     * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
     * watermarks without the need to wait for watermarks from this source while it is idle.
     *
     * <p>Source functions should make a best effort to call this method as soon as they
     * acknowledge themselves to be idle. The system will consider the source to resume activity
     * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
     * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
     */
    @PublicEvolving
    void markAsTemporarilyIdle();

    /**
     * Returns the checkpoint lock. Please refer to the class-level comment in
     * {@link SourceFunction} for details about how to write a consistent checkpointed
     * source.
     *
     * @return The object to use as the lock
     */
    Object getCheckpointLock();

    /**
     * This method is called by the system to shut down the context.
     */
    void close();
}

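  • SourceContext mainly defines the methods a source uses to emit data. collect emits an element without a timestamp: under TimeCharacteristic.EventTime a TimestampAssigner has to assign the timestamp before any time-dependent operation, while under TimeCharacteristic.IngestionTime nothing needs to be specified because the system stamps the element with its current time. Besides collect, there is collectWithTimestamp, which emits an element together with an explicit timestamp (intended for TimeCharacteristic.EventTime)
  • emitWatermark is also defined here. A watermark deals with out-of-order data by declaring that no more elements with a timestamp at or below its value are expected; any that still arrive are considered late. It only takes effect under TimeCharacteristic.EventTime. Under TimeCharacteristic.ProcessingTime the watermark is ignored, and under TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion time watermarks
  • markAsTemporarilyIdle tells the system that the source will stop emitting data for a while. It is only relevant under TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. As soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called again, the system considers the source active again
  • getCheckpointLock returns the checkpoint lock, which the source uses for its checkpoint-related logic
  • close is mainly called by the system to release the resources held by the context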
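
The timestamp rules quoted in the Javadoc can be condensed into two small lookup functions. This is only a sketch of the documented behaviour in plain Java, not how Flink implements it: TimestampRules and its methods are hypothetical names, the local enum merely mirrors the constant names of Flink's TimeCharacteristic, and the system time is passed in as a parameter to keep the result deterministic.

```java
import java.util.OptionalLong;

class TimestampRules {
    // Local stand-in that mirrors the constant names of Flink's TimeCharacteristic enum.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Timestamp an element carries right after collect(element).
    static OptionalLong afterCollect(TimeCharacteristic tc, long systemTime) {
        switch (tc) {
            case IngestionTime:
                return OptionalLong.of(systemTime); // stamped with the system's current time
            default:
                // ProcessingTime: no timestamp; EventTime: none until a TimestampAssigner runs
                return OptionalLong.empty();
        }
    }

    // Timestamp an element carries right after collectWithTimestamp(element, userTimestamp).
    static OptionalLong afterCollectWithTimestamp(TimeCharacteristic tc, long userTimestamp, long systemTime) {
        switch (tc) {
            case ProcessingTime:
                return OptionalLong.empty();          // ignored
            case IngestionTime:
                return OptionalLong.of(systemTime);   // overwritten
            case EventTime:
                return OptionalLong.of(userTimestamp); // used as given
            default:
                throw new IllegalArgumentException("unknown time characteristic: " + tc);
        }
    }

    public static void main(String[] args) {
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            System.out.println(tc + ": collect -> " + afterCollect(tc, 500L)
                + ", collectWithTimestamp(42) -> " + afterCollectWithTimestamp(tc, 42L, 500L));
        }
    }
}
```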

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}

  • Task's run method calls invokable.invoke(); here the invokable is a StreamTask

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        } finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}

  • StreamTask's invoke method calls the run method of its subclass; here the subclass is SourceStreamTask

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Override
protected void run() throws Exception {
    headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
  • SourceStreamTask's run method mainly calls the run method of headOperator; here headOperator is a StreamSource

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
    run(lockingObject, streamStatusMaintainer, output);
}

public void run(final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector) throws Exception {

    final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

    final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
    final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
        ? getExecutionConfig().getLatencyTrackingInterval()
        : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

    LatencyMarksEmitter<OUT> latencyEmitter = null;
    if (latencyTrackingInterval > 0) {
        latencyEmitter = new LatencyMarksEmitter<>(
            getProcessingTimeService(),
            collector,
            latencyTrackingInterval,
            this.getOperatorID(),
            getRuntimeContext().getIndexOfThisSubtask());
    }

    final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

    this.ctx = StreamSourceContexts.getSourceContext(
        timeCharacteristic,
        getProcessingTimeService(),
        lockingObject,
        streamStatusMaintainer,
        collector,
        watermarkInterval,
        -1);

    try {
        userFunction.run(ctx);

        // if we get here, then the user function either exited after being done (finite source)
        // or the function was canceled or stopped. For the finite source case, we should emit
        // a final watermark that indicates that we reached the end of event-time
        if (!isCanceledOrStopped()) {
            ctx.emitWatermark(Watermark.MAX_WATERMARK);
        }
    } finally {
        // make sure that the context is closed in any case
        ctx.close();
        if (latencyEmitter != null) {
            latencyEmitter.close();
        }
    }
}
  • StreamSource's run method first builds a SourceFunction.SourceContext through StreamSourceContexts.getSourceContext and then calls userFunction.run(ctx). Here userFunction is RandomWordSource, the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0

RandomWordSource.run

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    // volatile so that the write in cancel() is visible to the thread executing run()
    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • RandomWordSource's run method keeps emitting data in a loop until cancel sets isRunning to false
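
One detail of the index expression in RandomWordSource is worth a closer look. With seven words, Math.random() * 10 lies in [0, 10), so draws that land in [7, 10) wrap around to the indices 0 to 2, and the first three words are expected roughly twice as often as the others. The sketch below isolates that expression so it can be checked with fixed inputs; WordIndexDemo, indexFrom and uniformIndex are hypothetical names, and the uniform variant is just one possible alternative, not something the original demo uses.

```java
import java.util.concurrent.ThreadLocalRandom;

class WordIndexDemo {
    static final String[] WORDS = {"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    // The expression used in RandomWordSource, with the random draw r in [0, 1) passed in.
    static int indexFrom(double r, int length) {
        return (int) (r * 10 % length);
    }

    // A uniform alternative: every index in [0, length) is equally likely.
    static int uniformIndex(int length) {
        return ThreadLocalRandom.current().nextInt(length);
    }

    public static void main(String[] args) {
        int[] hits = new int[WORDS.length];
        for (int i = 0; i < 100_000; i++) {
            hits[indexFrom(Math.random(), WORDS.length)]++;
        }
        // The first three words should show up about twice as often as the last four.
        for (int i = 0; i < WORDS.length; i++) {
            System.out.println(WORDS[i] + ": " + hits[i]);
        }
    }
}
```

For a demo source the skew is harmless, but it matters if the emitted words are later used to reason about distributions.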

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

private static class LatencyMarksEmitter<OUT> {
    private final ScheduledFuture<?> latencyMarkTimer;

    public LatencyMarksEmitter(
            final ProcessingTimeService processingTimeService,
            final Output<StreamRecord<OUT>> output,
            long latencyTrackingInterval,
            final OperatorID operatorId,
            final int subtaskIndex) {

        latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    try {
                        // ProcessingTimeService callbacks are executed under the checkpointing lock
                        output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                    } catch (Throwable t) {
                        // we catch the Throwables here so that we don't trigger the processing
                        // timer services async exception handler
                        LOG.warn("Error while emitting latency marker.", t);
                    }
                }
            },
            0L,
            latencyTrackingInterval);
    }

    public void close() {
        latencyMarkTimer.cancel(true);
    }
}
  • LatencyMarksEmitter is created in StreamSource's run method before userFunction.run is called, provided latencyTrackingInterval > 0. To determine latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() first checks whether the ExecutionConfig sets the value. If it does, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, whose default is 2000L. This example falls into the second case, so the interval is 2000
  • The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval
  • The work of the timer task is done in ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here processingTimeService is a SystemProcessingTimeService and output is an AbstractStreamOperator.CountingOutput
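
The fallback between the job-level setting and the cluster-level default can be written down as a two-line decision. The sketch below is a simplified model in plain Java, not the Flink API: LatencyIntervalDemo and its methods are hypothetical names, and an OptionalLong stands in for the combination of isLatencyTrackingConfigured() and getLatencyTrackingInterval().

```java
import java.util.OptionalLong;

class LatencyIntervalDemo {
    // Mirrors the ternary in StreamSource.run: a job-level setting wins,
    // otherwise the cluster-level default is used.
    static long resolveInterval(OptionalLong jobLevelInterval, long clusterDefault) {
        return jobLevelInterval.isPresent() ? jobLevelInterval.getAsLong() : clusterDefault;
    }

    // The emitter is only created for a positive interval.
    static boolean createsEmitter(long interval) {
        return interval > 0;
    }

    public static void main(String[] args) {
        long fromDefault = resolveInterval(OptionalLong.empty(), 2000L);
        long fromJob = resolveInterval(OptionalLong.of(0L), 2000L);
        System.out.println(fromDefault + " -> emitter created: " + createsEmitter(fromDefault));
        System.out.println(fromJob + " -> emitter created: " + createsEmitter(fromJob));
    }
}
```

Under this model, a job that explicitly sets a non-positive interval gets no LatencyMarksEmitter, even though the cluster default is positive.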

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
    long nextTimestamp = getCurrentProcessingTime() + initialDelay;

    // we directly try to register the timer and only react to the status on exception
    // that way we save unnecessary volatile accesses for each timer
    try {
        return timerService.scheduleAtFixedRate(
            new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
            initialDelay,
            period,
            TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(initialDelay);
        } else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        } else {
            // something else happened, so propagate the exception
            throw e;
        }
    }
}

@Override
public long getCurrentProcessingTime() {
    return System.currentTimeMillis();
}
  • SystemProcessingTimeService's scheduleAtFixedRate delegates to timerService.scheduleAtFixedRate. Here timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

/**
 * Internal task which is repeatedly called by the processing time service.
 */
private static final class RepeatedTriggerTask implements Runnable {

    private final AtomicInteger serviceStatus;
    private final Object lock;
    private final ProcessingTimeCallback target;
    private final long period;
    private final AsyncExceptionHandler exceptionHandler;

    private long nextTimestamp;

    private RepeatedTriggerTask(
            final AtomicInteger serviceStatus,
            final AsyncExceptionHandler exceptionHandler,
            final Object lock,
            final ProcessingTimeCallback target,
            final long nextTimestamp,
            final long period) {

        this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
        this.lock = Preconditions.checkNotNull(lock);
        this.target = Preconditions.checkNotNull(target);
        this.period = period;
        this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

        this.nextTimestamp = nextTimestamp;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                if (serviceStatus.get() == STATUS_ALIVE) {
                    target.onProcessingTime(nextTimestamp);
                }

                nextTimestamp += period;
            } catch (Throwable t) {
                TimerException asyncException = new TimerException(t);
                exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
            }
        }
    }
}
  • RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime whenever serviceStatus is STATUS_ALIVE. The nextTimestamp passed in initially is computed as getCurrentProcessingTime() + initialDelay, and period is added to it on every run
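
The combination of a single-threaded scheduler, a shared lock and a timestamp that advances by one period per run can be reproduced with the JDK alone. The following is a minimal sketch under stated assumptions: RepeatedTask and RepeatedTaskDemo are hypothetical names, a LongConsumer stands in for ProcessingTimeCallback, and the service status check and exception handling of the real class are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Simplified stand-in for RepeatedTriggerTask.
class RepeatedTask implements Runnable {
    private final Object lock;
    private final LongConsumer target;
    private final long period;
    private long nextTimestamp;

    RepeatedTask(Object lock, LongConsumer target, long firstTimestamp, long period) {
        this.lock = lock;
        this.target = target;
        this.nextTimestamp = firstTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        // the callback runs under the same lock the source would use for emitting elements
        synchronized (lock) {
            target.accept(nextTimestamp);
            nextTimestamp += period; // logical timestamp, independent of actual firing time
        }
    }
}

class RepeatedTaskDemo {
    // Fires the task three times by hand and returns the timestamps the callback saw.
    static List<Long> fireThreeTimes(long start, long period) {
        List<Long> seen = new ArrayList<>();
        RepeatedTask task = new RepeatedTask(new Object(), ts -> seen.add(ts), start, period);
        task.run();
        task.run();
        task.run();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        Object checkpointLock = new Object();
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1); // corePoolSize 1
        long period = 100L;
        timer.scheduleAtFixedRate(
            new RepeatedTask(checkpointLock, ts -> System.out.println("marker at " + ts),
                System.currentTimeMillis(), period),
            0L, period, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        timer.shutdownNow();
    }
}
```

Because nextTimestamp is advanced arithmetically, the value handed to the callback is the scheduled time rather than the moment the task actually ran.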

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

/**
 * Wrapping {@link Output} that updates metrics on the number of emitted elements.
 */
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
    private final Output<StreamRecord<OUT>> output;
    private final Counter numRecordsOut;

    public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
        this.output = output;
        this.numRecordsOut = counter;
    }

    @Override
    public void emitWatermark(Watermark mark) {
        output.emitWatermark(mark);
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        output.emitLatencyMarker(latencyMarker);
    }

    @Override
    public void collect(StreamRecord<OUT> record) {
        numRecordsOut.inc();
        output.collect(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        numRecordsOut.inc();
        output.collect(outputTag, record);
    }

    @Override
    public void close() {
        output.close();
    }
}
  • What it actually wraps is a RecordWriterOutput
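
CountingOutput is a plain decorator: it forwards every call to the wrapped output and increments the counter only in the two collect methods, so latency markers and watermarks do not count as records. A minimal sketch of that idea, with a hypothetical two-method MiniOutput interface in place of Flink's Output and a long field in place of the Counter metric:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified version of Flink's Output interface.
interface MiniOutput {
    void collect(String record);
    void emitLatencyMarker(long markedTime);
}

// Plays the role of the wrapped output: remembers everything it receives.
class RecordingOutput implements MiniOutput {
    final List<String> received = new ArrayList<>();

    @Override
    public void collect(String record) {
        received.add("record:" + record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        received.add("marker:" + markedTime);
    }
}

// Decorator in the style of CountingOutput: counts records, forwards everything.
class CountingMiniOutput implements MiniOutput {
    private final MiniOutput delegate;
    private long numRecordsOut;

    CountingMiniOutput(MiniOutput delegate) {
        this.delegate = delegate;
    }

    @Override
    public void collect(String record) {
        numRecordsOut++; // only user records are counted
        delegate.collect(record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        delegate.emitLatencyMarker(markedTime); // forwarded without touching the counter
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }
}
```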

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
  • emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit (which is actually implemented by the parent class RecordWriter) to emit the LatencyMarker

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, InterruptedException {
    sendToTarget(record, rng.nextInt(numChannels));
}

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
    RecordSerializer<T> serializer = serializers[targetChannel];

    SerializationResult result = serializer.addRecord(record);

    while (result.isFullBuffer()) {
        if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
            // If this was a full record, we are done. Not breaking
            // out of the loop at this point will lead to another
            // buffer request before breaking out (that would not be
            // a problem per se, but it can lead to stalls in the
            // pipeline).
            if (result.isFullRecord()) {
                break;
            }
        }
        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

        result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
        targetPartition.flush(targetChannel);
    }
}
  • RecordWriter's randomEmit picks a random targetChannel and sends the record to it
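
The channel selection itself is a single Random.nextInt call. The sketch below only illustrates that selection step in plain Java; RandomChannelDemo, pickChannel and distribute are hypothetical names, and serialization and buffer handling are left out entirely.

```java
import java.util.Arrays;
import java.util.Random;

class RandomChannelDemo {
    // Same idea as randomEmit: each marker goes to one uniformly chosen channel.
    static int pickChannel(Random rng, int numChannels) {
        return rng.nextInt(numChannels);
    }

    // Counts how many of the given number of markers land on each channel.
    static int[] distribute(Random rng, int numChannels, int markers) {
        int[] perChannel = new int[numChannels];
        for (int i = 0; i < markers; i++) {
            perChannel[pickChannel(rng, numChannels)]++;
        }
        return perChannel;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(new Random(), 4, 1000)));
    }
}
```

A consequence of this scheme is that a single downstream channel sees only a fraction of the markers sent by a given source subtask.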

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java


  • The downstream Task runs the same Task.run method shown in the upstream section, which calls invokable.invoke(); here the invokable is a OneInputStreamTask

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}
  • Task's run method calls StreamTask's invoke method, which in turn calls OneInputStreamTask's run method. That method mainly calls inputProcessor.processInput() in a loop; here inputProcessor is a StreamInputProcessor

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            numRecordsIn = new SimpleCounter();
        }
    }

    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }

        final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
        if (bufferOrEvent != null) {
            if (bufferOrEvent.isBuffer()) {
                currentChannel = bufferOrEvent.getChannelIndex();
                currentRecordDeserializer = recordDeserializers[currentChannel];
                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            } else {
                // Event received
                final AbstractEvent event = bufferOrEvent.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        } else {
            isFinished = true;
            if (!barrierHandler.isEmpty()) {
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            return false;
        }
    }
}
  • processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and only processes it when result.isFullRecord() is true
  • Processing depends on the type of the StreamElement; the cases are watermark, streamStatus, latencyMarker and regular data
  • For regular data, streamOperator.processElement(record) is called; here streamOperator is a StreamMap
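
The control flow of that dispatch is easy to miss in the full listing: the three control element types are handled and the loop continues, while only a user record makes processInput return true. A minimal sketch of that flow in plain Java follows; ElementDispatchDemo and the Kind enum are hypothetical, the strings added to the trace name the Flink methods that would be called, and deserialization, buffers and locking are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class ElementDispatchDemo {
    // Hypothetical tags for the four kinds of StreamElement handled by processInput.
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    // Consumes elements until one user record was processed.
    // Returns false once the input is exhausted.
    static boolean processInput(Deque<Kind> input, List<String> trace) {
        while (!input.isEmpty()) {
            Kind next = input.poll();
            switch (next) {
                case WATERMARK:
                    trace.add("inputWatermark");
                    continue; // keep looping, no record processed yet
                case STREAM_STATUS:
                    trace.add("inputStreamStatus");
                    continue;
                case LATENCY_MARKER:
                    trace.add("processLatencyMarker");
                    continue;
                default:
                    trace.add("processElement");
                    return true; // exactly one user record per call
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Deque<Kind> input = new ArrayDeque<>(
            Arrays.asList(Kind.WATERMARK, Kind.LATENCY_MARKER, Kind.RECORD, Kind.RECORD));
        List<String> trace = new ArrayList<>();
        while (processInput(input, trace)) {
            // all the work happens in processInput, as in OneInputStreamTask.run
        }
        System.out.println(trace);
    }
}
```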

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • This calls userFunction.map(element.getValue()) to perform the map operation; here userFunction is UpperCaseMapFunc
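
The one-liner in processElement maps the value and puts the result back into the record it arrived in, via element.replace(...), instead of allocating a new record. The sketch below models that with plain Java types; MiniRecord and MiniStreamMap are hypothetical stand-ins for StreamRecord and StreamMap, and a java.util.function.Function takes the place of MapFunction.

```java
import java.util.function.Function;

// Hypothetical stand-in for StreamRecord: a mutable holder whose value can be swapped in place.
class MiniRecord<T> {
    private T value;

    MiniRecord(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    // Replaces the value and returns this same object, re-typed for the new value.
    @SuppressWarnings("unchecked")
    <X> MiniRecord<X> replace(X newValue) {
        this.value = (T) newValue;
        return (MiniRecord<X>) this;
    }
}

class MiniStreamMap {
    // Same shape as StreamMap.processElement: map the value, reuse the record object.
    static <IN, OUT> MiniRecord<OUT> processElement(MiniRecord<IN> element, Function<IN, OUT> mapper) {
        return element.replace(mapper.apply(element.getValue()));
    }

    public static void main(String[] args) {
        MiniRecord<String> in = new MiniRecord<>("fox");
        MiniRecord<String> out = processElement(in, s -> s.toUpperCase());
        System.out.println(out.getValue() + ", same object: " + (out == in));
    }
}
```

Reusing the record object avoids one allocation per element, at the price that the incoming record must not be read again after it has been replaced.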

Summary

  • SourceFunction is the base interface for Flink's stream data sources. It defines the run and cancel methods as well as the SourceContext interface. SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting a Watermark
  • For emitting data, the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0. If it is, a LatencyMarksEmitter is created, which registers a timer task that periodically calls ProcessingTimeCallback's onProcessingTime method to trigger output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))
  • As a result, the downstream task receives both the user data sent by userFunction.run and the LatencyMarkers sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, that is StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map)


Reposted from: https://my.oschina.net/go4it/blog/2961718
