During data engineering projects I try to minimize the number of tools
in use. I think it’s a good practice: with too many tools, errors
sometimes go unnoticed by the team members.
One of the advantages of a tool like Databricks is that it lets us use
all the power of Python and avoid, as I had to do in the past, bringing
in something like Azure Functions to compensate for the limitations of
some platform.
I’m currently working on a project where I’m adapting a code base of
Databricks notebooks for a new client. There are a few errors to hunt
but the Web UI is not really friendly for this purpose.
I just wanted a quick and easy way to find the issues without having to
click around.
Here’s a quick script to do just that:
import os, json
import configparser

from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.runs.api import RunsApi


def print_error(nb_path, nb_params, nb_run_url, nb_error="Unknown"):
    # Keep only the first line of the error message
    error = nb_error.partition("\n")[0]
    params = json.loads(nb_params) if nb_params != "" else {}
    print(
        f"""
Path: {nb_path}
Params: {json.dumps(params,indent=2)}
RunUrl: {nb_run_url}
Error: {error}
"""
    )


databricks_cfg = "~/.databrickscfg"
conf = configparser.ConfigParser()
conf.read(os.path.expanduser(databricks_cfg))

# The token is read from the "password" entry of the DEFAULT profile
api_client = ApiClient(
    host=conf["DEFAULT"]["host"],
    token=conf["DEFAULT"]["password"]
)
runs_api = RunsApi(api_client)

# Page through the 100 most recent runs, 25 at a time (offsets start at 0)
for offset in range(0, 100, 25):
    page = runs_api.list_runs(
        job_id=None,
        active_only=None,
        completed_only=None,
        offset=offset,
        limit=25,
        version="2.1",
    )
    for run in page.get("runs", []):
        # Runs that are still in progress have no result_state yet
        if run["state"].get("result_state") == "FAILED":
            output = runs_api.get_run_output(run_id=run["run_id"])
            task = output["metadata"]["task"]["notebook_task"]
            print_error(
                task["notebook_path"],
                task["base_parameters"]["Param1Value"],
                output["metadata"]["run_page_url"],
                output.get("error", "Unknown"),
            )
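The fixed number of pages in the script can be replaced by a loop that stops when the API reports there is nothing left. The sketch below is a hypothetical helper, `iter_failed_runs`, which is not part of databricks-cli. It takes the `list_runs` callable as an argument so it can be tried without a workspace, and it assumes the response carries a `has_more` flag next to `runs`.

```python
from typing import Any, Callable, Dict, Iterator


def iter_failed_runs(
    list_runs: Callable[..., Dict[str, Any]],
    page_size: int = 25,
    max_runs: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield runs whose result_state is FAILED, in the order the API returns them."""
    offset = 0
    while offset < max_runs:
        page = list_runs(
            job_id=None,
            active_only=None,
            completed_only=None,
            offset=offset,
            limit=page_size,
            version="2.1",
        )
        for run in page.get("runs", []):
            # Runs that are still in progress carry no result_state.
            if run.get("state", {}).get("result_state") == "FAILED":
                yield run
        # Assumption: the response exposes a has_more flag.
        if not page.get("has_more", False):
            break
        offset += page_size
```

With the objects from the script, it would be used as `for run in iter_failed_runs(runs_api.list_runs): ...`, keeping the `get_run_output` call and the printing inside the loop body.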
Follow this documentation to install the requirements. There’s a lot
more you can do with databricks-cli to make your life easier. It’s a
great tool to add to your toolbox.
There’s not an official function to calculate workdays in Databricks.
Here are some solutions.
Having a DimCalendar with holidays and at least Databricks Runtime 12
If you have a DimCalendar in the system, you can now use LEFT JOIN
LATERAL without the correlated subquery errors that used to appear with
non-equality predicates. Check SPARK-36114 for more details.
Calculating working days is then as simple as running a query like:
SELECT mt.StartDate,
       mt.EndDate,
       wd.Workdays
FROM myTable mt
LEFT JOIN LATERAL
  (SELECT COUNT(DateKey) - 1 AS Workdays
   FROM dimCalendar dc
   WHERE mt.StartDate <= dc.DateKey
     AND mt.EndDate >= dc.DateKey
     AND dc.IsDayWeekDay = TRUE
     AND dc.IsDayHoliday = FALSE
  ) wd
If this query is slow, check that the data types of the date columns
match: either all are DATE or all are TIMESTAMP. If the sluggishness
remains, check whether any of the fields are missing from the table
statistics.
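To sanity-check the query on a handful of rows, the same counting rule can be written in plain Python. This is only a sketch: `workdays_between` is a hypothetical helper, and it assumes the holidays are available as a collection of `date` values instead of a DimCalendar table. Like the query, it counts the weekdays that are not holidays in the inclusive range and subtracts one.

```python
from datetime import date, timedelta
from typing import Iterable


def workdays_between(start: date, end: date, holidays: Iterable[date] = ()) -> int:
    """Count non-holiday weekdays in [start, end], minus one, like the query."""
    holiday_set = set(holidays)
    count = 0
    day = start
    while day <= end:
        # weekday() is 0-4 for Monday to Friday, 5-6 for the weekend
        if day.weekday() < 5 and day not in holiday_set:
            count += 1
        day += timedelta(days=1)
    return count - 1
```

Note that, just like `COUNT(DateKey) - 1`, this returns -1 when the range contains no working day at all, so callers may want to clamp the result at zero.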
While creating a new machine to act as the Integration Runtime for
Purview, and after installing the mandatory JRE that allows the
connection to Snowflake, I kept getting this error:
Error: (3913) JavaException: Must have Java 8 or newer installed.
This puzzled me because I had installed version 17. I started
troubleshooting and followed this guide to check that the
installation was correct. Everything seemed all right, but the error
was still there.
There’s a lot to chew on when unit testing Azure Functions. I’m going
to be quite liberal with the terminology because, technically, some of
this is in fact integration testing and not unit testing per se.
Either way, Azure Functions load local.settings.json on startup,
creating several environment variables that we can then use in our
code. In C# we would access them like so:
tenantId = configRoot["TenantId"];
appId = configRoot["AppId"];
or in Python:
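A minimal sketch of the Python side, assuming the same `TenantId` and `AppId` setting names as the C# snippet. Since the settings end up as environment variables, the standard `os.environ` mapping is enough to read them. The `load_settings` helper is hypothetical and only there to fail early with a clear message when a setting is missing.

```python
import os


def load_settings(names=("TenantId", "AppId")):
    """Read the named settings from environment variables."""
    missing = [name for name in names if name not in os.environ]
    if missing:
        raise KeyError(f"Missing settings: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}
```

In a test, the same variables can be set on `os.environ` before calling the code under test, which stands in for what local.settings.json provides at startup.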
As I explained in a previous post about Purview, with the official
Snowflake connector you need to create a scan per database/schema
pair. The customer I was working with has a few hundred databases plus
their schemas.
Doing this through the UI would mean wasting a lot of time clicking
away, with a real risk of forgetting some database/schema pair.
I’ve explored the documentation about the Scans - REST API, and the
rest of this post is just some PowerShell code to automate the
creation of scans and their schedules in Purview.
In the last few weeks I’ve been working with Purview, writing some code
to integrate Snowflake entities because there wasn’t an official
connector. You can imagine my surprise when I noticed last Friday
that there’s a new, unannounced preview connector that does the work
for us.
PyApacheAtlas
If you have already read or watched something about the recommended way
to create custom entities in Purview, you have probably heard of this
library.