spinner_compiler.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. import operator
  2. import random
  3. import time
  4. from inspect import signature
  5. from itertools import chain, count, islice, repeat
  6. from types import SimpleNamespace
  7. from about_time import about_time
  8. from .utils import fix_signature
  9. from ..utils import terminal
  10. from ..utils.cells import fix_cells, is_wide, join_cells, strip_marks, to_cells
  11. from ..utils.colors import BLUE, BLUE_BOLD, CYAN, DIM, GREEN, ORANGE, ORANGE_BOLD, RED, YELLOW_BOLD
  12. def spinner_controller(*, natural, skip_compiler=False):
  13. def inner_controller(spinner_inner_factory, op_params=None, extra_commands=None):
  14. def spinner_compiler_dispatcher_factory(actual_length=None):
  15. """Compile this spinner factory into an actual spinner runner.
  16. The previous parameters were the styling parameters, which defined a style.
  17. These are called operational parameters, which `alive_progress` binds dynamically
  18. as needed. Do not call this manually.
  19. Args:
  20. actual_length (int): the actual length to compile the frames renditions
  21. Returns:
  22. a spinner runner
  23. """
  24. if skip_compiler:
  25. return spinner_inner_factory(actual_length, **op_params)
  26. with about_time() as t_compile:
  27. gen = spinner_inner_factory(actual_length, **op_params)
  28. spec = spinner_compiler(gen, natural, extra_commands.get(True, ()))
  29. return spinner_runner_factory(spec, t_compile, extra_commands.get(False, ()))
  30. def compile_and_check(*args, **kwargs): # pragma: no cover
  31. """Compile this spinner factory at its natural length, and..."""
  32. spinner_compiler_dispatcher_factory().check(*args, **kwargs)
  33. def set_operational(**params):
  34. signature(spinner_inner_factory).bind(1, **params) # test arguments (one is provided).
  35. return inner_controller(spinner_inner_factory, params, extra_commands)
  36. def schedule_command(command):
  37. def inner_schedule(*args, **kwargs):
  38. signature(command).bind(1, *args, **kwargs) # test arguments (one is provided).
  39. extra, cmd_type = dict(extra_commands), EXTRA_COMMANDS[command]
  40. extra[cmd_type] = extra.get(cmd_type, ()) + ((command, args, kwargs),)
  41. return inner_controller(spinner_inner_factory, op_params, extra)
  42. return fix_signature(inner_schedule, command, 1)
  43. spinner_compiler_dispatcher_factory.__dict__.update(
  44. check=fix_signature(compile_and_check, check, 1), op=set_operational,
  45. **{c.__name__: schedule_command(c) for c in EXTRA_COMMANDS},
  46. )
  47. op_params, extra_commands = op_params or {}, extra_commands or {}
  48. spinner_compiler_dispatcher_factory.natural = natural # share with the spinner code.
  49. return spinner_compiler_dispatcher_factory
  50. return inner_controller
  51. """
  52. The commands here are made available in the compiler controller, thus in all spinners.
  53. They work lazily: when called they only schedule themselves to be run when the spinner
  54. gets compiled, i.e., when it receives the operational parameters like `actual_length`.
  55. They can take place inside the compiler or inside the runner.
  56. Compiler commands can change the data at will, before the animation specs are computed.
  57. Runner commands can only change presentation order.
  58. """
  59. def extra_command(is_compiler):
  60. def inner_command(command):
  61. EXTRA_COMMANDS[command] = is_compiler
  62. return command
  63. return inner_command
  64. EXTRA_COMMANDS = {}
  65. compiler_command, runner_command = extra_command(True), extra_command(False)
  66. @compiler_command
  67. def replace(spec, old, new): # noqa
  68. """Replace a portion of the frames by another with the same length.
  69. Args:
  70. old (str): the old string to be replaced
  71. new (str): the new string
  72. """
  73. # different lengths could lead to broken frames, but they will be verified afterwards.
  74. spec.data = tuple(tuple(
  75. to_cells(join_cells(frame).replace(old, new)) for frame in cycle
  76. ) for cycle in spec.data)
  77. @compiler_command
  78. def pause(spec, edges=None, center=None, other=None): # noqa
  79. """Make the animation appear to pause at the edges or at the middle, or make it slower as
  80. a whole, or both.
  81. Use without arguments to get their defaults, which gives a small pause at the edges,
  82. very nice for bouncing text with `hide=False`. Please note that the defaults only apply
  83. if none of the params are set.
  84. In the future, I'd like to make this a `pace` command, which would receive a sequence
  85. of ints of any length, and apply it bouncing across the cycle. For example to smoothly
  86. decelerate it could be (6, 3, 2, 1), which would become (6, 3, 2, 1, 1, ..., 1, 2, 3, 6).
  87. Args:
  88. edges (Optional[int]): how many times the first and last frames of a cycle repeats
  89. default is 8.
  90. center (Optional[int]): how many times the middle frame of a cycle repeats
  91. default is 1.
  92. other (Optional[int]): how many times all the other frames of a cycle repeats
  93. default is 1.
  94. """
  95. edges, center, other = (max(1, x or 1) for x in (edges, center, other))
  96. if all(x == 1 for x in (edges, center, other)):
  97. edges, center, other = 8, 1, 1
  98. def repeats_func(length):
  99. return {
  100. 0: edges,
  101. length - 1: edges,
  102. round(length / 2): center,
  103. }
  104. spec.data = tuple(tuple(chain.from_iterable(
  105. repeat(frame, repeats.get(i) or other) for i, frame in enumerate(cycle)
  106. )) for cycle, repeats in ((cycle, repeats_func(len(cycle))) for cycle in spec.data))
  107. @compiler_command
  108. def reshape(spec, num_frames): # noqa
  109. """Reshape frame data into another grouping. It can be used to simplify content
  110. description, or for artistic effects.
  111. Args:
  112. num_frames (int): the number of consecutive frames to group
  113. """
  114. flatten = chain.from_iterable(cycle for cycle in spec.data)
  115. spec.data = tuple(iter(lambda: tuple(islice(flatten, num_frames)), ()))
  116. @compiler_command
  117. def bounce(spec):
  118. """Make the animation bounce its cycles."""
  119. spec.data = tuple(chain(spec.data, spec.data[-2:0:-1]))
  120. @compiler_command
  121. def transpose(spec):
  122. """Transpose the frame content matrix, exchanging columns for rows. It can be used
  123. to simplify content description, or for artistic effects."""
  124. spec.data = tuple(tuple(cycle) for cycle in zip(*spec.data))
  125. @runner_command
  126. def sequential(spec):
  127. """Configure the runner to play the compiled cycles in sequential order."""
  128. def cycle_data(data):
  129. while True:
  130. yield from data
  131. cycle_data.name = 'sequential'
  132. spec.__dict__.update(strategy=cycle_data, cycles=len(spec.data))
  133. @runner_command
  134. def randomize(spec, cycles=None): # noqa
  135. """Configure the runner to play the compiled cycles in random order.
  136. Args:
  137. cycles (Optional[int]): number of cycles to play randomized
  138. """
  139. def cycle_data(data):
  140. while True:
  141. yield random.choice(data)
  142. cycle_data.name = 'randomized'
  143. spec.__dict__.update(strategy=cycle_data, cycles=max(0, cycles or 0) or spec.cycles)
  144. def apply_extra_commands(spec, extra_commands): # pragma: no cover
  145. for command, args, kwargs in extra_commands:
  146. command(spec, *args, **kwargs)
  147. def spinner_compiler(gen, natural, extra_commands):
  148. """Optimized spinner compiler, which compiles ahead of time all frames of all cycles
  149. of a spinner.
  150. Args:
  151. gen (Generator): the generator expressions that defines the cycles and their frames
  152. natural (int): the natural length of the spinner
  153. extra_commands (tuple[tuple[cmd, list[Any], dict[Any]]]): requested extra commands
  154. Returns:
  155. the spec of a compiled animation
  156. """
  157. spec = SimpleNamespace(
  158. data=tuple(tuple(fix_cells(frame) for frame in cycle) for cycle in gen), natural=natural)
  159. apply_extra_commands(spec, extra_commands)
  160. # generate spec info.
  161. frames = tuple(len(cycle) for cycle in spec.data)
  162. spec.__dict__.update(cycles=len(spec.data), length=len(spec.data[0][0]),
  163. frames=frames, total_frames=sum(frames))
  164. assert (max(len(frame) for cycle in spec.data for frame in cycle) ==
  165. min(len(frame) for cycle in spec.data for frame in cycle)), \
  166. render_data(spec, True) or 'Different cell lengths detected in frame data.'
  167. return spec
  168. def spinner_runner_factory(spec, t_compile, extra_commands):
  169. """Optimized spinner runner, which receives the spec of an animation, and controls
  170. the flow of cycles and frames already compiled to a certain screen length and with
  171. wide chars fixed, thus avoiding any overhead in runtime within complex spinners,
  172. while allowing their factories to be garbage collected.
  173. Args:
  174. spec (SimpleNamespace): the spec of an animation
  175. t_compile (about_time.Handler): the compile time information
  176. extra_commands (tuple[tuple[cmd, list[Any], dict[Any]]]): requested extra commands
  177. Returns:
  178. a spinner runner
  179. """
  180. def spinner_runner():
  181. """Wow, you are really deep! This is the runner of a compiled spinner.
  182. Every time you call this function, a different generator will kick in,
  183. which yields the frames of the current animation cycle. Enjoy!"""
  184. yield from next(cycle_gen) # I love generators!
  185. def runner_check(*args, **kwargs): # pragma: no cover
  186. return check(spec, *args, **kwargs)
  187. spinner_runner.__dict__.update(spec.__dict__, check=fix_signature(runner_check, check, 1))
  188. spec.__dict__.update(t_compile=t_compile, runner=spinner_runner) # set after the update above.
  189. sequential(spec)
  190. apply_extra_commands(spec, extra_commands)
  191. cycle_gen = spec.strategy(spec.data)
  192. return spinner_runner
  193. def check(spec, verbosity=0): # noqa # pragma: no cover
  194. """Check the specs, contents, codepoints, and even the animation of this compiled spinner.
  195. Args:
  196. verbosity (int): change the verbosity level
  197. 0 for specs only (default)
  198. / \\
  199. / 3 to include animation
  200. / \\
  201. 1 to unfold frame data -------- 4 to unfold frame data
  202. | |
  203. 2 to reveal codepoints -------- 5 to reveal codepoints
  204. """
  205. verbosity = max(0, min(5, verbosity or 0))
  206. if verbosity in (1, 2, 4, 5):
  207. render_data(spec, verbosity in (2, 5))
  208. spec_data(spec) # spec_data here displays calculated frame data, always shown.
  209. duration = spec.t_compile.duration_human
  210. print(f'\nSpinner frames compiled in: {GREEN(duration)}')
  211. print(f'(call {HELP_MSG[verbosity]})')
  212. if verbosity in (3, 4, 5):
  213. animate(spec)
  214. def __check(p):
  215. return f'{BLUE(f".{check.__name__}(")}{BLUE_BOLD(p)}{BLUE(")")}'
  216. SECTION = ORANGE_BOLD
  217. HELP_MSG = {
  218. 0: f'{__check(1)} to unfold frame data, or {__check(3)} to include animation',
  219. 1: f'{__check(2)} to reveal codepoints, or {__check(4)} to include animation,'
  220. f' or {__check(0)} to fold up frame data',
  221. 2: f'{__check(5)} to include animation, or {__check(1)} to hide codepoints',
  222. 3: f'{__check(4)} to unfold frame data, or {__check(0)} to omit animation',
  223. 4: f'{__check(5)} to reveal codepoints, or {__check(1)} to omit animation,'
  224. f' or {__check(3)} to fold up frame data',
  225. 5: f'{__check(2)} to omit animation, or {__check(4)} to hide codepoints',
  226. }
  227. def spec_data(spec): # pragma: no cover
  228. def info(field):
  229. return f'{YELLOW_BOLD(field.split(".")[0])}: {operator.attrgetter(field)(spec)}'
  230. print(f'\n{SECTION("Specs")}')
  231. print(info('length'), f'({info("natural")})')
  232. print(info('cycles'), f'({info("strategy.name")})')
  233. print('\n'.join(info(field) for field in ('frames', 'total_frames')))
  234. def format_codepoints(frame): # pragma: no cover
  235. codes = '|'.join((ORANGE if is_wide(g) else BLUE)(
  236. ' '.join(hex(ord(c)).replace('0x', '') for c in g)) for g in frame)
  237. return f" -> {RED(sum(len(fragment) for fragment in frame))}:[{codes}]"
  238. def render_data(spec, show_codepoints): # pragma: no cover
  239. print(f'\n{SECTION("Frame data")}', end='')
  240. whole_index = count(1)
  241. lf, wf = f'>{1 + len(str(max(spec.frames)))}', f'<{len(str(spec.total_frames))}'
  242. codepoints = format_codepoints if show_codepoints else lambda _: ''
  243. for i, cycle in enumerate(spec.data, 1):
  244. frames = map(lambda fragment: tuple(strip_marks(fragment)), cycle)
  245. print(f'\ncycle {i}\n' + '\n'.join(
  246. DIM(li, lf) + f' |{"".join(frame)}| {DIM(wi, wf)}' + codepoints(frame)
  247. for li, frame, wi in zip(count(1), frames, whole_index)
  248. ))
  249. def animate(spec): # pragma: no cover
  250. print(f'\n{SECTION("Animation")}')
  251. cf, lf, tf = (f'>{len(str(x))}' for x in (spec.cycles, max(spec.frames), spec.total_frames))
  252. from itertools import cycle
  253. cycles, frames = cycle(range(1, spec.cycles + 1)), cycle(range(1, spec.total_frames + 1))
  254. term = terminal.get_term()
  255. term.hide_cursor()
  256. try:
  257. while True:
  258. c = next(cycles)
  259. for i, f in enumerate(spec.runner(), 1):
  260. n = next(frames)
  261. print(f'\r{CYAN(c, cf)}:{CYAN(i, lf)} -->{join_cells(f)}<-- {CYAN(n, tf)} ')
  262. print(DIM('(press CTRL+C to stop)'), end='')
  263. term.clear_end_line()
  264. time.sleep(1 / 15)
  265. term.cursor_up_1()
  266. except KeyboardInterrupt:
  267. pass
  268. finally:
  269. term.show_cursor()