内容
活动
关注

Insert API执行流程_milvus源码解析

简介: Insert API执行流程_milvus源码解析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np from pymilvus import ( connections, FieldSchema, CollectionSchema, DataType, Collection, ) num_entities, dim = 2000, 8 print("start connecting to Milvus") connections.connect("default", host="192.168.230.71", port="19530") fields = [ FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100), FieldSchema(name="random", dtype=DataType.DOUBLE), FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim) ] schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs") print("Create collection `hello_milvus`") hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2) print("Start inserting entities") rng = np.random.default_rng(seed=19530) entities = [ # provide the pk field because `auto_id` is set to False [str(i) for i in range(num_entities)], rng.random(num_entities).tolist(), # field random, only supports list rng.random((num_entities, dim)), # field embeddings, supports numpy.ndarray and list ] insert_result = hello_milvus.insert(entities) hello_milvus.flush() 

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection. func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {  ...... // request封装为task it := &insertTask{  ctx: ctx, Condition: NewTaskCondition(ctx), insertMsg: &msgstream.InsertMsg{  BaseMsg: msgstream.BaseMsg{  HashValues: request.HashKeys, }, InsertRequest: msgpb.InsertRequest{  Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Insert), commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), DbName: request.GetDbName(), CollectionName: request.CollectionName, PartitionName: request.PartitionName, FieldsData: request.FieldsData, NumRows: uint64(request.NumRows), Version: msgpb.InsertDataVersion_ColumnBased, }, }, idAllocator: node.rowIDAllocator, segIDAssigner: node.segAssigner, chMgr: node.chMgr, chTicker: node.chTicker, } ...... // 将task压入dmQueue队列 if err := node.sched.dmQueue.Enqueue(it); err != nil {  ...... } ...... // 等待任务执行完 if err := it.WaitToFinish(); err != nil {  ...... } ...... } 

InsertRequest结构:

type InsertRequest struct {  Base *commonpb.MsgBase DbName string CollectionName string PartitionName string FieldsData []*schemapb.FieldData HashKeys []uint32 NumRows uint32 XXX_NoUnkeyedLiteral struct{ } XXX_unrecognized []byte XXX_sizecache int32 } type FieldData struct {  Type DataType FieldName string // Types that are valid to be assigned to Field: // // *FieldData_Scalars // *FieldData_Vectors Field isFieldData_Field FieldId int64 IsDynamic bool XXX_NoUnkeyedLiteral struct{ } XXX_unrecognized []byte XXX_sizecache int32 } type isFieldData_Field interface {  isFieldData_Field() } type FieldData_Scalars struct {  Scalars *ScalarField } type FieldData_Vectors struct {  Vectors *VectorField } 

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {  ...... collectionName := it.insertMsg.CollectionName // 根据collectionName得到collectionID collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName) log := log.Ctx(ctx) if err != nil {  ...... } it.insertMsg.CollectionID = collID getCacheDur := tr.RecordSpan() // 得到stream,类型为mqMsgStream stream, err := it.chMgr.getOrCreateDmlStream(collID) if err != nil {  return err } getMsgStreamDur := tr.RecordSpan() // by-dev-rootcoord-dml_0_445811557825249939v0 // by-dev-rootcoord-dml_1_445811557825249939v1 // 如果shardNum=2,则获取2个虚拟channel channelNames, err := it.chMgr.getVChannels(collID) if err != nil {  ...... } ...... // assign segmentID for insert data and repack data by segmentID // msgPck包含segmentID var msgPack *msgstream.MsgPack if it.partitionKeys == nil {  // 分配segmentID // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道 msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner) } else {  msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner) } if err != nil {  ...... } ...... // 生产数据,将数据写入mq err = stream.Produce(msgPack) if err != nil {  ...... } ...... } 

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

目录
相关文章
|
1月前
|
JSON API 数据安全/隐私保护
Python采集淘宝拍立淘按图搜索API接口及JSON数据返回全流程指南
通过以上流程,可实现淘宝拍立淘按图搜索的完整调用链路,并获取结构化的JSON商品数据,支撑电商比价、智能推荐等业务场景。
|
2月前
|
JSON API 数据安全/隐私保护
Python采集淘宝评论API接口及JSON数据返回全流程指南
Python采集淘宝评论API接口及JSON数据返回全流程指南
JSON 安全 API
84 0
JSON 监控 API
74 0
|
2月前
|
存储 域名解析 弹性计算
阿里云上云流程参考:云服务器+域名+备案+域名解析绑定,全流程图文详解
对于初次通过阿里云完成上云的企业和个人用户来说,很多用户不仅是需要选购云服务器,同时还需要注册域名以及完成备案和域名的解析相关流程,从而实现网站的上线。本文将以上云操作流程为核心,结合阿里云的活动政策与用户系统梳理云服务器选购、域名注册、备案申请及域名绑定四大关键环节,以供用户完成线上业务部署做出参考。
|
2月前
|
人工智能 API 开发者
图文教程:阿里云百炼API-KEY获取方法,亲测全流程
本文详细介绍了如何获取阿里云百炼API-KEY,包含完整流程与截图指引。需先开通百炼平台及大模型服务,再通过控制台创建并复制API-KEY。目前平台提供千万tokens免费额度,适合开发者快速上手使用。
1748 5
|
3月前
|
JSON 缓存 供应链
API 接口驱动 1688 采购自动化:从商品获取到下单支付的全流程贯通
在B2B电商采购中,1688开放平台通过API实现商品筛选、比价、下单、支付及物流跟踪的全流程自动化,大幅提升采购效率,降低人工成本与错误率。企业可无缝对接ERP系统,实现数据驱动决策,显著优化采购周期、成本与风险管控,助力数字化转型。
|
3月前
|
JSON 自然语言处理 供应链
API接口赋能1688采购全流程:从商品获取到下单支付一键贯通
1688采购API助力企业实现全流程自动化,涵盖商品数据获取、智能比价、一键下单、支付及物流跟踪等环节,显著提升采购效率,降低成本与风险,推动B2B采购模式智能化升级。
|
3月前
|
缓存 Java API
Spring WebFlux 2025 实操指南详解高性能非阻塞 API 开发全流程核心技巧
本指南基于Spring WebFlux 2025最新技术栈,详解如何构建高性能非阻塞API。涵盖环境搭建、响应式数据访问、注解与函数式两种API开发模式、响应式客户端使用、测试方法及性能优化技巧,助你掌握Spring WebFlux全流程开发核心实践。
631 0

推荐镜像

查看更多
  • DNS
  • 下一篇