Baseline for Databricks Labs projects written in Python. Sources are validated with mypy and pylint. See Contributing instructions if you would like to improve this project.
- Databricks Labs Blueprint
- Installation
- Batteries Included
- Python-native pathlib.Path-like interfaces
- Basic Terminal User Interface (TUI) Primitives
- Nicer Logging Formatter
- Parallel Task Execution
- Application and Installation State
- Install Folder
- Detecting Current Installation
- Detecting Installations From All Users
- Saving @dataclass configuration
- Saving CSV files
- Loading @dataclass configuration
- Brute-forcing SerdeError with as_dict() and from_dict()
- Configuration Format Evolution
- Uploading Untyped Files
- Listing All Files in the Install Folder
- Unit Testing Installation State
- Assert Rewriting with PyTest
- Application State Migrations
- Building Wheels
- Databricks CLI's databricks labs ... Router
- Notable Downstream Projects
- Project Support
You can install this project via pip:

    pip install databricks-labs-blueprint
This library contains a proven set of building blocks, tested in production through UCX and other projects.
This library exposes subclasses of pathlib from Python's standard library that work with Databricks Workspace paths. These classes provide a more intuitive and Pythonic way to work with Databricks Workspace paths than the standard str paths. The classes are designed to be drop-in replacements for pathlib.Path and provide additional functionality for working with Databricks Workspace paths.

This code initializes a client to interact with a Databricks workspace, creates a relative workspace path (~/some-folder/foo/bar/baz), verifies the path is not absolute, and then demonstrates that converting this relative path to an absolute path is not implemented and raises an error. Subsequently, it expands the relative path to the user's home directory and creates the specified directory if it does not already exist.

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
    assert not wsp.is_absolute()

    wsp.absolute()  # raises NotImplementedError

    with_user = wsp.expanduser()
    with_user.mkdir()

    user_name = ws.current_user.me().user_name
    wsp_check = WorkspacePath(ws, f"/Users/{user_name}/{name}/foo/bar/baz")
    assert wsp_check.is_dir()

    wsp_check.parent.rmdir()  # raises BadRequest
    wsp_check.parent.rmdir(recursive=True)

    assert not wsp_check.exists()
This code expands the ~ symbol to the full path of the user's home directory, computes the relative path from this home directory to the previously created directory (~/some-folder/foo/bar/baz), and verifies it matches the expected relative path (some-folder/foo/bar/baz). It then confirms that the expanded path is absolute, checks that calling absolute() on this path returns the path itself, and converts the path to a FUSE-compatible path format (/Workspace/username@example.com/some-folder/foo/bar/baz).

    from pathlib import Path
    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
    with_user = wsp.expanduser()

    home = WorkspacePath(ws, "~").expanduser()
    relative_name = with_user.relative_to(home)
    assert relative_name.as_posix() == f"{name}/foo/bar/baz"

    assert with_user.is_absolute()
    assert with_user.absolute() == with_user
    assert with_user.as_fuse() == Path("/Workspace") / with_user.as_posix()
The as_uri() method returns a browser-accessible URI for the workspace path. This example retrieves the current user's username from the Databricks workspace client, constructs a browser-accessible URI for the previously created directory (~/some-folder/foo/bar/baz) by formatting the host URL and encoding the username, and then verifies that the URI generated by the with_user path object matches the constructed browser URI:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
    with_user = wsp.expanduser()

    user_name = ws.current_user.me().user_name
    browser_uri = f'{ws.config.host}#workspace/Users/{user_name.replace("@", "%40")}/{name}/foo/bar/baz'

    assert with_user.as_uri() == browser_uri
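The URI layout checked above can be reproduced with the standard library alone. This is only a sketch of the string formatting involved: browser_uri_for is a hypothetical helper, and the host and user name are placeholder values, not something the library provides.

```python
from urllib.parse import quote


def browser_uri_for(host: str, user_name: str, relative: str) -> str:
    """Hypothetical helper: build a browser link in the layout used by the example above."""
    # quote() with safe="" percent-encodes "@" as "%40"
    encoded_user = quote(user_name, safe="")
    return f"{host}#workspace/Users/{encoded_user}/{relative}"


# Placeholder host and user name, for illustration only
uri = browser_uri_for(
    "https://example.cloud.databricks.com",
    "foo@example.com",
    "some-folder/foo/bar/baz",
)
print(uri)
```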
This code creates a WorkspacePath object for the path ~/some-folder/a/b/c, expands it to the full user path, and creates the directory along with any necessary parent directories. It then creates a file named hello.txt within this directory, writes "Hello, World!" to it, and verifies the content. The code lists all .txt files in the directory and ensures there is exactly one file, which is hello.txt. Finally, it deletes hello.txt and confirms that the file no longer exists.

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}/a/b/c")
    with_user = wsp.expanduser()
    with_user.mkdir(parents=True)

    hello_txt = with_user / "hello.txt"
    hello_txt.write_text("Hello, World!")
    assert hello_txt.read_text() == "Hello, World!"

    files = list(with_user.glob("**/*.txt"))
    assert len(files) == 1
    assert hello_txt == files[0]
    assert files[0].name == "hello.txt"

    with_user.joinpath("hello.txt").unlink()
    assert not hello_txt.exists()
The read_bytes() method works as expected:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}")
    with_user = wsp.expanduser()
    with_user.mkdir(parents=True)

    hello_bin = with_user.joinpath("hello.bin")
    hello_bin.write_bytes(b"Hello, World!")
    assert hello_bin.read_bytes() == b"Hello, World!"

    with_user.joinpath("hello.bin").unlink()
    assert not hello_bin.exists()
This code creates a WorkspacePath object for the path ~/some-folder, expands it to the full user path, and creates the directory along with any necessary parent directories. It then creates a file named hello.txt within this directory and writes "Hello, World!" to it. The code then renames the file to hello2.txt, verifies that hello.txt no longer exists, and checks that the content of hello2.txt is "Hello, World!".

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    name = 'some-folder'
    ws = WorkspaceClient()
    wsp = WorkspacePath(ws, f"~/{name}")
    with_user = wsp.expanduser()
    with_user.mkdir(parents=True)

    hello_txt = with_user / "hello.txt"
    hello_txt.write_text("Hello, World!")

    hello_txt.replace(with_user / "hello2.txt")

    assert not hello_txt.exists()
    assert (with_user / "hello2.txt").read_text() == "Hello, World!"

This code initializes a Databricks WorkspaceClient, creates a WorkspacePath object for the path ~/some-folder, and defines two items within this folder: a text file (a.txt) and a Python notebook (b). It creates the notebook with specified content and writes "Hello, World!" to the text file. The code then retrieves all files in the folder, asserts there are exactly two files, and verifies the suffix and content of each file. Specifically, it checks that a.txt has a .txt suffix and b has a .py suffix, with the notebook containing the expected code.

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.paths import WorkspacePath

    ws = WorkspaceClient()

    folder = WorkspacePath(ws, "~/some-folder")

    txt_file = folder / "a.txt"
    py_notebook = folder / "b"  # notebooks have no file extension

    make_notebook(path=py_notebook, content="display(spark.range(10))")
    txt_file.write_text("Hello, World!")

    files = {_.name: _ for _ in folder.glob("**/*")}
    assert len(files) == 2

    assert files["a.txt"].suffix == ".txt"
    assert files["b"].suffix == ".py"  # suffix is determined from ObjectInfo
    assert files["b"].read_text() == "# Databricks notebook source\ndisplay(spark.range(10))"

Your command-line apps need testable interactivity, which is provided by the Prompts class (from databricks.labs.blueprint.tui import Prompts). It is also integrated with our command router. Here are some examples:

Use prompts.question() as a slightly more capable alternative to the input() builtin:

    from databricks.labs.blueprint.tui import Prompts

    prompts = Prompts()
    answer = prompts.question('Enter a year', default='2024', valid_number=True)
    print(answer)
Optional arguments are:
- default (str) - use given value if user didn't input anything
- max_attempts (int, default 10) - number of attempts to throw exception after invalid or empty input
- valid_number (bool) - input has to be a valid number
- valid_regex (bool) - input has to be a valid regular expression
- validate - function that takes a string and returns boolean, like lambda x: 'awesome' in x, that could be used to further validate input.
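To make the interplay of these arguments concrete, here is a minimal sketch of a validated prompt loop. It is not the library's implementation: ask, its read parameter (an injectable replacement for input(), added here so the loop can be exercised without a terminal), and the exact retry and error behavior are all assumptions for illustration.

```python
import re


def _is_valid(answer, valid_number, valid_regex, validate):
    """Apply the optional checks described in the list above."""
    if valid_number:
        try:
            int(answer)
        except ValueError:
            return False
    if valid_regex:
        try:
            re.compile(answer)
        except re.error:
            return False
    if validate is not None and not validate(answer):
        return False
    return True


def ask(text, *, default=None, max_attempts=10, valid_number=False,
        valid_regex=False, validate=None, read=input):
    """Hypothetical prompt loop: retry until the answer is valid or attempts run out."""
    prompt = f"{text} (default: {default}): " if default else f"{text}: "
    for _ in range(max_attempts):
        answer = read(prompt).strip()
        if not answer and default is not None:
            return default  # empty input falls back to the default
        if answer and _is_valid(answer, valid_number, valid_regex, validate):
            return answer
    raise ValueError(f"cannot get answer within {max_attempts} attempts")


# Scripted answers stand in for a user typing: empty, invalid, then valid
scripted = iter(["", "abc", "2025"])
year = ask("Enter a year", valid_number=True, read=lambda _: next(scripted))
print(year)
```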
Use prompts.confirm() to guard any optional or destructive actions of your app:

    if prompts.confirm('Destroy database?'):
        print('DESTROYING DATABASE')

Use prompts.choice() to select a value from a list:

    answer = prompts.choice('Select a language', ['Python', 'Rust', 'Go', 'Java'])
    print(answer)

Use prompts.choice_from_dict() to select a value from a dictionary by showing users the sorted dictionary keys:

    answer = prompts.choice_from_dict('Select a locale', {
        'Українська': 'ua',
        'English': 'en'
    })
    print(f'Locale is: {answer}')

Use prompts.multiple_choice_from_dict() to select multiple items from a dictionary:

    answer = prompts.multiple_choice_from_dict(
        'What projects are written in Python? Select [DONE] when ready.', {
        'Databricks Labs UCX': 'ucx',
        'Databricks SDK for Python': 'sdk-py',
        'Databricks SDK for Go': 'sdk-go',
        'Databricks CLI': 'cli',
    })
    print(f'Answer is: {answer}')
Hidden (Password) Prompt
Use prompts.password() to prompt for text without showing the response in the terminal.

Example:

    from databricks.labs.blueprint.tui import Prompts

    prompts = Prompts()
    secret = prompts.password('Enter a password')
    print(f'Secret without redaction: {secret}')

Use MockPrompts with regular expressions as keys and values as answers. The longest key takes precedence.

    from databricks.labs.blueprint.tui import MockPrompts

    def test_ask_for_int():
        prompts = MockPrompts({r".*": ""})
        res = prompts.question("Number of threads", default="8", valid_number=True)
        assert "8" == res
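The "longest key takes precedence" rule can be pictured with a few lines of standard-library code. This is a sketch under assumptions, not the MockPrompts source: pick_answer is a hypothetical function, and treating an empty answer as "use the default" is inferred from the test above.

```python
import re
from typing import Dict, Optional


def pick_answer(patterns_to_answers: Dict[str, str], text: str,
                default: Optional[str] = None) -> str:
    """Hypothetical lookup: try the longest regex first, fall back to shorter ones."""
    ordered = sorted(patterns_to_answers.items(), key=lambda kv: len(kv[0]), reverse=True)
    for pattern, answer in ordered:
        if not re.match(pattern, text):
            continue
        if not answer and default:
            return default  # an empty mocked answer means "accept the default"
        return answer
    raise ValueError(f"not mocked: {text}")


mocks = {r".*": "", r"Number of threads.*": "16"}
specific = pick_answer(mocks, "Number of threads", default="8")
fallback = pick_answer(mocks, "Enter a year", default="2024")
print(specific, fallback)
```

The catch-all `.*` key is shorter than the specific key, so it only answers questions that nothing more specific matched.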
There's a basic logging configuration available for Python SDK, but the default output is not pretty and is relatively inconvenient to read. Here's how to make output from Python's standard logging facility more enjoyable to read:

    from databricks.labs.blueprint.logger import install_logger

    install_logger()

    import logging
    logging.root.setLevel("DEBUG")  # use only for development or demo purposes

    logger = logging.getLogger("name.of.your.module")
    logger.debug("This is a debug message")
    logger.info("This is an info message")
    logger.warning("This is a warning message")
    logger.error("This is an error message", exc_info=KeyError(123))
    logger.critical("This is a critical message")
Here are the assumptions made by this formatter:
- Most likely you're forwarding your logs to a file already, so this log formatter is mainly for visual consumption.
- The average app or Databricks Job most likely finishes running within a day or two, so we display only hours, minutes, and seconds from the timestamp.
- We gray out debug messages and highlight all other messages. Errors and fatals are additionally painted red.
- We shorten the name of the logger to a readable chunk only, so as not to clutter the space. Real-world apps have deeply nested folder structures and filenames like src/databricks/labs/ucx/migration/something.py, which translate into databricks.labs.ucx.migration.something fully-qualified Python module names. These get reflected into the __name__ top-level code environment special variable, which you idiomatically use with logging as logging.getLogger(__name__). This log formatter shortens the full module path to a more readable d.l.u.migration.something, which is easier to consume from a terminal screen or a notebook.
- We only show the name of the thread if it's other than MainThread, because the overwhelming majority of Python applications are single-threaded.
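The name shortening described in the list above can be sketched as follows. The function name and the "keep the last two segments" rule are assumptions inferred from the d.l.u.migration.something example, not taken from the formatter's source.

```python
def shorten_logger_name(name: str, keep_last: int = 2) -> str:
    """Abbreviate all but the last `keep_last` dotted segments to their first letter."""
    parts = name.split(".")
    cutoff = len(parts) - keep_last
    return ".".join(part if i >= cutoff else part[:1] for i, part in enumerate(parts))


short = shorten_logger_name("databricks.labs.ucx.migration.something")
print(short)
```

Names with two segments or fewer, such as dist.logger, pass through unchanged under this rule.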
Here's how the output would look like on dark terminal backgrounds, including those from GitHub Actions:
And here's how things will appear when executed from Databricks Runtime as part of notebook or a workflow:
Just place the following code in your wheel's top-most __init__.py file:

    from databricks.labs.blueprint.logger import install_logger

    install_logger(level="INFO")

And place this idiomatic snippet at the top of your file:

    # ... insert this into the top of your file
    from databricks.labs.blueprint.entrypoint import get_logger

    logger = get_logger(__file__)
    # ... top of the file insert end

... and you'll be able to benefit from the readable console stderr formatting everywhere.

Each time you need to turn on debug logging, just invoke logging.root.setLevel("DEBUG") (even in a notebook).
When you invoke Python as an entry point to your wheel (also known as console_scripts), the __name__ top-level code environment would always be equal to __main__. But you really want to get the logger to be named after your Python module and not just __main__ (see rendering in Databricks notebooks).

If you create a dist/logger.py file with the following contents:

    from databricks.labs.blueprint.entrypoint import get_logger, run_main

    logger = get_logger(__file__)

    def main(first_arg, second_arg, *other):
        logger.info(f'First arg is: {first_arg}')
        logger.info(f'Second arg is: {second_arg}')
        logger.info(f'Everything else is: {other}')
        logger.debug('... and this message is only shown when you are debugging from PyCharm IDE')

    if __name__ == '__main__':
        run_main(main)

... and invoke it with python dist/logger.py Hello world, my name is Serge, you should get back the following output.

    13:46:42 INFO [dist.logger] First arg is: Hello
    13:46:42 INFO [dist.logger] Second arg is: world,
    13:46:42 INFO [dist.logger] Everything else is: ('my', 'name', 'is', 'Serge')

Everything is made easy thanks to the run_main(fn) helper.
Python's global interpreter lock (GIL) prevents threads from running compute-intensive Python code in parallel, but IO-intensive tasks, like calling Databricks APIs through the Databricks SDK for Python, release the GIL while they wait, so they can overlap. It's quite a common task to perform multiple different API calls in parallel, though it is overwhelmingly difficult to do multi-threading right. ThreadPoolExecutor from concurrent.futures is great, but sometimes we want something even more high level. This library helps you navigate the most common road bumps.

This library helps you filter out empty results from background tasks, so that the downstream code is generally simpler. We're also handling the thread pool naming, so that the name of the list of tasks properly gets into log messages. After all background tasks have completed their execution, we log something like Finished 'task group name' tasks: 50% results available (2/4). Took 0:00:00.000604.

    import logging
    from databricks.labs.blueprint.parallel import Threads

    logger = logging.getLogger(__name__)

    def not_really_but_fine():
        logger.info("did something, but returned None")

    def doing_something():
        logger.info("doing something important")
        return f'result from {doing_something.__name__}'

    logging.root.setLevel('DEBUG')
    tasks = [not_really_but_fine, not_really_but_fine, doing_something, doing_something]
    results, errors = Threads.gather("task group name", tasks)

    assert ['result from doing_something', 'result from doing_something'] == results
    assert [] == errors

This will log the following messages:

    14:20:15 DEBUG [d.l.blueprint.parallel] Starting 4 tasks in 20 threads
    14:20:15 INFO [dist.logger][task_group_name_0] did something, but returned None
    14:20:15 INFO [dist.logger][task_group_name_1] did something, but returned None
    14:20:15 INFO [dist.logger][task_group_name_1] doing something important
    14:20:15 INFO [dist.logger][task_group_name_1] doing something important
    14:20:15 INFO [d.l.blueprint.parallel][task_group_name_1] task group name 4/4, rps: 7905.138/sec
    14:20:15 INFO [d.l.blueprint.parallel] Finished 'task group name' tasks: 50% results available (2/4). Took 0:00:00.000604
Inspired by the Go language's idiomatic error handling approach, this library allows for collecting errors from all of the background tasks and handling them separately. For all other cases, we recommend using strict failures.

    from databricks.sdk.errors import NotFound
    from databricks.labs.blueprint.parallel import Threads

    def works():
        return True

    def fails():
        raise NotFound("something is not right")

    tasks = [works, fails, works, fails, works, fails, works, fails]
    results, errors = Threads.gather("doing some work", tasks)

    assert [True, True, True, True] == results
    assert 4 == len(errors)

This will log the following messages:

    14:08:31 ERROR [d.l.blueprint.parallel][doing_some_work_0] doing some work task failed: something is not right: ...
    ...
    14:08:31 ERROR [d.l.blueprint.parallel][doing_some_work_3] doing some work task failed: something is not right: ...
    14:08:31 ERROR [d.l.blueprint.parallel] More than half 'doing some work' tasks failed: 50% results available (4/8). Took 0:00:00.001011
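For intuition, the gather pattern (drop None results, collect exceptions instead of raising them) can be approximated with the standard library. This is a simplified sketch, not the library's Threads class: gather_sketch, its num_threads default, and the thread naming are assumptions, and progress logging is left out.

```python
from concurrent.futures import ThreadPoolExecutor


def gather_sketch(name, tasks, num_threads=4):
    """Run callables in a named thread pool; return (non-empty results, errors)."""
    results, errors = [], []
    prefix = name.replace(" ", "_")  # the thread name shows up in log records
    with ThreadPoolExecutor(max_workers=num_threads, thread_name_prefix=prefix) as pool:
        futures = [pool.submit(task) for task in tasks]
        for future in futures:
            error = future.exception()  # blocks until this task is done
            if error is not None:
                errors.append(error)
                continue
            result = future.result()
            if result is not None:  # filter out empty results
                results.append(result)
    return results, errors


def returns_nothing():
    return None


def returns_value():
    return "result"


def breaks():
    raise ValueError("something is not right")


results, errors = gather_sketch("task group name", [returns_nothing, returns_value, breaks, returns_value])
print(results, errors)
```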
Use Threads.strict(...) to raise ManyError with the summary of all failed tasks:

    from databricks.sdk.errors import NotFound
    from databricks.labs.blueprint.parallel import Threads

    def works():
        return True

    def fails():
        raise NotFound("something is not right")

    tasks = [works, fails, works, fails, works, fails, works, fails]
    results = Threads.strict("doing some work", tasks)

    # this line won't get executed
    assert [True, True, True, True] == results

This will log the following messages:

    ...
    14:11:46 ERROR [d.l.blueprint.parallel] More than half 'doing some work' tasks failed: 50% results available (4/8). Took 0:00:00.001098
    ...
    databricks.labs.blueprint.parallel.ManyError: Detected 4 failures: NotFound: something is not right
There always needs to be a location where you put application code, artifacts, and configuration. The Installation class is used to manage the ~/.{product} folder on WorkspaceFS to track typed files. It provides methods for serializing and deserializing objects of a specific type, as well as managing the storage location for those objects. The class includes methods for loading and saving objects, uploading and downloading files, and managing the installation folder.

The Installation class can be helpful for unit testing by allowing you to mock the file system and control the behavior of the load and save methods. See unit testing for more details.

The install_folder method returns the path to the installation folder on WorkspaceFS. The installation folder is used to store typed files that are managed by the Installation class. Publishing wheels updates the version.json file in the install folder.
When integration testing, you may want to have a random installation folder for each test execution.
If an install_folder argument is provided to the constructor of the Installation class, it will be used as the installation folder. Otherwise, the installation folder will be determined based on the current user's username. Specifically, the installation folder will be /Users/{user_name}/.{product}, where {user_name} is the username of the current user and {product} is the name of the product associated with the installation. Here is an example of how you can use the install_folder method:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    # Create an Installation object for the "blueprint" product
    install = Installation(WorkspaceClient(), "blueprint")

    # Print the path to the installation folder
    print(install.install_folder())
    # Output: /Users/{user_name}/.blueprint

In this example, the Installation object is created for the "blueprint" product. The install_folder method is then called to print the path to the installation folder. The output will be /Users/{user_name}/.blueprint, where {user_name} is the username of the current user.

You can also provide an install_folder argument to the constructor to specify a custom installation folder. Here is an example of how you can do this:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    # Create an Installation object for the "blueprint" product with a custom installation folder
    install = Installation(WorkspaceClient(), "blueprint", install_folder="/my/custom/folder")

    # Print the path to the installation folder
    print(install.install_folder())
    # Output: /my/custom/folder

In this example, the Installation object is created for the "blueprint" product with a custom installation folder of /my/custom/folder. The install_folder method is then called to print the path to the installation folder. The output will be /my/custom/folder.
Installation.current(ws, product) returns the Installation object for the given product in the current workspace.

If the installation is not found, a NotFound error is raised. If the assume_user argument is True, the method will assume that the installation is in the user's home directory and return it if found. If False, the method will only return an installation that is in the /Applications directory.

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    ws = WorkspaceClient()

    # current user installation
    installation = Installation.assume_user_home(ws, "blueprint")
    assert "/Users/foo/.blueprint" == installation.install_folder()
    assert not installation.is_global()

    # workspace global installation
    installation = Installation.current(ws, "blueprint")
    assert "/Applications/blueprint" == installation.install_folder()
    assert installation.is_global()
Installation.existing(ws, product) returns a collection of all existing installations for the given product in the current workspace.

This method searches for installations in the root /Applications directory and home directories of all users in the workspace. Let's say users foo@example.com and bar@example.com installed the blueprint product in their home folders. The following code will print /Workspace/bar@example.com/.blueprint and /Workspace/foo@example.com/.blueprint:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    ws = WorkspaceClient()

    global_install = Installation.assume_global(ws, 'blueprint')
    global_install.upload("some.bin", b"...")

    user_install = Installation.assume_user_home(ws, 'blueprint')
    user_install.upload("some.bin", b"...")

    for blueprint in Installation.existing(ws, "blueprint"):
        print(blueprint.install_folder())
The save(obj) method saves a dataclass instance of type T to a file on WorkspaceFS. If no filename is provided, the name of the type_ref class will be used as the filename. Any missing parent directories are created automatically. If the object has a __version__ attribute, the method will add a version field to the serialized object with the value of the __version__ attribute. See configuration format evolution for more details. save(obj) works with JSON and YAML configurations without the need to supply the filename keyword attribute. When you need to save CSV files, the filename attribute is required. If you need to upload arbitrary and untyped files, use the upload() method.

Here is an example of how you can use the save method:

    from dataclasses import dataclass

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    install = Installation(WorkspaceClient(), "blueprint")

    @dataclass
    class MyClass:
        field1: str
        field2: str

    obj = MyClass('value1', 'value2')
    install.save(obj)

    # Verify that the object was saved correctly
    loaded_obj = install.load(MyClass)
    assert loaded_obj == obj

In this example, the Installation object is created for the "blueprint" product. A dataclass object of type MyClass is then created and saved to a file using the save method. The object is then loaded from the file using the load method and compared to the original object to verify that it was saved correctly.
You may need to upload a CSV file to Databricks Workspace, so that it's easier to edit from the Databricks Workspace UI or tools like Google Sheets or Microsoft Excel. If non-technical humans don't need to edit application state, use dataclasses for configuration. CSV files currently don't support format evolution.

The following example will save a workspaces.csv file with two records and a header:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.provisioning import Workspace
    from databricks.labs.blueprint.installation import Installation

    installation = Installation(WorkspaceClient(), "blueprint")

    installation.save([
        Workspace(workspace_id=1234, workspace_name="first"),
        Workspace(workspace_id=1235, workspace_name="second"),
    ], filename="workspaces.csv")

    # ~ $ databricks workspace export /Users/foo@example.com/.blueprint/workspaces.csv
    # ... workspace_id,workspace_name
    # ... 1234,first
    # ... 1235,second
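The shape of the exported file can be reproduced locally with the csv module. This is a rough sketch of turning a list of dataclass records into CSV text, not the library's serializer: WorkspaceRow is a hypothetical stand-in for the SDK's Workspace class, and to_csv is an illustrative helper.

```python
import csv
import io
from dataclasses import asdict, dataclass, fields


@dataclass
class WorkspaceRow:
    """Hypothetical stand-in for the SDK's Workspace dataclass."""
    workspace_id: int
    workspace_name: str


def to_csv(records: list) -> str:
    """Serialize a non-empty list of dataclass instances into CSV text with a header."""
    header = [field.name for field in fields(records[0])]
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=header, lineterminator="\n")
    writer.writeheader()
    for record in records:
        writer.writerow(asdict(record))
    return buffer.getvalue()


csv_text = to_csv([
    WorkspaceRow(workspace_id=1234, workspace_name="first"),
    WorkspaceRow(workspace_id=1235, workspace_name="second"),
])
print(csv_text)
```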
The load(type_ref[, filename]) method loads an object of type type_ref from a file on WorkspaceFS. If no filename is provided, the __file__ attribute of type_ref is used as the filename; if that attribute is absent, the library derives the name from the class name.

    from dataclasses import dataclass

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    @dataclass
    class SomeConfig:  # <-- auto-detected filename is `some-config.json`
        version: str

    ws = WorkspaceClient()
    installation = Installation.current(ws, "blueprint")
    cfg = installation.load(SomeConfig)

    installation.save(SomeConfig("0.1.2"))
    installation.assert_file_written("some-config.json", {"version": "0.1.2"})
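The filename auto-detection can be pictured as a CamelCase to kebab-case conversion with an optional __file__ override. The helper below is a guess at that rule based on the SomeConfig example, not the library's code; default_filename and its extension parameter are hypothetical.

```python
import re


def default_filename(type_ref: type, extension: str = "json") -> str:
    """Prefer an explicit __file__ attribute, else derive a kebab-case name from the class."""
    explicit = getattr(type_ref, "__file__", None)
    if explicit:
        return explicit
    # insert a dash before every capital letter except the first, then lowercase
    kebab = re.sub(r"(?<!^)(?=[A-Z])", "-", type_ref.__name__).lower()
    return f"{kebab}.{extension}"


class SomeConfig:
    pass


class EvolvedConfig:
    __file__ = "config.yml"


print(default_filename(SomeConfig), default_filename(EvolvedConfig))
```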
In the rare circumstances when you cannot use @dataclass or you get a SerdeError that you cannot explain, you can implement from_dict(cls, raw: dict) -> 'T' and as_dict(self) -> dict methods on the class:

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    class SomePolicy:
        def __init__(self, a, b):
            self._a = a
            self._b = b

        def as_dict(self) -> dict:
            return {"a": self._a, "b": self._b}

        @classmethod
        def from_dict(cls, raw: dict):
            return cls(raw.get("a"), raw.get("b"))

        def __eq__(self, o):
            assert isinstance(o, SomePolicy)
            return self._a == o._a and self._b == o._b

    policy = SomePolicy(1, 2)
    installation = Installation.current(WorkspaceClient(), "blueprint")
    installation.save(policy, filename="backups/policy-123.json")
    load = installation.load(SomePolicy, filename="backups/policy-123.json")

    assert load == policy
As time progresses, your application evolves. So does the configuration file format with it. This library provides a common utility to seamlessly evolve configuration file format across versions, providing callbacks to convert from older versions to newer. If you need to migrate configuration or database state of the entire application, use the application state migrations.
If the type has a __version__ attribute, the method will check that the version of the object in the file matches the expected version. If the versions do not match, the method will attempt to migrate the object to the expected version using a method named v{actual_version}_migrate on the type_ref class. If the migration is successful, the method will return the migrated object. If the migration is not successful, the method will raise an IllegalState exception. Let's say we have a /Users/foo@example.com/.blueprint/config.yml file with only initial: 999 as content, which is from older installations of the blueprint product:

    from dataclasses import dataclass

    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.installation import Installation

    @dataclass
    class EvolvedConfig:
        __file__ = "config.yml"
        __version__ = 3

        initial: int
        added_in_v1: int
        added_in_v2: int

        @staticmethod
        def v1_migrate(raw: dict) -> dict:
            raw["added_in_v1"] = 111
            raw["version"] = 2
            return raw

        @staticmethod
        def v2_migrate(raw: dict) -> dict:
            raw["added_in_v2"] = 222
            raw["version"] = 3
            return raw

    installation = Installation.current(WorkspaceClient(), "blueprint")
    cfg = installation.load(EvolvedConfig)

    assert 999 == cfg.initial
    assert 111 == cfg.added_in_v1  # <-- added by v1_migrate()
    assert 222 == cfg.added_in_v2  # <-- added by v2_migrate()
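The chain of v{actual_version}_migrate calls can be illustrated without a workspace. This is only a sketch of the idea: migrate is a hypothetical function, treating a file without a version field as version 1 is an assumption drawn from the example, and RuntimeError stands in for the library's IllegalState.

```python
from dataclasses import dataclass


def migrate(type_ref, raw: dict) -> dict:
    """Apply v{n}_migrate callbacks until the raw dict reaches type_ref.__version__."""
    expected = getattr(type_ref, "__version__", None)
    if expected is None:
        return raw
    actual = raw.get("version", 1)  # assumption: a missing version means version 1
    while actual < expected:
        step = getattr(type_ref, f"v{actual}_migrate", None)
        if step is None:
            raise RuntimeError(f"cannot migrate from v{actual}")  # stand-in for IllegalState
        raw = step(dict(raw))
        upgraded = raw.get("version", actual)
        if upgraded <= actual:
            raise RuntimeError(f"v{actual}_migrate did not advance the version")
        actual = upgraded
    return raw


@dataclass
class EvolvedConfig:
    __version__ = 3

    initial: int
    added_in_v1: int
    added_in_v2: int

    @staticmethod
    def v1_migrate(raw: dict) -> dict:
        raw["added_in_v1"] = 111
        raw["version"] = 2
        return raw

    @staticmethod
    def v2_migrate(raw: dict) -> dict:
        raw["added_in_v2"] = 222
        raw["version"] = 3
        return raw


migrated = migrate(EvolvedConfig, {"initial": 999})
cfg = EvolvedConfig(**{k: v for k, v in migrated.items() if k != "version"})
print(cfg)
```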
The upload(filename, raw_bytes) and upload_dbfs(filename, raw_bytes) methods upload raw bytes to a file on WorkspaceFS (or DBFS) with the given filename, creating any missing directories where required. These methods are used to upload files that are not typed, i.e., they do not use the @dataclass decorator.

    installation = Installation(ws, "blueprint")

    target = installation.upload("wheels/foo.whl", b"abc")
    assert "/Users/foo/.blueprint/wheels/foo.whl" == target

The most common example is a wheel, which we already integrate with the Installation framework.

You can use the files() method to recursively list all files in the install folder.
You can create a MockInstallation object and use it to override the default installation folder and the contents of the files in that folder. This allows you to test the behavior of your code in different scenarios, such as when a file is not found or when the contents of a file do not match the expected format.

For example, you have the following WorkspaceConfig class that is serialized into config.yml on your workspace:

    @dataclass
    class WorkspaceConfig:
        __file__ = "config.yml"
        __version__ = 2

        inventory_database: str
        connect: Config | None = None
        workspace_group_regex: str | None = None
        include_group_names: list[str] | None = None
        num_threads: int | None = 10
        database_to_catalog_mapping: dict[str, str] | None = None
        log_level: str | None = "INFO"
        workspace_start_path: str = "/"

Here's the only code necessary to verify that specific content got written:

    from databricks.labs.blueprint.installation import MockInstallation

    installation = MockInstallation()

    installation.save(WorkspaceConfig(inventory_database="some_blueprint"))

    installation.assert_file_written("config.yml", {
        "version": 2,
        "inventory_database": "some_blueprint",
        "log_level": "INFO",
        "num_threads": 10,
        "workspace_start_path": "/",
    })

This method is far superior to directly comparing raw bytes content via mock:

    ws.workspace.upload.assert_called_with(
        "/Users/foo/.blueprint/config.yml",
        yaml.dump(
            {
                "version": 2,
                "num_threads": 10,
                "inventory_database": "some_blueprint",
                "include_group_names": ["foo", "bar"],
                "workspace_start_path": "/",
                "log_level": "INFO",
            }
        ).encode("utf8"),
        format=ImportFormat.AUTO,
        overwrite=True,
    )
And it's even better if you use PyTest, where we have even deeper integration.
If you are using PyTest, then add this to your conftest.py, so that the assertions are more readable:

    import pytest

    pytest.register_assert_rewrite('databricks.labs.blueprint.installation')
As time goes by, your applications evolve as well, requiring the addition of new columns to database schemas, changes of the database state, or some migrations of configured workflows. This utility allows you to do seamless upgrades from version X to version Z through version Y. Idiomatic usage in your deployment automation is as follows:

    from ... import Config
    from databricks.sdk import WorkspaceClient
    from databricks.labs.blueprint.upgrades import Upgrades
    from databricks.labs.blueprint.wheels import ProductInfo

    product_info = ProductInfo.from_class(Config)
    ws = WorkspaceClient(product=product_info.product_name(), product_version=product_info.version())
    installation = product_info.current_installation(ws)
    config = installation.load(Config)
    upgrades = Upgrades(product_info, installation)
    upgrades.apply(ws)

The upgrade process loads the version of the product that is about to be installed from __about__.py
file that declares the __version__
variable. This version is compares with the version currently installed on the Databricks Workspace by loading it from the version.json
file in the installation folder. This file is kept up-to-date automatically if you use the databricks.labs.blueprint.wheels.WheelsV2.
If those versions are different, the process looks for the upgrades
folder next to __about__.py
file and computes a difference for the upgrades in need to be rolled out. Every upgrade script in that directory has to start with a valid SemVer identifier, followed by the alphanumeric description of the change, like v0.0.1_add_service.py
. Each script has to expose a function that takes Installation
and WorkspaceClient
arguments to perform the relevant upgrades. Here's the example:
from ... import Config

import logging, dataclasses

from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation

upgrade_logger = logging.getLogger(__name__)

def upgrade(installation: Installation, ws: WorkspaceClient):
    upgrade_logger.info(f"creating new automated service user for the installation")
    config = installation.load(Config)
    service_principal = ws.service_principals.create(display_name='blueprint-service')
    new_config = dataclasses.replace(config, application_id=service_principal.application_id)
    installation.save(new_config)
To prevent the same upgrade script from being applied twice, we use the applied-upgrades.json
file in the installation directory. At the moment, there's no downgrade(installation, ws)
, but it can easily be added in the future versions of this library.
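The selection step described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the library's implementation: the helper names `parse_version` and `pending_upgrades`, the example script names other than `v0.0.1_add_service.py`, and the rule "run every script whose version is greater than the installed one and not greater than the target one" are all hypothetical.

```python
import re

# Hypothetical helper names; not part of databricks.labs.blueprint.
_SCRIPT_RE = re.compile(r"^v(\d+)\.(\d+)\.(\d+)_(\w+)\.py$")


def parse_version(text: str) -> tuple[int, int, int]:
    """Turn '0.0.1' or 'v0.0.1' into a comparable (major, minor, patch) tuple."""
    major, minor, patch = text.lstrip("v").split(".")
    return int(major), int(minor), int(patch)


def pending_upgrades(scripts, installed, target, applied):
    """Return script names with installed < version <= target that were not applied yet."""
    low, high = parse_version(installed), parse_version(target)
    selected = []
    for name in scripts:
        match = _SCRIPT_RE.match(name)
        if not match or name in applied:
            continue  # skip non-upgrade files and already applied scripts
        version = tuple(int(part) for part in match.groups()[:3])
        if low < version <= high:
            selected.append((version, name))
    # Sort by version so that upgrades run from the oldest to the newest.
    return [name for _, name in sorted(selected)]


scripts = ["v0.0.1_add_service.py", "v0.1.0_rename_table.py", "v0.2.0_drop_column.py", "README.md"]
todo = pending_upgrades(scripts, installed="0.0.1", target="0.2.0", applied={"v0.1.0_rename_table.py"})
print(todo)  # ['v0.2.0_drop_column.py']
```

The `applied` set plays the role that the applied-upgrades file plays in the installation directory: anything recorded there is skipped on the next run.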
We recommend deploying applications as wheels, which are part of the application installation. But versioning, testing, and deploying those is often a tedious process.
Every time you deploy your Python app as a wheel, it has to have a different version. This library automatically detects the __about__.py
file anywhere in the project root and reads the __version__
variable from it. We support the SemVer versioning scheme. Publishing wheels updates the version.json
file in the install folder.
import logging

from databricks.labs.blueprint.wheels import ProductInfo

logger = logging.getLogger(__name__)

product_info = ProductInfo(__file__)
version = product_info.released_version()
logger.info(f'Version is: {version}')
When you develop your wheel and iterate on testing it, you often need to upload a file with a different name each time you build it. We use the git describe --tags
command to fetch the latest SemVer-compatible tag (e.g. v0.0.2
) and append the number of commits with timestamp to it. For example, if the released version is v0.0.1
, then the unreleased version would be something like 0.0.2+120240105144650
. We verify that this version is compatible with both SemVer and PEP 440. Publishing wheels updates the version.json
file in the install folder.
product_info = ProductInfo(__file__)

version = product_info.unreleased_version()
is_git = product_info.is_git_checkout()
is_unreleased = product_info.is_unreleased_version()

logger.info(f'Version is: {version}')
logger.info(f'Git checkout: {is_git}')
logger.info(f'Is unreleased: {is_unreleased}')
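To make the shape of the unreleased version string more concrete, here is a minimal sketch that reproduces the example value from the text. It rests on assumptions: the function name `unreleased_version` is hypothetical, the layout "next patch version, then `+`, then commit count followed by a `YYYYmmddHHMMSS` timestamp" is inferred from the example, and the two regular expressions are simplified checks rather than full SemVer or PEP 440 validators.

```python
import re
from datetime import datetime

# Simplified patterns: SemVer "MAJOR.MINOR.PATCH+build" and a PEP 440 release with a local version label.
SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+\+[0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*$")
PEP440_RE = re.compile(r"^\d+(\.\d+)*\+[a-z0-9]+(\.[a-z0-9]+)*$")


def unreleased_version(latest_tag: str, commits: int, now: datetime) -> str:
    """Bump the patch of the latest tag and append '+<commits><timestamp>' (assumed layout)."""
    major, minor, patch = (int(part) for part in latest_tag.lstrip("v").split("."))
    timestamp = now.strftime("%Y%m%d%H%M%S")
    return f"{major}.{minor}.{patch + 1}+{commits}{timestamp}"


version = unreleased_version("v0.0.1", commits=1, now=datetime(2024, 1, 5, 14, 46, 50))
print(version)  # 0.0.2+120240105144650
print(bool(SEMVER_RE.match(version)), bool(PEP440_RE.match(version)))  # True True
```

Keeping the commit count and timestamp after the `+` sign means the extra data lands in SemVer build metadata and in the PEP 440 local version label, which is why one string can satisfy both schemes.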
The library can infer the name of the application from the name of the directory where the __about__.py
file is located within the current project. See released version detection for more details. ProductInfo.for_testing(klass)
creates a new ProductInfo
object with a random product_name
.
from databricks.labs.blueprint.wheels import ProductInfo

product_info = ProductInfo(__file__)
logger.info(f'Product name is: {product_info.product_name()}')
When you're integration testing your installations, you may want to have different installation folders for each test execution. ProductInfo.for_testing(klass)
helps you with this:
from ... import ConfigurationClass

from databricks.labs.blueprint.wheels import ProductInfo

first = ProductInfo.for_testing(ConfigurationClass)
second = ProductInfo.for_testing(ConfigurationClass)
assert first.product_name() != second.product_name()
Before you execute a wheel on Databricks, you have to build it and upload it. This library detects the released or unreleased version of the wheel, copies it over to a temporary folder, changes the __about__.py
file with the right version, and builds the wheel in the temporary location, so that it's not polluted with build artifacts. Wheels
is a context manager, so it removes all temporary files and folders after the with
block finishes. This library is successfully used to concurrently test wheels on Shared Databricks Clusters through notebook-scoped libraries. Before you deploy the new version of the wheel, it is highly advised that you perform application state upgrades.
Every call to wheels.upload_to_wsfs()
updates version.json
file in the install folder, which holds version
field with the current wheel version. There's also wheel
field, that contains the path to the current wheel file on WorkspaceFS.
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.wheels import ProductInfo

w = WorkspaceClient()
product_info = ProductInfo(__file__)
installation = product_info.current_installation(w)

with product_info.wheels(w) as wheels:
    remote_wheel = wheels.upload_to_wsfs()
    logger.info(f'Uploaded to {remote_wheel}')
This will print something like:
15:08:44 INFO [dist.logger] Uploaded to /Users/serge.smertin@databricks.com/.blueprint/wheels/databricks_labs_blueprint-0.0.2+120240105150840-py3-none-any.whl
You can also do wheels.upload_to_dbfs()
, though you're not able to set any access control over it.
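As a small illustration of the version file mentioned above, the sketch below parses a payload that carries only the two documented fields, version and wheel. The helper name `read_version_file` is hypothetical, the values are taken from the log line shown earlier, and the real file may well contain additional fields.

```python
import json


def read_version_file(raw: str) -> tuple[str, str]:
    """Return (version, wheel path) from the JSON text of the version file."""
    state = json.loads(raw)
    return state["version"], state["wheel"]


# Assumed payload: only the two fields described above; the real file may hold more.
raw = json.dumps(
    {
        "version": "0.0.2+120240105150840",
        "wheel": "/Users/serge.smertin@databricks.com/.blueprint/wheels/"
        "databricks_labs_blueprint-0.0.2+120240105150840-py3-none-any.whl",
    }
)

version, wheel = read_version_file(raw)
print(version)
print(wheel.rsplit("/", 1)[-1])  # file name of the uploaded wheel
```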
A Python wheel may have dependencies that are not included in the wheel itself. These dependencies are usually other Python packages that your wheel relies on. During installation on regular Databricks Workspaces, these dependencies are automatically fetched from the Python Package Index.
Some Databricks Workspaces are configured with extra layers of network security that block all access to the public Internet, including the Python Package Index. To ensure installations work on these kinds of workspaces, developers need to explicitly upload all upstream dependencies of their applications.
The upload_wheel_dependencies(prefixes)
method can be used to upload these dependencies to Databricks Workspace. This method takes a list of prefixes as an argument. It will upload all the dependencies of the wheel that have names starting with any of the provided prefixes.
Here is an example of how you can use this method:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.wheels import ProductInfo

ws = WorkspaceClient()
product_info = ProductInfo(__file__)
installation = product_info.current_installation(ws)

with product_info.wheels(ws) as wheels:
    wheel_paths = wheels.upload_wheel_dependencies(['databricks_sdk', 'pandas'])
    for path in wheel_paths:
        print(f'Uploaded dependency to {path}')
In this example, the upload_wheel_dependencies(['databricks_sdk', 'pandas'])
call will upload all the dependencies of the wheel that have names starting with 'databricks_sdk' or 'pandas'. This method excludes any platform-specific dependencies (i.e. those not ending with -none-any.whl
). Also the main wheel file is not uploaded. The method returns a list of paths to the uploaded dependencies on WorkspaceFS.
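The prefix filter can be pictured with a short, self-contained sketch. It is not the library's code: `select_dependencies` and the file names are hypothetical, and the sketch assumes that a platform-independent wheel is one whose file name ends with `-none-any.whl`, while anything else counts as platform-specific and is skipped.

```python
def select_dependencies(wheel_files, prefixes):
    """Keep platform-independent wheels whose file name starts with one of the prefixes."""
    selected = []
    for name in wheel_files:
        if not name.endswith("-none-any.whl"):
            continue  # platform-specific build, e.g. manylinux or win_amd64
        if name.startswith(tuple(prefixes)):
            selected.append(name)
    return selected


# Hypothetical file names, as a dependency download step might produce them.
downloaded = [
    "databricks_sdk-0.20.0-py3-none-any.whl",
    "pandas-2.2.0-cp310-cp310-manylinux_2_17_x86_64.whl",
    "requests-2.31.0-py3-none-any.whl",
]
print(select_dependencies(downloaded, ["databricks_sdk", "pandas"]))
# ['databricks_sdk-0.20.0-py3-none-any.whl']
```

In this sketch the pandas wheel matches a prefix but is dropped because it is built for a specific platform, and the requests wheel is dropped because it matches no prefix.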
This library contains common utilities for Databricks CLI entrypoints defined in labs.yml
file. Here's the example metadata for a tool named blueprint
with a single me
command and flag named --greeting
, that has Hello
as default value:
---
name: blueprint
description: Common libraries for Databricks Labs
install:
  script: src/databricks/labs/blueprint/__init__.py
entrypoint: src/databricks/labs/blueprint/__main__.py
min_python: 3.10
commands:
  - name: me
    description: shows current username
    flags:
      - name: greeting
        default: Hello
        description: Greeting prefix
And here's the content for src/databricks/labs/blueprint/__main__.py
file, that executes databricks labs blueprint me
command with databricks.sdk.WorkspaceClient
automatically injected into an argument with magical name w
:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.blueprint.cli import App

app = App(__file__)
logger = get_logger(__file__)

@app.command
def me(w: WorkspaceClient, greeting: str):
    """Shows current username"""
    logger.info(f"{greeting}, {w.current_user.me().user_name}!")

if "__main__" == __name__:
    app()
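For readers curious how injection by argument name can work in principle, here is a toy router built only on the standard library. Everything in it is an assumption made for illustration: `MiniApp`, `FakeWorkspaceClient`, and `current_user_name` are hypothetical names, and the real `App` class may be implemented quite differently.

```python
import inspect


class FakeWorkspaceClient:
    """Stand-in for databricks.sdk.WorkspaceClient so the sketch runs offline."""

    def current_user_name(self) -> str:  # hypothetical method, not an SDK call
        return "someone@example.com"


class MiniApp:
    """Toy command router: registers functions and injects arguments by parameter name."""

    def __init__(self):
        self._commands = {}

    def command(self, fn):
        self._commands[fn.__name__] = fn
        return fn

    def run(self, name: str, flags: dict):
        fn = self._commands[name]
        kwargs = {}
        for param in inspect.signature(fn).parameters:
            if param == "w":
                kwargs[param] = FakeWorkspaceClient()  # "magical" name gets a client
            elif param in flags:
                kwargs[param] = flags[param]  # command-line flag passed by name
        return fn(**kwargs)


app = MiniApp()


@app.command
def me(w, greeting: str = "Hello"):
    return f"{greeting}, {w.current_user_name()}!"


print(app.run("me", {"greeting": "Hi"}))  # Hi, someone@example.com!
```

The point of the design is that a command declares what it needs through its signature, and the router decides what to pass based on parameter names alone.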
As you may have noticed, there were only workspace-level commands so far, but you can also have native account-level command support. You need to specify the is_account
property when declaring it in labs.yml
file:
commands:
  # ...
  - name: workspaces
    is_account: true
    description: shows current workspaces
and @app.command(is_account=True)
will get you databricks.sdk.AccountClient
injected into a
argument:
from databricks.sdk import AccountClient

@app.command(is_account=True)
def workspaces(a: AccountClient):
    """Shows workspaces"""
    for ws in a.workspaces.list():
        logger.info(f"Workspace: {ws.workspace_name} ({ws.workspace_id})")
If your command needs some terminal interactivity, simply add prompts: Prompts
argument to your command:
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.blueprint.cli import App
from databricks.labs.blueprint.tui import Prompts

app = App(__file__)
logger = get_logger(__file__)

@app.command
def me(w: WorkspaceClient, prompts: Prompts):
    """Shows current username"""
    if prompts.confirm("Are you sure?"):
        logger.info(f"Hello, {w.current_user.me().user_name}!")

if "__main__" == __name__:
    app()
Invoking a SparkSession using Databricks Connect
from databricks.sdk import WorkspaceClient
from databricks.connect import DatabricksSession

@app.command
def example(w: WorkspaceClient):
    """Building Spark Session using Databricks Connect"""
    # `builder` is an attribute, not a method, and the SDK config is passed via `sdkConfig`
    spark = DatabricksSession.builder.sdkConfig(w.config).getOrCreate()
    spark.sql("SHOW TABLES")
This tooling makes it easier to start new projects. First, install the CLI:
databricks labs install blueprint
Afterwards, create a new project in a designated directory:
databricks labs blueprint init-project --target /path/to/folder
This library is used in the following projects:
Please note that this project is provided for your exploration only and is not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of this project.
Any issues discovered through the use of this project should be filed as GitHub Issues on this repository. They will be reviewed as time permits, but no formal SLAs for support exist.