Standard template Data Engineering and Analytics - hosthub
Data Engineering Template (ETL: S3 → EMR/PySpark → S3 or Redshift)
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()

# Read Data from S3
def read_s3_data(bucket_name, key, schema=None, file_format="csv"):
    s3_path = f"s3://{bucket_name}/{key}"
    if file_format == "csv":
        reader = spark.read.option("header", True)
        if schema is not None:  # only apply an explicit schema when one is given
            reader = reader.schema(schema)
        return reader.csv(s3_path)
    elif file_format == "parquet":
        return spark.read.parquet(s3_path)
    else:
        raise ValueError("Unsupported file format")

# Example schema (optional)
example_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("amount", IntegerType(), True)
])
# df = read_s3_data("my-bucket", "data/input.csv", example_schema)

# Transform Data
def transform_data(df):
    df_transformed = df.withColumn("amount_double", col("amount") * 2) \
        .withColumn("category", when(col("amount") > 100, "High").otherwise("Low"))
    return df_transformed
# df_transformed = transform_data(df)

# Load Data to S3
def write_s3_data(df, bucket_name, key, file_format="parquet"):
    s3_path = f"s3://{bucket_name}/{key}"
    if file_format == "csv":
        df.write.mode("overwrite").option("header", True).csv(s3_path)
    elif file_format == "parquet":
        df.write.mode("overwrite").parquet(s3_path)
    else:
        raise ValueError("Unsupported file format")
# write_s3_data(df_transformed, "my-bucket", "data/output/")

# Load Data to Redshift (requires the Redshift JDBC driver on the Spark classpath)
def write_redshift(df, jdbc_url, table_name, user, password):
    df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", user) \
        .option("password", password) \
        .mode("append") \
        .save()
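Putting the helpers together, a typical run chains read, transform, and write; the bucket name and keys are the same placeholders used in the commented examples above, not real values:

# End-to-end example (placeholder bucket and keys)
df = read_s3_data("my-bucket", "data/input.csv", schema=example_schema, file_format="csv")
df_transformed = transform_data(df)
write_s3_data(df_transformed, "my-bucket", "data/output/", file_format="parquet")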
Notes:
The pipeline is split into functions (def) for modularity.
S3 paths, the schema, and the transformations are easy to swap out.
Redshift loading goes through Spark's JDBC writer; see the usage sketch after these notes.
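A minimal sketch of the Redshift step; the cluster endpoint, database, table, and credentials below are placeholders, and the Redshift JDBC driver jar has to be available to Spark (for example passed via spark-submit --jars):

# Hypothetical Redshift load; endpoint, database, table, and credentials are placeholders
write_redshift(
    df_transformed,
    jdbc_url="jdbc:redshift://<cluster-endpoint>:5439/<database>",
    table_name="public.my_table",
    user="<user>",
    password="<password>",
)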

Data Analytics Template (Boto3, SageMaker, Sklearn, Pandas, S3)

# Imports
import boto3
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import joblib

# Read Data from S3
s3_client = boto3.client('s3')
bucket_name = "my-bucket"
key = "data/input.csv"
obj = s3_client.get_object(Bucket=bucket_name, Key=key)
df = pd.read_csv(obj['Body'])

# Basic EDA / Analytics
print(df.head())
print(df.describe())
df.info()  # info() prints its summary directly and returns None

# Train/Test Split
X = df.drop(columns=['target'])
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train Model (Sklearn)
model = LinearRegression()
model.fit(X_train, y_train)

# Evaluate Model
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")

# Save Model to S3
joblib.dump(model, "/tmp/model.joblib")
s3_client.upload_file("/tmp/model.joblib", bucket_name, "models/model.joblib")

# SageMaker (Optional for Advanced)
# Example: Create SageMaker session and upload data
import sagemaker
sagemaker_session = sagemaker.Session()
role = "arn:aws:iam::<account_id>:role/SageMakerRole"

# Upload data to S3 for SageMaker training
train_s3_path = sagemaker_session.upload_data(path="data/input.csv", key_prefix="training_data")
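To reuse the saved model later, it can be pulled back from S3 and loaded with joblib; this is a minimal sketch using the same bucket and key as the upload above:

# Download the serialized model from S3 and run a quick prediction
s3_client.download_file(bucket_name, "models/model.joblib", "/tmp/model.joblib")
loaded_model = joblib.load("/tmp/model.joblib")
print(loaded_model.predict(X_test[:5]))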
Notes:
Uses plain variables and Boto3 directly; wrap the steps in functions only if you need the reuse.
Covers the typical workflow: S3 → Pandas → ML → S3.
Can be extended to a managed SageMaker training job; see the sketch after these notes.
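A hedged sketch of that SageMaker extension, using the built-in scikit-learn estimator; the entry-point script name, instance type, and framework version are illustrative assumptions, and train.py would hold the actual training code:

# Minimal SageMaker scikit-learn training job (entry point, instance type,
# and framework version are illustrative assumptions)
from sagemaker.sklearn.estimator import SKLearn

estimator = SKLearn(
    entry_point="train.py",          # hypothetical training script
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version="1.2-1",       # pick a version supported in your region
    sagemaker_session=sagemaker_session,
)
estimator.fit({"train": train_s3_path})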