当前位置: 代码迷 >> 综合 >> Flink CDC 核心组件 Debezium 1.8.0.Beta1 发布!多个重磅特性解读!
  详细解决方案

Flink CDC 核心组件 Debezium 1.8.0.Beta1 发布!多个重磅特性解读!

热度:61   发布时间:2023-12-13 14:51:42.0

我很高兴地宣布 Debezium 1.8.0.Beta1的发布!

此版本包含令人兴奋的新功能,例如对 MongoDB 5.0 的支持、MongoDB 连接器的发件箱事件路由器和对 Postgres 逻辑解码消息的支持,以及大量错误修复和其他改进。总的来说,此版本已修复不少于63 个问题。

让我们仔细看看其中的一些。

5c704c46460514b2acaef58a96faeca0.png

MongoDB 发件箱事件路由器

发件箱模式越来越流行,用于以可靠的方式在微服务之间交换数据,而不使用对服务数据库和 Apache Kafka 的不安全双重写入

使用发件箱模式,您不是从实际业务表中捕获更改,而是将要发送给外部消费者的消息写入专用发件箱表。这很好地将您的内部数据模型与用于与外部服务通信的消息契约分离,允许您独立开发和发展这些模型。对您的业务表的更新和对发件箱表的插入是在一个数据库事务中完成的,因此要么完成这两件事,要么什么都不做。一旦消息被保存在发件箱表中,Debezium 可以从那里捕获它并使用通常的至少一次语义将其传播给任何消费者。

Debezium 通过特殊的单消息转换 (SMT),即发件箱事件路由器,为实现发件箱模式提供支持。这负责将事件从单个发件箱表路由到不同的主题,基于表示事件所针对的聚合类型(用域驱动设计的说法)的可配置列。此外,还有一个用于从使用Quarkus构建的服务发出发件箱事件的扩展,这是一个用于构建云原生微服务的堆栈。

这些东西现在由一个新的事件路由 SMT补充,它与 MongoDB 的 Debezium 连接器一起工作。由于 MongoDB 连接器的事件格式与关系数据库的 Debezium 连接器的格式不同,因此有必要创建此单独的 SMT。以下是配置 SMT 的示例:

{"name": "outbox-connector","config": {"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector","tasks.max" : "1","mongodb.hosts" : "rs0/mongodb:27017","mongodb.name" : "dbserver1","mongodb.user" : "debezium","mongodb.password" : "dbz","collection.include.list": "inventory.outboxevent","database.history.kafka.bootstrap.servers" : "kafka:9092","transforms" : "outbox","transforms.outbox.type" :"io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter","transforms.outbox.route.topic.replacement" : "${routedByValue}.events","transforms.outbox.collection.expand.json.payload" : "true","transforms.outbox.collection.field.event.timestamp" : "timestamp","transforms.outbox.collection.fields.additional.placement" : "type:header:eventType","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}

在这里,我们使用MongoEventRouterSMT 来捕获inventory.outboxevent集合中的更改。事件可以这样编写,以 MongoDB CLI 为例:

new_order = { "_id" : ObjectId("000000000000000000000002"), "order_date" : ISODate("2021-11-22T00:00:00Z"), "purchaser_id" : NumberLong(1004), "quantity" : 1, "product_id" : NumberLong(107) }s = db.getMongo().startSession()
s.startTransaction()s.getDatabase("inventory").orders.insert(new_order)
s.getDatabase("inventory").outboxevent.insert({ _id : ObjectId("000000000000000000000001"), aggregateid : new_order._id, aggregatetype : "Order", type : "OrderCreated", timestamp: NumberLong(1556890294484), payload : new_order })s.commitTransaction()

请注意我们如何在事务中插入业务集合(“订单”)和发件箱集合(“发件箱事件”),这是自 MongoDB 4.0 版以来所支持的。在这种特殊情况下,虽然我们在发件箱消息本身中使用实际订单对象,但我们也可以将这些内容分开,并在发件箱事件中选择采购订单的另一种表示形式。

订单聚合的 id用作 Kafka 中的消息键,确保与给定采购订单相关的所有发件箱事件的顺序一致。的聚合类型被用于确定所述主题名称路由事件,Order.events在这个例子中。消息本身的唯一ID作为 Kafka 消息中的标头传播,例如允许消费者识别重复的消息。

您可以在我们的演示存储库中找到使用这个新的 MongoDB 发件箱事件路由 SMT的完整示例。非常感谢Sungho Hwang,他不仅提供了实际的功能实现本身,还创建了这个示例。

围绕 Debezium MongoDB 连接器发件箱支持的潜在下一步可能是将 MongoDB 支持添加到 Quarkus 发件箱扩展中,并可以选择从附加到实体的子文档中捕获发件箱事件,例如Order. 这样,您的应用程序的数据和发件箱消息可以作为单个文档编写(否则应用程序将忽略发件箱子文档本身)并且不需要跨文档事务。这个想法是通过DBZ-4319跟踪的;如果您认为这是一个有用的补充,或者您是否有兴趣实施它,请告诉我们。

对 Postgres 的支持 pg_logical_emit_message()

Postgres 的多功能性和灵活性堪称传奇;有趣但鲜为人知的功能之一是能够将消息写入数据库的事务日志 (WAL),而实际上无需写入表。这是通过pg_logical_emit_message()函数完成的。从 Postgres 14 开始,可以使用pgoutput插件捕获这些逻辑解码消息,并且从该版本开始,Debezium 也支持这种事件类型。

逻辑解码消息非常适合传播与您的交易相关的上下文信息,而无需将此数据存储在表中。例如,这可能是审核元数据,例如触发某些数据更改的业务用户。另一个潜在的用例是上面提到的发件箱模式,它可以在没有专用发件箱表的情况下实现,只需将发件箱事件写入 WAL。例如,在考虑内务管理时,这是有利的:在将消息传播到 Kafka 后,无需从发件箱表中删除消息。

“发送”一个逻辑解码消息就这么简单:

SELECT pg_logical_emit_message(true, 'some-prefix', 'some text');

这会发出一个事务性的 ( true)消息,带有“some-prefix”前缀和“some text”作为消息内容。前缀可用于将消息分组到逻辑上下文中。Debezium 使用前缀作为 Kafka 消息键,即所有具有相同前缀的消息将进入相应 Kafka 主题的相同分区,因此将按照创建时相同的顺序传播给下游消费者。

逻辑解码消息由 Debezium Postgres 连接器使用新的事件类型(“m”)发出,看起来像这样(消息内容是二进制编码的,在本例中使用 Base64):

{"source": {"version": "1.8.0.Beta1","connector": "postgresql","name": "PostgreSQL_server","ts_ms": 1559033904863,"snapshot": false,"db": "postgres","schema": "","table": "","txId": 556,"lsn": 46523128,"xmin": null},"op": "m","ts_ms": 1559033904961,"message": {"prefix": "some-prefix","content": "c29tZSB0ZXh0"}
}

消息内容是任意有效载荷,除了文本表示之外,您还可以在此处插入二进制数据。事件生产者有责任记录格式,在考虑向后兼容性的情况下发展它,并与任何客户端交换模式信息。这样做的一种好方法是利用模式注册表,例如Apicurio。您还可以考虑使用诸如CloudEvents 之类的标准来进行逻辑解码消息,例如,这将允许 SMT(如上述发件箱事件路由器)根据事件结构中定义的属性采取行动。

要了解有关 Debezium 中逻辑解码消息支持的更多信息,请参阅连接器文档。非常感谢 Lairen Hightower 实现此功能!

其他修复和更改

1.8.0.Beta1 版本中的进一步修复和改进包括:

  • 支持在 Debezium UI 中配置 SMT 和主题创建设置;你可以在这篇文章的快速视频中看到前者,我们将在本周晚些时候分享主题创建 UI 的另一个演示

  • Vitess 连接器中的交易元数据事件 ( DBZ-4355 );我们还通过删除对 vtctld ( DBZ-4324 )的依赖来简化其配置,添加了对stop_on_reshard标志 ( DBZ-4295 ) 的支持,并提供了将 VGTID 指定为流式传输起点的功能 ( DBZ-4297 )。所有这些变化都是由 Stripe 工程团队的 Yang Wu 和 Shichao 贡献的,他们同意加强作为这个连接器的维护者。非常感谢,欢迎!

  • 针对 Oracle 的 Debezium 连接器的基于 Infinispan 的事务缓冲区的更灵活配置 ( DBZ-4169 )

  • 改进MONEY了 Postgres中的列 ( DBZ-1931 ) 和INTERVALOracle 中的列 ( DBZ-1539 ) 的类型映射

  • 在使用 Debezium 连接器为 MySQL ( DBZ-4196 )执行增量快照时支持架构更改;感谢 Kate Galieva 的这一非常有用的改进!

请参阅发行说明以了解有关此版本中这些和进一步修复的更多信息。

总结

随着 Beta1 的发布,我们正在接近 1.8 发布周期的最后阶段。您可以期待下周某个时候的 CR1,并且根据收到的问题报告,我们可能会决定在圣诞节前一周或 2022 年的第一周削减最终版本。就要添加的功能而言,我们有一件事想要了解的是对 MongoDB 连接器的增量快照支持。我们将不得不看看这是否会在剩余的时间内完成,或者是否必须等待 Debezium 1.9 发布。在 1.8 发布线日趋成熟的同时,您也可以期待 Debezium 1.7.2 的发布。

展望未来,我们还将继续围绕 Debezium 2.0 进行规划,该版本将于明年某个时候发布。请在邮件列表中加入有关此主题的讨论。

本文转自网络,因为找到来源,所以如有侵权,随时联系揽件删除,微信:

langjianliaodashuju 。

  相关解决方案