
How to Quickly Implement REST API Integration to Streamline Business Processes
You'll now see a list of all the API endpoints under the Postman collection, along with a pre-created call for each one. Go to Environments -> Globals and define <span class="inline-code">baseUrl</span> with the value of your SQL Gateway. If you're running it locally, this will be <span class="inline-code">http://localhost:8083</span>.
Now go back to Collections, and under the Flink SQL Gateway REST API folder find the <span class="inline-code">get Info</span> call. Open it and click Send. You should see a successful response containing the product name and version.
You can also click the "Code" icon (<span class="inline-code">&lt;/&gt;</span>) to see the call in a range of languages and tools, including cURL and HTTPie. It's hardly groundbreaking at this point, but once you start working with payloads it becomes really handy.
Just as we manually populated the global variable <span class="inline-code">baseUrl</span> above, we can also take the response from one call and use it in another. This is really useful, because there are two values returned by the REST API that we need to use in later calls (<span class="inline-code">sessionHandle</span> and <span class="inline-code">operationHandle</span>). To do this in Postman, add the following to the Tests tab of the request pane:
var jsonData = JSON.parse(responseBody);
postman.setEnvironmentVariable("sessionHandle", jsonData.sessionHandle);
This assumes that the variable to populate is called <span class="inline-code">sessionHandle</span> and that it's returned in a root key of the response, also called <span class="inline-code">sessionHandle</span>. Which it is:
$ http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "190edef5-df00-4182-be0e-431737b1e93b"
}
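Once the variable is set, you can use it in other calls by referencing it in double curly braces, for example <span class="inline-code">{{sessionHandle}}</span>.
I've shared a copy of my Postman collection, in which the variable configuration above is already done for you.
Now let's look at the workflow for actually submitting a SQL statement to the gateway from scratch.
In essence, the minimum steps are: create a session, submit a SQL statement (which returns an operation handle), check the operation's status until it completes, and fetch the results.
Here's how to do each of these, using HTTPie as the example client and showing the response. I use bash variables to hold the values of the session and operation handles.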
$ http --body --follow --timeout 3600 GET 'localhost:8083/info' \
Accept:'application/json'
{
"productName": "Apache Flink",
"version": "1.18.1"
}
POST /sessions
$ printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 56
content-type: application/json; charset=UTF-8
{
"sessionHandle": "e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
}
$ export SESSIONHANDLE="e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28"
Note that <span class="inline-code">runtime-mode</span> in the session configuration below is set from the <span class="inline-code">properties</span> that were passed when the session was created above.
$ http --ignore-stdin --form --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 2129
content-type: application/json; charset=UTF-8
{
"properties": {
"env.java.opts.all": "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED",
"execution.attached": "true",
"execution.runtime-mode": "batch",
"execution.savepoint-restore-mode": "NO_CLAIM",
"execution.savepoint.ignore-unclaimed-state": "false",
"execution.shutdown-on-attached-exit": "false",
"execution.target": "remote",
"jobmanager.bind-host": "localhost",
"jobmanager.execution.failover-strategy": "region",
"jobmanager.memory.process.size": "1600m",
"jobmanager.rpc.address": "localhost",
"jobmanager.rpc.port": "6123",
"parallelism.default": "1",
"pipeline.classpaths": "",
"pipeline.jars": "file:/Users/rmoff/flink/flink-1.18.1/opt/flink-python-1.18.1.jar",
"rest.address": "localhost",
"rest.bind-address": "localhost",
"sql-gateway.endpoint.rest.address": "localhost",
"table.catalog-store.file.path": "./conf/catalogs",
"table.catalog-store.kind": "file",
"table.resources.download-dir": "/var/folders/7x/nscwrz557vlcd_ydgt7d5wt00000gn/T/sql-gateway-e296eb18-9b6e-4fbc-bd6c-0cbb93a7fe28",
"taskmanager.bind-host": "localhost",
"taskmanager.host": "localhost",
"taskmanager.memory.process.size": "1728m",
"taskmanager.numberOfTaskSlots": "1"
}
}
$ printf '{
"statement": "CREATE TABLE t_foo WITH ( '\''connector'\'' = '\''filesystem'\'', '\''path'\'' = '\''file:///tmp/flink-test'\'', '\''format'\'' = '\''csv'\'', '\''csv.field-delimiter'\'' = '\'','\'' ) AS SELECT name, COUNT(*) AS cnt FROM (VALUES ('\''Bob'\''), ('\''Alice'\''), ('\''Greg'\''), ('\''Bob'\'')) AS NameTable(name) GROUP BY name;"
}'| http --follow --timeout 3600 POST 'localhost:8083/sessions/'$SESSIONHANDLE'/statements' \
Content-Type:'application/json' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 58
content-type: application/json; charset=UTF-8
{
"operationHandle": "ba45649c-07b2-4b1c-a190-df3631b53549"
}
$ export OPERATIONHANDLE="ba45649c-07b2-4b1c-a190-df3631b53549"
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/status' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 21
content-type: application/json; charset=UTF-8
{
"status": "FINISHED"
}
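The call above came back as FINISHED straight away, but a longer-running statement will report other states first, and it may also end in a failure state. As a rough sketch of how a script might tell "keep polling" apart from "stop", the hypothetical helper below treats a fixed list of states as final. The name is_terminal_status is made up for this example and is not part of Flink or HTTPie. Only RUNNING and FINISHED appear in this article; the other state names are assumptions based on Flink's OperationStatus values and should be checked against the version you run.

```shell
# Hypothetical helper: succeed (exit status 0) when the given operation status
# is one that will not change any more, fail (exit status 1) otherwise.
# ASSUMPTION: CANCELED, CLOSED, ERROR and TIMEOUT are taken from Flink's
# OperationStatus values; verify them against your Flink version.
is_terminal_status() {
  case "$1" in
    FINISHED|CANCELED|CLOSED|ERROR|TIMEOUT) return 0 ;;
    *) return 1 ;;
  esac
}

# Example: decide what to do with a status string that was fetched earlier.
STATUS="RUNNING"
if is_terminal_status "$STATUS"; then
  echo "operation ended with status $STATUS"
else
  echo "operation still in progress ($STATUS)"
fi
```

Checking for a set of final states, instead of only for "not RUNNING", means a state such as a queued or initializing operation would not be mistaken for completion.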
$ http --follow --timeout 3600 GET 'localhost:8083/sessions/'$SESSIONHANDLE'/operations/'$OPERATIONHANDLE'/result/0?rowFormat=JSON' \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 483
content-type: application/json; charset=UTF-8
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"fb1a5f06643364bc82a9a4e0bd3e9c10"
]
}
]
},
"nextResultUri": "/v2/sessions/41ec5bb8-3574-4c6b-9b47-7bf9aa021ccc/operations/9bb84ff8-89a6-4f94-8dcc-e9125091c63b/result/1?rowFormat=JSON"
}
Because <span class="inline-code">resultType</span> is not <span class="inline-code">EOS</span> and <span class="inline-code">nextResultUri</span> has a value, we know there's more to fetch, at the location given in <span class="inline-code">nextResultUri</span>:
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "fb1a5f06643364bc82a9a4e0bd3e9c10",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
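The follow-the-link logic seen in these two responses can be sketched in plain shell. The function name next_result_uri is made up for this example, and the sed pattern assumes the key and its value sit on the same line of the response, as they do in the output shown here. For anything more involved, a real JSON parser such as jq is the safer choice.

```shell
# Hypothetical helper: print the value of "nextResultUri" found in a response
# body, or print nothing when the key is absent (as in the EOS response).
# ASSUMPTION: the key and its string value are on one line and the value
# contains no escaped double quotes.
next_result_uri() {
  printf '%s\n' "$1" |
    sed -n 's/.*"nextResultUri"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Example with two trimmed-down response bodies.
payload='{"resultType": "PAYLOAD", "nextResultUri": "/v2/sessions/abc/operations/def/result/1?rowFormat=JSON"}'
eos='{"resultType": "EOS", "results": {"data": []}}'

echo "after PAYLOAD: [$(next_result_uri "$payload")]"
echo "after EOS:     [$(next_result_uri "$eos")]"
```

An empty result is the signal to stop fetching; a non-empty one is the path to request next.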
It's good practice to close the session once you've finished with it:
$ http --follow --timeout 3600 DELETE 'localhost:8083/sessions/'$SESSIONHANDLE \
Accept:'application/json'
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
We can put all of the above together with a bit of bash to script it:
#!/usr/bin/env bash
host='localhost:8083'

# Create a session in batch mode and capture its handle
SESSIONHANDLE=$(printf '{
"properties": {
"execution.runtime-mode": "batch"
}
}' | http --follow --timeout 3600 POST "$host/sessions" \
  Content-Type:'application/json' \
  Accept:'application/json' | jq -r '.sessionHandle')
echo "Got session handle: $SESSIONHANDLE"

# Flatten the SQL file onto one line so it can sit inside a JSON string.
# The statement is not JSON-escaped, so it must not contain double quotes or backslashes.
SQL_STATEMENT_ONE_LINE=$(tr '\n' ' ' < rmoff.sql)

# Submit the statement and capture the operation handle
OPERATIONHANDLE=$(printf '{
"statement": "%s"
}' "$SQL_STATEMENT_ONE_LINE" | http --follow --timeout 3600 POST "$host/sessions/$SESSIONHANDLE/statements" \
  Content-Type:'application/json' \
  Accept:'application/json' | jq -r '.operationHandle')
echo "Got operation handle: $OPERATIONHANDLE"

# Poll the operation status every two seconds until it is no longer RUNNING
while true
do
  STATUS=$(http --follow --timeout 3600 GET "$host/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/status" \
    Accept:'application/json' | jq -r '.status')
  echo "$STATUS"
  if [ "$STATUS" != "RUNNING" ]; then
    break
  fi
  sleep 2
done

printf '\n\n----- RESULTS -----\n\n'

# Fetch result chunks, following nextResultUri until the response no longer has one
URL="/sessions/$SESSIONHANDLE/operations/$OPERATIONHANDLE/result/0?rowFormat=JSON"
while true
do
  RESULT=$(http --follow --timeout 3600 GET "$host$URL" \
    Accept:'application/json')
  printf '%s\n' "$RESULT" | jq '.'
  URL=$(printf '%s\n' "$RESULT" | jq -r '.nextResultUri // ""')
  if [ -z "$URL" ]; then
    break
  fi
  echo "(next result chunk)"
done

# Close the session now that we are done with it
echo "Closing session"
http --follow --timeout 3600 DELETE "$host/sessions/$SESSIONHANDLE"
We put the actual SQL into a file called <span class="inline-code">rmoff.sql</span>:
CREATE TABLE t_foo WITH (
'connector' = 'filesystem',
'path' = 'file:///tmp/flink-test',
'format' = 'csv',
'csv.field-delimiter' = ','
) AS SELECT name, COUNT(*) AS cnt FROM (
VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')
) AS NameTable(name) GROUP BY name;
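This statement happens to contain only single quotes, so it can be dropped straight into a JSON string. A statement with double quotes or backslashes would produce invalid JSON if embedded as-is. As a minimal sketch of one way to guard against that, the hypothetical json_escape function below (a name invented for this example) turns newlines and tabs into spaces and escapes backslashes and double quotes. It does not handle other control characters, so treat it as an illustration and not a complete JSON encoder.

```shell
# Hypothetical helper: make a SQL statement safe to place between the double
# quotes of a JSON string value.
# - newlines and tabs become spaces (raw control characters are not allowed)
# - backslashes are doubled, then double quotes are backslash-escaped
# ASSUMPTION: the input contains no other control characters.
json_escape() {
  printf '%s' "$1" |
    tr '\n\t' '  ' |
    sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Example: build a request body from a statement that uses double quotes.
STATEMENT='SELECT "name" FROM t_foo'
printf '{"statement": "%s"}\n' "$(json_escape "$STATEMENT")"
```

With a file such as the one above, the same function could be applied to the file's contents before they are passed to printf.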
Now when we run the shell script, we get this:
Got session handle: 8d7dc671-d7aa-4ddb-ba04-706b0311aa69
Got operation handle: 3aa41360-bd21-453a-a759-b54db69c81ae
RUNNING
FINISHED
----- RESULTS -----
{
"resultType": "PAYLOAD",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": [
{
"kind": "INSERT",
"fields": [
"615365befee24c53d1efa195f9d72eee"
]
}
]
},
"nextResultUri": "/v2/sessions/8d7dc671-d7aa-4ddb-ba04-706b0311aa69/operations/3aa41360-bd21-453a-a759-b54db69c81ae/result/1?rowFormat=JSON"
}
(next result chunk)
{
"resultType": "EOS",
"isQueryResult": false,
"jobID": "615365befee24c53d1efa195f9d72eee",
"resultKind": "SUCCESS_WITH_CONTENT",
"results": {
"columns": [
{
"name": "job id",
"logicalType": {
"type": "VARCHAR",
"nullable": true,
"length": 2147483647
},
"comment": null
}
],
"rowFormat": "JSON",
"data": []
}
}
Closing session
HTTP/1.1 200 OK
access-control-allow-origin: *
connection: keep-alive
content-length: 19
content-type: application/json; charset=UTF-8
{
"status": "CLOSED"
}
The SQL statement we ran writes a CSV file to the <span class="inline-code">/tmp</span> folder, so let's check that the file was created and that its contents are valid:
$ ls -lrt /tmp/flink-test && cat /tmp/flink-test/*
-rw-r--r--@ 1 rmoff wheel 21 7 Mar 18:07 part-f50c05ae-e39e-40c1-8b00-b1a1ebfced0d-task-0-file-0
Alice,1
Bob,2
Greg,1
Great, that's exactly what we expected.
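Checking by eye works for three rows. For a larger output, a small script can do the same sanity check. The sketch below is only an illustration: sum_counts is a hypothetical name, and it assumes every line has exactly two comma-separated fields with a whole number in the second, which matches the file shown above but not CSV in general (no quoting or embedded commas).

```shell
# Hypothetical check: read "name,count" lines on stdin, fail if any line does
# not have exactly two fields with an integer count, otherwise print the sum
# of the counts.
sum_counts() {
  awk -F',' '
    NF != 2 || $2 !~ /^[0-9]+$/ { bad = 1 }
    { total += $2 }
    END { if (bad) exit 1; print total }
  '
}

# Example using the rows from the output above; the input VALUES list had
# four names, so the counts should add up to 4.
printf 'Alice,1\nBob,2\nGreg,1\n' | sum_counts
```

Against the real output this could be run as `cat /tmp/flink-test/* | sum_counts`.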
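If you want to learn more about Flink SQL, you might be interested in reading about the role of the Catalog, hands-on examples of using Catalogs, or a deep dive into using JARs with Flink SQL.
You might also want to try our Decodable service, which provides fully managed Apache Flink and Debezium. With our CLI and API you can easily deploy pipelines using Flink SQL.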
Original article: https://www.decodable.co/blog/exploring-the-flink-sql-gateway-rest-api