Tony Zeljkovic (2tony2)
2tony2 / dependency_injection_agent.md
Created March 17, 2026 21:09
dependency_injection_agent_of_peace.md

# Dependency Injection Repo Architect — Agent Instructions, Interview, and Structure Tree

This document is a working agent spec for an AI assistant whose job is to help a user design, introduce, or improve dependency injection in an existing repository.

It is intentionally opinionated. The agent should not start by recommending a DI container. It should start by clarifying the application's entry points, side-effect boundaries, object lifetimes, runtime variability, and refactoring constraints. The default recommendation is the lightest solution that keeps dependencies explicit.
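As an illustration of "the lightest solution that keeps dependencies explicit", here is a minimal sketch of constructor injection with a hand-wired composition root. All names (`Clock`, `ReportService`, etc.) are hypothetical, not taken from any particular repository:

```python
from dataclasses import dataclass


class Clock:
    """A side-effect boundary expressed as a small interface (hypothetical)."""

    def now(self) -> str:
        raise NotImplementedError


class SystemClock(Clock):
    """Production implementation: reads the real wall clock."""

    def now(self) -> str:
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()


class FixedClock(Clock):
    """Deterministic stand-in for tests."""

    def __init__(self, value: str):
        self._value = value

    def now(self) -> str:
        return self._value


@dataclass
class ReportService:
    # The dependency is explicit: passed in, never constructed internally.
    clock: Clock

    def header(self) -> str:
        return f"report generated at {self.clock.now()}"


def main() -> ReportService:
    # Composition root: the one place where concrete types are chosen.
    return ReportService(clock=SystemClock())


# In tests, swap the boundary without monkeypatching:
service = ReportService(clock=FixedClock("2026-01-01T00:00:00+00:00"))
print(service.header())  # report generated at 2026-01-01T00:00:00+00:00
```

No container is involved: the wiring is a few lines of plain code, and every dependency is visible in a constructor signature.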


## 1. Mission

```yaml
settings:
  ###### GENERAL RESOURCE PROVISIONING ######
  max_memory: "4 GiB"
  # max_temp_directory_size: "90% of available disk space"
  # threads: 20
  # external_threads: 4
  # max_vacuum_tasks: 100
  ###### MORE NICHE PERFORMANCE / RESOURCE SETTINGS ######
  # enable_fsst_vectors: false  # Only useful for specific string use cases.
```
```sql
-- Create a masking policy for email addresses
CREATE OR REPLACE MASKING POLICY email_mask AS (val string)
RETURNS string ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST') THEN val
    ELSE REGEXP_REPLACE(val, '.+\@', '*****@')
  END;

-- Create a tag for PII data
CREATE TAG pii_email_data;
```
```python
with connection:
    if file_path is not None:
        counter = 0
        cursor_execution_config.command = (
            f"PUT file://{snowflake_file_name}_{counter} "
            f"@{full_qualified_stage_name} OVERWRITE = {overwrite}"
        )
        with open(file_path, "rb") as f:
            if chunk_size == 0:
                raise ValueError(
                    "Chunk size of 0 is not allowed for filestream pushing."
                )
            elif chunk_size > 0:
                ...  # chunked upload continues here (elided in the original)
```
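The chunk-size convention above (reject `chunk_size == 0`, otherwise read fixed-size chunks) can be sketched in isolation. This is a generic chunked-file reader under that convention, not the original implementation:

```python
def read_in_chunks(path, chunk_size=1024):
    """Yield successive chunks of at most chunk_size bytes from the file at path."""
    if chunk_size == 0:
        raise ValueError("Chunk size of 0 is not allowed for filestream pushing.")
    with open(path, "rb") as f:
        # read() returns b"" at EOF, which is falsy and ends the loop.
        while chunk := f.read(chunk_size):
            yield chunk
```

Because this is a generator, only one chunk is held in memory at a time, which is what makes it usable for arbitrarily large files.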
```python
import paramiko

def stream_data_from_sftp(hostname, port, username, password, remote_path, chunk_size=1024):
    transport = paramiko.Transport((hostname, port))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        with sftp.open(remote_path, 'rb') as file:
            while chunk := file.read(chunk_size):
                yield chunk
    finally:
        transport.close()
```
```python
from collections.abc import Generator
from pandas import DataFrame as PandasDataFrame  # alias assumed from context

cursor = connection.cursor()
results = cursor.fetch_pandas_batches()
assert isinstance(results, Generator)
for result in results:
    assert isinstance(result, PandasDataFrame)
    break
```
```python
import s3fs

def stream_data_from_s3(bucket_name, file_path, chunk_size=1024):
    fs = s3fs.S3FileSystem()
    with fs.open(f'{bucket_name}/{file_path}', 'rb') as file:
        while chunk := file.read(chunk_size):
            yield chunk

# Example usage
bucket_name = 'my-bucket'
```
```python
import httpx

def stream_data_from_api(url, chunk_size=1024):
    with httpx.stream('GET', url) as response:
        for chunk in response.iter_bytes(chunk_size):
            yield chunk

# Example usage
url = 'https://example.com/large_file.zip'
for chunk in stream_data_from_api(url):
    ...  # process each chunk (body elided in the original)
```
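All three streaming functions (SFTP, S3, HTTP) expose the same shape: a generator of `bytes`. That means consumers can be written once against that shape. A runnable sketch using an in-memory stand-in generator, so no network or credentials are needed:

```python
def fake_stream(data: bytes, chunk_size: int = 4):
    """Stand-in for stream_data_from_api/s3/sftp: yields data in fixed-size chunks."""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]


def total_bytes(chunks) -> int:
    """Consume any chunk generator lazily; one chunk in memory at a time."""
    return sum(len(chunk) for chunk in chunks)


print(total_bytes(fake_stream(b"0123456789", chunk_size=3)))  # 10
```

Swapping `fake_stream` for any of the real streaming functions requires no change to `total_bytes`, which is the same explicit-dependency idea the rest of this document argues for.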
```python
def count_up_to(max):
    count = 1
    while count <= max:
        yield count
        count += 1

counter = count_up_to(3)
print(next(counter))  # Output: 1
print(next(counter))  # Output: 2
print(next(counter))  # Output: 3
```
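One detail the counting example does not show: once a generator is exhausted, a further `next()` raises `StopIteration`, which is the signal `for` loops use internally to stop. A small self-contained demonstration (the generator is redefined here so the block runs standalone):

```python
def count_up_to(max):
    count = 1
    while count <= max:
        yield count
        count += 1


counter = count_up_to(2)
print(next(counter))  # 1
print(next(counter))  # 2
try:
    next(counter)
except StopIteration:
    # A for-loop catches this exception internally and simply ends.
    print("exhausted")
```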