
使用NestJS和Prisma構建REST API:身份驗證
Flink編程API中,我們開發的數據處理Job程序,都是始于一個Source,對應的輸入數據源,終于一個Sink,對應輸出的存儲系統,中間是各種豐富的處理算子。但是對于批式和流式編程API,從代碼層面對應的抽象基本上是名稱不同,具體邏輯功能比較一致:批式編程API對批式Job DAG中每個節點的抽象使用的是DataSet,而流式編程API中對應的是DataStream。
對于批式Job DAG中,DataSet的類設計體系,如下圖所示:
相關的類都在包org.apache.flink.api.java.operators下面,通過上圖可以看出,主要分為4類:DataSource、DataSink、SingleInputOperator、TwoInputOperator。其中,DataSink并沒有繼承自DataSet,但是作為批式Job DAG的輸出節點抽象,也還是與上圖中各個Operator有直接或間接的關系。 在編寫批式Job過程中,輸入是一個DataSource(實際也是DataSet),處理中每經過一個轉換操作(Transformation),都會生成一個新的類型的DataSet,這在API上看是統一的,實際底層稍微有一點不同。比如,經過groupBy操作后,返回的是一個UnsortedGrouping,它不是一個DataSet實現,而是一種中間結構,封裝了很多有用的信息以供翻譯過程中使用,通過使用這種中間結構能夠更好地處理復雜的轉換操作,通過下面代碼來看可能更直觀一些,在DataSet中查看groupBy()方法代碼,如下所示:
public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}
上面代碼中UnsortedGrouping并不是一個DataSet實現,而是一個用來處理groupBy操作的中間結構,它繼承自Grouping抽象類,類定義如下所示:
@Public
public abstract class Grouping<T> {
protected final DataSet<T> inputDataSet;
protected final Keys<T> keys;
protected Partitioner<?> customPartitioner;
public Grouping(DataSet<T> set, Keys<T> keys) {
if (set == null || keys == null) {
throw new NullPointerException();
}
if (keys.isEmpty()) {
throw new InvalidProgramException("The grouping keys must not be empty.");
}
this.inputDataSet = set;
this.keys = keys;
}
@Internal
public DataSet<T> getInputDataSet() {
return this.inputDataSet;
}
@Internal
public Keys<T> getKeys() {
return this.keys;
}
@Internal
public Partitioner<?> getCustomPartitioner() {
return this.customPartitioner;
}
}
對于流式Job DAG中,類設計方面稍有不同,Flink使用了DataStream和StreamOperator這兩個類設計體系。我們先看DataStream類設計體系,如下圖所示:
DataStream表示在流式Job DAG中每一步轉換操作之前與之后,都對應著一個DataStream的數據結構,它內部封裝了與轉換操作相關的處理邏輯,其實就是StreamOperator。對應上圖中,我們舉幾個編寫流式處理程序的例子說明:調用StreamExecutionEnvironment.readTextFile()時會生成一個DataStreamSource,調用keyBy()時會生成一個KeyedStream,調用split()時會生成一個SplitStream,調用iterate()時會生成一個IterativeStream。
下面看下StreamOperator類的設計體系,如下圖所示:
編寫批式Job程序,使用執行上線文環境對象ExecutionEnvironment,而流式使用的是StreamExecutionEnvironment。通過用戶編程API構建好DAG Job后,都是通過調用執行上線文環境對象的execute()方法提交Job去運行。無論是批式Job還是流式Job,它們在提交執行過程中,有相同的流程,也有不同的流程,通過識別這個過程中涉及相同/不同的API對象,我們抽象出如下流程概念圖:
上圖中,左側是批式Job通過API構建并提交到計算集群,基于DataSet進行編程實現,始于DataSource,終于DataSink;右側是流式Job通過API構建并提交到計算集群,基于DataStream進行編程實現,始于DataStreamSource,終于DataStreamSink。中間部分,跨兩個不同環境上下文對象是在提交Job過程中公共的抽象。
對于批式Job程序提交,核心代碼如下所示:
final Plan plan = createProgramPlan(jobName);
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(plan, configuration);
對于流式Job程序提交運行的核心代碼,如下所示:
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
… …
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
? ? ? ? ?上面代碼中的Plan和StreamGraph,都是Pipeline接口的具體實現,這兩個實現分別用來表示批式和流式Job程序的拓撲,內部封裝了用于構建生成JobGraph所需要的全部信息。
? ? ? ?上面代碼是構建生成JobGraph的主要邏輯,先是通過getExecutor()獲取到一個PipelineExecutor,然后調用PipelineExecutor的execute()來構建并提交JobGraph。這里,PipelineExecutor表示執行Flink Job的方式,比如,本地執行使用LocalExecutor,或提交到YARN集群上執行使用YarnJobClusterExecutor,或提交到Kubernetes集群上執行使用KubernetesSessionClusterExecutor,等等。
? ? ? ?無論是提交批式還是流式Job,最終都被轉換成JobGraph對象,構建JobGraph的處理邏輯是完全統一的。構建JobGraph的代碼邏輯,如下代碼所示:
public static JobGraph getJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism) {
FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
return pipelineTranslator.translateToJobGraph(pipeline,
optimizerConfiguration,
defaultParallelism);
}
? ? ?上面,輸入的Pipeline對象,批式編程對應具體實現Plan,流式編程對應具體實現StreamGraph。由于用戶編程API的不同,這里面選擇了不同的FlinkPipelineTranslator來對輸入的Pipeline對象進行翻譯,其中批式對應的是PlanTranslator,流式對應的是StreamGraphTranslator,最終通過翻譯,都生成一個統一的JobGraph。當然,調用translateToJobGraph()方法進行翻譯處理的邏輯有很大的不同。
文章轉自微信公眾號@架構師