内容
活动
关注

CreateCollection API执行流程_milvus源码解析

简介: CreateCollection API执行流程_milvus源码解析

CreateCollection API执行流程源码解析

milvus版本:v2.3.2

CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里只介绍和元数据相关的流程。

整体架构:

architecture.png

CreateCollection 的数据流向:

create_collection数据流向.jpg

1.客户端sdk发出CreateCollection API请求。

from pymilvus import ( connections, FieldSchema, CollectionSchema, DataType, Collection, ) num_entities, dim = 3000, 1024 print("start connecting to Milvus") connections.connect("default", host="192.168.230.71", port="19530") fields = [ FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100), FieldSchema(name="random", dtype=DataType.DOUBLE), FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim) ] schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs") print("Create collection `hello_milvus`") hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2) 

客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。

hello_milvus.jpg

2.客户端接受API请求,将request封装为createCollectionTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {  ...... // request封装为task cct := &createCollectionTask{  ctx: ctx, Condition: NewTaskCondition(ctx), CreateCollectionRequest: request, rootCoord: node.rootCoord, } ...... // 将task压入ddQueue队列 if err := node.sched.ddQueue.Enqueue(cct); err != nil {  ...... } ...... // 等待cct执行完 if err := cct.WaitToFinish(); err != nil {  ...... } ...... } 

3.执行createCollectionTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {  var err error t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest) return err } 

从代码可以看出调用了rootCoord的CreateCollection接口。

4.进入rootCoord的CreateCollection接口。

代码路径:internal\rootcoord\root_coord.go

继续将请求封装为rootcoord里的createCollectionTask

func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {  ...... // 封装为createCollectionTask t := &createCollectionTask{  baseTask: newBaseTask(ctx, c), Req: in, } // 加入调度 if err := c.scheduler.AddTask(t); err != nil {  ...... } // 等待task完成 if err := t.WaitToFinish(); err != nil {  ...... } ...... } 

5.执行createCollectionTask的Prepare、Execute、NotifyDone方法。

Execute()为核心方法。

代码路径:internal\rootcoord\create_collection_task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {  // collID为collectionID,在Prepare()里分配 // partIDs为partitionID,在Prepare()里分配 collID := t.collID partIDs := t.partIDs // 产生时间戳 ts, err := t.getCreateTs() if err != nil {  return err } // vchanNames为虚拟channel,在Prepare()里分配 // chanNames为物理channel,在Prepare()里分配 vchanNames := t.channels.virtualChannels chanNames := t.channels.physicalChannels startPositions, err := t.addChannelsAndGetStartPositions(ctx, ts) if err != nil {  t.core.chanTimeTick.removeDmlChannels(t.channels.physicalChannels...) return err } // 填充partition,创建collection的时候,默认只有一个名为"Default partition"的partition。 partitions := make([]*model.Partition, len(partIDs)) for i, partID := range partIDs {  partitions[i] = &model.Partition{  PartitionID: partID, PartitionName: t.partitionNames[i], PartitionCreatedTimestamp: ts, CollectionID: collID, State: pb.PartitionState_PartitionCreated, } } // 填充collection // 可以看出collection由collID、dbid、schemaName、fields、vchanName、chanName、partition、shardNum等组成 collInfo := model.Collection{  CollectionID: collID, DBID: t.dbID, Name: t.schema.Name, Description: t.schema.Description, AutoID: t.schema.AutoID, Fields: model.UnmarshalFieldModels(t.schema.Fields), VirtualChannelNames: vchanNames, PhysicalChannelNames: chanNames, ShardsNum: t.Req.ShardsNum, ConsistencyLevel: t.Req.ConsistencyLevel, StartPositions: toKeyDataPairs(startPositions), CreateTime: ts, State: pb.CollectionState_CollectionCreating, Partitions: partitions, Properties: t.Req.Properties, EnableDynamicField: t.schema.EnableDynamicField, } clone := collInfo.Clone() existedCollInfo, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp) if err == nil {  equal := existedCollInfo.Equal(*clone) if !equal {  return fmt.Errorf("create duplicate collection with different parameters, collection: %s", t.Req.GetCollectionName()) } log.Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts)) return nil } // 分为多个step执行,每一个undoTask由todoStep和undoStep构成 // 执行todoStep,报错则执行undoStep undoTask := newBaseUndoTask(t.core.stepExecutor) undoTask.AddStep(&expireCacheStep{  baseStep: baseStep{ core: t.core}, dbName: t.Req.GetDbName(), collectionNames: []string{ t.Req.GetCollectionName()}, collectionID: InvalidCollectionID, ts: ts, }, &nullStep{ }) undoTask.AddStep(&nullStep{ }, &removeDmlChannelsStep{  baseStep: baseStep{ core: t.core}, pChannels: chanNames, }) undoTask.AddStep(&addCollectionMetaStep{  baseStep: baseStep{ core: t.core}, coll: &collInfo, }, &deleteCollectionMetaStep{  baseStep: baseStep{ core: t.core}, collectionID: collID, ts: ts, }) undoTask.AddStep(&nullStep{ }, &unwatchChannelsStep{  baseStep: baseStep{ core: t.core}, collectionID: collID, channels: t.channels, isSkip: !Params.CommonCfg.TTMsgEnabled.GetAsBool(), }) undoTask.AddStep(&watchChannelsStep{  baseStep: baseStep{ core: t.core}, info: &watchInfo{  ts: ts, collectionID: collID, vChannels: t.channels.virtualChannels, startPositions: toKeyDataPairs(startPositions), schema: &schemapb.CollectionSchema{  Name: collInfo.Name, Description: collInfo.Description, AutoID: collInfo.AutoID, Fields: model.MarshalFieldModels(collInfo.Fields), }, }, }, &nullStep{ }) undoTask.AddStep(&changeCollectionStateStep{  baseStep: baseStep{ core: t.core}, collectionID: collID, state: pb.CollectionState_CollectionCreated, ts: ts, }, &nullStep{ }) return undoTask.Execute(ctx) } 

创建collection涉及多个步骤,可以看出这里依次分为expireCacheStep、addCollectionMetaStep、watchChannelsStep、changeCollectionStateStep这几个步骤,关于etcd元数据的操作,这里重点关注addCollectionMetaStep。其余step另用篇幅进行讲解。

6.进入addCollectionMetaStep,执行其Execute()方法。

代码路径:internal\rootcoord\step.go

func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {  err := s.core.meta.AddCollection(ctx, s.coll) return nil, err } 

在这里重点研究s.core.meta.AddCollection()这个方法做了什么事情。

调用栈如下:

CreateCollection()(internal\proxy\impl.go) |--Execute()(internal\proxy\task.go) |--t.rootCoord.CreateCollection()(同上) |--CreateCollection()(rpc调用,internal\rootcoord\root_coord.go) |--Execute()(internal\rootcoord\create_collection_task.go) |--Execute()(internal\rootcoord\step.go) |--s.core.meta.AddCollection() |--AddCollection()(internal\rootcoord\meta_table.go) |--mt.catalog.CreateCollection() |--CreateCollection()(internal\metastore\kv\rootcoord\kv_catalog.go) |--kc.Snapshot.Save() |--etcd.SaveByBatchWithLimit() 

create_collection堆栈.jpg

在etcd产生collection相关的key:

==root-coord/database/collection-info/1/445652621026918798==

value的值的结构为etcdpb.CollectionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

collSchema := &schemapb.CollectionSchema{  Name: coll.Name, Description: coll.Description, AutoID: coll.AutoID, EnableDynamicField: coll.EnableDynamicField, } collectionPb := &pb.CollectionInfo{  ID: coll.CollectionID, DbId: coll.DBID, Schema: collSchema, CreateTime: coll.CreateTime, VirtualChannelNames: coll.VirtualChannelNames, PhysicalChannelNames: coll.PhysicalChannelNames, ShardsNum: coll.ShardsNum, ConsistencyLevel: coll.ConsistencyLevel, StartPositions: coll.StartPositions, State: coll.State, Properties: coll.Properties, } 

collectionInfo.jpg

可以看出collection由ID、DbId、schema等组成,其中schema不记录Fields,也不记录partitionID、partitionName、FieldIndex。其它信息由另外的key-value记录。

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {  if coll.State != pb.CollectionState_CollectionCreating {  return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name) } // 构建key的规则 k1 := BuildCollectionKey(coll.DBID, coll.CollectionID) collInfo := model.MarshalCollectionModel(coll) // 序列化 v1, err := proto.Marshal(collInfo) if err != nil {  return fmt.Errorf("failed to marshal collection info: %s", err.Error()) } // 写入etcd if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {  return err } ...... } 

跟踪BuildCollectionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/database/collection-info/{dbID}/{collectionID}
  • 前缀/snapshots/root-coord/database/collection-info/{dbID}/{collectionID}_ts{时间戳}

根据路径能够反映出collection属于哪个DB。默认数据库名为default,dbID为1。

在etcd还会产生partition相关的key:

==root-coord/partitions/445653146967736660/445653146967736661==

value的值的结构为etcdpb.PartitionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&pb.PartitionInfo{  PartitionID: partition.PartitionID, PartitionName: partition.PartitionName, PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp, CollectionId: partition.CollectionID, State: partition.State, } 

partitionInfo.jpg

可以看出来partition包括partitionID、partitionName、collectionId等。

for _, partition := range coll.Partitions {  k := BuildPartitionKey(coll.CollectionID, partition.PartitionID) partitionInfo := model.MarshalPartitionModel(partition) v, err := proto.Marshal(partitionInfo) if err != nil {  return err } kvs[k] = string(v) } 

跟踪BuildPartitionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/partitions/{collectionID}/{partitionID}

  • 前缀/snapshots/root-coord/partitions/{collectionID}/{partitionID}_ts{时间戳}

由路径可以反映出partition属于哪个collection。

一个collection可以包含多个partition。默认partition名为:_default。

分区名称可配置(milvus.yml):

common.defaultPartitionName 

在etcd还会产生field相关的key:

==root-coord/fields/445653146967736660/100==

value的值的结构为schemapb.FieldSchema ,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&schemapb.FieldSchema{  FieldID: field.FieldID, Name: field.Name, IsPrimaryKey: field.IsPrimaryKey, Description: field.Description, DataType: field.DataType, TypeParams: field.TypeParams, IndexParams: field.IndexParams, AutoID: field.AutoID, IsDynamic: field.IsDynamic, IsPartitionKey: field.IsPartitionKey, DefaultValue: field.DefaultValue, ElementType: field.ElementType, } 

fieldInfo.jpg

fieldInfo记录了字段的filedID、name、description、datatype等信息。

for _, field := range coll.Fields {  k := BuildFieldKey(coll.CollectionID, field.FieldID) fieldInfo := model.MarshalFieldModel(field) v, err := proto.Marshal(fieldInfo) if err != nil {  return err } kvs[k] = string(v) } 

跟踪BuildFieldKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/fields/{collectionID}/{fieldID}
  • 前缀/snapshots/root-coord/fields/{collectionID}/{fieldID}_ts{时间戳}

从路径可以反映field属于哪个collection。一个field就是一个字段。

kvs.jpg

将kvs批量写入etcd。kvs既有partition,又有field。

完整代码:

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {  if coll.State != pb.CollectionState_CollectionCreating {  return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name) } // 构建collection的key规则 k1 := BuildCollectionKey(coll.DBID, coll.CollectionID) // 填充collection collInfo := model.MarshalCollectionModel(coll) // 序列化 v1, err := proto.Marshal(collInfo) if err != nil {  return fmt.Errorf("failed to marshal collection info: %s", err.Error()) } // 写入etcd,最终会写入2个key,一个原始的,一个加snapshots if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {  return err } kvs := map[string]string{ } // 构建partition for _, partition := range coll.Partitions {  // 构建partition的key规则 k := BuildPartitionKey(coll.CollectionID, partition.PartitionID) // 填充partition partitionInfo := model.MarshalPartitionModel(partition) // 序列化 v, err := proto.Marshal(partitionInfo) if err != nil {  return err } kvs[k] = string(v) } // 构建field for _, field := range coll.Fields {  // 构建field的key规则 k := BuildFieldKey(coll.CollectionID, field.FieldID) // 填充field fieldInfo := model.MarshalFieldModel(field) // 序列化 v, err := proto.Marshal(fieldInfo) if err != nil {  return err } kvs[k] = string(v) } // 批量写入etcd,传入一个key,最终会写入2个key,一个原始的,一个加snapshots return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {  return kc.Snapshot.MultiSave(partialKvs, ts) }) } 

使用etcd-manager查看etcd。

partition-key.jpg

field-key.jpg

客户端SDK使用了3个field,分别是pk、random、embeddings。

fields = [ FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100), FieldSchema(name="random", dtype=DataType.DOUBLE), FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim) ] 

