| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- #
- from collections.abc import Iterable
- from typing import Any
- def split_files(
- files: dict[str, Any], max_bytes: int = 10 * 1024 * 1024
- ) -> Iterable[dict[str, dict]]:
- """Split a file's dict (see `files` arg) into smaller dicts.
- Each smaller dict will have at most `MAX_BYTES` size.
- This method is used in `FileStreamAPI._send()` to limit the size of post requests
- sent to wandb server.
- Args:
- files (dict): `dict` of form {file_name: {'content': ".....", 'offset': 0}}
- The key `file_name` can also be mapped to a List [{"offset": int, "content": str}]
- `max_bytes`: max size for chunk in bytes
- """
- current_volume: dict[str, dict] = {}
- current_size = 0
- def _str_size(x):
- return len(x) if isinstance(x, bytes) else len(x.encode("utf-8"))
- def _file_size(file):
- size = file.get("_size")
- if size is None:
- size = sum(map(_str_size, file["content"]))
- file["_size"] = size
- return size
- def _split_file(file, num_lines):
- offset = file["offset"]
- content = file["content"]
- name = file["name"]
- f1 = {"offset": offset, "content": content[:num_lines], "name": name}
- f2 = {
- "offset": offset + num_lines,
- "content": content[num_lines:],
- "name": name,
- }
- return f1, f2
- def _num_lines_from_num_bytes(file, num_bytes):
- size = 0
- num_lines = 0
- content = file["content"]
- while num_lines < len(content):
- size += _str_size(content[num_lines])
- if size > num_bytes:
- break
- num_lines += 1
- return num_lines
- files_stack = []
- for k, v in files.items():
- if isinstance(v, list):
- for item in v:
- files_stack.append(
- {"name": k, "offset": item["offset"], "content": item["content"]}
- )
- else:
- files_stack.append(
- {"name": k, "offset": v["offset"], "content": v["content"]}
- )
- while files_stack:
- f = files_stack.pop()
- if f["name"] in current_volume:
- files_stack.append(f)
- yield current_volume
- current_volume = {}
- current_size = 0
- continue
- # For each file, we have to do 1 of 4 things:
- # - Add the file as such to the current volume if possible.
- # - Split the file and add the first part to the current volume and push the second part back onto the stack.
- # - If that's not possible, check if current volume is empty:
- # - If empty, add first line of file to current volume and push rest onto stack (This volume will exceed MAX_MB).
- # - If not, push file back to stack and yield current volume.
- fsize = _file_size(f)
- rem = max_bytes - current_size
- if fsize <= rem:
- current_volume[f["name"]] = {
- "offset": f["offset"],
- "content": f["content"],
- }
- current_size += fsize
- else:
- num_lines = _num_lines_from_num_bytes(f, rem)
- if not num_lines and not current_volume:
- num_lines = 1
- if num_lines:
- f1, f2 = _split_file(f, num_lines)
- current_volume[f1["name"]] = {
- "offset": f1["offset"],
- "content": f1["content"],
- }
- files_stack.append(f2)
- yield current_volume
- current_volume = {}
- current_size = 0
- continue
- else:
- files_stack.append(f)
- yield current_volume
- current_volume = {}
- current_size = 0
- continue
- if current_size >= max_bytes:
- yield current_volume
- current_volume = {}
- current_size = 0
- continue
- if current_volume:
- yield current_volume
|