Instantly share code, notes, and snippets.
Created
December 31, 2018 15:16
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save dpineiden/300e0ac00e305b69a19443f0061b75ff to your computer and use it in GitHub Desktop.
Object declared on a multiprocess manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import asyncio | |
import uvloop | |
import concurrent.futures | |
import functools | |
import sys | |
from multiprocessing import Manager | |
from networktools.colorprint import gprint, bprint, rprint | |
from multiprocessing.managers import SyncManager | |
from socketmanager.services import Service, ServiceManager | |
from tasktools.taskloop import (coromask, | |
renew, | |
simple_fargs, | |
simple_fargs_out) | |
from networktools.library import (my_random_string, | |
choice_input, | |
check_type) | |
class SharedManager(SyncManager): | |
pass | |
""" | |
This Script shows how to enable Service manager to | |
share services among processes | |
""" | |
SharedManager.register('ServiceManager', ServiceManager) | |
SharedManager.register('Service', Service) | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
async def hola(): | |
servicios = { | |
'pineiden':{'port': 6688, | |
'host': '10.54.218.13', | |
'name': 'pineiden'}, | |
'collector':{'port': 6688, | |
'host': '10.54.217.15', | |
'name': 'collector'}, | |
'datawork':{'port': 6688, | |
'host': '10.54.218.66', | |
'name': 'datawork'}, | |
} | |
choices = {i: service for i, service in enumerate(servicios.keys())} | |
a,b,c = choice_input(choices) | |
print(a,b,c) | |
def hola_task(servicios): | |
loop = asyncio.get_event_loop() | |
try: | |
args = [servicios] | |
# Create instances | |
task = loop.create_task( | |
coromask( | |
hola, | |
args, | |
simple_fargs_out) | |
) | |
task.add_done_callback( | |
functools.partial( | |
renew, | |
task, | |
hola, | |
simple_fargs_out) | |
) | |
if not loop.is_running(): | |
loop.run_forever() | |
except Exception as exec: | |
raise exec | |
class MultiService: | |
def __init__(self, | |
service_manager: 'Shared Service Manager', | |
idt_list: 'Shared list', | |
flag_dict: 'Shared dict', | |
*args, | |
**kwargs): | |
# shared service manager | |
self.sm = service_manager | |
self.idt = idt_list | |
self.flags = flag_dict | |
# time to sleep | |
self.ts = 5 | |
print(self.sm.__dict__.keys()) | |
sm0 = self.sm | |
[print(method_name) for method_name in dir(sm0) if callable(getattr(sm0, method_name))] | |
async def select_service(self, service_map: dict, | |
count, | |
*args): | |
print("Select Service") | |
print(self.sm) | |
bprint("-=-"*30) | |
for i in range(10): | |
print("Select service | count -> %s" % count) | |
choices = {i: service for i, service in enumerate(service_map.keys())} | |
command, key, msg_type = choice_input(choices) | |
service = service_map.get(command) | |
idt_dict = {i: idt for i, idt in enumerate(self.idt)} | |
if not idt_dict: | |
idt_dict.update({'default':None}) | |
print("Select idt, assign to process") | |
print(idt_dict) | |
print("-"*30) | |
command_idt, key, msg_type = choice_input(idt_dict) | |
status_value_str = input("Give me status value to IDT %s\n" % | |
command_idt) | |
status_value = check_type(status_value_str) | |
# add command_service to self.sm | |
name = service.get('name') | |
exist_service = self.sm.find_services('name', name) | |
if not exist_service: | |
print("Service not exist") | |
self.sm.add_service(**service) | |
print(self.sm.get_services()) | |
# set flag on command_idt as status_value | |
self.flags.update({command_idt: status_value}) | |
await asyncio.sleep(self.ts) | |
count += 1 | |
return [service_map] | |
def select_service_task(self, service_map): | |
loop = asyncio.get_event_loop() | |
rprint("=================================") | |
gprint("Gestionando mensajes en Select Service") | |
rprint("=================================") | |
try: | |
args = [service_map, 0] | |
# Create instances | |
task = loop.create_task( | |
coromask( | |
self.select_service, | |
args, | |
simple_fargs_out) | |
) | |
task.add_done_callback( | |
functools.partial( | |
renew, | |
task, | |
self.select_service, | |
simple_fargs_out) | |
) | |
if not loop.is_running(): | |
loop.run_forever() | |
except Exception as exec: | |
raise exec | |
async def show_service(self, xprint, sign, idt, *args): | |
if self.flags.get(idt): | |
await asyncio.sleep(self.ts) | |
xprint(sign*20) | |
[print(s) for s in self.sm.get_services()] | |
xprint(sign*20) | |
return [xprint, sign, idt] | |
def show_service_task(self, | |
xprint: print, | |
sign: str, | |
idt: str): | |
loop = asyncio.get_event_loop() | |
rprint("=================================") | |
gprint("Gestionando mensajes en engine") | |
rprint("xprint -> %s" % xprint) | |
bprint("sign -> %s" % sign) | |
rprint("=================================") | |
try: | |
args = [xprint, sign, idt] | |
# Create instances | |
task = loop.create_task( | |
coromask( | |
self.show_service, | |
args, | |
simple_fargs_out) | |
) | |
task.add_done_callback( | |
functools.partial( | |
renew, | |
task, | |
self.show_service, | |
simple_fargs_out) | |
) | |
if not loop.is_running(): | |
loop.run_forever() | |
except Exception as exec: | |
raise exec | |
def main(): | |
workers = 5 | |
host = socket.gethostbyname(socket.gethostname()) | |
host_port = 9876 | |
address = (host, host_port) | |
executor = concurrent.futures.ProcessPoolExecutor(workers) | |
servicios = { | |
'pineiden':{'port': 6688, | |
'host': '10.54.218.13', | |
'name': 'pineiden'}, | |
'collector':{'port': 6688, | |
'host': '10.54.217.15', | |
'name': 'collector'}, | |
'datawork':{'port': 6688, | |
'host': '10.54.218.66', | |
'name': 'datawork'}, | |
} | |
idts = { | |
'A0': ('=-', bprint), | |
'A1': ('(o)', rprint), | |
'A2': ('|=|-', gprint) | |
} | |
with SharedManager() as manager: | |
loop = asyncio.get_event_loop() | |
service_manager = manager.ServiceManager() | |
idt_list = manager.list(idts.keys()) | |
flags = manager.dict() | |
flags.update({'default':None}) | |
multis = MultiService(service_manager, idt_list, flags) | |
task_list = [] | |
for idt, group in idts.items(): | |
sign = group[0] | |
xprint = group[1] | |
args = [xprint, sign, idt] | |
show_task = loop.run_in_executor( | |
executor, | |
functools.partial(multis.show_service_task, | |
*args)) | |
task_list.append(show_task) | |
select_task = asyncio.ensure_future(multis.select_service(servicios, 0)) | |
task_list.append(select_task) | |
loop.run_until_complete(asyncio.gather(*task_list)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment