Collibra DQ User Guide

CDQ + Databricks

Learn how to create CDQ jobs in Databricks notebooks.
We've moved! To improve customer experience, the Collibra Data Quality User Guide has moved to the Collibra Documentation Center as part of the Collibra Data Quality 2022.11 release. To ensure a seamless transition, this site will remain accessible, but the DQ User Guide is now maintained exclusively in the Documentation Center.


This document provides how-to guidance to help you upload and install CDQ jars on a Databricks cluster and run a CDQ job by invoking CDQ APIs (also called activities).


Running CDQ jobs from Scala and PySpark notebooks.

CDQ Environment Setup

In this section, we explain the steps involved in setting up your CDQ environment in Databricks. This is the first step towards invoking CDQ APIs in Databricks.

Step 1: Upload CDQ Core jar to Databricks

1. Extract the core jar from the owl package archive.

The first step is to get the CDQ jar file. Once you have the CDQ jar package file, you can extract the jars by running the following command:
tar -xvf package.tar.gz
Example: tar -xvf owl-2022.04-RC1-default-package-base.tar.gz
Running this command instructs tar to extract the files from the archive. From the list of extracted files, you need to upload owl-core-xxxx-jar-with-dependencies.jar to your Databricks file system, which is explained in the next section.
Extracting the owl jar files from the owl package archive.

Step 2: Upload the file to the Databricks file system using the UI

The jars must be uploaded manually to the Databricks file system (DBFS). Below is a quick summary of the steps; you can find more details on the Databricks documentation page about uploading files:
  1. Log in to your Databricks account.
  2. Click Data in the sidebar.
  3. Click the DBFS button at the top of the page.
  4. Upload owl-core-xxxx-jar-with-dependencies.jar to your desired path.
Upload owl-core-xxxx-jar-with-dependencies.jar to DBFS.

Step 3: Install the CDQ library in your Databricks cluster

Install owl-core-xxxx-jar-with-dependencies.jar in your cluster. Once this step is completed, you can create a workspace and start using CDQ APIs.

Step 4: (Optional) Update the datasource pool size

This step is only necessary if you get a PoolExhaustedException when you call CDQ APIs. To solve the issue, update the connection pool size in the Spark environment:
SPRING_DATASOURCE_POOL_MAX_WAIT=500
SPRING_DATASOURCE_POOL_MAX_SIZE=30
SPRING_DATASOURCE_POOL_INITIAL_SIZE=5
Here is the documentation from Databricks about how to set up environment variables:
Add CDQ environment variables to the Databricks cluster.

CDQ Working Example in Databricks

Import Collibra DQ Library

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
import java.util.Date
import java.time.LocalDate
import java.text.SimpleDateFormat
import spark.implicits._
import java.util.{ArrayList, List, UUID}
// CDQ Imports
import com.owl.core.Owl
import com.owl.common.options._
import com.owl.common.domain2._
import com.owl.core.util.OwlUtils

Bringing Customer Data From a File or a Database

// Option 1: Bringing Customer Data From A File
val df = spark.read
  .format("csv").option("header", true).option("delimiter", ",")
  .load("/path/to/your/file.csv") // replace with the path to your file
// Option 2: Bringing Customer Data From A Database
val connProps = Map(
  "driver" -> "org.postgresql.Driver",
  "user" -> "????",
  "password" -> "????",
  "url" -> "jdbc:postgresql://xxx:0000/postgres",
  "dbtable" -> "public.example_data")
//--- Load Spark DataFrame ---//
val df = spark.read.format("jdbc").options(connProps).load()
display(df) // view your data

Variables to set up the CDQ metastore database location

val pgHost = ""
val pgDatabase = "postgres"
val pgSchema = "public"
val pgUser = "???????"
val pgPass = "????"
val pgPort = "0000"
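These variables tell CDQ where its Postgres metastore lives. For reference, they combine into a standard Postgres JDBC URL; the sketch below uses hypothetical values (`metastore.example.com`, port `5432`), not a real environment:

```scala
// Hypothetical metastore settings -- substitute your own values.
val pgHost = "metastore.example.com"
val pgPort = "5432"
val pgDatabase = "postgres"

// The pieces combine into a standard Postgres JDBC URL.
val jdbcUrl = s"jdbc:postgresql://$pgHost:$pgPort/$pgDatabase"
```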

Create Collibra DQ Test (Rules) and Detect Breaks

Note: If the rules are already created and assigned to a dataset from the UI, calling owlCheck() automatically executes all the rules associated with the given dataset, and there is no need to re-create the rules from the notebook.
val dataset = "cdq_notebook_db_rules"
var date = "2018-01-11"
// Options
val opt = new OwlOptions()
opt.dataset = dataset
opt.runId = date
opt.host = pgHost
opt.port = pgPort
opt.pgUser = pgUser
opt.pgPassword = pgPass
opt.setDatasetSafeOff(false) // to enable historical overwrite of dataset
// Create a simple rule and assign it to dataset
val simpleRule = OwlUtils.createRule(opt.dataset)
simpleRule.setRuleValue("symbol == 'BHK'")
// Create a rule from generic rules that are created from UI:
val genericRule = OwlUtils.createRule(opt.dataset)
genericRule.setRuleNm("exchangeRule") // this could be any name
genericRule.setRuleRepo("exchangeCheckRule") // validate the generic rule name from the UI
genericRule.setRuleValue("EXCH") // COLUMN associated with the rule
// Pre Routine
val cdq = com.owl.core.util.OwlUtils.OwlContext(df, opt)
// Scan
val results = cdq.hoot() // returns object Hoot, not a DataFrame
//See Json Results(Option for downstream processing)
println(results) // optional
//Post Routine, See DataFrame Results (Option for downstream processing)
val breaks = cdq.getRuleBreakRows("nyse-stocks-symbol")
// Different Options for handling bad records
val badRecords = breaks.drop("_dataset","_run_id", "_rule_name", "owl_id")
val goodRecords = df.except(badRecords)
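Conceptually, a rule condition such as `symbol == 'BHK'` partitions the data into breaking and passing rows, which is what the `getRuleBreakRows`/`except` pattern above does on DataFrames. The plain-Scala sketch below illustrates the same split on hypothetical `Stock` rows; it is not CDQ code:

```scala
// Hypothetical rows standing in for the DataFrame.
case class Stock(symbol: String, exch: String)

val rows = List(Stock("BHK", "NYSE"), Stock("AAPL", "NASDAQ"), Stock("BHK", "AMEX"))

// Rows matching the rule condition are the "breaks" (bad records)...
val badRecords = rows.filter(_.symbol == "BHK")
// ...and the good records are everything else, analogous to df.except(badRecords).
val goodRecords = rows.diff(badRecords)
```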

Write the breaks (bad records) DataFrame to a Parquet file

// Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example.parquet", true)
// Write the bad records to a Parquet file
badRecords.write.parquet("/tmp/databricks-df-example.parquet")
The image below shows the code snippet and the result in Databricks:
Create CDQ Test Rules in Databricks
The breaks and the rules can be viewed in the CDQ web application as well.

Create Collibra DQ Test (Profile)

val dataset = "cdq_notebook_nyse_profile"
val runList = List("2018-01-01", "2018-01-02", "2018-01-03", "2018-01-04", "2018-01-05", "2018-01-08", "2018-01-09", "2018-01-10")
for(runId <- runList) {
// Options
val opt = new OwlOptions()
opt.dataset = dataset
opt.host = pgHost
opt.port = pgPort
opt.pgUser = pgUser
opt.pgPassword = pgPass
val profileOpt = new ProfileOpt
profileOpt.on = true
profileOpt.behaviorEmptyCheck = true
profileOpt.behaviorMaxValueCheck = true
profileOpt.behaviorMinValueCheck = true
profileOpt.behaviorNullCheck = true
profileOpt.behaviorRowCheck = true
profileOpt.behaviorMeanValueCheck = true
profileOpt.behaviorUniqueCheck = true
profileOpt.behaviorMinSupport = 5 // default is 4
profileOpt.behaviorLookback = 5
opt.profile = profileOpt
var date = runId
var df_1 = df.where($"TRADE_DATE"===s"$date")
val cdq = OwlUtils.OwlContext(df_1, opt)
val profile = cdq.profileDF()
}
CDQ Profile Run In Databricks
The Profile result can be viewed in CDQ Web.
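The hard-coded runList above covers the weekdays of 2018-01-01 through 2018-01-10. A list like that can also be generated with java.time rather than typed by hand (a sketch; note it filters weekends only, not exchange holidays):

```scala
import java.time.{DayOfWeek, LocalDate}

// Generate weekday run IDs between two dates, inclusive.
def weekdayRunIds(start: LocalDate, end: LocalDate): List[String] =
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .filterNot(d => d.getDayOfWeek == DayOfWeek.SATURDAY || d.getDayOfWeek == DayOfWeek.SUNDAY)
    .map(_.toString)
    .toList

val runList = weekdayRunIds(LocalDate.parse("2018-01-01"), LocalDate.parse("2018-01-10"))
```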

Create Collibra DQ Test (Dupes)

val dataset = "cdq_notebook_db_dupe"
var date = "2018-01-11"
// Options
val opt = new OwlOptions()
opt.dataset = dataset
opt.runId = date
opt.host = pgHost
opt.port = pgPort
opt.pgUser = pgUser
opt.pgPassword = pgPass
opt.dupe.ignoreCase = true
opt.dupe.on = true
opt.dupe.lowerBound = 99
opt.dupe.include = Array("SYMBOL", "TRADE_DATE")
val cdq = OwlUtils.OwlContext(df, opt)
val dupesDf = cdq.getDupeRecords
CDQ Dupes Run In Databricks
Dupes results can be viewed in CDQ Web.
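Conceptually, a dupe check with `ignoreCase = true` and `include = Array("SYMBOL", "TRADE_DATE")` groups rows on those two columns, case-insensitively, and flags any group containing more than one row. The plain-Scala sketch below illustrates that idea on hypothetical rows; it is not CDQ's implementation:

```scala
case class Trade(symbol: String, tradeDate: String)

val rows = List(
  Trade("BHK", "2018-01-11"),
  Trade("bhk", "2018-01-11"),  // same key once case is ignored -> duplicate
  Trade("AAPL", "2018-01-11")
)

// Group on the included columns, lowercasing to mimic ignoreCase = true.
val dupes = rows
  .groupBy(t => (t.symbol.toLowerCase, t.tradeDate))
  .values
  .filter(_.size > 1)
  .flatten
  .toList
```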

Create Collibra DQ Test (Outlier)

import scala.collection.JavaConverters._
import java.util
import java.util.{ArrayList, List, UUID}
val dataset = "cdq_notebook_db_outlier"
var date = "2018-01-11"
// Options
val opt = new OwlOptions()
opt.dataset = dataset
opt.runId = date
opt.host = pgHost
opt.port = pgPort
opt.pgUser = pgUser
opt.pgPassword = pgPass
opt.dupe.on = false
val dlMulti: util.List[OutlierOpt] = new util.ArrayList[OutlierOpt]
val outlierOpt = new OutlierOpt()
outlierOpt.combine = true
outlierOpt.dateColumn = "trade_date"
outlierOpt.lookback = 4
outlierOpt.key = Array("symbol")
outlierOpt.include = Array("high")
outlierOpt.historyLimit = 10
val cdq = OwlUtils.OwlContext(df, opt)
val outliers = cdq.getOutliers()
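Conceptually, an outlier check with a `key`, an `include` column, and a `lookback` window compares each new value against that key's recent history. The sketch below illustrates the idea with a simple mean/standard-deviation test on hypothetical data; it is not CDQ's actual algorithm:

```scala
// Lookback window of recent "high" values for one symbol (hypothetical data).
val history = List(14.1, 14.3, 14.0, 14.2)
val candidate = 29.7

val mean = history.sum / history.size
val stdev = math.sqrt(history.map(v => math.pow(v - mean, 2)).sum / history.size)

// Flag the candidate if it sits far outside the window's spread.
val isOutlier = math.abs(candidate - mean) > 3 * stdev
```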

Known API Limitations

CDQ activities cannot be called independently for the time being; the owlCheck() function should be called before calling any of the activities. For example, to get the profile DataFrame you should call the snippet below:
cdq.owlCheck() // run the full check first
val profile = cdq.profileDF() // then retrieve the profile activity results