file_stream.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. from __future__ import annotations
  2. import functools
  3. import itertools
  4. import json
  5. import logging
  6. import os
  7. import queue
  8. import random
  9. import sys
  10. import threading
  11. import time
  12. from types import TracebackType
  13. from typing import TYPE_CHECKING, Any, Callable, NamedTuple
if TYPE_CHECKING:
    # These declarations exist only for static type checkers; nothing in this
    # guarded block is executed at runtime.
    from typing import TypedDict

    # Payload returned by the text file policies in this module.
    class ProcessedChunk(TypedDict):
        # Position at which `content` starts (a chunk id or a console line
        # number, depending on the policy that produced it).
        offset: int
        # The pieces of text to store starting at `offset`.
        content: list[str]

    # Binary counterpart of ProcessedChunk. No policy visible in this file
    # builds one; it only appears in a return annotation.
    class ProcessedBinaryChunk(TypedDict):
        offset: int
        content: str
        # NOTE(review): presumably names how `content` is encoded - confirm.
        encoding: str
  23. import requests
  24. import wandb
  25. from wandb import util
  26. from wandb.analytics import get_sentry
  27. from wandb.sdk.internal import internal_api
  28. from ..lib import file_stream_utils
  29. logger = logging.getLogger(__name__)
class Chunk(NamedTuple):
    """A piece of text queued for streaming to a named file.

    Attributes:
        filename: Name of the file the data is appended to.
        data: Text to append to that file.
    """

    filename: str
    data: str
  33. class DefaultFilePolicy:
  34. def __init__(self, start_chunk_id: int = 0) -> None:
  35. self._chunk_id = start_chunk_id
  36. self.has_debug_log = False
  37. def process_chunks(
  38. self, chunks: list[Chunk]
  39. ) -> bool | ProcessedChunk | ProcessedBinaryChunk | list[ProcessedChunk]:
  40. chunk_id = self._chunk_id
  41. self._chunk_id += len(chunks)
  42. return {"offset": chunk_id, "content": [c.data for c in chunks]}
  43. # TODO: this is very inefficient, this is meant for temporary debugging and will be removed in future releases
  44. def _debug_log(self, data: Any):
  45. if self.has_debug_log or not os.environ.get("WANDB_DEBUG_FILESTREAM_LOG"):
  46. return
  47. loaded = json.loads(data)
  48. if not isinstance(loaded, dict):
  49. return
  50. # get key size and convert to MB
  51. key_sizes = [(k, len(json.dumps(v))) for k, v in loaded.items()]
  52. key_msg = [f"{k}: {v / 1048576:.5f} MB" for k, v in key_sizes]
  53. wandb.termerror(f"Step: {loaded['_step']} | {key_msg}", repeat=False)
  54. self.has_debug_log = True
  55. class JsonlFilePolicy(DefaultFilePolicy):
  56. def process_chunks(self, chunks: list[Chunk]) -> ProcessedChunk:
  57. chunk_id = self._chunk_id
  58. # TODO: chunk_id is getting reset on each request...
  59. self._chunk_id += len(chunks)
  60. chunk_data = []
  61. for chunk in chunks:
  62. if len(chunk.data) > util.MAX_LINE_BYTES:
  63. msg = f"Metric data exceeds maximum size of {util.to_human_size(util.MAX_LINE_BYTES)} ({util.to_human_size(len(chunk.data))})"
  64. wandb.termerror(msg, repeat=False)
  65. get_sentry().message(msg, repeat=False)
  66. self._debug_log(chunk.data)
  67. else:
  68. chunk_data.append(chunk.data)
  69. return {
  70. "offset": chunk_id,
  71. "content": chunk_data,
  72. }
  73. class SummaryFilePolicy(DefaultFilePolicy):
  74. def process_chunks(self, chunks: list[Chunk]) -> bool | ProcessedChunk:
  75. data = chunks[-1].data
  76. if len(data) > util.MAX_LINE_BYTES:
  77. msg = f"Summary data exceeds maximum size of {util.to_human_size(util.MAX_LINE_BYTES)}. Dropping it."
  78. wandb.termerror(msg, repeat=False)
  79. get_sentry().message(msg, repeat=False)
  80. self._debug_log(data)
  81. return False
  82. return {"offset": 0, "content": [data]}
  83. class StreamCRState:
  84. r"""Stream state that tracks carriage returns.
  85. There are two streams: stdout and stderr. We create two instances for each stream.
  86. An instance holds state about:
  87. found_cr: if a carriage return has been found in this stream.
  88. cr: most recent offset (line number) where we found \r.
  89. We update this offset with every progress bar update.
  90. last_normal: most recent offset without a \r in this stream.
  91. i.e. the most recent "normal" line.
  92. """
  93. found_cr: bool
  94. cr: int | None
  95. last_normal: int | None
  96. def __init__(self) -> None:
  97. self.found_cr = False
  98. self.cr = None
  99. self.last_normal = None
  100. class CRDedupeFilePolicy(DefaultFilePolicy):
  101. r"""File stream policy for removing carriage-return erased characters.
  102. This is what a terminal does. We use it for console output to reduce the amount of
  103. data we need to send over the network (eg. for progress bars), while preserving the
  104. output's appearance in the web app.
  105. CR stands for "carriage return", for the character \r. It tells the terminal to move
  106. the cursor back to the start of the current line. Progress bars (like tqdm) use \r
  107. repeatedly to overwrite a line with newer updates. This gives the illusion of the
  108. progress bar filling up in real-time.
  109. """
  110. def __init__(self, start_chunk_id: int = 0) -> None:
  111. super().__init__(start_chunk_id=start_chunk_id)
  112. self._prev_chunk = None
  113. self.global_offset = 0
  114. # cr refers to carriage return \r
  115. self.stderr = StreamCRState()
  116. self.stdout = StreamCRState()
  117. @staticmethod
  118. def get_consecutive_offsets(console: dict[int, str]) -> list[list[int]]:
  119. """Compress consecutive line numbers into an interval.
  120. Args:
  121. console: Dict[int, str] which maps offsets (line numbers) to lines of text.
  122. It represents a mini version of our console dashboard on the UI.
  123. Returns:
  124. A list of intervals (we compress consecutive line numbers into an interval).
  125. Example:
  126. >>> console = {2: "", 3: "", 4: "", 5: "", 10: "", 11: "", 20: ""}
  127. >>> get_consecutive_offsets(console)
  128. [(2, 5), (10, 11), (20, 20)]
  129. """
  130. offsets = sorted(list(console.keys()))
  131. intervals: list = []
  132. for i, num in enumerate(offsets):
  133. if i == 0:
  134. intervals.append([num, num])
  135. continue
  136. largest = intervals[-1][1]
  137. if num == largest + 1:
  138. intervals[-1][1] = num
  139. else:
  140. intervals.append([num, num])
  141. return intervals
  142. @staticmethod
  143. def split_chunk(chunk: Chunk) -> tuple[str, str]:
  144. r"""Split chunks.
  145. Args:
  146. chunk: object with two fields: filename (str) & data (str)
  147. `chunk.data` is a str containing the lines we want. It usually contains \n or \r or both.
  148. `chunk.data` has two possible formats (for the two streams - stdout and stderr):
  149. - "2020-08-25T20:38:36.895321 this is my line of text\nsecond line\n"
  150. - "ERROR 2020-08-25T20:38:36.895321 this is my line of text\nsecond line\nthird\n".
  151. Here's another example with a carriage return \r.
  152. - "ERROR 2020-08-25T20:38:36.895321 \r progress bar\n"
  153. Returns:
  154. A 2-tuple of strings.
  155. First str is prefix, either "ERROR {timestamp} " or "{timestamp} ".
  156. Second str is the rest of the string.
  157. Example:
  158. >>> chunk = Chunk(
  159. ... filename="output.log",
  160. ... data="ERROR 2020-08-25T20:38 this is my line of text\n",
  161. ... )
  162. >>> split_chunk(chunk)
  163. ("ERROR 2020-08-25T20:38 ", "this is my line of text\n")
  164. """
  165. prefix = ""
  166. token, rest = chunk.data.split(" ", 1)
  167. if token == "ERROR":
  168. prefix += token + " "
  169. token, rest = rest.split(" ", 1)
  170. prefix += token + " "
  171. return prefix, rest
  172. def process_chunks(self, chunks: list[Chunk]) -> list[ProcessedChunk]:
  173. r"""Process chunks.
  174. Args:
  175. chunks: List of Chunk objects. See description of chunk above in `split_chunk(...)`.
  176. Returns:
  177. List[Dict]. Each dict in the list contains two keys: an `offset` which holds the line number
  178. and `content` which maps to a list of consecutive lines starting from that offset.
  179. `offset` here means global line number in our console on the UI.
  180. Example:
  181. >>> chunks = [
  182. Chunk("output.log", "ERROR 2020-08-25T20:38 this is my line of text\nboom\n"),
  183. Chunk("output.log", "2020-08-25T20:38 this is test\n"),
  184. ]
  185. >>> process_chunks(chunks)
  186. [
  187. {"offset": 0, "content": [
  188. "ERROR 2020-08-25T20:38 this is my line of text\n",
  189. "ERROR 2020-08-25T20:38 boom\n",
  190. "2020-08-25T20:38 this is test\n"
  191. ]
  192. }
  193. ]
  194. """
  195. # Dict[int->str], each offset (line number) mapped to a line.
  196. # Represents a mini-version of our console pane on the UI.
  197. console = {}
  198. sep = os.linesep
  199. for c in chunks:
  200. prefix, logs_str = self.split_chunk(c)
  201. logs = logs_str.split(sep)
  202. for line in logs:
  203. stream = self.stderr if prefix.startswith("ERROR ") else self.stdout
  204. if line.startswith("\r"):
  205. # line starting with \r will always overwrite a previous offset.
  206. offset: int = (
  207. stream.cr
  208. if (stream.found_cr and stream.cr is not None)
  209. else (stream.last_normal or 0)
  210. )
  211. stream.cr = offset
  212. stream.found_cr = True
  213. console[offset] = prefix + line[1:] + "\n"
  214. # Usually logs_str = "\r progress bar\n" for progress bar updates.
  215. # If instead logs_str = "\r progress bar\n text\n text\n",
  216. # treat this as the end of a progress bar and reset accordingly.
  217. if (
  218. logs_str.count(sep) > 1
  219. and logs_str.replace(sep, "").count("\r") == 1
  220. ):
  221. stream.found_cr = False
  222. elif line:
  223. console[self.global_offset] = prefix + line + "\n"
  224. stream.last_normal = self.global_offset
  225. self.global_offset += 1
  226. intervals = self.get_consecutive_offsets(console)
  227. ret = []
  228. for a, b in intervals:
  229. processed_chunk: ProcessedChunk = {
  230. "offset": self._chunk_id + a,
  231. "content": [console[i] for i in range(a, b + 1)],
  232. }
  233. ret.append(processed_chunk)
  234. return ret
  235. class FileStreamApi:
  236. """Pushes chunks of files to our streaming endpoint.
  237. This class is used as a singleton. It has a thread that serializes access to
  238. the streaming endpoint and performs rate-limiting and batching.
  239. TODO: Differentiate between binary/text encoding.
  240. """
  241. class Finish(NamedTuple):
  242. exitcode: int
  243. class Preempting(NamedTuple):
  244. pass
  245. class PushSuccess(NamedTuple):
  246. artifact_id: str
  247. save_name: str
  248. MAX_ITEMS_PER_PUSH = 10000
  249. def __init__(
  250. self,
  251. api: internal_api.Api,
  252. run_id: str,
  253. start_time: float,
  254. timeout: float = 0,
  255. settings: dict | None = None,
  256. ) -> None:
  257. settings = settings or dict()
  258. # NOTE: exc_info is set in thread_except_body context and readable by calling threads
  259. self._exc_info: (
  260. tuple[type[BaseException], BaseException, TracebackType]
  261. | tuple[None, None, None]
  262. | None
  263. ) = None
  264. self._settings = settings
  265. self._api = api
  266. self._run_id = run_id
  267. self._start_time = start_time
  268. self._client = requests.Session()
  269. timeout = timeout or 0
  270. if timeout > 0:
  271. self._client.post = functools.partial(self._client.post, timeout=timeout) # type: ignore[method-assign]
  272. self._client.auth = api.client.transport.session.auth
  273. self._client.headers.update(api.client.transport.headers or {})
  274. self._client.cookies.update(api.client.transport.cookies or {}) # type: ignore[no-untyped-call]
  275. self._client.proxies.update(api.client.transport.session.proxies or {})
  276. self._file_policies: dict[str, DefaultFilePolicy] = {}
  277. self._dropped_chunks: int = 0
  278. self._queue: queue.Queue = queue.Queue()
  279. self._thread = threading.Thread(target=self._thread_except_body)
  280. # It seems we need to make this a daemon thread to get sync.py's atexit handler to run, which
  281. # cleans this thread up.
  282. self._thread.name = "FileStreamThread"
  283. self._thread.daemon = True
  284. self._init_endpoint()
  285. def _init_endpoint(self) -> None:
  286. settings = self._api.settings()
  287. settings.update(self._settings)
  288. self._endpoint = "{base}/files/{entity}/{project}/{run}/file_stream".format(
  289. base=settings["base_url"],
  290. entity=settings["entity"],
  291. project=settings["project"],
  292. run=self._run_id,
  293. )
  294. def start(self) -> None:
  295. self._init_endpoint()
  296. self._thread.start()
  297. def set_default_file_policy(
  298. self, filename: str, file_policy: DefaultFilePolicy
  299. ) -> None:
  300. """Set an upload policy for a file unless one has already been set."""
  301. if filename not in self._file_policies:
  302. self._file_policies[filename] = file_policy
  303. def set_file_policy(self, filename: str, file_policy: DefaultFilePolicy) -> None:
  304. self._file_policies[filename] = file_policy
  305. @property
  306. def heartbeat_seconds(self) -> int | float:
  307. # Defaults to 30
  308. heartbeat_seconds: int | float = self._api.dynamic_settings["heartbeat_seconds"]
  309. return heartbeat_seconds
  310. def rate_limit_seconds(self) -> int | float:
  311. run_time = time.time() - self._start_time
  312. if run_time < 60:
  313. return max(1.0, self.heartbeat_seconds / 15)
  314. elif run_time < 300:
  315. return max(2.5, self.heartbeat_seconds / 3)
  316. else:
  317. return max(5.0, self.heartbeat_seconds)
  318. def _read_queue(self) -> list:
  319. # called from the push thread (_thread_body), this does an initial read
  320. # that'll block for up to rate_limit_seconds. Then it tries to read
  321. # as much out of the queue as it can. We do this because the http post
  322. # to the server happens within _thread_body, and can take longer than
  323. # our rate limit. So next time we get a chance to read the queue we want
  324. # read all the stuff that queue'd up since last time.
  325. #
  326. # If we have more than MAX_ITEMS_PER_PUSH in the queue then the push thread
  327. # will get behind and data will buffer up in the queue.
  328. return util.read_many_from_queue(
  329. self._queue, self.MAX_ITEMS_PER_PUSH, self.rate_limit_seconds()
  330. )
  331. def _thread_body(self) -> None:
  332. posted_data_time = time.time()
  333. posted_anything_time = time.time()
  334. ready_chunks = []
  335. uploaded: set[str] = set()
  336. finished: FileStreamApi.Finish | None = None
  337. while finished is None:
  338. items = self._read_queue()
  339. for item in items:
  340. if isinstance(item, self.Finish):
  341. finished = item
  342. elif isinstance(item, self.Preempting):
  343. request_with_retry(
  344. self._client.post,
  345. self._endpoint,
  346. json={
  347. "complete": False,
  348. "preempting": True,
  349. "dropped": self._dropped_chunks,
  350. "uploaded": list(uploaded),
  351. },
  352. )
  353. uploaded = set()
  354. elif isinstance(item, self.PushSuccess):
  355. uploaded.add(item.save_name)
  356. else:
  357. # item is Chunk
  358. ready_chunks.append(item)
  359. cur_time = time.time()
  360. if ready_chunks and (
  361. finished or cur_time - posted_data_time > self.rate_limit_seconds()
  362. ):
  363. posted_data_time = cur_time
  364. posted_anything_time = cur_time
  365. success = self._send(ready_chunks, uploaded=uploaded)
  366. ready_chunks = []
  367. if success:
  368. uploaded = set()
  369. # If there aren't ready chunks or uploaded files, we still want to
  370. # send regular heartbeats so the backend doesn't erroneously mark this
  371. # run as crashed.
  372. if cur_time - posted_anything_time > self.heartbeat_seconds:
  373. posted_anything_time = cur_time
  374. # If we encountered an error trying to publish the
  375. # list of uploaded files, don't reset the `uploaded`
  376. # list. Retry publishing the list on the next attempt.
  377. if not isinstance(
  378. request_with_retry(
  379. self._client.post,
  380. self._endpoint,
  381. json={
  382. "complete": False,
  383. "failed": False,
  384. "dropped": self._dropped_chunks,
  385. "uploaded": list(uploaded),
  386. },
  387. ),
  388. Exception,
  389. ):
  390. uploaded = set()
  391. # post the final close message. (item is self.Finish instance now)
  392. request_with_retry(
  393. self._client.post,
  394. self._endpoint,
  395. json={
  396. "complete": True,
  397. "exitcode": int(finished.exitcode),
  398. "dropped": self._dropped_chunks,
  399. "uploaded": list(uploaded),
  400. },
  401. )
  402. def _thread_except_body(self) -> None:
  403. # TODO: Consolidate with internal_util.ExceptionThread
  404. try:
  405. self._thread_body()
  406. except Exception:
  407. exc_info = sys.exc_info()
  408. self._exc_info = exc_info
  409. logger.exception("generic exception in filestream thread")
  410. get_sentry().exception(exc_info)
  411. raise
  412. def _handle_response(self, response: Exception | requests.Response) -> None:
  413. """Log dropped chunks and updates dynamic settings."""
  414. if isinstance(response, Exception):
  415. wandb.termerror(
  416. "Dropped streaming file chunk (see wandb/debug-internal.log)"
  417. )
  418. logger.exception(f"dropped chunk {response}")
  419. self._dropped_chunks += 1
  420. else:
  421. parsed: dict | None = None
  422. try:
  423. parsed = response.json()
  424. except Exception:
  425. pass
  426. if isinstance(parsed, dict):
  427. limits = parsed.get("limits")
  428. if isinstance(limits, dict):
  429. self._api.dynamic_settings.update(limits)
  430. def _send(self, chunks: list[Chunk], uploaded: set[str] | None = None) -> bool:
  431. uploaded_list = list(uploaded or [])
  432. # create files dict. dict of <filename: chunks> pairs where chunks are a list of
  433. # [chunk_id, chunk_data] tuples (as lists since this will be json).
  434. files = {}
  435. # Groupby needs group keys to be consecutive, so sort first.
  436. chunks.sort(key=lambda c: c.filename)
  437. for filename, file_chunks in itertools.groupby(chunks, lambda c: c.filename):
  438. file_chunks_list = list(file_chunks) # groupby returns iterator
  439. # Specific file policies are set by internal/sender.py
  440. self.set_default_file_policy(filename, DefaultFilePolicy())
  441. files[filename] = self._file_policies[filename].process_chunks(
  442. file_chunks_list
  443. )
  444. if not files[filename]:
  445. del files[filename]
  446. for fs in file_stream_utils.split_files(files, max_bytes=util.MAX_LINE_BYTES):
  447. self._handle_response(
  448. request_with_retry(
  449. self._client.post,
  450. self._endpoint,
  451. json={"files": fs, "dropped": self._dropped_chunks},
  452. retry_callback=self._api.retry_callback,
  453. )
  454. )
  455. return not (
  456. uploaded_list
  457. and isinstance(
  458. request_with_retry(
  459. self._client.post,
  460. self._endpoint,
  461. json={
  462. "complete": False,
  463. "failed": False,
  464. "dropped": self._dropped_chunks,
  465. "uploaded": uploaded_list,
  466. },
  467. ),
  468. Exception,
  469. )
  470. )
  471. def stream_file(self, path: str) -> None:
  472. name = path.split("/")[-1]
  473. with open(path) as f:
  474. self._send([Chunk(name, line) for line in f])
  475. def enqueue_preempting(self) -> None:
  476. self._queue.put(self.Preempting())
  477. def push(self, filename: str, data: str) -> None:
  478. """Push a chunk of a file to the streaming endpoint.
  479. Args:
  480. filename: Name of file to append to.
  481. data: Text to append to the file.
  482. """
  483. self._queue.put(Chunk(filename, data))
  484. def push_success(self, artifact_id: str, save_name: str) -> None:
  485. """Notification that a file upload has been successfully completed.
  486. Args:
  487. artifact_id: ID of artifact
  488. save_name: saved name of the uploaded file
  489. """
  490. self._queue.put(self.PushSuccess(artifact_id, save_name))
  491. def finish(self, exitcode: int) -> None:
  492. """Clean up.
  493. Anything pushed after finish will be dropped.
  494. Args:
  495. exitcode: The exitcode of the watched process.
  496. """
  497. logger.info("file stream finish called")
  498. self._queue.put(self.Finish(exitcode))
  499. # TODO(jhr): join on a thread which exited with an exception is a noop, clean up this path
  500. self._thread.join()
  501. logger.info("file stream finish is done")
  502. if self._exc_info:
  503. logger.error("FileStream exception", exc_info=self._exc_info)
  504. # re-raising the original exception, will get re-caught in internal.py for the sender thread
  505. if self._exc_info[1] is not None:
  506. raise self._exc_info[1].with_traceback(self._exc_info[2])
  507. MAX_SLEEP_SECONDS = 60 * 5
  508. def request_with_retry(
  509. func: Callable,
  510. *args: Any,
  511. **kwargs: Any,
  512. ) -> requests.Response | requests.RequestException:
  513. """Perform a requests http call, retrying with exponential backoff.
  514. Args:
  515. func: An http-requesting function to call, like requests.post
  516. max_retries: Maximum retries before giving up.
  517. By default, we retry 30 times in ~2 hours before dropping the chunk
  518. *args: passed through to func
  519. **kwargs: passed through to func
  520. """
  521. max_retries: int = kwargs.pop("max_retries", 30)
  522. retry_callback: Callable | None = kwargs.pop("retry_callback", None)
  523. sleep = 2
  524. retry_count = 0
  525. while True:
  526. try:
  527. response: requests.Response = func(*args, **kwargs)
  528. response.raise_for_status()
  529. return response
  530. except (
  531. requests.exceptions.ConnectionError,
  532. requests.exceptions.HTTPError,
  533. requests.exceptions.Timeout,
  534. ) as e:
  535. if isinstance(e, requests.exceptions.HTTPError) and (
  536. e.response is not None
  537. and e.response.status_code in {400, 403, 404, 409}
  538. ):
  539. # Non-retriable HTTP errors.
  540. #
  541. # We retry 500s just to be cautious, and because the back end
  542. # returns them when there are infrastructure issues. If retrying
  543. # some request winds up being problematic, we'll change the
  544. # back end to indicate that it shouldn't be retried.
  545. return e
  546. if retry_count == max_retries:
  547. return e
  548. retry_count += 1
  549. delay = sleep + random.random() * 0.25 * sleep
  550. if isinstance(e, requests.exceptions.HTTPError) and (
  551. e.response is not None and e.response.status_code == 429
  552. ):
  553. err_str = (
  554. f"Filestream rate limit exceeded, retrying in {delay:.1f} seconds. "
  555. )
  556. if retry_callback:
  557. retry_callback(e.response.status_code, err_str)
  558. logger.info(err_str)
  559. else:
  560. logger.warning(
  561. "requests_with_retry encountered retryable exception: %s. func: %s, args: %s, kwargs: %s",
  562. e,
  563. func,
  564. args,
  565. kwargs,
  566. )
  567. time.sleep(delay)
  568. sleep *= 2
  569. if sleep > MAX_SLEEP_SECONDS:
  570. sleep = MAX_SLEEP_SECONDS
  571. except requests.exceptions.RequestException as e:
  572. error_message = "unknown error"
  573. try:
  574. error_message = response.json()["error"] # todo: clean this up
  575. except Exception:
  576. pass
  577. logger.exception(f"requests_with_retry error: {error_message}")
  578. return e