Skip to content

Instantly share code, notes, and snippets.

@stefanmaric
Last active July 23, 2024 15:25
Show Gist options
  • Save stefanmaric/545853d63b8080c1b8681cc1ab111bbc to your computer and use it in GitHub Desktop.
Save stefanmaric/545853d63b8080c1b8681cc1ab111bbc to your computer and use it in GitHub Desktop.
Find and replace in web stream
import { strict as assert } from 'node:assert'
import fs from 'node:fs'
import path from 'node:path'
import { afterEach, beforeEach, describe, it } from 'node:test'
import { Readable } from 'node:stream'
import { findInStream as _find } from './src/utils/findInStream'
// Test helper: opens the file at `pathname` as a Node read stream, adapts it to a WebAPI
// ReadableStream and hands it to the implementation under test (imported as `_find`).
const findInStream = (pathname: string, search: string) => {
  const fileStream = fs.createReadStream(pathname)
  const webStream = Readable.toWeb(fileStream)
  return _find(webStream, search)
}
describe('findInStream', () => {
  // Scratch directory for fixture files; created before and removed after every test.
  const testDir = path.join(__dirname, 'test-files')

  // fs.createReadStream reads 64 KiB at a time by default, so fixtures are laid out relative to
  // this offset to control where the chunk boundary falls.
  const readChunkSize = 64 * 1024

  beforeEach(() => {
    // `recursive: true` makes this a no-op when the directory already exists.
    fs.mkdirSync(testDir, { recursive: true })
  })

  afterEach(() => {
    fs.rmSync(testDir, { recursive: true, force: true })
  })

  it('should find a phrase in a small file', async () => {
    const filePath = path.join(testDir, 'small.txt')
    fs.writeFileSync(filePath, 'Hello, world!')
    assert.equal(await findInStream(filePath, 'world'), true)
    assert.equal(await findInStream(filePath, 'universe'), false)
  })

  it('should find a phrase spanning multiple chunks', async () => {
    const filePath = path.join(testDir, 'multi-chunk.txt')
    // Start the phrase 3 bytes before the read boundary so that "tar" ends the first chunk and
    // "get" starts the second; padding with exactly one chunk of filler would place the whole
    // phrase at the start of the second chunk and never exercise the cross-chunk path.
    const content = 'a'.repeat(readChunkSize - 3) + 'target' + 'b'.repeat(readChunkSize)
    fs.writeFileSync(filePath, content)
    assert.equal(await findInStream(filePath, 'target'), true)
  })

  it('should handle multi-byte characters', async () => {
    const filePath = path.join(testDir, 'multi-byte.txt')
    fs.writeFileSync(filePath, '你好,世界!')
    assert.equal(await findInStream(filePath, '世界'), true)
    assert.equal(await findInStream(filePath, '宇宙'), false)
  })

  it('should handle multi-byte characters split across chunks', async () => {
    const filePath = path.join(testDir, 'multi-byte-split.txt')
    // "世" is 3 bytes in UTF-8; with one byte of room left before the read boundary its encoding
    // is cut in the middle and has to be reassembled by the decoder.
    fs.writeFileSync(filePath, 'a'.repeat(readChunkSize - 1) + '世界!')
    assert.equal(await findInStream(filePath, '世界'), true)
    assert.equal(await findInStream(filePath, '宇宙'), false)
  })

  it('should handle large files efficiently', async () => {
    const filePath = path.join(testDir, 'large.txt')
    const content = 'a'.repeat(10 * 1024 * 1024) + 'needle' + 'b'.repeat(10 * 1024 * 1024)
    fs.writeFileSync(filePath, content)
    const start = Date.now()
    const result = await findInStream(filePath, 'needle')
    const elapsed = Date.now() - start
    assert.equal(result, true)
    // Should complete in less than 1 second
    assert.ok(elapsed < 1000, `expected search to take < 1000ms, took ${elapsed}ms`)
  })
})
import { hasPartialTailMatch } from './hasPartialTailMatch'
/**
 * Reports whether a string occurs anywhere in a ReadableStream.
 *
 * The stream is decoded with `TextDecoderStream` and scanned chunk by chunk. Only the trailing
 * part of the data that could still be the beginning of a match is carried over to the next
 * chunk, so memory use is bounded by the chunk size plus the length of `search`.
 *
 * Reading stops as soon as a match is found. A chunk has to arrive for a match to be reported,
 * so a stream that produces no chunks yields `false` even for an empty `search`.
 *
 * @param inputStream A WebAPI ReadableStream of encoded text.
 * @param search String to look for.
 * @returns `true` if the stream has the search string, otherwise `false`.
 * @throws Rethrows any error raised while reading or decoding the stream.
 */
export const findInStream = async (
  inputStream: ReadableStream,
  search: string
): Promise<boolean> => {
  // Pre-compile the individual runes (code points) to avoid doing it on every chunk.
  // Remove the last rune as we only care about partial matches, as a full match doesn't require to
  // hold onto the current chunk.
  const runes = [...search].slice(0, -1)
  // A match straddling two chunks can reuse at most this many trailing code units of what has
  // already been seen, so nothing older needs to be kept around.
  const maxCarry = search.length - 1
  let buffer = ''
  let found = false
  try {
    await inputStream.pipeThrough(new TextDecoderStream()).pipeTo(
      new WritableStream<string>({
        write(chunk) {
          buffer += chunk
          if (buffer.includes(search)) {
            found = true
            // Throwing from the sink aborts the pipe so the rest of the stream is not read.
            throw new Error('Found')
          }
          // `hasPartialTailMatch` can only be true when `runes` is non-empty, which implies
          // `maxCarry >= 1`, so `slice(-maxCarry)` never degenerates into `slice(-0)`.
          buffer = hasPartialTailMatch(buffer, runes) ? buffer.slice(-maxCarry) : ''
        },
      })
    )
  } catch (err) {
    // The sentinel error thrown above is expected; anything else is a real failure.
    if (!found) throw err
  }
  return found
}
/**
 * Tells whether `content` ends with a non-empty leading portion of the given runes, i.e. whether
 * the tail of `content` looks like the start of the search term.
 *
 * @param content - The text whose tail is inspected.
 * @param runes - The search term split into runes, e.g. by spreading a string: `[...'hue']`.
 * @returns `true` when some prefix of `runes` (one rune or more) is a suffix of `content`,
 * `false` otherwise (including when `runes` is empty).
 * @example
 *
 * hasPartialTailMatch('the train goes choo choo', [...'choose']) //=> true
 * hasPartialTailMatch('the train goes choo choo!', [...'choose']) //=> false
 */
export const hasPartialTailMatch = (content: string, runes: string[]) => {
  // Try every prefix, shortest first: runes[0], runes[0..1], runes[0..2], ...
  return runes.some((_, lastIndex) => {
    const prefix = runes.slice(0, lastIndex + 1).join('')
    return content.endsWith(prefix)
  })
}
/**
 * Converts an iterable to a readable stream. This can be replaced with the
 * experimental WebAPI `ReadableStream.from()` method when it becomes available.
 *
 * Values are taken from the iterable lazily, one per `pull`, so the stream's backpressure is
 * respected and infinite iterables (e.g. generators) can be consumed incrementally. Cancelling the
 * stream calls the iterator's `return()` method, if it has one, so generators can run their
 * cleanup code.
 *
 * @see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/from_static
 *
 * @param list - The iterable to convert.
 * @returns A readable stream that emits the iterable's values in order and then closes.
 *
 * @example
 *
 * const stringStream = iterableToStream([
 * 'lorem ipsum',
 * 'dolor sit amet',
 * 'consectetur adipiscing elit',
 * 'sed do eiusmod tempor incididunt ut labore et dolore magna aliqua',
 * ])
 *
 * return new Response(stringStream.pipeThrough(new TextEncoderStream()))
 */
