jsonutil.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. """Utilities to manipulate JSON objects."""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import math
  5. import numbers
  6. import re
  7. import types
  8. import warnings
  9. from binascii import b2a_base64
  10. from collections.abc import Iterable
  11. from datetime import date, datetime
  12. from typing import Any, Union
  13. from dateutil.parser import isoparse as _dateutil_parse
  14. from dateutil.tz import tzlocal
next_attr_name = "__next__"  # Not sure what downstream library uses this, but left it to be safe
# -----------------------------------------------------------------------------
# Globals and constants
# -----------------------------------------------------------------------------
# timestamp formats
# strftime pattern that json_clean uses to render date/datetime objects
ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
# Pattern parse_date uses to decide whether a string is a timestamp:
# "YYYY-MM-DDTHH:MM:SS", an optional fraction of 1-6 digits, and an optional
# "Z" or +/-HH:MM (colon optional) UTC offset.
ISO8601_PAT = re.compile(
    r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?(Z|([\+\-]\d{2}:?\d{2}))?$"
)
# strptime is not threadsafe.
# Calling it once at import seems to help; the result is deliberately discarded.
datetime.strptime("2000-01-01", "%Y-%m-%d")  # noqa
  27. # -----------------------------------------------------------------------------
  28. # Classes and functions
  29. # -----------------------------------------------------------------------------
  30. def _ensure_tzinfo(dt: datetime) -> datetime:
  31. """Ensure a datetime object has tzinfo
  32. If no tzinfo is present, add tzlocal
  33. """
  34. if not dt.tzinfo:
  35. # No more naïve datetime objects!
  36. warnings.warn(
  37. "Interpreting naive datetime as local %s. Please add timezone info to timestamps." % dt,
  38. DeprecationWarning,
  39. stacklevel=4,
  40. )
  41. dt = dt.replace(tzinfo=tzlocal())
  42. return dt
  43. def parse_date(s: str | None) -> Union[str, datetime] | None:
  44. """parse an ISO8601 date string
  45. If it is None or not a valid ISO8601 timestamp,
  46. it will be returned unmodified.
  47. Otherwise, it will return a datetime object.
  48. """
  49. if s is None:
  50. return s
  51. m = ISO8601_PAT.match(s)
  52. if m:
  53. dt = _dateutil_parse(s)
  54. return _ensure_tzinfo(dt)
  55. return s
  56. def extract_dates(obj: Any) -> Any:
  57. """extract ISO8601 dates from unpacked JSON"""
  58. if isinstance(obj, dict):
  59. new_obj = {} # don't clobber
  60. for k, v in obj.items():
  61. new_obj[k] = extract_dates(v)
  62. obj = new_obj
  63. elif isinstance(obj, list | tuple):
  64. obj = [extract_dates(o) for o in obj]
  65. elif isinstance(obj, str):
  66. obj = parse_date(obj)
  67. return obj
  68. def squash_dates(obj: Any) -> Any:
  69. """squash datetime objects into ISO8601 strings"""
  70. if isinstance(obj, dict):
  71. obj = dict(obj) # don't clobber
  72. for k, v in obj.items():
  73. obj[k] = squash_dates(v)
  74. elif isinstance(obj, list | tuple):
  75. obj = [squash_dates(o) for o in obj]
  76. elif isinstance(obj, datetime):
  77. obj = obj.isoformat()
  78. return obj
  79. def date_default(obj: Any) -> Any:
  80. """DEPRECATED: Use jupyter_client.jsonutil.json_default"""
  81. warnings.warn(
  82. "date_default is deprecated since jupyter_client 7.0.0."
  83. " Use jupyter_client.jsonutil.json_default.",
  84. stacklevel=2,
  85. )
  86. return json_default(obj)
  87. def json_default(obj: Any) -> Any:
  88. """default function for packing objects in JSON."""
  89. if isinstance(obj, datetime):
  90. obj = _ensure_tzinfo(obj)
  91. return obj.isoformat().replace("+00:00", "Z")
  92. if isinstance(obj, date):
  93. return obj.isoformat()
  94. if isinstance(obj, bytes):
  95. return b2a_base64(obj, newline=False).decode("ascii")
  96. if isinstance(obj, Iterable):
  97. return list(obj)
  98. if isinstance(obj, numbers.Integral):
  99. return int(obj)
  100. if isinstance(obj, numbers.Real):
  101. return float(obj)
  102. raise TypeError("%r is not JSON serializable" % obj)
  103. # Copy of the old ipykernel's json_clean
  104. # This is temporary, it should be removed when we deprecate support for
  105. # non-valid JSON messages
  106. def json_clean(obj: Any) -> Any:
  107. # types that are 'atomic' and ok in json as-is.
  108. atomic_ok = (str, type(None))
  109. # containers that we need to convert into lists
  110. container_to_list = (tuple, set, types.GeneratorType)
  111. # Since bools are a subtype of Integrals, which are a subtype of Reals,
  112. # we have to check them in that order.
  113. if isinstance(obj, bool):
  114. return obj
  115. if isinstance(obj, numbers.Integral):
  116. # cast int to int, in case subclasses override __str__ (e.g. boost enum, #4598)
  117. return int(obj)
  118. if isinstance(obj, numbers.Real):
  119. # cast out-of-range floats to their reprs
  120. if math.isnan(obj) or math.isinf(obj):
  121. return repr(obj)
  122. return float(obj)
  123. if isinstance(obj, atomic_ok):
  124. return obj
  125. if isinstance(obj, bytes):
  126. # unanmbiguous binary data is base64-encoded
  127. # (this probably should have happened upstream)
  128. return b2a_base64(obj, newline=False).decode("ascii")
  129. if isinstance(obj, container_to_list) or (
  130. hasattr(obj, "__iter__") and hasattr(obj, next_attr_name)
  131. ):
  132. obj = list(obj)
  133. if isinstance(obj, list):
  134. return [json_clean(x) for x in obj]
  135. if isinstance(obj, dict):
  136. # First, validate that the dict won't lose data in conversion due to
  137. # key collisions after stringification. This can happen with keys like
  138. # True and 'true' or 1 and '1', which collide in JSON.
  139. nkeys = len(obj)
  140. nkeys_collapsed = len(set(map(str, obj)))
  141. if nkeys != nkeys_collapsed:
  142. msg = (
  143. "dict cannot be safely converted to JSON: "
  144. "key collision would lead to dropped values"
  145. )
  146. raise ValueError(msg)
  147. # If all OK, proceed by making the new dict that will be json-safe
  148. out = {}
  149. for k, v in obj.items():
  150. out[str(k)] = json_clean(v)
  151. return out
  152. if isinstance(obj, datetime | date):
  153. return obj.strftime(ISO8601)
  154. # we don't understand it, it's probably an unserializable object
  155. raise ValueError("Can't clean for JSON: %r" % obj)