io.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """io-related utilities"""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import codecs
  5. import errno
  6. import os
  7. import random
  8. import shutil
  9. import sys
  10. from typing import Any, Optional
  11. def unicode_std_stream(stream="stdout"):
  12. """Get a wrapper to write unicode to stdout/stderr as UTF-8.
  13. This ignores environment variables and default encodings, to reliably write
  14. unicode to stdout or stderr.
  15. ::
  16. unicode_std_stream().write(u'ł@e¶ŧ←')
  17. """
  18. assert stream in ("stdout", "stderr")
  19. stream = getattr(sys, stream)
  20. try:
  21. stream_b = stream.buffer
  22. except AttributeError:
  23. # sys.stdout has been replaced - use it directly
  24. return stream
  25. return codecs.getwriter("utf-8")(stream_b)
  26. def unicode_stdin_stream():
  27. """Get a wrapper to read unicode from stdin as UTF-8.
  28. This ignores environment variables and default encodings, to reliably read unicode from stdin.
  29. ::
  30. totreat = unicode_stdin_stream().read()
  31. """
  32. stream = sys.stdin
  33. try:
  34. stream_b = stream.buffer
  35. except AttributeError:
  36. return stream
  37. return codecs.getreader("utf-8")(stream_b)
  38. class FormatSafeDict(dict[Any, Any]):
  39. """Format a dictionary safely."""
  40. def __missing__(self, key):
  41. """Handle missing value."""
  42. return "{" + key + "}"
  43. try:
  44. ENOLINK = errno.ENOLINK
  45. except AttributeError:
  46. ENOLINK = 1998
  47. def link(src, dst):
  48. """Hard links ``src`` to ``dst``, returning 0 or errno.
  49. Note that the special errno ``ENOLINK`` will be returned if ``os.link`` isn't
  50. supported by the operating system.
  51. """
  52. if not hasattr(os, "link"):
  53. return ENOLINK
  54. link_errno: Optional[int] = 0
  55. try:
  56. os.link(src, dst)
  57. except OSError as e:
  58. link_errno = e.errno
  59. return link_errno
  60. def link_or_copy(src, dst):
  61. """Attempts to hardlink ``src`` to ``dst``, copying if the link fails.
  62. Attempts to maintain the semantics of ``shutil.copy``.
  63. Because ``os.link`` does not overwrite files, a unique temporary file
  64. will be used if the target already exists, then that file will be moved
  65. into place.
  66. """
  67. if os.path.isdir(dst):
  68. dst = os.path.join(dst, os.path.basename(src))
  69. link_errno = link(src, dst)
  70. if link_errno == errno.EEXIST:
  71. if os.stat(src).st_ino == os.stat(dst).st_ino:
  72. # dst is already a hard link to the correct file, so we don't need
  73. # to do anything else. If we try to link and rename the file
  74. # anyway, we get duplicate files - see http://bugs.python.org/issue21876
  75. return
  76. new_dst = dst + f"-temp-{random.randint(1, 16**4):04X}" # noqa: S311
  77. try:
  78. link_or_copy(src, new_dst)
  79. except BaseException:
  80. try:
  81. os.remove(new_dst)
  82. except OSError:
  83. pass
  84. raise
  85. os.rename(new_dst, dst)
  86. elif link_errno != 0:
  87. # Either link isn't supported, or the filesystem doesn't support
  88. # linking, or 'src' and 'dst' are on different filesystems.
  89. shutil.copy(src, dst)