1+ import  com .holdenkarau .spark .testing .{SharedSparkContext , StreamingSuiteBase }
2+ import  org .apache .spark .rdd .RDD 
3+ import  org .apache .spark .streaming .Seconds 
4+ import  org .apache .spark .streaming .dstream .DStream 
5+ import  org .scalactic .Equality 
6+ import  org .scalatest .FunSuite 
7+ 
8+ /** 
9+  * Created by xk on 2018/9/10. 
10+  */  
11+ class  SparkTest  extends  FunSuite  with  SharedSparkContext {
12+ 
13+  test(" test initializing spark context" 
14+  val  list  =  List (1 , 2 , 3 , 4 )
15+  val  rdd  =  sc.parallelize(list)
16+ 
17+  assert(rdd.count ===  list.length)
18+  }
19+ 
20+ }
21+ 
22+ class  SampleStreamingTest  extends  FunSuite  with  StreamingSuiteBase  {
23+ 
24+  test(" simple two stream streaming test" 
25+  val  input  =  List (List (" hi" " pandas" List (" hi holden" List (" bye" 
26+  val  input2  =  List (List (" hi" List (" pandas" List (" byes" 
27+  val  expected  =  List (List (" pandas" List (" hi holden" List (" bye" 
28+  testOperation[String , String , String ](input, input2, subtract _, expected, ordered =  false )
29+  }
30+ 
31+  def  subtract (d1 : DStream [String ], d2 : DStream [String ]):  DStream [String ] =  {
32+  d1.transformWith(d2, SampleStreamingTest .subtractRDDs _)
33+  }
34+ 
35+  test(" really simple transformation" 
36+  val  input  =  List (List (" hi" List (" hi holden" List (" bye" 
37+  val  expected  =  List (List (" hi" List (" hi" " holden" List (" bye" 
38+  testOperation[String , String ](input, tokenize _, expected, ordered =  false )
39+  }
40+ 
41+  //  This is the sample operation we are testing
42+  def  tokenize (f : DStream [String ]):  DStream [String ] =  {
43+  f.flatMap(_.split("  " 
44+  }
45+ 
46+  test(" CountByWindow with windowDuration 3s and slideDuration=2s" 
47+  //  There should be 2 windows : {batch2, batch1}, {batch4, batch3, batch2}
48+  val  batch1  =  List (" a" " b" 
49+  val  batch2  =  List (" d" " f" " a" " b" 
50+  val  batch3  =  List (" f" " g" "  h" 
51+  val  batch4  =  List (" a" 
52+  val  input =  List (batch1, batch2, batch3, batch4)
53+  val  expected  =  List (List (6L ), List (8L ))
54+  val  expected2  =  List (List (2L ), List (6L ), List (7L ), List (4L ))
55+ 
56+ 
57+  def  countByWindow (ds: DStream [String ]): DStream [Long ] =  {
58+  ds.countByWindow(windowDuration =  Seconds (3 ), slideDuration =  Seconds (2 ))
59+  }
60+ 
61+  def  countByWindow2 (ds: DStream [String ]): DStream [Long ] =  {
62+  ds.countByWindow(windowDuration =  Seconds (2 ), slideDuration =  Seconds (1 ))
63+  }
64+ 
65+  testOperation[String , Long ](input, countByWindow _, expected, ordered =  true )
66+  testOperation[String , Long ](input, countByWindow2 _, expected2, ordered =  true )
67+  }
68+ 
69+  test(" empty batch by using null" 
70+  def  multiply (stream1 : DStream [Int ]) =  stream1.map(_ *  3 )
71+ 
72+  val  input1  =  List (List (1 ), null , List (10 ))
73+  val  output  =  List (List (3 ), List (30 ))
74+ 
75+  testOperation(input1, multiply _, output, ordered =  false )
76+  }
77+ 
78+  test(" custom equality object (Integer)" 
79+  val  input  =  List (List (- 1 ), List (- 2 , 3 , - 4 ), List (5 , - 6 ))
80+  val  expected  =  List (List (1 ), List (2 , 3 , 4 ), List (5 , 6 ))
81+ 
82+  implicit  val  integerCustomEquality  = 
83+  new  Equality [Int ] {
84+  override  def  areEqual (a : Int , b : Any ):  Boolean  = 
85+  b match  {
86+  case  n : Int  =>  Math .abs(a) ==  Math .abs(n)
87+  case  _ =>  false 
88+  }
89+  }
90+ 
91+  def  doNothing (ds : DStream [Int ]) =  ds
92+ 
93+  testOperation[Int , Int ](input, doNothing _, expected, ordered =  false )
94+  testOperation[Int , Int ](input, doNothing _, expected, ordered =  true )
95+  }
96+ 
97+  override  def  maxWaitTimeMillis :  Int  =  20000 
98+ 
99+  test(" increase duration more than 10 seconds" 
100+  val  input  =  (1  to 1000 ).toList.map(x =>  List (x))
101+  val  expectedOutput  =  (1  to 1000 ).toList.map(x =>  List (2  *  x))
102+ 
103+  def  multiply (ds : DStream [Int ]) =  ds.map(_ *  2 )
104+ 
105+  testOperation[Int , Int ](input, multiply _, expectedOutput, ordered =  true )
106+  }
107+ 
108+ }
109+ 
110+ object  SampleStreamingTest  {
111+  def  subtractRDDs (r1 : RDD [String ], r2 : RDD [String ]):  RDD [String ] =  {
112+  r1.subtract(r2)
113+  }
114+ }
0 commit comments