Skip to content

Commit 97c52fe

Browse files
maroil, marouanefeljaidemia and qxzzxq
authored
feat: add ability to use spark reader native implementation (#193)
* feat: add ability to use spark reader native implementation
* fix: typo in FileConnector
* feat: use file connector config for native option
* feat: add unit tests for native
* fix: deprecated method in FileConnector unit tests
* feat: replace native option by pathFormat enum option
* fix: typo in exception message for pathFormat
* feat: add unit tests for unsupported path format option
* Update configuration_example.md

Co-authored-by: Marouane Felja <marouane.felja@idemia.com>
Co-authored-by: Qinx <17144939+qxzzxq@users.noreply.github.com>
1 parent 3158cf4 commit 97c52fe

File tree

7 files changed

+95
-15
lines changed

7 files changed

+95
-15
lines changed

docs/data_access_layer/configuration_example.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
| emptyValue | sets the string representation of an empty value | true, default empty string| |
2020
| dateFormat | sets the string that indicates a date format | true, default `yyyy-MM-dd`| |
2121
| timestampFormat | sets the string that indicates a timestamp format | true, default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`| |
22+
| pathFormat | choose between REGEX path format (ex: `^s3:\/\/bucket\/\/col1=B\/col2=([A-C])$`) or WILDCARD path format (ex: `s3://bucket/*/*A*/internal-*.csv`) | true, default `REGEX`| `WILDCARD` |
2223

2324
For other options, please refer to [this doc](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html).
2425

@@ -69,6 +70,7 @@ csvWithSchema {
6970
| dateFormat | sets the string that indicates a date format | true, default `yyyy-MM-dd`| |
7071
| timestampFormat | sets the string that indicates a timestamp format | true, default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`| |
7172
| dropFieldIfAllNull | whether to ignore column of all null values or empty array/struct during schema inference | true, default `false` | |
73+
| pathFormat | choose between REGEX path format (ex: `^s3:\/\/bucket\/\/col1=B\/col2=([A-C])$`) or WILDCARD path format (ex: `s3://bucket/*/*A*/internal-*.csv`) | true, default `REGEX`| `WILDCARD` |
7274

7375
For other options, please refer to [this doc](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html).
7476

@@ -91,6 +93,7 @@ json {
9193
| path | directory of parquet files | | |
9294
| saveMode | | true, default `ErrorIfExists` | `Append` |
9395
| filenamePattern | Regex of the names of file to be read. When `filenamePattern` is set, then this connector can only be used for reading data | true, default empty | `(file)(.*)(\\.csv)` |
96+
| pathFormat | choose between REGEX path format (ex: `^s3:\/\/bucket\/\/col1=B\/col2=([A-C])$`) or WILDCARD path format (ex: `s3://bucket/*/*A*/internal-*.csv`) | true, default `REGEX`| `WILDCARD` |
9497

9598
### Example
9699

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.github.setl.enums;
2+
3+
public enum PathFormat {
4+
WILDCARD,
5+
REGEX;
6+
}

src/main/scala/io/github/setl/config/FileConnectorConf.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.github.setl.config
22

33
import io.github.setl.annotation.InterfaceStability
4-
import io.github.setl.enums.Storage
4+
import io.github.setl.enums.{PathFormat, Storage}
55
import io.github.setl.exception.ConfException
66
import io.github.setl.annotation.InterfaceStability.Evolving
77
import org.apache.spark.sql.SaveMode
@@ -11,6 +11,7 @@ class FileConnectorConf extends ConnectorConf {
1111

1212
private[this] val defaultEncoding: String = "UTF-8"
1313
private[this] val defaultSaveMode: String = SaveMode.ErrorIfExists.toString
14+
private[this] val defaultPathFormat: String = PathFormat.REGEX.toString
1415

1516
def setStorage(storage: String): this.type = set("storage", storage.toUpperCase)
1617

@@ -24,6 +25,8 @@ class FileConnectorConf extends ConnectorConf {
2425

2526
def setPath(path: String): this.type = set("path", path)
2627

28+
def setPathFormat(pathFormat: PathFormat): this.type = set("pathFormat", pathFormat.toString)
29+
2730
def setS3CredentialsProvider(value: String): this.type = set("fs.s3a.aws.credentials.provider", value)
2831

2932
def setS3AccessKey(value: String): this.type = set("fs.s3a.access.key", value)
@@ -36,6 +39,8 @@ class FileConnectorConf extends ConnectorConf {
3639

3740
def getSaveMode: SaveMode = SaveMode.valueOf(get("saveMode", defaultSaveMode))
3841

42+
def getPathFormat: String = get("pathFormat", defaultPathFormat)
43+
3944
def getStorage: Storage = get("storage") match {
4045
case Some(storage) => Storage.valueOf(storage)
4146
case _ => throw new ConfException("The value of storage is not set")

src/main/scala/io/github/setl/storage/connector/CSVConnector.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package io.github.setl.storage.connector
22

33
import io.github.setl.annotation.InterfaceStability
44
import io.github.setl.config.{Conf, FileConnectorConf}
5-
import io.github.setl.enums.Storage
5+
import io.github.setl.enums.{PathFormat, Storage}
66
import io.github.setl.util.TypesafeConfigUtils
77
import com.typesafe.config.Config
88
import org.apache.spark.sql._
@@ -91,12 +91,13 @@ class CSVConnector(override val options: FileConnectorConf) extends FileConnecto
9191

9292
def this(config: Conf) = this(config.toMap)
9393

94-
def this(path: String, inferSchema: String, delimiter: String, header: String, saveMode: SaveMode) =
94+
def this(path: String, inferSchema: String, delimiter: String, header: String, saveMode: SaveMode, pathFormat: PathFormat = PathFormat.REGEX) =
9595
this(Map[String, String](
9696
"path" -> path,
9797
"inferSchema" -> inferSchema,
9898
"header" -> header,
99-
"saveMode" -> saveMode.toString
99+
"saveMode" -> saveMode.toString,
100+
"pathFormat" -> pathFormat.toString
100101
))
101102

102103
override val storage: Storage = Storage.CSV

src/main/scala/io/github/setl/storage/connector/FileConnector.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import java.net.{URI, URLDecoder, URLEncoder}
44
import java.util.concurrent.TimeUnit
55
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
66
import java.util.concurrent.locks.ReentrantLock
7-
87
import io.github.setl.annotation.InterfaceStability
98
import io.github.setl.config.FileConnectorConf
9+
import io.github.setl.enums.PathFormat
1010
import io.github.setl.internal.{CanDrop, CanPartition, HasReaderWriter}
1111
import org.apache.hadoop.conf.Configuration
1212
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -483,10 +483,19 @@ abstract class FileConnector(val options: FileConnectorConf) extends Connector w
483483
logDebug(s"Reading ${options.getStorage.toString} file in: '${absolutePath.toString}'")
484484
this.setJobDescription(s"Read file(s) from '${absolutePath.toString}'")
485485

486-
val df = reader
487-
.option("basePath", basePath.toString)
488-
.format(options.getStorage.toString.toLowerCase())
489-
.load(listFilesToLoad(false): _*)
486+
val df = options.getPathFormat match {
487+
case value if value == PathFormat.WILDCARD.toString =>
488+
reader
489+
.format(options.getStorage.toString.toLowerCase())
490+
.load(options.getPath)
491+
case value if value == PathFormat.REGEX.toString =>
492+
reader
493+
.option("basePath", basePath.toString)
494+
.format(options.getStorage.toString.toLowerCase())
495+
.load(listFilesToLoad(false): _*)
496+
case _ =>
497+
throw new UnsupportedOperationException(s"Unsupported path format ${options.getPathFormat}.")
498+
}
490499

491500
if (dropUserDefinedSuffix & df.columns.contains(UDSKey)) {
492501
df.drop(UDSKey)

src/test/scala/io/github/setl/config/FileConnectorConfSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.setl.config
22

3-
import io.github.setl.enums.Storage
3+
import io.github.setl.enums.{PathFormat, Storage}
44
import io.github.setl.exception.ConfException
55
import org.apache.spark.sql.SaveMode
66
import org.scalatest.funsuite.AnyFunSuite
@@ -31,6 +31,10 @@ class FileConnectorConfSuite extends AnyFunSuite {
3131
conf.setPath("path")
3232
assert(conf.get("path").get === "path")
3333

34+
assert(conf.get("pathFormat") === None)
35+
conf.setPathFormat(PathFormat.WILDCARD)
36+
assert(conf.get("pathFormat").get === "WILDCARD")
37+
3438
assert(conf.get("credentialsProvider") === None)
3539
conf.setS3CredentialsProvider("credentialsProvider")
3640
assert(conf.get("fs.s3a.aws.credentials.provider").get === "credentialsProvider")
@@ -53,6 +57,7 @@ class FileConnectorConfSuite extends AnyFunSuite {
5357
assert(conf.getSaveMode === SaveMode.Overwrite)
5458
assert(conf.getStorage === Storage.EXCEL)
5559
assert(conf.getPath === "path")
60+
assert(conf.getPathFormat === "WILDCARD")
5661
assert(conf.getSchema === None)
5762
assert(conf.getS3CredentialsProvider === Some("credentialsProvider"))
5863
assert(conf.getS3AccessKey === Some("accessKey"))

src/test/scala/io/github/setl/storage/connector/FileConnectorSuite.scala

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.github.setl.storage.connector
22

33
import io.github.setl.config.FileConnectorConf
4-
import io.github.setl.enums.Storage
4+
import io.github.setl.enums.{PathFormat, Storage}
55
import io.github.setl.{SparkSessionBuilder, TestObject}
66
import org.apache.hadoop.fs.{LocalFileSystem, Path}
77
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
@@ -129,7 +129,7 @@ class FileConnectorSuite extends AnyFunSuite with Matchers {
129129
connectorRead.collect() should contain theSameElementsAs sparkRead.collect()
130130

131131
// remove test files
132-
new ParquetConnector(path, SaveMode.Overwrite).delete()
132+
new ParquetConnector(path, SaveMode.Overwrite).drop()
133133
}
134134

135135
test("File connector should handle wildcard file path (csv)") {
@@ -163,7 +163,58 @@ class FileConnectorSuite extends AnyFunSuite with Matchers {
163163
connectorRead.collect() should contain theSameElementsAs sparkRead.collect()
164164

165165
// remove test files
166-
new CSVConnector(path, "true", ",", "true", SaveMode.Overwrite).delete()
166+
new CSVConnector(path, "true", ",", "true", SaveMode.Overwrite).drop()
167+
}
168+
169+
170+
test("File connector should handle spark native path format") {
171+
val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
172+
val path = "src/test/resources/fileconnector_test_dir_csv"
173+
import spark.implicits._
174+
val df = Seq(
175+
("a", "A", "aa", "AA"),
176+
("a", "A", "bb", "BB"),
177+
("a", "B", "aa", "AA"),
178+
("a", "B", "bb", "BB"),
179+
("b", "A", "aa", "AA"),
180+
("b", "A", "bb", "BB"),
181+
("b", "B", "aa", "AA"),
182+
("b", "B", "bb", "BB")
183+
).toDF("col1", "col2", "col3", "col4")
184+
185+
df.write.mode(SaveMode.Overwrite).partitionBy("col1", "col2").option("header", "true").csv(path)
186+
187+
val connector = new CSVConnector(s"$path/col1=*/col2=A/*.csv", "true", ",", "true", SaveMode.Overwrite, PathFormat.WILDCARD)
188+
val connectorRead = connector.read()
189+
connectorRead.show()
190+
191+
val sparkRead = spark.read
192+
.option("header", "true")
193+
.csv(s"$path/col1=*/col2=A/*.csv")
194+
sparkRead.show()
195+
196+
connectorRead.collect() should contain theSameElementsAs sparkRead.collect()
197+
198+
// remove test files
199+
new CSVConnector(path, "true", ",", "true", SaveMode.Overwrite).drop()
200+
}
201+
202+
test("FileConnector should throw exception with unsupported pathFormat option") {
203+
val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
204+
205+
import spark.implicits._
206+
val options = Map[String, String](
207+
"path" -> "src/test/resources",
208+
"pathFormat" -> "TEST"
209+
)
210+
val fileConnectorConf = new FileConnectorConf()
211+
fileConnectorConf.set(options)
212+
213+
val fileConnector = new FileConnector(fileConnectorConf) {
214+
override val storage: Storage = Storage.CSV
215+
}
216+
217+
assertThrows[UnsupportedOperationException](fileConnector.read())
167218
}
168219

169220
test("File connector functionality") {
@@ -202,7 +253,7 @@ class FileConnectorSuite extends AnyFunSuite with Matchers {
202253
connector.write(dff.toDF, None)
203254
assertThrows[IllegalArgumentException](connector.write(dff.toDF, Some("test")))
204255
assertThrows[IllegalArgumentException](connector.setSuffix(Some("test")))
205-
connector.delete()
256+
connector.drop()
206257
}
207258

208259
test("FileConnector should handle parallel write") {
@@ -256,7 +307,7 @@ class FileConnectorSuite extends AnyFunSuite with Matchers {
256307
case other: Exception => throw other
257308
}
258309

259-
connector.delete()
310+
connector.drop()
260311
}
261312

262313
test("FileConnector should handle base path correctly") {

0 commit comments

Comments (0)