Skip to content

Instantly share code, notes, and snippets.

@xgenvn
Last active June 28, 2024 09:14
Show Gist options
  • Save xgenvn/bd4e115e4e1ee5b85348f2ac205f8964 to your computer and use it in GitHub Desktop.
Save xgenvn/bd4e115e4e1ee5b85348f2ac205f8964 to your computer and use it in GitHub Desktop.
PubSub with Deno
import { Hono } from "jsr:@hono/hono@^4.4.0"
import { PubSub } from "npm:@google-cloud/pubsub"
import { Buffer } from "node:buffer"
import * as queryString from "npm:[email protected]"
// Pub/Sub client settings are read from the environment at startup:
// PROJECT_ID names the project, GOOGLE_APPLICATION_CREDENTIALS is passed
// through as the key file path.
const pubSubOptions = {
  projectId: Deno.env.get("PROJECT_ID"),
  keyFilename: Deno.env.get("GOOGLE_APPLICATION_CREDENTIALS"),
}

// Shared Pub/Sub client used by publishMessage().
const pubSub = new PubSub(pubSubOptions)

// Hono application that the routes below are registered on.
const app = new Hono()
/**
 * Publishes one message to a Pub/Sub topic, creating the topic on demand.
 *
 * Failures are logged and reported to the caller as `undefined` rather than
 * being thrown, so callers must check the returned ID.
 *
 * @param {string} topicName - Name of the topic to publish to.
 * @param {{ data: Buffer, attributes?: Record<string, string> }} message -
 *   Payload bytes plus optional attributes.
 * @returns {Promise<string | undefined>} The ID of the published message, or
 *   `undefined` when looking up/creating the topic or publishing failed.
 */
async function publishMessage(topicName, message) {
  try {
    // autoCreate: get() creates the topic when it does not exist yet.
    const [topic] = await pubSub.topic(topicName).get({ autoCreate: true })
    // publishMessage() replaces the deprecated positional
    // publish(data, attributes) call.
    const { data, attributes } = message
    const messageId = await topic.publishMessage({ data, attributes })
    console.log(`Message ${messageId} published.`)
    return messageId
  } catch (error) {
    console.error("Error publishing message:", error)
    return undefined
  }
}
/**
 * Reduces a Content-Type header value to its bare, lower-cased media type,
 * dropping parameters such as `; charset=utf-8`.
 *
 * @param {string | undefined} contentType - Raw header value, if any.
 * @returns {string} The media type, or "" when the header is missing.
 */
function parseMediaType(contentType) {
  return (contentType ?? "").split(";")[0].trim().toLowerCase()
}

/**
 * POST /:topic/:itemId? — forwards the request body to the Pub/Sub topic
 * named in the path.
 *
 * The body becomes the message data; the optional item ID, the query
 * parameters and the request headers travel as string attributes.
 *
 * Responses:
 * - 200 JSON `{ messageId, topicName, itemId }` on success
 * - 400 when the body cannot be read or parsed
 * - 415 for content types other than JSON, plain text or URL-encoded forms
 * - 500 JSON when publishing failed
 */
app.post("/:topic/:itemId?", async (c) => {
  const { req: request } = c
  const topicName = request.param("topic")
  const itemId = request.param("itemId")
  // Compare only the media type so "application/json; charset=utf-8" matches.
  const mediaType = parseMediaType(request.header("Content-Type"))

  let notificationPayload
  try {
    if (mediaType === "application/json") {
      notificationPayload = JSON.stringify(await request.json())
    } else if (mediaType === "text/plain") {
      notificationPayload = await request.text()
    } else if (mediaType === "application/x-www-form-urlencoded") {
      notificationPayload = JSON.stringify(await request.parseBody())
    } else {
      return new Response("Unsupported content type", { status: 415 })
    }
  } catch (error) {
    // A body that cannot be read or parsed is a client error, not a crash.
    console.error("Error reading request body:", error)
    return new Response("Invalid request body", { status: 400 })
  }

  const { url, headers } = request.raw
  const queryParams = queryString.parse(new URL(url).searchParams.toString())
  console.log(headers)
  const message = {
    data: Buffer.from(notificationPayload),
    attributes: {
      itemId: itemId ?? "",
      queryParams: JSON.stringify(queryParams),
      // Headers has no enumerable own properties, so stringifying it directly
      // yields "{}"; convert it to a plain object first.
      headers: JSON.stringify(Object.fromEntries(headers)),
    },
  }

  const messageId = await publishMessage(topicName, message)
  if (messageId == null) {
    // publishMessage() signals failure by returning no ID.
    return c.json({ error: "Failed to publish message", topicName, itemId }, 500)
  }
  return c.json({
    messageId,
    topicName,
    itemId,
  })
})
// Start the HTTP server, handing every request to the Hono app's fetch handler.
Deno.serve(app.fetch)
// Example requests (xh HTTP client), with and without the optional itemId
// path segment:
// xh post 'localhost:8000/test-topic/123123123123?tags=a&tags=1&a=2&b=456' hello=world
// xh post 'localhost:8000/test-topic?tags=a&tags=1&a=2&b=456' hello=world
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment