import os
from stat import S_ISDIR
from errno import ENOTDIR
from functools import partial
O_DIRECTORY = getattr(os, "O_DIRECTORY", None)
AT_FUNCTIONS = set()
for name in dir(os):
if name.endswith("at") and hasattr(getattr(os, name), "__call__"):
if hasattr(getattr(os, name[:-2], None), "__call__"):
AT_FUNCTIONS.add(name)
del name
class atcontext:
def __init__(self, directory, closefd=True, cloexec=False):
self.cloexec = cloexec
if isinstance(directory, str):
self.directory = directory
self.dirfd = None
self.closefd = True
elif isinstance(directory, int):
self.directory = None
self.dirfd = directory
self.closefd = closefd
else:
raise TypeError(directory)
def _check(self):
if self.directory is None and self.dirfd is None:
raise ValueError("I/O operation on closed file.")
def __enter__(self):
self._check()
if self.dirfd is None:
flags = os.O_RDONLY
if self.cloexec:
flags |= os.O_CLOEXEC
if O_DIRECTORY is not None:
flags |= O_DIRECTORY
elif not S_ISDIR(os.stat(self.directory).st_mode):
# must check early as open blocks on fifos (DoS)
# see Linux's man(2) open, O_DIRECTORY
raise NotADirectoryError(ENOTDIR, os.strerror(ENOTDIR), self.directory)
self.dirfd = os.open(self.directory, flags)
return self
def __exit__(self, typ, value, tb):
if self.closefd:
os.close(self.dirfd)
self.directory = self.dirfd = None
def __getattr__(self, name):
self._check()
nameat = name + "at"
if nameat in AT_FUNCTIONS:
return partial(getattr(os, nameat), self.dirfd)
raise AttributeError(name)
def __dir__(self):
content = super().__dir__() + list(AT_FUNCTIONS)
return sorted(content)
if __name__ == "__main__":
with atcontext("/etc") as at:
print(dir(at))
f = at.open("fstab", os.O_RDONLY)
print(os.read(f, 50))
os.close(f)
with atcontext("/etc/fstab") as at:
pass