
云原生 API 網關 APISIX 入門教程
Apache Flink 中的 SQL 網關提供了一種從 SQL 客戶端以外的位置在 Flink 中運行 SQL 的方法。這涵蓋了使用JDBC驅動程序(支持大量客戶端連接)、通過HiveServer2端點的Hive客戶端訪問,以及直接針對REST端點的操作,都是為了找到最適合您需求的產品。
在我繼續學習 Flink SQL 的旅程時,我想知道的一件事是,在生產場景中如何提交 Flink SQL 作業。在我看來,SQL Gateway的REST API會是一個很好的選擇。您將 SQL 代碼放在源代碼管理下的文件中,然后使用部署管道針對終端節點提交該 SQL。值得注意的是,建議在將生產作業部署到 Flink 時使用應用程序模式,而 SQL 客戶端和網關尚不支持此功能。當前有一個FLIP正在討論中,但如果您傾向于使用應用程序模式,那么就需要將SQL打包在JAR文件中進行部署。當然,您也可以選擇繼續使用SQL客戶端或網關,不過需要注意的是,在會話模式下運行作業時會存在一些限制,主要就是沒有資源隔離。
在本文中,我將向您展示如何使用終端節點,包括使用 Postman 工具探索它,使用 HTTPie 從 shell 調用終端節點,最后使用一個可行的概念驗證腳本來執行腳本中的語句。
該 API 有文檔記錄,有兩個版本,每個版本都有自己的 OpenAPI YAML 規范。我將在此處查看 <span class=“inline-code”>v2</span>。請注意,根據文檔,該規范目前仍處于實驗階段。
首先,讓我們在本地啟動 Flink 集群和 SQL 網關:
./bin/start-cluster.sh
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost
Postman?是一個方便的工具,用于使用 API 執行大量有用的操作。在這里,我只是借助它來創建針對終端節點的示例調用,以便快速了解它們之間的關系。你可以在 Web 上使用它,但假設你使用的是 Flink 的本地實例(或者至少是不可公開訪問的實例),那么你需要桌面版本。請注意,為了訪問我們在此處所使用的導入功能,您仍然需要先注冊一個免費帳戶。
在您的工作區下,單擊導入并粘貼 SQL Gateway 的 OpenAPI YAML 文件的 URL (<span class=“inline-code”>https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml</span>)
,該文件在文檔頁面上鏈接到該文件。將其作為 Postman 集合導入。
現在,您將在 Postman 集合下看到所有 API 終端節點的列表,以及每個終端節點的預創建調用。轉到 Environments -> Globals 并使用 SQL Gateway 的值定義 <span class=“inline-code”>baseUrl</span>
。如果你在本地運行它,那么這將是 <span class=“inline-code”>http://localhost:8083</span>
現在返回 Collections,在 Flink SQL Gateway REST API 文件夾下找到 <span class=“inline-code”>get Info</span>
調用。打開它并點擊 發送.您應該會看到如下所示的成功響應:
您還可以單擊“代碼”圖標 (<span class=“inline-code”></></span> )
查看各種不同語言和工具(包括 cURL 和 HTTPie)的調用。雖然目前這還不算是什么創新之舉,但當你開始使用有效載荷時,就會發現它真的非常便捷。
就像我們在上面手動填充全局變量 <span class=“inline-code”>baseURL</span>
一樣,我們也可以從一個調用中獲取響應,并在另一個調用中使用它。這非常有用,因為我們需要使用 REST API 返回的兩個變量(<span class=“inline-code”>sessionHandle</span> 和 <span class=“inline-code”>operationHandle</span>)
。要在 Postman 中執行此操作,請將以下內容添加到請求窗格的 Tests (測試) 選項卡中:
var jsonData = JSON.parse(responseBody); postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);
這假設要填充的變量名為 <span class=“inline-code”>sessionHandle</span>
并且它在響應的名為 <span class=“inline-code”>sessionHandle</span>
的根鍵中返回。它是:
$ http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}
在設置變量之后,你可以通過在雙大括號中對其進行引用,來在其他調用中使用這個變量。如下所示:
我在此分享了我的Postman集合的副本,其中已經為您完成了上述的變量配置。
現在,我們來了解一下如何從頭開始將 SQL 語句實際提交到網關的工作流。
從本質上講,最少的步驟如下。
以下是執行每個操作的方法,使用 HTTPie 作為示例客戶端并顯示響應。我使用bash變量來存儲會話和操作句柄的值。
$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}
POST /會話
$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}
$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
請注意,此處的 <span class=“inline-code”>runtime-mode</span>
是從上面在會話創建中傳遞的 <span class=“inline-code”>properties</span>
中設置的。
$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
Copy
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}
$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8
{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}
$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8
{
"status": "FINISHED"
}
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
Copy
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}
因為 <span class=“inline-code”>resultType</span>
不是 <span class=“inline-code”>EOS</span>
并且有一個值<span class=“inline-code”>nextResultUri</span>
它告訴我們還有更多要獲取 – 在 <span class=“inline-code”>nextResultUri</span>
中指定的位置:
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
正確的做法是在會話結束后將其關閉:
$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
我們可以使用上述所有內容和一些 bash 來編寫腳本:
host='localhost:8083'
SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')
echo "Got session handle: "$SESSIONHANDLE
SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)
OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')
echo "Got operation handle: "$OPERATIONHANDLE
while [ 1 -eq 1 ]
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ $STATUS != "RUNNING" ]; then
break
fi
sleep 2
done
echo "\n\n----- ?? RESULTS ?? -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while [ 1 -eq 1 ]
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo $RESULT | jq '.'
URL=$(echo $RESULT | jq -r '.nextResultUri // ""')
if [ -z $URL ]; then
break
fi
echo "(next result chunk ??)"
done
echo "Closing session ???"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE
我們將實際的 SQL 放入一個名為 <span class=“inline-code”>rmoff.sql</span>
的文件中:
CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;
現在,當我們運行 shell 腳本時,我們得到這個:
Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED
----- ?? RESULTS ?? -----
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk ??)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session ???
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
我們實際運行的SQL語句會將一個CSV文件寫入<span class="inline-code">/tmp</span>
文件夾,因此讓我們檢查一下該文件是否已成功生成并有效:
$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1
很好,這完全符合我們的預期。
如果您想了解有關 Flink SQL 的更多信息,您可能有興趣了解有關 Catalog 角色的更多信息、使用 Catalog 的動手示例,或者深入了解如何將 JAR 與 Flink SQL 結合使用。
您可能還想嘗試一下我們的Decodable服務,它提供了完全托管的Apache Flink和Debezium。通過我們的CLI和API,您可以輕松地使用Flink SQL來部署管道。
原文鏈接:https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api