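You'll now see a list of all the API endpoints under the Postman collection, with a pre-built call for each one. Go to Environments -> Globals and define <span class="inline-code">baseUrl</span> with the address of your SQL Gateway. If you're running it locally, this will be <span class="inline-code">http://localhost:8083</span>.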

[Screenshot: defining baseUrl under Environments -> Globals in Postman]

Now go back to Collections and find the <span class="inline-code">get Info</span> call under the Flink SQL Gateway REST API folder. Open it and click Send. You should see a successful response like this:

[Screenshot: successful response to the get Info call]

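You can also click the Code icon (<span class="inline-code"></></span>) to see the call in a variety of languages and tools, including cURL and HTTPie. This isn't particularly groundbreaking yet, but it becomes really handy once you start working with payloads.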

[Screenshot: the call shown as generated code]

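Just as we manually populated the global variable <span class="inline-code">baseUrl</span> above, we can also take the response from one call and use it in another. This is very useful, because the REST API returns two values that we need to reuse: <span class="inline-code">sessionHandle</span> and <span class="inline-code">operationHandle</span>. To do this in Postman, add the following to the Tests tab of the request pane: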

var jsonData = JSON.parse(responseBody);
postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);

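This assumes that the variable to populate is called <span class="inline-code">sessionHandle</span> and that the value is returned in a root key of the response also named <span class="inline-code">sessionHandle</span>, which it is: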

$ http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}

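Once the variable is set, you can use it in other calls by referencing it in double curly braces (for example <span class="inline-code">{{sessionHandle}}</span>), like this: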

[Screenshot: referencing a variable in another call]

I've shared a copy of my Postman collection here, with the variable configuration described above already done for you.

Now let's look at the workflow for actually submitting a SQL statement to the gateway, starting from scratch.

Running SQL statements with the Flink SQL Gateway

In essence, the minimal steps are as follows:

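  1. Establish a session (optionally setting configuration parameters).
  2. Submit a SQL statement, which generates an Operation.
  3. Check the status of the Operation until it has completed.
  4. Fetch the results of the Operation.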
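Each of these steps corresponds to one REST path on the gateway, as the HTTPie calls below show. As a minimal sketch, the hypothetical shell helpers here (the function names and the `GATEWAY` variable are illustrative, not part of Flink) simply print those paths, so that a script builds its URLs in one place. The paths themselves are taken from the calls in this post.

```shell
# Hypothetical helpers: each prints the REST path for one step of the workflow.
# Prefix them with the gateway address, e.g. "$GATEWAY$(statements_path "$SESSIONHANDLE")".
GATEWAY="${GATEWAY:-localhost:8083}"

# Step 1: POST to sessions_path to open a session; DELETE on session_path closes it
sessions_path()   { printf '/sessions\n'; }
session_path()    { printf '/sessions/%s\n' "$1"; }
# Step 2: POST a {"statement": ...} body here
statements_path() { printf '/sessions/%s/statements\n' "$1"; }
# Step 3: GET the status of an operation
status_path()     { printf '/sessions/%s/operations/%s/status\n' "$1" "$2"; }
# Step 4: GET a page of results; the token ($3) starts at 0, and later pages
# are given by the nextResultUri field of each response
result_path()     { printf '/sessions/%s/operations/%s/result/%s?rowFormat=JSON\n' "$1" "$2" "${3:-0}"; }
```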

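Here's how to do each one, using HTTPie as the example client and showing the response. I'm using bash variables to store the values of the session and operation handles.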

Check the connection and Flink version

$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}

1. Create a session

POST /sessions

$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}

$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
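Copying the handle into an `export` by hand works, but a script can capture it directly. The shell script later in this post uses `jq -r '.sessionHandle'` for that; where jq isn't available, a rough jq-free sketch is a `sed` one-liner. The `json_string_field` name is hypothetical, and the helper assumes the key appears once with a plain string value (no escaped quotes), which holds for the handle responses shown here.

```shell
# json_string_field FIELD: read JSON on stdin and print the value of the string field FIELD.
# Hypothetical, jq-free helper. Assumes FIELD appears once, its value is a plain string
# with no escaped quotes, and FIELD contains no "/" (it is spliced into the sed expression).
json_string_field() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -n 1
}

# Example: capture the session handle instead of copying it by hand
# (when its output is piped, HTTPie prints only the response body)
#   SESSIONHANDLE=$(printf '{"properties": {"execution.runtime-mode": "batch"}}' |
#     http --follow POST 'localhost:8083/sessions' \
#       Content-Type:'application/json' Accept:'application/json' |
#     json_string_field sessionHandle)
#   export SESSIONHANDLE
```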

[Optional] Validate the session and read the session configuration

Note that <span class="inline-code">execution.runtime-mode</span> here has been set from the <span class="inline-code">properties</span> passed when creating the session above.

$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}
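Since the runtime mode came from the `properties` map sent with POST /sessions, other session-level settings can presumably be passed the same way. A small sketch of a hypothetical helper, `session_payload`, that builds that request body from `key=value` arguments; it assumes keys and values need no JSON escaping, which is true for typical Flink configuration options such as the ones listed above.

```shell
# session_payload KEY=VALUE...: print a JSON body for POST /sessions whose
# "properties" map holds the given settings. Hypothetical helper; keys and values
# are inserted verbatim, so they must not contain double quotes or backslashes.
session_payload() {
  local sep='' kv
  printf '{"properties": {'
  for kv in "$@"; do
    # key = text before the first "=", value = everything after it
    printf '%s"%s": "%s"' "$sep" "${kv%%=*}" "${kv#*=}"
    sep=', '
  done
  printf '}}\n'
}

# Example:
#   session_payload execution.runtime-mode=batch parallelism.default=2 |
#     http --follow POST 'localhost:8083/sessions' \
#       Content-Type:'application/json' Accept:'application/json'
```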

2. Submit a SQL statement

$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8

{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}

$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"
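The `'\''` sequences above are only there because the JSON body sits inside a single-quoted shell string; the statement also has to be a valid JSON string. As a sketch, two hypothetical helpers (`json_escape` and `statement_payload`) read SQL on stdin and print the request body, escaping backslashes and double quotes and encoding line breaks as `\n` so that `--` comments still end where they should. It assumes the SQL contains no control characters other than newlines, tabs and carriage returns; tabs become spaces, which would also change tabs inside string literals.

```shell
# json_escape: read text on stdin and print it as the contents of a JSON string.
# Strips carriage returns, turns tabs into spaces, escapes backslashes and double
# quotes, and joins lines with a literal \n. Other control characters are not handled.
json_escape() {
  tr -d '\r' | tr '\t' ' ' |
    sed -e 's/\\/\\\\/g' -e 's/"/\\"/g' |
    awk 'NR > 1 { printf "%s", "\\n" } { printf "%s", $0 }'
}

# statement_payload: wrap the SQL read on stdin into the {"statement": ...} body
# sent to POST /sessions/<sessionHandle>/statements.
statement_payload() {
  printf '{"statement": "%s"}\n' "$(json_escape)"
}

# Example (my_query.sql is a hypothetical file holding the SQL):
#   statement_payload < my_query.sql |
#     http --follow POST "localhost:8083/sessions/$SESSIONHANDLE/statements" \
#       Content-Type:'application/json' Accept:'application/json'
```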

3. Get the statement execution status

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8

{
"status": "FINISHED"
}
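A statement may not be finished the first time you ask, so in practice this call is repeated until the operation leaves its in-progress states. A minimal sketch of a hypothetical polling helper, `wait_for_operation`: it runs any command that prints a status, sleeps between attempts, and succeeds only on FINISHED. The in-progress state names (INITIALIZED, PENDING, RUNNING) are assumed from Flink's `OperationStatus` enum; `POLL_INTERVAL` is an illustrative variable.

```shell
# wait_for_operation CMD [ARGS...]: repeatedly run CMD, which must print an operation
# status (the "status" field returned by .../status), until the status leaves the
# in-progress states. Prints the final status; returns 0 only for FINISHED.
wait_for_operation() {
  local status
  while :; do
    status=$("$@") || return 1
    case "$status" in
      INITIALIZED|PENDING|RUNNING)
        sleep "${POLL_INTERVAL:-2}" ;;          # still in progress: wait, then poll again
      FINISHED)
        printf '%s\n' "$status"; return 0 ;;
      *)
        printf '%s\n' "$status"; return 1 ;;    # ERROR, CANCELED, CLOSED, TIMEOUT, ...
    esac
  done
}

# Example, with a hypothetical get_status wrapper around the call above:
#   get_status() {
#     http --ignore-stdin GET "localhost:8083/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/status" \
#       Accept:'application/json' | jq -r '.status'
#   }
#   wait_for_operation get_status
```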

4. Get the results

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}

Because <span class="inline-code">resultType</span> is not <span class="inline-code">EOS</span> and there is a value for <span class="inline-code">nextResultUri</span>, we know there is more to fetch, at the location given in <span class="inline-code">nextResultUri</span>. Fetching that returns:

{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
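Following `nextResultUri` until a page no longer has one is a small loop. A sketch of a hypothetical helper, `fetch_all_results`, that takes the fetch command as a parameter (so it can be tried without a running gateway), prints every page, and stops at the first page without `nextResultUri`, which is the EOS page in the example above. The `sed` extraction assumes the URI contains no escaped quotes.

```shell
# fetch_all_results FETCH FIRST_PATH: call the command FETCH with a result path
# (FETCH must print the JSON response body), print each page, then follow the
# page's nextResultUri. Stops at the first page that has no nextResultUri.
fetch_all_results() {
  local fetch="$1" path="$2" page
  while [ -n "$path" ]; do
    page=$("$fetch" "$path") || return 1
    printf '%s\n' "$page"
    path=$(printf '%s\n' "$page" |
      sed -n 's/.*"nextResultUri"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' |
      head -n 1)
  done
}

# Example, with a hypothetical fetch_page wrapper around HTTPie:
#   fetch_page() { http --ignore-stdin GET "localhost:8083$1" Accept:'application/json'; }
#   fetch_all_results fetch_page "/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/result/0?rowFormat=JSON"
```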

5. Tidy up

It's good practice to close the session once you're done with it:

$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

Running Flink SQL from a shell script

Using all of the above plus a bit of bash, we can script the whole thing:

#!/usr/bin/env bash
# Requires HTTPie (http) and jq
host='localhost:8083'

# 1. Create a session in batch mode and capture its handle
SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')

echo "Got session handle: $SESSIONHANDLE"

# 2. Submit the SQL in rmoff.sql. jq builds the JSON body, so quotes,
#    backslashes and newlines in the SQL are escaped correctly.
OPERATIONHANDLE=$(jq -Rs '{statement: .}' < rmoff.sql | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')

echo "Got operation handle: $OPERATIONHANDLE"

# 3. Poll the status until the operation is no longer in progress
while true
do
  STATUS=$(http --ignore-stdin --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
  Accept:'application/json' | jq -r '.status')
  echo "$STATUS"
  case "$STATUS" in
    INITIALIZED|PENDING|RUNNING) sleep 2 ;;
    *) break ;;
  esac
done

# 4. Fetch the results, following nextResultUri until there isn't one (EOS)
printf '\n\n----- RESULTS -----\n\n'
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while true
do
  RESULT=$(http --ignore-stdin --follow --timeout 3600 GET "$host$URL" \
  Accept:'application/json')
  printf '%s\n' "$RESULT" | jq '.'
  URL=$(printf '%s\n' "$RESULT" | jq -r '.nextResultUri // ""')
  if [ -z "$URL" ]; then
    break
  fi
  echo "(next result chunk)"
done

# 5. Close the session
echo "Closing session"
http --ignore-stdin --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE

We put the actual SQL in a file called <span class="inline-code">rmoff.sql</span>:

CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;

Now when we run the shell script, we get this:

Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED

----- RESULTS -----

{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

The SQL statement we ran writes a CSV file to the <span class="inline-code">/tmp/flink-test</span> folder, so let's check that the file was created and is valid:

$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1

Great, that's exactly what we expected.
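To turn this eyeball check into something a script can assert on, a sketch of a hypothetical `check_counts` function that concatenates the part files in the output folder, sorts the rows, and compares them with the counts implied by the `VALUES` list (Bob twice, Alice and Greg once). It assumes the job has finished and that the folder holds only this job's output; dot-prefixed files are skipped by the glob.

```shell
# check_counts DIR: compare the sorted contents of the files in DIR with the
# expected name counts. Prints "OK: output matches" and returns 0 on success;
# otherwise prints the actual rows to stderr and returns 1.
check_counts() {
  local expected actual
  expected='Alice,1
Bob,2
Greg,1'
  actual=$(cat "$1"/* | sort)
  if [ "$actual" = "$expected" ]; then
    echo "OK: output matches"
  else
    printf 'Unexpected output:\n%s\n' "$actual" >&2
    return 1
  fi
}

# Example:
#   check_counts /tmp/flink-test
```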

Summary

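If you'd like to learn more about Flink SQL, you might be interested in finding out more about the role of the Catalog, a hands-on example of using catalogs, or a deep dive into using JARs with Flink SQL.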

You might also want to try out our Decodable service, which provides fully managed Apache Flink and Debezium. With our CLI and API, you can easily deploy pipelines using Flink SQL.

Original article: https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api
