In this article, I'll walk you through how to build a data pipeline that fetches weather data for multiple cities, processes the data using PySpark, and stores the output in Google Cloud.
We'll use PySpark for distributed data processing, Prefect for workflow management, and Google Cloud Storage and BigQuery for storage and analysis. The code is available on GitHub.
Figure: Looker dashboard
The pipeline fetches weather data for multiple cities from the OpenWeatherMap API using the requests library. The data is then filtered and aggregated with PySpark, and the output is stored as a CSV file in Google Cloud Storage. Finally, the CSV file is loaded into a BigQuery table for further analysis.
Figure: pipeline
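To make the processing step concrete, here is a minimal plain-Python sketch of what the filter and aggregation compute: rows at or below 10 °C are dropped, then humidity is averaged and wind speed is maximised per city. The `summarize` function and the sample readings are illustrative assumptions, not part of the pipeline; in the actual scripts this work is done by PySpark.

```python
from collections import defaultdict


def summarize(rows, min_temp=10.0):
    """Mimic filter("Temperature > 10") + groupBy("City") + avg/max in plain Python.

    rows: iterable of (city, temperature, humidity, wind_speed) tuples.
    Returns {city: (average_humidity, max_wind_speed)}.
    """
    grouped = defaultdict(list)
    for city, temp, humidity, wind_speed in rows:
        if temp > min_temp:  # strict comparison, like the Spark filter
            grouped[city].append((humidity, wind_speed))

    summary = {}
    for city, readings in grouped.items():
        humidities = [h for h, _ in readings]
        winds = [w for _, w in readings]
        summary[city] = (sum(humidities) / len(humidities), max(winds))
    return summary


# Made-up sample readings, for illustration only.
sample_rows = [
    ("London", 12.0, 80, 4.0),
    ("London", 14.0, 70, 6.5),
    ("Moscow", -3.0, 90, 2.0),  # dropped: not above 10 °C
]
result = summarize(sample_rows)
print(result)
```

Note that the pipeline fetches a single reading per city per run, so the per-city average equals that one value. The aggregation only becomes meaningful once several readings per city are collected.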
from pyspark.sql import SparkSession
from google.cloud import storage
from google.cloud import bigquery
import requests
import random

# Define the OpenWeatherMap API key and base URL
api_key = ""
base_url = "https://api.openweathermap.org/data/2.5/weather"

cities_100 = ['New York', 'London', 'Paris', 'Tokyo', 'Sydney', 'Moscow',
              'Beijing', 'Rio de Janeiro', 'Mumbai', 'Cairo', 'Rome', 'Berlin',
              'Toronto', 'Lagos', 'Bangkok', 'Melbourne', 'Johannesburg']
# random.sample raises ValueError if asked for more items than the list holds,
# so cap the sample size at the length of the list
cities = random.sample(cities_100, min(25, len(cities_100)))

# Initialize a Spark session
spark = SparkSession.builder.appName("WeatherData").getOrCreate()


# Define a function to fetch weather data for a city and return a Spark dataframe
def fetch_weather_data(city):
    # Send a request to the OpenWeatherMap API for the city's weather data
    params = {"q": city, "appid": api_key, "units": "metric"}
    response = requests.get(base_url, params=params)
    data = response.json()

    # Extract the relevant weather data from the API response
    temp = data["main"]["temp"]
    humidity = data["main"]["humidity"]
    wind_speed = data["wind"]["speed"]

    # Create a Spark dataframe with the weather data for the city
    df = spark.createDataFrame([(city, temp, humidity, wind_speed)],
                               ["City", "Temperature", "Humidity", "WindSpeed"])
    return df


# Fetch weather data for all cities and merge them into a single dataframe
weather_data = None
for city in cities:
    city_weather_data = fetch_weather_data(city)
    if weather_data is None:
        weather_data = city_weather_data
    else:
        weather_data = weather_data.union(city_weather_data)

# Keep cities warmer than 10 °C, then aggregate humidity and wind speed per city
weather_data = (
    weather_data.filter("Temperature > 10")
    .groupBy("City")
    .agg({"Humidity": "avg", "WindSpeed": "max"})
    .withColumnRenamed("avg(Humidity)", "AverageHumidity")
    .withColumnRenamed("max(WindSpeed)", "MaxWindSpeed")
)

# Show the final processed and transformed weather data
weather_data.show()

# Write the weather data as a CSV file to a Google Cloud Storage bucket
bucket_name = "weather_app_dez"
file_name = "weather_data.csv"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob.upload_from_string(weather_data.toPandas().to_csv(index=False), content_type="text/csv")

# Load the CSV file from the bucket into a BigQuery table
table_name = "dez-dtc-23-384116.weather_app.weather_data"
bigquery_client = bigquery.Client()
table = bigquery.Table(table_name)
schema = [bigquery.SchemaField("City", "STRING"),
          bigquery.SchemaField("AverageHumidity", "FLOAT"),
          bigquery.SchemaField("MaxWindSpeed", "FLOAT")]
table.schema = schema
# skip_leading_rows=1 skips the CSV header row
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV,
                                    skip_leading_rows=1,
                                    autodetect=True)
job = bigquery_client.load_table_from_uri(f"gs://{bucket_name}/{file_name}", table, job_config=job_config)
job.result()  # Wait for the load job to finish
print(f"Loaded {job.output_rows} rows into BigQuery table {table_name}")
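The script indexes `data["main"]` and `data["wind"]` directly, so a response that lacks those keys (for example an error payload for an unknown city or an invalid API key) would raise a `KeyError` and stop the run. A minimal sketch of a more defensive extraction step follows. `extract_weather` is a hypothetical helper name, and the error payload shown is an assumed shape used only for illustration.

```python
def extract_weather(city, data):
    """Return (city, temp, humidity, wind_speed), or None if a field is missing or invalid."""
    try:
        main = data["main"]
        # Cast to float so every row carries the same numeric types
        return (
            city,
            float(main["temp"]),
            float(main["humidity"]),
            float(data["wind"]["speed"]),
        )
    except (KeyError, TypeError, ValueError):
        return None


ok_payload = {"main": {"temp": 18.4, "humidity": 62}, "wind": {"speed": 3.6}}
error_payload = {"cod": "404", "message": "city not found"}  # assumed error shape

candidates = [
    extract_weather("Paris", ok_payload),
    extract_weather("Nowhere", error_payload),
]
rows = [row for row in candidates if row is not None]
print(rows)
```

The surviving tuples could then be passed to a single `spark.createDataFrame(rows, [...])` call, which also avoids building one dataframe per city and chaining `union` calls.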
Adding Prefect Functionality
from google.cloud import storage, bigquery
import requests
import random
# Note: Flow and flow.run() are the Prefect 1.x API
from prefect import task, Flow
from pyspark.sql import SparkSession

# Define the OpenWeatherMap API key and base URL
api_key = ""
base_url = "https://api.openweathermap.org/data/2.5/weather"

spark = SparkSession.builder.appName("WeatherData").getOrCreate()

# Define the cities list
cities_100 = ['New York', 'London', 'Paris', 'Tokyo', 'Sydney', 'Moscow',
              'Beijing', 'Rio de Janeiro', 'Mumbai', 'Cairo']


# Fetch weather data for each city and merge the results into a single Spark dataframe
@task
def fetch_weather_data(cities):
    weather_data = None
    for city in cities:
        # Send a request to the OpenWeatherMap API for the city's weather data
        params = {"q": city, "appid": api_key, "units": "metric"}
        response = requests.get(base_url, params=params)
        data = response.json()

        # Extract the relevant weather data from the API response
        temp = data["main"]["temp"]
        humidity = data["main"]["humidity"]
        wind_speed = data["wind"]["speed"]

        # Create a Spark dataframe with the weather data for the city
        city_weather_data = spark.createDataFrame([(city, temp, humidity, wind_speed)],
                                                  ["City", "Temperature", "Humidity", "WindSpeed"])
        if weather_data is None:
            weather_data = city_weather_data
        else:
            weather_data = weather_data.union(city_weather_data)
    return weather_data


# Keep cities warmer than 10 °C, then aggregate humidity and wind speed per city
@task
def process_weather_data(weather_data):
    processed_data = (
        weather_data.filter("Temperature > 10")
        .groupBy("City")
        .agg({"Humidity": "avg", "WindSpeed": "max"})
        .withColumnRenamed("avg(Humidity)", "AverageHumidity")
        .withColumnRenamed("max(WindSpeed)", "MaxWindSpeed")
    )
    return processed_data


# Write the weather data as a CSV file to a Google Cloud Storage bucket
@task
def write_weather_data_to_gcs(weather_data):
    bucket_name = "weather_app_dez"
    file_name = "weather_data.csv"
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    blob.upload_from_string(weather_data.toPandas().to_csv(index=False), content_type="text/csv")
    return f"gs://{bucket_name}/{file_name}"


# Load the CSV file from the bucket into a BigQuery table
@task
def write_weather_data_to_bigquery(uri):
    table_name = "dez-dtc-23-384116.weather_app.weather_data"
    bigquery_client = bigquery.Client()
    table = bigquery.Table(table_name)
    schema = [bigquery.SchemaField("City", "STRING"),
              bigquery.SchemaField("AverageHumidity", "FLOAT"),
              bigquery.SchemaField("MaxWindSpeed", "FLOAT")]
    table.schema = schema
    # skip_leading_rows=1 skips the CSV header row
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV,
                                        skip_leading_rows=1,
                                        autodetect=True)
    job = bigquery_client.load_table_from_uri(uri, table, job_config=job_config)
    job.result()  # Wait for the load job to finish
    print(f"Loaded {job.output_rows} rows into BigQuery table {table_name}")


# Wire the tasks together: fetch -> process -> write to GCS -> load into BigQuery
with Flow("Weather Data Pipeline") as flow:
    cities = random.sample(cities_100, 5)
    weather_data = fetch_weather_data(cities)
    processed_data = process_weather_data(weather_data)
    uri = write_weather_data_to_gcs(processed_data)
    write_weather_data_to_bigquery(uri)

flow.run()
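In this flow, a single failed HTTP request inside `fetch_weather_data` fails the whole run. One option is to wrap the request in a small retry helper with exponential backoff. The sketch below uses only the standard library; `with_retries`, its parameters, and the `flaky_fetch` stand-in are hypothetical names for illustration. Prefect also offers its own task-level retry settings, which may be a better fit, so check the documentation for the version you run.

```python
import time


def with_retries(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func() up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:  # in real code, catch a narrower exception type
            last_error = exc
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise last_error


# Stand-in for an HTTP call that fails twice before succeeding.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"main": {"temp": 21.0}}


delays = []  # record the requested delays instead of actually sleeping
payload = with_retries(flaky_fetch, attempts=3, base_delay=0.5, sleep=delays.append)
print(payload, delays)
```

Inside the task, the request could then be written as `with_retries(lambda: requests.get(base_url, params=params, timeout=10))`. Injecting `sleep` as a parameter keeps the helper easy to test without real waiting.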
To run the pipeline, you'll need a Google Cloud account (with billing enabled or on the free tier) and an API key for the OpenWeatherMap API.
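Rather than pasting the key into the `api_key = ""` line, it can be read from the environment so that it never lands in version control. A minimal sketch, assuming a hypothetical environment variable named `OPENWEATHERMAP_API_KEY` (the original code does not define one):

```python
import os


def load_api_key(env=os.environ, var_name="OPENWEATHERMAP_API_KEY"):
    """Read the API key from a mapping (the process environment by default)."""
    key = env.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {var_name} environment variable to your OpenWeatherMap API key"
        )
    return key


# Demonstration with an in-memory mapping instead of the real environment.
demo_key = load_api_key({"OPENWEATHERMAP_API_KEY": "  demo-key  "})
print(demo_key)
```

In the scripts, `api_key = load_api_key()` would replace the empty string. When running in Docker, the variable can be passed with an additional `-e` flag on `docker run`.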
Configuration
Create a new Google Cloud Storage bucket to store the output data. Create a new BigQuery dataset and table to store the output data. Update the bucket_name and table_name variables in the write_weather_data_to_gcs and write_weather_data_to_bigquery tasks, respectively, with the appropriate names of the bucket and table you created.
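Since the bucket and table names are currently hard-coded in two separate tasks, it can help to keep them in one place and validate them before any cloud call is made. The sketch below is one possible arrangement, not the author's implementation: `PipelineConfig` is a hypothetical class, the names in the example are placeholders, and the check assumes a standard `project.dataset.table` identifier.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    bucket_name: str
    file_name: str
    table_id: str  # expected form: project.dataset.table

    def __post_init__(self):
        parts = self.table_id.split(".")
        if len(parts) != 3 or not all(parts):
            raise ValueError(
                f"table_id must look like project.dataset.table, got {self.table_id!r}"
            )
        if not self.bucket_name or "/" in self.bucket_name:
            raise ValueError(f"bucket_name must be a bare bucket name, got {self.bucket_name!r}")

    @property
    def gcs_uri(self):
        """URI of the CSV file, in the form the BigQuery load job expects."""
        return f"gs://{self.bucket_name}/{self.file_name}"


# Placeholder names; substitute the bucket, dataset, and table you created.
config = PipelineConfig(
    bucket_name="my-weather-bucket",
    file_name="weather_data.csv",
    table_id="my-project.weather_app.weather_data",
)
print(config.gcs_uri)
```

Both write tasks could then take a `PipelineConfig` argument instead of defining `bucket_name` and `table_name` locally.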
Running the Pipeline
To run the pipeline, follow these steps: Open a terminal window and navigate to the project directory. Build the Docker image using the following command:
    docker build -t weather-data-pipeline .
Run the Docker container using the following command:
    docker run --rm -it -v $(pwd):/app -e GOOGLE_APPLICATION_CREDENTIALS=/app/your-credentials.json weather-data-pipeline
Note: Replace your-credentials.json with the name of your Google Cloud Platform service account key file. The pipeline will run, and the output data will be written to the Google Cloud Storage bucket and BigQuery table you specified in the configuration step.
Troubleshooting
If you encounter any issues while running the pipeline, please check the following:
Ensure that the Google Cloud Platform credentials you specified are valid and have the appropriate permissions to access GCS and BigQuery. Ensure that the bucket and table names you specified in the configuration step are correct. Check the logs for any error messages that might indicate the cause of the issue.
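The first check in this list can be partly automated before the pipeline starts. The following sketch inspects the file that `GOOGLE_APPLICATION_CREDENTIALS` points to and reports obvious problems. `check_credentials` is a hypothetical helper, and it only looks at the file locally: it cannot confirm that the service account actually has permission to use GCS or BigQuery.

```python
import json
import os


def check_credentials(env=os.environ):
    """Return a list of problems found; an empty list means the basic checks passed."""
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return ["GOOGLE_APPLICATION_CREDENTIALS is not set"]
    if not os.path.isfile(path):
        return [f"Credentials file not found: {path}"]
    try:
        with open(path, encoding="utf-8") as handle:
            info = json.load(handle)
    except (OSError, ValueError) as exc:  # unreadable file or invalid JSON
        return [f"Could not read credentials file: {exc}"]
    if not isinstance(info, dict):
        return ["Credentials file does not contain a JSON object"]

    problems = []
    if info.get("type") != "service_account":
        problems.append("Key file 'type' is not 'service_account'")
    for field in ("project_id", "client_email", "private_key"):
        if not info.get(field):
            problems.append(f"Missing field in key file: {field}")
    return problems


for problem in check_credentials():
    print("Credential check:", problem)
```

Running this at the top of the script turns a late authentication failure into an early, readable message.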
In conclusion, this pipeline combines PySpark, Prefect, and Google Cloud to fetch, process, and store weather data for multiple cities. The resulting BigQuery table can feed a dashboard or serve as input for further work such as predictive modeling and weather forecasting.