
Commit a170c57

hoaihuongbk and huong.vuong authored
Add Hudi connector (#257)
* Add Hudi connector
* Fix missing save action
* Specified Hudi spark version per profile
* Update Hudi test case
* Update Spark 2.3 Hudi version
* Fix Hudi test with dedicated spark session
* Update Hudi version for Spark 2.3
* Update work version for different spark profile 3.0 & 2.4
* Add HudiConnectorConf Tests
* Add more test cases

Co-authored-by: huong.vuong <huong.vuong@grabtaxi.com>
1 parent 91a5780 commit a170c57

File tree

9 files changed: +261 -0 lines changed


CONTRIBUTING.md

Lines changed: 9 additions & 0 deletions
@@ -55,6 +55,15 @@ export SPARK_VER=2.4
./dev/test.sh
```

+Note: in some cases you may get the following error
+```
+java.net.BindException: Can't assign requested address: Service 'sparkDriver'
+```
+then you have to bind Spark to the local IP like this
+```shell
+export SPARK_LOCAL_IP=127.0.0.1
+```
+

## Styleguide

### Commit styleguide

pom.xml

Lines changed: 22 additions & 0 deletions
@@ -98,6 +98,9 @@
<maven.site.plugin.version>3.9.1</maven.site.plugin.version>
<nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
<typesafe.config.version>1.4.2</typesafe.config.version>
+<hudi.version>0.11.0</hudi.version>
+<hudi.bundle.version>spark3.2-bundle</hudi.bundle.version>
+<spark.avro.version>3.0.2</spark.avro.version>
</properties>

<dependencies>
@@ -178,6 +181,18 @@
  <version>${delta.version}</version>
</dependency>

+<dependency>
+  <groupId>org.apache.hudi</groupId>
+  <artifactId>hudi-${hudi.bundle.version}_${scala.compat.version}</artifactId>
+  <version>${hudi.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-avro_${scala.compat.version}</artifactId>
+  <version>${spark.avro.version}</version>
+</dependency>
+
<!-- TYPESAFE CONFIG -->
<dependency>
  <groupId>com.typesafe</groupId>
@@ -393,6 +408,8 @@
<spark.dynamodb.version>1.0.4</spark.dynamodb.version>
<delta.version>0.6.1</delta.version>
<apache.hadoop.version>2.9.2</apache.hadoop.version>
+<hudi.bundle.version>spark2.4-bundle</hudi.bundle.version>
+<spark.avro.version>2.4.8</spark.avro.version>
</properties>
</profile>
<profile>
@@ -404,6 +421,9 @@
<spark.dynamodb.version>1.0.4</spark.dynamodb.version>
<delta.version>0.6.1</delta.version>
<apache.hadoop.version>2.9.2</apache.hadoop.version>
+<hudi.bundle.version>spark-bundle</hudi.bundle.version>
+<hudi.version>0.7.0</hudi.version>
+<spark.avro.version>2.4.4</spark.avro.version>
</properties>
</profile>
<profile>
@@ -413,6 +433,8 @@
<spark.compat.version>3.0</spark.compat.version>
<spark.cassandra.connector.version>3.0.0</spark.cassandra.connector.version>
<delta.version>0.7.0</delta.version>
+<hudi.bundle.version>spark3.0.3-bundle</hudi.bundle.version>
+<hudi.version>0.10.1</hudi.version>
</properties>
</profile>
<profile>

src/main/java/io/github/setl/enums/Storage.java

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ public enum Storage {
  JSON("io.github.setl.storage.connector.JSONConnector"),
  JDBC("io.github.setl.storage.connector.JDBCConnector"),
  STRUCTURED_STREAMING("io.github.setl.storage.connector.StructuredStreamingConnector"),
+  HUDI("io.github.setl.storage.connector.HudiConnector"),
  OTHER(null);

  private String connectorName;
src/main/scala/io/github/setl/config/HudiConnectorConf.scala

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
package io.github.setl.config

import io.github.setl.exception.ConfException
import org.apache.spark.sql.SaveMode

class HudiConnectorConf extends ConnectorConf {

  import HudiConnectorConf._

  override def getReaderConf: Map[String, String] = {
    import scala.collection.JavaConverters._
    settings.asScala.toMap - PATH
  }

  override def getWriterConf: Map[String, String] = {
    import scala.collection.JavaConverters._
    settings.asScala.toMap - SAVEMODE - PATH
  }

  def setPath(path: String): this.type = set("path", path)

  def setSaveMode(saveMode: String): this.type = set("saveMode", saveMode)

  def setSaveMode(saveMode: SaveMode): this.type = set("saveMode", saveMode.toString)

  def getPath: String = get("path") match {
    case Some(path) => path
    case _ => throw new ConfException("The value of path is not set")
  }

  def getSaveMode: SaveMode = SaveMode.valueOf(get("saveMode", SaveMode.Append.toString))

}

object HudiConnectorConf {
  def fromMap(options: Map[String, String]): HudiConnectorConf = new HudiConnectorConf().set(options)

  val SAVEMODE: String = "saveMode"
  val PATH: String = "path"
}
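For orientation, here is a minimal usage sketch of the configuration class above, based only on the methods shown in this commit; the option values (paths, table name, field names) are hypothetical and not part of the change.

```scala
import io.github.setl.config.HudiConnectorConf
import org.apache.spark.sql.SaveMode

// Build a conf from a plain option map; "path" and "saveMode" are handled
// by the conf itself, everything else is passed through to Hudi untouched.
val conf: HudiConnectorConf = HudiConnectorConf.fromMap(Map(
  "path" -> "/tmp/hudi/my_table",                    // hypothetical path
  "saveMode" -> "Overwrite",
  "hoodie.table.name" -> "my_table",                 // hypothetical table name
  "hoodie.datasource.write.recordkey.field" -> "id"
))

conf.getPath        // "/tmp/hudi/my_table"
conf.getSaveMode    // SaveMode.Overwrite
conf.getReaderConf  // all options except "path"
conf.getWriterConf  // all options except "path" and "saveMode"

// Individual setters are also available.
conf.setSaveMode(SaveMode.Append)
```

Note that `getSaveMode` falls back to `SaveMode.Append` when no save mode has been set.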
src/main/scala/io/github/setl/storage/connector/HudiConnector.scala

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
package io.github.setl.storage.connector

import com.typesafe.config.Config
import io.github.setl.config.{Conf, HudiConnectorConf}
import io.github.setl.enums.Storage
import io.github.setl.internal.HasReaderWriter
import io.github.setl.util.TypesafeConfigUtils
import org.apache.spark.sql._

class HudiConnector(val options: HudiConnectorConf) extends Connector with HasReaderWriter {
  override val storage: Storage = Storage.HUDI

  def this(options: Map[String, String]) = this(HudiConnectorConf.fromMap(options))

  def this(config: Config) = this(TypesafeConfigUtils.getMap(config))

  def this(conf: Conf) = this(conf.toMap)

  override val reader: DataFrameReader = {
    spark.read
      .format("hudi")
      .options(options.getReaderConf)
  }

  override val writer: DataFrame => DataFrameWriter[Row] = (df: DataFrame) => {
    df.write
      .format("hudi")
      .mode(options.getSaveMode)
      .options(options.getWriterConf)
  }

  /**
   * Read data from the data source
   *
   * @return a [[DataFrame]]
   */
  @throws[java.io.FileNotFoundException](s"${options.getPath} doesn't exist")
  @throws[org.apache.spark.sql.AnalysisException](s"${options.getPath} doesn't exist")
  override def read(): DataFrame = {
    logDebug(s"Reading ${storage.toString} file in: '${options.getPath}'")
    this.setJobDescription(s"Read file(s) from '${options.getPath}'")
    reader.load(options.getPath)
  }

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t      a [[DataFrame]] to be saved
   * @param suffix for data connectors that support suffix (e.g. [[FileConnector]],
   *               add the given suffix to the save path
   */
  override def write(t: DataFrame, suffix: Option[String]): Unit = {
    if (suffix.isDefined) logWarning("Suffix is not supported in HudiConnector")
    write(t)
  }

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t a [[DataFrame]] to be saved
   */
  override def write(t: DataFrame): Unit = {
    this.setJobDescription(s"Write file to ${options.getPath}")
    writer(t).save(options.getPath)
  }
}
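To make the read/write flow concrete, here is a hedged usage sketch. It assumes an active SparkSession configured with KryoSerializer (as the test suite below sets up, since Hudi requires it); the path, table name, and field names are illustrative only and not taken from the commit.

```scala
import io.github.setl.storage.connector.HudiConnector
import org.apache.spark.sql.{DataFrame, SparkSession}

// An active SparkSession with KryoSerializer is assumed; the connector
// picks up the active session when it builds its reader and writer.
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

// Hypothetical table location and Hudi options.
val connector = new HudiConnector(Map(
  "path" -> "/tmp/hudi/users",
  "saveMode" -> "Overwrite",
  "hoodie.table.name" -> "users",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "updatedAt"
))

val users: DataFrame = Seq((1, "alice", 10L), (2, "bob", 20L)).toDF("id", "name", "updatedAt")

// write() delegates to df.write.format("hudi").mode(...).options(...).save(path)
connector.write(users)
// read() delegates to spark.read.format("hudi").options(...).load(path)
val readBack: DataFrame = connector.read()
```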

src/test/resources/application.conf

Lines changed: 11 additions & 0 deletions
@@ -252,3 +252,14 @@ schemaConverter {
    saveMode = "ErrorIfExists"
  }
}
+
+hudi {
+  test {
+    path = "${project.basedir}/src/test/resources/test_hudi_7"
+    saveMode = "Overwrite"
+    hoodie.table.name = "test_object_7"
+    hoodie.datasource.write.recordkey.field = "col1"
+    hoodie.datasource.write.precombine.field = "col4"
+    hoodie.datasource.write.table.type = "MERGE_ON_READ"
+  }
+}
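As a rough sketch (assuming this application.conf is on the test classpath and loadable by Typesafe Config), the block above can be turned into a connector via the Config constructor shown earlier; this mirrors how Properties.scala and the connector test below consume it.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import io.github.setl.storage.connector.HudiConnector

// Load application.conf from the classpath and pick the "hudi.test" block.
val hudiConfig: Config = ConfigFactory.load().getConfig("hudi.test")

// The Config constructor flattens the block into an option map
// (via TypesafeConfigUtils.getMap) and builds a HudiConnectorConf from it.
val connector = new HudiConnector(hudiConfig)
```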
src/test/scala/io/github/setl/config/HudiConnectorConfSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
package io.github.setl.config

import io.github.setl.exception.ConfException
import org.scalatest.funsuite.AnyFunSuite
import org.apache.spark.sql.SaveMode

class HudiConnectorConfSuite extends AnyFunSuite {
  val conf = new HudiConnectorConf

  test("Get/Set HudiConnectorConf") {
    assert(conf.get("saveMode") === None)
    conf.setSaveMode("Append")
    assert(conf.getSaveMode === SaveMode.Append)
    conf.setSaveMode("Overwrite")
    assert(conf.getSaveMode === SaveMode.Overwrite)
    conf.setSaveMode(SaveMode.Overwrite)
    assert(conf.getSaveMode === SaveMode.Overwrite)

    assert(conf.get("path") === None)
    assertThrows[ConfException](conf.getPath)

    conf.setPath("path")
    assert(conf.getPath === "path")
  }

  test("Init HudiConnectorConf from options") {
    val options: Map[String, String] = Map(
      "path" -> "path",
      "saveMode" -> "Append",
      "hoodie.table.name" -> "test_object",
      "hoodie.datasource.write.recordkey.field" -> "col1",
      "hoodie.datasource.write.precombine.field" -> "col4",
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
    )

    val confFromOpts: HudiConnectorConf = HudiConnectorConf.fromMap(options)
    assert(confFromOpts.getPath === "path")
    assert(confFromOpts.getSaveMode === SaveMode.Append)

    val readerOpts = confFromOpts.getReaderConf
    val writerOpts = confFromOpts.getWriterConf

    // Config should not contain path & save mode
    assert(!readerOpts.contains("path"))
    assert(!writerOpts.contains("path"))
    assert(!writerOpts.contains("saveMode"))
  }
}

src/test/scala/io/github/setl/config/Properties.scala

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@ object Properties {

  val jdbcConfig: Config = cl.getConfig("psql.test")

+  val hudiConfig: Config = cl.getConfig("hudi.test")
+
  val excelConfigConnector: Config = cl.getConfig("connector.excel")
  val cassandraConfigConnector: Config = cl.getConfig("connector.cassandra")
  val csvConfigConnector: Config = cl.getConfig("connector.csv")
src/test/scala/io/github/setl/storage/connector/HudiConnectorSuite.scala

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
package io.github.setl.storage.connector

import io.github.setl.config.{Conf, HudiConnectorConf, Properties}
import io.github.setl.{SparkSessionBuilder, SparkTestUtils, TestObject2}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.funsuite.AnyFunSuite

import java.nio.file.Paths
import java.sql.{Date, Timestamp}

class HudiConnectorSuite extends AnyFunSuite {

  val path: String = Paths.get("src", "test", "resources", "test_hudi").toFile.getAbsolutePath
  val saveMode = SaveMode.Overwrite

  val options: Map[String, String] = Map[String, String](
    "path" -> path,
    "saveMode" -> saveMode.toString,
    "hoodie.table.name" -> "test_object",
    "hoodie.datasource.write.recordkey.field" -> "col1",
    "hoodie.datasource.write.precombine.field" -> "col4",
    "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
  )

  val testTable: Seq[TestObject2] = Seq(
    TestObject2("string", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L),
    TestObject2("string2", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L),
    TestObject2("string3", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L)
  )

  test("Instantiation of constructors") {

    // New spark session here since Hudi only supports KryoSerializer
    val spark: SparkSession = new SparkSessionBuilder().setEnv("local")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .build()
      .get()
    assume(SparkTestUtils.checkSparkVersion("2.4"))

    import spark.implicits._

    val connector = new HudiConnector(HudiConnectorConf.fromMap(options))
    connector.write(testTable.toDF)
    assert(connector.read().collect().length == testTable.length)

    val path2: String = Paths.get("src", "test", "resources", "test_hudi_2").toFile.getAbsolutePath
    val options2 = options + ("path" -> path2)
    val connector2 = new HudiConnector(options2)
    connector2.write(testTable.toDF)
    assert(connector2.read().collect().length == testTable.length)

    val path3: String = Paths.get("src", "test", "resources", "test_hudi_3").toFile.getAbsolutePath
    val options3 = options + ("path" -> path3)
    val connector3 = new HudiConnector(Conf.fromMap(options3))
    connector3.write(testTable.toDF, Some("any_"))
    assert(connector3.read().collect().length == testTable.length)

    val connector7 = new HudiConnector(Properties.hudiConfig)
    connector7.write(testTable.toDF)
    assert(connector7.read().collect().length == testTable.length)
  }
}

0 commit comments
