Skip to content

Instantly share code, notes, and snippets.

View saumalya75's full-sized avatar

Saumalya Sarkar saumalya75

  • Wipro Limited
  • Kolkata
View GitHub Profile
class House:
def __init__(self, name: str, symbol: str, student_list: list):
self.name = name
self.symbol = symbol
self.student_list = student_list
def __len__(self):
return len(self.student_list)
def __iter__(self):
class House:
def __init__(self, name: str, symbol: str, student_list: list):
self.name = name
self.symbol = symbol
self.student_list = student_list
def __len__(self):
return len(self.student_list)
class House:
def __init__(self, name: str, symbol: str, student_list: list):
self.name = name
self.symbol = symbol
self.student_list = student_list
class Student:
def __init__(self, name: str, phone_number: int, standard: str):
self.name = name
self.standard = standard
self.phone_number = phone_number
def __str__(self):
return self.name
class Student:
def __init__(self, name: str, phone_number: int, standard: str):
self.name = name
self.standard = standard
self.phone_number = phone_number
def __str__(self):
return self.name
@staticmethod
class Student:
def __init__(self, name: str, phone_number: int, standard: str):
self.name = name
self.standard = standard
self.phone_number = phone_number
s3_sensor = CustomS3Sensor(
task_id=file_details['file_id'] + "_s3_sensor"
, poke_interval=10
, retry_delay=timedelta(seconds=25)
, xcom_task_id_key=file_details['file_id'] + "_s3_sensor_xcom_key"
, refresh_xcom=False
, conn_type="aws"
, wildcard_match=False
, from_xcom=True
, xcom_source_task_id=file_details['file_id'] + "_minio_sensor"
from airflow.models.baseoperator import BaseOperator
from hooks.custom_s3_minio_hook import CustomS3MinioHook
from airflow.utils.decorators import apply_defaults
# Demo application integration imports
import myscript as ms
from myscript.run import check
from myscript import run as r
class CustomFileProcessingOperator(BaseOperator):
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from hooks.custom_s3_minio_hook import CustomS3MinioHook
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import json, traceback, sys
def _trigger_file_to_xcom(hook, key, bucket, task_instance, task_key):
source_file_details = hook.read_key(key, bucket)
if source_file_details:
source_file_details_json = json.loads(source_file_details)
from airflow.plugins_manager import AirflowPlugin
from sensors.custom_s3_sensor import CustomS3Sensor
from operators.custom_file_load_operator import CustomFileProcessingOperator
# Defining the plugin class
class CustomPlugin(AirflowPlugin):
name = "custom_plugin"
sensors = [CustomS3Sensor]
operators = [CustomFileProcessingOperator]