Skip to content

Instantly share code, notes, and snippets.

View davebshow's full-sized avatar

David Michael Brown davebshow

  • Vancouver Island
View GitHub Profile
@davebshow
davebshow / raw.py
Last active September 12, 2016 18:00
import asyncio
import uvloop
from aiohttp import ClientSession
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
msg = b'\x10application/json{"processor": "", "op": "eval", "args": {"aliases": {}, "gremlin": "x + x", "bindings": {"x": 1}}, "requestId": "2f52dced-0c4d-4ccd-a8a6-62a59bbb52af"}'
b'\x10application/json{"processor": "", "op": "eval", "args": {"aliases": {}, "gremlin": "graph.features().graph().supportsTransactions()"}, "requestId": "2f52dced-0c4d-4ccd-a8a6-62a59bbb52af"}'
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running org.apache.tinkerpop.gremlin.python.structure.io.graphson.GraphSONReaderTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.019 sec - in org.apache.tinkerpop.gremlin.python.structure.io.graphson.GraphSONReaderTest
Running org.apache.tinkerpop.gremlin.python.structure.io.graphson.GraphSONWriterTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.133 sec - in org.apache.tinkerpop.gremlin.python.structure.io.graphson.GraphSONWriterTest
Running org.apache.tinkerpop.gremlin.python.jsr223.PythonProcessStandardTest
Tests run: 569, Failures: 0, Errors: 0, Skipped: 44, Time elapsed: 13.2 sec - in org.apache.tinkerpop.gremlin.python.jsr223.PythonProcessStandardTest
Running org.apache.tinkerpop.gremlin.python.jsr223.PythonProcessComputerTest
[INFO] Executing tasks
main:
[exec] running pytest
[exec] Searching for tornado==4.4.1
[exec] Reading https://pypi.python.org/simple/tornado/
[exec] Best match: tornado 4.4.1
[exec] Downloading https://pypi.python.org/packages/96/5d/ff472313e8f337d5acda5d56e6ea79a43583cc8771b34c85a1f458e197c3/tornado-4.4.1.tar.gz#md5=fff8a7d7f580b04bacc2ffe7ddf41133
[exec] Processing tornado-4.4.1.tar.gz
[exec] Writing /tmp/easy_install-HmS6zn/tornado-4.4.1/setup.cfg
<?xml version="1.0" encoding="utf-8"?><testsuite errors="0" failures="0" name="pytest" skips="0" tests="91" time="7.217"><testcase classname="tests.test_app" file="tests/test_app.py" line="23" name="test_registry" time="0.055725812911987305"></testcase><testcase classname="tests.test_app" file="tests/test_app.py" line="34" name="test_registry_defaults" time="0.045771121978759766"></testcase><testcase classname="tests.test_app" file="tests/test_app.py" line="42" name="test_transaction_discovery" time="0.04336738586425781"></testcase><testcase classname="tests.test_app" file="tests/test_app.py" line="47" name="test_translator" time="0.04281353950500488"></testcase><testcase classname="tests.test_config" file="tests/test_config.py" line="23" name="test_cluster_default_config" time="0.0010075569152832031"></testcase><testcase classname="tests.test_config" file="tests/test_config.py" line="34" name="test_app_default_config" time="0.043279170989990234"></testcase><testcase classname="tests.test_config" file="tests/
from gremlin_python.structure import RemoteGraph
from gremlin_python.driver.rest_remote_connection import RESTRemoteConnection
rc = RESTRemoteConnection('ws://localhost:8182', 'g')
graph = RemoteGraph(rc)
g = graph.traversal()
for x in g.V():
print(x)
from goblin import api
from goblin import properties
class TestVertexProperty(api.VertexProperty):
notes = properties.Property(properties.String)
class TestVertex(api.Vertex):
__label__ = 'test_vertex'
from tornado import gen
from tornado.ioloop import IOLoop
from goblin import properties, connection
from goblin.models import Vertex, Edge, V
class Device(Vertex):
serialNumber = properties.String()
latitude = properties.Double()
longitude = properties.Double()
import asyncio
import datetime
from pytz import utc
from tornado.platform.asyncio import AsyncIOMainLoop
from goblin import properties
from goblin.connection import setup, tear_down
from goblin.models import Vertex, Edge, V
import asyncio
import pulsar
from pulsar.apps import ws, wsgi
class Echo(ws.WS):
def on_bytes(self, websocket, msg):
websocket._loop.call_soon(
pulsar.ensure_future, self._call(websocket, msg))