11package org.jetbrains.kotlinx.dataframe.impl.api
22
3+ import kotlinx.coroutines.async
4+ import kotlinx.coroutines.awaitAll
5+ import kotlinx.coroutines.coroutineScope
6+ import kotlinx.coroutines.runBlocking
37import kotlinx.datetime.Instant
48import kotlinx.datetime.LocalDate
59import kotlinx.datetime.LocalDateTime
@@ -18,13 +22,16 @@ import org.jetbrains.kotlinx.dataframe.DataRow
1822import org.jetbrains.kotlinx.dataframe.api.GlobalParserOptions
1923import org.jetbrains.kotlinx.dataframe.api.ParserOptions
2024import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
25+ import org.jetbrains.kotlinx.dataframe.api.asComparable
2126import org.jetbrains.kotlinx.dataframe.api.asDataColumn
27+ import org.jetbrains.kotlinx.dataframe.api.asFrameColumn
2228import org.jetbrains.kotlinx.dataframe.api.cast
23- import org.jetbrains.kotlinx.dataframe.api.convert
29+ import org.jetbrains.kotlinx.dataframe.api.getColumns
2430import org.jetbrains.kotlinx.dataframe.api.isColumnGroup
2531import org.jetbrains.kotlinx.dataframe.api.isFrameColumn
26- import org.jetbrains.kotlinx.dataframe.api.parse
27- import org.jetbrains.kotlinx.dataframe.api.to
32+ import org.jetbrains.kotlinx.dataframe.api.name
33+ import org.jetbrains.kotlinx.dataframe.api.toColumn
34+ import org.jetbrains.kotlinx.dataframe.api.toDataFrame
2835import org.jetbrains.kotlinx.dataframe.api.tryParse
2936import org.jetbrains.kotlinx.dataframe.columns.size
3037import org.jetbrains.kotlinx.dataframe.columns.values
@@ -37,6 +44,7 @@ import org.jetbrains.kotlinx.dataframe.impl.javaDurationCanParse
3744import org.jetbrains.kotlinx.dataframe.io.isURL
3845import org.jetbrains.kotlinx.dataframe.io.readJsonStr
3946import org.jetbrains.kotlinx.dataframe.typeClass
47+ import org.jetbrains.kotlinx.dataframe.values
4048import java.math.BigDecimal
4149import java.net.URL
4250import java.text.NumberFormat
@@ -541,19 +549,39 @@ internal fun <T> DataColumn<String?>.parse(parser: StringParser<T>, options: Par
541549 return DataColumn .createValueColumn(name(), parsedValues, parser.type.withNullability(hasNulls)) as DataColumn <T ?>
542550}
543551
544- internal fun <T > DataFrame<T>.parseImpl (options : ParserOptions ? , columns : ColumnsSelector <T , Any ?>) =
545- convert(columns).to {
546- when {
547- it.isFrameColumn() -> it.cast<AnyFrame ?>().parse(options)
548-
549- it.isColumnGroup() ->
550- it.asColumnGroup()
551- .parse(options) { all() }
552- .asColumnGroup(it.name())
553- .asDataColumn()
554-
555- it.typeClass == String ::class -> it.cast<String ?>().tryParse(options)
556-
557- else -> it
558- }
/**
 * Blocking entry point for parsing the columns of this [DataFrame] selected by [columns].
 *
 * The calling thread is blocked until [parseParallel] has finished.
 *
 * @param options parser options handed unchanged to [parseParallel]; may be `null`.
 * @param columns selector of the columns to parse.
 * @return the [DataFrame] produced by [parseParallel].
 */
internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> =
    // A bare `runBlocking { }` installs a single-threaded event loop on the calling thread, and the
    // `async` children started by parseParallel inherit it, so they would run one after another.
    // Passing Dispatchers.Default lets those children run on the shared worker pool instead.
    // NOTE(review): this makes the per-column parsers run on several threads at once; confirm that
    // they do not share mutable state before relying on it.
    runBlocking(kotlinx.coroutines.Dispatchers.Default) { parseParallel(options, columns) }
554+
/**
 * Suspending worker behind [parseImpl]: starts one coroutine per column selected by [columns],
 * waits for all of them and builds a [DataFrame] from the converted columns.
 *
 * Per selected column:
 * - frame column: every nested frame is converted by a recursive call in its own coroutine,
 *   selecting the non-group columns at any depth, and the results are collected into a column
 *   carrying the original name;
 * - column group: converted by a recursive call over all of its columns and re-wrapped as a
 *   column group under the original name;
 * - column whose `typeClass` is [String]: handed to `tryParse`;
 * - anything else: returned unchanged.
 *
 * NOTE(review): the result is assembled only from the columns returned by `getColumns(columns)`,
 * so columns not matched by the selector do not appear to be carried over, unlike the previous
 * `convert(columns).to { }` implementation. Confirm this is intended.
 * NOTE(review): nested frames are rebuilt from the leaf columns picked by `colsAtAnyDepth`, which
 * presumably flattens any column groups inside them. Verify against callers.
 *
 * @param options parser options forwarded to `tryParse` and to the recursive calls; may be `null`.
 * @param columns selector of the columns to convert.
 * @return a [DataFrame] built from the converted columns.
 */
private suspend fun <T> DataFrame<T>.parseParallel(
    options: ParserOptions?,
    columns: ColumnsSelector<T, Any?>,
): DataFrame<T> =
    coroutineScope {
        // One deferred result per selected column.
        val pendingColumns = getColumns(columns).map { column ->
            async {
                when {
                    column.isFrameColumn() -> {
                        // One deferred result per nested frame of the frame column.
                        val pendingFrames = column.asFrameColumn().values.map { nestedFrame ->
                            async {
                                nestedFrame.parseParallel(options) {
                                    colsAtAnyDepth { candidate -> !candidate.isColumnGroup() }
                                }
                            }
                        }
                        pendingFrames.awaitAll().toColumn(column.name)
                    }

                    column.isColumnGroup() ->
                        column.asColumnGroup()
                            .parseParallel(options) { all() }
                            .asColumnGroup(column.name())
                            .asDataColumn()

                    column.typeClass == String::class -> column.cast<String?>().tryParse(options)

                    else -> column
                }
            }
        }

        pendingColumns
            .awaitAll()
            .toDataFrame()
            .cast()
    }
0 commit comments