|
import asyncio |
|
from collections import ChainMap |
|
import csv |
|
import time |
|
|
|
from farmOS import AsyncFarmClient |
|
from httpx_auth import OAuth2ResourceOwnerPasswordCredentials |
|
from httpx import Limits |
|
|
|
# Base URL of the farmOS instance; also used to build the OAuth token URL.
hostname = "https://rothamstedfarm.farmos.net"

# Log bundles requested for every asset (one request per bundle per asset).
log_types = [
    "activity",
    "drilling",
    "harvest",
    "input",
    "lab_test",
    "maintenance",
    "observation",
    "seeding",
]
|
|
|
|
|
async def process_all_asset_logs(farm, asset_id):
    """
    Fetch the logs of every type in `log_types` for one asset, concurrently.

    Returns a single dict keyed by log UUID that combines the per-type
    dicts produced by `process_asset_log_type`.
    """
    pending = [
        process_asset_log_type(farm, bundle, asset_id)
        for bundle in log_types
    ]
    per_type = await asyncio.gather(*pending)

    # Fold the per-type dicts together back to front, so that on a key
    # collision the entry from the earliest dict is the one that survives.
    combined = {}
    for chunk in reversed(per_type):
        combined.update(chunk)
    return combined
|
|
|
|
|
async def process_asset_log_type(farm, log_type, asset_id):
    """
    Fetch logs of one type for an asset and attach their quantity data.

    A single `farm.resource.get('log', log_type, ...)` request is made,
    filtered on `asset.id`, sorted by timestamp, and asking the server to
    include each log's quantities plus their units and material type terms.

    Returns a dict keyed by log UUID. Each value is a dict with:
      - "uuid": the log UUID,
      - "id": the log's `drupal_internal__id`,
      - "quantity": a dict keyed by quantity UUID. Each quantity is a copy
        of the included quantity resource with two extra keys: "units"
        (the units term name, or None) and "material_type" (a list of
        material type term names).

    Quantities or terms that are referenced but missing from the response's
    "included" list are skipped rather than raising a KeyError.

    NOTE(review): only one request is issued; if the server paginates the
    result, later pages are not fetched -- confirm against the API limits.
    """
    params = {
        **farm.filter('asset.id', asset_id),
        "sort": "timestamp",
        "include": "quantity,quantity.units,quantity.material_type",
    }
    logs = await farm.resource.get('log', log_type, params)

    # Index includes by id.
    includes = {item["id"]: item for item in logs.get("included", [])}

    # Process each log.
    collected_logs = {}
    for log in logs["data"]:
        uuid = log["id"]
        quantities = {}

        # Process quantities.
        for reference in log["relationships"]["quantity"]["data"]:
            qid = reference["id"]
            included_quantity = includes.get(qid)
            if included_quantity is None:
                # Referenced but not included: treat it like a missing
                # term and leave it out instead of failing the whole job.
                continue

            # Shallow copy so the extra keys added below do not modify the
            # response object itself.
            quantity = dict(included_quantity)
            relationships = quantity["relationships"]

            # Pull in units term.
            quantity["units"] = None
            units_ref = relationships["units"]["data"]
            if units_ref is not None and units_ref["id"] in includes:
                quantity["units"] = includes[units_ref["id"]]["attributes"]["name"]

            # Pull in material type term.
            material_refs = []
            if relationships.get("material_type"):
                material_refs = relationships["material_type"]["data"]
            quantity["material_type"] = [
                includes[term["id"]]["attributes"]["name"]
                for term in material_refs
                if term["id"] in includes
            ]

            quantities[qid] = quantity

        collected_logs[uuid] = {
            "uuid": uuid,
            "id": log["attributes"]["drupal_internal__id"],
            "quantity": quantities,
        }

    return collected_logs
|
|
|
|
|
async def main():
    """
    Export every log quantity for one experiment plan to a CSV file.

    Looks up the `rothamsted_experiment` plan whose `drupal_internal__id`
    equals the hard-coded `plan_id`, fetches the logs of every asset and
    plot the plan references, and writes one row per quantity to
    `plan-<plan_id>-log-quantity.csv` (relative to the working directory).

    Raises:
        LookupError: if no plan matches `plan_id`.
    """

    auth = OAuth2ResourceOwnerPasswordCredentials(
        token_url=f"{hostname}/oauth/token",
        username="",
        password="",
        client_id="farm",
        scope="rothamsted_data_admin"
    )
    limits = Limits(max_connections=15, max_keepalive_connections=15)
    async with AsyncFarmClient(hostname, auth=auth, limits=limits, timeout=60) as farm:

        # Hard-code plan id.
        # Change me!
        plan_id = 123

        # Query the plan.
        filters = farm.filter('drupal_internal__id', plan_id)
        plans = await farm.resource.get('plan', 'rothamsted_experiment', filters)
        if not plans["data"]:
            # Fail with a readable message instead of a bare IndexError.
            raise LookupError(
                f"No rothamsted_experiment plan found with id {plan_id}."
            )
        plan = plans["data"][0]

        # Collect plot and asset IDs.
        asset_ids = [reference["id"] for reference in plan["relationships"]["asset"]["data"]]
        plot_ids = [reference["id"] for reference in plan["relationships"]["plot"]["data"]]
        all_asset_ids = asset_ids + plot_ids

        # Dispatch jobs to request all logs for each asset.
        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # system clock adjustments while the requests are in flight.
        tic = time.perf_counter()
        print(f"Fetching logs for {len(all_asset_ids)} assets...")
        jobs = [process_all_asset_logs(farm, asset_id) for asset_id in all_asset_ids]
        result = await asyncio.gather(*jobs)
        toc = time.perf_counter()
        print(f"Done fetching in {toc-tic} seconds.")

    # Merge a list of dicts into a single dict.
    all_logs = dict(ChainMap(*result))
    print(f"Found {len(all_logs)} unique logs.")

    # Build a CSV.
    # Explicit UTF-8: without it the encoding is the platform default, and
    # non-ASCII labels or unit names could fail to encode on some systems.
    with open(f"plan-{plan_id}-log-quantity.csv", 'w', newline='', encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)

        # Header row.
        writer.writerow(["log id", "log uuid", "type", "measure", "value", "units", "label", "material_type"])

        # Loop through logs.
        for log in all_logs.values():
            log_id = log["id"]
            uuid = log["uuid"]
            for quantity in log.get("quantity", {}).values():
                attributes = quantity["attributes"]

                # The value attribute may be empty; then the cell stays blank.
                value = None
                if attributes["value"]:
                    value = attributes["value"].get("decimal")

                writer.writerow([
                    log_id,
                    uuid,
                    quantity["relationships"]["quantity_type"]["data"]["meta"]["drupal_internal__target_id"],
                    attributes["measure"],
                    value,
                    quantity["units"],
                    attributes["label"],
                    "|".join(quantity["material_type"]),
                ])
|
|
|
# Run the export only when this file is executed as a script, so that
# importing the module does not start any network requests.
if __name__ == "__main__":
    asyncio.run(main())