1- # 存在则更新不存在则新增task: lucene_upset
2- task类型: ` lucene_upset `
1+ # 存在则更新不存在则新增task: lucene_upsert
2+ task类型: ` lucene_upsert `
33## task说明:
4- + ` lucene_upset ` 类型的task支持行级数据的新增,修改和删除,由` actionColumn ` 指定特定列,该列的数据将标识行的记录是执行新增,修改或删除操作.该列的值只允许` a ` ,` u ` ,` d ` 三个值,分别代表新增,修改,删除操作.
5- + ` lucene_upset ` 类型的task实现了UPSET特性,即如果存在则更新,如果不存在则新增.是否存在是根据指定的列值进行判断,通常是主键,比如订单表的主键是订单号orderid,则根据orderid判断记录是否存在.
4+ + ` lucene_upsert ` 类型的task支持行级数据的新增,修改和删除,由` actionColumn ` 指定特定列,该列的数据将标识行的记录是执行新增,修改或删除操作.该列的值只允许` a ` ,` u ` ,` d ` 三个值,分别代表新增,修改,删除操作.
5+ + ` lucene_upsert ` 类型的task实现了UPSERT特性,即如果存在则更新,如果不存在则新增.是否存在是根据指定的列值进行判断,通常是主键,比如订单表的主键是订单号orderid,则根据orderid判断记录是否存在.
6+ + 为了简化数据处理逻辑,默认情况下无需指定` actionColumn ` ,所有记录都当做更新操作,如果存在则修改,如果不存在则新增.此时不能处理删除操作.如果需要处理删除操作,需要在` context ` 中添加` allUpSert ` 参数,并指定为` false ` .
67+ ` granularitySpec.intervals ` :指定了数据允许的最早和最晚时间,如果早于最早时间或晚于最晚时间则丢弃该行数据.
78## 参数解析
8- - ` filterColumns ` : 指定主键列
9- - ` actionColumn ` : 指定操作列,比如` action `
9+ - ` filterColumns ` : 指定主键列
10+ - ` actionColumn ` : 指定操作列,比如` action ` .
1011- ` worker ` : 制定task运行的worker节点,需要使用域名,或参考task管理界面的` Remote Workers ` 表格
1112- ` dataSchema ` : 参考其他task的配置
1213- ` tuningConfig ` : 可以不指定,使用默认配置.注意不能同时指定` maxRowsPerSegment ` 和` numShards ` .建议通过配置将segment大小控制在300MB到600MB之间.
@@ -17,11 +18,15 @@ task类型: `lucene_upset`
1718- ` writerConfig ` : 默认不指定,使用默认配置即可.
1819- ` ioConfig ` : 指定firehose的类型,支持本地文件,hdfs文件接入.
1920- ` context.throwAwayBadData ` : 是否直接丢弃解析出错的数据而不停止task.默认为false,即如果遇到解析报错的数据,则停止task.如果确定只是少量数据有问题,并且直接丢弃也不影响业务,则可设置为true(请慎重).
20- - ` context.disableUpSet ` :是否要停用upset特性.默认为false.如果停用,如果设置action为修改(` u ` ),但原有数据中找不到对应的记录,则丢弃数据,而不采用新增方式插入数据,即不执行insert操作.
21+ - ` context.disableUpSert ` :是否要停用upsert特性.默认为false.如果停用,如果设置action为修改(` u ` ),但原有数据中找不到对应的记录,则丢弃数据,而不采用新增方式插入数据,即不执行insert操作.
22+ - ` context.commitThreshold ` :写入时多少条记录commit一次,默认100.
23+ - ` context.idealPersistThreadSize ` :处理数据更新时由于从数据源读数据比较快,但写入相对较慢,因此采用了生产者消费者模式,一个生产者读取数据后往队列中写数据,默认最大开启3个消费者.
24+ - ` context.persistQueueThreshold ` :指定消息队列的大小,默认1000.
25+ - ` context.allUpSert ` : 默认为true,即所有的记录都当做更新,无需指定actionColumn参数.如果设置为false,则必须指定actionColumn.
2126## 示例json配置
2227```
2328{
24- "type": "lucene_upset ",
29+ "type": "lucene_upsert ",
2530 "filterColumns": [
2631 "old_card_no"
2732 ],
@@ -209,7 +214,8 @@ task类型: `lucene_upset`
209214 "context": {
210215 "debug": true,
211216 "throwAwayBadData": true,
212- "disableUpSet": true
217+ "disableUpSert": true,
218+ "allUpSert": true
213219 }
214220}
215221```
0 commit comments