Created
October 12, 2025 12:01
-
-
Save mnlipp/0bc2ba8c1efdb48d5a6d7abe669dbb31 to your computer and use it in GitHub Desktop.
Creates an Airflow DAG that copies submissions from a Nextcloud Form into a Nextcloud Table
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # SPDX-License-Identifier: AGPL-3.0-only | |
| from airflow import DAG | |
| from airflow.decorators import task | |
| from airflow.providers.http.hooks.http import HttpHook | |
| from datetime import datetime, timedelta | |
| import json | |
| import os | |
# Headers sent with every read request against the Nextcloud OCS/REST API.
_OCS_HEADERS = {"Accept": "application/json", "OCS-APIRequest": "true"}


def _api_get(http_conn_id: str, endpoint: str):
    """Send a GET request to `endpoint` and return the decoded JSON body."""
    hook = HttpHook(http_conn_id=http_conn_id, method="GET")
    response = hook.run(endpoint=endpoint, headers=dict(_OCS_HEADERS))
    response.raise_for_status()
    return response.json()


def _find_id_by_title(entries, title: str, kind: str) -> str:
    """Return the ID (as string) of the first entry whose title is `title`.

    Raises:
        ValueError: If no entry has the requested title. `kind` ("form" or
        "table") is only used to build the error message.
    """
    match = next(
        (entry for entry in entries if entry.get("title") == title), None)
    if match is None:
        raise ValueError(f"No {kind} found with title '{title}'")
    return str(match["id"])


def _fetch_known_ids(http_conn_id: str, table_id: str,
                     id_column_title: str) -> list:
    """Return the submission IDs currently stored in the table.

    The response of the "rows/simple" endpoint is treated as a list of
    lists whose first item holds the column titles and whose remaining
    items hold the cell values, in the same order.

    Raises:
        ValueError: If the table has no column titled `id_column_title`.
    """
    rows = _api_get(
        http_conn_id,
        f"/index.php/apps/tables/api/1/tables/{table_id}/rows/simple")
    if not rows:
        # Nothing returned at all (not even a header): no known submissions.
        return []
    header, *body = rows
    try:
        id_index = header.index(id_column_title)
    except ValueError:
        raise ValueError(
            f"Table {table_id} has no column titled '{id_column_title}' "
            "(needed to detect already copied submissions).") from None
    # Rows without a value in the ID column cannot match any submission.
    return [str(row[id_index]) for row in body if row[id_index] is not None]


def _build_row_payload(submission: dict, field_to_id: dict) -> dict:
    """Translate one form submission into the body of a "create row" request.

    `field_to_id` maps form field names to table column IDs and must
    contain the key "submission_id".
    """
    row = {
        field_to_id[answer["questionName"]]: answer["text"]
        for answer in submission["answers"]
        if answer["questionName"] in field_to_id
    }
    row[field_to_id["submission_id"]] = str(submission["id"])
    if "submission_date" in field_to_id:
        # NOTE(review): fromtimestamp() without tz yields the worker's local
        # time -- confirm that this is the intended time zone for the table.
        submitted = datetime.fromtimestamp(int(submission["timestamp"]))
        row[field_to_id["submission_date"]] = submitted.strftime(
            "%Y-%m-%d %H:%M")
    return {"data": row}


def create_form_to_table_dag(
        dag_id: str, http_conn_id: str, form_title: str, table_title: str,
        field_to_column: dict, schedule=None) -> "DAG":
    """
    Creates an Airflow DAG that copies submissions from a Nextcloud Form into
    a Nextcloud Table.

    Steps performed by the DAG:
    1. Retrieve the ID of the form matching `form_title`.
    2. Retrieve the ID of the table matching `table_title`.
    3. Fetch the submission IDs already stored in the table (to avoid
       duplicates), using the column mapped to "submission_id".
    4. Fetch all submissions from the form.
    5. Filter out submissions already present in the table.
    6. Map form fields to table columns using `field_to_column` and the
       table's column IDs.
    7. Insert new submissions into the table via the Tables API.

    Parameters:
    - dag_id (str): The identifier for the created DAG.
    - http_conn_id (str): Airflow HTTP connection ID for Nextcloud API.
      Must have a value for the Authorization header.
    - form_title (str): The title of the Nextcloud Form to retrieve
      submissions from. There is no way to get the Id from the Nextcloud UI.
    - table_title (str): The title of the Nextcloud Table to insert
      submissions into. The Id is actually in the URL, but we don't know
      how reliable this is.
    - field_to_column (dict): Mapping from form field names (technical name)
      to table column titles, must include "submission_id" key which is used
      to identify already copied submissions. May include "submission_date"
      which is filled in with date from the submission.
    - schedule (Optional): DAG schedule interval (default None).

    Returns:
    - DAG object with tasks:
      get_form_id and get_table_id (independent of each other),
      get_table_id -> get_known_submissions,
      all three -> copy_submissions
    """
    with DAG(
        dag_id=dag_id,
        description="A simple DAG",
        # A fixed date: a start_date derived from datetime.now() moves with
        # every parse of the DAG file, so a scheduled interval may never
        # complete. With catchup=False no historic runs are created.
        start_date=datetime(2025, 1, 1),
        schedule=schedule,
        catchup=False,
        tags=["example"],
        # These args will get passed on to each operator
        default_args={
            "depends_on_past": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
        },
    ) as dag:

        @task()
        def get_form_id():
            """
            Retrieve the ID of the Nextcloud Form with the specified
            `form_title`.

            Returns:
                str: The ID of the form as a string.
            """
            forms = _api_get(
                http_conn_id, "/ocs/v2.php/apps/forms/api/v3/forms")
            return _find_id_by_title(forms["ocs"]["data"], form_title, "form")

        @task()
        def get_table_id():
            """
            Retrieve the ID of the Nextcloud Table with the specified
            `table_title`.

            Returns:
                str: The ID of the table.
            """
            tables = _api_get(
                http_conn_id, "/ocs/v2.php/apps/tables/api/2/tables")
            return _find_id_by_title(
                tables["ocs"]["data"], table_title, "table")

        @task()
        def get_known_submissions(table_id: str):
            """
            Retrieve the submission IDs already stored in the table (the
            values of the column mapped to "submission_id") to avoid
            duplicate inserts.

            Args:
                table_id (str): ID of the Nextcloud Table.

            Returns:
                list[str]: Submission IDs found in the table.
            """
            return _fetch_known_ids(
                http_conn_id, table_id, field_to_column["submission_id"])

        @task()
        def copy_submissions(form_id: str, table_id: str,
                             known_submissions: list[str]):
            """
            Copy all new submissions from a Nextcloud Form into a Nextcloud
            Table.

            Args:
                form_id (str): ID of the Nextcloud Form.
                table_id (str): ID of the Nextcloud Table.
                known_submissions (list[str]): List of submission IDs
                    already in the table.

            Returns:
                None

            Raises:
                ValueError: If the column mapped to "submission_id" does not
                exist in the table.
            """
            import logging
            log = logging.getLogger(__name__)

            submissions = _api_get(
                http_conn_id,
                f"/ocs/v2.php/apps/forms/api/v3/forms/{form_id}/submissions",
            )["ocs"]["data"]["submissions"]

            # The list handed over by the upstream task is not refreshed when
            # this task is retried after a partial insert, so re-read the
            # table to avoid inserting the same submission twice.
            known = set(known_submissions)
            known.update(_fetch_known_ids(
                http_conn_id, table_id, field_to_column["submission_id"]))

            new_subs = [sub for sub in submissions
                        if str(sub["id"]) not in known]
            # Log counts only; submissions may contain personal data.
            log.info("%d submission(s) fetched, %d known, %d new",
                     len(submissions), len(known), len(new_subs))
            if not new_subs:
                return None

            # Get column ids
            columns = _api_get(
                http_conn_id,
                f"/ocs/v2.php/apps/tables/api/2/columns/table/{table_id}",
            )["ocs"]["data"]
            column_to_id = {col["title"]: str(col["id"]) for col in columns}
            # Combined mapping: form field name -> table column ID. Fields
            # whose column does not exist in the table are dropped.
            field_to_id = {
                field: column_to_id[column]
                for field, column in field_to_column.items()
                if column in column_to_id
            }
            if "submission_id" not in field_to_id:
                raise ValueError("No mapping for submission_id.")

            # Create rows
            hook = HttpHook(method="POST", http_conn_id=http_conn_id)
            for sub in new_subs:
                response = hook.run(
                    endpoint=(f"/ocs/v2.php/apps/tables/api/2/tables/"
                              f"{table_id}/rows"),
                    headers={"Content-Type": "application/json",
                             "OCS-APIRequest": "true"},
                    data=json.dumps(_build_row_payload(sub, field_to_id)))
                response.raise_for_status()
            return None

        # Define task dependencies
        form_id = get_form_id()
        table_id = get_table_id()
        known_submissions = get_known_submissions(table_id)
        copy_submissions(form_id, table_id, known_submissions)

    return dag
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment