Data Engineering Template (ETL: S3 → EMR/PySpark → S3 or Redshift)
```python
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()

# Read data from S3
def read_s3_data(bucket_name, key, schema=None, file_format="csv"):
    s3_path = f"s3://{bucket_name}/{key}"
    if file_format == "csv":
        reader = spark.read.option("header", True)
        if schema is not None:  # .schema(None) raises a TypeError, so only apply a real schema
            reader = reader.schema(schema)
        return reader.csv(s3_path)
    elif file_format == "parquet":
        return spark.read.parquet(s3_path)
    else:
        raise ValueError("Unsupported file format")

# Example schema (optional)
example_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("amount", IntegerType(), True)
])
# df = read_s3_data("my-bucket", "data/input.csv", example_schema)

# Transform data
def transform_data(df):
    df_transformed = df.withColumn("amount_double", col("amount") * 2) \
        .withColumn("category", when(col("amount") > 100, "High").otherwise("Low"))
    return df_transformed

# df_transformed = transform_data(df)

# Write data to S3
def write_s3_data(df, bucket_name, key, file_format="parquet"):
    s3_path = f"s3://{bucket_name}/{key}"
    if file_format == "csv":
        df.write.mode("overwrite").option("header", True).csv(s3_path)
    elif file_format == "parquet":
        df.write.mode("overwrite").parquet(s3_path)
    else:
        raise ValueError("Unsupported file format")

# write_s3_data(df_transformed, "my-bucket", "data/output/")

# Load data to Redshift via Spark JDBC
# (the Redshift JDBC driver JAR must be on the Spark classpath; if it is not
# auto-resolved, add .option("driver", "com.amazon.redshift.jdbc42.Driver"))
def write_redshift(df, jdbc_url, table_name, user, password):
    df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", user) \
        .option("password", password) \
        .mode("append") \
        .save()
```
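A minimal sketch of running the pipeline end to end, assuming placeholder bucket names, keys, and Redshift connection details (none of these values are prescribed by the template):

```python
# Hypothetical end-to-end run of the functions above.
# Bucket names, keys, and the JDBC URL are placeholder assumptions.
df = read_s3_data("my-bucket", "data/input.csv", schema=example_schema, file_format="csv")
df_transformed = transform_data(df)
write_s3_data(df_transformed, "my-bucket", "data/output/", file_format="parquet")
write_redshift(
    df_transformed,
    jdbc_url="jdbc:redshift://my-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev",
    table_name="public.transactions",
    user="awsuser",
    password="...",  # prefer Secrets Manager over hard-coded credentials
)
```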
Notes:
- Use `def` functions for modularity.
- Easy to replace the S3 paths, schema, and transformations.
- Redshift loading uses Spark JDBC; the Redshift JDBC driver JAR must be available on the Spark classpath (see the sketch below).
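One way to get the driver JAR onto the classpath is Spark's `spark.jars` config; a minimal sketch, where the JAR path and version are assumptions:

```python
from pyspark.sql import SparkSession

# Sketch only: the JAR location and version are assumptions; on EMR you can
# also ship the driver with --jars at spark-submit time.
spark = (
    SparkSession.builder
    .appName("ETL Pipeline")
    .config("spark.jars", "/opt/jars/redshift-jdbc42-2.1.0.30.jar")
    .getOrCreate()
)
```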
Data Analytics Template (Boto3, SageMaker, Sklearn, Pandas, S3)
```python
# Imports
import boto3
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import joblib

# Read data from S3
s3_client = boto3.client('s3')
bucket_name = "my-bucket"
key = "data/input.csv"
obj = s3_client.get_object(Bucket=bucket_name, Key=key)
df = pd.read_csv(obj['Body'])

# Basic EDA / analytics
print(df.head())
print(df.describe())
df.info()  # info() prints directly and returns None, so don't wrap it in print()

# Train/test split
X = df.drop(columns=['target'])
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model (scikit-learn)
model = LinearRegression()
model.fit(X_train, y_train)

# Evaluate model
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")

# Save model to S3
joblib.dump(model, "/tmp/model.joblib")
s3_client.upload_file("/tmp/model.joblib", bucket_name, "models/model.joblib")

# SageMaker (optional, for advanced use)
# Example: create a SageMaker session and upload data
import sagemaker

sagemaker_session = sagemaker.Session()
role = "arn:aws:iam::<account_id>:role/SageMakerRole"

# Upload data to S3 for SageMaker training
train_s3_path = sagemaker_session.upload_data(path="data/input.csv", key_prefix="training_data")
```
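After the upload, a quick round trip confirms the saved artifact actually works; a minimal sketch that pulls the model back from S3 and scores a few held-out rows (it reuses the variables defined above):

```python
# Sketch: download the persisted model from S3 and reuse it for inference.
s3_client.download_file(bucket_name, "models/model.joblib", "/tmp/model_check.joblib")
loaded_model = joblib.load("/tmp/model_check.joblib")
print(loaded_model.predict(X_test.head()))  # sanity-check predictions on a few test rows
```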
Notes:
- Uses simple variables and Boto3 directly; no need for functions unless you want them.
- Covers the typical workflow: S3 → Pandas → ML → S3.
- Can be extended to full SageMaker training jobs (see the sketch below).
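For the SageMaker extension, one common route is the SDK's `SKLearn` estimator, which runs a user-supplied training script on a managed instance. A minimal sketch, assuming a hypothetical `train.py` entry point and reusing the `role`, `sagemaker_session`, and `train_s3_path` values from above; the instance type and framework version are assumptions:

```python
# Sketch only: entry_point script, instance type, and framework version are assumptions.
from sagemaker.sklearn.estimator import SKLearn

estimator = SKLearn(
    entry_point="train.py",        # hypothetical training script you provide
    role=role,                     # IAM role ARN defined above
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version="1.2-1",     # a published scikit-learn container version
    sagemaker_session=sagemaker_session,
)

# Launch a managed training job on the uploaded data
estimator.fit({"train": train_s3_path})
```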
