
public interface EventListener
{
    // Handles query-created events; the QueryCreatedEvent argument carries the details captured when the query is created.
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }

    // Handles query-completed events; the QueryCompletedEvent argument carries the details captured when the query completes.
    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }

    // Handles split-completed events; the SplitCompletedEvent argument carries the details captured when a split completes.
    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}
The following simple example shows how to implement a custom Event Listener. Suppose our goal is to create a listener that records status information for query tasks and sends that information to Kafka.
package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {
    @Override
    public String getName() {
        return "query-event-listener";
    }

    // Works directly with the Map<String, String> configuration that Presto passes in.
    @Override
    public EventListener create(Map<String, String> config) {
        // Check the Kafka broker configuration
        if (!config.containsKey("kafka.bootstrap.servers")) {
            throw new RuntimeException("event-listener.properties file missing kafka.bootstrap.servers");
        }
        // Check the Kafka topic configuration
        if (!config.containsKey("kafka.topic")) {
            throw new RuntimeException("event-listener.properties file missing kafka.topic");
        }
        return new QueryEventListener(config);
    }
}
Code overview:
1. The QueryEventListenerFactory class implements the EventListenerFactory interface, the factory Presto uses to create event listeners.
2. The getName method returns the string "query-event-listener", the name of this factory.
3. The create method receives a Map<String, String> of configuration entries and uses it to create the event listener. It first checks that the configuration contains the two required keys, "kafka.bootstrap.servers" and "kafka.topic", and throws a runtime exception if either is missing.
4. If the configuration is complete, it creates a new QueryEventListener, passes the configuration to it, and returns it as the event listener.
5. Note: during startup, Presto reads the etc/event-listener.properties file under its installation directory and passes the configuration entries to the create method of the EventListenerFactory as a Map<String, String>. QueryEventListenerFactory therefore reads the settings it needs from that Map and throws an exception if a required entry is missing (see the sketch after this list).
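As an illustration only (this snippet is not part of the plugin), here is how the factory's configuration checks behave when invoked with a hand-built map that mirrors what Presto derives from event-listener.properties; the broker address and topic name are placeholders:

package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;

import java.util.HashMap;
import java.util.Map;

public class FactorySmokeTest {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("kafka.bootstrap.servers", "localhost:9092"); // placeholder broker
        config.put("kafka.topic", "presto-events");              // placeholder topic
        EventListener listener = new QueryEventListenerFactory().create(config); // succeeds

        config.remove("kafka.topic");
        new QueryEventListenerFactory().create(config); // throws RuntimeException: missing kafka.topic
    }
}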
Next, implement the Plugin interface:

package com.ds;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Arrays;

public class QueryEventPlugin implements Plugin {
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        EventListenerFactory listenerFactory = new QueryEventListenerFactory();
        return Arrays.asList(listenerFactory);
    }
}
Code overview: QueryEventPlugin implements the Plugin interface and exposes QueryEventListenerFactory through getEventListenerFactories; this is how Presto discovers the listener factory when it loads the plugin.

Finally, implement the QueryEventListener itself:
package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class QueryEventListener implements EventListener {
    private KafkaProducer<String, String> producer;
    private String kafkaTopic;
    private Map<String, String> config;

    public QueryEventListener(Map<String, String> config) {
        this.config = new HashMap<>();
        this.config.putAll(config);
        init();
    }

    // Initialize the Kafka producer from the configuration and read the target topic.
    private void init() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", config.get("kafka.bootstrap.servers"));
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", StringSerializer.class);
        this.producer = new KafkaProducer<>(kafkaProps);
        this.kafkaTopic = config.get("kafka.topic");
    }

    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
    }

    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        String querySql = queryCompletedEvent.getMetadata().getQuery();
        String queryState = queryCompletedEvent.getMetadata().getQueryState();
        String queryUser = queryCompletedEvent.getContext().getUser();
        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
        long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
        long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO).toMillis();
        long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
        long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
        long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
        int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
        double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
        long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
        long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
        long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
        long writtenBytes = queryCompletedEvent.getStatistics().getWrittenOutputBytes();
        long writtenRows = queryCompletedEvent.getStatistics().getWrittenOutputRows();
        // Build the event payload
        String eventData = "{" +
                "\"queryId\": \"" + queryId + "\"," +
                "\"querySql\": \"" + querySql + "\"," +
                "\"queryState\": \"" + queryState + "\"," +
                "\"queryUser\": \"" + queryUser + "\"," +
                "\"createTime\": " + createTime + "," +
                "\"endTime\": " + endTime + "," +
                "\"startTime\": " + startTime + "," +
                "\"analysisTime\": " + analysisTime + "," +
                "\"cpuTime\": " + cpuTime + "," +
                "\"queuedTime\": " + queuedTime + "," +
                "\"wallTime\": " + wallTime + "," +
                "\"completedSplits\": " + completedSplits + "," +
                "\"cumulativeMemory\": " + cumulativeMemory + "," +
                "\"outputBytes\": " + outputBytes + "," +
                "\"outputRows\": " + outputRows + "," +
                "\"totalBytes\": " + totalBytes + "," +
                "\"totalRows\": " + totalRows + "," +
                "\"writtenBytes\": " + writtenBytes + "," +
                "\"writtenRows\": " + writtenRows +
                "}";
        // Send to the Kafka topic
        ProducerRecord<String, String> completedRecord = new ProducerRecord<>(kafkaTopic, eventData);
        producer.send(completedRecord);
        // If the query failed, also send the failure details
        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
            int code = queryFailureInfo.getErrorCode().getCode();
            String name = queryFailureInfo.getErrorCode().getName();
            String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
            String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
            String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
            String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
            String failuresJson = queryFailureInfo.getFailuresJson();
            // Build the failure payload
            String failureData = "{"
                    + "\"code\": " + code + ","
                    + "\"name\": \"" + name + "\","
                    + "\"failureType\": \"" + failureType + "\","
                    + "\"failureHost\": \"" + failureHost + "\","
                    + "\"failureMessage\": \"" + failureMessage + "\","
                    + "\"failureTask\": \"" + failureTask + "\","
                    + "\"failuresJson\": \"" + failuresJson + "\""
                    + "}";
            // Send to the Kafka topic
            ProducerRecord<String, String> failureRecord = new ProducerRecord<>(kafkaTopic, failureData);
            producer.send(failureRecord);
        });
    }

    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
        // Use Instant.EPOCH as the "absent" sentinel: Instant.MIN would overflow toEpochMilli().
        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.EPOCH).toEpochMilli();
        String payload = splitCompletedEvent.getPayload();
        String queryId = splitCompletedEvent.getQueryId();
        String stageId = splitCompletedEvent.getStageId();
        long startTime = splitCompletedEvent.getStartTime().orElse(Instant.EPOCH).toEpochMilli();
        String taskId = splitCompletedEvent.getTaskId();
        long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
        long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
        long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
        long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
        long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
        long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
        // Build the split event payload
        String splitEventData = "{ "
                + "\"createTime\": " + createTime + ", "
                + "\"endTime\": " + endTime + ", "
                + "\"payload\": \"" + payload + "\", "
                + "\"queryId\": \"" + queryId + "\", "
                + "\"stageId\": \"" + stageId + "\", "
                + "\"startTime\": " + startTime + ", "
                + "\"taskId\": \"" + taskId + "\", "
                + "\"completedDataSizeBytes\": " + completedDataSizeBytes + ", "
                + "\"completedPositions\": " + completedPositions + ", "
                + "\"completedReadTime\": " + completedReadTime + ", "
                + "\"cpuTime\": " + cpuTime + ", "
                + "\"queuedTime\": " + queuedTime + ", "
                + "\"wallTime\": " + wallTime
                + " }";
        // Send to the Kafka topic
        ProducerRecord<String, String> splitRecord = new ProducerRecord<>(kafkaTopic, splitEventData);
        producer.send(splitRecord);
    }

    // Close the Kafka producer connection.
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
Code overview:
The QueryEventListener class first declares a KafkaProducer object and a few other instance variables.
The constructor accepts a config parameter, which initializes the listener's configuration; the Kafka producer connection is established during initialization.
The init method initializes the Kafka producer, configures the Kafka connection, and sets the Kafka topic.
The queryCreated method handles query-created events; no concrete handling logic is provided here.
The queryCompleted method handles query-completed events: from the incoming queryCompletedEvent object it extracts the details of the completed query, assembles them into JSON, and sends the result to the Kafka topic. If the query failed, the corresponding failure information is sent to the Kafka topic as well.
The splitCompleted method handles split-completed events: from the incoming splitCompletedEvent object it extracts the details of the completed split, assembles them into JSON, and sends the result to the Kafka topic.
The close method closes the Kafka producer connection.
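One caveat about the hand-built JSON above: a query string containing double quotes, backslashes, or newlines would produce an invalid payload. A minimal escaping helper, sketched here as a hypothetical addition to QueryEventListener (a JSON library such as Jackson would be the more robust choice), could be applied to each string field before concatenation:

    // Hypothetical helper (not in the original listener): escape characters
    // that would otherwise break the hand-concatenated JSON payloads.
    private static String escapeJson(String value) {
        if (value == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

With this in place, a string field would be written as "\"querySql\": \"" + escapeJson(querySql) + "\"," instead of interpolating the raw value.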
The pom.xml used to build and package the plugin:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ds</groupId>
    <artifactId>PrestoHook</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-spi</artifactId>
            <version>0.275</version>
            <scope>compile</scope>
        </dependency>
        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version> <!-- replace with the Kafka version you use -->
        </dependency>
    </dependencies>
    <!-- Packaging plugins -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
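With this POM, running mvn clean package produces target/PrestoHook-1.0-SNAPSHOT-jar-with-dependencies.jar (the name follows from the artifactId, version, and assembly descriptor above); that bundled jar is what gets deployed.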
Packaging notes:
In Presto, the Service Provider Interface (SPI) is used to extend Presto with additional functionality, such as loading connectors, functions, types, and system access controls. SPI implementations are discovered through metadata files, which lets Presto load and recognize these extension points dynamically.
To extend Presto through the SPI, we need to create a specific metadata file and place it under the src/main/resources/META-INF/services/ directory. The file's name must match the fully qualified name of the interface being extended: for the Plugin interface, the file must be named com.facebook.presto.spi.Plugin, and its content is com.ds.QueryEventPlugin.
In this metadata file we list the fully qualified names of the classes implementing the corresponding interface, here the class implementing com.facebook.presto.spi.Plugin. When Presto starts, it checks this metadata file and loads the plugins named in it.
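Concretely, the project therefore contains the file

src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin

whose entire content is the single line:

com.ds.QueryEventPlugin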
The listener itself is configured through the etc/event-listener.properties file:

event-listener.name=query-event-listener
kafka.bootstrap.servers=10.82.4.11:9092
kafka.topic=prestojob
Note: the plugin and configuration must be deployed to every Presto node, and the Presto service must be restarted for the changes to take effect.
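As a sketch only (the installation path /opt/presto and the plugin directory name query-event-listener are assumptions; adjust them to your environment, where Presto loads plugins from its plugin directory), deployment on each node could look like:

# on every Presto node (illustrative paths)
mkdir -p /opt/presto/plugin/query-event-listener
cp target/PrestoHook-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/presto/plugin/query-event-listener/
cp event-listener.properties /opt/presto/etc/
# then restart the Presto service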
Running the query "select 1" through Presto produces the following log content in Kafka:
{"queryId": "20240604_090914_00005_bb9j6","querySql": "select 1","queryState": "FINISHED","queryUser": "presto","createTime": 1717492154262,"endTime": 1717492154326,"startTime": 1717492154264,"analysisTime": 16,"cpuTime": 6,"queuedTime": 1,"wallTime": 21,"completedSplits": 17,"cumulativeMemory": 0.0,"outputBytes": 5,"outputRows": 1,"totalBytes": 0,"totalRows": 0,"writtenBytes": 0,"writtenRows": 0}
From there, you can consume the data in Kafka directly and apply whatever custom processing you need.
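For instance, a minimal consumer (a sketch reusing the broker address and topic from the configuration above; the group id is arbitrary) could read these events:

package com.ds;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class QueryEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.82.4.11:9092"); // broker from event-listener.properties
        props.put("group.id", "presto-query-events");      // arbitrary consumer group id
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("prestojob"));
            while (true) {
                // Poll for new query events and hand each JSON payload to custom processing.
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.value());
                }
            }
        }
    }
}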
Custom Event Listeners are a powerful and flexible feature of the Presto ecosystem: they improve the system's observability and provide a way to interact with external systems. The walkthrough and examples in this article should help you explore and implement custom Event Listeners in your own Presto deployment, to better meet business needs and optimize data-processing workflows.
This article is reposted from the WeChat public account @滌生大數據.