# util.py (2.9 KB)
  1. import json
  2. import re
  3. import threading
  4. from typing import Dict
  5. from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
  6. from ray.data._internal.logical.interfaces import LogicalOperator
  7. from ray.data._internal.logical.operators import AbstractUDFMap, Read, Write
  8. # The dictionary for the operator name and count.
  9. _recorded_operators = dict()
  10. _recorded_operators_lock = threading.Lock()
  11. # The white list of operator names allowed to be recorded.
  12. _op_name_white_list = [
  13. # Read
  14. "ReadBigQuery",
  15. "ReadRange",
  16. "ReadMongo",
  17. "ReadParquet",
  18. "ReadParquetBulk",
  19. "ReadImage",
  20. "ReadJSON",
  21. "ReadCSV",
  22. "ReadText",
  23. "ReadNumpy",
  24. "ReadTFRecord",
  25. "ReadBinary",
  26. "ReadTorch",
  27. "ReadAvro",
  28. "ReadWebDataset",
  29. "ReadSQL",
  30. "ReadDatabricksUC",
  31. "ReadLance",
  32. "ReadHuggingFace",
  33. "ReadCustom",
  34. # From
  35. "FromArrow",
  36. "FromItems",
  37. "FromNumpy",
  38. "FromPandas",
  39. # Write
  40. "WriteBigQuery",
  41. "WriteParquet",
  42. "WriteJSON",
  43. "WriteCSV",
  44. "WriteTFRecord",
  45. "WriteNumpy",
  46. "WriteMongo",
  47. "WriteWebDataset",
  48. "WriteSQL",
  49. "WriteCustom",
  50. # Map
  51. "Map",
  52. "MapBatches",
  53. "Filter",
  54. "FlatMap",
  55. # All-to-all
  56. "RandomizeBlockOrder",
  57. "RandomShuffle",
  58. "Repartition",
  59. "Sort",
  60. "Aggregate",
  61. # N-ary
  62. "Zip",
  63. "Union",
  64. ]
  65. def record_operators_usage(op: LogicalOperator):
  66. """Record logical operator usage with Ray telemetry."""
  67. ops_dict = dict()
  68. _collect_operators_to_dict(op, ops_dict)
  69. ops_json_str = ""
  70. with _recorded_operators_lock:
  71. for op, count in ops_dict.items():
  72. _recorded_operators.setdefault(op, 0)
  73. _recorded_operators[op] += count
  74. ops_json_str = json.dumps(_recorded_operators)
  75. record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)
  76. def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
  77. """Collect the logical operator name and count into `ops_dict`."""
  78. for child in op.input_dependencies:
  79. _collect_operators_to_dict(child, ops_dict)
  80. op_name = op.name
  81. # Check read and write operator, and anonymize user-defined data source.
  82. if isinstance(op, Read):
  83. op_name = f"Read{op.datasource.get_name()}"
  84. if op_name not in _op_name_white_list:
  85. op_name = "ReadCustom"
  86. elif isinstance(op, Write):
  87. op_name = f"Write{op.datasink_or_legacy_datasource.get_name()}"
  88. if op_name not in _op_name_white_list:
  89. op_name = "WriteCustom"
  90. elif isinstance(op, AbstractUDFMap):
  91. # Remove the function name from the map operator name.
  92. # E.g., Map(<lambda>) -> Map
  93. op_name = re.sub("\\(.*\\)$", "", op_name)
  94. # Anonymize any operator name if not in white list.
  95. if op_name not in _op_name_white_list:
  96. op_name = "Unknown"
  97. ops_dict.setdefault(op_name, 0)
  98. ops_dict[op_name] += 1