Created
February 7, 2017 13:09
-
-
Save ivlevdenis/c465bca993ef3e9f955a50eff0c7c05c to your computer and use it in GitHub Desktop.
Интеграция 1С
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import asyncio | |
from datetime import datetime | |
from sys import platform | |
from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory | |
loop = asyncio.get_event_loop() | |
WIN = platform == "win32" | |
if WIN: | |
import pythoncom | |
import win32com.client | |
FILE_DB_PATH = 'C:\\bibl2' | |
USER = '11111111' | |
PASSWD = '11111111' | |
HOST = '192.168.2.105' | |
PORT = 8888 | |
MAIN_URL = 'ws://{host}:{port}/1c/'.format( | |
host=HOST, | |
port=PORT, | |
) | |
UPDATES_URL = 'ws://{host}:{port}/1c_updates/'.format( | |
host=HOST, | |
port=PORT, | |
) | |
CONN_STRING = 'File={file};Usr={user};Pwd={passwd}'.format( | |
file=FILE_DB_PATH, | |
user=USER, | |
passwd=PASSWD, | |
) | |
user_attr_map = { | |
'first_name': 'Имя', | |
'last_name': 'Фамилия', | |
'third_name': 'Отчество', | |
'get_full_name': 'Наименование', | |
} | |
if WIN: | |
pythoncom.CoInitialize() | |
connection = win32com.client.Dispatch('V83.COMConnector').Connect(CONN_STRING) | |
catalogs = { | |
'users': getattr(connection.Catalogs, 'ФизическиеЛица'), | |
'readers': getattr(connection.Catalogs, 'ПользователиБиблиотеки') | |
} | |
enums = { | |
'reader_types': { | |
'individual': getattr(getattr(connection.Enums, 'ВидыЧитателей'), 'ФизЛицо'), | |
'entity': getattr(getattr(connection.Enums, 'ВидыЧитателей'), 'ЮрЛицо'), | |
} | |
} | |
class ACS1cClientProtocol(WebSocketClientProtocol): | |
def create_user(self, user_data, model, pk, update_after=True): | |
""" | |
Создание нового пользователя | |
:param user_data: | |
:param model: | |
:param pk: | |
:return: | |
""" | |
user = catalogs['users'].CreateItem() | |
for key, value in user_attr_map.items(): | |
if key in user_data: | |
setattr(user, value, user_data[key]) | |
user.Write() | |
reader = catalogs['readers'].CreateItem() | |
setattr(reader, 'Наименование', user_data['get_full_name']) | |
setattr(reader, 'ФизЮрЛицо', user.Ref) | |
setattr(reader, 'ВнешнийКод', pk) | |
setattr(reader, 'ВидЧитателя', enums['reader_types']['individual']) | |
reader.Write() | |
if update_after: | |
payload = { | |
'stream': 'user', | |
'payload': { | |
'action': 'update', | |
'model': model, | |
'pk': pk, | |
'data': { | |
'remote_id': user.Code | |
} | |
} | |
} | |
print('Send payload: {0}'.format(payload)) | |
self.sendJSON(payload) | |
return user | |
def create_users(self, users_data, model): | |
if not isinstance(users_data, list): | |
return | |
for user_data in users_data: | |
pk = user_data['pk'] | |
self.create_user(user_data, model, pk) | |
def update_user(self, user_data, model, pk): | |
""" | |
Обновление данных пользователя, при отсутствии пользователя - создается новый. | |
:param data: | |
:param stream: | |
:return: | |
""" | |
if 'remote_id' not in user_data or not user_data['remote_id']: | |
user = self.create_user(user_data, model, pk) | |
else: | |
user_link = catalogs['users'].FindByCode(user_data['remote_id']) | |
if user_link.IsEmpty(): | |
return self.create_user(user_data, model, pk) | |
user = user_link.GetObject() | |
for key, value in user_attr_map.items(): | |
if key in user_data: | |
setattr(user, value, user_data[key]) | |
user.Write() | |
return user | |
def update_users(self, users_data, model): | |
if not isinstance(users_data, list): | |
return | |
for user_data in users_data: | |
pk = user_data['pk'] | |
self.update_user(user_data, model, pk) | |
def delete_user(self, user_data, model, pk, only_mark=True): | |
""" | |
Удаление пользователя. | |
:param data: | |
:param stream: | |
:param only_mark: Только сделать пометку удаления, не удаляет пользователя из базы | |
:return: | |
""" | |
if 'remote_id' not in user_data or not user_data['remote_id']: | |
return None | |
user_link = catalogs['users'].FindByCode(user_data['remote_id']) | |
if user_link.IsEmpty(): | |
return | |
user = user_link.GetObject() | |
if only_mark: | |
user.SetDeletionMark(True) | |
else: | |
user.Delete() | |
return user | |
def delete_users(self, users_data, model): | |
if not isinstance(users_data, list): | |
return | |
for user_data in users_data: | |
pk = user_data['pk'] | |
self.delete_user(user_data, model, pk) | |
def sendJSON(self, payload: dict): | |
self.sendMessage(json.dumps(payload, ensure_ascii=False).encode('utf8')) | |
def onConnect(self, response): | |
print('Connect to server: {0}'.format(response.peer)) | |
if os.path.isfile( | |
os.path.join( | |
os.path.expanduser('~'), | |
'last_disconnect.date', | |
) | |
): | |
with open('last_disconnect.date') as f: | |
last_disconnect = f.readline() | |
else: | |
last_disconnect = datetime.now().strftime('%d.%m.%Y %H:%M') | |
self.sendJSON({ | |
'stream': 'user-bulk', | |
'payload': { | |
'action': 'update', | |
'last_disconnect': last_disconnect, | |
} | |
}) | |
def onOpen(self): | |
print('WebSocket connection open.') | |
def onMessage(self, payload, isBinary): | |
if not isBinary: | |
try: | |
received = json.loads(payload.decode('utf8')) | |
except json.JSONDecodeError: | |
pass | |
else: | |
stream = received['stream'] | |
action = received['payload']['action'] | |
if WIN: | |
if stream == 'user': | |
model = received['payload']['model'] | |
pk = received['payload']['pk'] | |
user_data = received['payload']['data'] | |
if action == 'create': | |
self.create_user(user_data, model, pk) | |
elif action == 'update': | |
self.update_user(user_data, model, pk) | |
elif action == 'delete': | |
self.delete_user(user_data, model, pk, only_mark=True) | |
elif stream == 'user-bulk': | |
users_data = received['payload']['data'] | |
model = received['payload']['model'] | |
if action == 'create': | |
self.create_users(users_data, model) | |
elif action == 'update': | |
self.update_users(users_data, model) | |
elif action == 'delete': | |
self.delete_users(users_data, model) | |
def onClose(self, wasClean, code, reason): | |
with open(os.path.join( | |
os.path.expanduser('~'), | |
'last_disconnect.date', | |
), mode='w') as f: | |
f.write(datetime.now().strftime('%d.%m.%Y %H:%M')) | |
print('WebSocket connection closed: {0}'.format(reason)) | |
self.factory.loop.stop() | |
def main(): | |
factory = WebSocketClientFactory(MAIN_URL) | |
factory.protocol = ACS1cClientProtocol | |
while True: | |
main_connection = loop.create_connection(factory, HOST, PORT) | |
try: | |
transport, protocol = loop.run_until_complete( | |
asyncio.wait_for(main_connection, 5) | |
) | |
except (asyncio.TimeoutError, OSError): | |
loop.run_until_complete(asyncio.sleep(5)) | |
continue | |
loop.run_forever() | |
loop.run_until_complete(asyncio.sleep(5)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Подскажите, пожалуйста, как достать эту библиотеку
import pythoncom
спасибо