
掌握API建模:基本概念和實踐
但我們可以利用 CDC 記錄做更多事情。關(guān)注者不需要是關(guān)注者數(shù)據(jù)庫。它還可以是您想要保留最新用戶的緩存,或者是在添加新文檔時更新的搜索索引等等。這個想法是,您可以將不同的使用者附加到不同用例的 CDC 記錄,并且它們與原始數(shù)據(jù)庫以及彼此之間完全解耦。
我們剛剛回顧的 CDC 用例非常令人興奮,并且可以應(yīng)用于廣泛的問題。
現(xiàn)在我們“只”需要構(gòu)建一個系統(tǒng),該系統(tǒng)可以:
肯定有很多開源工具可以讓您構(gòu)建一個系統(tǒng)來做到這一點(diǎn)。如果您沿著這條路開始,您可能很快就會設(shè)計出一個與此類似的系統(tǒng):
每個系統(tǒng)都會執(zhí)行所承諾的操作,但部署、配置和操作每個系統(tǒng)以及確保它們彼此無縫協(xié)作的開銷并非微不足道。這就是 Decodable 將其作為服務(wù)提供的原因。
Decodable 是一個實時數(shù)據(jù)處理平臺,為 CDC 處理提供一流的服務(wù)。用戶只需連接到源數(shù)據(jù)庫并實時處理 CDC 記錄,無需管理底層基礎(chǔ)設(shè)施。
在 Decodable 中,CDC 連接器發(fā)出更改記錄,這些記錄存儲在更改流中。每個更改記錄都包含修改類型:插入、更新和刪除,以及修改的數(shù)據(jù)。這些記錄是根據(jù)管道中的修改進(jìn)行處理的。在接收器一側(cè),更改流可以連接到支持使用更改記錄的連接器(例如:Postgres Sink)。
典型的工作流程包含三個步驟(出于演示目的,此處顯示的示例已被簡化):
1. 配置 Decodable 的 CDC 連接器(例如:Postgres CDC、MySQL CDC),它會自動提供包含 CDC 記錄的變更流。例如:{"op":"c","before":null,"after":{"id":1,"user":"alice","status":"NEW"},"ts_ms":0} {"op":"c","before":null,"after":{"id":2,"user":"alice","status":"NEW"},"ts_ms":1} {"op":"c","before":null,"after":{"id":3,"user":"bob","status":"NEW"},"ts_ms":2} {"op":"u","before":{"id":2,"user":"alice","status":"NEW"},"after":{"id":2,"user":"alice","status":"SHIPPED"},"ts_ms":3} {"op":"u","before":{"id":2,"user":"alice","status":"SHIPPED"},"after":{"id":2,"user":"alice","status":"DELIVERED"},"ts_ms":4}
Copy
2. 創(chuàng)建管道,處理邏輯用SQL編寫。例如,下面的 SQL 查詢對每個用戶未交付的所有訂單進(jìn)行計數(shù)。insert into non_delivered_count select user, count(status) as total_non_delivered from orders where status != "DELIVERED" group by user
Copy
上面的查詢還會自動配置一個輸出更改流non_delivered_count ,其中包含供接收器連接器使用的 CDC 記錄。由于管道是連續(xù)處理的,因此結(jié)果也會隨著每個輸入記錄而更新。輸出流中的記錄如下所示:{"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":0} {"op":"d","before":{"user":"alice","total_non_delivered":1},"after":null,"ts_ms":1} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":2},"ts_ms":1} {"op":"c","before":null,"after":{"user":"bob","total_non_delivered":1},"ts_ms":2} {"op":"d","before":{"user":"alice","total_non_delivered":2},"after":null,"ts_ms":4} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":4}
Copy
3. 配置一個接收器連接器,它可以消耗更改以實時查看結(jié)果。例如,如果配置了 Postgres 接收器,則連接會針對流中的每條記錄向 Postgres 數(shù)據(jù)庫發(fā)出一個操作。它們看起來像:INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 2) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("bob", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered;
Copy
請注意,SQL 語法INSERT…ON CONFLICT…DO UPDATE SET…用于確保以原子方式應(yīng)用撤消(刪除 + 創(chuàng)建)。
現(xiàn)在我們知道 Decodable 可以實時處理 CDC 記錄,并支持目標(biāo)系統(tǒng)始終擁有最新視圖。但沒有限制一次只能處理一個變更流。借助 Flink 強(qiáng)大的引擎和易于訪問的 SQL 接口,流連接變得更加容易——您可以實時連接來自不同數(shù)據(jù)庫的表!
文章來源:https://www.decodable.co/blog/what-is-change-data-capture