export const iterableToStream = <T>(list: Iterable<T>): ReadableStream<T> => {
  const iterator = list[Symbol.iterator]()
  return new ReadableStream<T>({
    // Called by the stream whenever its internal queue has room for another chunk.
    pull(controller) {
      const result = iterator.next()
      if (result.done) {
        controller.close()
        return
      }
      controller.enqueue(result.value)
    },
    // The consumer lost interest: let the iterator release whatever it is holding.
    cancel() {
      iterator.return?.()
    },
  })
}
import { describe, it } from 'node:test'
import { strict as assert } from 'node:assert'
import { iterableToStream } from './iterableToStream'
import { replaceInStream } from './replaceInStream'
describe('replaceInStream', () => {
  // Feeds the chunks through `replaceInStream` (replacing 'hue' with 'kek') and collects the
  // resulting stream as a single string.
  const replaceHue = async (chunks: string[]): Promise<string> => {
    const inputStream = iterableToStream(chunks).pipeThrough(new TextEncoderStream())
    const body = new Response(replaceInStream(inputStream, 'hue', 'kek'))
    return body.text()
  }

  it('should replace all occurrences of a string in a stream', async () => {
    const chunks = [
      'This is sparta',
      ' and this is hue',
      ', and h',
      'ue will not be gone until hu',
      'stle is done.',
    ]
    const text = await replaceHue(chunks)
    assert.equal(text.includes('hue'), false)
    // Pin the whole output as well: the absence of 'hue' alone would also be satisfied by
    // dropped or truncated content.
    assert.equal(text, chunks.join('').replaceAll('hue', 'kek'))
  })

  it('should replace the occurrence of a string at the end of the stream', async () => {
    const chunks = [
      'The end ',
      'of the stream',
      ' is at hue',
    ]
    const text = await replaceHue(chunks)
    assert.equal(text.includes('hue'), false)
    assert.equal(text, chunks.join('').replaceAll('hue', 'kek'))
  })

  it('should not hang when the end of a stream has a partial match', async () => {
    const chunks = [
      'The end ',
      'of the stream',
      ' looks like a partial hu',
    ]
    const text = await replaceHue(chunks)
    assert.equal(text, chunks.join(''))
  })
})
import { hasPartialTailMatch } from "./hasPartialTailMatch"
/**
 * Replaces all occurrences of a string in a ReadableStream.
 *
 * The stream is decoded with `TextDecoderStream`, rewritten, and re-encoded with
 * `TextEncoderStream`. Whenever the data seen so far ends with what could be the beginning of
 * `search`, it is held back until the next chunk (or the end of the stream) settles whether it is
 * a real match, so occurrences split across chunks are replaced too.
 *
 * The replacement is done with `String.prototype.replaceAll`, so special replacement patterns
 * such as `$&` inside `replace` are interpreted by it.
 *
 * @param inputStream A WebAPI ReadableStream of encoded text.
 * @param search Thing to replace.
 * @param replace Thing to replace with.
 * @returns A ReadableStream with the replaced content.
 */
export const replaceInStream = (
  inputStream: ReadableStream,
  search: string,
  replace: string
): ReadableStream<Uint8Array> => {
  // Pre-compile the individual runes (code points) to avoid doing it on every chunk.
  // Remove the last rune as we only care about partial matches, as a full match doesn't require to
  // hold onto the current chunk.
  const runes = [...search].slice(0, -1)
  let buffer = ''
  return inputStream
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(
      new TransformStream<string, string>({
        transform(chunk, controller) {
          buffer += chunk
          // Only emit once the tail can no longer turn into a match with the next chunk.
          if (!hasPartialTailMatch(buffer, runes)) {
            controller.enqueue(buffer.replaceAll(search, replace))
            buffer = ''
          }
        },
        flush(controller) {
          // End of input: whatever was held back may still contain complete matches in front of
          // (or overlapping) the unfinished tail, so it needs the replacement applied as well.
          if (buffer) {
            controller.enqueue(buffer.replaceAll(search, replace))
            buffer = ''
          }
        },
      })
    )
    .pipeThrough(new TextEncoderStream())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment