Last active
August 29, 2015 14:25
-
-
Save hannelita/9b15d257f9a847c58d4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Neo4j implementation for the DocManager. Receives documents and
communicates with Neo4j Server.
"""
import base64 | |
import logging | |
from threading import Timer | |
import bson.json_util | |
from py2neo import Graph, exceptions as es_exceptions | |
from mongo_connector import errors | |
from mongo_connector.compat import u | |
from mongo_connector.constants import (DEFAULT_COMMIT_INTERVAL, | |
DEFAULT_MAX_BULK) | |
from mongo_connector.util import exception_wrapper, retry_until_ok | |
from mongo_connector.doc_managers.doc_manager_base import DocManagerBase | |
from mongo_connector.doc_managers.formatters import DefaultDocumentFormatter | |
# Translation table handed to exception_wrapper: a connection error becomes
# errors.ConnectionFailed, every other listed exception becomes
# errors.OperationFailed.
# NOTE(review): the ``es_exceptions`` alias and these four exception names
# look carried over from an Elasticsearch doc manager -- confirm that
# py2neo's ``exceptions`` module really exposes them, otherwise this raises
# AttributeError at import time.
_exception_map = {
    es_exceptions.ConnectionError: errors.ConnectionFailed,
    es_exceptions.TransportError: errors.OperationFailed,
    es_exceptions.NotFoundError: errors.OperationFailed,
    es_exceptions.RequestError: errors.OperationFailed,
}
wrap_exceptions = exception_wrapper(_exception_map)

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class DocManager(DocManagerBase):
    """Neo4j implementation for the DocManager.

    Receives documents and communicates with Neo4j Server.

    This class is still a skeleton: only the connection set-up in
    ``__init__`` and ``stop`` do any work. ``upsert`` and ``bulk_upsert``
    are no-ops, and the remaining operations raise ``NotImplementedError``
    until they are written.
    """

    def __init__(self, url, **kwargs):
        """Open a py2neo ``Graph`` handle for the server at ``url``.

        Args:
            url: Passed as the first positional argument to ``Graph``.
            **kwargs: Two keys are read, everything else is ignored:
                ``clientOptions`` -- dict of extra keyword arguments
                forwarded to ``Graph`` (defaults to no extra arguments);
                ``auto_commit_interval`` -- stored on the instance
                (defaults to ``DEFAULT_COMMIT_INTERVAL``).
        """
        self.remote_graph = Graph(url, **kwargs.get('clientOptions', {}))
        # stop() resets this attribute, so make sure it exists from the
        # moment the instance is constructed.
        self.auto_commit_interval = kwargs.get(
            'auto_commit_interval', DEFAULT_COMMIT_INTERVAL)

    def stop(self):
        """Stop the auto-commit thread.

        No auto-commit timer is started anywhere in this class yet, so
        this currently only clears ``auto_commit_interval``.
        """
        self.auto_commit_interval = None

    def upsert(self, doc, namespace, timestamp):
        """Inserts a document into Neo4j."""
        # TODO: not implemented yet -- the document is silently discarded.

    def bulk_upsert(self, docs, namespace, timestamp):
        """Insert multiple documents into Neo4j."""
        # TODO: not implemented yet -- the documents are silently discarded.

    def update(self, document_id, update_spec, namespace, timestamp):
        """Placeholder for applying ``update_spec`` to a stored document.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "update is not implemented for the Neo4j DocManager")

    def remove(self, document_id, namespace, timestamp):
        """Placeholder for removing a document from Neo4j.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "remove is not implemented for the Neo4j DocManager")

    def search(self, start_ts, end_ts):
        """Placeholder for querying documents by the given timestamps.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "search is not implemented for the Neo4j DocManager")

    def commit(self):
        """Placeholder for flushing pending changes to Neo4j.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "commit is not implemented for the Neo4j DocManager")

    def get_last_doc(self):
        """Placeholder for fetching the most recently written document.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "get_last_doc is not implemented for the Neo4j DocManager")

    def handle_command(self, doc, namespace, timestamp):
        """Placeholder for handling a database command document.

        Raises:
            NotImplementedError: always; the operation is not written yet.
        """
        raise NotImplementedError(
            "handle_command is not implemented for the Neo4j DocManager")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment