Using Owl on Spark

Programmatic DQ

Don't like leaving your notebook? Want to build Owl into your in-house data quality pipeline? Owl can do both!

Real World Examples

Rules

Let's assume we were provided a file named "atm_cust_file" and want to load it into a database table as well as scan it for all possible errors. We want to provide three levels of protection: 1) a business rule that checks whether a customer joined before the company was founded; 2) a check that the file matches 100% to the DataFrame or database table we've created; 3) a scan for all possible outliers or anomalies in the dataset. Each of these three issues has a different impact on the business and triggers a different flow in our pipeline.

Add Rule

Let's create a simple rule and assign it points in the overall scoring system so the pipeline can act on the score later.
val rule = new domain2.Rule
rule.setRuleNm("customer_before_company")
rule.setRuleValue("customer_since_date < '1956-11-01'")
rule.setPerc(1.0)
rule.setPoints(1)
rule.setIsActive(1)
rule.setUserNm("Kirk")
rule.setDataset("ATM_CUSTOMER3")
Util.addRule(rule = rule)
Now let's chain together the remaining 2 items that were part of our original requirement. Note that Owl has 6 additional ML DQ features that we did not turn on in this case.
val owl = Util.OwlContext(df, atmCustFile, props)

// First register with the catalog if not already registered
owl.register(props)

// Check whether the DataFrame matches the source file 'atm_cust_file'
val source = owl.validateSrcDF
if (source.count() > 1) {
  // create a ServiceNow ticket and exit with a failure: the data does not
  // match the original file
}

// Apply the ad hoc rule created above and inspect the breaks
owl.addAdHocRule(rule)
val ruleBreaks = owl.rulesDF
if (ruleBreaks.count() > 1) {
  if (ruleBreaks.where($"score" > 5).count > 1) {
    // create a ServiceNow ticket and exit with a failure based on rule breaks
  }
}

// Scan for outliers and anomalies
val outliers = owl.outliersDF
if (outliers.where($"confidence" < 10).count > 3) {
  // Owl email alert to the business group for attention,
  // triggered when more than 3 outliers have a confidence below 10
}
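The comments above leave the alerting branches open, since every pipeline wires them differently. Below is a minimal sketch of one such failure branch, assuming a hypothetical notifyServiceNow helper that wraps your ticketing API (Owl does not ship one); exiting with a nonzero status lets your scheduler mark the run as failed.

// Hypothetical helper -- replace the body with a call to your ticketing API.
def notifyServiceNow(summary: String): Unit =
  println(s"[ALERT] $summary")

if (ruleBreaks.where($"score" > 5).count > 1) {
  notifyServiceNow(s"Rule breaks detected on ATM_CUSTOMER3: ${ruleBreaks.count()} rows")
  sys.exit(1) // nonzero exit code so the scheduler marks the run as failed
}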

Ingesting Intraday Files

Here we illustrate how to work with files when using Owl programmatically. This approach works both in a notebook setting and in your own codebase.
///////////////////////////////////////////////////////////////////////////
// USE CASE - Ingesting Intraday Files                                   //
///////////////////////////////////////////////////////////////////////////

import java.io.File
import org.apache.spark.sql.SparkSession
// OwlOptions, OwlUtils, and TimeBin come from the Owl Spark client library.

// Part of your pipeline includes the ingestion of files that have the date
// and hour encoded in the file name. How do you process those files using Owl?
//
// Format: <name>_<year>_<month>_<day>_<hour>.csv
//
// Build up a data structure containing the files you want to process (here we
// just use a simple list, but you may want to be pulling from a pub/sub
// queue, AWS bucket, etc...). Here we just use a simple file list covering 7
// hours of trade position data.
val position_files = List(
  new File(getClass.getResource("/position_file_2019_11_03_08.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_09.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_10.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_11.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_12.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_13.csv").getPath),
  new File(getClass.getResource("/position_file_2019_11_03_14.csv").getPath))

// Create your Spark session.
val spark = SparkSession.builder
  .master("local")
  .appName("test")
  .getOrCreate()

// Configure Owl.
val opt = new OwlOptions
opt.dataset = "positions"
opt.load.delimiter = ","
opt.spark.master = "local[1]"
opt.dupe.on = true
opt.dupe.include = Array("ticker", "cid")
opt.outlier.on = true
opt.outlier.key = Array("cid")
opt.outlier.timeBin = TimeBin.HOUR
// Customize this to only process a subset of the data.
opt.load.fileQuery = "select * from dataset"

position_files.foreach { file: File =>
  // Tell Owl where to find the file.
  opt.load.filePath = file.getPath

  // Parse the filename to construct the run date (-rd) that will be passed
  // to Owl. Must be in format 'yyyy-MM-dd' or 'yyyy-MM-dd HH:mm'.
  val name = file.getName.split('.').head      // e.g. "position_file_2019_11_03_08"
  val parts = name.split("_")
  val date = parts.slice(2, 5).mkString("-")   // e.g. "2019-11-03"
  val hour = parts.takeRight(1).head           // e.g. "08"
  val rd = s"$date $hour:00"                   // e.g. "2019-11-03 08:00"

  // Tell Owl which run this data belongs to.
  opt.runId = rd

  // Create a DataFrame from the file.
  val df = OwlUtils.load(opt.load.filePath, opt.load.delimiter, spark)

  // Instantiate an OwlContext with the DataFrame and our custom configuration.
  val owl = OwlUtils.OwlContext(df, spark, opt)

  // Make sure Owl has catalogued the dataset.
  owl.register(opt)

  // Let Owl do the rest!
  owl.owlCheck()
}
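In production you would rarely hard-code the file list. As a minimal sketch, the same list can be discovered from a landing directory instead (the /data/landing path and the file-name prefix are illustrative):

import java.io.File

// Collect the hourly position files from a landing directory, sorted by
// name so runs are processed in chronological order. Path and prefix are
// illustrative -- point these at your own drop location.
val landingDir = new File("/data/landing")
val position_files = Option(landingDir.listFiles())
  .getOrElse(Array.empty[File])
  .filter(f => f.isFile && f.getName.startsWith("position_file_") && f.getName.endsWith(".csv"))
  .sortBy(_.getName)
  .toList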

All Pipeline Activities in 1 Line

For brevity and convenience, Owl allows a DataFrame to be loaded in the constructor so that a single line, owl.owlCheck, runs all 9 dimensions of data quality. To adjust the DQ dimensions, simply set the corresponding properties in the props object.
val owl = Util.OwlContext(df, atmCustFile, props)
owl.owlCheck

Example of some common property settings

val props = new Props()
props.filePath = s"${filePath}/atm_customer_${rd.replace("-","_")}.csv"
props.runId = rd
props.dateCol = "OWL_RUN_ID"
props.dataset = "ATM_CUSTOMER3"
props.del = ","
props.datasetSafety = false
props.calculateBoundaries = true
props.fileLookBack = true
props.timeBin = "DAY"

// outlier, missing records
props.dl = true
props.dlKey = "customer_id"
props.dlLb = 4

// pattern mining
props.freqPatternMiningByKey = true
props.fpgKey = "customer_id"
props.fpgLookback = 4
props.fpgDateCol = "OWL_RUN_ID"
props.fpgCols = "card_number,first_name,last_name,checking_savings"
props.fpgLowFreq = true

// validate source
props.validateSrc = true
props.valSrcKey = "customer_id"

// fuzzy match
props.isDupe = true
props.dupeCutOff = 88
props.depth = 3
props.dupeExcludeCols = "customer_id,card_number,customer_since_date,OWL_RUN_ID"
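Note that filePath and rd are assumed to be defined earlier in the job; a minimal sketch with illustrative values:

// Assumed to be defined earlier in the job -- values are illustrative.
val filePath = "/data/atm"   // base directory containing the daily drop files
val rd = "2019-11-03"        // run date, typically passed in by the scheduler
// props.filePath above then resolves to "/data/atm/atm_customer_2019_11_03.csv"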

Using Notebooks to build DQ Pipelines

For examples on how to do this, see our Notebook repository below.
GitHub - kirkhas/owl-notebooks: Owl Spark DQ Pipelines (https://github.com/kirkhas/owl-notebooks)
