Created
July 18, 2024 03:40
-
-
Save laiso/c36ac504afb2715831ef9410853753fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { type Queue, R2Bucket } from "@cloudflare/workers-types"; | |
import puppeteer from "@cloudflare/puppeteer"; | |
import { PDFDocument } from "pdf-lib"; | |
import { Buffer } from 'node:buffer'; | |
/**
 * Bindings available to this Worker.
 */
type Env = {
  /** R2 bucket receiving one merged PDF per chunk, keyed `<taskId>:<index>.pdf`. */
  MY_BUCKET: R2Bucket;
  /** Queue receiving `{ taskId, urls, total, index }` render jobs from `fetch`. */
  MY_QUEUE: Queue;
  /**
   * Browser Rendering binding handed to `puppeteer.sessions`, `puppeteer.connect`
   * and `puppeteer.launch`. Typed to match `getRandomSession`'s parameter instead of `any`.
   */
  MY_BROWSER: puppeteer.BrowserWorker;
  /** KV namespace holding per-chunk status records keyed `<taskId>:<index>`. */
  MY_KV_NAMESPACE: KVNamespace;
};
/**
 * Pick a random Browser Rendering session that no worker is connected to,
 * so an existing browser can be reused instead of launching a new one.
 *
 * @param endpoint - The Browser Rendering binding.
 * @returns A free session id, or `''` when there is no unconnected session.
 */
async function getRandomSession(endpoint: puppeteer.BrowserWorker): Promise<string> {
  const sessions: puppeteer.ActiveSession[] = await puppeteer.sessions(endpoint);
  console.log(`Sessions: ${JSON.stringify(sessions)}`);

  const freeIds: string[] = [];
  for (const session of sessions) {
    // A populated connectionId means some worker already holds this session.
    if (session.connectionId) {
      continue;
    }
    freeIds.push(session.sessionId);
  }

  if (!freeIds.length) {
    return '';
  }
  const pick = Math.floor(Math.random() * freeIds.length);
  return freeIds[pick] ?? '';
}
/**
 * Download an object from the Worker's R2 bucket as a Blob.
 *
 * @param env - Worker bindings; only `MY_BUCKET` is used.
 * @param filename - R2 object key.
 * @returns The object's contents.
 * @throws Error when the object is missing or has no body.
 */
async function fetchPDFFromR2(env: Env, filename: string): Promise<Blob> {
  const stored = await env.MY_BUCKET.get(filename);
  if (stored?.body) {
    return stored.blob();
  }
  console.error(stored);
  throw new Error(`Failed to fetch ${filename} from R2`);
}
/**
 * Concatenate the pages of several PDF blobs, in order, into one document.
 *
 * Best-effort: a blob that pdf-lib cannot parse is logged and skipped, so one
 * corrupt chunk does not prevent the others from being merged.
 *
 * @param pdfFiles - PDF documents to merge, in output order.
 * @returns The serialized merged PDF.
 */
async function mergePDFs(pdfFiles: Blob[]): Promise<Uint8Array> {
  const mergedPdf = await PDFDocument.create();
  for (let i = 0; i < pdfFiles.length; i++) {
    const blob = pdfFiles[i]!;
    let existingPdf: PDFDocument;
    try {
      // PDFDocument.load rejects on unparseable input (it never resolves to a
      // falsy value), so the skip has to be implemented with try/catch.
      existingPdf = await PDFDocument.load(await blob.arrayBuffer());
    } catch (e) {
      console.error(`Failed to load PDF at position ${i}`, e);
      continue;
    }
    const copiedPages = await mergedPdf.copyPages(existingPdf, existingPdf.getPageIndices());
    for (const page of copiedPages) {
      mergedPdf.addPage(page);
    }
  }
  return mergedPdf.save();
}
/**
 * Canonicalize and de-duplicate absolute URLs.
 *
 * Each URL loses its fragment and one trailing `/` from its path. Entries
 * that fail to parse are logged and dropped. First-seen order is preserved.
 *
 * @param urls - Candidate absolute URLs.
 * @returns The unique normalized URLs.
 */
function normalizeUrls(urls: string[]): string[] {
  const unique = new Set<string>();
  for (const candidate of urls) {
    let parsed: URL;
    try {
      parsed = new URL(candidate);
    } catch {
      console.error(`Invalid URL: ${candidate}`);
      continue;
    }
    parsed.hash = '';
    const path = parsed.pathname;
    if (path.endsWith('/')) {
      parsed.pathname = path.slice(0, -1);
    }
    unique.add(parsed.toString());
  }
  return [...unique];
}
/** Chunk size used when the `size` query parameter is absent or empty. */
const DEFAULT_CHUNK_SIZE = 20;

/** Queue message describing one chunk of URLs to render into a single PDF. */
type ChunkMessage = { taskId: string; urls: string[]; total: number; index: number };

/** Status record stored in KV under `<taskId>:<index>`; `key` is set once rendered. */
type ChunkRecord = { status: string; total: number; key?: string };

/** Browser handle produced by `puppeteer.launch` / `puppeteer.connect`. */
type RenderBrowser = Awaited<ReturnType<typeof puppeteer.launch>>;

/** Escape RegExp metacharacters so `text` matches literally inside a pattern. */
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/**
 * Handle `?id=<taskId>`: fetch every rendered chunk recorded in KV from R2 and
 * return them merged into one PDF download.
 *
 * Chunks that are not rendered yet, or whose R2 object cannot be fetched, are
 * logged and left out of the merge.
 */
async function handleMergeRequest(env: Env, id: string): Promise<Response> {
  const first = await env.MY_KV_NAMESPACE.get(`${id}:0`);
  if (!first) {
    return new Response(`Failed to fetch ${id}:0 from KV`, { status: 404 });
  }
  const { total } = JSON.parse(first) as ChunkRecord;

  const pdfFiles: Blob[] = [];
  for (let i = 0; i < total; i++) {
    // Chunk 0 was already read above; reuse it instead of a second KV read.
    const raw = i === 0 ? first : await env.MY_KV_NAMESPACE.get(`${id}:${i}`);
    if (!raw) {
      return new Response(`Failed to fetch ${id}:${i} from KV`, { status: 404 });
    }
    const { key, status } = JSON.parse(raw) as ChunkRecord;
    if (!key) {
      // Only records written by the queue consumer carry `key`; a pending
      // chunk has nothing in R2 yet.
      console.error(`Chunk ${id}:${i} is not ready (status: ${status})`);
      continue;
    }
    try {
      const pdfFile = await fetchPDFFromR2(env, key);
      console.log(`${key} bytes ${pdfFile.size}`);
      pdfFiles.push(pdfFile);
    } catch (e) {
      console.error(e);
    }
  }

  const mergedPdfBytes = await mergePDFs(pdfFiles);
  return new Response(mergedPdfBytes, {
    headers: {
      'Content-Type': 'application/pdf',
      'Content-Disposition': 'attachment; filename=merged.pdf',
    },
  });
}

/**
 * Handle `?url=<page>[&pattern=<regex>][&size=<n>]`: collect matching links on
 * the page, split them into chunks of `size`, and enqueue one render job per chunk.
 *
 * Without `pattern`, a link matches when it contains the target URL literally
 * (case-insensitive).
 */
async function handleCrawlRequest(url: URL, env: Env): Promise<Response> {
  const targetUrl = url.searchParams.get('url');
  if (!targetUrl) {
    return new Response('URL is required as a query parameter', { status: 400 });
  }
  let target: URL;
  try {
    target = new URL(targetUrl);
  } catch {
    return new Response(`Invalid URL: ${targetUrl}`, { status: 400 });
  }

  const sizeParam = url.searchParams.get('size');
  const chunkSize = sizeParam ? Number.parseInt(sizeParam, 10) : DEFAULT_CHUNK_SIZE;
  if (!Number.isSafeInteger(chunkSize) || chunkSize < 1) {
    // A size of 0 or below would make the chunking loop never terminate, and
    // NaN would yield a NaN `total`.
    return new Response('size must be a positive integer', { status: 400 });
  }

  const pattern = url.searchParams.get('pattern');
  let matchRegex: RegExp;
  try {
    // Escape the default so `.`, `?`, `(` etc. in the URL match literally
    // instead of acting as (or breaking) regex syntax.
    matchRegex = new RegExp(pattern || escapeRegExp(targetUrl), 'i');
  } catch {
    return new Response(`Invalid pattern: ${pattern}`, { status: 400 });
  }

  const response = await fetch(target);
  const links: string[] = [];
  const rewriter = new HTMLRewriter();
  rewriter.on('a[href]', {
    element(element) {
      let href = element.getAttribute('href');
      if (href) {
        try {
          console.log(`Found link: ${href}`);
          // Resolve relative links against the crawled page.
          href = new URL(href, targetUrl).href;
          if (matchRegex.test(href)) {
            links.push(href);
          }
        } catch (e) {
          console.log(`Failed to parse link: ${href}`);
        }
      }
    },
  });
  // Consume the transformed body so the element handler runs over the whole page.
  await rewriter.transform(response).text();

  const normalizedUrls = normalizeUrls(links);
  const taskId = crypto.randomUUID();
  const total = Math.ceil(normalizedUrls.length / chunkSize);
  for (let index = 0; index < total; index++) {
    const chunk = normalizedUrls.slice(index * chunkSize, (index + 1) * chunkSize);
    await env.MY_KV_NAMESPACE.put(`${taskId}:${index}`, JSON.stringify({ status: 'pending', total }));
    await env.MY_QUEUE.send({ taskId, urls: chunk, total, index });
  }

  return new Response(JSON.stringify({
    taskId,
    total,
    links: normalizedUrls,
  }, null, 2), {
    headers: { 'Content-Type': 'application/json' },
  });
}

/**
 * Reuse a free Browser Rendering session when one exists, otherwise launch a
 * new browser.
 *
 * @returns The browser, or `undefined` if launching also failed.
 */
async function openBrowser(env: Env): Promise<RenderBrowser | undefined> {
  const sessionId = await getRandomSession(env.MY_BROWSER);
  if (sessionId) {
    try {
      return await puppeteer.connect(env.MY_BROWSER, sessionId);
    } catch (e) {
      console.error(e);
      console.error(`Failed to connect to session ${sessionId}`);
    }
  }
  try {
    return await puppeteer.launch(env.MY_BROWSER, { keep_alive: 600000 });
  } catch (e) {
    console.error(e);
    console.error(`Failed to launch browser`);
    return undefined;
  }
}

/**
 * Render each URL to an A4 PDF in its own tab and concatenate them in order.
 *
 * @throws If any page fails to open, load or render. The tab is still closed.
 */
async function renderChunkToPdf(browser: RenderBrowser, urls: string[]): Promise<Uint8Array> {
  const pdfDoc = await PDFDocument.create();
  for (const url of urls) {
    const page = await browser.newPage();
    try {
      const reqUrl = new URL(url).toString();
      console.log(`goto ${reqUrl}`);
      await page.goto(reqUrl, { timeout: 60000 });
      const bytes = await page.pdf({ format: "A4", timeout: 60000 });
      const subPdfDoc = await PDFDocument.load(bytes);
      const copiedPages = await pdfDoc.copyPages(subPdfDoc, subPdfDoc.getPageIndices());
      for (const p of copiedPages) {
        pdfDoc.addPage(p);
      }
    } finally {
      await page.close();
    }
  }
  return pdfDoc.save();
}

export default {
  /**
   * HTTP entry point. `?id=<taskId>` downloads the merged PDF for a task.
   * Otherwise `?url=...` starts a crawl and enqueues the render jobs.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const id = url.searchParams.get('id');
    if (id) {
      return handleMergeRequest(env, id);
    }
    return handleCrawlRequest(url, env);
  },

  /**
   * Queue consumer. Each message renders one chunk of URLs into a single PDF,
   * stores it in R2, and marks the chunk `done` in KV.
   */
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    const browser = await openBrowser(env);
    if (!browser) {
      batch.retryAll();
      return;
    }
    console.log(`Connected to browser session ${browser.sessionId()}`);

    try {
      for (const message of batch.messages) {
        const body = message.body as ChunkMessage;
        const { taskId, index, urls } = body;

        let pdfFileBytes: Uint8Array;
        try {
          pdfFileBytes = await renderChunkToPdf(browser, urls);
        } catch (e) {
          console.error(e);
          console.error(`Failed to generate PDFs for ${taskId}:${index}`);
          // Retry just this message and continue. Returning here would skip the
          // rest of the batch, and a handler that returns normally acknowledges
          // every message it did not retry.
          message.retry();
          continue;
        }

        const key = `${taskId}:${index}.pdf`;
        // Await the upload so KV only reports 'done' once the object exists in R2.
        await env.MY_BUCKET.put(key, Buffer.from(pdfFileBytes));
        await env.MY_KV_NAMESPACE.put(`${taskId}:${index}`, JSON.stringify({ key, status: 'done', ...body }));
      }
    } finally {
      // Release the session even when a storage write throws.
      await browser.disconnect();
    }
  },
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment