本文章为参加2024 ospp Milvus社区的赛题:Milvus CDC 支持sink 到kafka,做的相关工作后,根据导师的要求撰写的文档。

1.当前milvus-cdc的实现架构:

Milvus cdc由两部分组成:server与core

server负责提供api接口给用户,并调用core中的方法。

core由两部分组成:reader和writer。

reader与milvus组件交互,读取相关信息。

writer被server端调用,获取reader读取的信息,转换格式,然后调用api发送给另一个milvus.

2.项目需求分析:

Milvus CDC(Change Data Capture)是一种数据增量同步工具,当前支持读取Milvus系统中的流式日志数据,并将这些数据同步到其他Milvus集群。为了扩展其应用场景,需要将数据同步至其他流式系统或存储系统,例如Kafka。具体需求分析如下:

2.1. 需求背景:

  • 现有功能:Milvus CDC 已经实现了从Milvus系统读取流式日志数据,并将这些数据同步到其他Milvus集群。

  • 扩展需求:希望实现将增量数据同步到其他流式系统(如Kafka)或存储系统,以满足增量备份、防止数据丢失和增量数据分析等需求。

2.2. 功能需求:

  • 高级别Writer API

    需要抽象出一个更高级别的Writer API,以便开发者可以自定义数据输出格式。

    • 数据格式灵活性:允许用户定义不同的数据输出格式,以适应不同的目标系统(如Kafka、其他数据库等)的需求。

  • 数据同步至Kafka

    • 数据格式转换:实现将读取到的Milvus日志数据转换为适合Kafka的消息格式。

    • Kafka Producer集成:将转换后的数据通过Kafka Producer发送到指定的Kafka主题。

2.3. 技术需求:

  • API设计

    • 抽象接口:设计一个抽象的Writer接口,定义基本的数据写入方法。

    • 具体实现:提供默认的Kafka Writer实现,以及其他可能的Writer实现(如文件存储、数据库存储等)。

  • 数据转换

    • 日志数据解析:解析Milvus日志数据,提取增量数据。

    • 数据格式化:根据目标系统的需求,将增量数据格式化为适合的消息格式。

  • Kafka集成

    • Kafka配置:配置Kafka Producer,包括Kafka集群地址、主题、消息格式等。

    • 错误处理:处理在数据同步过程中可能出现的错误,如连接失败、消息发送失败等。

2.4. 非功能需求:

  • 性能:确保数据同步过程的性能,保证在高并发和大数据量情况下的稳定性。

  • 可靠性:保证数据同步的可靠性,防止数据丢失,提供重试机制。

  • 可扩展性:API设计应具备良好的可扩展性,便于未来支持更多类型的目标系统。

  • 可维护性:代码应具有良好的可读性和可维护性,便于后期维护和功能扩展。

3.项目实现方案设计:

3.1 当前代码结构:

当前milvus-cdc writer提供三个接口:

type Writer interface {
  HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
  HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
  HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
}

这三个接口实现了两种同步的方式:

1.通过指定方法:server端调用这些接口,传入指定的值,维护值与map的方法,通过值调用指定的方法。

现在共有如下的方法, 在ChannelWriter文件下:

func (c *ChannelWriter) initAPIEventFuncs() {
  c.apiEventFuncs = map[api.ReplicateAPIEventType]apiEventFunc{
    api.ReplicateCreateCollection: c.createCollection,
    api.ReplicateDropCollection:   c.dropCollection,
    api.ReplicateCreatePartition:  c.createPartition,
    api.ReplicateDropPartition:    c.dropPartition,
  }
}
​
func (c *ChannelWriter) initOPMessageFuncs() {
  c.opMessageFuncs = map[commonpb.MsgType]opMessageFunc{
    commonpb.MsgType_CreateDatabase:    c.createDatabase,
    commonpb.MsgType_DropDatabase:      c.dropDatabase,
    commonpb.MsgType_Flush:             c.flush,
    commonpb.MsgType_CreateIndex:       c.createIndex,
    commonpb.MsgType_DropIndex:         c.dropIndex,
    commonpb.MsgType_LoadCollection:    c.loadCollection,
    commonpb.MsgType_ReleaseCollection: c.releaseCollection,
    commonpb.MsgType_LoadPartitions:    c.loadPartitions,
    commonpb.MsgType_ReleasePartitions: c.releasePartitions,
  }
}

这些方法主要实现了对集合的创建/删除,分区/索引操作。用户配置之后,cdc监听这个milvus中的kafka获取对应配置的日志:

image-20240528180001023

然后转换格式发送给另一个milvus

比如用户创建集合a,那么cdc就监听kafka中关于这个集合a的内容 然后去做同步。

2.通过数据同步:通过HandleReplicateMessage接口实现,原理大致相同。

3.2 实现方案:

由于需要同步数据到kafka,所以其他部分不需要改变,只需要修改writer层及其以下的方法。

遵循开闭原则,在新增一套方法的同时,不影响原来的方法与调用链路。同时为了兼顾后续的可拓展性,使其能支持后续更多的数据源,采用装饰者模式配合go接口实现。

具体来说,维持现有的interface不变。

新建一个wrtier:

type TotalWriter struct {
  MilvusWriter Writer
  kafkaWriter Writer
}
​
func (t *TotalWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error {
  //
}

这个新的writer持有两个writer作为对象,同时实现原接口的所有方法。

在实现的时候,可以通过ctx中的参数判断是持久化到milvus还是持久化到kafka,如果是milvus就调用milvus中原来的逻辑,并把相同的参数传过去,原来的部分则不需要做任何的修改。

再编写一个kafkaWriter,仿照当前的ChannelWriter,实现所有的逻辑,具体命令同样是通过map持久化进行映射。多余的参数放到ctx中。

这样server调用的时候,只需要修改注入的实例,从原来的ChannelWriter变为现在的TotalWriter即可。这样原先的调用代码无需做修改。

而server新增kafka的调用的额外参数通过ctx进行(制定规范,应该不会太多),如果后续新增新的数据源,只需要修改TotalWriter并新增一个新的writer注入进TotalWriter即可。

注入进入kafka的topic和其他信息不做映射,由用户自己制定。

设计可以简单用示意图表示如下:

image-20240529024058967

4.项目时间规划:

首先说明一下情况,现在大三,暑期事情不多,大四学校并未安排课程,每天可以拿出三小时以上时间开发此项目。

按照ospp官网的设计,6.26号进行中选公示,如果能提交之后就确定中选的话,进行如下的时间安排:

  1. 6.4-6.26 根据设计完善细节,包括ctx传参的格式,接口具体实现的逻辑等,并整理成完整的文档。

  2. 6.26-8.10 进行项目代码的开发,并继续交流可能遇到的问题,目前看来该题目的代码工程量应该不会特别大,估计上有较大可能提前完成开发。开发期间协商周期进行测试并提交pr。并在开发过程中编写单元测试方法。(该设计天然适配go-mock测试框架,有利于单元测试)

  3. 8.10-9.10 进行整体全流程的测试,修改可能存在的问题,优化测试用例,尝试进行性能测试优化性能,并撰写完整的文档,

  4. 9.10-9.31 继续修复bug与完善文档,如果有可能的话尝试参与社区的其他工作

如果不能提交之后确定中选,那么所有工作整体向后推一个阶段,即:

  1. 6.26-7.20根据设计完善细节,包括ctx传参的格式,接口具体实现的逻辑等,并整理成完整的文档。

  2. 7.20-8.31 进行项目代码的开发,并继续交流可能遇到的问题,目前看来该题目的代码工程量应该不会特别大,估计上有较大可能提前完成开发。开发期间协商周期进行测试并提交pr。并在开发过程中编写单元测试方法。(该设计天然适配go-mock测试框架,有利于单元测试)

  3. 9.1-9.31 进行整体全流程的测试,修改可能存在的问题,优化测试用例,尝试进行性能测试优化性能,并撰写完整的文档,如果有可能的话尝试参与社区的其他工作。