Skip to content

Instantly share code, notes, and snippets.

@paulgb
Created October 20, 2024 16:02
Show Gist options
  • Save paulgb/35762d455d641306dda85c7a8b373be6 to your computer and use it in GitHub Desktop.
Save paulgb/35762d455d641306dda85c7a8b373be6 to your computer and use it in GitHub Desktop.
Y-Sweet token streaming from Python example
from y_sweet_sdk import DocumentManager
import pycrdt
from websockets import connect
from pycrdt_websocket import WebsocketProvider
import asyncio
CONNECTION_STRING = "ys://localhost:8080"
DOC_NAME = "test_doc"
TEXT_NAME = "some-text"
async def main():
dm = DocumentManager(CONNECTION_STRING)
dm.create_doc(DOC_NAME)
ws_url = dm.get_websocket_url(DOC_NAME)
ydoc = pycrdt.Doc()
text = ydoc.get(TEXT_NAME, type=pycrdt.Text)
text.observe(lambda: print(text.to_py()))
async with (
connect(ws_url) as websocket,
WebsocketProvider(ydoc, websocket),
):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())
from y_sweet_sdk import DocumentManager
import pycrdt
import time
CONNECTION_STRING = "ys://localhost:8080"
DOC_NAME = "test_doc"
TEXT_NAME = "some-text"
def main():
dm = DocumentManager(CONNECTION_STRING)
dm.create_doc(DOC_NAME)
conn = dm.get_connection(DOC_NAME)
doc_as_update = conn.get_as_update()
doc = pycrdt.Doc()
doc.apply_update(doc_as_update)
sv = doc.get_state()
text = doc.get(TEXT_NAME, type=pycrdt.Text)
cursor = 0
for i in range(10):
new_text = f"token{i} "
text.insert(cursor, new_text)
cursor += len(new_text)
doc_as_update = doc.get_update(sv)
conn.update_doc(doc_as_update)
print("Sent update with size", len(doc_as_update))
sv = doc.get_state()
time.sleep(0.1)
if __name__ == "__main__":
main()
anyio==4.6.2.post1
asyncio==3.4.3
certifi==2024.8.30
charset-normalizer==3.4.0
idna==3.10
pycrdt==0.10.3
pycrdt-websocket==0.15.1
requests==2.32.3
sniffio==1.3.1
sqlite-anyio==0.2.3
urllib3==2.2.3
websockets==13.1
y_sweet_sdk==0.5.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment