Nicolas Parot Alvarez NicolasPA

@NicolasPA
NicolasPA / write_index.py
Last active December 1, 2021 11:01
Write MSSQL CREATE TABLE and CREATE INDEX statements to a file with SQLAlchemy
from sqlalchemy import Table, Column, Index, Integer, VARCHAR, MetaData, ForeignKey
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateTable, CreateIndex
md = MetaData()
trade = Table(
    "trade",
    md,
    Column("pk_trade", Integer, primary_key=True, autoincrement=True),
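The preview above is cut off, so here is a minimal, separate sketch of the general approach the description names: compiling `CreateTable` and `CreateIndex` against the MSSQL dialect and writing the resulting text to a file. The table `example_table`, the index `ix_example_label`, and the helpers `render_ddl` and `write_ddl` are hypothetical names chosen for illustration; they are not taken from the gist.

```python
from pathlib import Path

from sqlalchemy import Column, Index, Integer, MetaData, Table, VARCHAR
from sqlalchemy.dialects import mssql
from sqlalchemy.schema import CreateIndex, CreateTable

metadata = MetaData()

# Hypothetical table, used only for illustration.
example_table = Table(
    "example_table",
    metadata,
    Column("pk_example", Integer, primary_key=True, autoincrement=True),
    Column("label", VARCHAR(50)),
)
# Building an Index from a table column attaches it to example_table.indexes.
Index("ix_example_label", example_table.c.label)


def render_ddl(table: Table) -> str:
    """Return the CREATE TABLE and CREATE INDEX statements as MSSQL text."""
    dialect = mssql.dialect()
    statements = [str(CreateTable(table).compile(dialect=dialect)).strip()]
    # Sort by name so the output order is stable between runs.
    for index in sorted(table.indexes, key=lambda ix: ix.name):
        statements.append(str(CreateIndex(index).compile(dialect=dialect)).strip())
    return ";\n\n".join(statements) + ";\n"


def write_ddl(table: Table, output_path: str) -> None:
    """Write the rendered DDL to a file; no database connection is needed."""
    Path(output_path).write_text(render_ddl(table), encoding="utf-8")
```

Compiling against `mssql.dialect()` only renders text, so this can run on a machine with no SQL Server driver installed.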
@NicolasPA
NicolasPA / fancy_log_format.py
Last active November 14, 2023 04:13
A fancy log format with ISO date, package and function name
import logging
from datetime import datetime
from typing import Union
def get_configured_logger(level: Union[int, str]) -> logging.Logger:
    """
    Set a useful logging format and return a logger with a full reference to the current module.
    >>> import logging
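Since the preview stops inside the docstring, the following is a rough sketch of how a format with an ISO date, the logger (package) name, and the function name could be set up with the standard `logging` module. The function `build_logger`, its `name` and `stream` parameters, and the exact format string are assumptions made for this example, not the gist's actual implementation.

```python
import logging
import sys
from typing import IO, Optional, Union

# %(name)s is the logger name (typically the dotted module path),
# %(funcName)s is the function that emitted the record.
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(name)s.%(funcName)s: %(message)s"
ISO_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"


def build_logger(
    name: str,
    level: Union[int, str],
    stream: Optional[IO[str]] = None,
) -> logging.Logger:
    """Return a logger named `name` that writes ISO-dated records to `stream`."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Only attach a handler once, so repeated calls do not duplicate output.
    if not logger.handlers:
        handler = logging.StreamHandler(stream or sys.stderr)
        handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=ISO_DATE_FORMAT))
        logger.addHandler(handler)
    return logger
```

Calling `build_logger(__name__, "INFO")` inside a module would give records prefixed with that module's dotted path followed by the calling function's name.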
@NicolasPA
NicolasPA / unzip_recursively.py
Last active April 8, 2022 17:03
Unzip archive recursively
import zipfile
from typing import Dict, Union, IO
from io import BytesIO
from zipfile import ZipFile
def unzip_recursively(archive_path: Union[str, IO[bytes]]) -> Dict[str, BytesIO]:
    """
    Unzip archive recursively
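The body of the function is not visible in the preview. As an illustration of the idea only, here is one possible in-memory approach that matches the visible signature (a path or binary file object in, a dict of `BytesIO` out). The name `extract_nested_zip` and the choice to key nested members as `outer.zip/inner_member` are assumptions for this sketch.

```python
import zipfile
from io import BytesIO
from typing import IO, Dict, Union
from zipfile import ZipFile


def extract_nested_zip(
    archive: Union[str, IO[bytes]], prefix: str = ""
) -> Dict[str, BytesIO]:
    """Return every non-archive member, descending into nested zip files."""
    extracted: Dict[str, BytesIO] = {}
    with ZipFile(archive) as zip_file:
        for info in zip_file.infolist():
            if info.is_dir():
                continue
            data = BytesIO(zip_file.read(info))
            path = prefix + info.filename
            # is_zipfile inspects the content, so zip-based formats
            # (for example .xlsx or .docx) are also treated as archives.
            if zipfile.is_zipfile(data):
                data.seek(0)
                extracted.update(extract_nested_zip(data, prefix=path + "/"))
            else:
                data.seek(0)
                extracted[path] = data
    return extracted
```

Everything is held in memory, which is convenient for small archives but may not suit very large ones.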
@NicolasPA
NicolasPA / update_git_submodules.sh
Created July 25, 2023 13:25
Update git submodules
git submodule sync && git submodule update --init --force --recursive
@NicolasPA
NicolasPA / dagster_sensor_check_running_job.py
Created July 31, 2023 10:15
Dagster sensor checking if jobs are already running
from dagster import (
    RunRequest,
    MultiAssetSensorEvaluationContext,
    multi_asset_sensor,
    AssetSelection,
    SkipReason,
    RunsFilter,
    DagsterRunStatus,
)
@NicolasPA
NicolasPA / xml2db_usage_example.py
Last active November 28, 2023 13:39
xml2db usage example
from xml2db import DataModel
# Create a data model of tables with relations based on the XSD file
data_model = DataModel(
    xsd_file="path/to/file.xsd",
    connection_string="mssql+pyodbc://server/database?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes",
)
# Parse an XML file based on this XSD
document = data_model.parse_xml(
    xml_file="path/to/file.xml"
@NicolasPA
NicolasPA / dagster_dynamic_mapping_dynamic_outputs.py
Created August 8, 2023 15:43
Dagster dynamic asset graph mixing dynamic mapping (one edge per file to load) and dynamic outputs (a file is routed to a different node/function depending on its type), created by factory functions
DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2022-06-01")
@asset(
    description="Files to load",
    partitions_def=DAILY_PARTITIONS,
    key_prefix="source",
    config_schema={
        "selected_file_paths": Field(Array(str), is_required=False, default_value=[])
    },
@NicolasPA
NicolasPA / dagster_new_file_sensor.py
Created August 8, 2023 15:46
Dagster new file sensor. Detects new files to integrate by computing the difference between the files in the source directory and the list of integrated files stored in a table. It won't trigger a new run if a job is already running, since the output table used to compute the difference is not yet up to date.
import datetime
from dagster import (
    sensor,
    SensorDefinition,
    AssetKey,
    SkipReason,
    RunsFilter,
    DagsterRunStatus,
    SensorEvaluationContext,
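The detection logic described for this sensor (directory contents minus already-integrated files, with no request while a job is running) can be sketched without Dagster. The code below is a library-free illustration, not the gist's code: `find_new_files` and `files_to_request` are hypothetical helpers, and the `job_is_running` flag stands in for whatever run-status query the real sensor performs.

```python
from pathlib import Path
from typing import Iterable, List, Optional


def find_new_files(source_dir: str, integrated_files: Iterable[str]) -> List[str]:
    """Names of files present in source_dir but absent from integrated_files."""
    present = {path.name for path in Path(source_dir).iterdir() if path.is_file()}
    return sorted(present - set(integrated_files))


def files_to_request(
    source_dir: str,
    integrated_files: Iterable[str],
    job_is_running: bool,
) -> Optional[List[str]]:
    """Return the files to load, or None when the sensor should skip.

    While a job is running, the table of integrated files is stale, so the
    difference would list files that are already being loaded.
    """
    if job_is_running:
        return None
    new_files = find_new_files(source_dir, integrated_files)
    return new_files or None
```

In a real sensor, a `None` result would presumably map to a skip and a non-empty list to a run request carrying the file paths in its config.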
@NicolasPA
NicolasPA / dagster_multi_asset_sensor.py
Created August 8, 2023 15:50
Dagster sensor that watches the refresh of the source table assets and then triggers the job that runs the DBT transformations. It avoids stacking run requests by checking what's already running.
from dagster import (
    RunRequest,
    MultiAssetSensorEvaluationContext,
    multi_asset_sensor,
    AssetSelection,
    SkipReason,
    RunsFilter,
    DagsterRunStatus,
)
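Only the imports are visible here, so the decision rule from the description (trigger the transformation job after a source refresh, unless a run is already in flight) is sketched below in plain Python rather than with the Dagster API. `RunStatus` is a hypothetical stand-in for `DagsterRunStatus`, `decide` is an invented helper, and triggering on any refreshed asset (rather than all of them) is an assumption.

```python
from enum import Enum
from typing import Iterable, Set, Tuple


class RunStatus(Enum):
    """Hypothetical stand-in for the run statuses a sensor would query."""

    QUEUED = "QUEUED"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


# Statuses that mean a run of the job is still in flight.
IN_PROGRESS = {RunStatus.QUEUED, RunStatus.STARTED}


def decide(
    job_run_statuses: Iterable[RunStatus], refreshed_assets: Set[str]
) -> Tuple[str, str]:
    """Return ("run", detail) or ("skip", reason) for one sensor evaluation."""
    if not refreshed_assets:
        return ("skip", "no source table was refreshed")
    if any(status in IN_PROGRESS for status in job_run_statuses):
        # Requesting again here would stack runs behind the current one.
        return ("skip", "a run of the job is already queued or started")
    return ("run", "refreshed: " + ", ".join(sorted(refreshed_assets)))
```

Checking for in-flight runs before requesting a new one is what keeps run requests from piling up when several source tables refresh close together.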
@NicolasPA
NicolasPA / dagster_graph.md
Last active August 8, 2023 15:54
High-level Dagster graph
flowchart TD
    A{"new file 
    sensor"}
    B["source table asset
    (load with xml2db)"]
    C{"asset sensor"}
    D["final table asset
    (transform with DBT)"]
 A --> B --> C --> D