{"content":{"title":"云原生 Kafka：提高链上数据同步的可靠和一致性","body":"作者：[ddl](https://chainbase.com/blog/article/the-cloud-native-kafka-boosting-data-synchronization-reliability-and-consistency)\r\n\r\n\r\n# 背景\r\n\r\n---\r\n\r\n在现有区块链世界，有很多业务场景需要实时获取链上和链下的数据，进一步加工数据价值。譬如基于数据筛选的 SmartMoney 交易监控，基于数据统计的实时TVL交易决策等等。这些场景都需要丰富的流式数据集的支持，Chainbase 开放性地提供基于「数据集」的实时 Sync 模块解决此类问题\r\n\r\n# 挑战\r\n\r\n---\r\n\r\n当前市面上大多数实时消息流推送，基本上都是基于 WebSocket 和 Webhook 模式，两者都无法保证 exact once 语意，即保证消息刚好被消费一次。\r\n\r\nWebSocket 可以实现实时的通信，但由于网络的不稳定性和消息传输的复杂性，可能会导致消息的乱序、丢失或重复。\r\n\r\n并且 HTTP 请求是无状态的，无法保证请求的可靠性和顺序性。如果网络传输中发生错误，或者服务端没有正确处理重试和幂等性，就有可能导致重复的请求或丢失的请求，从而无法保证「exact once」的语义。\r\n\r\n而 Kafka 的优势在于它支持消息的可重放功能。Kafka 作为消息队列，它可以持久化消息至磁盘。所以在保证「at least once」语意的同时，它也支持消息的重播功能，具有「效率一次」和「待机重放」的特性。\r\n\r\n这对于需要建立在数据不丢失基础上的流式应用尤为重要。只有具备消息重放能力的系统，才能确保数据处理的可靠性和一致性，满足「exact once」语意。比如在实时计算、日志收集和分析等应用场景,如果遇到错误,可以从头重新计算。这让Kafka可以更好地支持复杂流式数据处理需求。\r\n\r\n# 思路\r\n\r\n### 基本介绍\r\n\r\n---\r\n\r\n[Kafka](https://chainbase.com/sync/kafka) 是一种分布式流处理平台，用于高容量、低延迟的数据传输和持久化存储。它使用发布订阅的消息队列模型，基于分布式日志存储的方式来处理消息。主题是消息发布的目的地，可以被分为多个分区，每个分区在不同的存储节点上分布着。生产者负责将消息发布到指定主题中，消费者根据订阅的主题和分区从 Kafka 获取消息。Kafka 使用位移来跟踪消费状态和保证消息的顺序，同时使用分布式日志存储来持久化消息。\r\n\r\n![1-Introducing Chainbase Kafka.png](https://static.chainbase.online/1_Introducing_Chainbase_Kafka_cc2bd8f35e.png)\r\n\r\n在我们的[托管 Kafka 服务](https://chainbase.com/sync/kafka)中，为了满足区块链场景中的数据消费端到端有序性，我们限制了 topic 为单分区。\r\n\r\n### 区块链数据 ETL 到 Kafka\r\n\r\n---\r\n\r\n\r\n![2-Introducing Chainbase Kafka.png](https://static.chainbase.online/2_Introducing_Chainbase_Kafka_3be66846b9.png)\r\n\r\n[caption](https://excalidraw.com/#json=K24K9NN13ZNPkH9_r8hGh,_NKm2j1AnsGkYIVZoIfptA)\r\n\r\n当前 Chainbase 支持多链 (ethereum, bsc, polygon..) 以及多种类型数据 (log, transactions, transfer…)  Topic 进行消费；\r\n\r\n- **RawData**\r\n    \r\n    我们自建了多链的RPC节点，提供稳定可靠的RawData数据，字段格式和主流ETL工具保持一致，具体schema可以参考[2]。区块链数据公开透明，具有很好的可组合性。我们提供blocks, logs , transactions, traces, contracts等原始数据类型，用户可以结合自己业务场景自由组合这些RawData。下面给出了一条transaction message示例。\r\n    \r\n    ```jsx\r\n    {\r\n    \"type\": \"transaction\",\r\n    \"hash\": \"0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4\",\r\n    \"nonce\": 24719,\r\n    \"transaction_index\": 1,\r\n    \"from_address\": \"0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092\",\r\n    \"to_address\": \"0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092\",\r\n    \"value\": 100000,\r\n    \"gas\": 31000,\r\n    \"gas_price\": 449542706643,\r\n    \"input\": \"0x\",\r\n    \"block_timestamp\": 1689253991,\r\n    \"block_number\": 17684799,\r\n    \"block_hash\": \"0x2b2e1e5cfce445a1ff5227eaff0ec8d6c335cf6b7e00e0bc05abc468bcdc9b89\",\r\n    \"max_fee_per_gas\": 526493200000,\r\n    \"max_priority_fee_per_gas\": 426493200000,\r\n    \"transaction_type\": 2,\r\n    \"receipt_cumulative_gas_used\": 141613,\r\n    \"receipt_gas_used\": 21000,\r\n    \"receipt_contract_address\": null,\r\n    \"receipt_root\": null,\r\n    \"receipt_status\": 1,\r\n    \"receipt_effective_gas_price\": 449542706643,\r\n    \"item_id\": \"transaction_0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4\",\r\n    \"item_timestamp\": \"2023-07-13T13:13:11Z\"\r\n    }\r\n    ```\r\n    \r\n\r\n- **加工数据**\r\n    \r\n    同时为了满足更多链上交易监控的需求，我们对 Transfer 等数据进行了富化（Enrich）聚合，更加方便直观的洞察链上交易行为。后续如果有反馈，我们还将提供更丰富的聚合数据类型，帮助客户更快、更准确地获取实时链上数据。\r\n    \r\n    ```json\r\n    {\r\n    \t\"value\": \"41063984478246667122325862\",\r\n    \t\"block_number\": 17862713,\r\n    \t\"block_timestamp\": 1691407127,\r\n    \t\"block_hash\": \"0x5a134e7a3da9c51581f15f962f2beded4f87449dc708bd79df2ef148a4a219fc\",\r\n    \t\"transaction_index\": 59,\r\n    \t\"transaction_hash\": \"0x4ff499e45269101ef2444975013c78abafd612dfef7f603e62dca884d4f79158\",\r\n    \t\"contract_address\": \"0x6d23e40e776c5d3505f0a8e2b434425121d9818e\",\r\n    \t\"log_index\": 159,\r\n    \t\"from_address\": \"0xb6b6768a21cc3354dbc413739432c8d8213c1628\",\r\n    \t\"to_address\": \"0xe38b9f6d7a58aca5531e383d4357825a075c65ec\",\r\n    \t\"token_metas\": {\r\n    \t\t\"symbol\": \"DCI\",\r\n    \t\t\"name\": \"DeltaCore Integrations\",\r\n    \t\t\"decimals\": 18\r\n    \t}\r\n    }\r\n    \r\n    ```\r\n    \r\n\r\n# kafka 运维\r\n\r\n---\r\n\r\nChainbase 作为开放的 Web3 数据基础设施，具有多年海量数据运维经验，使用我们的[托管 kafka 服务](https://chainbase.com/sync/kafka)，用户可以做到真正的开箱即用，完善的ACL权限管理，丰富的指标监控，节省大量时间！具体来说有下面的优势：\r\n\r\n- **简化管理**：托管 Kafka 解放了客户的管理负担。托管提供了自动化的集群配置、部署、监控和维护，减轻了运维团队的工作量。\r\n- **高可靠性**：托管 Kafka 提供高「可用性和冗余」机制，确保消息的持久性和可靠性。具备备份和故障恢复策略，可以有效地应对硬件故障或其他意外情况。\r\n- **弹性伸缩**：托管 Kafka 会根据需求动态调整「容量和吞吐量」，从而提供弹性和可伸缩性。通过简单的控制界面或API，可以根据负载的变化来增加或减少Kafka集群的规模。\r\n- **安全加固**：托管 Kafka 提供数据加密、身份验证和访问控制等安全特性。这确保了数据的保密性和完整性，帮助组织符合合规性要求。\r\n- \r\n\r\n# 收益\r\n\r\n---\r\n\r\n1. 数据端到端一致\r\n    \r\n    基于 WebSocket 或 Webhook 等网络协议请求，无法保证「端到端」的数据一致性。比如，客户端发送数据包到服务器端，数据包在网络中传输，经过多个路由器和网络设备。在某个路由器或网络设备上，由于网络拥塞或故障，数据包丢失。丢失的数据包无法到达服务器端，导致数据丢失。服务器端可能会在特定时间间隔内等待数据包的到达，如果超过等待时间仍未接收到数据包，则数据丢失。而基于 Kafka 则为我们服务端提供了「端到端」数据一致性的可能性\r\n    \r\n\r\n1. 生态丰富\r\n    \r\n    在开源和商业的ETL（Extract, Transform, Load）工具中集成 [Kafka](https://chainbase.com/sync/kafka) 是非常常见的做法。Kafka作为一种高性能、可扩展的分布式消息队列系统，提供了可靠的数据传输和持久化存储。将Kafka与ETL集成可以带来多个好处。\r\n    \r\n    首先，[Kafka](https://chainbase.com/sync/kafka) 作为一个可靠的数据管道，可以作为 ETL 过程中数据的源头或目的地，通过Kafka的消息队列模型可以确保数据的顺序性和可靠性。其次，Kafka的高吞吐量和低延迟特性使得ETL能够更快速地处理和传输大量数据。\r\n    \r\n    此外，[Kafka](https://chainbase.com/sync/kafka) 可以与其他组件和工具集成，例如流处理引擎（如 Apache Flink、Apache Spark Streaming），开源ETL工具 airbyte、流数据库 RisingWave 等都支持 source or sink 到 kafka。通过集成Kafka，ETL工具可以实现实时数据处理、数据流转和数据交换等功能，帮助组织更好地管理和分析海量数据，并支持实时决策和洞察力。\r\n    \r\n2. 开放的数据集\r\n    \r\n    开放数据集与 [Kafka](https://chainbase.com/sync/kafka) 的实时订阅和同步结合使用，为区块链数据实时分析系统提供了多样性的数据源、丰富的分析模型和场景，以及精确的数据验证，同时促进数据共享和开放。这种结合为我们带来更全面、准确的区块链数据视角，推动技术进步并加速问题解决。\r\n    \r\n\r\n# 参考\r\n\r\n---\r\n\r\n[1] [A Detailed Introduction to Kafka Components – Bhanu's Blog](https://blog.bhanunadar.com/a-detailed-introduction-to-kafka-components/)\r\n\r\n[2] https://ethereum-etl.readthedocs.io/en/latest/schema/\r\n\r\n原文链接：[The Cloud-Native Kafka: Boosting Data Synchronization Reliability and Consistency](https://chainbase.com/blog/article/the-cloud-native-kafka-boosting-data-synchronization-reliability-and-consistency)"},"author":{"user":"https://learnblockchain.cn/people/16124","address":null},"history":"QmPhv3G4793fk5Yr6LtXyAKb1MckX55ZWQ5ssZn6cSzTFJ","timestamp":1694425122,"version":1}