每一个field都分配有一个fieldID,例如本例中pk分配100、random分配101、embedding分配102。

但是注意还会产生2个fieldID,一个为0、一个为1。

总结:

1.CreateCollection由proxy传递给协调器rootCoord操作etcd。

2.CreateCollection最终会在etcd上写入3种类型的key

  • collection

    前缀/root-coord/database/collection-info/{dbID}/{collectionID}

  • partition

    前缀/root-coord/partitions/{collectionID}/{partitionID}

  • field

    前缀/root-coord/fields/{collectionID}/{fieldID}

目录
相关文章
|
30天前
|
存储 缓存 算法
淘宝买家秀 API 深度开发:多模态内容解析与合规推荐技术拆解
本文详解淘宝买家秀接口(taobao.reviews.get)的合规调用、数据标准化与智能推荐全链路方案。涵盖权限申请、多模态数据清洗、情感分析、混合推荐模型及缓存优化,助力开发者提升审核效率60%、商品转化率增长28%,实现UGC数据高效变现。
|
1月前
|
XML 数据采集 API
用Lxml高效解析XML格式数据:以天气API为例
免费Python教程:实战解析中国天气网XML数据,详解Lxml库高效解析技巧、XPath用法、流式处理大文件及IP封禁应对策略,助你构建稳定数据采集系统。
153 0
|
29天前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
1月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南
|
2月前
|
数据可视化 测试技术 API
从接口性能到稳定性:这些API调试工具,让你的开发过程事半功倍
在软件开发中,接口调试与测试对接口性能、稳定性、准确性及团队协作至关重要。随着开发节奏加快,传统方式已难满足需求,专业API工具成为首选。本文介绍了Apifox、Postman、YApi、SoapUI、JMeter、Swagger等主流工具,对比其功能与适用场景,并推荐Apifox作为集成度高、支持中文、可视化强的一体化解决方案,助力提升API开发与测试效率。
|
1月前
|
JSON API 数据安全/隐私保护
Python采集淘宝拍立淘按图搜索API接口及JSON数据返回全流程指南
通过以上流程,可实现淘宝拍立淘按图搜索的完整调用链路,并获取结构化的JSON商品数据,支撑电商比价、智能推荐等业务场景。
|
3月前
|
JSON 算法 安全
淘宝商品详情API接口系列,json数据返回
淘宝开放平台提供了多种API接口用于获取商品详情信息,主要通过 淘宝开放平台(Taobao Open Platform, TOP) 的 taobao.tbk.item.info.get(淘宝客商品详情)或 taobao.item.get(标准商品API)等接口实现。以下是关键信息及JSON返回示例:
|
27天前
|
人工智能 自然语言处理 测试技术
Apipost智能搜索:只需用业务语言描述需求,就能精准定位目标接口,API 搜索的下一代形态!
在大型项目中,API 数量庞大、命名不一,导致“找接口”耗时费力。传统工具依赖关键词搜索,难以应对语义模糊或命名不规范的场景。Apipost AI 智能搜索功能,支持自然语言查询,如“和用户登录有关的接口”,系统可理解语义并精准匹配目标接口。无论是新人上手、模糊查找还是批量定位,都能大幅提升检索效率,降低协作成本。从关键词到语义理解,智能搜索让开发者少花时间找接口,多专注核心开发,真正实现高效协作。
|
2月前
|
JSON 前端开发 API
如何调用体育数据足篮接口API
本文介绍如何调用体育数据API:首先选择可靠服务商并注册获取密钥,接着阅读文档了解基础URL、端点、参数及请求头,然后使用Python等语言发送请求、解析JSON数据,最后将数据应用于Web、App或分析场景,同时注意密钥安全、速率限制与错误处理。

推荐镜像

查看更多
  • DNS
  • 下一篇