国内精品久久久久影院日本,日本中文字幕视频,99久久精品99999久久,又粗又大又黄又硬又爽毛片

所有文章 > 學習各類API > 探索 Flink SQL Gateway REST API
探索 Flink SQL Gateway REST API

探索 Flink SQL Gateway REST API

Apache Flink 中的 SQL 網關提供了一種從 SQL 客戶端以外的位置在 Flink 中運行 SQL 的方法。這涵蓋了使用JDBC驅動程序(支持大量客戶端連接)、通過HiveServer2端點的Hive客戶端訪問,以及直接針對REST端點的操作,都是為了找到最適合您需求的產品。

在我繼續學習 Flink SQL 的旅程時,我想知道的一件事是,在生產場景中如何提交 Flink SQL 作業。在我看來,SQL Gateway的REST API會是一個很好的選擇。您將 SQL 代碼放在源代碼管理下的文件中,然后使用部署管道針對終端節點提交該 SQL。值得注意的是,建議在將生產作業部署到 Flink 時使用應用程序模式,而 SQL 客戶端和網關尚不支持此功能。當前有一個FLIP正在討論中,但如果您傾向于使用應用程序模式,那么就需要將SQL打包在JAR文件中進行部署。當然,您也可以選擇繼續使用SQL客戶端或網關,不過需要注意的是,在會話模式下運行作業時會存在一些限制,主要就是沒有資源隔離。

在本文中,我將向您展示如何使用終端節點,包括使用 Postman 工具探索它,使用 HTTPie 從 shell 調用終端節點,最后使用一個可行的概念驗證腳本來執行腳本中的語句。

該 API 有文檔記錄,有兩個版本,每個版本都有自己的 OpenAPI YAML 規范。我將在此處查看 <span class=“inline-code”>v2</span>。請注意,根據文檔,該規范目前仍處于實驗階段。

首先,讓我們在本地啟動 Flink 集群和 SQL 網關:

./bin/start-cluster.sh

./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

Postman是什么

Postman?是一個方便的工具,用于使用 API 執行大量有用的操作。在這里,我只是借助它來創建針對終端節點的示例調用,以便快速了解它們之間的關系。你可以在 Web 上使用它,但假設你使用的是 Flink 的本地實例(或者至少是不可公開訪問的實例),那么你需要桌面版本。請注意,為了訪問我們在此處所使用的導入功能,您仍然需要先注冊一個免費帳戶。

在您的工作區下,單擊導入并粘貼 SQL Gateway 的 OpenAPI YAML 文件的 URL (<span class=“inline-code”>https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v2_sql_gateway.yml</span>),該文件在文檔頁面上鏈接到該文件。將其作為 Postman 集合導入。

CleanShot 2024-03-07 于 17.21.51 1.png

現在,您將在 Postman 集合下看到所有 API 終端節點的列表,以及每個終端節點的預創建調用。轉到 Environments -> Globals 并使用 SQL Gateway 的值定義 <span class=“inline-code”>baseUrl</span>。如果你在本地運行它,那么這將是 <span class=“inline-code”>http://localhost:8083</span>

CleanShot 2024-03-07 于 17.27.59.png

現在返回 Collections,在 Flink SQL Gateway REST API 文件夾下找到 <span class=“inline-code”>get Info</span> 調用。打開它并點擊 發送.您應該會看到如下所示的成功響應:

CleanShot 2024-03-07 于 17.32.59.png

您還可以單擊“代碼”圖標 (<span class=“inline-code”></></span> ) 查看各種不同語言和工具(包括 cURL 和 HTTPie)的調用。雖然目前這還不算是什么創新之舉,但當你開始使用有效載荷時,就會發現它真的非常便捷。

CleanShot 2024-03-07 于 17.34.00.png

就像我們在上面手動填充全局變量 <span class=“inline-code”>baseURL</span>一樣,我們也可以從一個調用中獲取響應,并在另一個調用中使用它。這非常有用,因為我們需要使用 REST API 返回的兩個變量(<span class=“inline-code”>sessionHandle</span> 和 <span class=“inline-code”>operationHandle</span>)。要在 Postman 中執行此操作,請將以下內容添加到請求窗格的 Tests (測試) 選項卡中:

var jsonData = JSON.parse(responseBody); postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);

