
Enhancing Your Matillion Python Component with a Custom Matillion Python Package Powered by Amazon Q Developer

At MRH Trowe, we've integrated Matillion with Snowflake and AWS to streamline our ELT and ETL data integrations (Extract, Load, Transform). Matillion offers a low-code/no-code solution for building these integrations, providing SQL-based transformation pipelines (Transform) as well as orchestration pipelines for data integration and automation (Extract and Load).

While Matillion ships with many pre-built components to connect to various sources, there are situations where Python components are necessary for more complex tasks or to implement features that are not supported out of the box. This combination allows us to load data from different sources into Snowflake, leveraging its robust data platform capabilities.

Given Matillion's focus on low-code integration, I often wondered how to streamline Python development for use within Matillion. Considering that Matillion operates on Docker in AWS (e.g., with Fargate), I decided it was best to take ownership of the container by creating a custom Matillion Helper Package for Python to ease integration. For this, I utilized Amazon Q Developer.

Amazon Q can be used with VS Code and its server version through a dedicated free extension. You can sign in with your AWS Builder ID or via SSO (AWS IAM Identity Center) to streamline the IAM-related setup for Amazon Q Developer. After installing this extension, you're provided with a chat interface that facilitates an agentic development lifecycle.

Creating a Custom PyPI Package with Amazon Q

I set out to create my first Python package as a companion to the Matillion Dockerfile with the following structure:

```
matillion-helper/
├── matillion_helper/
│   ├── __init__.py
│   └── helper.py
├── setup.py
├── pyproject.toml
├── README.md
├── Dockerfile
└── .dockerignore
```

To generate this structure, I prompted Amazon Q: “Create the needed minimal setup for PyPi package to be installed into a Docker container. Name the package matillion_helper.” With Claude Sonnet 3.7, this took only a matter of seconds.
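
For reference, a minimal setup.py for such a package can look roughly like the sketch below; the metadata, version, and dependency list are illustrative placeholders, not the exact generated output:

```python
# setup.py - illustrative minimal packaging sketch (values are placeholders)
from setuptools import setup, find_packages

setup(
    name="matillion_helper",
    version="0.1.0",
    description="Helper utilities for Matillion Python components",
    packages=find_packages(),
    python_requires=">=3.8",
    install_requires=[
        "boto3",     # AWS API interaction (STS AssumeRole)
        "msal",      # Azure AD authentication for the Power BI API
        "requests",  # HTTP calls to the Power BI REST API
    ],
)
```

In the Dockerfile, the package can then be installed during the image build (for example with pip install .), so every Matillion Python component has the helper available.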

Use Case: Consolidated Logging

Once the package was set up, I focused on creating a module for consolidated logging. This module logs messages from any function up to the main execution, keeping logging consistent across the package and all Matillion projects. I used the prompt: "Create a logging_config.py file for my matillion_helper package with the following features:

  • Functions to configure logging with console and file output
  • Support for log rotation with configurable max size and backup count
  • Environment variable support for log directory and log level
  • A get_logger convenience function
  • Proper error handling for file permissions
  • Type hints for all functions"

As a result, I got the following code.

```python
import logging
import logging.handlers
import os
import sys
from pathlib import Path
from typing import Any, Optional


def configure_logging(
    logger_name: Optional[str] = None,
    log_level: int = logging.INFO,
    log_file: Optional[str] = None,
    log_format: str = "[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    console_output: bool = True,
    max_bytes: int = 10485760,  # 10MB
    backup_count: int = 5,
    propagate: bool = False,
) -> logging.Logger:
    """Configure logging for a module.

    Args:
        logger_name: Name of the logger (defaults to the calling module name)
        log_level: Logging level (default: INFO)
        log_file: Log file name (default: based on logger_name)
        log_format: Format string for log messages
        date_format: Format string for timestamps
        console_output: Whether to output logs to console
        max_bytes: Maximum size of log file before rotation
        backup_count: Number of backup log files to keep
        propagate: Whether to propagate logs to parent loggers

    Returns:
        logging.Logger: Configured logger instance
    """
    # If no logger name is provided, use the calling module's name
    if logger_name is None:
        import inspect

        frame = inspect.stack()[1]
        module = inspect.getmodule(frame[0])
        logger_name = module.__name__ if module else "matillion_helper"

    # Get or create the logger
    logger = logging.getLogger(logger_name)

    # Clear any existing handlers to avoid duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()

    # Set the log level
    logger.setLevel(log_level)
    logger.propagate = propagate

    # Create formatter
    formatter = logging.Formatter(log_format, date_format)

    # Add console handler if requested
    if console_output:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        console_handler.setLevel(log_level)
        logger.addHandler(console_handler)

    return logger


def get_logger(
    name: Optional[str] = None,
    log_level: Optional[int] = None,
    **kwargs: Any,
) -> logging.Logger:
    """Get a configured logger.

    This is a convenience function for getting a logger with the default
    configuration.

    Args:
        name: Logger name (defaults to the calling module name)
        log_level: Override default log level
        **kwargs: Additional arguments to pass to configure_logging

    Returns:
        logging.Logger: Configured logger instance
    """
    # Get environment variable for log level, defaulting to INFO
    env_log_level = os.environ.get("MATILLION_LOG_LEVEL", "INFO")

    # Map string log levels to logging constants
    log_level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    # Use provided log_level, or get from environment, or default to INFO
    effective_log_level = log_level or log_level_map.get(env_log_level, logging.INFO)

    return configure_logging(logger_name=name, log_level=effective_log_level, **kwargs)
```

This implementation ensured that the logging mechanism was centralized and not dependent on individual developers' preferences.
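
In a Matillion Python component, using the module then comes down to a few lines. The snippet below is a minimal sketch; the logger name is arbitrary, and MATILLION_LOG_LEVEL is the environment variable the module reads:

```python
from matillion_helper.logging_config import get_logger

# Reads MATILLION_LOG_LEVEL from the environment, defaulting to INFO
logger = get_logger("my_matillion_job")

logger.info("Starting incremental load")
logger.debug("Only emitted when MATILLION_LOG_LEVEL=DEBUG")
```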

Use Case: Interaction with AWS API

Another regular task we handle with Matillion involves IAM context switches, which allow us to eliminate static credentials. To achieve this, we implemented the ability to make STS (Security Token Service) AssumeRole calls. For cross-account access, the Matillion instance's role needs permission to assume the target role, and the role in the target AWS account must trust it.

Using a prompt like “Create a function to make STS AssumeRole possible in a generic way,” I developed a module that utilizes the existing logging_config.py for consistent logging throughout the implementation.

```python
import boto3
from botocore.exceptions import ClientError, ParamValidationError
from typing import Dict, Optional, Any

from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)


def assume_role(
    role_arn: str,
    session_name: str = "AssumedRoleSession",
    duration_seconds: int = 3600,
    external_id: Optional[str] = None,
    region_name: Optional[str] = None,
    sts_client: Optional[Any] = None,
) -> Dict[str, Any]:
    """Assume an AWS IAM role and return temporary credentials.

    This function allows for assuming an IAM role and obtaining temporary
    credentials that can be used to make AWS API calls with the permissions
    of the assumed role.

    Args:
        role_arn (str): The Amazon Resource Name (ARN) of the role to assume.
        session_name (str, optional): An identifier for the assumed role session.
            Defaults to "AssumedRoleSession".
        duration_seconds (int, optional): The duration, in seconds, of the role
            session. Defaults to 3600 (1 hour).
        external_id (str, optional): A unique identifier that might be required
            when assuming a role in another account. Defaults to None.
        region_name (str, optional): The AWS region to connect to. Defaults to
            None, which uses the default region from the AWS configuration.
        sts_client (boto3.client, optional): An existing STS client to use.
            If not provided, a new client will be created.

    Returns:
        Dict[str, Any]: A dictionary containing the temporary credentials and
            session information. The dictionary includes:
            - AccessKeyId: The access key ID for the temporary credentials
            - SecretAccessKey: The secret access key for the temporary credentials
            - SessionToken: The session token for the temporary credentials
            - Expiration: When the temporary credentials expire

    Raises:
        ClientError: If AWS returns an error during role assumption.
        ParamValidationError: If the parameters provided are invalid.
        Exception: Any other exception that occurs during role assumption.
    """
    try:
        # Create STS client if not provided
        if sts_client is None:
            sts_client = boto3.client("sts", region_name=region_name)

        # Prepare assume role parameters
        assume_role_params = {
            "RoleArn": role_arn,
            "RoleSessionName": session_name,
            "DurationSeconds": duration_seconds,
        }

        # Add external ID if provided
        if external_id:
            assume_role_params["ExternalId"] = external_id

        # Assume the role
        response = sts_client.assume_role(**assume_role_params)

        # Extract credentials from the response
        credentials = response["Credentials"]
        logger.info(f"Successfully assumed role: {role_arn}")
        return credentials

    except ClientError as e:
        logger.exception(f"Failed to assume role due to AWS error: {e}")
        raise
    except ParamValidationError as e:
        logger.exception(f"Invalid parameters for assume_role: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error assuming role: {e}")
        raise
```
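
A typical way to consume the returned credentials is to build a boto3 session for the target account. The following is a minimal sketch: the module path, role ARN, and bucket name are placeholders for illustration only:

```python
import boto3

# Module path is an assumption - adjust to where assume_role lives in your package
from matillion_helper.aws import assume_role

# Assume a role in the target account (placeholder ARN)
credentials = assume_role(
    role_arn="arn:aws:iam::123456789012:role/MatillionCrossAccountRole",
    session_name="matillion-load",
)

# Build a session in the target account from the temporary credentials
session = boto3.Session(
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)

# Any client created from this session uses the assumed-role permissions
s3 = session.client("s3")
response = s3.list_objects_v2(Bucket="my-source-bucket", MaxKeys=10)
```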

Use Case: Refreshing Power BI Semantic Models

Another useful feature we developed is the ability to kick off Power BI dataset refreshes from Matillion. This turns the scheduled dependency between Matillion and Power BI semantic models into an event-based approach, resulting in more consistent behavior and fewer error sources. However, configuration in the Power BI admin settings is required to allow API interaction, and refreshes are limited to eight per day. If you meet these prerequisites, you can leverage event-based refreshes for all Power BI workspaces and semantic models by inviting the service principal and assigning it Contributor rights.

Using a prompt like “Create a Python function to update Power BI datasets in a workspace”, I implemented a function that interacts with the Power BI REST API to refresh datasets, so that we can keep reports up to date directly from our data integration workflows.

```python
import requests
import json
import msal

from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)


def update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id):
    """
    Update Power BI datasets in a specified workspace.

    Args:
        workspace_id (str): The ID of the Power BI workspace
        client_id (str): Azure AD application client ID
        client_secret (str): Azure AD application client secret
        tenant_id (str): Azure AD tenant ID

    Returns:
        dict: Result of the refresh operation
    """
    # Get access token using MSAL
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = msal.ConfidentialClientApplication(
        client_id=client_id,
        client_credential=client_secret,
        authority=authority
    )

    # Acquire token for Power BI service
    scopes = ["https://analysis.windows.net/powerbi/api/.default"]
    result = app.acquire_token_for_client(scopes=scopes)

    if "access_token" not in result:
        logger.error(f"Error getting token: {result.get('error')}")
        logger.error(f"Error description: {result.get('error_description')}")
        return None

    access_token = result["access_token"]

    # Get datasets in the workspace
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    datasets_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets"
    datasets_response = requests.get(datasets_url, headers=headers)

    if datasets_response.status_code != 200:
        logger.error(f"Error getting datasets: {datasets_response.text}")
        return None

    datasets = datasets_response.json()["value"]
    refresh_results = {}

    # Refresh each dataset
    for dataset in datasets:
        dataset_id = dataset["id"]
        dataset_name = dataset["name"]

        refresh_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"
        refresh_response = requests.post(refresh_url, headers=headers)

        if refresh_response.status_code == 202:
            refresh_results[dataset_name] = "Refresh triggered successfully"
        else:
            refresh_results[dataset_name] = f"Error: {refresh_response.text}"

    return refresh_results


# Example usage
if __name__ == "__main__":
    # Replace these with your actual values
    client_id = "client_id"
    client_secret = "client_secret"
    tenant_id = "tenant_id"
    workspace_id = "workspace_id"

    results = update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id)
    logger.info(results)
```
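
Inside a Matillion Python component, the function can then be wired to job or environment variables. The snippet below is a minimal sketch; the module path and variable names are placeholders, and in practice the client secret should come from a secret manager rather than plain environment variables:

```python
import os

from matillion_helper.logging_config import get_logger
# Module path is an assumption - adjust to where update_powerbi_datasets lives
from matillion_helper.powerbi import update_powerbi_datasets

logger = get_logger(__name__)

# Placeholder environment variable names - adjust to your own setup
results = update_powerbi_datasets(
    workspace_id=os.environ["POWERBI_WORKSPACE_ID"],
    client_id=os.environ["POWERBI_CLIENT_ID"],
    client_secret=os.environ["POWERBI_CLIENT_SECRET"],
    tenant_id=os.environ["POWERBI_TENANT_ID"],
)

if results is None:
    raise RuntimeError("Power BI refresh could not be triggered")

for dataset_name, status in results.items():
    logger.info(f"{dataset_name}: {status}")
```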

Summary

By utilizing Amazon Q and creating a custom Matillion Python package, we gained much-needed flexibility in our development process. While this approach requires initial setup and containerization efforts, the benefits include a more integrated CI/CD process and the ability to manage Python scripts effectively within Matillion.

As of today, Matillion's Copilot (MAIA) does not help you create and maintain Python scripts. By introducing Amazon Q Developer, you can overcome this high-code hurdle by providing meaningful building blocks and enabling your data engineers to leverage pure but predefined Python code.

Happy (vibed) Coding! :-)
