DQ Job JSON

Files

Run against a file using the -json flag. The -flatten and -multiline options are also available; these help with nested structures and multi-line JSON files.
-ds json_file_example \
-f s3a://bucket_name/file.json \
-h instance.us-east4-c.c.owl-node.internal:5432/postgres \
-master spark://instance.us-east4-c.c.owl-node.internal:7077 \
-json \
-flatten \
-multiline
Automatic flattening will infer the schema and explode all structs, arrays, and map types.
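Under the hood, these flags correspond roughly to reading the file with Spark and flattening it with Owl's JsonReader. A minimal sketch, assuming an active SparkSession named spark and the same file path as above:

import com.owl.core.activity2.JsonReader

// -multiline: allow records that span multiple lines
val df = spark.read
  .option("multiLine", "true")
  .json("s3a://bucket_name/file.json")

// -flatten: explode structs, arrays, and maps into top-level columns
val colArr = new JsonReader().flattenSchema(df.schema)
val flatDf = df.select(colArr: _*)
flatDf.printSchema()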

Using Spark SQL

-ds public.json_sample \
-lib "/opt/owl/drivers/postgres/" \
-h instance.us-east4-c.c.owl-node.internal:5432/postgres \
-master spark://instance.us-east4-c.c.owl-node.internal:7077 \
-q "select * from public.json_sample" \
-rd "2021-01-17" \
-driver "org.postgresql.Driver" \
-cxn postgres-gcp \
-fq "select \
get_json_object(col_3, '$.data._customer_name') AS `data_customer_name`, \
get_json_object(col_3, '$.data._active_customer') AS `data_active_customer` \
from dataset"
Pass the JSON path expressions into Owl's -fq parameter. This is useful when a database holds mixed data types, for example when JSON is stored as a string or blob alongside other columns.
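For context, the kind of table this targets might look like the following. This is a made-up illustration (column names and values are assumptions), built in-memory here rather than read from Postgres, and it assumes an active SparkSession named spark:

import spark.implicits._

// Hypothetical mixed-type rows: ordinary columns plus JSON stored as text in col_3.
val sample = Seq(
  (1, "2021-01-17", """{"data": {"_customer_name": "Acme", "_active_customer": true}}"""),
  (2, "2021-01-17", """{"data": {"_customer_name": "Globex", "_active_customer": false}}""")
).toDF("col_1", "col_2", "col_3")

sample.show(false)
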
// Flatten
val colArr = new JsonReader().flattenSchema(df.schema)
colArr.foreach(x => println(x))
This Owl utility traverses the entire schema and prints the corresponding get_json_object Spark SQL strings. You can use this output instead of typing each statement into the command-line -fq parameter by hand, as seen above.
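If you want to go a step further, the printed expressions can be stitched together into a single query string for -fq. A small sketch (the formatting here is an assumption; the exact rendering of each expression depends on Spark's Column.toString):

// Join the flattened column expressions from JsonReader into one select statement.
val selectList = colArr.map(_.toString).mkString(",\n  ")
println(s"select\n  $selectList\nfrom dataset")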

Using Owl Libraries

import com.owl.common.options._
import com.owl.core.util.OwlUtils
import com.owl.core.activity2.JsonReader

val connProps = Map(
  "driver" -> "org.postgresql.Driver",
  "user" -> "user",
  "password" -> "password",
  "url" -> "jdbc:postgresql://10.173.0.14:5432/postgres",
  "dbtable" -> "public.data"
)

// Spark
var rdd = spark.read.format("jdbc").options(connProps).load.select($"col_name").map(x => x.toString()).rdd
var df = spark.read.json(rdd)

// Flatten
val colArr = new JsonReader().flattenSchema(df.schema)
val flatJson = df.select(colArr: _*)
flatJson.cache.count

// Opts
val dataset = "json_example"
val runId = s"2021-01-14"
val opt = new OwlOptions()
opt.dataset = dataset
opt.runId = runId
opt.datasetSafeOff = true

// Owlcheck
OwlUtils.resetDataSource("instance.us-east4-c.c.owl-node.internal", "5432/postgres", "user", "pass", spark)
val owl = OwlUtils.OwlContext(flatJson, opt)
owl.register(opt)
owl.owlCheck

JsonReader()

Owl's JsonReader does the heavy lifting in the examples above: it traverses the DataFrame schema and builds the flattened column expressions.
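As a quick standalone sketch, the same pattern works on any DataFrame produced by spark.read.json, assuming an active SparkSession named spark (the exact flattened column names depend on JsonReader's naming convention):

import spark.implicits._
import com.owl.core.activity2.JsonReader

// Flatten a small in-memory JSON dataset (illustrative only).
val nested = spark.read.json(Seq("""{"id": 1, "data": {"_customer_name": "Acme"}}""").toDS)
val flat = nested.select(new JsonReader().flattenSchema(nested.schema): _*)
flat.columns.foreach(println)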