The other day I was asked to produce documentation to send to a
customer. Part of the task was extracting the names of the tables
required to run a certain Databricks notebook. The plan was to build
an object dependency tree.
The query spanned 279 lines. How can you extract only the table names
from a file without looking for them manually? And can the same
technique be reused in the future?
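One possible approach, sketched here as an illustration and not necessarily the one used for this task, is a regular expression that collects the identifiers following FROM and JOIN. The function name `extract_table_names` and the pattern are assumptions made for this example. A regex is not a SQL parser, so constructs such as `EXTRACT(YEAR FROM col)` can produce false positives.

```python
import re

# Identifier (optionally dotted or backtick-quoted) that follows FROM or JOIN.
# A subquery starts with "(" and is therefore not matched; LATERAL is skipped.
TABLE_PATTERN = re.compile(
    r"\b(?:FROM|JOIN)\s+(?!LATERAL\b)([`\w]+(?:\.[`\w]+)*)",
    re.IGNORECASE,
)


def extract_table_names(sql: str) -> list[str]:
    """Return the distinct names found after FROM/JOIN, in order of appearance."""
    # Remove -- line comments and /* */ block comments first,
    # so that commented-out tables are ignored.
    sql = re.sub(r"--[^\n]*", " ", sql)
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL)
    # Names introduced as "name AS (" are treated as CTEs, not tables.
    ctes = {
        name.lower()
        for name in re.findall(r"\b(\w+)\s+AS\s*\(", sql, flags=re.IGNORECASE)
    }
    seen, tables = set(), []
    for match in TABLE_PATTERN.finditer(sql):
        name = match.group(1).replace("`", "")
        key = name.lower()
        if key not in ctes and key not in seen:
            seen.add(key)
            tables.append(name)
    return tables
```

Because the function only takes a string, it can be pointed at any exported notebook or `.sql` file by reading the file and passing its contents in, which is what makes it reusable.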
In data engineering projects I try to minimize the number of tools
being used. I think it’s a good practice: with too many tools, errors
sometimes go unnoticed by the team members.
One of the advantages of a tool like Databricks is that it lets us use
all the power of Python and avoid, as I had to do in the past,
resorting to something like Azure Functions to compensate for the
limitations of some platform.
I’m currently working on a project where I’m adapting a code base of
Databricks notebooks for a new client. There are a few errors to hunt
down, but the Web UI is not really friendly for this purpose.
I just wanted a quick and easy way to find the issues without having
to click around.
Here’s a quick script to do just that:
import os, json
import configparser
from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.runs.api import RunsApi


def print_error(nb_path, nb_params, nb_run_url, nb_error="Unknown"):
    # Keep only the first line of the error message.
    error = nb_error.partition("\n")[0]
    params = json.loads(nb_params) if nb_params != "" else {}
    print(
        f"""
Path: {nb_path}
Params: {json.dumps(params,indent=2)}
RunUrl: {nb_run_url}
Error: {error}
"""
    )


databricks_cfg = "~/.databrickscfg"
conf = configparser.ConfigParser()
conf.read(os.path.expanduser(databricks_cfg))

api_client = ApiClient(
    host=conf["DEFAULT"]["host"],
    # The token is read from the "password" key here; adjust if your file uses "token".
    token=conf["DEFAULT"]["password"],
)
runs_api = RunsApi(api_client)

# Page through the 100 most recent runs, 25 at a time (offset 0 is the latest run).
for offset in range(0, 100, 25):
    page = runs_api.list_runs(
        job_id=None,
        active_only=None,
        completed_only=None,
        offset=offset,
        limit=25,
        version="2.1",
    )
    for run in page.get("runs", []):
        # Runs that are still active have no result_state yet.
        if run["state"].get("result_state") != "FAILED":
            continue
        output = runs_api.get_run_output(run_id=run["run_id"])
        task = output["metadata"]["task"]["notebook_task"]
        print_error(
            task["notebook_path"],
            # "Param1Value" is specific to these notebooks; use your own parameter name.
            task["base_parameters"]["Param1Value"],
            output["metadata"]["run_page_url"],
            output.get("error", "Unknown"),
        )
Follow this documentation to install the requirements. There’s a lot
more you can do with databricks-cli to make your life easier. It’s a
great tool to add to your toolbox.
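The failure-filtering step of the script above can also be pulled into small functions that work on plain dictionaries, which makes the logic easy to try without a workspace. This is a minimal sketch: the names `failed_runs` and `first_error_line` are made up for this example, and the input shapes are assumed to mirror what `list_runs` and `get_run_output` return in the script.

```python
def failed_runs(response: dict) -> list[dict]:
    """Return the runs in a list_runs-style response whose result_state is FAILED."""
    failed = []
    # Treat a missing "runs" key as an empty page.
    for run in response.get("runs", []):
        # Runs without a "result_state" (for example still active) are skipped.
        if run.get("state", {}).get("result_state") == "FAILED":
            failed.append(run)
    return failed


def first_error_line(output: dict, default: str = "Unknown") -> str:
    """Return the first line of the "error" field of a get_run_output-style response."""
    return output.get("error", default).partition("\n")[0]


# Hypothetical page of results, shaped like the responses used in the script.
page = {
    "runs": [
        {"run_id": 1, "state": {"result_state": "FAILED"}},
        {"run_id": 2, "state": {"life_cycle_state": "RUNNING"}},
        {"run_id": 3, "state": {"result_state": "SUCCESS"}},
    ]
}
failed_ids = [run["run_id"] for run in failed_runs(page)]
```

Using `.get` with defaults avoids a `KeyError` on runs that have not finished or on outputs that carry no error message.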
There’s not an official function to calculate workdays in Databricks.
Here are some solutions.
Having a DimCalendar with holidays and at least Databricks Runtime 12
If you have a DimCalendar in the system, you can now use LEFT JOIN
LATERAL without running into correlated subquery errors when using
non-equality predicates. Check SPARK-36114 for more details.
Calculating working days is then as simple as running a query like:
SELECT mt.StartDate,
       mt.EndDate,
       wd.Workdays
FROM myTable mt
LEFT JOIN LATERAL
  (SELECT COUNT(DateKey) - 1 AS Workdays
   FROM dimCalendar dc
   WHERE mt.StartDate <= dc.DateKey
     AND mt.EndDate >= dc.DateKey
     AND dc.IsDayWeekDay = TRUE
     AND dc.IsDayHoliday = FALSE
  ) wd
If this query is slow, check that the data types of the date columns
match: either all are DATE or all are TIMESTAMP. If the sluggishness
remains, check whether any of the fields are missing from the
statistics of the table.
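To sanity-check the numbers the query returns, the same counting rule can be reproduced in plain Python. This is only a sketch: `workdays_between` is a name invented for this example, and the holiday set stands in for the IsDayHoliday flag of the DimCalendar.

```python
from datetime import date, timedelta


def workdays_between(start: date, end: date, holidays: frozenset = frozenset()) -> int:
    """Count weekdays that are not holidays in [start, end], minus one, as in the query."""
    count = 0
    current = start
    while current <= end:
        # weekday() is 0 for Monday through 6 for Sunday.
        if current.weekday() < 5 and current not in holidays:
            count += 1
        current += timedelta(days=1)
    # The query subtracts 1 from the count, so the start day itself is not counted.
    return count - 1


# Monday 2024-01-01 to Friday 2024-01-05, without and with a holiday on the Monday.
plain = workdays_between(date(2024, 1, 1), date(2024, 1, 5))
with_holiday = workdays_between(
    date(2024, 1, 1), date(2024, 1, 5), frozenset({date(2024, 1, 1)})
)
```

Here `plain` is 4 and `with_holiday` is 3. Note that, like the query, the function returns -1 when no working day falls inside the range.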