Skip to content

Instantly share code, notes, and snippets.

@mnlipp
Created October 12, 2025 12:01
Show Gist options
  • Select an option

  • Save mnlipp/0bc2ba8c1efdb48d5a6d7abe669dbb31 to your computer and use it in GitHub Desktop.

Select an option

Save mnlipp/0bc2ba8c1efdb48d5a6d7abe669dbb31 to your computer and use it in GitHub Desktop.
Creates an Airflow DAG that copies submissions from a Nextcloud Form into a Nextcloud Table
# SPDX-License-Identifier: AGPL-3.0-only
import json
import logging
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.providers.http.hooks.http import HttpHook
def create_form_to_table_dag(
        dag_id: str, http_conn_id: str, form_title: str, table_title: str,
        field_to_column: dict[str, str], schedule=None):
    """
    Creates an Airflow DAG that copies submissions from a Nextcloud Form into
    a Nextcloud Table.

    Steps performed by the DAG:
    1. Retrieve the ID of the form matching `form_title`.
    2. Retrieve the ID of the table matching `table_title`.
    3. Fetch the submission IDs already stored in the table (to avoid
       duplicates), using the column that `field_to_column` maps
       "submission_id" to.
    4. Fetch all submissions from the form.
    5. Filter out submissions already present in the table.
    6. Map form fields to table columns using `field_to_column` and the
       table's column IDs.
    7. Insert new submissions into the table via the Tables API.

    Parameters:
    - dag_id (str): The identifier for the created DAG.
    - http_conn_id (str): Airflow HTTP connection ID for Nextcloud API.
      Must have a value for the Authorization header.
    - form_title (str): The title of the Nextcloud Form to retrieve
      submissions from. There is no way to get the Id from the Nextcloud UI.
    - table_title (str): The title of the Nextcloud Table to insert
      submissions into. The Id is actually in the URL, but we don't know
      how reliable this is.
    - field_to_column (dict): Mapping from form field names (technical name)
      to table column titles, must include "submission_id" key which is used
      to identify already copied submissions. May include "submission_date"
      which is filled in with date from the submission.
    - schedule (Optional): DAG schedule interval (default None).

    Returns:
    - DAG object with four tasks. `get_form_id` and `get_table_id` have no
      upstream tasks; `get_known_submissions` depends on `get_table_id`;
      `copy_submissions` depends on all three.
    """
    log = logging.getLogger(__name__)

    def _get_json(endpoint: str):
        """GET `endpoint` from the Nextcloud connection and return the parsed JSON body."""
        hook = HttpHook(http_conn_id=http_conn_id, method="GET")
        response = hook.run(
            endpoint=endpoint,
            headers={"Accept": "application/json",
                     "OCS-APIRequest": "true"})
        response.raise_for_status()
        return response.json()

    def _find_id_by_title(endpoint: str, title: str, kind: str) -> str:
        """Return the id of the first OCS list entry at `endpoint` whose title equals `title`."""
        entries = _get_json(endpoint)['ocs']['data']
        match = next(
            (item for item in entries if item.get('title') == title), None)
        if match is None:
            raise ValueError(f"No {kind} found with title '{title}'")
        return str(match['id'])

    with DAG(
        dag_id=dag_id,
        description=(f"Copy submissions from Nextcloud form '{form_title}' "
                     f"to table '{table_title}'"),
        # A fixed start date: a value derived from datetime.now() changes
        # on every parse of the DAG file. With catchup=False no backfill
        # runs are created for the time since this date.
        start_date=datetime(2025, 1, 1),
        schedule=schedule,
        catchup=False,
        tags=["example"],
        # These args will get passed on to each operator
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
        },
    ) as dag:

        @task()
        def get_form_id():
            """
            Retrieve the ID of the Nextcloud Form with the specified
            `form_title`.

            Returns:
                str: The ID of the form as a string.

            Raises:
                ValueError: If no form has the title `form_title`.
            """
            return _find_id_by_title(
                "/ocs/v2.php/apps/forms/api/v3/forms", form_title, "form")

        @task()
        def get_table_id():
            """
            Retrieve the ID of the Nextcloud Table with the specified
            `table_title`.

            Returns:
                str: The ID of the table.

            Raises:
                ValueError: If no table has the title `table_title`.
            """
            return _find_id_by_title(
                "/ocs/v2.php/apps/tables/api/2/tables", table_title, "table")

        @task()
        def get_known_submissions(table_id: str):
            """
            Retrieve the submission IDs already stored in the table, to
            avoid duplicate inserts.

            The first element of the response is treated as the list of
            column titles, every following element as one row.

            Args:
                table_id (str): ID of the Nextcloud Table.

            Returns:
                list[str]: Values of the column that `field_to_column` maps
                "submission_id" to, one per existing row.

            Raises:
                ValueError: If the table has no column with that title.
            """
            rows = _get_json(
                f"/index.php/apps/tables/api/1/tables/{table_id}/rows/simple")
            if not rows:
                # Not even a header row came back, so nothing can have
                # been copied yet.
                return []
            header = rows[0]
            id_column = field_to_column["submission_id"]
            try:
                id_index = header.index(id_column)
            except ValueError:
                raise ValueError(
                    f"Table {table_id} has no column titled '{id_column}'"
                ) from None
            return [str(row[id_index]) for row in rows[1:]]

        # NOTE(review): a retry of this task reuses the `known_submissions`
        # value computed upstream, so rows inserted before a mid-run failure
        # would presumably be inserted a second time -- confirm whether
        # retries should be disabled for this task.
        @task()
        def copy_submissions(form_id: str, table_id: str,
                             known_submissions: list[str]):
            """
            Copy all new submissions from a Nextcloud Form into a Nextcloud
            Table.

            Args:
                form_id (str): ID of the Nextcloud Form.
                table_id (str): ID of the Nextcloud Table.
                known_submissions (list[str]): List of submission IDs
                    already in the table.

            Returns:
                None

            Raises:
                ValueError: If there are new submissions but the column
                    mapped from "submission_id" does not exist in the table.
            """
            # Get all submissions
            body = _get_json(
                f"/ocs/v2.php/apps/forms/api/v3/forms/{form_id}/submissions")
            submissions = body['ocs']['data']['submissions']
            known = set(known_submissions)
            new_subs = [sub for sub in submissions
                        if str(sub["id"]) not in known]
            # Log counts only; the submissions themselves hold user input
            # that does not belong in the task log.
            log.info("Form %s: %d of %d submissions are new",
                     form_id, len(new_subs), len(submissions))
            if not new_subs:
                return None

            # Get column ids
            columns = _get_json(
                f"/ocs/v2.php/apps/tables/api/2/columns/table/{table_id}"
            )["ocs"]["data"]
            column_to_id = {item["title"]: str(item["id"])
                            for item in columns}
            # Combined mapping: form field name -> table column id. Fields
            # whose target column does not exist in the table are dropped.
            field_to_id = {field: column_to_id[column]
                           for field, column in field_to_column.items()
                           if column in column_to_id}
            # Without the id column copied rows could not be recognised on
            # the next run, so refuse to insert anything.
            if "submission_id" not in field_to_id:
                raise ValueError("No mapping for submission_id.")

            # Create rows
            post_hook = HttpHook(method="POST", http_conn_id=http_conn_id)
            for sub in new_subs:
                # NOTE(review): if a submission carries several answers for
                # the same question, only the last one survives here --
                # confirm whether multi-value questions need joining.
                row = {field_to_id[answer['questionName']]: answer['text']
                       for answer in sub["answers"]
                       if answer['questionName'] in field_to_id}
                row[field_to_id["submission_id"]] = str(sub["id"])
                if "submission_date" in field_to_id:
                    # fromtimestamp() without tz yields the local time of
                    # the machine executing the task.
                    dt = datetime.fromtimestamp(int(sub["timestamp"]))
                    row[field_to_id["submission_date"]] \
                        = dt.strftime("%Y-%m-%d %H:%M")
                response = post_hook.run(
                    endpoint=f"/ocs/v2.php/apps/tables/api/2/tables/{table_id}/rows",
                    headers={"Content-Type": "application/json",
                             "OCS-APIRequest": "true"},
                    data=json.dumps({"data": row}))
                response.raise_for_status()
            return None

        # Define task dependencies
        form_id = get_form_id()
        table_id = get_table_id()
        known_submissions = get_known_submissions(table_id)
        copy_submissions(form_id, table_id, known_submissions)
    return dag
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment