{
//該方法用于處理查詢創建事件,它接受一個 QueryCreatedEvent 類型的參數,該參數包含了查詢創建時的詳細信息。
default void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
}
//該方法用于處理查詢完成事件,它接受一個 QueryCompletedEvent 類型的參數,該參數包含了查詢完成時的詳細信息。
default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
}
//該方法用于處理分割完成事件,它接受一個 SplitCompletedEvent 類型的參數,該參數包含了分割完成時的詳細信息。
default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
}
}

3.實踐案例

下面通過一個簡單的例子來說明如何實現自定義Event Listener。假設我們的目標是創建一個監聽器,用于記錄查詢任務的一些狀態信息,并將這些信息發送kafka 中。

1.自定義一個工廠類QueryEventListenerFactory,實現了EventListenerFactory接口。

package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {
@Override
public String getName() {
return "query-event-listener";
}

@Override
///直接使用Presto傳遞過來的Map<String, String>配置,
public EventListener create(Map<String, String> config) {
if (!config.containsKey("kafka.bootstrap.servers")) {
throw new RuntimeException("event-listener.properties file missing kafka.bootstrap.servers");
}
// 檢查Kafka主題配置
if (!config.containsKey("kafka.topic")) {
throw new RuntimeException("event-listener.properties file missing kafka.topic");
}

return new QueryEventListener(config);
}
}

代碼簡介:

1.QueryEventListenerFactory類實現了EventListenerFactory接口,這個接口是用來創建事件監聽器的工廠。
2.getName方法返回一個字符串"query-event-listener",表示這個工廠的名稱。
3.create方法接收一個Map<String, String>類型的配置參數,用于創建事件監聽器。在方法內部,首先檢查配置中是否包含"kafka.bootstrap.servers"和"kafka.topic"這兩個關鍵配置,如果缺少其中之一,就會拋出運行時異常。
4.如果配置完整,就會創建一個新的QueryEventListener對象,將配置參數傳遞給它,并返回該對象作為事件監聽器。
5.說明:Presto在啟動過程中會讀取/etc 目錄下以.properties結尾的配置文件,并將配置項以Map<String, String>的形式傳遞給EventListenerFactory的create方法。因此,QueryEventListenerFactory可以從這個Map中讀取所需的配置信息,如果沒有找到必要的配置項,會拋出異常。

2.自定義類 QueryEventPlugin 實現Plugin接口。

package com.ds;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Arrays;

public class QueryEventPlugin implements Plugin {

@Override
public Iterable<EventListenerFactory> getEventListenerFactories() {
EventListenerFactory listenerFactory = new QueryEventListenerFactory();
return Arrays.asList(listenerFactory);
}
}

代碼簡介:

3.自定義一個核心類QueryEventListener

package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class QueryEventListener implements EventListener {
private KafkaProducer<String, String> producer;
private String KafKaTopic;
private Map<String, String> config;

public QueryEventListener(Map<String, String> config) {
this.config = new HashMap<>();
this.config.putAll(config);
init();
}

private void init() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", config.get("kafka.bootstrap.servers"));
kafkaProps.put("key.serializer", StringSerializer.class);
kafkaProps.put("value.serializer", StringSerializer.class);
this.producer = new KafkaProducer<>(kafkaProps);
this.KafKaTopic = (String) config.get("kafka.topic");
}

