# Source: GitHub Gist by @apatrushev, created July 8, 2017 08:47.
# Gist: apatrushev/2b1af8bee64177eb4f091ea9691bfcf1
import os
import logging
import gevent
from gevent import socket
from gflows import (
SocketSource,
SocketWrapper,
Partitioner,
Transparent,
Stripper,
NullSink,
Logger,
Reader,
Writer,
Tee,
Mapper,
)
def echo_spawner(data, log):
    """Start an echo pipeline for one accepted connection.

    Used as the ``func`` of the ``Mapper`` stage that ``main`` attaches to a
    ``SocketSource``.

    Args:
        data: Either the ``StopIteration`` class itself (the sentinel this
            function checks for) or a ``(sock, addr)`` pair.
        log: Object handed to ``Logger(log=...)``; ``main`` passes the
            ``logging`` module.

    Returns:
        ``(True, data)`` when *data* is the ``StopIteration`` sentinel,
        otherwise ``(False, None)`` once both flows have been started.
        NOTE(review): presumably the first element tells ``Mapper`` whether
        to pass the second element downstream -- confirm against gflows.
    """
    # The sentinel is the StopIteration class object, so test identity
    # instead of invoking an arbitrary __eq__ on whatever arrived.
    if data is StopIteration:
        return True, data

    # The peer address is not needed by the echo pipeline.
    sock, _addr = data
    wrapped = SocketWrapper(sock)

    # Side flow that receives a copy of the traffic for logging.
    # NOTE(review): presumably Partitioner splits the byte stream on the
    # platform line separator and Stripper trims each piece -- confirm.
    log_flow = Transparent() >> Partitioner(pattern=os.linesep.encode()) >> Stripper()
    log_flow >>= Logger(log=log) >> NullSink()
    log_flow.start()

    # Main flow: read from the connection, tee into the logging flow's input
    # queue, and write the data back to the same connection.
    (Reader(wrapped) >> Tee(qtee=log_flow.qin) >> Writer(wrapped)).start()
    return False, None
def socket_closer(sock, timeout):
    """Close *sock* after waiting *timeout* seconds.

    ``main`` launches this through ``gevent.spawn`` so the wait happens in a
    separate greenlet.

    Args:
        sock: Object exposing ``close()``; ``main`` passes the listening socket.
        timeout: Delay in seconds before the socket is closed.
    """
    gevent.sleep(seconds=timeout)
    sock.close()
def main(host='', port=12345, backlog=5, lifetime=10):
    """Run the demo echo server.

    Args:
        host: Address to bind; the default ``''`` binds every interface.
        port: TCP port to listen on.
        backlog: Value passed to ``listen()``.
        lifetime: Seconds after which a background greenlet closes the
            listening socket.

    All defaults reproduce the values that were previously hard-coded, so a
    bare ``main()`` call behaves as before.
    """
    logging.basicConfig(level=logging.DEBUG)

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow quick restarts without waiting for the old socket to time out.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((host, port))
        listener.listen(backlog)

        # Close the listening socket after `lifetime` seconds.
        gevent.spawn(socket_closer, listener, lifetime)

        # Each item produced by the source is handed to echo_spawner.
        flow = SocketSource(sock=listener)
        flow >>= Mapper(func=echo_spawner, log=logging)
        flow >>= NullSink()
        # NOTE(review): presumably blocks until the flow has finished --
        # confirm against gflows.
        flow.wait()
    finally:
        # Release the listening socket even when setup or the flow raises;
        # closing a socket that is already closed is harmless.
        listener.close()
# Start the demo server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)