@@ -131,7 +131,8 @@ object NewDataProcess {
131131 }
132132
133133 def  getColumnAgg (transData: Dataset [Row ], keyColumn: String , aggColumn: String ):  Dataset [Row ] =  {
134-  transData.groupBy(keyColumn)
134+  transData.select(col(keyColumn), col(aggColumn).cast(" double" 
135+  .groupBy(keyColumn)
135136 .agg(sum(aggColumn).as(s " sum_ $aggColumn" ),
136137 avg(aggColumn).as(s " avg_ $aggColumn" )
137138 )
@@ -140,32 +141,76 @@ object NewDataProcess {
140141 def  transProcess ():  Unit  =  {
141142 val  keyColumn  =  " card_id" 
142143 val  newTrans  =  spark.table(" merchant.new_merchant_transactions" 
143-  var  ids  =  newTrans.select(keyColumn).distinct()
144+  val  ids  =  newTrans.select(keyColumn).distinct()
145+  ids.cache()
146+  val  dropCols  =  Array (" merchant_category_id" " subsector_id" " category_1" " city_id" " state_id" " category_2" 
147+  val  merchants  =  spark.table(" merchant.merchants" Array (" merchant_id" 
148+  .drop(dropCols : _* )
149+  val  tmpTrans  =  newTrans.join(merchants, Seq (" merchant_id" " left" 
150+  tmpTrans.cache()
151+ 
152+  var  tmpModeRes  =  ids
144153 val  processMode  =  Seq (" city_id" " category_1" " installments" " category_3" 
145-  " merchant_category_id" " category_2" " state_id" " subsector_id" 
154+  " merchant_category_id" " category_2" " state_id" " subsector_id" 
155+  " merchant_group_id" " category_4" 
146156 for (colName <-  processMode){
147-  val  modeTmpTable  =  getColumnMode(newTrans , keyColumn, colName)
148-  ids  =  ids .join(modeTmpTable, Seq (keyColumn), " left" 
157+  val  modeTmpTable  =  getColumnMode(tmpTrans , keyColumn, colName)
158+  tmpModeRes  =  tmpModeRes .join(modeTmpTable, Seq (keyColumn), " left" 
149159 }
150-  val  aggTable  =  getColumnAgg(newTrans, keyColumn, " purchase_amount" 
151-  ids =  ids.join(aggTable, Seq (keyColumn), " left" 
152-  saveTable(ids, " new_transactions_processed" " merchant" 
160+  saveTable(tmpModeRes, " tmp_mode_res" " merchant" 
161+ 
162+  var  tmpAggRes  =  ids
163+  val  aggCols  =  Array (" purchase_amount" " numerical_1" " numerical_2" " avg_sales_lag3" " avg_purchases_lag3" 
164+  " avg_sales_lag6" " avg_purchases_lag6" " avg_sales_lag12" " avg_purchases_lag12" 
165+  for (aggCol <-  aggCols){
166+  val  aggTable  =  getColumnAgg(tmpTrans, keyColumn, aggCol)
167+  tmpAggRes =  tmpAggRes.join(aggTable, Seq (keyColumn), " left" 
168+  }
169+  saveTable(tmpAggRes, " tmp_agg_res" " merchant" 
170+ 
171+  val  modeRes  =  spark.table(" merchant.tmp_mode_res" 
172+  val  aggRes  =  spark.table(" merchant.tmp_agg_res" 
173+  val  tmpRes  =  modeRes.join(aggRes, Seq (keyColumn), " left" 
174+ 
175+  saveTable(tmpRes, " new_transactions_processed" " merchant" 
153176 }
154177
155178 def  hisProcess ():  Unit  =  {
156179 val  keyColumn  =  " card_id" 
157-  val  trans  =  spark.table(" merchant.historical_transactions" 
158-  var  ids  =  trans.select(keyColumn).distinct()
180+  val  hisTrans  =  spark.table(" merchant.historical_transactions" 
181+  val  ids  =  hisTrans.select(keyColumn).distinct()
182+  ids.cache()
183+  val  dropCols  =  Array (" merchant_category_id" " subsector_id" " category_1" " city_id" " state_id" " category_2" 
184+  val  merchants  =  spark.table(" merchant.merchants" Array (" merchant_id" 
185+  .drop(dropCols : _* )
186+  val  tmpTrans  =  hisTrans.join(merchants, Seq (" merchant_id" " left" 
187+  // tmpTrans.cache()
188+ 
189+  var  tmpModeRes  =  ids
159190 val  processMode  =  Seq (" city_id" " category_1" " installments" " category_3" 
160-  " merchant_category_id" " category_2" " state_id" " subsector_id" 
191+  " merchant_category_id" " category_2" " state_id" " subsector_id" 
192+  " merchant_group_id" " category_4" 
161193 for (colName <-  processMode){
162-  val  modeTmpTable  =  getColumnMode(trans , keyColumn, colName)
163-  ids  =  ids .join(modeTmpTable, Seq (keyColumn), " left" 
194+  val  modeTmpTable  =  getColumnMode(tmpTrans , keyColumn, colName)
195+  tmpModeRes  =  tmpModeRes .join(modeTmpTable, Seq (keyColumn), " left" 
164196 }
165-  val  aggTable  =  getColumnAgg(trans, keyColumn, " purchase_amount" 
166-  ids =  ids.join(aggTable, Seq (keyColumn), " left" 
167-  ids =  addColumnsPrefix(ids, " historical" Array (keyColumn))
168-  saveTable(ids, " historical_transactions_processed" " merchant" 
197+  saveTable(tmpModeRes, " tmp_mode_res" " merchant" 
198+ 
199+  var  tmpAggRes  =  ids
200+  val  aggCols  =  Array (" purchase_amount" " numerical_1" " numerical_2" 
201+  // "avg_sales_lag3", "avg_purchases_lag3", "avg_sales_lag6", "avg_purchases_lag6", "avg_sales_lag12", "avg_purchases_lag12")
202+  for (aggCol <-  aggCols){
203+  val  aggTable  =  getColumnAgg(tmpTrans, keyColumn, aggCol)
204+  tmpAggRes =  tmpAggRes.join(aggTable, Seq (keyColumn), " left" 
205+  }
206+  saveTable(tmpAggRes, " tmp_agg_res" " merchant" 
207+ 
208+  val  modeRes  =  spark.table(" merchant.tmp_mode_res" 
209+  val  aggRes  =  spark.table(" merchant.tmp_agg_res" 
210+  val  tmpRes  =  addColumnsPrefix(modeRes.join(aggRes, Seq (keyColumn), " left" 
211+  " historical" Array (keyColumn))
212+ 
213+  saveTable(tmpRes, " historical_transactions_processed" " merchant" 
169214 }
170215
171216 def  merchantProcess ():  Unit  =  {
@@ -186,8 +231,8 @@ object NewDataProcess {
186231 }
187232
188233 def  main (args : Array [String ]):  Unit  =  {
189-  transProcess()
190-  hisProcess()
234+  // transProcess()
235+  // hisProcess()
191236 merchantProcess()
192237 }
193238
0 commit comments