paths.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. """Path utility functions."""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. # Derived from IPython.utils.path, which is
  5. # Copyright (c) IPython Development Team.
  6. # Distributed under the terms of the Modified BSD License.
  7. from __future__ import annotations
  8. import errno
  9. import os
  10. import site
  11. import stat
  12. import sys
  13. import tempfile
  14. import warnings
  15. from collections.abc import Iterator
  16. from contextlib import contextmanager
  17. from pathlib import Path
  18. from typing import Any, overload
  19. import platformdirs
  20. pjoin = os.path.join
  21. # Capitalize Jupyter in paths only on Windows and MacOS (when not in Homebrew)
  22. if sys.platform == "win32" or (
  23. sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew")
  24. ):
  25. APPNAME = "Jupyter"
  26. else:
  27. APPNAME = "jupyter"
  28. # UF_HIDDEN is a stat flag not defined in the stat module.
  29. # It is used by BSD to indicate hidden files.
  30. UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
  31. @overload
  32. def envset(name: str, default: bool = False) -> bool: ...
  33. @overload
  34. def envset(name: str, default: None) -> bool | None: ...
  35. def envset(name: str, default: bool | None = False) -> bool | None:
  36. """Return the boolean value of a given environment variable.
  37. An environment variable is considered set if it is assigned to a value
  38. other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive)
  39. If the environment variable is not defined, the default value is returned.
  40. """
  41. if name not in os.environ:
  42. return default
  43. return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"]
  44. def use_platform_dirs() -> bool:
  45. """Determine if platformdirs should be used for system-specific paths.
  46. The default is False.
  47. """
  48. return envset("JUPYTER_PLATFORM_DIRS", False)
  49. def get_home_dir() -> str:
  50. """Get the real path of the home directory"""
  51. homedir = Path("~").expanduser()
  52. # Next line will make things work even when /home/ is a symlink to
  53. # /usr/home as it is on FreeBSD, for example
  54. return str(Path(homedir).resolve())
  55. _dtemps: dict[str, str] = {}
  56. def _do_i_own(path: str) -> bool:
  57. """Return whether the current user owns the given path"""
  58. p = Path(path).resolve()
  59. # walk up to first existing parent
  60. while not p.exists() and p != p.parent:
  61. p = p.parent
  62. # simplest check: owner by name
  63. # not always implemented or available
  64. try:
  65. return p.owner() == os.getlogin()
  66. except Exception: # noqa: S110
  67. pass
  68. if hasattr(os, "geteuid"):
  69. try:
  70. st = p.stat()
  71. return st.st_uid == os.geteuid()
  72. except (NotImplementedError, OSError):
  73. # geteuid not always implemented
  74. pass
  75. # no ownership checks worked, check write access
  76. return os.access(p, os.W_OK)
  77. def prefer_environment_over_user() -> bool:
  78. """Determine if environment-level paths should take precedence over user-level paths."""
  79. # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
  80. if "JUPYTER_PREFER_ENV_PATH" in os.environ:
  81. return envset("JUPYTER_PREFER_ENV_PATH")
  82. # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
  83. if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix):
  84. return True
  85. # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True
  86. if (
  87. "CONDA_PREFIX" in os.environ
  88. and sys.prefix.startswith(os.environ["CONDA_PREFIX"])
  89. and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
  90. and _do_i_own(sys.prefix)
  91. ):
  92. return True
  93. return False
  94. def _mkdtemp_once(name: str) -> str:
  95. """Make or reuse a temporary directory.
  96. If this is called with the same name in the same process, it will return
  97. the same directory.
  98. """
  99. try:
  100. return _dtemps[name]
  101. except KeyError:
  102. d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
  103. return d
  104. def jupyter_config_dir() -> str:
  105. """Get the Jupyter config directory for this platform and user.
  106. Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate
  107. directory for the platform.
  108. """
  109. env = os.environ
  110. if env.get("JUPYTER_NO_CONFIG"):
  111. return _mkdtemp_once("jupyter-clean-cfg")
  112. if env.get("JUPYTER_CONFIG_DIR"):
  113. return env["JUPYTER_CONFIG_DIR"]
  114. if use_platform_dirs():
  115. return platformdirs.user_config_dir(APPNAME, appauthor=False)
  116. home_dir = get_home_dir()
  117. return pjoin(home_dir, ".jupyter")
  118. def jupyter_data_dir() -> str:
  119. """Get the config directory for Jupyter data files for this platform and user.
  120. These are non-transient, non-configuration files.
  121. Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path.
  122. """
  123. env = os.environ
  124. if env.get("JUPYTER_DATA_DIR"):
  125. return env["JUPYTER_DATA_DIR"]
  126. if use_platform_dirs():
  127. return platformdirs.user_data_dir(APPNAME, appauthor=False)
  128. home = get_home_dir()
  129. if sys.platform == "darwin":
  130. return str(Path(home, "Library", "Jupyter"))
  131. # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
  132. if sys.platform == "win32":
  133. appdata = os.environ.get("APPDATA", None)
  134. if appdata:
  135. return str(Path(appdata, "jupyter").resolve())
  136. return pjoin(jupyter_config_dir(), "data")
  137. # Linux, non-OS X Unix, AIX, etc.
  138. # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
  139. xdg = env.get("XDG_DATA_HOME", None)
  140. if not xdg:
  141. xdg = pjoin(home, ".local", "share")
  142. return pjoin(xdg, "jupyter")
  143. def jupyter_runtime_dir() -> str:
  144. """Return the runtime dir for transient jupyter files.
  145. Returns JUPYTER_RUNTIME_DIR if defined.
  146. The default is now (data_dir)/runtime on all platforms;
  147. we no longer use XDG_RUNTIME_DIR after various problems.
  148. """
  149. env = os.environ
  150. if env.get("JUPYTER_RUNTIME_DIR"):
  151. return env["JUPYTER_RUNTIME_DIR"]
  152. return pjoin(jupyter_data_dir(), "runtime")
  153. # %PROGRAMDATA% is not safe by default, require opt-in to trust it
  154. _use_programdata: bool = envset("JUPYTER_USE_PROGRAMDATA")
  155. # _win_programdata is a path str if we're using it, None otherwise
  156. _win_programdata: str | None = None
  157. if os.name == "nt" and _use_programdata:
  158. _win_programdata = os.environ.get("PROGRAMDATA", None)
  159. if use_platform_dirs():
  160. if os.name == "nt" and not _use_programdata:
  161. # default PROGRAMDATA used by site_* is not safe by default on Windows
  162. SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
  163. else:
  164. SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
  165. APPNAME, appauthor=False, multipath=True
  166. ).split(os.pathsep)
  167. else: # noqa: PLR5501
  168. # default dirs
  169. if os.name == "nt":
  170. # PROGRAMDATA is not defined by default on XP, and not safe by default
  171. if _win_programdata:
  172. SYSTEM_JUPYTER_PATH = [pjoin(_win_programdata, "jupyter")]
  173. else:
  174. SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
  175. else:
  176. SYSTEM_JUPYTER_PATH = [
  177. "/usr/local/share/jupyter",
  178. "/usr/share/jupyter",
  179. ]
  180. ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]
  181. def jupyter_path(*subdirs: str) -> list[str]:
  182. """Return a list of directories to search for data files.
  183. There are four sources of paths to search:
  184. - $JUPYTER_PATH environment variable (always highest priority)
  185. - user directories (e.g. ~/.local/share/jupyter)
  186. - environment directories (e.g. {sys.prefix}/share/jupyter)
  187. - system-wide paths (e.g. /usr/local/share/jupyter)
  188. JUPYTER_PATH environment variable has highest priority, if defined,
  189. and is purely additive.
  190. If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
  191. directories will have priority over user-level directories.
  192. You can also set JUPYTER_PREFER_ENV_PATH=0 to explicitly prefer user directories.
  193. If Jupyter detects that you are in a virtualenv or conda environment,
  194. environment paths are also preferred to user paths,
  195. otherwise user paths are preferred to environment paths.
  196. If the Python site.ENABLE_USER_SITE variable is True, we also add the
  197. appropriate Python user site subdirectory to the user-level directories.
  198. Finally, system-wide directories, such as `/usr/local/share/jupyter` are searched.
  199. If ``*subdirs`` are given, that subdirectory will be added to each element.
  200. .. versionchanged:: 5.8
  201. On Windows, %PROGRAMDATA% will be used as a system-wide path only if
  202. the JUPYTER_USE_PROGRAMDATA env is set.
  203. By default, there is no default system-wide path on Windows and the env path
  204. is used instead.
  205. Examples:
  206. >>> jupyter_path()
  207. ['~/.local/jupyter', '/usr/local/share/jupyter']
  208. >>> jupyter_path('kernels')
  209. ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels']
  210. """
  211. paths: list[str] = []
  212. # highest priority is explicit environment variable
  213. if os.environ.get("JUPYTER_PATH"):
  214. paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep))
  215. # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
  216. user = [jupyter_data_dir()]
  217. if site.ENABLE_USER_SITE:
  218. # Check if site.getuserbase() exists to be compatible with virtualenv,
  219. # which often does not have this method.
  220. userbase: str | None
  221. userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
  222. if userbase:
  223. userdir = str(Path(userbase, "share", "jupyter"))
  224. if userdir not in user:
  225. user.append(userdir)
  226. # Windows usually doesn't have a 'system' prefix,
  227. # so 'system' and 'env' are the same
  228. # make sure that env can still be preferred in this case
  229. if ENV_JUPYTER_PATH == SYSTEM_JUPYTER_PATH:
  230. env = ENV_JUPYTER_PATH
  231. else:
  232. env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]
  233. if prefer_environment_over_user():
  234. paths.extend(env)
  235. paths.extend(user)
  236. else:
  237. paths.extend(user)
  238. paths.extend(env)
  239. # finally, add system paths (can overlap with env, so avoid duplicates)
  240. for _path in SYSTEM_JUPYTER_PATH:
  241. if _path not in paths:
  242. paths.append(_path)
  243. # add subdir, if requested
  244. if subdirs:
  245. paths = [pjoin(p, *subdirs) for p in paths]
  246. return paths
  247. ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]
  248. if use_platform_dirs():
  249. if os.name == "nt" and not _use_programdata:
  250. # default PROGRAMDATA is not safe by default on Windows
  251. # use ENV to avoid an empty list, since some may assume this is non-empty
  252. SYSTEM_CONFIG_PATH = ENV_CONFIG_PATH[:]
  253. else:
  254. SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
  255. APPNAME, appauthor=False, multipath=True
  256. ).split(os.pathsep)
  257. elif os.name == "nt":
  258. # PROGRAMDATA is not defined by default on XP, and not safe by default
  259. # but make sure it's not empty
  260. if _win_programdata:
  261. SYSTEM_CONFIG_PATH = [str(Path(_win_programdata, "jupyter"))]
  262. else:
  263. SYSTEM_CONFIG_PATH = ENV_CONFIG_PATH[:]
  264. else:
  265. SYSTEM_CONFIG_PATH = [
  266. "/usr/local/etc/jupyter",
  267. "/etc/jupyter",
  268. ]
  269. def jupyter_config_path() -> list[str]:
  270. """Return the search path for Jupyter config files as a list.
  271. If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
  272. environment-level directories will have priority over user-level
  273. directories.
  274. If the Python site.ENABLE_USER_SITE variable is True, we also add the
  275. appropriate Python user site subdirectory to the user-level directories.
  276. Finally, system-wide directories such as `/usr/local/etc/jupyter` are searched.
  277. .. versionchanged:: 5.8
  278. On Windows, %PROGRAMDATA% will be used as a system-wide path only if
  279. the JUPYTER_USE_PROGRAMDATA env is set.
  280. By default, there is no system-wide config path on Windows.
  281. Examples:
  282. >>> jupyter_config_path()
  283. ['~/.jupyter', '~/.local/etc/jupyter', '/usr/local/etc/jupyter', '/etc/jupyter']
  284. """
  285. if os.environ.get("JUPYTER_NO_CONFIG"):
  286. # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
  287. return [jupyter_config_dir()]
  288. paths: list[str] = []
  289. # highest priority is explicit environment variable
  290. if os.environ.get("JUPYTER_CONFIG_PATH"):
  291. paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep))
  292. # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
  293. user = [jupyter_config_dir()]
  294. if site.ENABLE_USER_SITE:
  295. userbase: str | None
  296. # Check if site.getuserbase() exists to be compatible with virtualenv,
  297. # which often does not have this method.
  298. userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
  299. if userbase:
  300. userdir = str(Path(userbase, "etc", "jupyter"))
  301. if userdir not in user:
  302. user.append(userdir)
  303. # Windows usually doesn't have a 'system' prefix,
  304. # so 'system' and 'env' are the same
  305. # make sure that env can still be preferred in this case
  306. if ENV_CONFIG_PATH == SYSTEM_CONFIG_PATH:
  307. env = ENV_CONFIG_PATH
  308. else:
  309. env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]
  310. if prefer_environment_over_user():
  311. paths.extend(env)
  312. paths.extend(user)
  313. else:
  314. paths.extend(user)
  315. paths.extend(env)
  316. # Finally, system path
  317. if ENV_CONFIG_PATH != SYSTEM_CONFIG_PATH:
  318. paths.extend(SYSTEM_CONFIG_PATH)
  319. return paths
  320. def exists(path: str) -> bool:
  321. """Replacement for `os.path.exists` which works for host mapped volumes
  322. on Windows containers
  323. """
  324. try:
  325. os.lstat(path)
  326. except OSError:
  327. return False
  328. return True
  329. def is_file_hidden_win(abs_path: str | Path, stat_res: Any | None = None) -> bool:
  330. """Is a file hidden?
  331. This only checks the file itself; it should be called in combination with
  332. checking the directory containing the file.
  333. Use is_hidden() instead to check the file and its parent directories.
  334. Parameters
  335. ----------
  336. abs_path : unicode
  337. The absolute path to check.
  338. stat_res : os.stat_result, optional
  339. The result of calling stat() on abs_path. If not passed, this function
  340. will call stat() internally.
  341. """
  342. abs_path = Path(abs_path)
  343. if abs_path.name.startswith("."):
  344. return True
  345. if stat_res is None:
  346. try:
  347. stat_res = Path(abs_path).stat()
  348. except OSError as e:
  349. if e.errno == errno.ENOENT:
  350. return False
  351. raise
  352. try:
  353. if (
  354. stat_res.st_file_attributes # type:ignore[union-attr]
  355. & stat.FILE_ATTRIBUTE_HIDDEN # type:ignore[attr-defined]
  356. ):
  357. return True
  358. except AttributeError:
  359. # allow AttributeError on PyPy for Windows
  360. # 'stat_result' object has no attribute 'st_file_attributes'
  361. # https://foss.heptapod.net/pypy/pypy/-/issues/3469
  362. warnings.warn(
  363. "hidden files are not detectable on this system, so no file will be marked as hidden.",
  364. stacklevel=2,
  365. )
  366. return False
  367. def is_file_hidden_posix(abs_path: str | Path, stat_res: Any | None = None) -> bool:
  368. """Is a file hidden?
  369. This only checks the file itself; it should be called in combination with
  370. checking the directory containing the file.
  371. Use is_hidden() instead to check the file and its parent directories.
  372. Parameters
  373. ----------
  374. abs_path : unicode
  375. The absolute path to check.
  376. stat_res : os.stat_result, optional
  377. The result of calling stat() on abs_path. If not passed, this function
  378. will call stat() internally.
  379. """
  380. abs_path = Path(abs_path)
  381. if abs_path.name.startswith("."):
  382. return True
  383. if stat_res is None or stat.S_ISLNK(stat_res.st_mode):
  384. try:
  385. stat_res = abs_path.stat()
  386. except OSError as e:
  387. if e.errno == errno.ENOENT:
  388. return False
  389. raise
  390. # check that dirs can be listed
  391. if stat.S_ISDIR(stat_res.st_mode): # noqa: SIM102
  392. # use x-access, not actual listing, in case of slow/large listings
  393. if not os.access(abs_path, os.X_OK | os.R_OK):
  394. return True
  395. # check UF_HIDDEN
  396. if getattr(stat_res, "st_flags", 0) & UF_HIDDEN:
  397. return True
  398. return False
  399. if sys.platform == "win32":
  400. is_file_hidden = is_file_hidden_win
  401. else:
  402. is_file_hidden = is_file_hidden_posix
  403. def is_hidden(abs_path: str | Path, abs_root: str | Path = "") -> bool:
  404. """Is a file hidden or contained in a hidden directory?
  405. This will start with the rightmost path element and work backwards to the
  406. given root to see if a path is hidden or in a hidden directory. Hidden is
  407. determined by either name starting with '.' or the UF_HIDDEN flag as
  408. reported by stat.
  409. If abs_path is the same directory as abs_root, it will be visible even if
  410. that is a hidden folder. This only checks the visibility of files
  411. and directories *within* abs_root.
  412. Parameters
  413. ----------
  414. abs_path : str or Path
  415. The absolute path to check for hidden directories.
  416. abs_root : str or Path
  417. The absolute path of the root directory in which hidden directories
  418. should be checked for.
  419. """
  420. abs_path = Path(os.path.normpath(abs_path))
  421. if abs_root:
  422. abs_root = Path(os.path.normpath(abs_root))
  423. else:
  424. abs_root = list(abs_path.parents)[-1]
  425. if abs_path == abs_root:
  426. # root itself is never hidden
  427. return False
  428. # check that arguments are valid
  429. if not abs_path.is_absolute():
  430. _msg = f"{abs_path=} is not absolute. abs_path must be absolute."
  431. raise ValueError(_msg)
  432. if not abs_root.is_absolute():
  433. _msg = f"{abs_root=} is not absolute. abs_root must be absolute."
  434. raise ValueError(_msg)
  435. if not abs_path.is_relative_to(abs_root):
  436. _msg = (
  437. f"{abs_path=} is not a subdirectory of {abs_root=}. abs_path must be within abs_root."
  438. )
  439. raise ValueError(_msg)
  440. if is_file_hidden(abs_path):
  441. return True
  442. relative_path = abs_path.relative_to(abs_root)
  443. if any(part.startswith(".") for part in relative_path.parts):
  444. return True
  445. # check UF_HIDDEN on any location up to root.
  446. # is_file_hidden() already checked the file, so start from its parent dir
  447. for parent in abs_path.parents:
  448. if not parent.exists():
  449. continue
  450. if parent == abs_root:
  451. break
  452. try:
  453. # may fail on Windows junctions
  454. st = parent.lstat()
  455. except OSError:
  456. return True
  457. if getattr(st, "st_flags", 0) & UF_HIDDEN:
  458. return True
  459. return False
  460. def win32_restrict_file_to_user(fname: str) -> None:
  461. """Secure a windows file to read-only access for the user.
  462. Follows guidance from win32 library creator:
  463. http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
  464. This method should be executed against an already generated file which
  465. has no secrets written to it yet.
  466. Parameters
  467. ----------
  468. fname : unicode
  469. The path to the file to secure
  470. """
  471. try:
  472. import win32api # noqa: PLC0415
  473. except ImportError:
  474. return _win32_restrict_file_to_user_ctypes(fname)
  475. import ntsecuritycon as con # noqa: PLC0415
  476. import win32security # noqa: PLC0415
  477. # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
  478. admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
  479. user, _domain, _type = win32security.LookupAccountName(
  480. "", win32api.GetUserNameEx(win32api.NameSamCompatible)
  481. )
  482. sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)
  483. dacl = win32security.ACL()
  484. # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
  485. dacl.AddAccessAllowedAce(
  486. win32security.ACL_REVISION,
  487. con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
  488. user,
  489. )
  490. dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)
  491. sd.SetSecurityDescriptorDacl(1, dacl, 0)
  492. win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
  493. return None
  494. def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
  495. """Secure a windows file to read-only access for the user.
  496. Follows guidance from win32 library creator:
  497. http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
  498. This method should be executed against an already generated file which
  499. has no secrets written to it yet.
  500. Parameters
  501. ----------
  502. fname : unicode
  503. The path to the file to secure
  504. """
  505. import ctypes # noqa: PLC0415
  506. from ctypes import wintypes # noqa: PLC0415
  507. advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
  508. secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
  509. NameSamCompatible = 2
  510. WinBuiltinAdministratorsSid = 26
  511. DACL_SECURITY_INFORMATION = 4
  512. ACL_REVISION = 2
  513. ERROR_INSUFFICIENT_BUFFER = 122
  514. ERROR_MORE_DATA = 234
  515. SYNCHRONIZE = 0x100000
  516. DELETE = 0x00010000
  517. STANDARD_RIGHTS_REQUIRED = 0xF0000
  518. STANDARD_RIGHTS_READ = 0x20000
  519. STANDARD_RIGHTS_WRITE = 0x20000
  520. FILE_READ_DATA = 1
  521. FILE_READ_EA = 8
  522. FILE_READ_ATTRIBUTES = 128
  523. FILE_WRITE_DATA = 2
  524. FILE_APPEND_DATA = 4
  525. FILE_WRITE_EA = 16
  526. FILE_WRITE_ATTRIBUTES = 256
  527. FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
  528. FILE_GENERIC_READ = (
  529. STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
  530. )
  531. FILE_GENERIC_WRITE = (
  532. STANDARD_RIGHTS_WRITE
  533. | FILE_WRITE_DATA
  534. | FILE_WRITE_ATTRIBUTES
  535. | FILE_WRITE_EA
  536. | FILE_APPEND_DATA
  537. | SYNCHRONIZE
  538. )
  539. class ACL(ctypes.Structure):
  540. _fields_ = [
  541. ("AclRevision", wintypes.BYTE),
  542. ("Sbz1", wintypes.BYTE),
  543. ("AclSize", wintypes.WORD),
  544. ("AceCount", wintypes.WORD),
  545. ("Sbz2", wintypes.WORD),
  546. ]
  547. PSID = ctypes.c_void_p
  548. PACL = ctypes.POINTER(ACL)
  549. PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
  550. def _nonzero_success(result: int, func: Any, args: Any) -> Any: # noqa: ARG001
  551. if not result:
  552. raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
  553. return args
  554. secur32.GetUserNameExW.errcheck = _nonzero_success
  555. secur32.GetUserNameExW.restype = wintypes.BOOL
  556. secur32.GetUserNameExW.argtypes = (
  557. ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
  558. wintypes.LPWSTR, # LPWSTR lpNameBuffer,
  559. wintypes.PULONG, # PULONG nSize
  560. )
  561. advapi32.CreateWellKnownSid.errcheck = _nonzero_success
  562. advapi32.CreateWellKnownSid.restype = wintypes.BOOL
  563. advapi32.CreateWellKnownSid.argtypes = (
  564. wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
  565. PSID, # PSID DomainSid
  566. PSID, # PSID pSid
  567. wintypes.PDWORD, # DWORD *cbSid
  568. )
  569. advapi32.LookupAccountNameW.errcheck = _nonzero_success
  570. advapi32.LookupAccountNameW.restype = wintypes.BOOL
  571. advapi32.LookupAccountNameW.argtypes = (
  572. wintypes.LPWSTR, # LPCWSTR lpSystemName
  573. wintypes.LPWSTR, # LPCWSTR lpAccountName
  574. PSID, # PSID Sid
  575. wintypes.LPDWORD, # LPDWORD cbSid
  576. wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
  577. wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
  578. wintypes.LPDWORD, # PSID_NAME_USE peUse
  579. )
  580. advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
  581. advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
  582. advapi32.AddAccessAllowedAce.argtypes = (
  583. PACL, # PACL pAcl
  584. wintypes.DWORD, # DWORD dwAceRevision
  585. wintypes.DWORD, # DWORD AccessMask
  586. PSID, # PSID pSid
  587. )
  588. advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
  589. advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
  590. advapi32.SetSecurityDescriptorDacl.argtypes = (
  591. PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
  592. wintypes.BOOL, # BOOL bDaclPresent
  593. PACL, # PACL pDacl
  594. wintypes.BOOL, # BOOL bDaclDefaulted
  595. )
  596. advapi32.GetFileSecurityW.errcheck = _nonzero_success
  597. advapi32.GetFileSecurityW.restype = wintypes.BOOL
  598. advapi32.GetFileSecurityW.argtypes = (
  599. wintypes.LPCWSTR, # LPCWSTR lpFileName
  600. wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
  601. PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
  602. wintypes.DWORD, # DWORD nLength
  603. wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
  604. )
  605. advapi32.SetFileSecurityW.errcheck = _nonzero_success
  606. advapi32.SetFileSecurityW.restype = wintypes.BOOL
  607. advapi32.SetFileSecurityW.argtypes = (
  608. wintypes.LPCWSTR, # LPCWSTR lpFileName
  609. wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
  610. PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
  611. )
  612. advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
  613. advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
  614. advapi32.MakeAbsoluteSD.argtypes = (
  615. PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
  616. PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
  617. wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
  618. PACL, # PACL pDacl
  619. wintypes.LPDWORD, # LPDWORD lpdwDaclSize
  620. PACL, # PACL pSacl
  621. wintypes.LPDWORD, # LPDWORD lpdwSaclSize
  622. PSID, # PSID pOwner
  623. wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
  624. PSID, # PSID pPrimaryGroup
  625. wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
  626. )
  627. advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
  628. advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
  629. advapi32.MakeSelfRelativeSD.argtypes = (
  630. PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
  631. PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
  632. wintypes.LPDWORD, # LPDWORD lpdwBufferLength
  633. )
  634. advapi32.InitializeAcl.errcheck = _nonzero_success
  635. advapi32.InitializeAcl.restype = wintypes.BOOL
  636. advapi32.InitializeAcl.argtypes = (
  637. PACL, # PACL pAcl,
  638. wintypes.DWORD, # DWORD nAclLength,
  639. wintypes.DWORD, # DWORD dwAclRevision
  640. )
  641. def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
  642. # return a SID for predefined aliases
  643. pSid = (ctypes.c_char * 1)()
  644. cbSid = wintypes.DWORD()
  645. try:
  646. advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
  647. except OSError as e:
  648. if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
  649. raise
  650. pSid = (ctypes.c_char * cbSid.value)()
  651. advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
  652. return pSid[:]
  653. def GetUserNameEx(NameFormat: Any) -> Any:
  654. # return the user or other security principal associated with
  655. # the calling thread
  656. nSize = ctypes.pointer(ctypes.c_ulong(0))
  657. try:
  658. secur32.GetUserNameExW(NameFormat, None, nSize)
  659. except OSError as e:
  660. if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
  661. raise
  662. if not nSize.contents.value:
  663. return None
  664. lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
  665. secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
  666. return lpNameBuffer.value
  667. def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
  668. # return a security identifier (SID) for an account on a system
  669. # and the name of the domain on which the account was found
  670. cbSid = wintypes.DWORD(0)
  671. cchReferencedDomainName = wintypes.DWORD(0)
  672. peUse = wintypes.DWORD(0)
  673. try:
  674. advapi32.LookupAccountNameW(
  675. lpSystemName,
  676. lpAccountName,
  677. None,
  678. ctypes.byref(cbSid),
  679. None,
  680. ctypes.byref(cchReferencedDomainName),
  681. ctypes.byref(peUse),
  682. )
  683. except OSError as e:
  684. if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
  685. raise
  686. Sid = ctypes.create_unicode_buffer("", cbSid.value)
  687. pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
  688. lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
  689. success = advapi32.LookupAccountNameW(
  690. lpSystemName,
  691. lpAccountName,
  692. pSid,
  693. ctypes.byref(cbSid),
  694. lpReferencedDomainName,
  695. ctypes.byref(cchReferencedDomainName),
  696. ctypes.byref(peUse),
  697. )
  698. if not success:
  699. raise ctypes.WinError() # type:ignore[attr-defined]
  700. return pSid, lpReferencedDomainName.value, peUse.value
  701. def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
  702. # add an access-allowed access control entry (ACE)
  703. # to an access control list (ACL)
  704. advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
  705. def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
  706. # return information about the security of a file or directory
  707. nLength = wintypes.DWORD(0)
  708. try:
  709. advapi32.GetFileSecurityW(
  710. lpFileName,
  711. RequestedInformation,
  712. None,
  713. 0,
  714. ctypes.byref(nLength),
  715. )
  716. except OSError as e:
  717. if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
  718. raise
  719. if not nLength.value:
  720. return None
  721. pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
  722. advapi32.GetFileSecurityW(
  723. lpFileName,
  724. RequestedInformation,
  725. pSecurityDescriptor,
  726. nLength,
  727. ctypes.byref(nLength),
  728. )
  729. return pSecurityDescriptor
  730. def SetFileSecurity(
  731. lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
  732. ) -> Any:
  733. # set the security of a file or directory object
  734. advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
  735. def SetSecurityDescriptorDacl(
  736. pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
  737. ) -> Any:
  738. # set information in a discretionary access control list (DACL)
  739. advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
  740. def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
  741. # return a security descriptor in absolute format
  742. # by using a security descriptor in self-relative format as a template
  743. pAbsoluteSecurityDescriptor = None
  744. lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
  745. pDacl = None
  746. lpdwDaclSize = wintypes.DWORD(0)
  747. pSacl = None
  748. lpdwSaclSize = wintypes.DWORD(0)
  749. pOwner = None
  750. lpdwOwnerSize = wintypes.DWORD(0)
  751. pPrimaryGroup = None
  752. lpdwPrimaryGroupSize = wintypes.DWORD(0)
  753. try:
  754. advapi32.MakeAbsoluteSD(
  755. pSelfRelativeSecurityDescriptor,
  756. pAbsoluteSecurityDescriptor,
  757. ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
  758. pDacl,
  759. ctypes.byref(lpdwDaclSize),
  760. pSacl,
  761. ctypes.byref(lpdwSaclSize),
  762. pOwner,
  763. ctypes.byref(lpdwOwnerSize),
  764. pPrimaryGroup,
  765. ctypes.byref(lpdwPrimaryGroupSize),
  766. )
  767. except OSError as e:
  768. if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
  769. raise
  770. pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
  771. pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
  772. pDacl = ctypes.cast(pDaclData, PACL).contents
  773. pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
  774. pSacl = ctypes.cast(pSaclData, PACL).contents
  775. pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
  776. pOwner = ctypes.cast(pOwnerData, PSID)
  777. pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
  778. pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
  779. advapi32.MakeAbsoluteSD(
  780. pSelfRelativeSecurityDescriptor,
  781. pAbsoluteSecurityDescriptor,
  782. ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
  783. pDacl,
  784. ctypes.byref(lpdwDaclSize),
  785. pSacl,
  786. ctypes.byref(lpdwSaclSize),
  787. pOwner,
  788. lpdwOwnerSize,
  789. pPrimaryGroup,
  790. ctypes.byref(lpdwPrimaryGroupSize),
  791. )
  792. return pAbsoluteSecurityDescriptor
  793. def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
  794. # return a security descriptor in self-relative format
  795. # by using a security descriptor in absolute format as a template
  796. pSelfRelativeSecurityDescriptor = None
  797. lpdwBufferLength = wintypes.DWORD(0)
  798. try:
  799. advapi32.MakeSelfRelativeSD(
  800. pAbsoluteSecurityDescriptor,
  801. pSelfRelativeSecurityDescriptor,
  802. ctypes.byref(lpdwBufferLength),
  803. )
  804. except OSError as e:
  805. if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
  806. raise
  807. pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
  808. advapi32.MakeSelfRelativeSD(
  809. pAbsoluteSecurityDescriptor,
  810. pSelfRelativeSecurityDescriptor,
  811. ctypes.byref(lpdwBufferLength),
  812. )
  813. return pSelfRelativeSecurityDescriptor
  814. def NewAcl() -> Any:
  815. # return a new, initialized ACL (access control list) structure
  816. nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ?
  817. acl_data = ctypes.create_string_buffer(nAclLength)
  818. pAcl = ctypes.cast(acl_data, PACL).contents
  819. advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
  820. return pAcl
  821. SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
  822. SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
  823. Acl = NewAcl()
  824. AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
  825. AddAccessAllowedAce(
  826. Acl,
  827. ACL_REVISION,
  828. FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
  829. SidUser,
  830. )
  831. SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
  832. AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
  833. SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
  834. SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
  835. SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
  836. def get_file_mode(fname: str) -> int:
  837. """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.
  838. Parameters
  839. ----------
  840. fname : unicode
  841. The path to the file to get mode from
  842. """
  843. # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we
  844. # should tolerate the execute bit on the file's owner when validating permissions - thus
  845. # the missing least significant bit on the third octal digit. In addition, we also tolerate
  846. # the sticky bit being set, so the lsb from the fourth octal digit is also removed.
  847. return (
  848. stat.S_IMODE(Path(fname).stat().st_mode) & 0o6677
  849. ) # Use 4 octal digits since S_IMODE does the same
  850. allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1")
  851. @contextmanager
  852. def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
  853. """Opens a file in the most restricted pattern available for
  854. writing content. This limits the file mode to `0o0600` and yields
  855. the resulting opened filed handle.
  856. Parameters
  857. ----------
  858. fname : unicode
  859. The path to the file to write
  860. binary: boolean
  861. Indicates that the file is binary
  862. """
  863. mode = "wb" if binary else "w"
  864. encoding = None if binary else "utf-8"
  865. open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
  866. try:
  867. Path(fname).unlink()
  868. except OSError:
  869. # Skip any issues with the file not existing
  870. pass
  871. if os.name == "nt":
  872. if allow_insecure_writes:
  873. # Mounted file systems can have a number of failure modes inside this block.
  874. # For windows machines in insecure mode we simply skip this to avoid failures :/
  875. issue_insecure_write_warning()
  876. else:
  877. # Python on windows does not respect the group and public bits for chmod, so we need
  878. # to take additional steps to secure the contents.
  879. # Touch file preemptively to avoid editing permissions in open files in Windows
  880. fd = os.open(fname, open_flag, 0o0600)
  881. os.close(fd)
  882. open_flag = os.O_WRONLY | os.O_TRUNC
  883. win32_restrict_file_to_user(fname)
  884. with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
  885. if os.name != "nt":
  886. # Enforce that the file got the requested permissions before writing
  887. file_mode = get_file_mode(fname)
  888. if file_mode != 0o0600:
  889. if allow_insecure_writes:
  890. issue_insecure_write_warning()
  891. else:
  892. msg = (
  893. f"Permissions assignment failed for secure file: '{fname}'."
  894. f" Got '{oct(file_mode)}' instead of '0o0600'."
  895. )
  896. raise RuntimeError(msg)
  897. yield f
  898. def issue_insecure_write_warning() -> None:
  899. """Issue an insecure write warning."""
  900. def format_warning(msg: str, *args: Any, **kwargs: Any) -> str: # noqa: ARG001
  901. return str(msg) + "\n"
  902. warnings.formatwarning = format_warning # type:ignore[assignment]
  903. warnings.warn(
  904. "WARNING: Insecure writes have been enabled via environment variable "
  905. "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
  906. "variable or set its value to 'False'.",
  907. stacklevel=2,
  908. )