|
| 1 | +import ast |
| 2 | +import json |
| 3 | +import requests |
| 4 | + |
| 5 | +from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader |
| 6 | +from pyspark.sql.types import StructType, StructField, DoubleType, StringType |
| 7 | + |
| 8 | + |
class WeatherDataSource(DataSource):
    """
    A custom PySpark data source for fetching weather data from tomorrow.io for given
    locations (latitude, longitude).

    Options
    -------

    - locations: specify a list of (latitude, longitude) tuples.
    - apikey: specify the API key for the weather service (tomorrow.io).
    - frequency: specify the frequency of the data ("minutely", "hourly", "daily").
      Default is "minutely".

    Examples
    --------

    Register the data source.

    >>> from pyspark_datasources import WeatherDataSource
    >>> spark.dataSource.register(WeatherDataSource)

    Define the options for the custom data source

    >>> options = {
    ...     "locations": "[(37.7749, -122.4194), (40.7128, -74.0060)]",  # San Francisco and New York
    ...     "apikey": "your_api_key_here",
    ... }

    Create a DataFrame using the custom weather data source

    >>> weather_df = spark.readStream.format("weather").options(**options).load()

    Stream weather data and print the results to the console in real-time.

    >>> query = weather_df.writeStream.format("console").trigger(availableNow=True).start()
    """

    # Frequencies the tomorrow.io forecast endpoint exposes as timelines.
    _VALID_FREQUENCIES = ("minutely", "hourly", "daily")

    @classmethod
    def name(cls):
        """Return the short name used with spark.readStream.format(...)."""
        return "weather"

    def __init__(self, options):
        """Store the user-supplied options and validate the frequency."""
        self.options = options
        self.frequency = options.get("frequency", "minutely")
        if self.frequency not in self._VALID_FREQUENCIES:
            raise ValueError(f"Unsupported frequency: {self.frequency}")

    def schema(self):
        """Return the fixed output schema of this data source."""
        columns = [
            ("latitude", DoubleType()),
            ("longitude", DoubleType()),
            ("weather", StringType()),
            ("timestamp", StringType()),
        ]
        return StructType([StructField(name, dtype, True) for name, dtype in columns])

    def simpleStreamReader(self, schema: StructType):
        """Create the stream reader that produces rows for this source."""
        return WeatherSimpleStreamReader(schema, self.options)
| 71 | + |
| 72 | + |
class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):
    """Stream reader that polls the tomorrow.io forecast API for each configured
    location and emits every entry newer than the last recorded offset."""

    # ISO 8601 timestamp from which reading starts for a location that has no
    # recorded offset yet (initial run, or a location added mid-stream).
    DEFAULT_START_TIMESTAMP = "2024-09-01T00:00:00Z"

    # Seconds before an API request is abandoned; without a timeout a stalled
    # connection would block the streaming query indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, schema: StructType, options: dict):
        """Initialize with schema and options.

        Parameters
        ----------
        schema : StructType
            Output schema: (latitude, longitude, weather, timestamp).
        options : dict
            Data source options: "locations" (string repr of a list of
            (lat, long) tuples), "apikey", and "frequency".
        """
        super().__init__()
        self.schema = schema
        self.locations = self._parse_locations(options.get("locations", "[]"))
        self.api_key = options.get("apikey", "")
        self.current = 0
        self.frequency = options.get("frequency", "minutely")
        self.session = requests.Session()  # Use a session for connection pooling

    @staticmethod
    def _parse_locations(locations_str: str):
        """Converts string representation of list of tuples to actual list of tuples.

        Uses ast.literal_eval (not eval), so only Python literals are accepted.
        """
        return [tuple(map(float, x)) for x in ast.literal_eval(locations_str)]

    def initialOffset(self):
        """
        Returns the initial offset for reading, which serves as the starting point for
        the streaming data source.

        The initial offset is returned as a dictionary where each key is a unique identifier
        for a specific (latitude, longitude) pair, and each value is a timestamp string
        (in ISO 8601 format) representing the point in time from which data should start being
        read.

        Example:
            For locations [(37.7749, -122.4194), (40.7128, -74.0060)], the offset might look like:
            {
                "offset_37.7749_-122.4194": "2024-09-01T00:00:00Z",
                "offset_40.7128_-74.0060": "2024-09-01T00:00:00Z"
            }
        """
        return {f"offset_{lat}_{long}": self.DEFAULT_START_TIMESTAMP for (lat, long) in self.locations}

    def read(self, start: dict):
        """Reads data starting from the given offset.

        Returns a tuple (rows, new_offset): rows is a list of
        (latitude, longitude, weather-values-json, timestamp) tuples newer than
        the per-location offset; new_offset maps each location key to the
        newest timestamp observed for it.
        """
        data = []
        new_offset = {}
        for lat, long in self.locations:
            key = f"offset_{lat}_{long}"
            # .get with a default so a location added after the stream started
            # (absent from the stored offset) does not raise KeyError.
            start_ts = start.get(key, self.DEFAULT_START_TIMESTAMP)
            weather = self._fetch_weather(lat, long, self.api_key, self.session)[self.frequency]
            for entry in weather:
                # Start time is exclusive and end time is inclusive.
                if entry["time"] > start_ts:
                    data.append((lat, long, json.dumps(entry["values"]), entry["time"]))
            # Guard against an empty timeline: keep the previous offset so no
            # data is skipped and weather[-1] cannot raise IndexError.
            new_offset[key] = weather[-1]["time"] if weather else start_ts
        return (data, new_offset)

    @staticmethod
    def _fetch_weather(lat: float, long: float, api_key: str, session):
        """Fetches weather data for the given latitude and longitude using a REST API.

        Raises requests.HTTPError (via raise_for_status) on a non-2xx response.
        """
        url = f"https://api.tomorrow.io/v4/weather/forecast?location={lat},{long}&apikey={api_key}"
        response = session.get(url, timeout=WeatherSimpleStreamReader.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()["timelines"]
0 commit comments