内容
活动
关注

DataX读取Hive Orc格式表丢失数据处理记录

简介: DataX读取Hive Orc格式表丢失数据处理记录

问题

问题概述

DataX读取Hive Orc存储格式表数据丢失

问题详细描述

同步Hive表将数据发送到Kafka,Hive表A数据总量如下

SQL:select count(1) from A; 数量:19397281 

使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450

任务总计耗时 : 1273s 任务平均流量 : 2.51MB/s 记录写入速度 : 9960rec/s 读出记录总数 : 12649450 读写失败总数 : 0 

在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)

image-20230511152742037

原因

DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:

InputSplit[] splits = in.getSplits(conf, 1); RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); 

代码位置:

hdfsreader模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil类334行代码(源码为v202210版本)

这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0],其他数据就会丢失

我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下

$ hdfs dfs -du -h /usr/hive/warehouse/dwd.db/A/dt=2022-05-05 .... 518.4 K 1.5 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0 669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0 205.6 K 616.9 K /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0 264.6 K 793.9 K /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0 1.4 M 4.3 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0 1.5 M 4.6 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0 .... 

发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0大于BlockSize大小

解决方法

修改源码

修改后方法源码如下,直接替换DFSUtil.javaorcFileStartRead方法即可

public void orcFileStartRead( String sourceOrcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) {  LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath)); List<ColumnEntry> column = UnstructuredStorageReaderUtil.getListColumnEntry( readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN); String nullFormat = readerSliceConfig.getString( com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT); StringBuilder allColumns = new StringBuilder(); StringBuilder allColumnTypes = new StringBuilder(); boolean isReadAllColumns = false; int columnIndexMax = -1; // 判断是否读取所有列 if (null == column || column.size() == 0) {  int allColumnsCount = getAllColumnsCount(sourceOrcFilePath); columnIndexMax = allColumnsCount - 1; isReadAllColumns = true; } else {  columnIndexMax = getMaxIndex(column); } for (int i = 0; i <= columnIndexMax; i++) {  allColumns.append("col"); allColumnTypes.append("string"); if (i != columnIndexMax) {  allColumns.append(","); allColumnTypes.append(":"); } } if (columnIndexMax >= 0) {  JobConf conf = new JobConf(hadoopConf); Path orcFilePath = new Path(sourceOrcFilePath); Properties p = new Properties(); p.setProperty("columns", allColumns.toString()); p.setProperty("columns.types", allColumnTypes.toString()); try {  OrcSerde serde = new OrcSerde(); serde.initialize(conf, p); StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector(); InputFormat<?, ?> in = new OrcInputFormat(); FileInputFormat.setInputPaths(conf, orcFilePath.toString()); // If the network disconnected, will retry 45 times, each time the retry interval // for 20 seconds // Each file as a split InputSplit[] splits = in.getSplits(conf, -1); for (InputSplit split : splits) {  {  RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL); Object key = reader.createKey(); Object value = reader.createValue(); // 获取列信息 List<? extends StructField> fields = inspector.getAllStructFieldRefs(); List<Object> recordFields; while (reader.next(key, value)) {  recordFields = new ArrayList<Object>(); for (int i = 0; i <= columnIndexMax; i++) {  Object field = inspector.getStructFieldData(value, fields.get(i)); recordFields.add(field); } transportOneRecord( column, recordFields, recordSender, taskPluginCollector, isReadAllColumns, nullFormat); } reader.close(); } // transportOneRecord(column, recordFields, recordSender, // taskPluginCollector, isReadAllColumns, nullFormat); } // reader.close(); } catch (Exception e) {  String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } else {  String message = String.format( "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column)); throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message); } } 

修改完成后将源码打包,打包后在hdfsreader模块下target目录有target/hdfsreader-0.0.1-SNAPSHOT.jar文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader下,替换之前的包.

 [hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd /datax/plugin/reader/hdfsreader [hadoop@10 /datax/plugin/reader/hdfsreader]$ ll total 52 -rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar drwxrwxr-x 2 hadoop hadoop 8192 Dec 9 15:06 libs -rw-rw-r-- 1 hadoop hadoop 217 Oct 26 2022 plugin_job_template.json -rw-rw-r-- 1 hadoop hadoop 302 Oct 26 2022 plugin.json 

验证

重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。

目录
相关文章
|
存储 SQL Java
bigdata-18-Hive数据结构与存储格式
bigdata-18-Hive数据结构与存储格式
244 0
|
SQL 存储 HIVE
Hive中的表是如何定义的?请解释表的结构和数据类型。
Hive中的表是如何定义的?请解释表的结构和数据类型。
229 0
|
8月前
|
SQL DataX HIVE
【YashanDB知识库】DataX迁移Hive到崖山分布式
本文来自YashanDB官网,介绍通过DataX将Hive数据迁移到YashanDB的实现方法。源环境为Hive 3.1.3,目标为YashanDB 23.2.3.100。文章提供了Hive与YashanDB的建表脚本、数据类型映射及DataX配置示例,包含reader和writer插件参数设置,并通过`datax.py`执行同步任务。内容详尽展示了数据迁移的全流程。
【YashanDB知识库】DataX迁移Hive到崖山分布式
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
175 6
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
523 1
|
SQL 存储 分布式计算
【Hive】hive内部表和外部表的区别
【4月更文挑战第14天】【Hive】hive内部表和外部表的区别
|
存储 SQL 算法
【Hive】ORC、Parquet等列式存储的优点
【4月更文挑战第14天】【Hive】ORC、Parquet等列式存储的优点
|
SQL 存储 分布式计算
Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
【4月更文挑战第6天】Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
566 1
|
SQL 关系型数据库 MySQL
Hive 表注释乱码解决
Hive元数据在MySQL默认使用`latin1`字符集导致注释乱码。可通过修改MySQL配置文件`/etc/my.cnf`,在`[mysqld]`和末尾添加`character-set-server=utf8`等设置,重启MySQL。然后在Hive数据库中调整表字段、分区字段、索引注释的字符集。注意,这仅对新表生效。测试创建带注释的Hive表,问题解决。
456 0
下一篇