Skip to content

Instantly share code, notes, and snippets.

@jpivarski
Last active July 17, 2024 15:58
Show Gist options
  • Save jpivarski/0132e88beab1f10b3f6edd02c0632357 to your computer and use it in GitHub Desktop.
Save jpivarski/0132e88beab1f10b3f6edd02c0632357 to your computer and use it in GitHub Desktop.
Introductory Dask
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## This notebook is meant for you to play around with!\n",
"## Try investigating `.dask` and other aspects of the taskgraphs below!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask.threaded import get\n",
"from dask.local import get_sync\n",
"from dask.optimization import cull, inline, inline_functions, fuse\n",
"\n",
"def print_and_return(string):\n",
" print(string)\n",
" return string\n",
"\n",
"def format_str(count, val, nwords):\n",
" return (f'word list has {count} occurrences of '\n",
" f'{val}, out of {nwords} words')\n",
"\n",
"dsk = {'words': 'apple orange apple pear orange pear pear',\n",
" 'nwords': (len, (str.split, 'words')),\n",
" 'val1': 'orange',\n",
" 'val2': 'apple',\n",
" 'val3': 'pear',\n",
" 'count1': (str.count, 'words', 'val1'),\n",
" 'count2': (str.count, 'words', 'val2'),\n",
" 'count3': (str.count, 'words', 'val3'),\n",
" 'format1': (format_str, 'count1', 'val1', 'nwords'),\n",
" 'format2': (format_str, 'count2', 'val2', 'nwords'),\n",
" 'format3': (format_str, 'count3', 'val3', 'nwords'),\n",
" 'print1': (print_and_return, 'format1'),\n",
" 'print2': (print_and_return, 'format2'),\n",
" 'print3': (print_and_return, 'format3')}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dask.base.visualize_dsk(dsk, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"outputs = ['print1', 'print2']\n",
"dsk1, dependencies = cull(dsk, outputs) # remove unnecessary tasks from the graph\n",
"\n",
"results = get_sync(dsk1, outputs)\n",
"\n",
"dask.base.visualize_dsk(dsk1, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dsk2 = inline(dsk1, dependencies=dependencies)\n",
"results = get_sync(dsk2, outputs)\n",
"\n",
"dask.base.visualize_dsk(dsk2, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dsk3 = inline_functions(dsk2, outputs, [len, str.split], dependencies=dependencies)\n",
"results = get_sync(dsk3, outputs)\n",
"\n",
"dask.base.visualize_dsk(dsk3, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dsk4, dependencies = fuse(dsk3)\n",
"results = get_sync(dsk4, outputs)\n",
"\n",
"dask.base.visualize_dsk(dsk4, verbose=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def optimize(dsk, keys):\n",
" dsk1, deps = cull(dsk, keys)\n",
" dsk2 = inline(dsk1, dependencies=deps)\n",
" dsk3 = inline_functions(dsk2, keys, [len, str.split],\n",
" dependencies=deps)\n",
" dsk4, deps = fuse(dsk3)\n",
" return dsk4, deps\n",
"\n",
"def optimize_and_get(dsk, keys): \n",
" dsk4, deps = fuse(dsk, keys)\n",
" return get(dsk4, keys)\n",
"\n",
"optimize_and_get(dsk, outputs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dask.base.visualize_dsk(dsk, verbose=True, color=\"order\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dsk5, _ = optimize(dsk, outputs)\n",
"\n",
"dask.base.visualize_dsk(dsk5, verbose=True, color=\"order\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"import awkward as ak\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"from coffea.nanoevents import NanoEventsFactory, NanoAODSchema\n",
"from coffea import processor\n",
"\n",
"import dask\n",
"import dask_awkward as dak\n",
"import hist.dask as hda\n",
"\n",
"# The opendata files are non-standard NanoAOD, so some optional data columns are missing\n",
"NanoAODSchema.warn_missing_crossrefs = False\n",
"\n",
"# The warning emitted below indicates steps_per_file is for initial data exploration\n",
"# and test. When running at scale there are better ways to specify processing chunks\n",
"# of files.\n",
"events, report = NanoEventsFactory.from_root(\n",
" {\"https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root\": \"Events\"},\n",
" metadata={\"dataset\": \"Test\"},\n",
" uproot_options={\"allow_read_errors_with_report\": True},\n",
").events()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 1\n",
"\n",
"Plot the <i>E</i><sub>T</sub><sup>miss</sup> of all events."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"q1_hist = (\n",
" hda.Hist.new.Reg(100, 0, 200, name=\"met\", label=\"$E_{T}^{miss}$ [GeV]\")\n",
" .Double()\n",
" .fill(events.MET.pt)\n",
")\n",
"\n",
"dask.compute(q1_hist, report)[0].plot1d(flow=\"none\")\n",
"dak.necessary_columns(q1_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 2\n",
"Plot the <i>p</i><sub>T</sub> of all jets."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"q2_hist = (\n",
" hda.Hist.new.Reg(100, 0, 200, name=\"ptj\", label=\"Jet $p_{T}$ [GeV]\")\n",
" .Double()\n",
" .fill(ak.flatten(events.Jet.pt))\n",
")\n",
"\n",
"\n",
"q2_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q2_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 3\n",
"Plot the <i>p</i><sub>T</sub> of jets with |<i>η</i>| < 1."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"q3_hist = (\n",
" hda.Hist.new.Reg(100, 0, 200, name=\"ptj\", label=\"Jet $p_{T}$ [GeV]\")\n",
" .Double()\n",
" .fill(ak.flatten(events.Jet[abs(events.Jet.eta) < 1].pt))\n",
")\n",
"\n",
"q3_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q3_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 4\n",
"Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have at least two jets with <i>p</i><sub>T</sub> > 40 GeV."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"has2jets = ak.sum(events.Jet.pt > 40, axis=1) >= 2\n",
"q4_hist = (\n",
" hda.Hist.new.Reg(100, 0, 200, name=\"met\", label=\"$E_{T}^{miss}$ [GeV]\")\n",
" .Double()\n",
" .fill(events[has2jets].MET.pt)\n",
")\n",
"\n",
"q4_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q4_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 5\n",
"Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have an\n",
"opposite-charge muon pair with an invariant mass between 60 and 120 GeV."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mupair = ak.combinations(events.Muon, 2, fields=[\"mu1\", \"mu2\"])\n",
"pairmass = (mupair.mu1 + mupair.mu2).mass\n",
"goodevent = ak.any(\n",
" (pairmass > 60)\n",
" & (pairmass < 120)\n",
" & (mupair.mu1.charge == -mupair.mu2.charge),\n",
" axis=1,\n",
")\n",
"q5_hist = (\n",
" hda.Hist.new.Reg(100, 0, 200, name=\"met\", label=\"$E_{T}^{miss}$ [GeV]\")\n",
" .Double()\n",
" .fill(events[goodevent].MET.pt)\n",
")\n",
"\n",
"\n",
"q5_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q5_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 6\n",
"For events with at least three jets, plot the <i>p</i><sub>T</sub> of the trijet four-momentum that has the invariant mass closest to 172.5 GeV in each event and plot the maximum <i>b</i>-tagging discriminant value among the jets in this trijet."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"jets = ak.zip(\n",
" {k: getattr(events.Jet, k) for k in [\"x\", \"y\", \"z\", \"t\", \"btag\"]},\n",
" with_name=\"LorentzVector\",\n",
" behavior=events.Jet.behavior,\n",
")\n",
"trijet = ak.combinations(jets, 3, fields=[\"j1\", \"j2\", \"j3\"])\n",
"trijet[\"p4\"] = trijet.j1 + trijet.j2 + trijet.j3\n",
"trijet = ak.flatten(\n",
" trijet[ak.singletons(ak.argmin(abs(trijet.p4.mass - 172.5), axis=1))]\n",
")\n",
"maxBtag = np.maximum(\n",
" trijet.j1.btag,\n",
" np.maximum(\n",
" trijet.j2.btag,\n",
" trijet.j3.btag,\n",
" ),\n",
")\n",
"q6_hists = {\n",
" \"trijetpt\": hda.Hist.new.Reg(\n",
" 100, 0, 200, name=\"pt3j\", label=\"Trijet $p_{T}$ [GeV]\"\n",
" )\n",
" .Double()\n",
" .fill(trijet.p4.pt),\n",
" \"maxbtag\": hda.Hist.new.Reg(\n",
" 100, 0, 1, name=\"btag\", label=\"Max jet b-tag score\"\n",
" )\n",
" .Double()\n",
" .fill(maxBtag),\n",
"}\n",
"\n",
"out = dask.compute(q6_hists, report)[0]\n",
"fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), sharey=True)\n",
"out[\"trijetpt\"].plot1d(ax=ax1, flow=\"none\")\n",
"out[\"maxbtag\"].plot1d(ax=ax2, flow=\"none\")\n",
"dak.necessary_columns(q6_hists)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 7\n",
"Plot the scalar sum in each event of the <i>p</i><sub>T</sub> of jets with <i>p</i><sub>T</sub> > 30 GeV that are not within 0.4 in Δ<i>R</i> of any light lepton with <i>p</i><sub>T</sub> > 10 GeV."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cleanjets = events.Jet[\n",
" ak.all(\n",
" events.Jet.metric_table(events.Muon[events.Muon.pt > 10]) >= 0.4, axis=2\n",
" )\n",
" & ak.all(\n",
" events.Jet.metric_table(events.Electron[events.Electron.pt > 10]) >= 0.4,\n",
" axis=2,\n",
" )\n",
" & (events.Jet.pt > 30)\n",
"]\n",
"q7_hist = (\n",
" hda.Hist.new.Reg(\n",
" 100, 0, 200, name=\"sumjetpt\", label=\"Jet $\\sum p_{T}$ [GeV]\"\n",
" )\n",
" .Double()\n",
" .fill(ak.sum(cleanjets.pt, axis=1))\n",
")\n",
"\n",
"q7_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q7_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query 8\n",
"For events with at least three light leptons and a same-flavor opposite-charge light lepton pair, find such a pair that has the invariant mass closest to 91.2 GeV in each event and plot the transverse mass of the system consisting of the missing tranverse momentum and the highest-<i>p</i><sub>T</sub> light lepton not in this pair."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"events[\"Electron\", \"pdgId\"] = -11 * events.Electron.charge\n",
"events[\"Muon\", \"pdgId\"] = -13 * events.Muon.charge\n",
"events[\"leptons\"] = ak.concatenate(\n",
" [events.Electron, events.Muon],\n",
" axis=1,\n",
")\n",
"events = events[ak.num(events.leptons) >= 3]\n",
"pair = ak.argcombinations(events.leptons, 2, fields=[\"l1\", \"l2\"])\n",
"pair = pair[(events.leptons[pair.l1].pdgId == -events.leptons[pair.l2].pdgId)]\n",
"\n",
"pair = pair[\n",
" ak.singletons(\n",
" ak.argmin(\n",
" abs(\n",
" (events.leptons[pair.l1] + events.leptons[pair.l2]).mass\n",
" - 91.2\n",
" ),\n",
" axis=1,\n",
" )\n",
" )\n",
"]\n",
"\n",
"events = events[ak.num(pair) > 0]\n",
"pair = pair[ak.num(pair) > 0][:, 0]\n",
"\n",
"l3 = ak.local_index(events.leptons)\n",
"l3 = l3[(l3 != pair.l1) & (l3 != pair.l2)]\n",
"l3 = l3[ak.argmax(events.leptons[l3].pt, axis=1, keepdims=True)]\n",
"l3 = events.leptons[l3][:, 0]\n",
"\n",
"mt = np.sqrt(2 * l3.pt * events.MET.pt * (1 - np.cos(events.MET.delta_phi(l3))))\n",
"q8_hist = (\n",
" hda.Hist.new.Reg(\n",
" 100, 0, 200, name=\"mt\", label=\"$\\ell$-MET transverse mass [GeV]\"\n",
" )\n",
" .Double()\n",
" .fill(mt)\n",
")\n",
"\n",
"q8_hist.compute().plot1d(flow=\"none\")\n",
"dak.necessary_columns(q8_hist)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment