Skip to content

Instantly share code, notes, and snippets.

@ikappaki
Last active December 17, 2024 06:42
Show Gist options
  • Save ikappaki/631d24aab511dc8947084653a64885f3 to your computer and use it in GitHub Desktop.
Save ikappaki/631d24aab511dc8947084653a64885f3 to your computer and use it in GitHub Desktop.
pipecat-ai 01-say-one-thing in Basilisp
(import asyncio aiohttp os sys)
(defmacro import-from
"Helper function to import each name in NAMES from MODULE as a local
variable, allowing them to be referenced without the module name.
https://github.com/basilisp-lang/basilisp/issues/1154."
[module & names]
(let [defs (for [n (vec (map str names))]
`(def ~(symbol n) (importing-resolve (symbol ~(str module "/" n)))))]
`(do ~@defs)))
(import-from pipecat.frames.frames EndFrame TTSSpeakFrame)
(import-from pipecat.pipeline.pipeline Pipeline)
(import-from pipecat.pipeline.task PipelineTask)
(import-from pipecat.pipeline.runner PipelineRunner)
(import-from pipecat.services.cartesia CartesiaTTSService)
(import-from pipecat.transports.services.daily DailyParams, DailyTransport)
(import-from runner configure)
(import-from loguru logger)
(import-from dotenv load-dotenv)
(load-dotenv "./.env" ** :override true)
(.remove logger 0)
(.add logger sys/stderr ** :level "DEBUG")
(defasync main
[]
;; Basilisp doesn't support `async for`, so raw `aenter/aexit` calls
;; are used instead. I'll open a new feature request for this.
(let [session (aiohttp/ClientSession)]
(await (.__aenter__ session))
(try
(let [[room-url] (await (configure session))
transport (DailyTransport room-url nil "Say One Thing" (DailyParams ** :audio-out-enabled true))
tts (CartesiaTTSService
**
:api-key (os/getenv "CARTESIA_API_KEY")
:voice-id "79a125e8-cd45-4c13-8a67-188112f4dd22" ;; British Lady
)
runner (PipelineRunner)
task (PipelineTask (Pipeline #py [tts, (.output transport)]))
;; Register an event handler so we can play the audio when the
;; participant joins.
on-first-participant-joined
((.event-handler transport "on_first_participant_joined")
(fn ^:async on-first-participant-joined
[transport participant]
(let [participant-name (-> (.get participant "info" #py {})
(.get "userName" ""))]
(await (.queue-frames task
#py [(TTSSpeakFrame (str "Hello there " participant-name "!")) (EndFrame)])))))]
(await (.run runner task)))
(finally
(await (.__aexit__ session nil nil nil))))))
(asyncio/run (main))
@ikappaki
Copy link
Author

pipecat getting started setup : https://docs.pipecat.ai/getting-started/installation

Original python source code

#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys

from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from runner import configure

from loguru import logger

from dotenv import load_dotenv

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, _) = await configure(session)

        transport = DailyTransport(
            room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
        )

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        runner = PipelineRunner()
        task = PipelineTask(Pipeline([tts, transport.output()]))

        # Register an event handler so we can play the audio when the
        # participant joins.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            participant_name = participant.get("info", {}).get("userName", "")
            await task.queue_frames(
                [TTSSpeakFrame(f"Hello there, {participant_name}!"), EndFrame()]
            )

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())

@ovistoica
Copy link

Adding to this example the runner.py conversion used in this file:

#_:standard-clj/ignore
(ns daily-twilio-example.runner
  (:import argparse os asyncio aiohttp
           [pipecat.transports.services.helpers.daily_rest :as rest]))

(defn default-parser
  []
  (doto (argparse/ArgumentParser ** :description "Daily AI SDK Bot Sample")
    (.add-argument "-u" "--url" ** :type python/str :required false :help "URL of the daily room to join")
    (.add-argument "-k" "--api-key" **
                   :type python/str
                   :required false
                   :help "Daily API Key (needed to create an owner token for the room)")))

(defn ^:async configure-with-args
  [aiohttp-session parser]
  (let [[args] (.parse-known-args parser)
        url (or (.-url args) (os/getenv "DAILY_SAMPLE_ROOM_URL"))
        api-key (or (.-api-key args) (os/getenv "DAILY_API_KEY"))]
    (when-not url
      (throw (ex-info "No daily room specified. Use the -u/--url option from the command line or set DAILY_SAMPLE_ROOM_URL"
                      {:cause :missing-url})))
    (when-not api-key
      (throw (ex-info "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
                      {:cause :missing-url})))
    (let [daily-rest-helper (rest/DailyRESTHelper **
                                                  :daily-api-key api-key
                                                  :daily-api-url (os/getenv "DAILY_API_URL" "https://api.daily.co/v1")
                                                  :aiohttp-session aiohttp-session)
          expiry-time (float (* 60 60))
          token (await (.get-token daily-rest-helper url expiry-time))]
      {:url url :token token :args args})))

(defn ^:async configure
  [aiohttp-session]
  (let [{:keys [url token]} (await (configure-with-args aiohttp-session (default-parser)))]
    {:room-url url :token token}))

(comment
   
  (defn ^:async test-configure
    []
    (let [session (aiohttp/ClientSession)
          response (await (configure session))]
      response))

  (asyncio/run (test-configure))

  ,)

Mentions:

  • I'm using (defn ^:async ..) instead of defasync because I can't get clj-kondo to recognize this macro (help is appreciated)
  • Changed return of configure to a map instead of an array for convenience
  • Top comment is to prevent clj-standard from formatting namespace into form which breaks:
(ns daily-twilio-example.runner
  (:import
   (aiohttp null)
   (argparse null)
   (asyncio null)
   (os null)
   (pipecat.transports.services.helpers.daily_rest :as rest)))

Original

#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import aiohttp
import argparse
import os

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper


async def configure(aiohttp_session: aiohttp.ClientSession):
    (url, token, _) = await configure_with_args(aiohttp_session)
    return (url, token)


async def configure_with_args(
    aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
    if not parser:
        parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=False,
        help="Daily API Key (needed to create an owner token for the room)",
    )

    args, unknown = parser.parse_known_args()

    url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or os.getenv("DAILY_API_KEY")

    if not url:
        raise Exception(
            "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
        )

    if not key:
        raise Exception(
            "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )

    daily_rest_helper = DailyRESTHelper(
        daily_api_key=key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )

    # Create a meeting token for the given room with an expiration 1 hour in
    # the future.
    expiry_time: float = 60 * 60

    token = await daily_rest_helper.get_token(url, expiry_time)

    return (url, token, args)
    ```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment