Milvus CDC 支持sink 到kafka
本文最后更新于 2024-06-03,文章内容可能已经过时。
本文章为参加2024 ospp Milvus社区的赛题:Milvus CDC 支持sink 到kafka,做的相关工作后,根据导师的要求撰写的文档。
1.当前milvus-cdc的实现架构:
Milvus cdc由两部分组成:server与core
server负责提供api接口给用户,并调用core中的方法。
core由两部分组成:reader和writer。
reader与milvus组件交互,读取相关信息。
writer被server端调用,获取reader读取的信息,转换格式,然后调用api发送给另一个milvus.
2.项目需求分析:
Milvus CDC(Change Data Capture)是一种数据增量同步工具,当前支持读取Milvus系统中的流式日志数据,并将这些数据同步到其他Milvus集群。为了扩展其应用场景,需要将数据同步至其他流式系统或存储系统,例如Kafka。具体需求分析如下:
2.1. 需求背景:
现有功能:Milvus CDC 已经实现了从Milvus系统读取流式日志数据,并将这些数据同步到其他Milvus集群。
扩展需求:希望实现将增量数据同步到其他流式系统(如Kafka)或存储系统,以满足增量备份、防止数据丢失和增量数据分析等需求。
2.2. 功能需求:
高级别Writer API
需要抽象出一个更高级别的Writer API,以便开发者可以自定义数据输出格式。
数据格式灵活性:允许用户定义不同的数据输出格式,以适应不同的目标系统(如Kafka、其他数据库等)的需求。
数据同步至Kafka
数据格式转换:实现将读取到的Milvus日志数据转换为适合Kafka的消息格式。
Kafka Producer集成:将转换后的数据通过Kafka Producer发送到指定的Kafka主题。
2.3. 技术需求:
API设计
抽象接口:设计一个抽象的Writer接口,定义基本的数据写入方法。
具体实现:提供默认的Kafka Writer实现,以及其他可能的Writer实现(如文件存储、数据库存储等)。
数据转换
日志数据解析:解析Milvus日志数据,提取增量数据。
数据格式化:根据目标系统的需求,将增量数据格式化为适合的消息格式。
Kafka集成
Kafka配置:配置Kafka Producer,包括Kafka集群地址、主题、消息格式等。
错误处理:处理在数据同步过程中可能出现的错误,如连接失败、消息发送失败等。
2.4. 非功能需求:
性能:确保数据同步过程的性能,保证在高并发和大数据量情况下的稳定性。
可靠性:保证数据同步的可靠性,防止数据丢失,提供重试机制。
可扩展性:API设计应具备良好的可扩展性,便于未来支持更多类型的目标系统。
可维护性:代码应具有良好的可读性和可维护性,便于后期维护和功能扩展。
3.项目实现方案设计:
3.1 当前代码结构:
当前milvus-cdc writer提供三个接口:
type Writer interface {
HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
}
这三个接口实现了两种同步的方式:
1.通过指定方法:server端调用这些接口,传入指定的值,维护值与map的方法,通过值调用指定的方法。
现在共有如下的方法, 在ChannelWriter文件下:
func (c *ChannelWriter) initAPIEventFuncs() {
c.apiEventFuncs = map[api.ReplicateAPIEventType]apiEventFunc{
api.ReplicateCreateCollection: c.createCollection,
api.ReplicateDropCollection: c.dropCollection,
api.ReplicateCreatePartition: c.createPartition,
api.ReplicateDropPartition: c.dropPartition,
}
}
func (c *ChannelWriter) initOPMessageFuncs() {
c.opMessageFuncs = map[commonpb.MsgType]opMessageFunc{
commonpb.MsgType_CreateDatabase: c.createDatabase,
commonpb.MsgType_DropDatabase: c.dropDatabase,
commonpb.MsgType_Flush: c.flush,
commonpb.MsgType_CreateIndex: c.createIndex,
commonpb.MsgType_DropIndex: c.dropIndex,
commonpb.MsgType_LoadCollection: c.loadCollection,
commonpb.MsgType_ReleaseCollection: c.releaseCollection,
commonpb.MsgType_LoadPartitions: c.loadPartitions,
commonpb.MsgType_ReleasePartitions: c.releasePartitions,
}
}
这些方法主要实现了对集合的创建/删除,分区/索引操作。用户配置之后,cdc监听这个milvus中的kafka获取对应配置的日志:
然后转换格式发送给另一个milvus
比如用户创建集合a,那么cdc就监听kafka中关于这个集合a的内容 然后去做同步。
2.通过数据同步:通过HandleReplicateMessage接口实现,原理大致相同。
3.2 实现方案:
由于需要同步数据到kafka,所以其他部分不需要改变,只需要修改writer层及其以下的方法。
遵循开闭原则,在新增一套方法的同时,不影响原来的方法与调用链路。同时为了兼顾后续的可拓展性,使其能支持后续更多的数据源,采用装饰者模式配合go接口实现。
具体来说,维持现有的interface不变。
新建一个wrtier:
type TotalWriter struct {
MilvusWriter Writer
kafkaWriter Writer
}
func (t *TotalWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error {
//
}
这个新的writer持有两个writer作为对象,同时实现原接口的所有方法。
在实现的时候,可以通过ctx中的参数判断是持久化到milvus还是持久化到kafka,如果是milvus就调用milvus中原来的逻辑,并把相同的参数传过去,原来的部分则不需要做任何的修改。
再编写一个kafkaWriter,仿照当前的ChannelWriter,实现所有的逻辑,具体命令同样是通过map持久化进行映射。多余的参数放到ctx中。
这样server调用的时候,只需要修改注入的实例,从原来的ChannelWriter变为现在的TotalWriter即可。这样原先的调用代码无需做修改。
而server新增kafka的调用的额外参数通过ctx进行(制定规范,应该不会太多),如果后续新增新的数据源,只需要修改TotalWriter并新增一个新的writer注入进TotalWriter即可。
注入进入kafka的topic和其他信息不做映射,由用户自己制定。
设计可以简单用示意图表示如下:
4.项目时间规划:
首先说明一下情况,现在大三,暑期事情不多,大四学校并未安排课程,每天可以拿出三小时以上时间开发此项目。
按照ospp官网的设计,6.26号进行中选公示,如果能提交之后就确定中选的话,进行如下的时间安排:
6.4-6.26 根据设计完善细节,包括ctx传参的格式,接口具体实现的逻辑等,并整理成完整的文档。
6.26-8.10 进行项目代码的开发,并继续交流可能遇到的问题,目前看来该题目的代码工程量应该不会特别大,估计上有较大可能提前完成开发。开发期间协商周期进行测试并提交pr。并在开发过程中编写单元测试方法。(该设计天然适配go-mock测试框架,有利于单元测试)
8.10-9.10 进行整体全流程的测试,修改可能存在的问题,优化测试用例,尝试进行性能测试优化性能,并撰写完整的文档,
9.10-9.31 继续修复bug与完善文档,如果有可能的话尝试参与社区的其他工作
如果不能提交之后确定中选,那么所有工作整体向后推一个阶段,即:
6.26-7.20根据设计完善细节,包括ctx传参的格式,接口具体实现的逻辑等,并整理成完整的文档。
7.20-8.31 进行项目代码的开发,并继续交流可能遇到的问题,目前看来该题目的代码工程量应该不会特别大,估计上有较大可能提前完成开发。开发期间协商周期进行测试并提交pr。并在开发过程中编写单元测试方法。(该设计天然适配go-mock测试框架,有利于单元测试)
9.1-9.31 进行整体全流程的测试,修改可能存在的问题,优化测试用例,尝试进行性能测试优化性能,并撰写完整的文档,如果有可能的话尝试参与社区的其他工作。