-
-
Save svavassori/3319ff9d7e16a8788665ca59a5a04889 to your computer and use it in GitHub Desktop.
import sys | |
import csv | |
import json | |
# Converts the JSON output of a PowerBI query to a CSV file
def extract(input_file, output_file):
    """Read a PowerBI query response (JSON) and write its DM0 rows as CSV."""
    payload = read_json(input_file)
    data = payload["results"][0]["result"]["data"]
    dm0 = data["dsr"]["DS"][0]["PH"][0]["DM0"]
    columns_types = dm0[0]["S"]
    # Column header: the group-key property name for grouping columns
    # (Kind == 1), otherwise the aggregate's "Value" label.
    columns = map(
        lambda sel: sel["GroupKeys"][0]["Source"]["Property"] if sel["Kind"] == 1 else sel["Value"],
        data["descriptor"]["Select"],
    )
    value_dicts = data["dsr"]["DS"][0].get("ValueDicts", {})
    reconstruct_arrays(columns_types, dm0)
    expand_values(columns_types, dm0, value_dicts)
    replace_newlines_with(dm0, "")
    write_csv(output_file, columns, dm0)
def read_json(file_name):
    """Parse the file at *file_name* as JSON and return the resulting object."""
    with open(file_name) as fh:
        return json.load(fh)
def write_csv(output_file, columns, dm0):
    """Write *columns* as the header row, then each item's "C" array as a
    data row, to the CSV file at *output_file*.

    Fix: the file is opened with newline="" as the csv module requires;
    without it, csv.writer emits an extra blank line between rows on
    Windows because "\r\n" gets translated to "\r\r\n".
    """
    with open(output_file, "w", newline="") as csvfile:
        wrt = csv.writer(csvfile)
        wrt.writerow(columns)
        for item in dm0:
            wrt.writerow(item["C"])
def reconstruct_arrays(columns_types, dm0):
    """Restore each row's full "C" array in place.

    PowerBI compresses rows: the "R" bitset marks columns whose value
    repeats the previous row's value, and the "Ø" bitset marks columns
    that are null. Missing entries are re-inserted at their column index.
    """
    column_count = len(columns_types)
    for row in dm0:
        values = row["C"]
        if "R" in row or "Ø" in row:
            repeat_bits = row.get("R", 0)
            null_bits = row.get("Ø", 0)
            for col in range(column_count):
                if (repeat_bits >> col) & 1:
                    # copy the value this column had in the previous row
                    values.insert(col, previous[col])
                elif (null_bits >> col) & 1:
                    values.insert(col, None)
        # remember the fully reconstructed row for the next iteration
        previous = values
def is_bit_set_for_index(index, bitset):
    """Return True when bit *index* (LSB = index 0) is set in *bitset*."""
    return bool(bitset & (1 << index))
# substitute indexes with actual values
def expand_values(columns_types, dm0, value_dicts):
    """Replace dictionary-encoded integer cells with their real values, in place.

    A column descriptor carrying a "DN" key stores its values indirectly:
    an int cell is an index into value_dicts[DN].
    """
    for idx, col in enumerate(columns_types):
        if "DN" not in col:
            continue
        for row in dm0:
            cells = row["C"]
            if isinstance(cells[idx], int):
                cells[idx] = value_dicts[col["DN"]][cells[idx]]
def replace_newlines_with(dm0, replacement):
    """Substitute *replacement* for every newline in the string cells of dm0, in place."""
    for row in dm0:
        cells = row["C"]
        for pos, cell in enumerate(cells):
            if isinstance(cell, str):
                cells[pos] = cell.replace("\n", replacement)
def main():
    """Entry point: expects exactly two CLI arguments, the input JSON path
    and the output CSV path; otherwise exits with a usage message."""
    if len(sys.argv) == 3:
        extract(sys.argv[1], sys.argv[2])
    else:
        # Fix: sys.exit() accepts no `file` keyword — the original
        # sys.exit(msg, file=sys.stderr) raised a TypeError. Passing the
        # message to sys.exit already prints it to stderr and exits
        # with status 1.
        sys.exit("Usage: python3 " + sys.argv[0] + " input_file output_file")


if __name__ == "__main__":
    main()
Thanks for this script. It was hard to understand the JSON structure but your script made it super easy.
Thanks!
I'm glad it was useful for you too.
I'm struggling to understand the use case - how would I produce an input_file?
The content of input_file is the output (JSON-like) of an HTTP request query to PowerBI; you can grab it from the network tab of the browser's dev tools, or by using an external tool like cURL or Postman.
got it - thanks!
What would happen if you have both a DM0 and a DM1? I don't understand much of this DSR language tbh
The script doesn't look at DM1, so nothing different from DM0 only
Alright, so for anyone reading this in the future: DM, I believe, is just a result of a query. In my scenario I had a DM0, which was high-level aggregates, and a DM1, which contained the detailed information I was interested in. I was able to use the script on DM1 and it worked flawlessly.
Thanks @svavassori for spending time to understand this strange data format!
This was SUPER helpful! Thanks for this simple Power BI Script to help extract data!