John Owolabi Idogun

Originally published at johnowolabiidogun.dev

Building an AI-powered Financial Behavior Analyzer with NodeJS, Python, SvelteKit, and TailwindCSS - Part 1: The AI Service

Introduction

Part 2: GitHub OAuth is out on https://johnowolabiidogun.dev/blog/building-an-ai-powered-financial-behavior-analyzer-with-nodejs-python-sveltekit-and-tailwindcss-part-2-github-oauth-5195c7/67a06839edc9a2f58232fbf8

To kick off this intriguing and challenging journey, we'll start by building the AI service, the final component of our system's architecture. We'll harness asynchronous programming in Python via asyncio and one of its most lightweight yet powerful asynchronous HTTP client/server libraries, aiohttp. It is a "double-edged" sword: it can be your HTTP/WebSocket client and, at the same time, your HTTP/WebSocket server. Unlike requests, which is a synchronous, client-only library, aiohttp is asynchronous and speaks both HTTP and WebSocket, making it a worthy alternative to FastAPI, Flask, or even Django for async-based applications.

Let's put aiohttp to work in this article! 🚀

Prerequisite

Before diving in, I assume you've already:

  • [x] Created a project (e.g., utility)
  • [x] Set up a virtual environment
  • [x] Installed the required dependencies

If you haven't, copy the following into requirements.txt:

aiohttp==3.11
pandas==2.2
pdf2image==1.17
pytesseract==0.3
transformers==4.48
python-dotenv==1.0
scikit-learn==1.6
torch @ https://download.pytorch.org/whl/cpu/torch-2.5.1%2Bcpu-cp312-cp312-linux_x86_64.whl
torchvision @ https://download.pytorch.org/whl/cpu/torchvision-0.20.1%2Bcpu-cp312-cp312-linux_x86_64.whl

Then, with your project's virtual environment activated, run:

(virtualenv)$ pip install -r requirements.txt 

Notice the versions of torch and torchvision used here. I used the CPU-only builds since I deployed this on a CPU-based server. If you have a GPU-enabled server or development environment, you can install the standard versions without the URL specifier (@ https://download.pytorch.org/whl/...), which, by the way, is a nifty way to pin a package to a specific wheel URL in requirements.txt. Note that these particular wheels target Python 3.12 on 64-bit Linux (cp312, linux_x86_64), so adjust the URLs for other platforms.

Note: You may prefer to develop using the latest torch (v2.6).

As I write this, the PyTorch Foundation has announced the release of PyTorch® 2.6, which brings support for Python 3.13 along with some performance improvements. Consider using it instead.

Source code

Sirneij / finance-analyzer

An AI-powered financial behavior analyzer and advisor written in Python (aiohttp) and TypeScript (ExpressJS & SvelteKit with Svelte 5)

Implementation

Let's now go into the meat of this article.

Step 1: Spawning an aiohttp Server

Create app.py and populate it with the following:

import os
from asyncio import Lock

from aiohttp import WSCloseCode, WSMsgType, web
from aiohttp.web import Request, Response, WebSocketResponse

from utils.analyzer import analyze_transactions
from utils.extract_text import extract_text_from_pdf
from utils.settings import base_settings
from utils.summarize import summarize_transactions
from utils.websocket import WebSocketManager

# Every open WebSocket connection, guarded by an asyncio lock
ws_connections: set[WebSocketResponse] = set()
ws_lock = Lock()


async def start_background_tasks(app):
    """Initialize application background tasks."""
    app['ws_connections'] = ws_connections
    app['ws_lock'] = ws_lock


async def cleanup_background_tasks(app):
    """Cleanup application resources."""
    await cleanup_ws(app)


async def cleanup_ws(app):
    """Cleanup WebSocket connections on shutdown."""
    async with ws_lock:
        connections = set(ws_connections)  # Create a copy to iterate safely
        for ws in connections:
            # GOING_AWAY (1001) is the close code for a server shutting down
            await ws.close(code=WSCloseCode.GOING_AWAY, message=b'Server shutdown')
        ws_connections.clear()


async def extract_text(request: Request) -> Response:
    try:
        base_settings.logger.info('Received text extraction request')
        reader = await request.multipart()
        field = await reader.next()

        # The upload must arrive in a multipart field named 'file'
        if field is None or field.name != 'file':
            base_settings.logger.warning('No file field in request')
            return web.json_response({'error': 'No file uploaded'}, status=400)

        # Read the file content
        base_settings.logger.info('Reading uploaded file')
        file_content: bytes = await field.read()

        text: str = await extract_text_from_pdf(file_content)
        base_settings.logger.info('Successfully processed request')
        return web.json_response({'text': text})
    except Exception as e:
        base_settings.logger.error(
            f'Request processing failed: {str(e)}', exc_info=True
        )
        return web.json_response({'error': str(e)}, status=500)


async def websocket_handler(request: Request) -> WebSocketResponse:
    """WebSocket handler for real-time communication."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async with ws_lock:
        ws_connections.add(ws)

    ws_manager = WebSocketManager(ws)
    await ws_manager.prepare()
    base_settings.logger.info('WebSocket connection established')

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = msg.json()
                    if data.get('action') == 'analyze':
                        result = await analyze_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Analysis complete', 1.0, 'Analysis'
                        )
                        await ws_manager.send_result(
                            result, 'Analysis', 'analysis_complete'
                        )
                    elif data.get('action') == 'summary':
                        result = await summarize_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Summary complete', 1.0, 'Summarize'
                        )
                        await ws_manager.send_result(
                            result, 'Summarize', 'summary_complete'
                        )
                    else:
                        await ws_manager.send_result(
                            {'message': 'Unknown action'}, 'Error', 'error'
                        )
                except Exception as e:
                    base_settings.logger.error(f'Message processing error: {str(e)}')
                    await ws_manager.send_result({'error': str(e)}, 'Error', 'error')
            elif msg.type == WSMsgType.ERROR:
                base_settings.logger.error(f'WebSocket error: {ws.exception()}')
    finally:
        async with ws_lock:
            # discard() does not raise if shutdown cleanup already emptied the set
            ws_connections.discard(ws)
        await ws.close()
        base_settings.logger.info('WebSocket connection closed')

    return ws


def init_app() -> web.Application:
    app = web.Application()
    app.router.add_post('/extract-text', extract_text)
    app.router.add_get('/ws', websocket_handler)

    # Add startup/cleanup handlers
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    return app


if __name__ == '__main__':
    app = init_app()
    try:
        web.run_app(
            app,
            host='0.0.0.0',
            port=int(os.environ.get('PORT', 5173)),
        )
    except KeyboardInterrupt:
        base_settings.logger.info('Received keyboard interrupt...')
    except Exception as e:
        base_settings.logger.error(f'Server error: {e}')
    finally:
        base_settings.logger.info('Server shutdown complete.')

That was a lot of code. However, looking closely, it's very straightforward. We start by registering all our WebSocket connections in a set (to prevent duplicates). We could have done something like the following, as suggested in the aiohttp documentation:

...
import weakref
...
ws_connections = web.AppKey('ws_connections', weakref.WeakSet)
...

but I went for the lower-level approach and handled startup, cleanup, and an asyncio Lock myself, so that concurrent handlers never modify the set at the same time. You can use the suggested approach instead. Then comes the extract_text route handler. It receives the user's transactions PDF file, requires that the multipart field be named file (this is my preference; you can allow any field name), and then lets extract_text_from_pdf do its magic of extracting text from the file before responding with the extracted text. Let's see how extract_text_from_pdf does this.
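The set-plus-lock bookkeeping described above can be tried in isolation. The following is a minimal sketch that uses only the standard library; FakeSocket and ConnectionRegistry are hypothetical stand-ins invented for illustration and are not part of aiohttp or of the project's code.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for aiohttp's WebSocketResponse."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    async def close(self) -> None:
        # Yield to the event loop, as a real network close would.
        await asyncio.sleep(0)
        self.closed = True


class ConnectionRegistry:
    """Tracks open sockets; every mutation happens while holding the lock."""

    def __init__(self) -> None:
        self._connections: set[FakeSocket] = set()
        self._lock = asyncio.Lock()

    async def add(self, ws: FakeSocket) -> None:
        async with self._lock:
            self._connections.add(ws)

    async def discard(self, ws: FakeSocket) -> None:
        async with self._lock:
            # discard() is a no-op if the socket was already removed.
            self._connections.discard(ws)

    async def close_all(self) -> int:
        async with self._lock:
            # Take a snapshot and empty the set while holding the lock.
            snapshot = list(self._connections)
            self._connections.clear()
        # Close outside the lock so handlers can still call discard().
        for ws in snapshot:
            await ws.close()
        return len(snapshot)


async def demo() -> tuple[int, bool, int]:
    registry = ConnectionRegistry()
    sockets = [FakeSocket(f'client-{i}') for i in range(3)]
    for ws in sockets:
        await registry.add(ws)
    closed = await registry.close_all()
    # A handler's `finally` block can safely call discard() afterwards.
    await registry.discard(sockets[0])
    return closed, all(ws.closed for ws in sockets), len(registry._connections)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Closing the sockets after releasing the lock is a design choice of this sketch: it keeps a slow close from blocking other coroutines that only want to register or unregister a connection.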

Step 2: Utility package - extract_text.py

import os
import tempfile

import pytesseract
from pdf2image import convert_from_path

from .settings import base_settings as settings


async def extract_text_from_pdf(pdf_file: bytes) -> str:
    settings.logger.info('Starting PDF text extraction')

    # Create temporary file to store uploaded PDF
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_pdf:
        tmp_pdf.write(pdf_file)
        tmp_path: str = tmp_pdf.name
    settings.logger.debug(f'Created temporary file: {tmp_path}')

    try:
        # Convert each page to an image (pdf2image defaults to 200 dpi)
        settings.logger.info('Converting PDF to images')
        pages = convert_from_path(tmp_path)
        settings.logger.debug(f'Converted {len(pages)} pages')

        # Extract text page by page
        text_data: str = ''
        for i, page in enumerate(pages, 1):
            settings.logger.debug(f'Processing page {i}')
            text: str = pytesseract.image_to_string(page)
            text_data += text + '\n'

        settings.logger.info('Text extraction completed successfully')
        return text_data
    except Exception as e:
        settings.logger.error(f'Error during text extraction: {str(e)}', exc_info=True)
        raise
    finally:
        # Clean up temporary file
        settings.logger.debug(f'Cleaning up temporary file: {tmp_path}')
        os.unlink(tmp_path)

It uses Google's Tesseract OCR engine via the pytesseract library to extract text from the file. Although OCR tools can, in principle, read documents directly, it worked better for me with images via image_to_string, hence the intermediate step of converting the PDF pages to images before feeding them to the OCR engine. The temporary file exists because convert_from_path requires a filesystem path, and we don't want to keep anyone's file on our end. A better option is convert_from_bytes from pdf2image, which spares us from managing the temporary file ourselves.

Back in app.py, we also have a WebSocket handler that simply delegates actions to various submodules. Before getting to those, there is a utility that wraps each WebSocket connection so that messages are only sent while the connection is ready and still open.
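A possible shape for the convert_from_bytes variant is sketched below. It is not the project's code: the function names are made up for illustration, the OCR libraries are imported lazily so the module loads even where they are not installed, and running the work through asyncio.to_thread is an extra assumption on my part (OCR is blocking, so a worker thread keeps the event loop responsive).

```python
import asyncio
from typing import Callable


def ocr_pdf_bytes(pdf_bytes: bytes) -> str:
    """Blocking helper: convert PDF bytes to page images, then OCR each page."""
    # Imported here so the module can be loaded without the OCR dependencies.
    import pytesseract
    from pdf2image import convert_from_bytes

    pages = convert_from_bytes(pdf_bytes)  # one PIL image per PDF page
    return '\n'.join(pytesseract.image_to_string(page) for page in pages)


async def extract_text_from_pdf_bytes(
    pdf_bytes: bytes,
    ocr: Callable[[bytes], str] = ocr_pdf_bytes,
) -> str:
    """Run the blocking OCR step in a worker thread.

    The `ocr` parameter is a hypothetical hook that makes the function easy
    to exercise with a fake implementation.
    """
    return await asyncio.to_thread(ocr, pdf_bytes)


if __name__ == '__main__':
    # Uses a fake OCR function, so neither Tesseract nor Poppler is required.
    fake = lambda data: f'{len(data)} bytes received'
    print(asyncio.run(extract_text_from_pdf_bytes(b'%PDF-1.7', ocr=fake)))
```

With the default `ocr` argument, the call needs pdf2image, pytesseract, and their system dependencies (Poppler and Tesseract) to be installed.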

Step 3: Utility package - websocket.py

from aiohttp import web

from utils.settings import base_settings


class WebSocketManager:
    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        self._ready = False
        base_settings.logger.info(f'Initializing WebSocket manager: {ws}')

    async def prepare(self):
        self._ready = True
        base_settings.logger.info('WebSocket manager ready')

    async def send_progress(
        self, message: str, progress: float, task_type: str | None = None
    ):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send progress - WebSocket not ready/closed'
            )
            return

        try:
            await self.ws.send_json(
                {
                    'action': 'progress',
                    'message': message,
                    'progress': progress,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending progress: {str(e)}')

    async def send_result(self, result: dict, task_type: str, action: str):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send result - WebSocket not ready/closed'
            )
            return

        try:
            await self.ws.send_json(
                {
                    'action': action,
                    'result': result,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending result: {str(e)}')

The app would work fine without it, but I encountered a bug where the frontend was already making requests before connections were ready. So I figured this utility would help ensure that a connection is ready before anything is sent over it. I also implemented two methods, send_result and send_progress, which use aiohttp's WebSocketResponse.send_json to send analysis results and progress reports, respectively, back to the requester so that they aren't kept in the dark if the analysis is taking long (which it will on a CPU-only machine running transformer models). After the custom WebSocket manager completes preparation, we loop over the messages received and delegate appropriately based on the action key. For now, we support the analyze and summary actions via analyze_transactions and summarize_transactions, respectively. Let's see what they do.
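To make the message shapes concrete, here is a small client-side sketch using only the standard library. The JSON keys (action, message, progress, taskType, result, transactions) come from the handler and manager code above; the helper names build_request and describe_message are hypothetical and exist only for this illustration.

```python
import json
from typing import Any


def build_request(action: str, transactions: list[dict[str, Any]]) -> str:
    """Serialize a request the way the /ws handler expects to receive it."""
    return json.dumps({'action': action, 'transactions': transactions})


def describe_message(raw: str) -> str:
    """Turn one server message into a short human-readable line."""
    msg = json.loads(raw)
    action = msg.get('action')
    if action == 'progress':
        percent = round(float(msg['progress']) * 100)
        return f"[{msg.get('taskType')}] {percent}% - {msg['message']}"
    if action in ('analysis_complete', 'summary_complete'):
        return f"[{msg.get('taskType')}] done with keys: {sorted(msg['result'])}"
    if action == 'error':
        return f"error: {msg['result']}"
    return f'unhandled action: {action!r}'


if __name__ == '__main__':
    print(build_request('analyze', []))
    sample = {
        'action': 'progress',
        'message': 'Classifying transactions...',
        'progress': 0.2,
        'taskType': 'Analysis',
    }
    print(describe_message(json.dumps(sample)))
```

A real client would send the output of build_request over the WebSocket and pass every text frame it receives to something like describe_message.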

Step 4: Utility package - analyzer.py

import asyncio
import os
from datetime import datetime

import numpy as np
import pandas as pd
import torch
from sklearn.ensemble import IsolationForest
from transformers import pipeline

from models.base import Transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


def get_device() -> tuple[torch.device, str]:
    """
    Detect the best available device (GPU, MPS, or CPU) for PyTorch computations.
    """
    if torch.cuda.is_available():
        # CUDA (NVIDIA GPU) is available
        return torch.device('cuda'), 'CUDA (NVIDIA GPU)'
    elif torch.backends.mps.is_available():
        # MPS (Metal Performance Shaders on Apple Silicon) is available
        return torch.device('mps'), 'MPS (Apple Metal)'
    else:
        # Default to CPU
        return torch.device('cpu'), 'CPU'


async def analyze_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Analyze transactions and return insights with progress updates."""
    try:
        # Step 1: Validate and preprocess transactions
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Analysis'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No valid transactions provided'}

        # Step 2: Classification
        if ws_manager:
            await ws_manager.send_progress(
                'Classifying transactions...', 0.2, 'Analysis'
            )
        categories = await classify_transactions(tx_objects)

        # Step 3: Anomaly Detection
        if ws_manager:
            await ws_manager.send_progress('Detecting anomalies...', 0.4, 'Analysis')
        anomalies = await detect_anomalies(tx_objects)

        # Step 4: Spending Analysis
        if ws_manager:
            await ws_manager.send_progress('Analyzing spending...', 0.6, 'Analysis')
        spending_analysis = await analyze_spending(tx_objects)

        # Step 5: Trend Prediction
        if ws_manager:
            await ws_manager.send_progress(
                'Predicting spending trends...', 0.8, 'Analysis'
            )
        spending_trends = await predict_trends(tx_objects)

        # Compile the results
        result = {
            'categories': categories,
            'anomalies': anomalies,
            'spending_analysis': spending_analysis,
            'spending_trends': spending_trends,
        }
        settings.logger.info('Transaction analysis completed successfully')
        return result
    except Exception as e:
        settings.logger.error(f'Error analyzing transactions: {str(e)}', exc_info=True)
        if ws_manager:
            await ws_manager.send_progress('Analysis failed', 1.0, 'Analysis')
        return {'error': f'Analysis failed: {str(e)}'}


def validate_transaction(t: dict) -> bool:
    """Validate transaction fields."""
    try:
        required_fields = {'date', 'description', 'amount', 'balance', 'type', 'userId'}
        if not all(field in t for field in required_fields):
            return False

        # Validate amount and balance are numeric
        float(t['amount'])
        float(t['balance'])

        # Validate date formats
        datetime.fromisoformat(t['date'])
        return True
    except (ValueError, TypeError):
        return False


async def classify_transactions(transactions: list[Transaction]) -> dict:
    """
    Classify transactions using FinBERT.
    """
    # Get device (GPU if available, otherwise CPU)
    device, device_name = get_device()
    settings.logger.info(f'Using device for classification: {device_name}')

    # Load FinBERT classification pipeline
    classifier = pipeline(
        'zero-shot-classification',
        model='yiyanghkust/finbert-tone',
        device=0 if device.type in ['cuda', 'mps'] else -1,  # Use GPU if available
    )

    # Define financial categories
    labels = os.getenv(
        'LABELS', 'groceries,housing,transportation,entertainment,utilities,other'
    ).split(',')

    # Prepare transaction descriptions for classification
    descriptions = [tx.description.lower() for tx in transactions]

    # Batch classify descriptions in a worker thread so the event loop stays free
    results = await asyncio.to_thread(
        classifier, descriptions, labels, truncation=True, max_length=128
    )

    # Initialize categories and percentages
    categories = {label: 0 for label in labels}

    # Aggregate classification results (the top-scoring label comes first)
    for tx, result in zip(transactions, results):
        category = result['labels'][0]
        categories[category] += abs(tx.amount)

    total_spent = sum(categories.values())

    # Calculate percentages
    percentages = {
        category: (amount / total_spent) * 100 if total_spent > 0 else 0
        for category, amount in categories.items()
    }

    return {'categories': categories, 'percentages': percentages}


async def detect_anomalies(transactions: list[Transaction]) -> list[dict]:
    """Detect anomalies in transactions and provide specific reasons."""
    # Extract transaction amounts and reshape for Isolation Forest
    amounts = np.array([tx.amount for tx in transactions]).reshape(-1, 1)
    model = IsolationForest(contamination=0.05, random_state=42)
    anomalies = model.fit_predict(amounts)

    # Calculate mean and standard deviation for dynamic reason generation
    mean = np.mean(amounts)
    std = np.std(amounts)

    # Detect anomalies and generate specific reasons
    anomaly_details = []
    for tx, anomaly in zip(transactions, anomalies):
        if anomaly == -1:  # Anomaly detected
            # Calculate the Z-score for the transaction (0 if all amounts are equal)
            z_score = (tx.amount - mean) / std if std else 0.0

            # Generate a dynamic reason
            if tx.amount > 0:
                reason = (
                    f'Unusually high income of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and abs(tx.amount) > abs(mean) + 2 * std:
                reason = (
                    f'Unusually large expense of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and 'luxury' in tx.description.lower():
                reason = 'Uncommon luxury expense detected.'
            elif tx.amount < 0 and 'groceries' in tx.description.lower():
                reason = 'Unusually high grocery expense detected.'
            else:
                reason = f'Outlier transaction with amount {tx.amount} (Z-score: {z_score:.2f}).'

            # Append the anomaly details
            anomaly_details.append(
                {
                    'date': tx.date.isoformat(),
                    'description': tx.description,
                    'amount': tx.amount,
                    'reason': reason,
                }
            )

    return anomaly_details


async def analyze_spending(transactions: list[Transaction]) -> dict:
    """Generate spending analysis with cumulative balance"""
    total_spent = sum(tx.amount for tx in transactions if tx.amount < 0)
    total_income = sum(tx.amount for tx in transactions if tx.amount > 0)

    # Create a DataFrame from transactions
    df = pd.DataFrame([t.__dict__ for t in transactions])

    # Ensure date parsing is correct
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)

    # Group by the date and calculate daily totals
    daily_summary = df.groupby(df['date'].dt.date)['amount'].sum()

    # Sort by date to ensure cumulative calculations are correct
    df = df.sort_values(by='date')

    # The statement's running balance serves as the cumulative balance
    df['cumulative_balance'] = df['balance']

    # Convert daily_summary to JSON-serializable format
    daily_summary = {str(date): float(amount) for date, amount in daily_summary.items()}

    # Prepare cumulative balance as JSON-serializable format
    cumulative_balance = {
        row['date'].strftime('%Y-%m-%d'): row['cumulative_balance']
        for _, row in df.iterrows()
    }

    return {
        'total_spent': abs(total_spent),
        'total_income': total_income,
        'savings_rate': (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        ),
        'daily_summary': daily_summary,
        'cumulative_balance': cumulative_balance,
    }


async def predict_trends(transactions: list[Transaction]) -> dict:
    """Predict future spending trends with enhanced analysis."""
    if len(transactions) < 2:
        return {'trend': 'Not enough data'}

    # Convert dates to numeric for regression
    dates = [(tx.date - transactions[0].date).days for tx in transactions]
    amounts = [tx.amount for tx in transactions]

    # Linear regression for trends
    coeffs = np.polyfit(dates, amounts, 1)
    trend = 'increasing' if coeffs[0] > 0 else 'decreasing'

    # Fitted values along the trend line (not returned at the moment)
    slope, intercept = coeffs
    trend_line = [slope * x + intercept for x in dates]

    # Estimated monthly spend
    df = pd.DataFrame([t.__dict__ for t in transactions])
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)
    months = len(df['date'].dt.to_period('M').unique())

    return {
        'trend': trend,
        'trend_slope': slope,
        'estimated_monthly_spend': abs(
            sum(tx.amount for tx in transactions if tx.amount < 0)
        )
        / (months or 1),
    }

It's full of async functions that each handle a specific part of the analysis (setting aside the transaction validation and device detection logic). A function of particular interest is classify_transactions, which uses yiyanghkust/finbert-tone, a large language model for extracting information from financial text, to perform zero-shot classification of the transactions into the categories listed in the LABELS environment variable, which I set to:

LABELS=groceries,school,housing,transportation,gadgets,entertainment,utilities,credit cards,miscellaneous,dining out,healthcare,insurance,savings,investments,childcare,travel,personal care,debts,charity,taxes,subscriptions,streaming services,home maintenance,shopping,pets,fitness,hobbies,gifts 
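How the label list and the classifier output combine can be sketched without loading any model. The snippet below is an illustration under assumptions: load_labels and aggregate are hypothetical helper names, the `env` parameter exists only to make the demo deterministic, and the fake results merely mimic the shape the zero-shot pipeline returns (a `labels` list sorted from most to least likely). Stripping whitespace around each label is an addition of this sketch.

```python
import os
from typing import Mapping, Optional

DEFAULT_LABELS = 'groceries,housing,transportation,entertainment,utilities,other'


def load_labels(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Read the comma-separated LABELS value; multi-word labels stay intact."""
    source = os.environ if env is None else env
    raw = source.get('LABELS', DEFAULT_LABELS)
    return [label.strip() for label in raw.split(',') if label.strip()]


def aggregate(amounts: list[float], results: list[dict], labels: list[str]) -> dict:
    """Sum absolute amounts per top-ranked label and derive percentages."""
    totals = {label: 0.0 for label in labels}
    for amount, result in zip(amounts, results):
        top_label = result['labels'][0]  # highest-scoring label comes first
        totals[top_label] += abs(amount)
    grand_total = sum(totals.values())
    percentages = {
        label: (value / grand_total * 100 if grand_total else 0.0)
        for label, value in totals.items()
    }
    return {'categories': totals, 'percentages': percentages}


if __name__ == '__main__':
    labels = load_labels({'LABELS': 'groceries, credit cards ,travel'})
    fake_results = [
        {'labels': ['groceries', 'travel', 'credit cards']},
        {'labels': ['travel', 'groceries', 'credit cards']},
    ]
    print(aggregate([-30.0, -10.0], fake_results, labels))
```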

Realistically, it'd be more accurate to fine-tune a base BERT-family model (Bidirectional Encoder Representations from Transformers), such as RoBERTa or DistilBERT, on our own data, but my excuse was the lack of adequate data and a GPU (Google Colab isn't enough).

Tip: A nice way to collaborate

If you want to explore fine-tuning and bare-bones Natural Language Processing with transformers, we can collaborate to create something like FinBERT but for account statements and the like. Please reach out.

There are other functions, too. detect_anomalies flags "abnormal" incomes and expenses using IsolationForest, then uses each flagged transaction's Z-score to generate a human-readable reason for the detection. Cool stuff! analyze_spending and predict_trends handle spending analysis and trend prediction, but they are pretty basic.
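The Z-score half of that logic is easy to reproduce with the standard library alone. This is a simplified sketch rather than the article's implementation: it leaves IsolationForest out entirely, the function names are hypothetical, and the reason text is shortened. It uses the population standard deviation, which matches the default of numpy.std.

```python
import statistics


def z_scores(amounts: list[float]) -> list[float]:
    """Return (x - mean) / std for every amount, or zeros if std is 0."""
    mean = statistics.fmean(amounts)
    std = statistics.pstdev(amounts)  # population std, like numpy.std's default
    if std == 0:
        # All amounts are identical, so nothing deviates from the mean.
        return [0.0 for _ in amounts]
    return [(amount - mean) / std for amount in amounts]


def explain(amount: float, z_score: float) -> str:
    """Build a short human-readable reason for a flagged transaction."""
    kind = 'income' if amount > 0 else 'expense'
    return f'Unusual {kind} of {amount} (Z-score: {z_score:.2f}).'


if __name__ == '__main__':
    amounts = [-10.0, -10.0, -10.0, -10.0, -60.0]
    for amount, score in zip(amounts, z_scores(amounts)):
        if abs(score) >= 2:  # a simple threshold used only in this demo
            print(explain(amount, score))
```

For the sample data the mean is -20 and the standard deviation is 20, so only the -60 expense reaches a Z-score of -2.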

Note: amount assumption

All the analysis here assumes that income has a positive amount while expenses have negative amounts. If your data doesn't follow this convention, you will need to modify the code or the data, depending on your preference.

Before we round up, let's see what utils/summarize.py does.
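If your statements store unsigned amounts plus a separate type field, a small normalization step can restore that sign convention before analysis. The sketch below is hypothetical: the article does not say which values the type field takes, so the 'credit', 'income', 'debit', and 'expense' strings are assumptions to adapt to your own data.

```python
# Assumed values of the `type` field; adjust these to match your data.
INCOME_TYPES = {'credit', 'income'}
EXPENSE_TYPES = {'debit', 'expense'}


def signed_amount(amount: float, tx_type: str) -> float:
    """Return a positive amount for income and a negative one for expenses."""
    magnitude = abs(float(amount))
    kind = tx_type.strip().lower()
    if kind in INCOME_TYPES:
        return magnitude
    if kind in EXPENSE_TYPES:
        return -magnitude
    raise ValueError(f'Unknown transaction type: {tx_type!r}')


def normalize(transactions: list[dict]) -> list[dict]:
    """Return copies of the transactions with sign-normalized amounts."""
    return [
        {**tx, 'amount': signed_amount(tx['amount'], tx['type'])}
        for tx in transactions
    ]


if __name__ == '__main__':
    rows = [
        {'description': 'Salary', 'amount': 2500, 'type': 'credit'},
        {'description': 'Rent', 'amount': 900, 'type': 'debit'},
    ]
    print(normalize(rows))
```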

Step 5: Utility package - summarize.py

from datetime import datetime

import pandas as pd

from models.base import Transaction
from utils.analyzer import validate_transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


async def summarize_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Summarize transaction data."""
    try:
        # Validate and convert transactions to objects
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Summarize'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            settings.logger.warning('No valid transactions provided')
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No valid transactions provided'}

        # Step 1: Calculate totals
        if ws_manager:
            await ws_manager.send_progress('Calculating totals...', 0.2, 'Summarize')
        total_spent = sum(tx.amount for tx in tx_objects if tx.amount < 0)
        total_income = sum(tx.amount for tx in tx_objects if tx.amount > 0)

        # Step 2: Calculate additional metrics
        if ws_manager:
            await ws_manager.send_progress('Calculating averages...', 0.35, 'Summarize')
        total_savings = total_income + total_spent
        total_transactions = len(tx_objects)
        expense_count = sum(1 for tx in tx_objects if tx.amount < 0)
        income_count = sum(1 for tx in tx_objects if tx.amount > 0)
        avg_expense = abs(total_spent / expense_count) if expense_count > 0 else 0
        avg_income = total_income / income_count if income_count > 0 else 0

        # Step 3: Calculate largest transactions and date range
        if ws_manager:
            await ws_manager.send_progress(
                'Identifying largest transactions...', 0.5, 'Summarize'
            )
        start_date = min(tx.date for tx in tx_objects)
        end_date = max(tx.date for tx in tx_objects)
        # default=0 avoids a ValueError when there are no expenses or no incomes
        largest_expense = min(
            (tx.amount for tx in tx_objects if tx.amount < 0), default=0
        )
        largest_income = max(
            (tx.amount for tx in tx_objects if tx.amount > 0), default=0
        )

        # Step 4: Generate monthly summaries
        if ws_manager:
            await ws_manager.send_progress(
                'Generating monthly summaries...', 0.7, 'Summarize'
            )
        df = pd.DataFrame([t.__dict__ for t in tx_objects])
        df['date'] = pd.to_datetime(df['date'])
        monthly_summary = (
            df.groupby(df['date'].dt.to_period('M'))['amount'].sum().to_dict()
        )

        # Step 5: Analyze trends and changes
        if ws_manager:
            await ws_manager.send_progress('Analyzing trends...', 0.85, 'Summarize')
        monthly_income = (
            df[df['amount'] > 0].groupby(df['date'].dt.to_period('M'))['amount'].sum()
        )
        monthly_expense = (
            df[df['amount'] < 0]
            .groupby(df['date'].dt.to_period('M'))['amount']
            .sum()
            .abs()
        )
        monthly_savings = (monthly_income - monthly_expense).fillna(0)

        income_trend = await calculate_trend(monthly_income)
        expense_trend = await calculate_trend(monthly_expense)
        savings_trend = await calculate_trend(monthly_savings)

        income_change = await calculate_percentage_change(monthly_income)
        expense_change = await calculate_percentage_change(monthly_expense)
        savings_change = await calculate_percentage_change(monthly_savings)

        savings_rate = (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        )

        # Replace the raw monthly totals with an income/expenses/savings breakdown
        monthly_summary = {
            str(month): {
                'income': float(monthly_income.get(month, 0)),
                'expenses': float(monthly_expense.get(month, 0)),
                'savings': float(monthly_savings.get(month, 0)),
            }
            for month in monthly_income.index.union(monthly_expense.index)
        }

        # Compile summary results
        summary = {
            'income': {
                'total': total_income,
                'trend': income_trend,
                'change': income_change,
            },
            'expenses': {
                'total': abs(total_spent),
                'trend': expense_trend,
                'change': expense_change,
            },
            'savings': {
                'total': total_savings,
                'trend': savings_trend,
                'change': savings_change,
            },
            'total_transactions': total_transactions,
            'expense_count': expense_count,
            'income_count': income_count,
            'avg_expense': avg_expense,
            'avg_income': avg_income,
            'start_date': start_date.isoformat(),
            'end_date': end_date.isoformat(),
            'largest_expense': largest_expense,
            'largest_income': largest_income,
            'savings_rate': savings_rate,
            'monthly_summary': monthly_summary,
        }
        settings.logger.info('Transaction summarization completed successfully')
        return summary
    except Exception as e:
        settings.logger.error(f'Error summarizing transactions: {str(e)}')
        if ws_manager:
            await ws_manager.send_progress('Summarization failed', 1.0, 'Summarize')
        return {'error': f'Summarization failed: {str(e)}'}


async def calculate_trend(monthly_data: pd.Series) -> str:
    """
    Calculate trend ('up', 'down', 'neutral') based on monthly data.
    """
    if len(monthly_data) < 2:
        return 'neutral'

    # Compare the last two months with everything before them. With exactly
    # two months, earlier_avg is NaN and the result falls through to 'neutral'.
    recent_avg = monthly_data.iloc[-2:].mean()
    earlier_avg = monthly_data.iloc[:-2].mean()

    if recent_avg > earlier_avg:
        return 'up'
    elif recent_avg < earlier_avg:
        return 'down'
    else:
        return 'neutral'


async def calculate_percentage_change(monthly_data: pd.Series) -> float:
    """
    Calculate the percentage change from the highest monthly value to the
    average of the last two months.
    """
    if len(monthly_data) < 2:
        return 0

    highest_value = monthly_data.max()
    recent_avg = monthly_data.iloc[-2:].mean()

    return (
        ((recent_avg - highest_value) / highest_value) * 100
        if highest_value != 0
        else 0
    )

Though long (due to repeated code that could have been factored into reusable functions), the function just computes basic summaries of your transactions. It's comprehensive but can easily be extended.

In both analyses, I endeavored to keep clients updated by sending progress actions at intervals, informing the user how far along the task is.

Now, in app.py, init_app creates an Application instance and adds routes to it. We could have used route decorators (via web.RouteTableDef) instead. We also do some housekeeping: whenever the app shuts down, the on_cleanup handler closes any WebSocket connections that are still open. That's it!
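One candidate for such a reusable function is the dictionary-to-Transaction conversion that both analyzer.py and summarize.py repeat. The sketch below is hypothetical: the dataclass is a stand-in for models.base.Transaction, which the article does not show, and parse_transaction and parse_transactions are names invented here. Unlike the original validation, it also skips records that lack _id, createdAt, or updatedAt instead of raising.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Transaction:
    """Stand-in for models.base.Transaction (the real class is not shown)."""

    _id: str
    balance: float
    type: str
    date: datetime
    description: str
    amount: float
    userId: str
    createdAt: datetime
    updatedAt: datetime


def parse_transaction(item: dict) -> Optional[Transaction]:
    """Convert one raw dict to a Transaction, or return None if it is invalid."""
    try:
        return Transaction(
            _id=item['_id'],
            balance=float(item['balance']),
            type=item['type'],
            date=datetime.fromisoformat(item['date']),
            description=item['description'],
            amount=float(item['amount']),
            userId=item['userId'],
            createdAt=datetime.fromisoformat(item['createdAt']),
            updatedAt=datetime.fromisoformat(item['updatedAt']),
        )
    except (KeyError, TypeError, ValueError):
        # Missing field, wrong type, or unparsable number/date.
        return None


def parse_transactions(raw: list[dict]) -> list[Transaction]:
    """Keep only the records that could be converted."""
    return [tx for tx in map(parse_transaction, raw) if tx is not None]


if __name__ == '__main__':
    good = {
        '_id': 'a1', 'balance': '87.5', 'type': 'expense',
        'date': '2025-01-05T10:00:00', 'description': 'Groceries',
        'amount': '-12.5', 'userId': 'u1',
        'createdAt': '2025-01-05T10:00:00', 'updatedAt': '2025-01-05T10:00:00',
    }
    print(parse_transactions([good, {'amount': 'oops'}]))
```

Both analyze_transactions and summarize_transactions could then start from a single call to a helper like parse_transactions.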
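The repeated `if ws_manager:` checks around every progress update could also be folded into one helper. What follows is a minimal sketch, assuming only that the sink object exposes a send_progress coroutine like the article's WebSocketManager; ProgressSink, report, RecordingSink, and run_steps are hypothetical names used for illustration.

```python
import asyncio
from typing import Optional, Protocol


class ProgressSink(Protocol):
    """Anything with a send_progress coroutine, such as WebSocketManager."""

    async def send_progress(
        self, message: str, progress: float, task_type: Optional[str] = None
    ) -> None: ...


async def report(
    sink: Optional[ProgressSink], message: str, progress: float, task_type: str
) -> None:
    """Send a progress update if a sink was provided; otherwise do nothing."""
    if sink is not None:
        await sink.send_progress(message, progress, task_type)


class RecordingSink:
    """Test double that records updates instead of sending them over a socket."""

    def __init__(self) -> None:
        self.updates: list[tuple[str, float, Optional[str]]] = []

    async def send_progress(self, message, progress, task_type=None) -> None:
        self.updates.append((message, progress, task_type))


async def run_steps(sink: Optional[ProgressSink] = None) -> int:
    """Walk through named steps, reporting the fraction completed after each."""
    steps = ['Validating', 'Calculating totals', 'Analyzing trends', 'Done']
    for index, name in enumerate(steps, start=1):
        await report(sink, f'{name}...', index / len(steps), 'Summarize')
    return len(steps)


if __name__ == '__main__':
    sink = RecordingSink()
    asyncio.run(run_steps(sink))
    print(sink.updates)
```

Note that the value sent is a completed fraction between 0 and 1, not an estimate of the remaining time.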

Note: Undiscussed utilities

There are some submodules, such as utils/settings.py and models/base.py, that were not discussed. They are basic setups for instrumentation (logging) and the like. Others are for another system (maybe discussed in another series). In all, they are very basic.

Outro

Enjoyed this article? I'm a Software Engineer and Technical Writer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.

If you found this article valuable, consider sharing it with your network to help spread the knowledge!
