AOGN is an Asynchronous OGN (Open Glider Network) client for modern Python. It connects to the APRS servers and receives the planes', gliders', receivers', etc. APRS messages, while still allowing you control of the program flow.
This can simplify programs since:
- There are no callback or blocking functions (e.g.
listen()
orrun_forever()
). - One script/thread/process can still do other useful out-of-band tasks like computing aggregate statistics and making web requests.
To interpret the raw OGN messages, use a function like python-ogn-client's ogn.parser.parse
.
pip install aogn
Basic example:
import asyncio import logging import sys import aogn logging.basicConfig( stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(levelname)s %(module)s %(message)s', datefmt='%b %d %H:%M:%S', ) async def example() -> None: conn = aogn.Client(aprs_user='NO-CALL', ) try: while True: # Get the APRS packet once available: raw_message = await conn.packet() logging.debug(raw_message) except KeyboardInterrupt: logging.info('OGN Gateway stopped.') await conn.disconnect() if __name__ == '__main__': asyncio.run(example())
Concurrent example, with raw_message parsing:
import asyncio import logging import sys logging.basicConfig( stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s %(levelname)s %(module)s %(message)s', datefmt='%b %d %H:%M:%S', ) from aogn import Client from ogn.parser import parse, ParseError def process_beacon(raw_message): beacon = {} try: beacon = parse(raw_message) except ParseError as err: logging.warning(f'ParseError: {err}') except NotImplementedError as err: logging.error(f'NotImplementedError: {err}') except AttributeError as err: logging.error(f'raw_message: {raw_message}') logging.error(f'beacon: {beacon}') logging.error(err) return beacon async def example() -> None: conn = Client(aprs_user='NO-CALL', ) # aprs_filter='t/s') try: while True: raw_message = await conn.packet() if raw_message: beacon = process_beacon(raw_message) except KeyboardInterrupt: logging.info('OGN Gateway stopped.') await conn.disconnect() async def another_io_function() -> None: import random while True: sleep_duration = 120 * random.random() logging.debug(f'Concurrently sleeping for {sleep_duration:.0f} seconds...') await asyncio.sleep(sleep_duration) async def main(): await asyncio.gather(example(), another_io_function()) if __name__ == '__main__': asyncio.run(main())