Commit 75c472e

fix #472 (#473)
1 parent b551e75 commit 75c472e

2 files changed: 18 additions & 4 deletions

emr-kudu/src/main/scala/org/apache/kudu/spark/kudu/KuduSourceProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -76,8 +76,8 @@ class KuduSourceProvider extends DefaultSource with Logging {
     val writeOptions =
       KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull, repartition, repartitionSort)

-    new KuduUpdatableRelation(tableName, kuduMaster, operationType, schemaOption,
-      readOptions, writeOptions)(sqlContext)
+    new KuduUpdatableRelation(sqlContext, tableName, kuduMaster, operationType, schemaOption,
+      readOptions, writeOptions)
   }
 }

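For readers less familiar with Scala's multiple parameter lists: the change above folds the formerly curried sqlContext parameter into the primary constructor list, so call sites move from new KuduUpdatableRelation(...)(sqlContext) to new KuduUpdatableRelation(sqlContext, ...), which is exactly what this KuduSourceProvider hunk does. A minimal standalone illustration of the same refactoring (the classes and values below are made up, not part of the connector):

// Before: the context argument sits in a second, curried parameter list.
class RelationBefore(val tableName: String, val masterAddrs: String)(val ctx: String)

// After: the context argument leads a single parameter list; field initialization is
// unchanged, but every call site has to be updated, as in KuduSourceProvider above.
class RelationAfter(val ctx: String, val tableName: String, val masterAddrs: String)

object ParamListDemo extends App {
  val before = new RelationBefore("events", "master-1:7051")("ctx")
  val after  = new RelationAfter("ctx", "events", "master-1:7051")
  println(s"${before.tableName} -> ${after.tableName}")
}
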
emr-kudu/src/main/scala/org/apache/kudu/spark/kudu/KuduUpdatableRelation.scala

Lines changed: 16 additions & 2 deletions
@@ -24,13 +24,13 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.types.StructType

 class KuduUpdatableRelation(
+    override val sqlContext: SQLContext,
     override val tableName: String,
     override val masterAddrs: String,
     override val operationType: OperationType,
     override val userSchema: Option[StructType],
     override val readOptions: KuduReadOptions = new KuduReadOptions,
     override val writeOptions: KuduWriteOptions = new KuduWriteOptions)
-    (override val sqlContext: SQLContext)
   extends KuduRelation(tableName, masterAddrs, operationType, userSchema,
     readOptions, writeOptions)(sqlContext) with Serializable {

@@ -39,9 +39,23 @@ class KuduUpdatableRelation(
   def merge(data: DataFrame, opTypeColumn: Column): Unit = {
     val syncClient: KuduClient = KuduClientCache.getAsyncClient(masterAddrs).syncClient()
     val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
+    val opTypeColumnName = opTypeColumn.toString()
+    doMerge(data, opTypeColumnName, lastPropagatedTimestamp, masterAddrs, schema,
+      tableName, readOptions, writeOptions)
+  }
+
+  private def doMerge(
+      data: DataFrame,
+      opTypeColumnName: String,
+      lastPropagatedTimestamp: Long,
+      masterAddrs: String,
+      schema: StructType,
+      tableName: String,
+      readOptions: KuduReadOptions,
+      writeOptions: KuduWriteOptions): Unit = {
     data.toDF().foreachPartition(it => {
       val operator = new KuduOperator(masterAddrs)
-      val pendingErrors = operator.writePartitionRows(it, schema, opTypeColumn.toString(),
+      val pendingErrors = operator.writePartitionRows(it, schema, opTypeColumnName,
         tableName, lastPropagatedTimestamp, writeOptions)
       if (pendingErrors.getRowErrors.nonEmpty) {
         val errors = pendingErrors.getRowErrors

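The new private doMerge receives everything the foreachPartition closure needs as plain method parameters, so the lambda closes over local values (the column name string, the timestamp, the option objects) rather than over fields of the relation itself; keeping task closures free of members such as an SQLContext is a common Spark serialization idiom. Callers still use the public merge(data, opTypeColumn) entry point as before; doMerge is internal. A minimal sketch of the idiom using only stock Spark APIs (the object and helper names below are hypothetical, not part of the connector):

import org.apache.spark.sql.{DataFrame, Row}

object MergeClosureSketch {
  // Hypothetical stand-in for KuduOperator.writePartitionRows: just drains the iterator.
  private def writeRows(rows: Iterator[Row], table: String, opTypeColumnName: String): Unit =
    rows.foreach(_ => ())

  // Everything the partition function needs arrives as a parameter, so the lambda
  // captures local strings only and the enclosing object is never shipped to executors.
  def doWrite(data: DataFrame, table: String, opTypeColumnName: String): Unit = {
    data.foreachPartition { (it: Iterator[Row]) =>
      writeRows(it, table, opTypeColumnName)
    }
  }
}
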
0 commit comments