Last active
May 5, 2023 14:34
-
-
Save arkadijs/172d9bda4e0d10a530add1faef942852 to your computer and use it in GitHub Desktop.
Streaming Writer-based pipeline in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"fmt"
	"io"
	"strings"
)
// rate is the number of payload bytes that make up one millisecond of "audio".
// The byte at index i of a chunk is therefore stamped chunkTimestamp + i/rate.
const rate = 2
// Anon describes one time range that must be anonymized. Start and End are
// timestamps in the same unit the pipeline stamps chunks with, and both bounds
// are treated as inclusive by the anonymizing stage.
type Anon struct {
	Start, End int
}
// Writer is one stage of the pipeline. It receives a chunk of bytes together
// with the timestamp of the chunk's first byte.
type Writer func(timestamp int, bytes []byte)
func overlap(start, end, timestamp int, bytes []byte) bool { | |
endTimestamp := timestamp + len(bytes)/rate | |
return (timestamp >= start && timestamp <= end) || | |
(endTimestamp >= start && endTimestamp <= end) || | |
(start >= timestamp && start <= endTimestamp) || | |
(end >= timestamp && end <= endTimestamp) | |
} | |
func NewAnonymizator(start, end int, next Writer) Writer { | |
anonymize := func(timestamp int, bytes []byte) { | |
fmt.Printf("anon start=%d end=%d: ts=%02d endTs=%02d %s\n", | |
start, end, timestamp, timestamp+len(bytes)/rate-1, bytes) | |
if overlap(start, end, timestamp, bytes) { | |
// proper slice split logic instead | |
for i := range bytes { | |
currentTimestamp := timestamp + i/rate | |
if currentTimestamp >= start && currentTimestamp <= end { | |
bytes[i] = '_' | |
} | |
} | |
// ... | |
} | |
next(timestamp, bytes) | |
} | |
return anonymize | |
} | |
func NewEncoder(next Writer) Writer { | |
passthru := func(timestamp int, bytes []byte) { | |
next(timestamp, bytes) | |
} | |
return passthru | |
} | |
// upload drains body in reads of at most 5 bytes and prints every non-empty
// read. It returns on the first error from body: io.EOF ends the stream
// silently, any other error is printed before returning.
func upload(body io.Reader) {
	buf := make([]byte, 5)
	for {
		n, err := body.Read(buf)
		// A reader may return data together with an error, so print first.
		if n > 0 {
			fmt.Printf("upload: len=%d %s\n", n, buf[:n])
		}
		if err == nil {
			continue
		}
		if err != io.EOF {
			fmt.Printf("upload: read error: %v\n", err)
		}
		return
	}
}
// WriterReaderBridge exposes chunks arriving on a channel as an io.Reader.
type WriterReaderBridge struct {
	ch   chan []byte // source of chunks; closing it ends the stream
	tail []byte      // unread remainder of the most recently received chunk
}

// NewWriterReaderBridge returns a bridge that reads chunks from ch.
func NewWriterReaderBridge(ch chan []byte) *WriterReaderBridge {
	return &WriterReaderBridge{ch: ch}
}

// Read copies pending data into out and returns the number of bytes copied.
//
// Leftover bytes from the previous chunk are served first. When nothing is
// pending, Read blocks until a non-empty chunk arrives on the channel. Once
// the channel is closed and drained, Read returns 0, io.EOF. At most one
// chunk contributes to a single call, so short reads are normal.
func (b *WriterReaderBridge) Read(out []byte) (int, error) {
	// Skip empty chunks so that a nil error always comes with progress
	// (unless out itself is empty).
	for len(b.tail) == 0 {
		chunk, ok := <-b.ch
		if !ok {
			return 0, io.EOF
		}
		b.tail = chunk
	}
	n := copy(out, b.tail)
	b.tail = b.tail[n:]
	if len(b.tail) == 0 {
		// Drop the reference so the consumed chunk can be collected.
		b.tail = nil
	}
	return n, nil
}
func NewUploader() Writer { | |
ch := make(chan []byte) | |
writer := func(_ int, bytes []byte) { | |
ch <- bytes | |
} | |
go upload(NewWriterReaderBridge(ch)) | |
return writer | |
} | |
type IoWriterAdapter struct { | |
writer Writer | |
timestamp int | |
} | |
func (adapter *IoWriterAdapter) Write(p []byte) (n int, err error) { | |
adapter.writer(adapter.timestamp, p) | |
adapter.timestamp += len(p) / rate | |
return len(p), nil | |
} | |
func NewIoWriterAdapter(writer Writer) *IoWriterAdapter { | |
return &IoWriterAdapter{writer, 0} | |
} | |
func main() { | |
anon := []Anon{ | |
{2, 3}, | |
{7, 9}, | |
} | |
pipeline := NewEncoder(NewUploader()) | |
// anonymization doesn't care if it is applied in reverse order | |
for _, a := range anon { | |
pipeline = NewAnonymizator(a.Start, a.End, pipeline) | |
} | |
writer := NewIoWriterAdapter(pipeline) | |
io.Copy(writer, strings.NewReader("0123456789")) | |
io.Copy(writer, strings.NewReader("abcdefghijklmnopqrstxyz")) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment