Skip to content

Instantly share code, notes, and snippets.

@paul121
Created April 18, 2024 03:40
Show Gist options
  • Save paul121/9b1ab39c9045ed81817bc05350282cd2 to your computer and use it in GitHub Desktop.
Save paul121/9b1ab39c9045ed81817bc05350282cd2 to your computer and use it in GitHub Desktop.
Experiment plan log quantity export

Experiment plan exports

This script is a good example of how to use the API to export log + quantity data from experiment plans. This use case is a good, complex example that demonstrates why a good understanding of the data structure is needed.

In order to get the logs associated with the plan, we must first determine which assets we are interested in, because logs only reference assets. Our experiment plans have asset and plot reference fields that we can use to determine the assets we are interested in.

Next, we need to find logs associated with each asset. Due to how the API works we need to make separate API requests for each log_type. I've also chosen to filter this request to only a single asset ID. It is possible to filter by many asset IDs, but there is a limit to how many you can use at once, and I found that I was hitting this limit. Ultimately this results in LOTS of requests...

Which is good timing, because this was a good way to validate our decision to add async support to the FarmOS.py Python library. This allows us to dispatch multiple (10-20) requests at once rather than executing one at a time. Thus, this example code is for the upcoming (unreleased) v2 of FarmOS.py, but can be easily adapted to standard v1.

import asyncio
from collections import ChainMap
import csv
import time
from farmOS import AsyncFarmClient
from httpx_auth import OAuth2ResourceOwnerPasswordCredentials
from httpx import Limits
# Base URL of the farmOS instance; used for both the OAuth token URL and the
# API client.
hostname = "https://rothamstedfarm.farmos.net"
# Log bundles to export. Each asset is queried once per entry in this list,
# so the number of requests is len(log_types) * number of assets.
log_types = [
"activity",
"drilling",
"harvest",
"input",
"lab_test",
"maintenance",
"observation",
"seeding",
]
async def process_all_asset_logs(farm, asset_id):
    """
    Fetch the logs of every configured log type for one asset.

    One ``process_asset_log_type`` coroutine is created per entry in
    ``log_types``; they are awaited together and the per-type dicts are
    flattened into a single dict, which is returned.
    """
    per_type_logs = await asyncio.gather(
        *(process_asset_log_type(farm, kind, asset_id) for kind in log_types)
    )
    # ChainMap gives earlier dicts priority should the same key appear twice.
    merged = ChainMap(*per_type_logs)
    return dict(merged)
async def process_asset_log_type(farm, log_type, asset_id):
    """
    Helper function to fetch logs of a given type for an asset.

    Requests ``log`` resources of bundle ``log_type`` filtered on
    ``asset.id == asset_id``, sorted by timestamp, asking the server to
    include each log's quantities plus the quantities' units and material
    type terms.

    Returns a dict keyed by log UUID. Each value has the form
    ``{"uuid": <log uuid>, "id": <drupal internal id>, "quantity": {...}}``
    where ``"quantity"`` maps quantity UUID to the included quantity
    resource, extended with two extra keys:

    - ``"units"``: name of the units term, or None if unset/not included.
    - ``"material_type"``: list of material type term names (may be empty).

    Quantities or terms that are referenced but absent from the response's
    ``included`` list are skipped rather than raising KeyError.

    NOTE(review): only the single response returned by ``farm.resource.get``
    is processed; if the server paginates results, later pages are not
    fetched here -- confirm against the client library.
    """
    params = {
        **farm.filter('asset.id', asset_id),
        "sort": "timestamp",
        "include": "quantity,quantity.units,quantity.material_type",
    }
    logs = await farm.resource.get('log', log_type, params)

    # Index includes by id so relationships can be resolved with a lookup.
    includes = {included["id"]: included for included in logs.get("included", [])}

    collected_logs = {}
    for log in logs["data"]:
        uuid = log["id"]
        quantities = {}
        for reference in log["relationships"]["quantity"]["data"]:
            qid = reference["id"]
            quantity = includes.get(qid)
            if quantity is None:
                # Referenced quantity was not returned in "included";
                # there is nothing to export for it.
                continue
            relationships = quantity["relationships"]

            # Pull in units term. The included resource is annotated in place.
            quantity["units"] = None
            units_ref = (relationships.get("units") or {}).get("data")
            if units_ref is not None and units_ref["id"] in includes:
                quantity["units"] = includes[units_ref["id"]]["attributes"]["name"]

            # Pull in material type terms; the relationship may be absent
            # or empty for this quantity.
            material_refs = (relationships.get("material_type") or {}).get("data") or []
            quantity["material_type"] = [
                includes[term["id"]]["attributes"]["name"]
                for term in material_refs
                if term["id"] in includes
            ]

            quantities[qid] = quantity

        collected_logs[uuid] = {
            "uuid": uuid,
            "id": log["attributes"]["drupal_internal__id"],
            "quantity": quantities,
        }
    return collected_logs
async def main(plan_id=123):
    """
    Export the log quantities of one experiment plan to a CSV file.

    Steps:
      1. Open an authenticated client against ``hostname`` using OAuth2
         password credentials (fill in ``username``/``password`` below).
      2. Load the ``rothamsted_experiment`` plan whose
         ``drupal_internal__id`` equals ``plan_id``.
      3. Collect the IDs referenced by the plan's ``asset`` and ``plot``
         relationships and fetch all logs for each one concurrently.
      4. Write one CSV row per log quantity to
         ``plan-<plan_id>-log-quantity.csv`` (relative path).

    Args:
        plan_id: Drupal internal ID of the plan to export. Change me!

    Raises:
        IndexError: if no plan with the given ID is returned.
    """
    auth = OAuth2ResourceOwnerPasswordCredentials(
        token_url=f"{hostname}/oauth/token",
        username="",
        password="",
        client_id="farm",
        scope="rothamsted_data_admin"
    )
    # Cap the connection pool so the many concurrent jobs below are
    # limited to 15 simultaneous connections.
    limits = Limits(max_connections=15, max_keepalive_connections=15)

    async with AsyncFarmClient(hostname, auth=auth, limits=limits, timeout=60) as farm:
        # Query the plan.
        filters = farm.filter('drupal_internal__id', plan_id)
        plans = await farm.resource.get('plan', 'rothamsted_experiment', {**filters})
        if not plans["data"]:
            raise IndexError(f"No rothamsted_experiment plan found with id {plan_id}.")
        plan = plans["data"][0]

        # Collect plot and asset IDs. An ID present in both fields is only
        # queried once; dict.fromkeys de-duplicates while keeping order.
        asset_ids = [reference["id"] for reference in plan["relationships"]["asset"]["data"]]
        plot_ids = [reference["id"] for reference in plan["relationships"]["plot"]["data"]]
        all_asset_ids = list(dict.fromkeys(asset_ids + plot_ids))

        # Dispatch jobs to request all logs for each asset.
        # perf_counter is monotonic, so the elapsed time cannot be skewed
        # by system clock adjustments.
        tic = time.perf_counter()
        print(f"Fetching logs for {len(all_asset_ids)} assets...")
        jobs = [process_all_asset_logs(farm, asset_id) for asset_id in all_asset_ids]
        result = await asyncio.gather(*jobs)
        toc = time.perf_counter()
        print(f"Done fetching in {toc-tic} seconds.")

    # Merge a list of dicts into a single dict. Logs that reference several
    # of the assets appear in several results; keying by UUID collapses them.
    all_logs = dict(ChainMap(*result))
    print(f"Found {len(all_logs)} unique logs.")

    # Build a CSV. The encoding is explicit so the output does not depend
    # on the platform's locale.
    with open(f"plan-{plan_id}-log-quantity.csv", 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)

        # Header row.
        writer.writerow(["log id", "log uuid", "type", "measure", "value", "units", "label", "material_type"])

        # One row per quantity; logs without quantities produce no rows.
        for log in all_logs.values():
            log_id = log["id"]
            log_uuid = log["uuid"]
            for quantity in log.get("quantity", {}).values():
                attributes = quantity["attributes"]
                # The value attribute may be empty; export a blank cell then.
                value = None
                if attributes["value"]:
                    value = attributes["value"].get("decimal")
                writer.writerow([
                    log_id,
                    log_uuid,
                    quantity["relationships"]["quantity_type"]["data"]["meta"]["drupal_internal__target_id"],
                    attributes["measure"],
                    value,
                    quantity["units"],
                    attributes["label"],
                    "|".join(quantity["material_type"]),
                ])
# Script entry point: start an event loop and run main() to completion.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment