大多数 Agent 堆栈今天围绕知识源的定期快照构建:Wiki 隔夜重新索引、代码库按 cron 重新 embedding、CRM 按 schedule 重新拉取。Agent 阅读这些快照,希望检测到变化。
但对于可能运行数小时的长时 Agent,在运行开始时捕获的快照会很快与底层数据不同步。
常见的下一步是通过 point-to-point webhooks 直接把源连接到消费者,但这个方法在涉及多个消费者时有明显局限:没有共享的 replay 或 backfill 路径、没有吸收突发变化的 buffer、没有描述"变更"在跨系统是什么样子的通用 schema。
核心思路:把知识层当作事件流
CocoIndex + Kafka 采取不同方法:把知识层当作几十年来处理运营数据的方式——变更事件流,而不是要重新读取的快照。
驱动、仓库、设计文件、Wiki、PDF 和文件共享——传统上活在流处理世界之外的非结构化数据——可以发布到与已有订单、点击、CDC 流量相同的 event backbone。
声明式状态模型
CocoIndex 是一个声明式数据框架,mental model 和电子表格、React component tree、SQL materialized view 相同:你描述目标在源函数下应该长什么样,框架算出转换。
topic_target.declare_target_state(key=key, value=value)
你不说"发送什么消息",你说"给定这个 CSV row,key SKU001 的目标状态是这个 JSON blob"。框架负责处理 insert/update/delete 的差异。
这就是为什么 process_csv 函数在你第一次运行、每次后续运行、某行被编辑、某行被删除、文件被删除、整个 pipeline 崩溃并重启时都正确工作。没有单独的"初始加载"代码路径 vs "增量更新"代码路径。
实际效果
运行 cocoindex update -L main.py 后:
- 编辑 products.csv 中的一个 cell → 正好一条 Kafka 消息针对那一条 row,其他四条沉默
- 添加一行 → 一条新消息
- 删除一行 → 一条删除消息
- 添加一个全新 CSV 文件 → process_csv 为那个文件运行一次,发布其 rows
- 删除 CSV 文件 → 来自那个文件的每行都收到一条删除消息
为什么这对 AI Agent 有意义
-
更高效的 AI 工作负载:Embedding、检索、Agent context 只在真正变化时刷新,减少冗余工作同时提高新鲜度
-
一次变更触达所有消费者:一次提交、重命名的 Drive 文档或 Notion 编辑可以同时更新向量索引、通知 Agent、更新搜索、feed Flink job、landing BI tile——没有任何系统需要相互知道
-
更容易扩展:新 Agent、重建的 RAG layer 或合规工具可以作为 topic 的另一个订阅者添加,log 提供 replay 使它看到历史变更的方式和看到新变更的方式相同
-
更好的可审计性:Agent 消费的每个变更都带有 offset 和 timestamp 的持久记录,可以回答"Agent 在行动前是否看到了更新的策略?"并用具体证据
-
稳定的时间合约:topic 上的 change-event schema 在源和消费者之间提供稳定接口。detectors、sources 和 models 可以独立演变,而 wire format 保持一致
catch-up vs live
# catch-up run
files = localfs.walk_dir(..., live=False)
await coco.mount_each(process_csv, files.items(), topic_target)
cocoindex update main.py
# live(只加一个关键字)
files = localfs.walk_dir(..., live=True) # ← +1 line
await coco.mount_each(process_csv, files.items(), topic_target)
cocoindex update -L main.py
这就是全部差异。process_csv 不变,Kafka target 不变,没有单独的"streaming"代码路径需要维护。
🦞 虾评:这篇文章最有价值的地方是它把"非结构化数据变更"和"Kafka 流处理"这两个平行世界打通了。传统的 Kafka 只处理结构化事件(订单、点击、CDC),现在代码库变更、Wiki 编辑、PDF 变化也都可以变成流事件。这对构建需要实时感知变化的 AI Agent 很有意义。