這假設要填充的變量名為 <span class=“inline-code”>sessionHandle</span>并且它在響應的名為 <span class=“inline-code”>sessionHandle</span> 的根鍵中返回。它是:

$ http  --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}

在設置變量之后,你可以通過在雙大括號中對其進行引用,來在其他調用中使用這個變量。如下所示:

CleanShot 2024-03-07 于 17.46.23.png

我在此分享了我的Postman集合的副本,其中已經為您完成了上述的變量配置。

現在,我們來了解一下如何從頭開始將 SQL 語句實際提交到網關的工作流。

使用 Flink SQL 網關運行 SQL 語句

從本質上講,最少的步驟如下。

  1. 建立會話(設置了可選配置參數)
  2. 提交 SQL 語句,這將生成一個 Operation。
  3. 檢查 Operation (操作) 的狀態,直到它完成
  4. 獲取 Operation 的結果。

以下是執行每個操作的方法,使用 HTTPie 作為示例客戶端并顯示響應。我使用bash變量來存儲會話和操作句柄的值。

檢查連接和 Flink 版本

$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}

1. 創建會話

POST /會話

$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8

{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}

$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"

[可選]驗證會話并讀取會話配置

請注意,此處的 <span class=“inline-code”>runtime-mode</span> 是從上面在會話創建中傳遞的 <span class=“inline-code”>properties</span> 中設置的。

$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
Copy
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNN
AMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=AL
L-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-op
ens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/ja
va.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.ut
il.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}

2. 提交 SQL 語句

$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8

{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}

$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"

3. 獲取 Statement 執行狀態

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8

{
"status": "FINISHED"
}

4. 獲得結果

$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'

HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
Copy
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}

因為 <span class=“inline-code”>resultType</span> 不是 <span class=“inline-code”>EOS</span>并且有一個值<span class=“inline-code”>nextResultUri</span>它告訴我們還有更多要獲取 – 在 <span class=“inline-code”>nextResultUri</span>中指定的位置:

{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}

5. 整理

正確的做法是在會話結束后將其關閉:

$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

使用 Shell 腳本執行 Flink SQL

我們可以使用上述所有內容和一些 bash 來編寫腳本:

host='localhost:8083'

SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST $host'/sessions' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.sessionHandle')

echo "Got session handle: "$SESSIONHANDLE


SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)

OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST $host'/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json' | jq -r '.operationHandle')

echo "Got operation handle: "$OPERATIONHANDLE

while [ 1 -eq 1 ]
do
STATUS=$(http --follow --timeout 3600 GET $host'/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json' | jq -r '.status')
echo $STATUS
if [ $STATUS != "RUNNING" ]; then
break
fi
sleep 2
done

echo "\n\n----- ?? RESULTS ?? -----\n"
URL='/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON'
while [ 1 -eq 1 ]
do
RESULT=$(http --follow --timeout 3600 GET $host$URL \
Accept:'application/json')
echo $RESULT | jq '.'
URL=$(echo $RESULT | jq -r '.nextResultUri // ""')
if [ -z $URL ]; then
break
fi
echo "(next result chunk ??)"
done

echo "Closing session ???"
http --follow --timeout 3600 DELETE $host'/sessions/'$SESSIONHANDLE

我們將實際的 SQL 放入一個名為 <span class=“inline-code”>rmoff.sql</span> 的文件中:

CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;

現在,當我們運行 shell 腳本時,我們得到這個:

Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED


----- ?? RESULTS ?? -----

{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk ??)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session ???
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8

{
"status": "CLOSED"
}

我們實際運行的SQL語句會將一個CSV文件寫入<span class="inline-code">/tmp</span>文件夾,因此讓我們檢查一下該文件是否已成功生成并有效:

$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1

很好,這完全符合我們的預期。

總結

如果您想了解有關 Flink SQL 的更多信息,您可能有興趣了解有關 Catalog 角色的更多信息、使用 Catalog 的動手示例,或者深入了解如何將 JAR 與 Flink SQL 結合使用。

您可能還想嘗試一下我們的Decodable服務,它提供了完全托管的Apache Flink和Debezium。通過我們的CLI和API,您可以輕松地使用Flink SQL來部署管道。

原文鏈接:https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api

#你可能也喜歡這些API文章!