Problem
Problem overview
DataX loses data when reading a Hive table stored in ORC format.
Detailed problem description
While syncing a Hive table to Kafka, the total row count of Hive table A was as follows.
SQL: select count(1) from A;
Count: 19397281

Using DataX to send table A's data to Kafka, the job finally reported 12649450 records read:
Total elapsed time        : 1273s
Average traffic           : 2.51MB/s
Record write speed        : 9960rec/s
Total records read        : 12649450
Total read/write failures : 0

Querying Kafka shows 12649449 records received (one junk record was filtered out by my custom KafkaWriter; that can be ignored here).

Cause
DataX's code for reading HDFS ORC files has a bug: when a Hive file is larger than the HDFS block size, data is lost. The problematic code is:
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

Code location: hdfsreader module, line 334 of the class com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil (source at the v202210 release).
When the file is larger than the block size, getSplits divides it into multiple splits, but the code only reads the first one, splits[0], so the data in all the remaining splits is lost.
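The essence of the fix is to read every InputSplit that the InputFormat returns rather than only splits[0]. Below is a minimal standalone sketch of that read loop, not DataX code: the class name OrcRowCounter and its command-line handling are illustrative, and it assumes the same Hadoop mapred and Hive ORC classes the hdfsreader plugin already depends on are available. It simply counts the rows in an ORC file, which is also a handy way to check how many rows a single file really holds.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Illustrative helper, not part of DataX: counts ORC rows by reading every split.
public class OrcRowCounter {

    public static long countRows(JobConf conf, String orcFilePath) throws Exception {
        InputFormat<?, ?> in = new OrcInputFormat();
        FileInputFormat.setInputPaths(conf, new Path(orcFilePath).toString());

        // -1 lets the InputFormat decide how many splits to create; a file
        // larger than one block comes back as more than one split.
        InputSplit[] splits = in.getSplits(conf, -1);

        long rows = 0;
        for (InputSplit split : splits) {
            RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
            Object key = reader.createKey();
            Object value = reader.createValue();
            while (reader.next(key, value)) {
                rows++;
            }
            reader.close();
        }
        return rows;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countRows(new JobConf(), args[0]));
    }
}

Reading only splits[0], as the original code does, reports just the first split's rows for a multi-block file, which is how the shortfall above arises.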
After spotting this we went to verify it. The file sizes in the Hive table's storage directory are:
$ hdfs dfs -du -h /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K  1.5 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M  2.0 G    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K  616.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K  793.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M    4.3 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M    4.6 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....

Sure enough, the 669.7 M file /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0 is larger than the block size.
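Files like this can also be found programmatically instead of eyeballing hdfs dfs -du. A minimal sketch using the standard Hadoop FileSystem API, with an illustrative class name and the partition path from the listing above as the default argument:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: lists files whose length exceeds their block size,
// i.e. files that the InputFormat will cut into more than one split.
public class OversizedFileChecker {

    public static void main(String[] args) throws Exception {
        Path dir = new Path(args.length > 0
                ? args[0]
                : "/usr/hive/warehouse/dwd.db/A/dt=2022-05-05");

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(dir.toUri(), conf);

        for (FileStatus status : fs.listStatus(dir)) {
            if (status.isFile() && status.getLen() > status.getBlockSize()) {
                System.out.printf("%s: %d bytes, block size %d -> multiple splits%n",
                        status.getPath(), status.getLen(), status.getBlockSize());
            }
        }
    }
}

With the unpatched reader, everything after the first split of any file this prints gets dropped.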
Solution
Modify the source code
The modified method is shown below; replace the orcFileStartRead method in DFSUtil.java with it directly.
public void orcFileStartRead(
        String sourceOrcFilePath,
        Configuration readerSliceConfig,
        RecordSender recordSender,
        TaskPluginCollector taskPluginCollector) {
    LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
    List<ColumnEntry> column = UnstructuredStorageReaderUtil.getListColumnEntry(
            readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    String nullFormat = readerSliceConfig.getString(
            com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
    StringBuilder allColumns = new StringBuilder();
    StringBuilder allColumnTypes = new StringBuilder();
    boolean isReadAllColumns = false;
    int columnIndexMax = -1;
    // Decide whether all columns should be read
    if (null == column || column.size() == 0) {
        int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
        columnIndexMax = allColumnsCount - 1;
        isReadAllColumns = true;
    } else {
        columnIndexMax = getMaxIndex(column);
    }
    for (int i = 0; i <= columnIndexMax; i++) {
        allColumns.append("col");
        allColumnTypes.append("string");
        if (i != columnIndexMax) {
            allColumns.append(",");
            allColumnTypes.append(":");
        }
    }
    if (columnIndexMax >= 0) {
        JobConf conf = new JobConf(hadoopConf);
        Path orcFilePath = new Path(sourceOrcFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());
        try {
            OrcSerde serde = new OrcSerde();
            serde.initialize(conf, p);
            StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
            InputFormat<?, ?> in = new OrcInputFormat();
            FileInputFormat.setInputPaths(conf, orcFilePath.toString());
            // If the network disconnected, will retry 45 times, each time the retry interval
            // for 20 seconds
            // Let the InputFormat decide the split count (-1), then read every split
            InputSplit[] splits = in.getSplits(conf, -1);
            for (InputSplit split : splits) {
                RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // Get the struct field info for each column
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();
                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();
                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            }
        } catch (Exception e) {
            String message = String.format(
                    "从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    } else {
        String message = String.format(
                "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
        throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
    }
}

After the change, rebuild and package the source. The build produces target/hdfsreader-0.0.1-SNAPSHOT.jar under the hdfsreader module's target directory. Upload this jar to the datax/plugin/reader/hdfsreader directory on the deployment server, replacing the previous one.
[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop  8192 Dec  9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop   217 Oct 26  2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop   302 Oct 26  2022 plugin.json

Verification
Re-run the DataX sync script: the synced record count now matches the data stored in the Hive table.