@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {

String queryId = queryCompletedEvent.getMetadata().getQueryId();
String querySql = queryCompletedEvent.getMetadata().getQuery();
String queryState = queryCompletedEvent.getMetadata().getQueryState();
String queryUser = queryCompletedEvent.getContext().getUser();
long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)
.toMillis();
long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
long writtenBytes = queryCompletedEvent.getStatistics().getWrittenOutputBytes();
long writtenRows = queryCompletedEvent.getStatistics().getWrittenOutputRows();
// 構造事件數據
String eventData = "{" +
"\"queryId\": \"" + queryId + "\"," +
"\"querySql\": \"" + querySql + "\"," +
"\"queryState\": \"" + queryState + "\"," +
"\"queryUser\": \"" + queryUser + "\"," +
"\"createTime\": " + createTime + "," +
"\"endTime\": " + endTime + "," +
"\"startTime\": " + startTime + "," +
"\"analysisTime\": " + analysisTime + "," +
"\"cpuTime\": " + cpuTime + "," +
"\"queuedTime\": " + queuedTime + "," +
"\"wallTime\": " + wallTime + "," +
"\"completedSplits\": " + completedSplits + "," +
"\"cumulativeMemory\": " + cumulativeMemory + "," +
"\"outputBytes\": " + outputBytes + "," +
"\"outputRows\": " + outputRows + "," +
"\"totalBytes\": " + totalBytes + "," +
"\"totalRows\": " + totalRows + "," +
"\"writtenBytes\": " + writtenBytes + "," +
"\"writtenRows\": " + writtenRows +
"}";
// 發送到 Kafka 主題
ProducerRecord<String, String> Completedrecord = new ProducerRecord<>(KafKaTopic, eventData);
producer.send(Completedrecord);

queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
int code = queryFailureInfo.getErrorCode().getCode();
String name = queryFailureInfo.getErrorCode().getName();
String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
String failuresJson = queryFailureInfo.getFailuresJson();
// 構造 JSON 字符串
String failureData = "{"
+ "\"code\": " + code + ","
+ "\"name\": \"" + name + "\","
+ "\"failureType\": \"" + failureType + "\","
+ "\"failureHost\": \"" + failureHost + "\","
+ "\"failureMessage\": \"" + failureMessage + "\","
+ "\"failureTask\": \"" + failureTask + "\","
+ "\"failuresJson\": \"" + failuresJson + "\""
+ "}";
// 發送到 Kafka 主題
ProducerRecord<String, String> failurerecord = new ProducerRecord<>(KafKaTopic, failureData);
producer.send(failurerecord);
});
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();
String payload = splitCompletedEvent.getPayload();
String queryId = splitCompletedEvent.getQueryId();
String stageId = splitCompletedEvent.getStageId();
long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();
String taskId = splitCompletedEvent.getTaskId();
long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
////
// 構造拆分事件數據
String splitEventData = "{ "
+ "\"createTime\": " + createTime + ", "
+ "\"endTime\": " + endTime + ", "
+ "\"payload\": \"" + payload + "\", "
+ "\"queryId\": \"" + queryId + "\", "
+ "\"stageId\": \"" + stageId + "\", "
+ "\"startTime\": " + startTime + ", "
+ "\"taskId\": \"" + taskId + "\", "
+ "\"completedDataSizeBytes\": " + completedDataSizeBytes + ", "
+ "\"completedPositions\": " + completedPositions + ", "
+ "\"completedReadTime\": " + completedReadTime + ", "
+ "\"cpuTime\": " + cpuTime + ", "
+ "\"queuedTime\": " + queuedTime + ", "
+ "\"wallTime\": " + wallTime
+ " }";
// 發送到 Kafka 主題

ProducerRecord<String, String> splitrecord = new ProducerRecord<>(KafKaTopic, splitEventData);
producer.send(splitrecord);

}

public void close() {
if (producer != null) {
producer.close();
}
}
}

代碼簡介:

在 QueryEventListener 類中,首先聲明了一個 KafkaProducer 對象和一些其他實例變量。
構造函數接受一個 config 參數,該參數被用于初始化監聽器的配置信息,并且在初始化過程中會建立 Kafka 生產者連接。
init 方法用于初始化 Kafka 生產者對象,配置 Kafka 連接信息,并設置 Kafka 主題。
queryCreated 方法用于處理查詢創建事件,但在代碼中未給出具體的處理邏輯。
queryCompleted 方法用于處理查詢完成事件,根據傳入的 queryCompletedEvent 對象,提取了查詢完成時的一系列詳細信息,并將這些信息構造成 JSON 格式的數據,然后發送到 Kafka 主題中。同時,如果查詢失敗,也會將相應的失敗信息發送到 Kafka 主題中。
splitCompleted 方法用于處理分割完成事件,根據傳入的 splitCompletedEvent 對象,提取了分割完成時的一系列詳細信息,并將這些信息構造成 JSON 格式的數據,然后發送到 Kafka 主題中。
close 方法用于關閉 Kafka 生產者連接。

4.pom 依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.ds</groupId>
<artifactId>PrestoHook</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.275</version>
<scope>compile</scope>
</dependency>

<!-- Kafka 相關依賴 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version> <!-- 替換為你所使用的 Kafka 版本 -->
</dependency>
</dependencies>
<!-- 打包插件 -->
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>

</project>

5.具體使用步驟

打包說明:

在 Presto 中,服務提供者接口(Service Provider Interface,SPI)用于擴展 Presto 并提供額外的功能,例如加載連接器、功能、類型和系統訪問控制等。SPI 通過元數據文件加載,這使得 Presto 能夠動態地加載和識別這些擴展點。

為了使用 SPI 擴展 Presto,我們需要創建一個特定的元數據文件并將其放置在 src/main/resources/META-INF/services/ 目錄下。這個元數據文件的名稱應該與要擴展的接口的完全限定名相匹配。對于插件接口來說,文件的名稱應該是 com.facebook.presto.spi.Plugin,文件的內容包含 com.ds.QueryEventPlugin。

在這個元數據文件中,我們需要列出實現了對應接口的類的完全限定名。在這種情況下,就是列出實現了 com.facebook.presto.spi.Plugin 接口的類的完全限定名。這樣,當 Presto 加載時,它會檢查這個元數據文件,并根據文件中列出的類的名稱來加載對應的插件。

6.打成 jar 包文件,部署到 presto 集群,重啟 presto 服務生效。

  1. 在 presto 的 home 路徑的plugin路徑下新建目錄query-event-listener
  2. 將 jar 包文件放在上面創建的目錄下,注意權限(啟動 presto 服務的用戶要有可讀權限)
  3. 在presto 的 home 路徑的 etc 路徑下新建配置文件event-listener.properties,文件的內容如下:
event-listener.name=query-event-listener
kafka.bootstrap.servers=10.82.4.11:9092
kafka.topic=prestojob

注意:這里需要部署到所有的 presto 服務節點,然后重啟 presto 服務生效。

7.驗證效果

使用 presto 執行一條查詢語句“select 1”,對應的日志輸出到 kafka 的日志內容如下:

{"queryId": "20240604_090914_00005_bb9j6","querySql": "select 1","queryState": "FINISHED","queryUser": "presto","createTime": 1717492154262,"endTime": 1717492154326,"startTime": 1717492154264,"analysisTime": 16,"cpuTime": 6,"queuedTime": 1,"wallTime": 21,"completedSplits": 17,"cumulativeMemory": 0.0,"outputBytes": 5,"outputRows": 1,"totalBytes": 0,"totalRows": 0,"writtenBytes": 0,"writtenRows": 0}

后續就可以直接消費 kafka 里面的數據,實現自定義的處理。

4.結尾總結

自定義Event Listener是Presto生態系統中一個強大而靈活的功能,它不僅增強了系統的可觀測性,還提供了與外部系統交互的能力。通過本文的介紹和示例,希望能激發讀者在自己的Presto部署中探索和實現自定義Event Listener,從而更好地滿足業務需求和優化數據處理流程。

本文章轉載微信公眾號@滌生大數據

上一篇:

在 .NET Core 中使用客戶端-服務器體系結構的 WebAPI 和 Dapper

下一篇:

.NET Core WebAPI 緩存神器Redis
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費