Commit fe1d96e

code refactor
1 parent 4207f89 commit fe1d96e

File tree

3 files changed (+40, -142 lines)


emr-maxcompute/src/main/scala/org/apache/spark/aliyun/odps/OdpsOps.scala

Lines changed: 1 addition & 1 deletion
@@ -713,7 +713,7 @@ class OdpsOps(@transient sc: SparkContext, accessKeyId: String,
       cols.sorted.map { idx =>
         val col = schema.getColumn(idx)
         try {
-          OdpsUtils.odpsData2SparkData(col.getTypeInfo, true)(record.get(idx))
+          OdpsUtils.odpsData2SparkData(col.getTypeInfo, false)(record.get(idx))
         } catch {
           case e: Exception =>
             log.error(s"Can not transfer record column value, idx: $idx, " +

emr-maxcompute/src/main/scala/org/apache/spark/aliyun/odps/datasource/ODPSRDD.scala

Lines changed: 3 additions & 123 deletions
@@ -17,22 +17,18 @@
 package org.apache.spark.aliyun.odps.datasource

 import java.io.EOFException
-import java.sql.{Date, SQLException}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import com.aliyun.odps.{Odps, PartitionSpec}
 import com.aliyun.odps.account.AliyunAccount
 import com.aliyun.odps.tunnel.TableTunnel
-import com.aliyun.odps.tunnel.io.TunnelRecordReader
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.aliyun.odps.OdpsPartition
 import org.apache.spark.aliyun.utils.OdpsUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator

 class ODPSRDD(
@@ -84,124 +80,8 @@ class ODPSRDD(
           schema.zipWithIndex.foreach {
             case (s: StructField, idx: Int) =>
               try {
-                s.dataType match {
-                  case LongType =>
-                    val value = r.getBigint(s.name)
-                    if (value != null) {
-                      mutableRow.setLong(idx, value)
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case BooleanType =>
-                    val value = r.getBoolean(s.name)
-                    if (value != null) {
-                      mutableRow.setBoolean(idx, value)
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case DoubleType =>
-                    val value = r.getDouble(s.name)
-                    if (value != null) {
-                      mutableRow.setDouble(idx, value)
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case ShortType =>
-                    val value = r.get(s.name)
-                    if (value != null) {
-                      mutableRow.setShort(idx, value.asInstanceOf[Short])
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case ByteType =>
-                    val value = r.get(s.name)
-                    if (value != null) {
-                      mutableRow.setByte(idx, value.asInstanceOf[Byte])
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case DateType =>
-                    val value = r.get(s.name)
-                    value match {
-                      case date1: java.sql.Date =>
-                        mutableRow.update(idx, DateTimeUtils.fromJavaDate(date1))
-                      case date2: java.util.Date =>
-                        mutableRow.setInt(idx,
-                          DateTimeUtils.fromJavaDate(new Date(date2.getTime)))
-                      case null => mutableRow.update(idx, null)
-                      case _ => throw new SQLException(s"Unknown type" +
-                        s" ${value.getClass.getCanonicalName}")
-                    }
-                  case TimestampType =>
-                    val value = r.get(s.name)
-                    value match {
-                      case timestamp: java.sql.Timestamp =>
-                        mutableRow.setLong(idx, DateTimeUtils.fromJavaTimestamp(timestamp))
-                      case null => mutableRow.update(idx, null)
-                      case _ => throw new SQLException(s"Unknown type" +
-                        s" ${value.getClass.getCanonicalName}")
-                    }
-                  case DecimalType.SYSTEM_DEFAULT =>
-                    val value = r.get(s.name)
-                    if (value != null) {
-                      mutableRow.update(idx,
-                        new Decimal().set(value.asInstanceOf[java.math.BigDecimal]))
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case FloatType =>
-                    val value = r.get(s.name)
-                    if (value != null) {
-                      mutableRow.update(idx, value.asInstanceOf[Float])
-                    } else {
-                      mutableRow.update(idx, null)
-                    }
-                  case IntegerType =>
-                    val value = r.get(s.name)
-                    value match {
-                      case e: java.lang.Integer =>
-                        mutableRow.update(idx, e.toInt)
-                      case null => mutableRow.update(idx, null)
-                      case _ => throw new SQLException(s"Unknown type" +
-                        s" ${value.getClass.getCanonicalName}")
-                    }
-                  case StringType =>
-                    val value = r.get(s.name)
-                    value match {
-                      case e: com.aliyun.odps.data.Char =>
-                        mutableRow.update(idx, UTF8String.fromString(e.toString))
-                      case e: com.aliyun.odps.data.Varchar =>
-                        mutableRow.update(idx, UTF8String.fromString(e.toString))
-                      case e: String =>
-                        mutableRow.update(idx, UTF8String.fromString(e))
-                      case e: Array[Byte] =>
-                        mutableRow.update(idx, UTF8String.fromBytes(e))
-                      case null => mutableRow.update(idx, null)
-                      case _ => throw new SQLException(s"Unknown type" +
-                        s" ${value.getClass.getCanonicalName}")
-                    }
-                  case BinaryType =>
-                    val value = r.get(s.name)
-                    value match {
-                      case e: com.aliyun.odps.data.Binary =>
-                        mutableRow.update(idx, e.data())
-                      case null => mutableRow.update(idx, null)
-                      case _ => throw new SQLException(s"Unknown type" +
-                        s" ${value.getClass.getCanonicalName}")
-                    }
-                  case ArrayType(_, _) =>
-                    val value = r.get(s.name)
-                    mutableRow.update(idx, OdpsUtils.odpsData2SparkData(typeInfos(idx))(value))
-                  case MapType(_, _, _) =>
-                    val value = r.get(s.name)
-                    mutableRow.update(idx, OdpsUtils.odpsData2SparkData(typeInfos(idx))(value))
-                  case StructType(_) =>
-                    val value = r.get(s.name)
-                    mutableRow.update(idx, OdpsUtils.odpsData2SparkData(typeInfos(idx))(value))
-                  case NullType =>
-                    mutableRow.setNullAt(idx)
-                  case _ => throw new SQLException(s"Unknown type")
-                }
+                val value = r.get(s.name)
+                mutableRow.update(idx, OdpsUtils.odpsData2SparkData(typeInfos(idx))(value))
               } catch {
                 case e: Exception =>
                   log.error(s"Can not transfer record column value, idx: $idx, " +
@@ -224,7 +104,7 @@ class ODPSRDD(

       override def close() {
         try {
-          val totalBytes = reader.asInstanceOf[TunnelRecordReader].getTotalBytes
+          val totalBytes = reader.getTotalBytes
           inputMetrics.incBytesRead(totalBytes)
           reader.close()
         } catch {
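The 120-line per-DataType match could be dropped because odpsData2SparkData, with its new default isDatasource = true, produces Catalyst internal values directly. A minimal sketch, not from the commit, of why the datasource path needs internal types: a SpecificInternalRow stores UTF8String and primitives, not java.lang.String:

    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    // Sketch only: InternalRow cells hold Catalyst-internal values.
    val fieldTypes = Seq(StringType, LongType)
    val row = new SpecificInternalRow(fieldTypes)

    row.update(0, UTF8String.fromString("hello")) // internal string type
    row.setLong(1, 42L)

    // row.update(0, "hello") would compile (update takes Any) but break
    // readers such as row.getUTF8String(0), which expect the internal type.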

emr-maxcompute/src/main/scala/org/apache/spark/aliyun/utils/OdpsUtils.scala

Lines changed: 36 additions & 18 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.aliyun.utils
 import java.math.BigDecimal
 import scala.collection.JavaConverters._
 import com.aliyun.odps.{Partition, _}
-import com.aliyun.odps.`type`.{ArrayTypeInfo, CharTypeInfo, DecimalTypeInfo, MapTypeInfo, StructTypeInfo, TypeInfo, VarcharTypeInfo}
+import com.aliyun.odps.`type`._
 import com.aliyun.odps.account.AliyunAccount
 import com.aliyun.odps.data.{Binary, Char, SimpleStruct, Varchar}
 import com.aliyun.odps.task.SQLTask
@@ -456,16 +456,30 @@ object OdpsUtils {
     }
   }

-  def odpsData2SparkData(t: TypeInfo, load: Boolean = false): Object => Any = {
+  def odpsData2SparkData(t: TypeInfo, isDatasource: Boolean = true): Object => Any = {
     val func = t.getOdpsType match {
       case OdpsType.BOOLEAN => (v: Object) => v.asInstanceOf[java.lang.Boolean]
       case OdpsType.DOUBLE => (v: Object) => v.asInstanceOf[java.lang.Double]
       case OdpsType.BIGINT => (v: Object) => v.asInstanceOf[java.lang.Long]
       case OdpsType.DATETIME => (v: Object) =>
-        new java.sql.Date(v.asInstanceOf[java.util.Date].getTime)
+        if (!isDatasource) {
+          new java.sql.Date(v.asInstanceOf[java.util.Date].getTime)
+        } else {
+          v.asInstanceOf[java.util.Date].getTime.toInt
+        }
       case OdpsType.STRING => (v: Object) => v match {
-        case str: String => str
-        case array: Array[Byte] => new String(array)
+        case str: String =>
+          if (!isDatasource) {
+            str
+          } else {
+            UTF8String.fromString(str)
+          }
+        case bytes: Array[Byte] =>
+          if (!isDatasource) {
+            new String(bytes)
+          } else {
+            UTF8String.fromBytes(bytes)
+          }
       }
       case OdpsType.DECIMAL => (v: Object) => {
         val ti = t.asInstanceOf[DecimalTypeInfo]
@@ -484,9 +498,13 @@ object OdpsUtils {
         UTF8String.fromString(char.getValue.substring(0, char.length()))
       }
       case OdpsType.DATE => (v: Object) =>
-        v.asInstanceOf[java.sql.Date].getTime
+        if (!isDatasource) {
+          v.asInstanceOf[java.sql.Date]
+        } else {
+          v.asInstanceOf[java.sql.Date].getTime
+        }
       case OdpsType.TIMESTAMP => (v: Object) => {
-        if (load) {
+        if (!isDatasource) {
           v.asInstanceOf[java.sql.Timestamp]
         } else {
           v.asInstanceOf[java.sql.Timestamp].getTime * 1000
@@ -498,16 +516,18 @@ object OdpsUtils {
       case OdpsType.TINYINT => (v: Object) => v.asInstanceOf[java.lang.Byte]
       case OdpsType.ARRAY => (v: Object) => {
         val array = v.asInstanceOf[java.util.ArrayList[Object]]
-        if (!load) {
+        if (!isDatasource) {
+          array.asScala
+        } else {
           new GenericArrayData(array.toArray().
             map(odpsData2SparkData(t.asInstanceOf[ArrayTypeInfo].getElementTypeInfo)(_)))
-        } else {
-          array.asScala
         }
       }
       case OdpsType.BINARY => (v: Object) => v.asInstanceOf[Binary].data()
       case OdpsType.MAP => (v: Object) => {
-        if (!load) {
+        if (!isDatasource) {
+          v.asInstanceOf[java.util.HashMap[Object, Object]].asScala
+        } else {
           val m = v.asInstanceOf[java.util.HashMap[Object, Object]]
           val keyArray = m.keySet().toArray()
           new ArrayBasedMapData(
@@ -516,19 +536,17 @@ object OdpsUtils {
           new GenericArrayData(keyArray.map(m.get(_)).
             map(odpsData2SparkData(t.asInstanceOf[MapTypeInfo].getValueTypeInfo)(_)))
         )
-        } else {
-          v.asInstanceOf[java.util.HashMap[Object, Object]].asScala
         }
       }
       case OdpsType.STRUCT => (v: Object) => {
         val struct = v.asInstanceOf[com.aliyun.odps.data.Struct]
-        if (!load) {
+        if (!isDatasource) {
+          Row.fromSeq(struct.getFieldValues.asScala.zipWithIndex
+            .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2), isDatasource)(x._1)))
+        } else {
           org.apache.spark.sql.catalyst.InternalRow
             .fromSeq(struct.getFieldValues.asScala.zipWithIndex
-            .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2), load)(x._1)))
-        } else {
-          Row.fromSeq(struct.getFieldValues.asScala.zipWithIndex
-            .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2), load)(x._1)))
+            .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2))(x._1)))
         }
       }
     }
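Net effect of the rename: isDatasource = false consistently returns external JVM values (plain String, java.sql date/time types, Scala collections, Row) for the RDD path, while the default returns Catalyst-internal ones (UTF8String, GenericArrayData, ArrayBasedMapData, InternalRow); the TIMESTAMP branch's getTime * 1000 matches Catalyst's microsecond representation of TimestampType. A usage sketch, assuming the ODPS SDK's TypeInfoFactory.getArrayTypeInfo helper (from the SDK, not this commit):

    import com.aliyun.odps.`type`.TypeInfoFactory
    import org.apache.spark.aliyun.utils.OdpsUtils

    // Sketch only: one ODPS value, two output representations.
    val arrayOfString = TypeInfoFactory.getArrayTypeInfo(TypeInfoFactory.STRING)
    val raw = new java.util.ArrayList[Object](java.util.Arrays.asList("a", "b"))

    // RDD path: a Scala Buffer of plain Strings.
    val external = OdpsUtils.odpsData2SparkData(arrayOfString, false)(raw)

    // Datasource path: a GenericArrayData whose elements are UTF8String.
    val internal = OdpsUtils.odpsData2SparkData(arrayOfString)(raw)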
