Skip to content

Instantly share code, notes, and snippets.

@mahmoudimus
Created December 21, 2022 01:01
Show Gist options
  • Select an option

  • Save mahmoudimus/eaab9edbd2de7878c4778faf2c62f258 to your computer and use it in GitHub Desktop.

Select an option

Save mahmoudimus/eaab9edbd2de7878c4778faf2c62f258 to your computer and use it in GitHub Desktop.
Fake fcntl module for windows
"""
This *fake* ``fcntl`` module is for Windows only.
Based on:
- https://github.com/facebook/tornado/blob/master/tornado/win32_support.py
- https://github.com/typecode/wikileaks/blob/23a6243df473102a9a1b84f5dde66173df3132b5/lib/tornado/win32_support.py
- https://raw.githubusercontent.com/twisted/twistedmatrix.com-trac-attachments/0a05e3294e7488d8864a73666a007db842c9633e/ticket/bc1/bc1a1f5e1875e3916492b3b509f58cd420eba1d5/b683c2d57b5d19e4fb24a78b44e76ad9129fe19f.patch
- https://github.com/yt-dlp/yt-dlp/blob/1fc089143c79b02b8373ae1d785d5e3a68635d4d/yt_dlp/utils.py#L2095-L2150
"""
import ctypes
import ctypes.wintypes
import msvcrt
import os
import socket
import errno
# Winsock: int ioctlsocket(SOCKET s, long cmd, u_long *argp)
# See: http://msdn.microsoft.com/en-us/library/ms738573(VS.85).aspx
ioctlsocket = ctypes.windll.ws2_32.ioctlsocket
# ``argp`` is a *pointer* to a u_long, so it has to be declared as one.  With
# a plain ULONG here ctypes would pass the integer value itself where Winsock
# expects an address.  Callers pass e.g.
# ``ctypes.byref(ctypes.wintypes.ULONG(1))``.
ioctlsocket.argtypes = (
    ctypes.wintypes.HANDLE,  # s
    ctypes.wintypes.LONG,  # cmd
    ctypes.POINTER(ctypes.wintypes.ULONG),  # argp
)
ioctlsocket.restype = ctypes.c_int

# kernel32: BOOL SetHandleInformation(HANDLE hObject, DWORD dwMask, DWORD dwFlags)
# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
SetHandleInformation.argtypes = (
    ctypes.wintypes.HANDLE,  # hObject
    ctypes.wintypes.DWORD,  # dwMask
    ctypes.wintypes.DWORD,  # dwFlags
)
SetHandleInformation.restype = ctypes.wintypes.BOOL

# Mask/flag bit used with SetHandleInformation by fcntl(F_SETFD) below.
HANDLE_FLAG_INHERIT = 0x00000001
# Constants carrying the same names as those exported by the POSIX ``fcntl``
# module, so code written against it can look them up here.
# NOTE(review): the numeric values appear to have been copied from a Unix
# build -- confirm before relying on any of them on Windows.

# --- DN_* flags (one bit each) ---------------------------------------------
DN_ACCESS = 1 << 0
DN_MODIFY = 1 << 1
DN_CREATE = 1 << 2
DN_DELETE = 1 << 3
DN_RENAME = 1 << 4
DN_ATTRIB = 1 << 5
DN_MULTISHOT = -(1 << 31)

# --- F_* command codes, ordered by value -----------------------------------
F_DUPFD = 0
F_GETFD = 1
F_SETFD = 2
F_GETFL = 3
F_SETFL = 4
F_SETOWN = 8
F_GETOWN = 9
F_SETSIG = 10
F_GETSIG = 11
F_GETLK = 12
F_GETLK64 = F_GETLK
F_SETLK = 13
F_SETLK64 = F_SETLK
F_SETLKW = 14
F_SETLKW64 = F_SETLKW
F_SETLEASE = 1024
F_GETLEASE = 1025
F_NOTIFY = 1026

# --- F_*LCK lock-type codes ------------------------------------------------
F_RDLCK = 0
F_WRLCK = 1
F_UNLCK = 2
F_EXLCK = 4
F_SHLCK = 8

# --- Descriptor / status flags ---------------------------------------------
FD_CLOEXEC = 1
FASYNC = 1 << 13
# Patched onto ``os`` itself so that ``os.O_NONBLOCK`` resolves after this
# module has been imported.
os.O_NONBLOCK = 1 << 11

# --- I_* ioctl request codes, ordered by value -----------------------------
I_NREAD = 21249
I_PUSH = 21250
I_POP = 21251
I_LOOK = 21252
I_FLUSH = 21253
I_SRDOPT = 21254
I_GRDOPT = 21255
I_STR = 21256
I_SETSIG = 21257
I_GETSIG = 21258
I_FIND = 21259
I_LINK = 21260
I_UNLINK = 21261
I_RECVFD = 21262
I_PEEK = 21263
I_FDINSERT = 21264
I_SENDFD = 21265
I_SWROPT = 21267
I_GWROPT = 21268
I_LIST = 21269
I_PLINK = 21270
I_PUNLINK = 21271
I_FLUSHBAND = 21276
I_CKBAND = 21277
I_GETBAND = 21278
I_ATMARK = 21279
I_SETCLTIME = 21280
I_GETCLTIME = 21281
I_CANPUT = 21282

# --- LOCK_* operation flags ------------------------------------------------
LOCK_SH = 1 << 0
LOCK_EX = 1 << 1
LOCK_NB = 1 << 2
LOCK_UN = 1 << 3
LOCK_MAND = 1 << 5
LOCK_READ = 1 << 6
LOCK_WRITE = 1 << 7
LOCK_RW = LOCK_READ | LOCK_WRITE

# --- Miscellaneous ioctl request codes -------------------------------------
FIONBIO = 126
TIOCGWINSZ = 0x40087468
class OVERLAPPED(ctypes.Structure):
    """ctypes declaration of the structure handed to LockFileEx/UnlockFileEx.

    Both functions are declared in this module as taking a
    ``POINTER(OVERLAPPED)``.  The field order and types below define the
    in-memory layout and must not be rearranged.
    """

    _fields_ = [
        ('Internal', ctypes.wintypes.LPVOID),
        ('InternalHigh', ctypes.wintypes.LPVOID),
        ('Offset', ctypes.wintypes.DWORD),
        ('OffsetHigh', ctypes.wintypes.DWORD),
        ('hEvent', ctypes.wintypes.HANDLE),
    ]
# Low / high DWORD halves of the byte count that _lock_file and _unlock_file
# pass to LockFileEx / UnlockFileEx.
whole_low = 0xffffffff
whole_high = 0x7fffffff

kernel32 = ctypes.windll.kernel32

# BOOL LockFileEx(hFile, dwFlags, dwReserved, nLow, nHigh, lpOverlapped)
LockFileEx = kernel32.LockFileEx
LockFileEx.restype = ctypes.wintypes.BOOL
LockFileEx.argtypes = [
    ctypes.wintypes.HANDLE,      # hFile
    ctypes.wintypes.DWORD,       # dwFlags
    ctypes.wintypes.DWORD,       # dwReserved
    ctypes.wintypes.DWORD,       # nNumberOfBytesToLockLow
    ctypes.wintypes.DWORD,       # nNumberOfBytesToLockHigh
    ctypes.POINTER(OVERLAPPED),  # lpOverlapped
]

# BOOL UnlockFileEx(hFile, dwReserved, nLow, nHigh, lpOverlapped)
UnlockFileEx = kernel32.UnlockFileEx
UnlockFileEx.restype = ctypes.wintypes.BOOL
UnlockFileEx.argtypes = [
    ctypes.wintypes.HANDLE,      # hFile
    ctypes.wintypes.DWORD,       # dwReserved
    ctypes.wintypes.DWORD,       # nNumberOfBytesToUnlockLow
    ctypes.wintypes.DWORD,       # nNumberOfBytesToUnlockHigh
    ctypes.POINTER(OVERLAPPED),  # lpOverlapped
]
def _lock_file(f, exclusive, block):
    """Lock open file *f* through ``LockFileEx``.

    The byte range requested starts at offset 0 and has length
    ``whole_high:whole_low``.

    Args:
        f: Open file object.  Must provide ``fileno()`` and allow attribute
            assignment: the OVERLAPPED pointer used for the call is stored
            on it as ``_lock_file_overlapped_p`` so that ``_unlock_file``
            can pass the same pointer later.
        exclusive: Truthy requests an exclusive lock (flag ``0x2``);
            falsy requests no exclusivity flag.
        block: Truthy waits for the lock; falsy adds the fail-immediately
            flag (``0x1``).

    Raises:
        BlockingIOError: If ``LockFileEx`` reports failure, whatever the
            underlying reason.
    """
    lockfile_fail_immediately = 0x1
    lockfile_exclusive_lock = 0x2

    overlapped = OVERLAPPED()
    overlapped.Offset = 0
    overlapped.OffsetHigh = 0
    overlapped.hEvent = 0
    f._lock_file_overlapped_p = ctypes.pointer(overlapped)

    flags = 0
    if exclusive:
        flags |= lockfile_exclusive_lock
    if not block:
        flags |= lockfile_fail_immediately

    handle = msvcrt.get_osfhandle(f.fileno())
    if not LockFileEx(handle, flags, 0, whole_low, whole_high,
                      f._lock_file_overlapped_p):
        # NB: No argument form of "ctypes.FormatError" does not work on PyPy
        raise BlockingIOError(f'Locking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')
def _unlock_file(f):
    """Release the lock previously taken on *f* by ``_lock_file``.

    Args:
        f: The same open file object that was passed to ``_lock_file``; its
            ``_lock_file_overlapped_p`` attribute is reused for the call.

    Raises:
        AttributeError: If *f* was never passed to ``_lock_file``.
        OSError: If ``UnlockFileEx`` reports failure.
    """
    # Kept as an assert to preserve the original failure mode for callers.
    assert f._lock_file_overlapped_p
    handle = msvcrt.get_osfhandle(f.fileno())
    if not UnlockFileEx(handle, 0, whole_low, whole_high,
                        f._lock_file_overlapped_p):
        # NB: No argument form of "ctypes.FormatError" does not work on PyPy,
        # so pass the error code explicitly, exactly as _lock_file does.
        raise OSError(f'Unlocking file failed: {ctypes.FormatError(ctypes.GetLastError())!r}')
def fcntl(fd, op, arg=0):
    """Minimal stand-in for ``fcntl.fcntl``.

    Supported operations:

    * ``F_GETFD`` / ``F_GETFL``: always report ``0``.
    * ``F_SETFD`` with ``FD_CLOEXEC``: clear ``HANDLE_FLAG_INHERIT`` on the
      handle so it is not inherited by child processes.

    Args:
        fd: Passed unchanged to ``SetHandleInformation`` as the handle.
            NOTE(review): a C-runtime file descriptor is not an OS handle;
            callers presumably pass a handle or a socket number -- confirm.
        op: One of the ``F_*`` command constants.
        arg: Command argument; only ``FD_CLOEXEC`` is accepted for
            ``F_SETFD``.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: For an unsupported *op*, or an unsupported *arg* to
            ``F_SETFD``.
        OSError: If ``SetHandleInformation`` fails.
    """
    if op in (F_GETFD, F_GETFL):
        return 0

    if op == F_SETFD:
        if arg != FD_CLOEXEC:
            raise ValueError("Unsupported arg")
        # Close-on-exec means "do NOT inherit": select the inherit bit with
        # the mask and write 0 into it.  (Passing `arg`, i.e. 1, as the flag
        # value would switch inheritance ON.)
        if not SetHandleInformation(fd, HANDLE_FLAG_INHERIT, 0):
            # WinError() wraps GetLastError() in an OSError; the bare error
            # code is an int and cannot be raised.
            raise ctypes.WinError()
        return 0

    # NOTE: F_SETFL / os.O_NONBLOCK is deliberately unsupported.  An earlier
    # draft that called ioctlsocket(fd, FIONBIO, ...) was left disabled.
    raise ValueError("Unsupported op")
def ioctl(fd, op, arg=0, mutable_flag=True):
    """Stub for ``fcntl.ioctl``; performs no device control at all.

    Args:
        fd: Ignored.
        op: Ignored.
        arg: Ignored.
        mutable_flag: Selects which placeholder is returned.

    Returns:
        ``0`` when *mutable_flag* is truthy, otherwise the empty string.
    """
    # TODO: a real TIOCGWINSZ implementation could query the console size and
    # return struct.pack("4H", height, width, 0, 0); every other request
    # would then raise NotImplementedError.
    return 0 if mutable_flag else ""
def flock(fd, op):
    """Stub for ``fcntl.flock``: accepts the call and does nothing.

    No lock is taken or released; both arguments are ignored.

    Returns:
        None.
    """
    return None
def lockf(fd, operation, length=0, start=0, whence=0):
    """Stub for ``fcntl.lockf``: accepts the call and does nothing.

    No lock is taken or released; every argument is ignored.

    Returns:
        None.
    """
    return None
class Pipe(object):
    """Create an OS independent asynchronous pipe.

    The pipe is a pair of connected, non-blocking TCP sockets on 127.0.0.1.

    Attributes:
        writer: Socket that ``write`` sends on.
        reader: Socket that ``read`` receives from.
        reader_fd: ``reader.fileno()``.
    """

    # Give up after this many failed connect attempts.
    _MAX_CONNECT_ATTEMPTS = 10

    def __init__(self):
        """Build the connected socket pair.

        Raises:
            socket.error: If the loopback connection cannot be established.
        """
        # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py
        self.writer = socket.socket()
        # Disable buffering -- pulling the trigger sends 1 byte,
        # and we want that sent immediately, to wake up ASAP.
        self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        # errno.WSAEADDRINUSE only exists on Windows; fall back to the
        # portable name so the error path cannot die with AttributeError.
        addr_in_use = getattr(errno, "WSAEADDRINUSE", errno.EADDRINUSE)

        attempts = 0
        while True:
            attempts += 1
            # Bind to a local port; for efficiency, let the OS pick
            # a free port for us.
            # Unfortunately, stress tests showed that we may not
            # be able to connect to that port ("Address already in
            # use") despite that the OS picked it.  This appears
            # to be a race bug in the Windows socket implementation.
            # So we loop until a connect() succeeds (almost always
            # on the first try).  See the long thread at
            # http://mail.zope.org/pipermail/zope/2005-July/160433.html
            # for hideous details.
            listener = socket.socket()
            try:
                listener.bind(("127.0.0.1", 0))
                connect_address = listener.getsockname()  # (host, port)
                listener.listen(1)
            except Exception:
                listener.close()
                self.writer.close()
                raise

            try:
                self.writer.connect(connect_address)
            except socket.error as detail:
                # This listener is useless whichever way we leave.
                listener.close()
                # Exceptions are not subscriptable on Python 3; the error
                # code lives in ``args[0]``.
                if detail.args[0] != addr_in_use:
                    # "Address already in use" is the only error
                    # I've seen on two WinXP Pro SP2 boxes, under
                    # Pythons 2.3.5 and 2.4.1.
                    self.writer.close()
                    raise
                # (10048, 'Address already in use')
                if attempts >= self._MAX_CONNECT_ATTEMPTS:
                    # I've never seen it go above 2
                    self.writer.close()
                    raise socket.error("Cannot bind trigger!")
                # Try again with a fresh listener.  Note: I originally put a
                # short sleep() here, but it didn't appear to help or hurt.
            else:
                break  # success

        try:
            self.reader, _ = listener.accept()
        except Exception:
            self.writer.close()
            raise
        finally:
            listener.close()
        self.reader.setblocking(False)
        self.writer.setblocking(False)
        self.reader_fd = self.reader.fileno()

    def read(self):
        """Emulate a file descriptor's read method.

        Returns:
            At most one byte received from the reader socket.

        Raises:
            IOError: If no data is available right now.
        """
        try:
            return self.reader.recv(1)
        except socket.error as ex:
            if ex.args[0] == errno.EWOULDBLOCK:
                raise IOError from ex
            raise

    def write(self, data):
        """Emulate a file descriptor's write method.

        Returns:
            The number of bytes sent, as reported by ``socket.send``.
        """
        return self.writer.send(data)

    def close(self):
        """Close both ends of the pipe."""
        self.reader.close()
        self.writer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment