編程API設計

Flink編程API中,我們開發的數據處理Job程序,都是始于一個Source,對應的輸入數據源,終于一個Sink,對應輸出的存儲系統,中間是各種豐富的處理算子。但是對于批式和流式編程API,從代碼層面對應的抽象基本上是名稱不同,具體邏輯功能比較一致:批式編程API對批式Job DAG中每個節點的抽象使用的是DataSet,而流式編程API中對應的是DataStream。
對于批式Job DAG中,DataSet的類設計體系,如下圖所示:


        相關的類都在包org.apache.flink.api.java.operators下面,通過上圖可以看出,主要分為4類:DataSource、DataSink、SingleInputOperator、TwoInputOperator。其中,DataSink并沒有繼承自DataSet,但是作為批式Job DAG的輸出節點抽象,也還是與上圖中各個Operator有直接或間接的關系。      在編寫批式Job過程中,輸入是一個DataSource(實際也是DataSet),處理中每經過一個轉換操作(Transformation),都會生成一個新的類型的DataSet,這在API上看是統一的,實際底層稍微有一點不同。比如,經過groupBy操作后,返回的是一個UnsortedGrouping,它不是一個DataSet實現,而是一種中間結構,封裝了很多有用的信息以供翻譯過程中使用,通過使用這種中間結構能夠更好地處理復雜的轉換操作,通過下面代碼來看可能更直觀一些,在DataSet中查看groupBy()方法代碼,如下所示:

public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}

上面代碼中UnsortedGrouping并不是一個DataSet實現,而是一個用來處理groupBy操作的中間結構,它繼承自Grouping抽象類,類定義如下所示:

@Public
public abstract class Grouping<T> {

protected final DataSet<T> inputDataSet;
protected final Keys<T> keys;
protected Partitioner<?> customPartitioner;

public Grouping(DataSet<T> set, Keys<T> keys) {
if (set == null || keys == null) {
throw new NullPointerException();
}

if (keys.isEmpty()) {
throw new InvalidProgramException("The grouping keys must not be empty.");
}

this.inputDataSet = set;
this.keys = keys;
}

@Internal
public DataSet<T> getInputDataSet() {
return this.inputDataSet;
}

@Internal
public Keys<T> getKeys() {
return this.keys;
}

@Internal
public Partitioner<?> getCustomPartitioner() {
return this.customPartitioner;
}
}

對于流式Job DAG中,類設計方面稍有不同,Flink使用了DataStream和StreamOperator這兩個類設計體系。我們先看DataStream類設計體系,如下圖所示:

        DataStream表示在流式Job DAG中每一步轉換操作之前與之后,都對應著一個DataStream的數據結構,它內部封裝了與轉換操作相關的處理邏輯,其實就是StreamOperator。對應上圖中,我們舉幾個編寫流式處理程序的例子說明:調用StreamExecutionEnvironment.readTextFile()時會生成一個DataStreamSource,調用keyBy()時會生成一個KeyedStream,調用split()時會生成一個SplitStream,調用iterate()時會生成一個IterativeStream。
下面看下StreamOperator類的設計體系,如下圖所示:

生成JobGraph對象

編寫批式Job程序,使用執行上線文環境對象ExecutionEnvironment,而流式使用的是StreamExecutionEnvironment。通過用戶編程API構建好DAG Job后,都是通過調用執行上線文環境對象的execute()方法提交Job去運行。無論是批式Job還是流式Job,它們在提交執行過程中,有相同的流程,也有不同的流程,通過識別這個過程中涉及相同/不同的API對象,我們抽象出如下流程概念圖:


        上圖中,左側是批式Job通過API構建并提交到計算集群,基于DataSet進行編程實現,始于DataSource,終于DataSink;右側是流式Job通過API構建并提交到計算集群,基于DataStream進行編程實現,始于DataStreamSource,終于DataStreamSink。中間部分,跨兩個不同環境上下文對象是在提交Job過程中公共的抽象。
對于批式Job程序提交,核心代碼如下所示:

final Plan plan = createProgramPlan(jobName);
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(plan, configuration);

對于流式Job程序提交運行的核心代碼,如下所示:

StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
… …
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);

? ? ? ? ?上面代碼中的Plan和StreamGraph,都是Pipeline接口的具體實現,這兩個實現分別用來表示批式和流式Job程序的拓撲,內部封裝了用于構建生成JobGraph所需要的全部信息。
? ? ? ?上面代碼是構建生成JobGraph的主要邏輯,先是通過getExecutor()獲取到一個PipelineExecutor,然后調用PipelineExecutor的execute()來構建并提交JobGraph。這里,PipelineExecutor表示執行Flink Job的方式,比如,本地執行使用LocalExecutor,或提交到YARN集群上執行使用YarnJobClusterExecutor,或提交到Kubernetes集群上執行使用KubernetesSessionClusterExecutor,等等。
? ? ? ?無論是提交批式還是流式Job,最終都被轉換成JobGraph對象,構建JobGraph的處理邏輯是完全統一的。構建JobGraph的代碼邏輯,如下代碼所示:

public static JobGraph getJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism) {

FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);

return pipelineTranslator.translateToJobGraph(pipeline,
optimizerConfiguration,
defaultParallelism);
}

? ? ?上面,輸入的Pipeline對象,批式編程對應具體實現Plan,流式編程對應具體實現StreamGraph。由于用戶編程API的不同,這里面選擇了不同的FlinkPipelineTranslator來對輸入的Pipeline對象進行翻譯,其中批式對應的是PlanTranslator,流式對應的是StreamGraphTranslator,最終通過翻譯,都生成一個統一的JobGraph。當然,調用translateToJobGraph()方法進行翻譯處理的邏輯有很大的不同。

文章轉自微信公眾號@架構師

上一篇:

15 個REST API 設計的基本技巧

下一篇:

全面解析RESTful API設計規范:最佳實踐與細節指南
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費