file_stream_utils.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #
  2. from collections.abc import Iterable
  3. from typing import Any
  4. def split_files(
  5. files: dict[str, Any], max_bytes: int = 10 * 1024 * 1024
  6. ) -> Iterable[dict[str, dict]]:
  7. """Split a file's dict (see `files` arg) into smaller dicts.
  8. Each smaller dict will have at most `MAX_BYTES` size.
  9. This method is used in `FileStreamAPI._send()` to limit the size of post requests
  10. sent to wandb server.
  11. Args:
  12. files (dict): `dict` of form {file_name: {'content': ".....", 'offset': 0}}
  13. The key `file_name` can also be mapped to a List [{"offset": int, "content": str}]
  14. `max_bytes`: max size for chunk in bytes
  15. """
  16. current_volume: dict[str, dict] = {}
  17. current_size = 0
  18. def _str_size(x):
  19. return len(x) if isinstance(x, bytes) else len(x.encode("utf-8"))
  20. def _file_size(file):
  21. size = file.get("_size")
  22. if size is None:
  23. size = sum(map(_str_size, file["content"]))
  24. file["_size"] = size
  25. return size
  26. def _split_file(file, num_lines):
  27. offset = file["offset"]
  28. content = file["content"]
  29. name = file["name"]
  30. f1 = {"offset": offset, "content": content[:num_lines], "name": name}
  31. f2 = {
  32. "offset": offset + num_lines,
  33. "content": content[num_lines:],
  34. "name": name,
  35. }
  36. return f1, f2
  37. def _num_lines_from_num_bytes(file, num_bytes):
  38. size = 0
  39. num_lines = 0
  40. content = file["content"]
  41. while num_lines < len(content):
  42. size += _str_size(content[num_lines])
  43. if size > num_bytes:
  44. break
  45. num_lines += 1
  46. return num_lines
  47. files_stack = []
  48. for k, v in files.items():
  49. if isinstance(v, list):
  50. for item in v:
  51. files_stack.append(
  52. {"name": k, "offset": item["offset"], "content": item["content"]}
  53. )
  54. else:
  55. files_stack.append(
  56. {"name": k, "offset": v["offset"], "content": v["content"]}
  57. )
  58. while files_stack:
  59. f = files_stack.pop()
  60. if f["name"] in current_volume:
  61. files_stack.append(f)
  62. yield current_volume
  63. current_volume = {}
  64. current_size = 0
  65. continue
  66. # For each file, we have to do 1 of 4 things:
  67. # - Add the file as such to the current volume if possible.
  68. # - Split the file and add the first part to the current volume and push the second part back onto the stack.
  69. # - If that's not possible, check if current volume is empty:
  70. # - If empty, add first line of file to current volume and push rest onto stack (This volume will exceed MAX_MB).
  71. # - If not, push file back to stack and yield current volume.
  72. fsize = _file_size(f)
  73. rem = max_bytes - current_size
  74. if fsize <= rem:
  75. current_volume[f["name"]] = {
  76. "offset": f["offset"],
  77. "content": f["content"],
  78. }
  79. current_size += fsize
  80. else:
  81. num_lines = _num_lines_from_num_bytes(f, rem)
  82. if not num_lines and not current_volume:
  83. num_lines = 1
  84. if num_lines:
  85. f1, f2 = _split_file(f, num_lines)
  86. current_volume[f1["name"]] = {
  87. "offset": f1["offset"],
  88. "content": f1["content"],
  89. }
  90. files_stack.append(f2)
  91. yield current_volume
  92. current_volume = {}
  93. current_size = 0
  94. continue
  95. else:
  96. files_stack.append(f)
  97. yield current_volume
  98. current_volume = {}
  99. current_size = 0
  100. continue
  101. if current_size >= max_bytes:
  102. yield current_volume
  103. current_volume = {}
  104. current_size = 0
  105. continue
  106. if current_volume:
  107. yield current_volume