Created
December 21, 2022 01:01
-
-
Save mahmoudimus/eaab9edbd2de7878c4778faf2c62f258 to your computer and use it in GitHub Desktop.
Fake fcntl module for windows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| This *fake* module is for windows only | |
| Based on: | |
| - https://github.com/facebook/tornado/blob/master/tornado/win32_support.py | |
| - https://github.com/typecode/wikileaks/blob/23a6243df473102a9a1b84f5dde66173df3132b5/lib/tornado/win32_support.py | |
| - https://raw.githubusercontent.com/twisted/twistedmatrix.com-trac-attachments/0a05e3294e7488d8864a73666a007db842c9633e/ticket/bc1/bc1a1f5e1875e3916492b3b509f58cd420eba1d5/b683c2d57b5d19e4fb24a78b44e76ad9129fe19f.patch | |
| - https://github.com/yt-dlp/yt-dlp/blob/1fc089143c79b02b8373ae1d785d5e3a68635d4d/yt_dlp/utils.py#L2095-L2150 | |
| """ | |
| import ctypes | |
| import ctypes.wintypes | |
| import msvcrt | |
| import os | |
| import socket | |
| import errno | |
# Winsock: int ioctlsocket(SOCKET s, long cmd, u_long *argp)
# See: http://msdn.microsoft.com/en-us/library/ms738573(VS.85).aspx
# The third parameter is a *pointer* to a u_long, so it is declared as
# POINTER(ULONG); callers pass ``ctypes.byref(ctypes.wintypes.ULONG(value))``
# instead of a bare integer.
ioctlsocket = ctypes.windll.ws2_32.ioctlsocket
ioctlsocket.argtypes = (
    ctypes.wintypes.HANDLE,  # s
    ctypes.wintypes.LONG,  # cmd
    ctypes.POINTER(ctypes.wintypes.ULONG),  # argp
)
ioctlsocket.restype = ctypes.c_int

# Kernel32: BOOL SetHandleInformation(HANDLE hObject, DWORD dwMask, DWORD dwFlags)
# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
SetHandleInformation.argtypes = (
    ctypes.wintypes.HANDLE,  # hObject
    ctypes.wintypes.DWORD,  # dwMask: which flag bits to change
    ctypes.wintypes.DWORD,  # dwFlags: new values for the masked bits
)
SetHandleInformation.restype = ctypes.wintypes.BOOL

# Mask bit used with SetHandleInformation to control handle inheritance.
HANDLE_FLAG_INHERIT = 0x00000001
# ---------------------------------------------------------------------------
# Constants exported so that code written against the POSIX ``fcntl`` module
# can import this module on Windows. The values are plain integers; most of
# them are never interpreted by the functions defined in this file.
# NOTE(review): presumably copied from a POSIX platform's headers -- verify
# before relying on any particular value.
# ---------------------------------------------------------------------------

# Directory-notification flags (DN_*).
DN_ACCESS = 1
DN_MODIFY = 2
DN_CREATE = 4
DN_DELETE = 8
DN_RENAME = 16
DN_ATTRIB = 32
DN_MULTISHOT = -2147483648

# fcntl() commands (F_*).
F_DUPFD = 0
F_GETFD = 1
F_SETFD = 2
F_GETFL = 3
F_SETFL = 4
F_SETOWN = 8
F_GETOWN = 9
F_SETSIG = 10
F_GETSIG = 11
F_GETLK = 12
F_GETLK64 = 12
F_SETLK = 13
F_SETLK64 = 13
F_SETLKW = 14
F_SETLKW64 = 14
F_SETLEASE = 1024
F_GETLEASE = 1025
F_NOTIFY = 1026

# Record-lock types (F_*LCK).
F_RDLCK = 0
F_WRLCK = 1
F_UNLCK = 2
F_EXLCK = 4
F_SHLCK = 8

# Descriptor / status flags.
FD_CLOEXEC = 1
FASYNC = 8192

# STREAMS ioctl request codes (I_*).
I_NREAD = 21249
I_PUSH = 21250
I_POP = 21251
I_LOOK = 21252
I_FLUSH = 21253
I_SRDOPT = 21254
I_GRDOPT = 21255
I_STR = 21256
I_SETSIG = 21257
I_GETSIG = 21258
I_FIND = 21259
I_LINK = 21260
I_UNLINK = 21261
I_RECVFD = 21262
I_PEEK = 21263
I_FDINSERT = 21264
I_SENDFD = 21265
I_SWROPT = 21267
I_GWROPT = 21268
I_LIST = 21269
I_PLINK = 21270
I_PUNLINK = 21271
I_FLUSHBAND = 21276
I_CKBAND = 21277
I_GETBAND = 21278
I_ATMARK = 21279
I_SETCLTIME = 21280
I_GETCLTIME = 21281
I_CANPUT = 21282

# flock() operations (LOCK_*).
LOCK_SH = 1
LOCK_EX = 2
LOCK_NB = 4
LOCK_UN = 8
LOCK_MAND = 32
LOCK_READ = 64
LOCK_WRITE = 128
LOCK_RW = 192

# Import-time side effect: the ``os`` module itself is patched so that
# ``os.O_NONBLOCK`` resolves for every importer in the process.
os.O_NONBLOCK = 2048

# ioctl request codes.
# NOTE(review): Winsock's FIONBIO is 0x8004667E; 126 is only its low byte.
# Confirm the intended value before passing this to ioctlsocket().
FIONBIO = 126
TIOCGWINSZ = 1074295912
class OVERLAPPED(ctypes.Structure):
    """ctypes layout of the structure handed to LockFileEx / UnlockFileEx.

    ``Offset`` and ``OffsetHigh`` carry the low and high 32 bits of the
    starting byte offset of the region; ``hEvent`` is an optional event
    handle. ``Internal`` and ``InternalHigh`` are pointer-sized fields that
    this module never sets.
    """

    _fields_ = [
        ("Internal", ctypes.wintypes.LPVOID),
        ("InternalHigh", ctypes.wintypes.LPVOID),
        ("Offset", ctypes.wintypes.DWORD),
        ("OffsetHigh", ctypes.wintypes.DWORD),
        ("hEvent", ctypes.wintypes.HANDLE),
    ]
# Byte-range locking entry points, looked up on kernel32.
kernel32 = ctypes.windll.kernel32

# BOOL LockFileEx(hFile, dwFlags, dwReserved,
#                 nNumberOfBytesToLockLow, nNumberOfBytesToLockHigh,
#                 lpOverlapped)
LockFileEx = kernel32.LockFileEx
LockFileEx.restype = ctypes.wintypes.BOOL
LockFileEx.argtypes = (
    [ctypes.wintypes.HANDLE]  # hFile
    + [ctypes.wintypes.DWORD] * 4  # dwFlags, dwReserved, byte count low/high
    + [ctypes.POINTER(OVERLAPPED)]  # lpOverlapped
)

# BOOL UnlockFileEx(hFile, dwReserved,
#                   nNumberOfBytesToUnlockLow, nNumberOfBytesToUnlockHigh,
#                   lpOverlapped)
UnlockFileEx = kernel32.UnlockFileEx
UnlockFileEx.restype = ctypes.wintypes.BOOL
UnlockFileEx.argtypes = (
    [ctypes.wintypes.HANDLE]  # hFile
    + [ctypes.wintypes.DWORD] * 3  # dwReserved, byte count low/high
    + [ctypes.POINTER(OVERLAPPED)]  # lpOverlapped
)

# Low and high 32-bit halves of the byte count passed to both calls above,
# i.e. a region of 0x7fffffffffffffff bytes ("the whole file").
whole_high = 0x7fffffff
whole_low = 0xffffffff
def _lock_file(f, exclusive, block):
    """Lock the whole of the open file *f* through ``LockFileEx``.

    Args:
        f: object exposing ``fileno()``. A pointer to the OVERLAPPED
            structure used for the lock is stored on it as
            ``f._lock_file_overlapped_p`` so that ``_unlock_file`` can hand
            the same structure back to ``UnlockFileEx``.
        exclusive: truthy for an exclusive lock, falsy for a shared one.
        block: truthy to wait for the lock, falsy to fail immediately.

    Raises:
        BlockingIOError: whenever ``LockFileEx`` reports failure.
    """
    region = OVERLAPPED()
    region.Offset = 0
    region.OffsetHigh = 0
    region.hEvent = 0
    region_p = ctypes.pointer(region)
    f._lock_file_overlapped_p = region_p

    handle = msvcrt.get_osfhandle(f.fileno())
    flags = 0x0
    if exclusive:
        flags |= 0x2  # LOCKFILE_EXCLUSIVE_LOCK
    if not block:
        flags |= 0x1  # LOCKFILE_FAIL_IMMEDIATELY

    if LockFileEx(handle, flags, 0, whole_low, whole_high, region_p):
        return
    # NB: No argument form of "ctypes.FormatError" does not work on PyPy
    reason = ctypes.FormatError(ctypes.GetLastError())
    raise BlockingIOError(f'Locking file failed: {reason!r}')
def _unlock_file(f):
    """Release the whole-file lock taken on *f* by ``_lock_file``.

    Args:
        f: object exposing ``fileno()`` that carries the
            ``_lock_file_overlapped_p`` attribute set by ``_lock_file``.

    Raises:
        OSError: if *f* carries no lock state (it was never locked through
            ``_lock_file``) or if ``UnlockFileEx`` reports failure.
    """
    # Validate explicitly instead of with ``assert``: asserts vanish under
    # ``python -O`` and a missing attribute would surface as AttributeError.
    overlapped_p = getattr(f, '_lock_file_overlapped_p', None)
    if not overlapped_p:
        raise OSError('Unlocking file failed: file was not locked with _lock_file')
    handle = msvcrt.get_osfhandle(f.fileno())
    if not UnlockFileEx(handle, 0, whole_low, whole_high, overlapped_p):
        # NB: No argument form of "ctypes.FormatError" does not work on PyPy,
        # so pass the error code explicitly (same as _lock_file does).
        raise OSError('Unlocking file failed: %r'
                      % ctypes.FormatError(ctypes.GetLastError()))
def fcntl(fd, op, arg=0):
    """Minimal Windows emulation of ``fcntl.fcntl``.

    Supported operations:

    * ``F_GETFD`` / ``F_GETFL``: always answer ``0`` (no flags set).
    * ``F_SETFD`` with ``FD_CLOEXEC``: mark the handle as not inheritable by
      child processes, the Windows counterpart of close-on-exec.

    ``F_SETFL`` (e.g. ``O_NONBLOCK`` through ``ioctlsocket``/``FIONBIO``) is
    not implemented and is rejected like any other unknown operation.

    Args:
        fd: value handed to ``SetHandleInformation`` as the handle.
            NOTE(review): presumably a socket/OS handle; a C-runtime file
            descriptor would first need ``msvcrt.get_osfhandle`` -- confirm
            against callers.
        op: one of the ``F_*`` command constants.
        arg: command argument; only ``FD_CLOEXEC`` is accepted for
            ``F_SETFD``.

    Returns:
        ``0`` for ``F_GETFD``/``F_GETFL``; ``None`` after a successful
        ``F_SETFD``.

    Raises:
        OSError: ``SetHandleInformation`` failed.
        ValueError: unsupported *op*, or unsupported *arg* for ``F_SETFD``.
    """
    if op in (F_GETFD, F_GETFL):
        return 0
    if op == F_SETFD:
        if arg != FD_CLOEXEC:
            raise ValueError("Unsupported arg")
        # Close-on-exec means "do not leak into children", so the inherit
        # bit selected by the mask must be *cleared* (flags value 0), not set.
        if not SetHandleInformation(fd, HANDLE_FLAG_INHERIT, 0):
            # Raise a real exception built from GetLastError(); raising the
            # bare integer error code would itself fail with TypeError.
            raise ctypes.WinError()
        return None
    raise ValueError("Unsupported op")
def ioctl(fd, op, arg=0, mutable_flag=True):
    """Placeholder for ``fcntl.ioctl``; performs no device control.

    *fd*, *op* and *arg* are ignored. No request is actually implemented
    (not even ``TIOCGWINSZ``); the caller just gets a placeholder result.

    Returns:
        ``0`` when *mutable_flag* is truthy, otherwise the empty string.
    """
    return 0 if mutable_flag else ""
def flock(fd, op):
    """No-op stand-in for ``fcntl.flock``.

    Both arguments are ignored and no lock is taken or released.

    Returns:
        Always ``None``.
    """
    return None
def lockf(fd, operation, length=0, start=0, whence=0):
    """No-op stand-in for ``fcntl.lockf``.

    Every argument is ignored and no lock is taken or released.

    Returns:
        Always ``None``.
    """
    return None
class Pipe(object):
    """Create an OS independent asynchronous pipe.

    The pipe is a connected pair of loopback TCP sockets, both switched to
    non-blocking mode.

    Attributes:
        writer: socket that ``write`` sends on.
        reader: socket that ``read`` receives from.
        reader_fd: ``reader.fileno()``, captured at construction time.
    """

    # Number of bind/connect attempts before giving up.
    _MAX_ATTEMPTS = 10

    def __init__(self):
        # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py
        self.writer = socket.socket()
        try:
            # Disable buffering -- pulling the trigger sends 1 byte,
            # and we want that sent immediately, to wake up ASAP.
            self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            listener = self._connect_writer()
            try:
                self.reader, _ = listener.accept()
            finally:
                listener.close()
        except BaseException:
            # Do not leak the writer socket when construction fails.
            self.writer.close()
            raise
        self.reader.setblocking(0)
        self.writer.setblocking(0)
        self.reader_fd = self.reader.fileno()

    def _connect_writer(self):
        """Connect ``self.writer`` to a fresh loopback listener and return it."""
        # errno.WSAEADDRINUSE only exists on Windows; elsewhere fall back to
        # the portable name so this lookup can never raise AttributeError.
        addr_in_use = getattr(errno, "WSAEADDRINUSE", errno.EADDRINUSE)
        for _attempt in range(self._MAX_ATTEMPTS):
            # Bind to a local port; for efficiency, let the OS pick
            # a free port for us.
            # Unfortunately, stress tests showed that we may not
            # be able to connect to that port ("Address already in
            # use") despite that the OS picked it.  This appears
            # to be a race bug in the Windows socket implementation.
            # So we loop until a connect() succeeds (almost always
            # on the first try).  See the long thread at
            # http://mail.zope.org/pipermail/zope/2005-July/160433.html
            # for hideous details.
            listener = socket.socket()
            try:
                listener.bind(("127.0.0.1", 0))
                connect_address = listener.getsockname()  # (host, port) pair
                listener.listen(1)
                self.writer.connect(connect_address)
            except socket.error as detail:
                listener.close()
                # Exceptions are not subscriptable on Python 3, so inspect
                # ``errno`` rather than ``detail[0]``.
                if detail.errno != addr_in_use:
                    # "Address already in use" is the only error worth
                    # retrying; anything else is propagated.
                    raise
            else:
                return listener
        raise socket.error("Cannot bind trigger!")

    def close(self):
        """Close both ends of the pipe; closing twice is harmless."""
        for name in ("reader", "writer"):
            sock = getattr(self, name, None)
            if sock is not None:
                sock.close()

    def read(self):
        """Emulate a file descriptor's read method.

        Returns:
            Up to one byte received from the reader socket.

        Raises:
            IOError: no data is currently available (would block).
        """
        try:
            return self.reader.recv(1)
        except socket.error as ex:
            if ex.errno == errno.EWOULDBLOCK:
                raise IOError
            raise

    def write(self, data):
        """Emulate a file descriptor's write method.

        Returns:
            The number of bytes sent, as reported by ``socket.send``.
        """
        return self.writer.send(data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment