Skip to content

Commit 2184270

Browse files
SireInsectusSireInsectus
authored and committed
Publishing v2.2.6
1 parent 13e7ffa commit 2184270

17 files changed

+589
-123
lines changed

ASP 5 - Streaming/ASP 5.1aL - Coupon Sales Lab.py

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
# COMMAND ----------
2525

26-
# MAGIC %run ../Includes/Classroom-Setup
26+
# MAGIC %run ../Includes/Classroom-Setup-5.1a
2727

2828
# COMMAND ----------
2929

@@ -45,9 +45,7 @@
4545

4646
# COMMAND ----------
4747

48-
assert df.isStreaming
49-
assert df.columns == ["order_id", "email", "transaction_timestamp", "total_item_quantity", "purchase_revenue_in_usd", "unique_items", "items"]
50-
print("All test pass")
48+
DA.tests.validate_1_1(df)
5149

5250
# COMMAND ----------
5351

@@ -69,9 +67,7 @@
6967

7068
# COMMAND ----------
7169

72-
schema_str = str(coupon_sales_df.schema)
73-
assert "StructField(items,StructType(List(StructField(coupon" in schema_str, "items column was not exploded"
74-
print("All test pass")
70+
DA.tests.validate_2_1(coupon_sales_df.schema)
7571

7672
# COMMAND ----------
7773

@@ -90,21 +86,17 @@
9086
coupons_checkpoint_path = f"{DA.paths.checkpoints}/coupon-sales"
9187
coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"
9288

93-
coupon_sales_query = (coupon_sales_df.FILL_IN
94-
)
89+
coupon_sales_query = (coupon_sales_df.FILL_IN)
90+
91+
DA.block_until_stream_is_ready(coupon_sales_query)
9592

9693
# COMMAND ----------
9794

9895
# MAGIC %md **3.1: CHECK YOUR WORK**
9996

10097
# COMMAND ----------
10198

102-
DA.block_until_stream_is_ready("coupon_sales")
103-
assert coupon_sales_query.isActive
104-
assert len(dbutils.fs.ls(coupons_output_path)) > 0
105-
assert len(dbutils.fs.ls(coupons_checkpoint_path)) > 0
106-
assert "coupon_sales" in coupon_sales_query.lastProgress["name"]
107-
print("All test pass")
99+
DA.tests.validate_3_1(coupon_sales_query)
108100

109101
# COMMAND ----------
110102

@@ -128,9 +120,7 @@
128120

129121
# COMMAND ----------
130122

131-
assert type(query_id) == str
132-
assert list(query_status.keys()) == ["message", "isDataAvailable", "isTriggerActive"]
133-
print("All test pass")
123+
DA.tests.validate_4_1(query_id, query_status)
134124

135125
# COMMAND ----------
136126

@@ -148,8 +138,7 @@
148138

149139
# COMMAND ----------
150140

151-
assert not coupon_sales_query.isActive
152-
print("All test pass")
141+
DA.tests.validate_5_1(coupon_sales_query)
153142

154143
# COMMAND ----------
155144

ASP 5 - Streaming/ASP 5.1bL - Hourly Activity by Traffic Lab.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
# COMMAND ----------
2424

25-
# MAGIC %run ../Includes/Classroom-Setup
25+
# MAGIC %run ../Includes/Classroom-Setup-5.1b
2626

2727
# COMMAND ----------
2828

@@ -31,12 +31,10 @@
3131
# Directory of hourly events logged from the BedBricks website on July 3, 2020
3232
hourly_events_path = f"{DA.paths.datasets}/ecommerce/events/events-2020-07-03.json"
3333

34-
df = (spark
35-
.readStream
36-
.schema(schema)
37-
.option("maxFilesPerTrigger", 1)
38-
.json(hourly_events_path)
39-
)
34+
df = (spark.readStream
35+
.schema(schema)
36+
.option("maxFilesPerTrigger", 1)
37+
.json(hourly_events_path))
4038

4139
# COMMAND ----------
4240

@@ -49,17 +47,15 @@
4947
# COMMAND ----------
5048

5149
# TODO
52-
events_df = (df.FILL_IN
53-
)
50+
events_df = (df.FILL_IN)
5451

5552
# COMMAND ----------
5653

5754
# MAGIC %md **1.1: CHECK YOUR WORK**
5855

5956
# COMMAND ----------
6057

61-
assert "StructField(createdAt,TimestampType,true" in str(events_df.schema)
62-
print("All test pass")
58+
DA.tests.validate_1_1(events_df.schema)
6359

6460
# COMMAND ----------
6561

@@ -86,8 +82,7 @@
8682

8783
# COMMAND ----------
8884

89-
assert str(traffic_df.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false),StructField(hour,IntegerType,true)))"
90-
print("All test pass")
85+
DA.tests.validate_2_1(traffic_df.schema)
9186

9287
# COMMAND ----------
9388

@@ -132,8 +127,7 @@
132127

133128
# COMMAND ----------
134129

135-
for s in spark.streams.active:
136-
print(s.name)
130+
DA.tests.validate_4_1()
137131

138132
# COMMAND ----------
139133

ASP 5 - Streaming/ASP 5.1cL - Activity by Traffic Lab.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,28 @@
3131

3232
# COMMAND ----------
3333

34-
# MAGIC %run ../Includes/Classroom-Setup
34+
# MAGIC %run ../Includes/Classroom-Setup-5.1c
3535

3636
# COMMAND ----------
3737

3838
# MAGIC %md ### 1. Read data stream
3939
# MAGIC - Set to process 1 file per trigger
4040
# MAGIC - Read from Delta with filepath stored in **`DA.paths.events`**
4141
# MAGIC
42-
# MAGIC Assign the resulting DataFrame to **`df`**.
42+
# MAGIC Assign the resulting Query to **`df`**.
4343

4444
# COMMAND ----------
4545

4646
# TODO
4747
df = FILL_IN
4848

49-
df.isStreaming
50-
5149
# COMMAND ----------
5250

5351
# MAGIC %md **1.1: CHECK YOUR WORK**
5452

5553
# COMMAND ----------
5654

57-
assert df.isStreaming
58-
assert df.columns == ["device", "ecommerce", "event_name", "event_previous_timestamp", "event_timestamp", "geo", "items", "traffic_source", "user_first_touch_timestamp", "user_id"]
59-
print("All test pass")
55+
DA.tests.validate_1_1(df)
6056

6157
# COMMAND ----------
6258

@@ -79,8 +75,7 @@
7975

8076
# COMMAND ----------
8177

82-
assert str(traffic_df.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false)))"
83-
print("All test pass")
78+
DA.tests.validate_2_1(traffic_df.schema)
8479

8580
# COMMAND ----------
8681

@@ -117,11 +112,7 @@
117112

118113
# COMMAND ----------
119114

120-
DA.block_until_stream_is_ready("active_users_by_traffic")
121-
assert traffic_query.isActive
122-
assert "active_users_by_traffic" in traffic_query.name
123-
assert traffic_query.lastProgress["sink"]["description"] == "MemorySink"
124-
print("All test pass")
115+
DA.tests.validate_4_1(traffic_query)
125116

126117
# COMMAND ----------
127118

@@ -163,8 +154,7 @@
163154

164155
# COMMAND ----------
165156

166-
assert not traffic_query.isActive
167-
print("All test pass")
157+
DA.tests.validate_6_1(traffic_query)
168158

169159
# COMMAND ----------
170160

Includes/Classroom-Setup-5.1a.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# Databricks notebook source
2+
# MAGIC %run ./_common
3+
4+
# COMMAND ----------
5+
6+
@TestHelper.monkey_patch
def validate_1_1(self, df):
    """Check lab step 1.1: *df* must be a streaming DataFrame exposing the
    seven expected coupon-sales columns (order is not enforced)."""
    suite = DA.tests.new("5.1a-1.1")

    suite.test_true(df.isStreaming, description="The query is streaming")

    expected_columns = ["order_id", "email", "transaction_timestamp",
                        "total_item_quantity", "purchase_revenue_in_usd",
                        "unique_items", "items"]
    suite.test_sequence(actual_value=df.columns,
                        expected_value=expected_columns,
                        test_column_order=False,
                        description=f"DataFrame contains all {len(expected_columns)} columns",
                        hint="Found [[ACTUAL_VALUE]]")

    suite.display_results()
    assert suite.passed, "One or more tests failed."
