現(xiàn)在,您將在 Postman 集合下看到所有 API 終端節(jié)點(diǎn)的列表,以及每個(gè)終端節(jié)點(diǎn)的預(yù)創(chuàng)建調(diào)用。轉(zhuǎn)到 Environments -> Globals 并使用 SQL Gateway 的值定義 <span class=“inline-code”>baseUrl</span>。如果你在本地運(yùn)行它,那么這將是 <span class=“inline-code”>http://localhost:8083</span>

CleanShot 2024-03-07 于 17.27.59.png

現(xiàn)在返回 Collections,在 Flink SQL Gateway REST API 文件夾下找到 <span class=“inline-code”>get Info</span> 調(diào)用。打開(kāi)它并點(diǎn)擊 發(fā)送.您應(yīng)該會(huì)看到如下所示的成功響應(yīng):

CleanShot 2024-03-07 于 17.32.59.png

您還可以單擊“代碼”圖標(biāo) (<span class=“inline-code”></></span> ) 查看各種不同語(yǔ)言和工具(包括 cURL 和 HTTPie)的調(diào)用。雖然目前這還不算是什么創(chuàng)新之舉,但當(dāng)你開(kāi)始使用有效載荷時(shí),就會(huì)發(fā)現(xiàn)它真的非常便捷。

CleanShot 2024-03-07 于 17.34.00.png

就像我們?cè)谏厦媸謩?dòng)填充全局變量 <span class=“inline-code”>baseURL</span>一樣,我們也可以從一個(gè)調(diào)用中獲取響應(yīng),并在另一個(gè)調(diào)用中使用它。這非常有用,因?yàn)槲覀冃枰褂?REST API 返回的兩個(gè)變量(<span class=“inline-code”>sessionHandle</span> 和 <span class=“inline-code”>operationHandle</span>)。要在 Postman 中執(zhí)行此操作,請(qǐng)將以下內(nèi)容添加到請(qǐng)求窗格的 Tests (測(cè)試) 選項(xiàng)卡中:

var jsonData = JSON.parse(responseBody); postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);

這假設(shè)要填充的變量名為 <span class=“inline-code”>sessionHandle</span>并且它在響應(yīng)的名為 <span class=“inline-code”>sessionHandle</span> 的根鍵中返回。它是:

$ http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}

在設(shè)置變量之后,你可以通過(guò)在雙大括號(hào)中對(duì)其進(jìn)行引用,來(lái)在其他調(diào)用中使用這個(gè)變量。如下所示:

CleanShot 2024-03-07 于 17.46.23.png

我在此分享了我的Postman集合的副本,其中已經(jīng)為您完成了上述的變量配置。

現(xiàn)在,我們來(lái)了解一下如何從頭開(kāi)始將 SQL 語(yǔ)句實(shí)際提交到網(wǎng)關(guān)的工作流。

使用 Flink SQL 網(wǎng)關(guān)運(yùn)行 SQL 語(yǔ)句

從本質(zhì)上講,最少的步驟如下。

  1. 建立會(huì)話(設(shè)置了可選配置參數(shù))
  2. 提交 SQL 語(yǔ)句,這將生成一個(gè) Operation。
  3. 檢查 Operation (操作) 的狀態(tài),直到它完成
  4. 獲取 Operation 的結(jié)果。

以下是執(zhí)行每個(gè)操作的方法,使用 HTTPie 作為示例客戶端并顯示響應(yīng)。我使用bash變量來(lái)存儲(chǔ)會(huì)話和操作句柄的值。

檢查連接和 Flink 版本

$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}

1. 創(chuàng)建會(huì)話

POST /會(huì)話

$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}

$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"

[可選]驗(yàn)證會(huì)話并讀取會(huì)話配置

請(qǐng)注意,此處的 <span class=“inline-code”>runtime-mode</span> 是從上面在會(huì)話創(chuàng)建中傳遞的 <span class=“inline-code”>properties</span> 中設(shè)置的。

$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
Copy
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}

2. 提交 SQL 語(yǔ)句

$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8

{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}

$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"

3. 獲取 Statement 執(zhí)行狀態(tài)

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8

{
"status": "FINISHED"
}

4. 獲得結(jié)果

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
Copy
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}

因?yàn)?<span class=“inline-code”>resultType</span> 不是 <span class=“inline-code”>EOS</span>并且有一個(gè)值<span class=“inline-code”>nextResultUri</span>它告訴我們還有更多要獲取 – 在 <span class=“inline-code”>nextResultUri</span>中指定的位置:

{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}

5. 整理

正確的做法是在會(huì)話結(jié)束后將其關(guān)閉:

$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

使用 Shell 腳本執(zhí)行 Flink SQL

我們可以使用上述所有內(nèi)容和一些 bash 來(lái)編寫(xiě)腳本:

host='localhost:8083'

SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')

echo "Got session handle: "$SESSIONHANDLE

SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)

OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')

echo "Got operation handle: "$OPERATIONHANDLE

while [ 1 -eq 1 ]
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ $STATUS != "RUNNING" ]; then
break
fi
sleep 2
done

echo "\n\n----- ?? RESULTS ?? -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while [ 1 -eq 1 ]
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo $RESULT | jq '.'
URL=$(echo $RESULT | jq -r '.nextResultUri // ""')
if [ -z $URL ]; then
break
fi
echo "(next result chunk ??)"
done

echo "Closing session ???"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE

我們將實(shí)際的 SQL 放入一個(gè)名為 <span class=“inline-code”>rmoff.sql</span> 的文件中:

CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;

現(xiàn)在,當(dāng)我們運(yùn)行 shell 腳本時(shí),我們得到這個(gè):

Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED

----- ?? RESULTS ?? -----

{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk ??)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session ???
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

我們實(shí)際運(yùn)行的SQL語(yǔ)句會(huì)將一個(gè)CSV文件寫(xiě)入<span class="inline-code">/tmp</span>文件夾,因此讓我們檢查一下該文件是否已成功生成并有效:

$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1

很好,這完全符合我們的預(yù)期。

總結(jié)

如果您想了解有關(guān) Flink SQL 的更多信息,您可能有興趣了解有關(guān) Catalog 角色的更多信息、使用 Catalog 的動(dòng)手示例,或者深入了解如何將 JAR 與 Flink SQL 結(jié)合使用。

您可能還想嘗試一下我們的Decodable服務(wù),它提供了完全托管的Apache Flink和Debezium。通過(guò)我們的CLI和API,您可以輕松地使用Flink SQL來(lái)部署管道。

原文鏈接:https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api

上一篇:

使用 FastAPI、Pytorch 和 SerpApi 進(jìn)行自動(dòng)訓(xùn)練

下一篇:

Rest API 教程 – 完整的初學(xué)者指南
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊(cè)

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門場(chǎng)景實(shí)測(cè),選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)