Created
July 3, 2014 10:25
-
-
Save ksze/57047d5cb3fbb8cfda4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# pywin32 | |
from win32api import GetLastError | |
from win32event import CreateEvent, WaitForSingleObject | |
from win32file import ReadFile, WriteFile | |
from win32pipe import CreateNamedPipe, ConnectNamedPipe, DisconnectNamedPipe | |
from win32pipe import PIPE_ACCESS_DUPLEX, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, PIPE_WAIT | |
from winerror import ERROR_IO_PENDING, ERROR_PIPE_CONNECTED | |
# named pipe parameters | |
MESSAGE_PIPE_NAME = r'\\.\pipe\MessagePipe' | |
BUFFER_SIZE = 1024 | |
PIPE_TIMEOUT = 100000 | |
# action codes | |
CHECKREADY = 200 | |
def main(): | |
# Create the named pipe | |
print "x" | |
message_pipe = CreateNamedPipe(MESSAGE_PIPE_NAME, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, BUFFER_SIZE, BUFFER_SIZE, PIPE_TIMEOUT, None) | |
print "y" | |
last_error = GetLastError() | |
if last_error: | |
print u'Error creating named pipe.' | |
return last_error | |
# Connect to the named pipe | |
print "x" | |
result = ConnectNamedPipe(message_pipe, None) | |
print "y" | |
if result == ERROR_IO_PENDING: | |
print u'ERROR_IO_PENDING' | |
return result | |
if result == ERROR_PIPE_CONNECTED: | |
print u'ERROR_PIPE_CONNECTED' | |
return result | |
if result != 0: | |
print result | |
return result | |
# Try sending a message to the named pipe | |
print "write file x" | |
last_error, bytes_written = WriteFile(message_pipe, '%d' % CHECKREADY) | |
print "write file y" | |
if last_error != 0: | |
print u'Error writing to pipe: {}'.format(last_error) | |
return last_error | |
# Try reading from the named pipe | |
last_error, result = ReadFile(message_pipe, 512, None) | |
print u'last_error: {}'.format(last_error) | |
print u'result: {}'.format(result) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment