Skip to content

Commit 9288ca7

Browse files
authored
Fix ots npe problem caused by computeParams and filter incompatibility (#502)
1 parent d9e8e15 commit 9288ca7

File tree

5 files changed

+60
-20
lines changed

5 files changed

+60
-20
lines changed

emr-tablestore/src/main/java/com/aliyun/openservices/tablestore/hadoop/TableStore.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,24 @@ public static void setFilterPushdownConfig(Configuration conf, FilterPushdownCon
105105
conf.set(FILTER_PUSHDOWN_CONFIG, filterPushdownConfigSerialize.serialize());
106106
}
107107

108+
/**
109+
* Set ComputeParams(maxSplitsCount, splitSizeInMBs, computeMode) into a Configuration.
110+
*/
111+
public static void setComputeParams(Configuration conf, ComputeParams cp) {
112+
Preconditions.checkNotNull(conf, "conf must be nonnull");
113+
Preconditions.checkNotNull(cp, "cp must be nonnull");
114+
conf.set(TableStoreInputFormat.COMPUTE_PARAMS, cp.serialize());
115+
}
116+
117+
/**
118+
* Set table name into a Configuration.
119+
*/
120+
public static void setTableName(Configuration conf, String tableName) {
121+
Preconditions.checkNotNull(conf, "conf must be nonnull");
122+
Preconditions.checkNotNull(tableName, "tableName must be nonnull");
123+
conf.set(TableStoreInputFormat.TABLE_NAME, tableName);
124+
}
125+
108126
/**
109127
* for internal use only
110128
*/

emr-tablestore/src/main/java/com/aliyun/openservices/tablestore/hadoop/TableStoreInputFormat.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public static void addCriteria(Configuration conf, RangeRowQueryCriteria criteri
125125
}
126126
cri.addCriteria(criteria);
127127
conf.set(CRITERIA, cri.serialize());
128+
conf.set(TABLE_NAME, criteria.getTableName());
128129
}
129130

130131
/**
@@ -169,25 +170,31 @@ public List<InputSplit> getSplits(JobContext job)
169170
* for internal usage only
170171
*/
171172
public static List<InputSplit> getSplits(Configuration conf, SyncClientInterface syncClient) {
172-
Filter filter = null;
173-
List<String> requiredColumns = null;
174-
TableStoreFilterWritable origFilter = TableStoreFilterWritable.deserialize(conf.get(FILTER));
175-
if (origFilter != null) {
176-
filter = origFilter.getFilter();
177-
requiredColumns = origFilter.getRequiredColumns();
173+
Filter filter = new Filter(Filter.CompareOperator.EMPTY_FILTER);
174+
List<String> requiredColumns = new ArrayList<>();
175+
if (conf.get(FILTER) != null) {
176+
TableStoreFilterWritable origFilter = TableStoreFilterWritable.deserialize(conf.get(FILTER));
177+
if (origFilter != null) {
178+
filter = origFilter.getFilter();
179+
requiredColumns = origFilter.getRequiredColumns();
180+
LOG.info("Set customed filter and requiredColumns: {}", requiredColumns);
181+
}
178182
}
179183

180-
ComputeParams cp = ComputeParams.deserialize(conf.get(COMPUTE_PARAMS));
181-
ComputeParameters.ComputeMode computeMode = ComputeParameters.ComputeMode.valueOf(cp.getComputeMode());
182-
ComputeParameters computeParams;
183-
LOG.info("Compute mode: {}, max splits: {}, split size: {}MB, seachIndexName: {}",
184-
cp.getComputeMode(), cp.getMaxSplitsCount(), cp.getSplitSizeInMBs(), cp.getSearchIndexName());
185-
if (computeMode == ComputeParameters.ComputeMode.Search && !cp.getSearchIndexName().isEmpty()) {
186-
LOG.info("Generate Search compute parameters");
187-
computeParams = new ComputeParameters(cp.getSearchIndexName(), cp.getMaxSplitsCount());
188-
} else {
189-
computeParams = new ComputeParameters(cp.getMaxSplitsCount(), cp.getSplitSizeInMBs(), computeMode);
184+
ComputeParameters computeParams = new ComputeParameters();
185+
if (conf.get(COMPUTE_PARAMS) != null) {
186+
ComputeParams cp = ComputeParams.deserialize(conf.get(COMPUTE_PARAMS));
187+
ComputeParameters.ComputeMode computeMode = ComputeParameters.ComputeMode.valueOf(cp.getComputeMode());
188+
LOG.info("Compute mode: {}, max splits: {}, split size: {}MB, seachIndexName: {}",
189+
cp.getComputeMode(), cp.getMaxSplitsCount(), cp.getSplitSizeInMBs(), cp.getSearchIndexName());
190+
if (computeMode == ComputeParameters.ComputeMode.Search && !cp.getSearchIndexName().isEmpty()) {
191+
LOG.info("Generate Search compute parameters");
192+
computeParams = new ComputeParameters(cp.getSearchIndexName(), cp.getMaxSplitsCount());
193+
} else {
194+
computeParams = new ComputeParameters(cp.getMaxSplitsCount(), cp.getSplitSizeInMBs(), computeMode);
195+
}
190196
}
197+
191198
if (splitManager == null) {
192199
synchronized (TableStoreInputFormat.class) {
193200
LOG.info("Initial split manager in tablestore inputformat");

emr-tablestore/src/main/java/com/aliyun/openservices/tablestore/hive/TableStoreConsts.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@ public class TableStoreConsts {
2626
final public static String ACCESS_KEY_SECRET = "tablestore.access_key_secret";
2727
final public static String SECURITY_TOKEN = "tablestore.security_token";
2828
final public static String MAX_UPDATE_BATCH_SIZE = "tablestore.max_update_batch_size";
29+
final public static String COMPUTE_MODE = "tablestore.compute.mode";
30+
final public static String MAX_SPLIT_COUNT = "tablestore.max.split.count";
31+
final public static String SPLIT_SIZE_MBS = "tablestore.split.size.mbs";
2932

3033
final public static String COLUMNS_MAPPING = "tablestore.columns.mapping";
34+
final public static String FILTER = "filters";
3135

3236
final public static String[] REQUIRES = new String[] {
3337
ENDPOINT,

emr-tablestore/src/main/java/com/aliyun/openservices/tablestore/hive/TableStoreInputFormat.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818

1919
package com.aliyun.openservices.tablestore.hive;
2020

21+
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.ArrayList;
2324
import java.io.IOException;
2425

26+
import com.alicloud.openservices.tablestore.ecosystem.Filter;
27+
import com.aliyun.openservices.tablestore.hadoop.*;
2528
import org.apache.hadoop.mapred.InputFormat;
2629
import org.apache.hadoop.mapred.RecordReader;
27-
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
28-
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
29-
import com.aliyun.openservices.tablestore.hadoop.TableStore;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

@@ -73,6 +73,9 @@ public class TableStoreInputFormat implements InputFormat<PrimaryKeyWritable, Ro
7373
RangeRowQueryCriteria criteria = fetchCriteria(meta, columns);
7474
com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat
7575
.addCriteria(dest, criteria);
76+
dest.set(TableStoreConsts.FILTER, new TableStoreFilterWritable(
77+
new Filter(Filter.CompareOperator.EMPTY_FILTER),
78+
Arrays.asList(columns.split(","))).serialize());
7679
splits = com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat
7780
.getSplits(dest, ots);
7881
} finally {
@@ -116,6 +119,14 @@ private static Configuration translateConfig(Configuration from) {
116119
endpoint, instance);
117120
}
118121
TableStore.setEndpoint(to, ep);
122+
TableStore.setTableName(to, from.get(TableStoreConsts.TABLE_NAME));
123+
}
124+
{
125+
ComputeParams computeParams = new ComputeParams(
126+
from.getInt(TableStoreConsts.MAX_SPLIT_COUNT, 1000),
127+
from.getLong(TableStoreConsts.SPLIT_SIZE_MBS, 100),
128+
from.getTrimmed(TableStoreConsts.COMPUTE_MODE, "KV"));
129+
TableStore.setComputeParams(to, computeParams);
119130
}
120131
return to;
121132
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<scala.binary.version>2.11</scala.binary.version>
5353
<spark.version>2.4.3</spark.version>
5454
<oss.sdk.version>3.0.0</oss.sdk.version>
55-
<tablestore.sdk.version>5.10.0</tablestore.sdk.version>
55+
<tablestore.sdk.version>5.10.2</tablestore.sdk.version>
5656
<odps.version>0.28.4-public</odps.version>
5757
<loghubb.client.version>0.6.15</loghubb.client.version>
5858
<aliyun.sdk.core>4.3.2</aliyun.sdk.core>

0 commit comments

Comments (0)