Skip to content

Commit 55248ae

Browse files
authored
opensky pull request (#11)
* added opensky.py * added opensky.py * added opensky datasource
1 parent 372d3c7 commit 55248ae

File tree

2 files changed

+351
-0
lines changed

2 files changed

+351
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
4545
| [GoogleSheetsDataSource](pyspark_datasources/googlesheets.py) | `googlesheets` | Read table from public Google Sheets | None |
4646
| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
4747
| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
48+
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Read real-time aircraft tracking data from OpenSky Network | None |
4849

4950
See more here: https://allisonwang-db.github.io/pyspark-data-sources/.
5051

pyspark_datasources/opensky.py

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
"""
2+
OpenSky Network Data Source for Apache Spark - Academic/Private Use Example
3+
4+
This module provides a custom Spark data source for streaming real-time aircraft tracking data from the OpenSky Network API (https://opensky-network.org/). The OpenSky Network is a community-based receiver network that collects air traffic surveillance data and makes it available as open data to researchers and enthusiasts.
5+
6+
Features:
7+
- Real-time streaming of aircraft positions, velocities, and flight data
8+
- Support for multiple geographic regions (Europe, North America, Asia, etc.)
9+
- OAuth2 authentication for higher API rate limits (4000 vs 100 calls/day)
10+
- Robust error handling with automatic retries and rate limiting
11+
- Data validation and type-safe parsing of aircraft state vectors
12+
- Configurable bounding boxes for focused data collection
13+
14+
Usage Example (Academic/Research):
15+
# Basic usage (defaults to region NORTH_AMERICA)
16+
df = spark.readStream.format("opensky").load()
17+
18+
# With specific region and authentication
19+
df = spark.readStream.format("opensky") \
20+
.option("region", "EUROPE") \
21+
.option("client_id", "your_research_client_id") \
22+
.option("client_secret", "your_research_client_secret") \
23+
.load()
24+
25+
Data Schema:
26+
Each record contains comprehensive aircraft information including position (lat/lon), altitude, velocity, heading, call sign, ICAO identifier, and various flight status flags. All timestamps are in the UTC timezone for consistency.
27+
28+
Feed your own data to OpenSky Network https://opensky-network.org/feed
29+
30+
Rate Limits & Responsible Usage:
31+
- Anonymous access: 100 API calls per day
32+
- Authenticated access: 4000 API calls per day (research accounts)
33+
- Minimum 5-second interval between requests
34+
- Feeder access: 8000 API calls per day if you feed your own data to the OpenSky Network (https://opensky-network.org/feed)
35+
36+
Data Attribution:
37+
When using this data in research or publications, please cite:
38+
"The OpenSky Network, https://opensky-network.org"
39+
40+
Author: Frank Munz, Databricks - Example Only, No Warranty
41+
Purpose: Educational Example / Academic Research Tool
42+
Version: 1.0
43+
Last Updated: July-2025
44+
45+
================================================================================
46+
LEGAL NOTICES & TERMS OF USE
47+
48+
USAGE RESTRICTIONS:
49+
- Academic research and educational purposes only
50+
- Commercial use requires explicit permission from OpenSky Network
51+
- Must comply with OpenSky Network Terms of Use: https://opensky-network.org/about/terms-of-use
52+
53+
If you create a publication (including web pages, papers published by a third party, and publicly available presentations) using data from the OpenSky Network data set, you should cite the original OpenSky paper as follows:
54+
55+
Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic and Matthias Wilhelm.
56+
"Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
57+
In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing in Sensor Networks (IPSN), pages 83-94, April 2014.
58+
59+
60+
DISCLAIMER & LIABILITY:
61+
This code is provided "AS IS" for educational purposes only. The author and Databricks make no warranties, express or implied, and disclaim all liability for any damages, losses, or issues arising from the use of this code. Users assume full responsibility for compliance with all applicable terms of service, laws, and regulations. Use at your own risk.
62+
63+
For commercial use, contact OpenSky Network directly.
64+
================================================================================
65+
66+
67+
"""
68+
69+
70+
import requests
71+
import time
72+
from datetime import datetime, timezone
73+
from typing import Dict, List, Tuple, Any, Optional, Iterator
74+
from dataclasses import dataclass
75+
from requests.adapters import HTTPAdapter
76+
from requests.packages.urllib3.util.retry import Retry
77+
from enum import Enum
78+
79+
from pyspark.sql.datasource import SimpleDataSourceStreamReader, DataSource
80+
from pyspark.sql.types import *
81+
82+
# Short format name returned by OpenSkyDataSource.name(), i.e. the string
# passed to spark.readStream.format(...).
DS_NAME: str = "opensky"
83+
84+
@dataclass
class BoundingBox:
    """Latitude/longitude rectangle used to restrict an OpenSky query.

    The field names match the ``lamin``/``lamax``/``lomin``/``lomax`` query
    parameters that the stream reader sends to the API.
    """

    # Latitude bounds: minimum first, then maximum.
    lamin: float
    lamax: float
    # Longitude bounds: minimum first, then maximum.
    lomin: float
    lomax: float
90+
91+
class Region(Enum):
    """Named geographic regions, each mapped to its ``BoundingBox``.

    The member name is what users pass as the ``region`` option; the member
    value is the bounding box sent to the API.
    """

    EUROPE = BoundingBox(lamin=35.0, lamax=72.0, lomin=-25.0, lomax=45.0)
    NORTH_AMERICA = BoundingBox(lamin=7.0, lamax=72.0, lomin=-168.0, lomax=-60.0)
    SOUTH_AMERICA = BoundingBox(lamin=-56.0, lamax=15.0, lomin=-90.0, lomax=-30.0)
    ASIA = BoundingBox(lamin=-10.0, lamax=82.0, lomin=45.0, lomax=180.0)
    AUSTRALIA = BoundingBox(lamin=-50.0, lamax=-10.0, lomin=110.0, lomax=180.0)
    AFRICA = BoundingBox(lamin=-35.0, lamax=37.0, lomin=-20.0, lomax=52.0)
    GLOBAL = BoundingBox(lamin=-90.0, lamax=90.0, lomin=-180.0, lomax=180.0)
99+
100+
class OpenSkyAPIError(Exception):
    """Root exception for failures while talking to the OpenSky API."""
103+
104+
class RateLimitError(OpenSkyAPIError):
    """Signals that the OpenSky API rejected a call because the rate limit was hit."""
107+
108+
class OpenSkyStreamReader(SimpleDataSourceStreamReader):
    """Streaming reader that polls the OpenSky ``/states/all`` REST endpoint.

    Each call to :meth:`read` issues one HTTP request for the configured
    bounding box and converts the returned state vectors into tuples whose
    order matches ``OpenSkyDataSource.schema``.

    Recognised options:
        region: Name of a ``Region`` member (case-insensitive). An unknown
            name falls back to ``DEFAULT_REGION``.
        client_id / client_secret: OAuth2 client credentials. When both are
            set, a bearer token is requested and sent with every API call.
    """

    DEFAULT_REGION = "NORTH_AMERICA"
    MIN_REQUEST_INTERVAL = 5.0  # seconds between requests
    ANONYMOUS_RATE_LIMIT = 100  # calls per day
    AUTHENTICATED_RATE_LIMIT = 4000  # calls per day
    MAX_RETRIES = 3
    RETRY_BACKOFF = 2
    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]

    TOKEN_URL = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
    STATES_URL = "https://opensky-network.org/api/states/all"

    def __init__(self, schema: StructType, options: Dict[str, str]):
        """Configure region, HTTP session and (optionally) OAuth2 credentials.

        Args:
            schema: Schema of the rows produced by this reader.
            options: Data source options (``region``, ``client_id``,
                ``client_secret``).

        Raises:
            OpenSkyAPIError: If credentials are supplied but no access token
                can be obtained.
        """
        super().__init__()
        self.schema = schema
        self.options = options
        self.session = self._create_session()
        self.last_request_time = 0

        region_name = options.get('region', self.DEFAULT_REGION).upper()
        try:
            self.bbox = Region[region_name].value
        except KeyError:
            print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
            self.bbox = Region[self.DEFAULT_REGION].value

        self.client_id = options.get('client_id')
        self.client_secret = options.get('client_secret')
        self.access_token = None
        self.token_expires_at = 0

        if self.client_id and self.client_secret:
            self._get_access_token()  # OAuth2 authentication
            self.rate_limit = self.AUTHENTICATED_RATE_LIMIT
        else:
            self.rate_limit = self.ANONYMOUS_RATE_LIMIT

    def _get_access_token(self) -> None:
        """Fetch an OAuth2 access token via the client-credentials flow.

        Does nothing while the cached token is still considered valid. The
        stored expiry is 300 seconds earlier than the server-reported one so
        the token is refreshed before it actually lapses.

        Raises:
            OpenSkyAPIError: If the token request fails or the response does
                not contain a usable ``access_token``.
        """
        current_time = time.time()
        if self.access_token and current_time < self.token_expires_at:
            return  # Token still valid

        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        try:
            response = requests.post(self.TOKEN_URL, data=payload, timeout=10)
            response.raise_for_status()
            token_data = response.json()
            access_token = token_data["access_token"]
            expires_in = token_data.get("expires_in", 1800)
        except requests.exceptions.RequestException as e:
            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}") from e
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            # Malformed body: not JSON, not an object, or missing the token.
            raise OpenSkyAPIError(
                f"Failed to get access token: malformed token response ({e!r})"
            ) from e

        self.access_token = access_token
        self.token_expires_at = current_time + expires_in - 300

    def _create_session(self) -> requests.Session:
        """Create a ``requests`` session that retries the configured status codes."""
        session = requests.Session()
        retry_strategy = Retry(
            total=self.MAX_RETRIES,
            backoff_factor=self.RETRY_BACKOFF,
            status_forcelist=self.RETRY_STATUS_CODES,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def initialOffset(self) -> Dict[str, int]:
        """Return the starting offset: no fetch has happened yet."""
        return {'last_fetch': 0}

    def _handle_rate_limit(self) -> None:
        """Ensure at least MIN_REQUEST_INTERVAL seconds between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.MIN_REQUEST_INTERVAL:
            time.sleep(self.MIN_REQUEST_INTERVAL - elapsed)
        self.last_request_time = time.time()

    def _fetch_states(self) -> requests.Response:
        """Request the current state vectors for the configured bounding box.

        Returns:
            The successful HTTP response.

        Raises:
            RateLimitError: If the API answers with HTTP 429.
            OpenSkyAPIError: On any other request failure (timeout,
                connection error, non-success status).
        """
        self._handle_rate_limit()

        if self.client_id and self.client_secret:
            self._get_access_token()  # refreshes only when the token expired

        params = {
            'lamin': self.bbox.lamin,
            'lamax': self.bbox.lamax,
            'lomin': self.bbox.lomin,
            'lomax': self.bbox.lomax,
            # Per the OpenSky REST API documentation the aircraft category
            # (state index 17) is only included when extended=1 is requested.
            'extended': 1,
        }

        headers = {}
        if self.access_token:
            headers['Authorization'] = f'Bearer {self.access_token}'

        try:
            response = self.session.get(
                self.STATES_URL,
                params=params,
                headers=headers,
                timeout=10,
            )

            # NOTE(review): 429 is also in RETRY_STATUS_CODES, so the session
            # may already have retried (and failed) before this check - confirm.
            if response.status_code == 429:
                raise RateLimitError("API rate limit exceeded")
            response.raise_for_status()

            return response

        except requests.exceptions.RequestException as e:
            error_msg = f"API request failed: {str(e)}"
            if isinstance(e, requests.exceptions.Timeout):
                error_msg = "API request timed out"
            elif isinstance(e, requests.exceptions.ConnectionError):
                error_msg = "Connection error occurred"
            raise OpenSkyAPIError(error_msg) from e

    def valid_state(self, state: List) -> bool:
        """Return True when ``state`` has at least 17 fields and carries an
        ICAO24 address, a longitude and a latitude."""
        if not state or len(state) < 17:
            return False

        return (state[0] is not None and  # icao24
                state[5] is not None and  # longitude
                state[6] is not None)     # latitude

    @staticmethod
    def _safe_float(value: Any) -> Optional[float]:
        """Convert to float; None when the value is missing or not convertible."""
        try:
            return float(value) if value is not None else None
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _safe_int(value: Any) -> Optional[int]:
        """Convert to int; None when the value is missing or not convertible."""
        try:
            return int(value) if value is not None else None
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _safe_bool(value: Any) -> Optional[bool]:
        """Convert to bool, keeping None as None."""
        return bool(value) if value is not None else None

    @staticmethod
    def _safe_timestamp(value: Any) -> Optional[datetime]:
        """Convert Unix seconds to an aware UTC datetime; None when missing or invalid."""
        if value is None:
            return None
        try:
            return datetime.fromtimestamp(value, tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError):
            return None

    def parse_state(self, state: List, timestamp: int) -> Tuple:
        """Convert one raw state vector into a row tuple in schema order.

        Index mapping follows the OpenSky REST API state-vector layout:
        7 = baro_altitude, 13 = geo_altitude, 16 = position_source and
        17 = category (present only in extended responses).

        Args:
            state: Raw state vector; must already satisfy :meth:`valid_state`.
            timestamp: Unix time (seconds) of the API response, stored as
                ``time_ingest``.

        Returns:
            A tuple ordered like ``OpenSkyDataSource.schema``. Missing or
            unparsable numeric/timestamp fields become ``None``.
        """
        # Category is absent from non-extended (17-element) vectors.
        category = state[17] if len(state) > 17 else None

        return (
            datetime.fromtimestamp(timestamp, tz=timezone.utc),  # time_ingest
            state[0],                          # icao24
            state[1],                          # callsign
            state[2],                          # origin_country
            self._safe_timestamp(state[3]),    # time_position (may be null)
            self._safe_timestamp(state[4]),    # last_contact
            self._safe_float(state[5]),        # longitude
            self._safe_float(state[6]),        # latitude
            self._safe_float(state[13]),       # geo_altitude
            self._safe_bool(state[8]),         # on_ground
            self._safe_float(state[9]),        # velocity
            self._safe_float(state[10]),       # true_track
            self._safe_float(state[11]),       # vertical_rate
            state[12],                         # sensors
            self._safe_float(state[7]),        # baro_altitude
            state[14],                         # squawk
            self._safe_bool(state[15]),        # spi
            self._safe_int(category),          # category
        )

    def readBetweenOffsets(self, start: Dict[str, int], end: Dict[str, int]) -> Iterator[Tuple]:
        """Return rows for the given offset range.

        ``end`` is not used: this delegates to :meth:`read`, which performs a
        fresh API call rather than replaying previously fetched data.
        """
        data, _ = self.read(start)
        return iter(data)

    def read(self, start: Dict[str, int]) -> Tuple[List[Tuple], Dict[str, int]]:
        """Fetch the current states and return ``(rows, next_offset)``.

        On any error the problem is printed and ``([], start)`` is returned,
        so the stream keeps running and the offset does not advance.
        """
        try:
            response = self._fetch_states()
            data = response.json()

            # Use one timestamp for both the rows and the offset.
            fetch_time = data.get('time')
            if fetch_time is None:
                fetch_time = int(time.time())

            # "states" may be null (not just missing) when no aircraft are
            # reported for the bounding box; treat that as an empty batch.
            raw_states = data.get('states') or []

            valid_states = [
                self.parse_state(s, fetch_time)
                for s in raw_states
                if self.valid_state(s)
            ]

            return (valid_states, {'last_fetch': fetch_time})

        except OpenSkyAPIError as e:
            print(f"OpenSky API Error: {str(e)}")
            return ([], start)
        except Exception as e:
            # Deliberate best-effort: keep the streaming query alive.
            print(f"Unexpected error: {str(e)}")
            return ([], start)
309+
310+
class OpenSkyDataSource(DataSource):
    """Spark data source, registered as ``DS_NAME``, that streams OpenSky states.

    Options are validated eagerly in the constructor; reading is delegated to
    ``OpenSkyStreamReader``.
    """

    def __init__(self, options: Dict[str, str] = None):
        """Store and validate the options.

        Raises:
            ValueError: If ``client_id`` is given without ``client_secret``,
                or if ``region`` is not the name of a ``Region`` member.
        """
        opts = options or {}
        super().__init__(opts)
        self.options = opts

        secret_missing = not self.options.get('client_secret')
        if 'client_id' in self.options and secret_missing:
            raise ValueError("client_secret must be provided when client_id is set")

        if 'region' in self.options:
            requested = self.options['region'].upper()
            if requested not in Region.__members__:
                allowed = ', '.join(Region.__members__.keys())
                raise ValueError(f"Invalid region. Must be one of: {allowed}")

    @classmethod
    def name(cls) -> str:
        """Return the short format name of this data source."""
        return DS_NAME

    def schema(self) -> StructType:
        """Return the fixed schema of the produced rows."""
        columns = [
            ("time_ingest", TimestampType()),
            ("icao24", StringType()),
            ("callsign", StringType()),
            ("origin_country", StringType()),
            ("time_position", TimestampType()),
            ("last_contact", TimestampType()),
            ("longitude", DoubleType()),
            ("latitude", DoubleType()),
            ("geo_altitude", DoubleType()),
            ("on_ground", BooleanType()),
            ("velocity", DoubleType()),
            ("true_track", DoubleType()),
            ("vertical_rate", DoubleType()),
            ("sensors", ArrayType(IntegerType())),
            ("baro_altitude", DoubleType()),
            ("squawk", StringType()),
            ("spi", BooleanType()),
            ("category", IntegerType()),
        ]
        return StructType([StructField(col, dtype) for col, dtype in columns])

    def simpleStreamReader(self, schema: StructType) -> OpenSkyStreamReader:
        """Create the stream reader, handing it this source's options."""
        return OpenSkyStreamReader(schema, self.options)
349+
350+
# Convenience for notebook use: register the source when a global ``spark``
# session exists. When this file is imported as a regular module there is no
# such global, and an unconditional call would raise NameError at import time.
if "spark" in globals():
    spark.dataSource.register(OpenSkyDataSource)

0 commit comments

Comments
 (0)