Collibra DQ User Guide
2022.10


Using Owl on Spark

Programmatic DQ

Don't like leaving your notebook? Want to build Owl into your in-house data quality pipeline? Owl can do both!

Real World Examples

Rules

Let's assume we were provided a file named "atm_cust_file" and want to load it into a database table as well as scan it for all possible errors. We want to provide a few levels of protection: 1) a business rule checking whether a customer joined before the company was founded; 2) a check that the file matches 100% to the DataFrame or database table we've created; 3) a check for all possible outliers or anomalies in the dataset. Each of these three issues has a different business impact and triggers a different flow in our pipeline.

Add Rule

Let's create a simple rule and assign points to the overall scoring system for later delegation.
// Define a rule that flags customers whose join date predates the company founding
val rule = new domain2.Rule
rule.setRuleNm("customer_before_company")               // rule name
rule.setRuleValue("customer_since_date < '1956-11-01'") // condition that flags breaking rows
rule.setPerc(1.0)
rule.setPoints(1)                                       // points deducted from the overall score per break
rule.setIsActive(1)
rule.setUserNm("Kirk")
rule.setDataset("ATM_CUSTOMER3")
Util.addRule(rule = rule)
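
The snippets in this section reference a DataFrame df, a source file handle atmCustFile, and a props object without showing their setup. A minimal sketch of that setup might look like the following; the file location and run date are assumptions for illustration, and the props fields are the ones shown in the property example further down.
import java.io.File
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("atm-dq").getOrCreate()

// Hypothetical location of the provided file
val atmCustFile = new File("/data/atm_cust_file.csv")

// Load the file into a DataFrame with standard Spark
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(atmCustFile.getPath)

// Minimal Owl property settings (see the fuller example below)
val props = new Props()
props.dataset = "ATM_CUSTOMER3"
props.runId = "2019-02-23" // example run date
props.del = ","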
Now let's chain together the remaining two items from our original requirement. Note that Owl has six additional ML DQ features that we did not turn on in this case.
val owl = Util.OwlContext(df, atmCustFile, props)

// First register with the catalog if not already registered
owl.register(props)

// Check whether the DataFrame matches the source file 'atm_cust_file'
val source = owl.validateSrcDF
if (source.count() > 1) {
  // create a ServiceNow ticket and exit with a failure: data does not match the original file
}

// Apply the ad hoc rule created above and inspect the breaks
owl.addAdHocRule(rule)
val ruleBreaks = owl.rulesDF
if (ruleBreaks.count() > 1) {
  if (ruleBreaks.where($"score" > 5).count > 1) {
    // create a ServiceNow ticket and exit with a failure based on rule breaks
  }
}

// Check for outliers or anomalies in the dataset
val outliers = owl.outliersDF
if (outliers.where($"confidence" < 10).count > 3) {
  // Owl email alert to the business group for attention,
  // triggered when more than 3 outliers have a confidence below 10
}
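
What happens inside those branches is up to your pipeline. Because rulesDF and outliersDF are ordinary Spark DataFrames, one option is to quarantine the break records for later triage; a sketch follows, where the output path is hypothetical.
import org.apache.spark.sql.functions.lit

// Persist rule breaks for downstream review; the path is an example only
ruleBreaks
  .withColumn("dataset", lit("ATM_CUSTOMER3"))
  .write
  .mode("append")
  .parquet("/quarantine/atm_customer_rule_breaks")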

Ingesting Intraday Files

Here we illustrate how to work with files when using Owl programmatically. This can be implemented both in a notebook setting and in your own codebase.
///////////////////////////////////////////////////////////////////////////
// USE CASE - Ingesting Intraday Files //
///////////////////////////////////////////////////////////////////////////
// Part of your pipeline includes the ingestion of files that have the date
// and hour encoded in the file name. How do you process those files using Owl?
//
// Format: <name>_<year>_<month>_<day>_<hour>.csv
//
// Build up a data structure containing the files you want to process (here we
// just use a simple list, but you may want to be pulling from a pub/sub
// queue, AWS bucket, etc...). Here we just use a simple file list covering
// seven hours of trade position data.
val position_files = List(
  new File(getClass.getResource("/position_file_2019_11_03_08.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_09.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_10.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_11.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_12.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_13.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_14.csv").getPath))
// Create your Spark session.
val spark = SparkSession.builder
  .master("local")
  .appName("test")
  .getOrCreate()
// Configure Owl.
val opt = new OwlOptions
opt.dataset = "positions"
opt.load.delimiter = ","
opt.spark.master = "local[1]"
opt.dupe.on = true
opt.dupe.include = Array("ticker", "cid")
opt.outlier.on = true
opt.outlier.key = Array("cid")
opt.outlier.timeBin = TimeBin.HOUR
// Customize this to only process a subset of the data.
opt.load.fileQuery = "select * from dataset"
position_files.foreach { file: File =>
  // Tell Owl where to find the file.
  opt.load.filePath = file.getPath

  // Parse the file name to construct the run date (-rd) that will be passed
  // to Owl. It must be in the format 'yyyy-MM-dd' or 'yyyy-MM-dd HH:mm'.
  val name = file.getName.split('.').head
  val parts = name.split("_")
  val date = parts.slice(2, 5).mkString("-")
  val hour = parts.takeRight(1).head
  val rd = s"${date} ${hour}:00"

  // Tell Owl which run this data belongs to.
  opt.runId = rd

  // Create a DataFrame from the file.
  val df = OwlUtils.load(opt.load.filePath, opt.load.delimiter, spark)

  // Instantiate an OwlContext with the DataFrame and our custom configuration.
  val owl = OwlUtils.OwlContext(df, spark, opt)

  // Make sure Owl has catalogued the dataset.
  owl.register(opt)

  // Let Owl do the rest!
  owl.owlCheck()
}
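
If the same file-name convention appears in several pipelines, the parsing above can be factored into a small helper that is easy to unit test. This is a sketch under the <name>_<yyyy>_<MM>_<dd>_<HH>.csv assumption, not part of the Owl API.
// Derive an Owl run date ('yyyy-MM-dd HH:mm') from a file name such as
// position_file_2019_11_03_08.csv
def runDateOf(fileName: String): String = {
  val parts = fileName.stripSuffix(".csv").split("_")
  val date = parts.slice(2, 5).mkString("-") // yyyy-MM-dd
  val hour = parts.last                      // HH
  s"$date $hour:00"
}

// e.g. runDateOf("position_file_2019_11_03_08.csv") == "2019-11-03 08:00"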

All Pipeline Activities in 1 Line

For brevity and convenience, Owl allows a DataFrame to be loaded in the constructor so that all 9 dimensions of data quality can be run in one line: owl.owlCheck. To adjust the DQ dimensions, simply set the properties on the props object.
val owl = Util.OwlContext(df, atmCustFile, props)
owl.owlCheck
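
Which dimensions owlCheck runs is driven by those properties. As a minimal sketch, using the field names from the property example below, individual checks can be toggled before the call:
// Toggle individual DQ dimensions before running the check
props.isDupe = false     // skip fuzzy-match duplicate detection
props.validateSrc = true // keep source-to-target validation on
val owl = Util.OwlContext(df, atmCustFile, props)
owl.owlCheck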

Example of some common property settings

val props = new Props()
props.filePath = s"${filePath}/atm_customer_${rd.replace("-","_")}.csv"
props.runId = rd
props.dateCol = "OWL_RUN_ID"
props.dataset = "ATM_CUSTOMER3"
props.del = ","
props.datasetSafety = false
props.calculateBoundaries = true
props.fileLookBack = true
props.timeBin = "DAY"
// outlier, missing records
props.dl = true
props.dlKey = "customer_id"
props.dlLb = 4
// pattern mining
props.freqPatternMiningByKey = true
props.fpgKey = "customer_id"
props.fpgLookback = 4
props.fpgDateCol = "OWL_RUN_ID"
props.fpgCols = "card_number,first_name,last_name,checking_savings"
props.fpgLowFreq = true
// validate Src
props.validateSrc = true
props.valSrcKey = "customer_id"
// fuzzy match
props.isDupe = true
props.dupeCutOff = 88
props.depth = 3
props.dupeExcludeCols = "customer_id,card_number,customer_since_date,OWL_RUN_ID"

Using Notebooks to build DQ Pipelines

For examples of how to do this, see our Notebook repository below.
