但我們可以利用 CDC 記錄做更多事情。關(guān)注者不需要是關(guān)注者數(shù)據(jù)庫。它還可以是您想要保留最新用戶的緩存,或者是在添加新文檔時更新的搜索索引等等。這個想法是,您可以將不同的使用者附加到不同用例的 CDC 記錄,并且它們與原始數(shù)據(jù)庫以及彼此之間完全解耦。

我如何建立一個CDC系統(tǒng)?

我們剛剛回顧的 CDC 用例非常令人興奮,并且可以應(yīng)用于廣泛的問題。

現(xiàn)在我們“只”需要構(gòu)建一個系統(tǒng),該系統(tǒng)可以:

肯定有很多開源工具可以讓您構(gòu)建一個系統(tǒng)來做到這一點(diǎn)。如果您沿著這條路開始,您可能很快就會設(shè)計出一個與此類似的系統(tǒng):

每個系統(tǒng)都會執(zhí)行所承諾的操作,但部署、配置和操作每個系統(tǒng)以及確保它們彼此無縫協(xié)作的開銷并非微不足道。這就是 Decodable 將其作為服務(wù)提供的原因。

可解碼流程 CDC 記錄即服務(wù)

Decodable 是一個實時數(shù)據(jù)處理平臺,為 CDC 處理提供一流的服務(wù)。用戶只需連接到源數(shù)據(jù)庫并實時處理 CDC 記錄,無需管理底層基礎(chǔ)設(shè)施。

在 Decodable 中,CDC 連接器發(fā)出更改記錄,這些記錄存儲在更改流中。每個更改記錄都包含修改類型:插入、更新和刪除,以及修改的數(shù)據(jù)。這些記錄是根據(jù)管道中的修改進(jìn)行處理的。在接收器一側(cè),更改流可以連接到支持使用更改記錄的連接器(例如:Postgres Sink)。

典型的工作流程包含三個步驟(出于演示目的,此處顯示的示例已被簡化):

1. 配置 Decodable 的 CDC 連接器(例如:Postgres CDC、MySQL CDC),它會自動提供包含 CDC 記錄的變更流。例如:{"op":"c","before":null,"after":{"id":1,"user":"alice","status":"NEW"},"ts_ms":0} {"op":"c","before":null,"after":{"id":2,"user":"alice","status":"NEW"},"ts_ms":1} {"op":"c","before":null,"after":{"id":3,"user":"bob","status":"NEW"},"ts_ms":2} {"op":"u","before":{"id":2,"user":"alice","status":"NEW"},"after":{"id":2,"user":"alice","status":"SHIPPED"},"ts_ms":3} {"op":"u","before":{"id":2,"user":"alice","status":"SHIPPED"},"after":{"id":2,"user":"alice","status":"DELIVERED"},"ts_ms":4}Copy

2. 創(chuàng)建管道,處理邏輯用SQL編寫。例如,下面的 SQL 查詢對每個用戶未交付的所有訂單進(jìn)行計數(shù)。insert into non_delivered_count select user, count(status) as total_non_delivered from orders where status != "DELIVERED" group by userCopy

上面的查詢還會自動配置一個輸出更改流non_delivered_count ,其中包含供接收器連接器使用的 CDC 記錄。由于管道是連續(xù)處理的,因此結(jié)果也會隨著每個輸入記錄而更新。輸出流中的記錄如下所示:{"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":0} {"op":"d","before":{"user":"alice","total_non_delivered":1},"after":null,"ts_ms":1} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":2},"ts_ms":1} {"op":"c","before":null,"after":{"user":"bob","total_non_delivered":1},"ts_ms":2} {"op":"d","before":{"user":"alice","total_non_delivered":2},"after":null,"ts_ms":4} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":4}Copy

3. 配置一個接收器連接器,它可以消耗更改以實時查看結(jié)果。例如,如果配置了 Postgres 接收器,則連接會針對流中的每條記錄向 Postgres 數(shù)據(jù)庫發(fā)出一個操作。它們看起來像:INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 2) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("bob", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered;Copy

請注意,SQL 語法INSERT…ON CONFLICT…DO UPDATE SET…用于確保以原子方式應(yīng)用撤消(刪除 + 創(chuàng)建)。

通過 CDC 解鎖更多用例

現(xiàn)在我們知道 Decodable 可以實時處理 CDC 記錄,并支持目標(biāo)系統(tǒng)始終擁有最新視圖。但沒有限制一次只能處理一個變更流。借助 Flink 強(qiáng)大的引擎和易于訪問的 SQL 接口,流連接變得更加容易——您可以實時連接來自不同數(shù)據(jù)庫的表!

文章來源:https://www.decodable.co/blog/what-is-change-data-capture

上一篇:

GraphQL vs. REST APIs:為何不應(yīng)使用GraphQL

下一篇:

簡化API縮寫:應(yīng)用程序編程接口終極指南
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊

多API并行試用

數(shù)據(jù)驅(qū)動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個渠道
一鍵對比試用API 限時免費(fèi)

#AI深度推理大模型API

對比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費(fèi)