Skip to content

Instantly share code, notes, and snippets.

View mattfysh's full-sized avatar
🌐

Matt mattfysh

🌐
View GitHub Profile
@mattfysh
mattfysh / kafka.py
Created March 13, 2024 01:04
bytewax KafkaBuilder
from typing import Dict, List
from bytewax import operators as op
from bytewax.dataflow import Dataflow, Stream
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.kafka.serde import SchemaDeserializer
class KeyDeserializer(SchemaDeserializer[bytes, str]):
def de(self, data):
return data.decode()
const mf = new Miniflare({
script: `
export default {
async fetch(request, env, ctx) {
let id = env.DOO.idFromName('foo')
return await env.DOO.get(id).fetch(request)
}
}
const SECONDS = 1000
@mattfysh
mattfysh / pulumi.ts
Last active February 8, 2024 05:41
Pulumi Kubernetes - deployment preview error
import * as k8s from '@pulumi/kubernetes'
const provider = new k8s.Provider('k8s-provider', {
context: 'docker-desktop',
namespace: 'testns',
})
const ns = new k8s.core.v1.Namespace(
'ns',
{ metadata: { name: 'testns' } },
@mattfysh
mattfysh / index.ts
Last active December 17, 2023 03:13
Pulumi deploy k8s redpanda + console
import * as k8s from '@pulumi/kubernetes'
import dedent from 'dedent'
new k8s.apps.v1.Deployment('redpanda', {
metadata: {
labels: {
appClass: 'redpanda',
},
},
spec: {
@mattfysh
mattfysh / policy.json
Created December 12, 2023 22:44
Danger, Will Robinson!
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:*",
"eks:*",
"ec2:*",
"autoscaling:*",
@mattfysh
mattfysh / enrich_join.py
Last active February 3, 2025 04:46
Custom bytewax operators
from copy import deepcopy
from typing import Optional, Any, TypeVar, Tuple
from bytewax.dataflow import operator
from bytewax import operators as op
from bytewax.operators import KeyedStream
RETAIN = False
DISCARD = True
E = TypeVar("E")
@mattfysh
mattfysh / 1-cleaning-notes.md
Last active October 20, 2023 05:26
arroyo prototype
  • CAST(extract_json(...)[1] AS TEXT) does not work
    • internal error: entered unreachable code: invalid cast from Utf8 to Utf8
    • when targeting string output, must use extract_json_string as workaround. all other output types will use CAST

output

Field Value
json_parsed_value "{"string":"abc","number":123,"number_float": 123.456,"boolean":true,"null":null,"string_null":"null"}"
json_parsed_json_string ""abc""
@mattfysh
mattfysh / artist.py
Created October 3, 2023 23:43
de proto
import httpx
from urllib.parse import urlparse, parse_qs
from prefect import flow, task
@task(
version="0.2.0",
cache_key_fn=lambda ctx, args: f"{args['id']} v{ctx.task.version}",
)
def collate_soundcloud_tracks(id):
path = f"/matt/soundcloud-dev/rest/Artist?id={id}"
@mattfysh
mattfysh / benthos.yaml
Last active September 3, 2023 11:34
benthos batching
input:
generate:
count: 1
mapping: '[{"x": 1}, {"x": 2, "y": "a"}]'
pipeline:
processors:
- try:
- unarchive:
format: json_array
@mattfysh
mattfysh / benthos.yaml
Last active August 22, 2023 01:00
Benthos + Consul
input:
sequence:
inputs:
- generate:
count: 1
processors:
- http:
verb: GET
url: ${! env("CONSUL_ENDPOINT") }/v1/kv/${! env("CONSUL_KV_KEY") }
headers: