Introduction
Part 2: GitHub OAuth is out on https://johnowolabiidogun.dev/blog/building-an-ai-powered-financial-behavior-analyzer-with-nodejs-python-sveltekit-and-tailwindcss-part-2-github-oauth-5195c7/67a06839edc9a2f58232fbf8
To kick off this intriguing and challenging journey, we'll start by building the AI service, the final component of our system's architecture. We'll harness the power of asynchronous programming in Python via `asyncio` and one of its most lightweight yet powerful asynchronous HTTP client/server libraries, `aiohttp`. It's a "double-edged" sword in the best sense: it can be your HTTP/WebSocket client and, at the same time, your HTTP/WebSocket server. Unlike `requests`, which is synchronous, `aiohttp` supports both HTTP and WebSocket communication, making it a worthy alternative to `FastAPI`, `Flask`, or even `Django` for async-based applications.
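To make that dual role concrete, here's a minimal sketch (an illustration, not project code) of `aiohttp` playing server and client in the same script:

```python
import asyncio

from aiohttp import ClientSession, web


async def hello(request: web.Request) -> web.Response:
    return web.json_response({'message': 'hello'})


async def main():
    # aiohttp as a server...
    app = web.Application()
    app.router.add_get('/', hello)
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, 'localhost', 8080).start()

    # ...and aiohttp as a client, hitting that same server
    async with ClientSession() as session:
        async with session.get('http://localhost:8080/') as resp:
            print(await resp.json())

    await runner.cleanup()


asyncio.run(main())
```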
Let's put aiohttp to work in this article! 🚀
Prerequisites
Before diving in, I assume you've already:
- [x] Created a project (e.g., `utility`)
- [x] Set up a virtual environment
- [x] Installed the required dependencies

If you haven't, copy the following into `requirements.txt`:
```
aiohttp==3.11
pandas==2.2
pdf2image==1.17
pytesseract==0.3
transformers==4.48
python-dotenv==1.0
scikit-learn==1.6
torch @ https://download.pytorch.org/whl/cpu/torch-2.5.1%2Bcpu-cp312-cp312-linux_x86_64.whl
torchvision @ https://download.pytorch.org/whl/cpu/torchvision-0.20.1%2Bcpu-cp312-cp312-linux_x86_64.whl
```
and, with your project's virtual environment activated, run:

```bash
(virtualenv)$ pip install -r requirements.txt
```
Notice the versions of `torch` and `torchvision` used here. I used CPU-only builds since I deployed this on a CPU-based server. If you have a GPU-enabled server or development environment, you can install the standard versions without the URL specifier (`@ https://download.pytorch.org/whl/...`), which, by the way, is a nifty way to pin a library to a direct download URL in `requirements.txt`.
Note: torch (v2.6)
Just as I write this, the PyTorch Foundation announced the release of PyTorch 2.6, which debuts support for Python 3.13 along with some performance improvements. Consider using it instead.
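If you want to stay CPU-only while moving to 2.6, a `requirements.txt` along these lines should work — note the pins are my assumption, so double-check them against the official PyTorch release matrix:

```
--extra-index-url https://download.pytorch.org/whl/cpu
torch==2.6.0
torchvision==0.21.0
```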
Source code
Sirneij / finance-analyzer
An AI-powered financial behavior analyzer and advisor written in Python (aiohttp) and TypeScript (ExpressJS & SvelteKit with Svelte 5)
Implementation
Let's now go into the meat of this article.
Step 1: Spawning an `aiohttp` Server
Create `app.py` and populate it with the following:
```python
import os
from asyncio import Lock

from aiohttp import WSCloseCode, WSMsgType, web
from aiohttp.web import Request, Response, WebSocketResponse

from utils.analyzer import analyze_transactions
from utils.extract_text import extract_text_from_pdf
from utils.settings import base_settings
from utils.summarize import summarize_transactions
from utils.websocket import WebSocketManager

# Replace global ws_connections with typed version
ws_connections: set[WebSocketResponse] = set()
ws_lock = Lock()


async def start_background_tasks(app):
    """Initialize application background tasks."""
    app['ws_connections'] = ws_connections
    app['ws_lock'] = ws_lock


async def cleanup_background_tasks(app):
    """Cleanup application resources."""
    await cleanup_ws(app)


async def cleanup_ws(app):
    """Cleanup WebSocket connections on shutdown."""
    async with ws_lock:
        connections = set(ws_connections)  # Create a copy to iterate safely
        for ws in connections:
            # Use a proper close code (WSCloseCode, not WSMsgType);
            # the close message must be bytes
            await ws.close(code=WSCloseCode.GOING_AWAY, message=b'Server shutdown')
        ws_connections.clear()


async def extract_text(request: Request) -> Response:
    try:
        base_settings.logger.info('Received text extraction request')
        reader = await request.multipart()
        field = await reader.next()
        if field.name != 'file':
            base_settings.logger.warning('No file field in request')
            return web.json_response({'error': 'No file uploaded'}, status=400)

        # Read the file content
        base_settings.logger.info('Reading uploaded file')
        file_content: bytes = await field.read()
        text: str = await extract_text_from_pdf(file_content)

        base_settings.logger.info('Successfully processed request')
        return web.json_response({'text': text})
    except Exception as e:
        base_settings.logger.error(
            f'Request processing failed: {str(e)}', exc_info=True
        )
        return web.json_response({'error': str(e)}, status=500)


async def websocket_handler(request: Request) -> WebSocketResponse:
    """WebSocket handler for real-time communication."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async with ws_lock:
        ws_connections.add(ws)

    ws_manager = WebSocketManager(ws)
    await ws_manager.prepare()
    base_settings.logger.info('WebSocket connection established')

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = msg.json()
                    if data.get('action') == 'analyze':
                        result = await analyze_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Analysis complete', 1.0, 'Analysis'
                        )
                        await ws_manager.send_result(
                            result, 'Analysis', 'analysis_complete'
                        )
                    elif data.get('action') == 'summary':
                        result = await summarize_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Summary complete', 1.0, 'Summarize'
                        )
                        await ws_manager.send_result(
                            result, 'Summarize', 'summary_complete'
                        )
                    else:
                        await ws_manager.send_result(
                            {'message': 'Unknown action'}, 'Error', 'error'
                        )
                except Exception as e:
                    base_settings.logger.error(f'Message processing error: {str(e)}')
                    await ws_manager.send_result({'error': str(e)}, 'Error', 'error')
            elif msg.type == WSMsgType.ERROR:
                base_settings.logger.error(f'WebSocket error: {ws.exception()}')
    finally:
        async with ws_lock:
            ws_connections.remove(ws)
        await ws.close()
        base_settings.logger.info('WebSocket connection closed')

    return ws


def init_app() -> web.Application:
    app = web.Application()
    app.router.add_post('/extract-text', extract_text)
    app.router.add_get('/ws', websocket_handler)

    # Add startup/cleanup handlers
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)

    return app


if __name__ == '__main__':
    app = init_app()
    try:
        web.run_app(
            app,
            host='0.0.0.0',
            port=int(os.environ.get('PORT', 5173)),
        )
    except KeyboardInterrupt:
        base_settings.logger.info('Received keyboard interrupt...')
    except Exception as e:
        base_settings.logger.error(f'Server error: {e}')
    finally:
        base_settings.logger.info('Server shutdown complete.')
```
That was a long stretch of code but, looking closely, it's very straightforward. We started out by registering all our WebSocket connections in a set (to prevent duplicates). We could have done something like the following instead, as the aiohttp docs suggest:

```python
...
import weakref
...
ws_connections = web.AppKey('ws_connections', weakref.WeakSet)
...
```

but I went for the low-level approach: controlling the startup, using async locks to ensure thread safety, handling cleanup, and so on. Feel free to use the suggested approach instead.

Then comes the `extract_text` route handler. It's the handler that receives the user's transactions PDF file, enforces that the multipart field is named `file` (this is my preference; you can allow any field name), and then lets `extract_text_from_pdf` extract the text from the file before responding with it.
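As a quick sanity check, here's a hypothetical client snippet — it assumes the server is running locally on the default port 5173 and that a `statement.pdf` exists on disk:

```python
import asyncio

import aiohttp


async def main():
    data = aiohttp.FormData()
    # The field must be named 'file' — the handler rejects anything else
    data.add_field(
        'file',
        open('statement.pdf', 'rb'),
        filename='statement.pdf',
        content_type='application/pdf',
    )
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'http://localhost:5173/extract-text', data=data
        ) as resp:
            print(resp.status, await resp.json())


asyncio.run(main())
```

Now, let's see how `extract_text_from_pdf` pulls this off.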
Step 2: Utility package - extract_text.py
```python
import os
import tempfile

import pytesseract
from pdf2image import convert_from_path

from .settings import base_settings as settings


async def extract_text_from_pdf(pdf_file: bytes) -> str:
    settings.logger.info('Starting PDF text extraction')

    # Create temporary file to store uploaded PDF
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_pdf:
        tmp_pdf.write(pdf_file)
        tmp_path: str = tmp_pdf.name
        settings.logger.debug(f'Created temporary file: {tmp_path}')

    try:
        # Convert PDF pages to images (pdf2image defaults to 200 dpi)
        settings.logger.info('Converting PDF to images')
        pages = convert_from_path(tmp_path)
        settings.logger.debug(f'Converted {len(pages)} pages')

        # Extract text
        text_data: str = ''
        for i, page in enumerate(pages, 1):
            settings.logger.debug(f'Processing page {i}')
            text: str = pytesseract.image_to_string(page)
            text_data += text + '\n'

        settings.logger.info('Text extraction completed successfully')
        return text_data
    except Exception as e:
        settings.logger.error(f'Error during text extraction: {str(e)}', exc_info=True)
        raise
    finally:
        # Clean up temporary file
        settings.logger.debug(f'Cleaning up temporary file: {tmp_path}')
        os.unlink(tmp_path)
```
It uses Google's Tesseract OCR engine, via the `pytesseract` library, to extract text from the file. Although OCR can, in principle, read text straight from many document types, it worked better for me on images via `image_to_string`, hence the intermediate step of converting the PDF to images before feeding them into OCR. The temporary file exists because `convert_from_path` needs a filesystem path, and since we don't want to persist anyone's file on our end, a temporary file is the only place to get one. A better option is `convert_from_bytes` from `pdf2image`, which doesn't require temporary file creation at all.
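With it, the whole function shrinks to something like this sketch (same logging setup assumed):

```python
import pytesseract
from pdf2image import convert_from_bytes

from .settings import base_settings as settings


async def extract_text_from_pdf(pdf_file: bytes) -> str:
    """Same extraction flow, but without touching the filesystem."""
    settings.logger.info('Converting PDF to images (in memory)')
    pages = convert_from_bytes(pdf_file)

    # OCR each page and stitch the results together
    return '\n'.join(pytesseract.image_to_string(page) for page in pages)
```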
Back in `app.py`, we also have a WebSocket handler that simply delegates actions to various submodules. But before we get to that, I have a utility that helps manage all the app's WebSocket connections, so we can handle them elegantly without worrying about leaks and the like.
Step 3: Utility package - websocket.py
```python
from aiohttp import web

from utils.settings import base_settings


class WebSocketManager:
    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        self._ready = False
        base_settings.logger.info(f'Initializing WebSocket manager: {ws}')

    async def prepare(self):
        self._ready = True
        base_settings.logger.info('WebSocket manager ready')

    async def send_progress(self, message: str, progress: float, task_type: str = None):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send progress - WebSocket not ready/closed'
            )
            return
        try:
            await self.ws.send_json(
                {
                    'action': 'progress',
                    'message': message,
                    'progress': progress,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending progress: {str(e)}')

    async def send_result(self, result: dict, task_type: str, action: str):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send result - WebSocket not ready/closed'
            )
            return
        try:
            await self.ws.send_json(
                {
                    'action': action,
                    'result': result,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending result: {str(e)}')
```
The app would work fine without it, but I ran into a bug where connections weren't ready while the frontend was already making requests, so I figured this utility would help ensure connections are ready before requests are accepted. I also implemented two methods, `send_result` and `send_progress`, which use aiohttp's WebSocket `send_json` to send analysis results and progress reports, respectively, back to the requester(s), so they aren't kept in the dark if the analysis takes a while (which it will on a CPU-only machine running several transformer models). After the custom WebSocket manager completes preparation, we loop over incoming messages and delegate actions based on the `action` key. For now, we support the `analyze` and `summary` actions via `analyze_transactions` and `summarize_transactions`, respectively. Let's see what they do.
Step 4: Utility package - analyzer.py
```python
import asyncio
import os
from datetime import datetime

import numpy as np
import pandas as pd
import torch
from sklearn.ensemble import IsolationForest
from transformers import pipeline

from models.base import Transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


def get_device() -> tuple[torch.device, str]:
    """
    Detect the best available device (GPU, MPS, or CPU) for PyTorch computations.
    """
    if torch.cuda.is_available():
        # Check if CUDA (NVIDIA GPU) is available
        return torch.device('cuda'), 'CUDA (NVIDIA GPU)'
    elif torch.backends.mps.is_available():
        # Check if MPS (Metal Performance Shaders on Apple Silicon) is available
        return torch.device('mps'), 'MPS (Apple Metal)'
    else:
        # Default to CPU
        return torch.device('cpu'), 'CPU'


async def analyze_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Analyze transactions and return insights with progress updates."""
    try:
        # Step 1: Validate and preprocess transactions
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Analysis'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No valid transactions provided'}

        # Step 2: Classification
        if ws_manager:
            await ws_manager.send_progress(
                'Classifying transactions...', 0.2, 'Analysis'
            )
        categories = await classify_transactions(tx_objects)

        # Step 3: Anomaly Detection
        if ws_manager:
            await ws_manager.send_progress('Detecting anomalies...', 0.4, 'Analysis')
        anomalies = await detect_anomalies(tx_objects)

        # Step 4: Spending Analysis
        if ws_manager:
            await ws_manager.send_progress('Analyzing spending...', 0.6, 'Analysis')
        spending_analysis = await analyze_spending(tx_objects)

        # Step 5: Trend Prediction
        if ws_manager:
            await ws_manager.send_progress(
                'Predicting spending trends...', 0.8, 'Analysis'
            )
        spending_trends = await predict_trends(tx_objects)

        # Compile the results
        result = {
            'categories': categories,
            'anomalies': anomalies,
            'spending_analysis': spending_analysis,
            'spending_trends': spending_trends,
        }

        settings.logger.info('Transaction analysis completed successfully')
        return result
    except Exception as e:
        settings.logger.error(f'Error analyzing transactions: {str(e)}', exc_info=True)
        if ws_manager:
            await ws_manager.send_progress('Analysis failed', 1.0)
        return {'error': f'Analysis failed: {str(e)}'}


def validate_transaction(t: dict) -> bool:
    """Validate transaction fields."""
    try:
        required_fields = {'date', 'description', 'amount', 'balance', 'type', 'userId'}
        if not all(field in t for field in required_fields):
            return False

        # Validate amount and balance are numeric
        float(t['amount'])
        float(t['balance'])

        # Validate date formats
        datetime.fromisoformat(t['date'])

        return True
    except (ValueError, TypeError):
        return False


async def classify_transactions(transactions: list[Transaction]) -> dict:
    """
    Classify transactions using FinBERT.
    """
    # Get device (GPU if available, otherwise CPU)
    device, device_name = get_device()
    settings.logger.info(f'Using device for classification: {device_name}')

    # Load FinBERT classification pipeline; passing the torch.device directly
    # works for CUDA, MPS, and CPU alike
    classifier = pipeline(
        'zero-shot-classification',
        model='yiyanghkust/finbert-tone',
        device=device,
    )

    # Define financial categories
    labels = os.getenv(
        'LABELS',
        'groceries,housing,transportation,entertainment,utilities,other',
    ).split(',')

    # Prepare transaction descriptions for classification
    descriptions = [tx.description.lower() for tx in transactions]

    # Batch classify descriptions (improves performance)
    results = await asyncio.to_thread(
        classifier, descriptions, labels, truncation=True, max_length=128
    )

    # Initialize categories and percentages
    categories = {label: 0 for label in labels}

    # Aggregate classification results
    for tx, result in zip(transactions, results):
        category = result['labels'][0]
        categories[category] += abs(tx.amount)

    total_spent = sum(categories.values())

    # Calculate percentages
    percentages = {
        category: (amount / total_spent) * 100 if total_spent > 0 else 0
        for category, amount in categories.items()
    }

    return {'categories': categories, 'percentages': percentages}


async def detect_anomalies(transactions: list[Transaction]) -> list[dict]:
    """Detect anomalies in transactions and provide specific reasons."""
    # Extract transaction amounts and reshape for Isolation Forest
    amounts = np.array([tx.amount for tx in transactions]).reshape(-1, 1)
    model = IsolationForest(contamination=0.05, random_state=42)
    anomalies = model.fit_predict(amounts)

    # Calculate mean and standard deviation for dynamic reason generation
    mean = np.mean(amounts)
    std = np.std(amounts)

    # Detect anomalies and generate specific reasons
    anomaly_details = []
    for tx, anomaly in zip(transactions, anomalies):
        if anomaly == -1:  # Anomaly detected
            # Calculate the Z-score for the transaction
            z_score = (tx.amount - mean) / std

            # Generate a dynamic reason
            if tx.amount > 0:
                reason = (
                    f'Unusually high income of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and abs(tx.amount) > abs(mean) + 2 * std:
                reason = (
                    f'Unusually large expense of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and 'luxury' in tx.description.lower():
                reason = 'Uncommon luxury expense detected.'
            elif tx.amount < 0 and 'groceries' in tx.description.lower():
                reason = 'Unusually high grocery expense detected.'
            else:
                reason = f'Outlier transaction with amount {tx.amount} (Z-score: {z_score:.2f}).'

            # Append the anomaly details
            anomaly_details.append(
                {
                    'date': tx.date.isoformat(),
                    'description': tx.description,
                    'amount': tx.amount,
                    'reason': reason,
                }
            )

    return anomaly_details


async def analyze_spending(transactions: list[Transaction]) -> dict:
    """Generate spending analysis with cumulative balance."""
    total_spent = sum(tx.amount for tx in transactions if tx.amount < 0)
    total_income = sum(tx.amount for tx in transactions if tx.amount > 0)

    # Create a DataFrame from transactions
    df = pd.DataFrame([t.__dict__ for t in transactions])

    # Ensure date parsing is correct
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)

    # Group by the date and calculate daily totals
    daily_summary = df.groupby(df['date'].dt.date)['amount'].sum()

    # Sort by date to ensure cumulative calculations are correct
    df = df.sort_values(by='date')

    # Calculate the cumulative balance
    df['cumulative_balance'] = df['balance']

    # Convert daily_summary to JSON-serializable format
    daily_summary = {str(date): float(amount) for date, amount in daily_summary.items()}

    # Prepare cumulative balance as JSON-serializable format
    cumulative_balance = {
        row['date'].strftime('%Y-%m-%d'): row['cumulative_balance']
        for _, row in df.iterrows()
    }

    return {
        'total_spent': abs(total_spent),
        'total_income': total_income,
        'savings_rate': (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        ),
        'daily_summary': daily_summary,
        'cumulative_balance': cumulative_balance,
    }


async def predict_trends(transactions: list[Transaction]) -> dict:
    """Predict future spending trends with enhanced analysis."""
    if len(transactions) < 2:
        return {'trend': 'Not enough data'}

    # Convert dates to numeric for regression
    dates = [(tx.date - transactions[0].date).days for tx in transactions]
    amounts = [tx.amount for tx in transactions]

    # Linear regression for trends
    coeffs = np.polyfit(dates, amounts, 1)
    trend = 'increasing' if coeffs[0] > 0 else 'decreasing'

    # Include confidence interval (optional)
    slope, intercept = coeffs
    trend_line = [slope * x + intercept for x in dates]

    # Estimated monthly spend
    df = pd.DataFrame([t.__dict__ for t in transactions])
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)
    months = len(df['date'].dt.to_period('M').unique())

    return {
        'trend': trend,
        'trend_slope': slope,
        'estimated_monthly_spend': abs(
            sum(tx.amount for tx in transactions if tx.amount < 0)
        )
        / (months or 1),
    }
```
It's full of async functions that each handle a specific part of the analysis (leaving aside the transaction validation and device detection logic). A function of particular interest is `classify_transactions`, which uses `yiyanghkust/finbert-tone` — FinBERT, a large language model for extracting information from financial text — to perform zero-shot classification of the transactions, mostly into:
```
LABELS=groceries,school,housing,transportation,gadgets,entertainment,utilities,credit cards,miscellaneous,dining out,healthcare,insurance,savings,investments,childcare,travel,personal care,debts,charity,taxes,subscriptions,streaming services,home maintenance,shopping,pets,fitness,hobbies,gifts
```
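The analyzer reads these from a `LABELS` environment variable (with a shorter default, as you saw above). Since `python-dotenv` is already in our requirements, dropping that line into a `.env` file and loading it somewhere before the classifier runs is enough — a one-line sketch, assuming your settings module doesn't already do it:

```python
from dotenv import load_dotenv

load_dotenv()  # makes os.getenv('LABELS') in utils/analyzer.py pick up the .env value
```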
Realistically, it would be more accurate to fine-tune a barebones Bidirectional Encoder Representations from Transformers (BERT) model, such as `RoBERTa`, `DistilBERT`, and co., on our data, but my excuse was a lack of adequate data and GPU time (Google Colab isn't enough).
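If you do have labeled data, a minimal fine-tuning loop with Hugging Face's `Trainer` looks roughly like the sketch below. This is hypothetical: the two training rows and the label set are placeholders, and you'd also need the `datasets` and `accelerate` packages, which aren't in our requirements.

```python
from datasets import Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)

labels = ['groceries', 'housing', 'transportation', 'entertainment', 'utilities', 'other']
tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
model = AutoModelForSequenceClassification.from_pretrained(
    'distilbert-base-uncased', num_labels=len(labels)
)

# Toy rows — a real run needs thousands of labeled descriptions
data = Dataset.from_dict(
    {'text': ['WALMART GROCERY 1234', 'RENT PAYMENT MAY'], 'label': [0, 1]}
)
data = data.map(
    lambda batch: tokenizer(
        batch['text'], truncation=True, padding='max_length', max_length=64
    ),
    batched=True,
)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir='finetuned-tx-classifier', num_train_epochs=3),
    train_dataset=data,
)
trainer.train()
```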
Tip: A nice way to collaborate
If you want to explore the ideas behind fine-tuning and barebones natural language processing with transformers, we can collaborate to create something like `FinBERT` but for account statements and the like. Please reach out.
There are other functions for detecting incomes/expenses that are "abnormal", using `IsolationForest` plus a Z-score for the detection and for generating human-readable reasons for each detection. Cool stuff! There are also functions for spending analysis and the like, but they're pretty basic.
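To see the detection idea in isolation, here's a tiny standalone run with toy numbers (not project code):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Mostly routine expenses, plus one huge expense and one big income
amounts = np.array([-45.0, -52.0, -38.0, -61.0, -49.0, -2500.0, 3200.0]).reshape(-1, 1)

model = IsolationForest(contamination=0.2, random_state=42)
flags = model.fit_predict(amounts)  # -1 marks outliers; expect the extremes flagged

# The same Z-scores the analyzer uses to word its "reasons"
z_scores = (amounts - amounts.mean()) / amounts.std()
for amount, flag, z in zip(amounts.ravel(), flags, z_scores.ravel()):
    print(f'{amount:>10.2f}  flag={flag:>2}  z={z:>6.2f}')
```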
Note: `amount` assumption
All the analysis here assumes that `income` entries have positive amounts while `expenses` have negative amounts. If your data differs from this assumption, you may need to modify the code or the data, depending on your preference.
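For example, if your statements record every amount as positive, a small normalization pass before analysis would do — a sketch, assuming each record's `type` field identifies income rows (adjust the value to your own convention):

```python
def normalize_amounts(transactions: list[dict]) -> list[dict]:
    """Force incomes positive and expenses negative before analysis."""
    for t in transactions:
        amount = abs(float(t['amount']))
        # 'income' is an assumed type value — swap in whatever your data uses
        t['amount'] = amount if t['type'] == 'income' else -amount
    return transactions
```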
Before we round up, let's see what `utils/summarize.py` does.
Step 5: Utility package - summarize.py
```python
from datetime import datetime

import pandas as pd

from models.base import Transaction
from utils.analyzer import validate_transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


async def summarize_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Summarize transaction data."""
    try:
        # Validate and convert transactions to objects
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Summarize'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            settings.logger.warning('No valid transactions provided')
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No valid transactions provided'}

        # Step 1: Calculate totals
        if ws_manager:
            await ws_manager.send_progress('Calculating totals...', 0.2, 'Summarize')
        total_spent = sum(tx.amount for tx in tx_objects if tx.amount < 0)
        total_income = sum(tx.amount for tx in tx_objects if tx.amount > 0)

        # Step 2: Calculate additional metrics
        if ws_manager:
            await ws_manager.send_progress('Calculating averages...', 0.35, 'Summarize')
        total_savings = total_income + total_spent
        total_transactions = len(tx_objects)
        expense_count = sum(1 for tx in tx_objects if tx.amount < 0)
        income_count = sum(1 for tx in tx_objects if tx.amount > 0)
        avg_expense = abs(total_spent / expense_count) if expense_count > 0 else 0
        avg_income = total_income / income_count if income_count > 0 else 0

        # Step 3: Calculate largest transactions and date range
        if ws_manager:
            await ws_manager.send_progress(
                'Identifying largest transactions...', 0.5, 'Summarize'
            )
        start_date = min(tx.date for tx in tx_objects)
        end_date = max(tx.date for tx in tx_objects)
        largest_expense = min(tx.amount for tx in tx_objects if tx.amount < 0)
        largest_income = max(tx.amount for tx in tx_objects if tx.amount > 0)

        # Step 4: Generate monthly summaries
        if ws_manager:
            await ws_manager.send_progress(
                'Generating monthly summaries...', 0.7, 'Summarize'
            )
        df = pd.DataFrame([t.__dict__ for t in tx_objects])
        df['date'] = pd.to_datetime(df['date'])
        monthly_summary = (
            df.groupby(df['date'].dt.to_period('M'))['amount'].sum().to_dict()
        )

        # Step 5: Analyze trends and changes
        if ws_manager:
            await ws_manager.send_progress('Analyzing trends...', 0.85, 'Summarize')
        monthly_income = (
            df[df['amount'] > 0].groupby(df['date'].dt.to_period('M'))['amount'].sum()
        )
        monthly_expense = (
            df[df['amount'] < 0]
            .groupby(df['date'].dt.to_period('M'))['amount']
            .sum()
            .abs()
        )
        monthly_savings = (monthly_income - monthly_expense).fillna(0)

        income_trend = await calculate_trend(monthly_income)
        expense_trend = await calculate_trend(monthly_expense)
        savings_trend = await calculate_trend(monthly_savings)

        income_change = await calculate_percentage_change(monthly_income)
        expense_change = await calculate_percentage_change(monthly_expense)
        savings_change = await calculate_percentage_change(monthly_savings)

        savings_rate = (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        )

        monthly_summary = {
            str(month): {
                'income': float(monthly_income.get(month, 0)),
                'expenses': float(monthly_expense.get(month, 0)),
                'savings': float(monthly_savings.get(month, 0)),
            }
            for month in monthly_income.index.union(monthly_expense.index)
        }

        # Compile summary results
        summary = {
            'income': {
                'total': total_income,
                'trend': income_trend,
                'change': income_change,
            },
            'expenses': {
                'total': abs(total_spent),
                'trend': expense_trend,
                'change': expense_change,
            },
            'savings': {
                'total': total_savings,
                'trend': savings_trend,
                'change': savings_change,
            },
            'total_transactions': total_transactions,
            'expense_count': expense_count,
            'income_count': income_count,
            'avg_expense': avg_expense,
            'avg_income': avg_income,
            'start_date': start_date.isoformat(),
            'end_date': end_date.isoformat(),
            'largest_expense': largest_expense,
            'largest_income': largest_income,
            'savings_rate': savings_rate,
            'monthly_summary': monthly_summary,
        }

        settings.logger.info('Transaction summarization completed successfully')
        return summary
    except Exception as e:
        settings.logger.error(f'Error summarizing transactions: {str(e)}')
        if ws_manager:
            await ws_manager.send_progress('Summarization failed', 1.0)
        return {'error': f'Summarization failed: {str(e)}'}


async def calculate_trend(monthly_data: pd.Series) -> str:
    """
    Calculate trend ('up', 'down', 'neutral') based on monthly data.
    """
    if len(monthly_data) < 2:
        return 'neutral'
    # Use positional indexing (.iloc) since the index holds Period objects
    recent_avg = monthly_data.iloc[-2:].mean()
    earlier_avg = monthly_data.iloc[:-2].mean()
    if recent_avg > earlier_avg:
        return 'up'
    elif recent_avg < earlier_avg:
        return 'down'
    else:
        return 'neutral'


async def calculate_percentage_change(monthly_data: pd.Series) -> float:
    """
    Calculate the percentage change from the highest monthly value to the
    average of the last two months.
    """
    if len(monthly_data) < 2:
        return 0
    highest_value = monthly_data.max()
    recent_avg = monthly_data.iloc[-2:].mean()
    return (
        ((recent_avg - highest_value) / highest_value) * 100
        if highest_value != 0
        else 0
    )
```
Though long (due to repeated code that could have been factored into reusable functions), it just produces basic summaries of your transactions. It's comprehensive as-is, but it can easily be extended.
In both analyses, I made sure to keep connections updated by sending `progress` actions at intervals, so the user knows how far along the work is rather than being kept in the dark.
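On the wire, each update is just the JSON emitted by `send_progress`, e.g. `{"action": "progress", "message": "Detecting anomalies...", "progress": 0.4, "taskType": "Analysis"}`. A quick hypothetical test client (assuming the server runs locally on the default port 5173) can watch the whole exchange:

```python
import asyncio

import aiohttp


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('http://localhost:5173/ws') as ws:
            # Kick off a summary; a real payload would carry actual transactions
            await ws.send_json({'action': 'summary', 'transactions': []})
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    print(data)
                    if data.get('action') in ('summary_complete', 'error'):
                        break


asyncio.run(main())
```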
Now, in `app.py`, using `init_app`, we created an `Application` instance and added routes to it. We could have used aiohttp's route decorators instead. And we didn't forget the housekeeping: cleaning up the background tasks we started earlier whenever the app quits via different interrupts. That's it!
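For completeness, the decorator-based alternative with `web.RouteTableDef` would look roughly like this (a sketch — the handler bodies stay exactly as before):

```python
from aiohttp import web

routes = web.RouteTableDef()


@routes.post('/extract-text')
async def extract_text(request: web.Request) -> web.Response:
    ...  # same body as before


@routes.get('/ws')
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
    ...  # same body as before


def init_app() -> web.Application:
    app = web.Application()
    app.add_routes(routes)
    return app
```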
Note: Undiscussed utilities
There are some submodules, `utils/settings.py`, `models/base.py`, etc., that weren't discussed. They're basic setups for instrumentation (logging) and the like; others serve another part of the system (maybe discussed in another series). In all, they're very basic.
Outro
Enjoyed this article? I'm a Software Engineer and Technical Writer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.
If you found this article valuable, consider sharing it with your network to help spread the knowledge!