21+
22+
# COMMAND ----------
23+
24+
@TestHelper.monkey_patch
def validate_2_1(self, schema:StructType):
    """Check lab step 2.1: the coupon-sales schema is a StructType containing
    exactly the seven expected fields with the expected Spark types."""
    suite = DA.tests.new("5.1a-2.1")

    # The submitted value must itself be a StructType (not a DataFrame or a string).
    suite.test_equals(actual_value=type(schema), expected_value=StructType, description="Schema is of type StructType", hint="Found [[ACTUAL_VALUE]]")

    # FIX: corrected typo in user-facing description ("contians" -> "contains").
    suite.test_length(schema.fieldNames(), 7, description="Schema contains seven fields", hint="Found [[LEN_ACTUAL_VALUE]]: [[ACTUAL_VALUE]]")

    # One field-level check per expected column; nullability is not asserted (None).
    suite.test_struct_field(schema, "order_id", "LongType", None)
    suite.test_struct_field(schema, "email", "StringType", None)
    suite.test_struct_field(schema, "transaction_timestamp", "LongType", None)
    suite.test_struct_field(schema, "total_item_quantity", "LongType", None)
    suite.test_struct_field(schema, "purchase_revenue_in_usd", "DoubleType", None)
    suite.test_struct_field(schema, "unique_items", "LongType", None)
    suite.test_struct_field(schema, "items", "StructType", None)

    suite.display_results()
    assert suite.passed, "One or more tests failed."
43+
44+
45+
# COMMAND ----------
46+
47+
@TestHelper.monkey_patch
def validate_3_1(self, query):
    """Check lab step 3.1: the coupon-sales streaming query is active, is named
    "coupon_sales", and has written at least one file to both its output
    directory and its checkpoint directory."""
    suite = DA.tests.new("5.1a-3.1")

    suite.test_true(query.isActive, description="The query is active")

    # BUG FIX: the original read the notebook-global `coupon_sales_query` instead
    # of the `query` parameter — a NameError (or a check against the wrong object)
    # whenever the caller's variable is named differently.
    suite.test_equals(query.lastProgress["name"], "coupon_sales",
                      description="The query name is \"coupon_sales\".")

    # Paths mirror the ones the lab instructs students to use in section 3.
    coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"
    suite.test(actual_value=None, test_function=lambda: len(dbutils.fs.ls(coupons_output_path)) > 0,
               description="Found at least one file in .../coupon-sales/output")

    coupons_checkpoint_path = f"{DA.paths.checkpoints}/coupon-sales"
    suite.test(actual_value=None, test_function=lambda: len(dbutils.fs.ls(coupons_checkpoint_path)) > 0,
               description="Found at least one file in .../coupon-sales")

    suite.display_results()
    assert suite.passed, "One or more tests failed."
66+
67+
68+
# COMMAND ----------
69+
70+
@TestHelper.monkey_patch
def validate_4_1(self, query_id, query_status):
    """Check lab step 4.1: *query_status* carries the standard streaming-status
    keys and *query_id* is a plain string."""
    suite = DA.tests.new("5.1a-4.1")

    expected_keys = ["message", "isDataAvailable", "isTriggerActive"]
    suite.test_sequence(actual_value=query_status.keys(),
                        expected_value=expected_keys,
                        test_column_order=False,
                        description="Valid status value.")

    suite.test_equals(type(query_id), str, description="Valid query_id value.")

    suite.display_results()
    assert suite.passed, "One or more tests failed."
83+
84+
# COMMAND ----------
85+
86+
@TestHelper.monkey_patch
def validate_5_1(self, query):
    """Check lab step 5.1: the streaming query has been stopped."""
    suite = DA.tests.new("5.1a-5.1")
    suite.test_false(query.isActive, description="The query is not active")
    suite.display_results()
    assert suite.passed, "One or more tests failed."
94+
95+
96+
# COMMAND ----------
97+
98+
# Standard lesson bootstrap: build the helper object, wipe state left over from
# other lessons, then install/validate the datasets and create the user database.
# Order matters — reset must happen before init, and conclude_setup last.
DA = DBAcademyHelper(**helper_arguments)  # Create the DA object
DA.reset_environment()                    # Reset by removing databases and files from other lessons
DA.init(install_datasets=True,            # Initialize, install and validate the datasets
        create_db=True)                   # Continue initialization, create the user-db

# Convenience paths to the BedBricks Delta datasets consumed by this lab.
DA.paths.sales = f"{DA.paths.datasets}/ecommerce/sales/sales.delta"
DA.paths.users = f"{DA.paths.datasets}/ecommerce/users/users.delta"
DA.paths.events = f"{DA.paths.datasets}/ecommerce/events/events.delta"
DA.paths.products = f"{DA.paths.datasets}/products/products.delta"

DA.conclude_setup()                       # Conclude setup by advertising environmental changes
109+

Includes/Classroom-Setup-5.1b.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Databricks notebook source
2+
# MAGIC %run ./_common
3+
4+
# COMMAND ----------
5+
6+
@TestHelper.monkey_patch
def validate_1_1(self, schema):
    """Check lab step 1.1: the events schema is a StructType with the ten base
    event fields plus the derived "hour" and "createdAt" columns (12 total)."""
    suite = DA.tests.new("5.1b-1.1")

    # The submitted value must itself be a StructType (not a DataFrame or a string).
    suite.test_equals(actual_value=type(schema), expected_value=StructType, description="Schema is of type StructType", hint="Found [[ACTUAL_VALUE]]")

    # FIX: corrected typos in user-facing description ("contians 12 field" -> "contains 12 fields").
    suite.test_length(schema.fieldNames(), 12, description="Schema contains 12 fields", hint="Found [[LEN_ACTUAL_VALUE]]: [[ACTUAL_VALUE]]")

    # One field-level check per expected column; nullability is not asserted (None).
    suite.test_struct_field(schema, "device", "StringType", None)
    suite.test_struct_field(schema, "ecommerce", "StructType", None)
    suite.test_struct_field(schema, "event_name", "StringType", None)
    suite.test_struct_field(schema, "event_previous_timestamp", "LongType", None)
    suite.test_struct_field(schema, "event_timestamp", "LongType", None)
    suite.test_struct_field(schema, "geo", "StructType", None)
    suite.test_struct_field(schema, "items", "ArrayType", None)
    suite.test_struct_field(schema, "traffic_source", "StringType", None)
    suite.test_struct_field(schema, "user_first_touch_timestamp", "LongType", None)
    suite.test_struct_field(schema, "user_id", "StringType", None)
    suite.test_struct_field(schema, "hour", "IntegerType", None)
    suite.test_struct_field(schema, "createdAt", "TimestampType", None)

    suite.display_results()
    assert suite.passed, "One or more tests failed."
29+
30+
31+
# COMMAND ----------
32+
33+
@TestHelper.monkey_patch
def validate_2_1(self, schema):
    """Check lab step 2.1: the hourly-traffic schema is a StructType with the
    three expected aggregate columns."""
    suite = DA.tests.new("5.1b-2.1")

    # The submitted value must itself be a StructType (not a DataFrame or a string).
    suite.test_equals(actual_value=type(schema), expected_value=StructType, description="Schema is of type StructType", hint="Found [[ACTUAL_VALUE]]")

    # FIX: corrected typos in user-facing description ("contians three field" -> "contains three fields").
    suite.test_length(schema.fieldNames(), 3, description="Schema contains three fields", hint="Found [[LEN_ACTUAL_VALUE]]: [[ACTUAL_VALUE]]")

    # One field-level check per expected column; nullability is not asserted (None).
    suite.test_struct_field(schema, "traffic_source", "StringType", None)
    suite.test_struct_field(schema, "active_users", "LongType", None)
    suite.test_struct_field(schema, "hour", "IntegerType", None)

    suite.display_results()
    assert suite.passed, "One or more tests failed."
47+
48+
49+
# COMMAND ----------
50+
51+
@TestHelper.monkey_patch
def validate_4_1(self):
    """Check lab step 4.1: no streaming queries remain active on this cluster."""
    suite = DA.tests.new("5.1b-4.1")
    suite.test_length(spark.streams.active, 0, description="All queries have stopped streaming")
    suite.display_results()
    assert suite.passed, "One or more tests failed."
59+
60+
# COMMAND ----------
61+
62+
# Standard lesson bootstrap: build the helper object, wipe state left over from
# other lessons, then install/validate the datasets and create the user database.
# Order matters — reset must happen before init, and conclude_setup last.
DA = DBAcademyHelper(**helper_arguments)  # Create the DA object
DA.reset_environment()                    # Reset by removing databases and files from other lessons
DA.init(install_datasets=True,            # Initialize, install and validate the datasets
        create_db=True)                   # Continue initialization, create the user-db

# Convenience paths to the BedBricks Delta datasets consumed by this lab.
DA.paths.sales = f"{DA.paths.datasets}/ecommerce/sales/sales.delta"
DA.paths.users = f"{DA.paths.datasets}/ecommerce/users/users.delta"
DA.paths.events = f"{DA.paths.datasets}/ecommerce/events/events.delta"
DA.paths.products = f"{DA.paths.datasets}/products/products.delta"

DA.conclude_setup()                       # Conclude setup by advertising environmental changes
73+

0 commit comments

Comments
 (0)