| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553 |
- from __future__ import annotations
- import functools
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from enum import Enum
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Generic,
- List,
- Optional,
- Tuple,
- Type,
- TypeVar,
- Union,
- )
- import pyarrow
- import pyarrow.compute as pc
- from ray.data.block import BatchColumn
- from ray.data.datatype import DataType
- from ray.util.annotations import DeveloperAPI, PublicAPI
- if TYPE_CHECKING:
- from ray.data.namespace_expressions.arr_namespace import _ArrayNamespace
- from ray.data.namespace_expressions.dt_namespace import _DatetimeNamespace
- from ray.data.namespace_expressions.list_namespace import _ListNamespace
- from ray.data.namespace_expressions.string_namespace import _StringNamespace
- from ray.data.namespace_expressions.struct_namespace import _StructNamespace
# Generic type parameter; used by ``_ExprVisitor`` for the visit result type
# and by the ``Decorated`` alias below.
T = TypeVar("T")

# Any callable whose result is a ``UDFExpr``.
UDFCallable = Callable[..., "UDFExpr"]

# Either a ``UDFCallable`` or a class object of type ``T``.
Decorated = Union[UDFCallable, Type[T]]
@DeveloperAPI(stability="alpha")
class Operation(Enum):
    """Operations that an expression node can carry.

    The members cover arithmetic, comparison, boolean, null-check and
    membership operations. Most are combined with two operands through
    ``BinaryExpr``; ``NOT``, ``IS_NULL`` and ``IS_NOT_NULL`` are applied to a
    single operand through ``UnaryExpr``.

    Attributes:
        ADD: Addition operation (+)
        SUB: Subtraction operation (-)
        MUL: Multiplication operation (*)
        DIV: Division operation (/)
        MOD: Modulo operation (%)
        FLOORDIV: Floor division operation (//)
        GT: Greater than comparison (>)
        LT: Less than comparison (<)
        GE: Greater than or equal comparison (>=)
        LE: Less than or equal comparison (<=)
        EQ: Equality comparison (==)
        NE: Not equal comparison (!=)
        AND: Logical AND operation (&)
        OR: Logical OR operation (|)
        NOT: Logical NOT operation (~)
        IS_NULL: Check if value is null
        IS_NOT_NULL: Check if value is not null
        IN: Check if value is in a list
        NOT_IN: Check if value is not in a list
    """

    # Arithmetic
    ADD = "add"
    SUB = "sub"
    MUL = "mul"
    DIV = "div"
    MOD = "mod"
    FLOORDIV = "floordiv"

    # Comparison
    GT = "gt"
    LT = "lt"
    GE = "ge"
    LE = "le"
    EQ = "eq"
    NE = "ne"

    # Boolean
    AND = "and"
    OR = "or"
    NOT = "not"

    # Null checks
    IS_NULL = "is_null"
    IS_NOT_NULL = "is_not_null"

    # Membership
    IN = "in"
    NOT_IN = "not_in"
class _ExprVisitor(ABC, Generic[T]):
    """Abstract visitor that routes an expression node to a typed handler.

    Subclasses implement one ``visit_*`` method per expression node type and
    call :meth:`visit` to dispatch on the runtime type of a node.
    """

    def visit(self, expr: "Expr") -> T:
        """Dispatch ``expr`` to the ``visit_*`` method matching its type.

        Args:
            expr: The expression node to process.

        Returns:
            Whatever the selected ``visit_*`` method returns.

        Raises:
            TypeError: If ``expr`` is not an instance of any handled node type.
        """
        # Node types are tested in this exact order; the first match wins.
        handlers = (
            (ColumnExpr, self.visit_column),
            (LiteralExpr, self.visit_literal),
            (BinaryExpr, self.visit_binary),
            (UnaryExpr, self.visit_unary),
            (AliasExpr, self.visit_alias),
            (UDFExpr, self.visit_udf),
            (DownloadExpr, self.visit_download),
            (StarExpr, self.visit_star),
        )
        for node_type, handler in handlers:
            if isinstance(expr, node_type):
                return handler(expr)
        raise TypeError(f"Unsupported expression type for conversion: {type(expr)}")

    @abstractmethod
    def visit_column(self, expr: "ColumnExpr") -> T:
        """Handle a ``ColumnExpr`` node."""

    @abstractmethod
    def visit_literal(self, expr: "LiteralExpr") -> T:
        """Handle a ``LiteralExpr`` node."""

    @abstractmethod
    def visit_binary(self, expr: "BinaryExpr") -> T:
        """Handle a ``BinaryExpr`` node."""

    @abstractmethod
    def visit_unary(self, expr: "UnaryExpr") -> T:
        """Handle a ``UnaryExpr`` node."""

    @abstractmethod
    def visit_alias(self, expr: "AliasExpr") -> T:
        """Handle an ``AliasExpr`` node."""

    @abstractmethod
    def visit_udf(self, expr: "UDFExpr") -> T:
        """Handle a ``UDFExpr`` node."""

    @abstractmethod
    def visit_star(self, expr: "StarExpr") -> T:
        """Handle a ``StarExpr`` node."""

    @abstractmethod
    def visit_download(self, expr: "DownloadExpr") -> T:
        """Handle a ``DownloadExpr`` node."""
class _PyArrowExpressionVisitor(_ExprVisitor["pyarrow.compute.Expression"]):
    """Translate a Ray Data expression tree into a PyArrow compute expression."""

    def visit_column(self, expr: "ColumnExpr") -> "pyarrow.compute.Expression":
        """Map a column reference to a PyArrow field reference."""
        return pc.field(expr.name)

    def visit_literal(self, expr: "LiteralExpr") -> "pyarrow.compute.Expression":
        """Map a literal to a PyArrow scalar expression."""
        return pc.scalar(expr.value)

    def _visit_membership(self, expr: "BinaryExpr") -> "pyarrow.compute.Expression":
        """Translate an ``IN`` / ``NOT_IN`` node.

        The left operand is translated first; the right operand must be a
        ``LiteralExpr``. A list value is used as the value set directly, any
        other value is wrapped in a one-element list.

        Raises:
            ValueError: If the right operand is not a ``LiteralExpr``.
        """
        lhs = self.visit(expr.left)
        if not isinstance(expr.right, LiteralExpr):
            raise ValueError(
                f"is_in/not_in operations require the right operand to be a "
                f"literal list, got {type(expr.right).__name__}."
            )

        candidates = expr.right.value
        if not isinstance(candidates, list):
            candidates = [candidates]
        membership = pc.is_in(lhs, pyarrow.array(candidates))

        if expr.op == Operation.NOT_IN:
            return pc.invert(membership)
        return membership

    def visit_binary(self, expr: "BinaryExpr") -> "pyarrow.compute.Expression":
        """Translate a binary node.

        Raises:
            ValueError: If the operation has no entry in ``_ARROW_EXPR_OPS_MAP``,
                or a membership test has a non-literal right operand.
        """
        if expr.op in (Operation.IN, Operation.NOT_IN):
            return self._visit_membership(expr)

        lhs = self.visit(expr.left)
        rhs = self.visit(expr.right)

        # Imported at call time, as in the rest of this visitor.
        from ray.data._internal.planner.plan_expression.expression_evaluator import (
            _ARROW_EXPR_OPS_MAP,
        )

        if expr.op not in _ARROW_EXPR_OPS_MAP:
            raise ValueError(f"Unsupported binary operation for PyArrow: {expr.op}")
        return _ARROW_EXPR_OPS_MAP[expr.op](lhs, rhs)

    def visit_unary(self, expr: "UnaryExpr") -> "pyarrow.compute.Expression":
        """Translate a unary node.

        Raises:
            ValueError: If the operation has no entry in ``_ARROW_EXPR_OPS_MAP``.
        """
        arrow_operand = self.visit(expr.operand)

        from ray.data._internal.planner.plan_expression.expression_evaluator import (
            _ARROW_EXPR_OPS_MAP,
        )

        if expr.op not in _ARROW_EXPR_OPS_MAP:
            raise ValueError(f"Unsupported unary operation for PyArrow: {expr.op}")
        return _ARROW_EXPR_OPS_MAP[expr.op](arrow_operand)

    def visit_alias(self, expr: "AliasExpr") -> "pyarrow.compute.Expression":
        """Translate the wrapped expression; the alias name itself is dropped."""
        return self.visit(expr.expr)

    def visit_udf(self, expr: "UDFExpr") -> "pyarrow.compute.Expression":
        """Always raises ``TypeError``: UDF nodes are not convertible."""
        raise TypeError("UDF expressions cannot be converted to PyArrow expressions")

    def visit_download(self, expr: "DownloadExpr") -> "pyarrow.compute.Expression":
        """Always raises ``TypeError``: download nodes are not convertible."""
        raise TypeError(
            "Download expressions cannot be converted to PyArrow expressions"
        )

    def visit_star(self, expr: "StarExpr") -> "pyarrow.compute.Expression":
        """Always raises ``TypeError``: star nodes are not convertible."""
        raise TypeError("Star expressions cannot be converted to PyArrow expressions")
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True)
class Expr(ABC):
    """Base class for all expression nodes.

    This is the abstract base class that all expression types inherit from.
    It provides operator overloads for building complex expressions using
    standard Python operators.

    Expressions form a tree structure where each node represents an operation
    or value. The tree can be evaluated against data batches to compute results.

    Example:
        >>> from ray.data.expressions import col, lit
        >>> # Create an expression tree: (col("x") + 5) * col("y")
        >>> expr = (col("x") + lit(5)) * col("y")
        >>> # This creates a BinaryExpr with operation=MUL
        >>> # left=BinaryExpr(op=ADD, left=ColumnExpr("x"), right=LiteralExpr(5))
        >>> # right=ColumnExpr("y")

    Note:
        This class should not be instantiated directly. Use the concrete
        subclasses like ColumnExpr, LiteralExpr, etc.
    """

    data_type: DataType

    @property
    def name(self) -> str | None:
        """Get the name associated with this expression.

        Returns:
            The name for expressions that have one (ColumnExpr, AliasExpr),
            None otherwise.
        """
        return None

    @abstractmethod
    def structurally_equals(self, other: Any) -> bool:
        """Compare two expression ASTs for structural equality."""
        raise NotImplementedError

    def to_pyarrow(self) -> "pyarrow.compute.Expression":
        """Convert this Ray Data expression to a PyArrow compute expression.

        Returns:
            A PyArrow compute expression equivalent to this Ray Data expression.

        Raises:
            ValueError: If the expression contains operations not supported by PyArrow.
            TypeError: If the expression type cannot be converted to PyArrow.
        """
        return _PyArrowExpressionVisitor().visit(self)

    def __repr__(self) -> str:
        """Return a tree-structured string representation of the expression.

        Returns:
            A multi-line string showing the expression tree structure using
            box-drawing characters for visual clarity.

        Example:
            >>> from ray.data.expressions import col, lit
            >>> expr = (col("x") + lit(5)) * col("y")
            >>> print(expr)
            MUL
                ├── left: ADD
                │   ├── left: COL('x')
                │   └── right: LIT(5)
                └── right: COL('y')
        """
        # Imported lazily, at call time rather than at module import.
        from ray.data._internal.planner.plan_expression.expression_visitors import (
            _TreeReprVisitor,
        )

        return _TreeReprVisitor().visit(self)

    def _bin(self, other: Any, op: Operation) -> "Expr":
        """Create a binary expression with the given operation.

        Args:
            other: The right operand expression or literal value
            op: The operation to perform

        Returns:
            A new BinaryExpr representing the operation

        Note:
            If other is not an Expr, it will be automatically converted to a LiteralExpr.
        """
        if not isinstance(other, Expr):
            other = LiteralExpr(other)
        return BinaryExpr(op, self, other)

    #
    # Arithmetic ops
    #
    def __add__(self, other: Any) -> "Expr":
        """Addition operator (+)."""
        return self._bin(other, Operation.ADD)

    def __radd__(self, other: Any) -> "Expr":
        """Reverse addition operator (for literal + expr)."""
        return LiteralExpr(other)._bin(self, Operation.ADD)

    def __sub__(self, other: Any) -> "Expr":
        """Subtraction operator (-)."""
        return self._bin(other, Operation.SUB)

    def __rsub__(self, other: Any) -> "Expr":
        """Reverse subtraction operator (for literal - expr)."""
        return LiteralExpr(other)._bin(self, Operation.SUB)

    def __mul__(self, other: Any) -> "Expr":
        """Multiplication operator (*)."""
        return self._bin(other, Operation.MUL)

    def __rmul__(self, other: Any) -> "Expr":
        """Reverse multiplication operator (for literal * expr)."""
        return LiteralExpr(other)._bin(self, Operation.MUL)

    def __mod__(self, other: Any) -> "Expr":
        """Modulo operator (%)."""
        return self._bin(other, Operation.MOD)

    def __rmod__(self, other: Any) -> "Expr":
        """Reverse modulo operator (for literal % expr)."""
        return LiteralExpr(other)._bin(self, Operation.MOD)

    def __truediv__(self, other: Any) -> "Expr":
        """Division operator (/)."""
        return self._bin(other, Operation.DIV)

    def __rtruediv__(self, other: Any) -> "Expr":
        """Reverse division operator (for literal / expr)."""
        return LiteralExpr(other)._bin(self, Operation.DIV)

    def __floordiv__(self, other: Any) -> "Expr":
        """Floor division operator (//)."""
        return self._bin(other, Operation.FLOORDIV)

    def __rfloordiv__(self, other: Any) -> "Expr":
        """Reverse floor division operator (for literal // expr)."""
        return LiteralExpr(other)._bin(self, Operation.FLOORDIV)

    #
    # Comparison ops
    #
    def __gt__(self, other: Any) -> "Expr":
        """Greater than operator (>)."""
        return self._bin(other, Operation.GT)

    def __lt__(self, other: Any) -> "Expr":
        """Less than operator (<)."""
        return self._bin(other, Operation.LT)

    def __ge__(self, other: Any) -> "Expr":
        """Greater than or equal operator (>=)."""
        return self._bin(other, Operation.GE)

    def __le__(self, other: Any) -> "Expr":
        """Less than or equal operator (<=)."""
        return self._bin(other, Operation.LE)

    def __eq__(self, other: Any) -> "Expr":
        """Equality operator (==).

        Note:
            This builds an ``EQ`` expression instead of returning a bool; use
            :meth:`structurally_equals` to compare two expression trees.
        """
        return self._bin(other, Operation.EQ)

    def __ne__(self, other: Any) -> "Expr":
        """Not equal operator (!=)."""
        return self._bin(other, Operation.NE)

    #
    # Boolean ops
    #
    def __and__(self, other: Any) -> "Expr":
        """Logical AND operator (&)."""
        return self._bin(other, Operation.AND)

    def __rand__(self, other: Any) -> "Expr":
        """Reverse logical AND operator (for literal & expr)."""
        return LiteralExpr(other)._bin(self, Operation.AND)

    def __or__(self, other: Any) -> "Expr":
        """Logical OR operator (|)."""
        return self._bin(other, Operation.OR)

    def __ror__(self, other: Any) -> "Expr":
        """Reverse logical OR operator (for literal | expr)."""
        return LiteralExpr(other)._bin(self, Operation.OR)

    def __invert__(self) -> "Expr":
        """Logical NOT operator (~)."""
        return UnaryExpr(Operation.NOT, self)

    #
    # Predicate methods
    #
    def is_null(self) -> "Expr":
        """Check if the expression value is null."""
        return UnaryExpr(Operation.IS_NULL, self)

    def is_not_null(self) -> "Expr":
        """Check if the expression value is not null."""
        return UnaryExpr(Operation.IS_NOT_NULL, self)

    def is_in(self, values: Union[List[Any], "Expr"]) -> "Expr":
        """Check if the expression value is in a list of values.

        Args:
            values: The candidate values, or an expression. A non-``Expr``
                argument is wrapped in a ``LiteralExpr``.
        """
        if not isinstance(values, Expr):
            values = LiteralExpr(values)
        return self._bin(values, Operation.IN)

    def not_in(self, values: Union[List[Any], "Expr"]) -> "Expr":
        """Check if the expression value is not in a list of values.

        Args:
            values: The candidate values, or an expression. A non-``Expr``
                argument is wrapped in a ``LiteralExpr``.
        """
        if not isinstance(values, Expr):
            values = LiteralExpr(values)
        return self._bin(values, Operation.NOT_IN)

    def alias(self, name: str) -> "Expr":
        """Rename the expression.

        This method allows you to assign a new name to an expression result.
        This is particularly useful when you want to specify the output column name
        directly within the expression rather than as a separate parameter.

        Args:
            name: The new name for the expression

        Returns:
            An AliasExpr that wraps this expression with the specified name

        Example:
            >>> from ray.data.expressions import col, lit
            >>> # Create an expression with a new aliased name
            >>> expr = (col("price") * col("quantity")).alias("total")
            >>> # Can be used with Dataset operations that support named expressions
        """
        return AliasExpr(
            data_type=self.data_type, expr=self, _name=name, _is_rename=False
        )

    #
    # Rounding helpers
    #
    def ceil(self) -> "UDFExpr":
        """Round values up to the nearest integer."""
        return _create_pyarrow_compute_udf(pc.ceil)(self)

    def floor(self) -> "UDFExpr":
        """Round values down to the nearest integer."""
        return _create_pyarrow_compute_udf(pc.floor)(self)

    def round(self) -> "UDFExpr":
        """Round values to the nearest integer using PyArrow semantics."""
        return _create_pyarrow_compute_udf(pc.round)(self)

    def trunc(self) -> "UDFExpr":
        """Truncate fractional values toward zero."""
        return _create_pyarrow_compute_udf(pc.trunc)(self)

    #
    # Logarithmic helpers
    #
    def ln(self) -> "UDFExpr":
        """Compute the natural logarithm of the expression."""
        return _create_pyarrow_compute_udf(pc.ln, return_dtype=DataType.float64())(self)

    def log10(self) -> "UDFExpr":
        """Compute the base-10 logarithm of the expression."""
        return _create_pyarrow_compute_udf(pc.log10, return_dtype=DataType.float64())(
            self
        )

    def log2(self) -> "UDFExpr":
        """Compute the base-2 logarithm of the expression."""
        return _create_pyarrow_compute_udf(pc.log2, return_dtype=DataType.float64())(
            self
        )

    def exp(self) -> "UDFExpr":
        """Compute the natural exponential of the expression."""
        return _create_pyarrow_compute_udf(pc.exp, return_dtype=DataType.float64())(
            self
        )

    #
    # Trigonometric helpers
    #
    def sin(self) -> "UDFExpr":
        """Compute the sine of the expression (in radians)."""
        return _create_pyarrow_compute_udf(pc.sin, return_dtype=DataType.float64())(
            self
        )

    def cos(self) -> "UDFExpr":
        """Compute the cosine of the expression (in radians)."""
        return _create_pyarrow_compute_udf(pc.cos, return_dtype=DataType.float64())(
            self
        )

    def tan(self) -> "UDFExpr":
        """Compute the tangent of the expression (in radians)."""
        return _create_pyarrow_compute_udf(pc.tan, return_dtype=DataType.float64())(
            self
        )

    def asin(self) -> "UDFExpr":
        """Compute the arcsine (inverse sine) of the expression, returning radians."""
        return _create_pyarrow_compute_udf(pc.asin, return_dtype=DataType.float64())(
            self
        )

    def acos(self) -> "UDFExpr":
        """Compute the arccosine (inverse cosine) of the expression, returning radians."""
        return _create_pyarrow_compute_udf(pc.acos, return_dtype=DataType.float64())(
            self
        )

    def atan(self) -> "UDFExpr":
        """Compute the arctangent (inverse tangent) of the expression, returning radians."""
        return _create_pyarrow_compute_udf(pc.atan, return_dtype=DataType.float64())(
            self
        )

    #
    # Arithmetic helpers
    #
    def negate(self) -> "UDFExpr":
        """Compute the negation of the expression.

        Returns:
            A UDFExpr that computes the negation (multiplies values by -1).

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> ds = ray.data.from_items([{"x": 5}, {"x": -3}])
            >>> ds = ds.with_column("neg_x", col("x").negate())
            >>> # Result: neg_x = [-5, 3]
        """
        return _create_pyarrow_compute_udf(pc.negate_checked)(self)

    def sign(self) -> "UDFExpr":
        """Compute the sign of the expression.

        Returns:
            A UDFExpr that returns -1 for negative values, 0 for zero, and 1 for positive values.

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> ds = ray.data.from_items([{"x": 5}, {"x": -3}, {"x": 0}])
            >>> ds = ds.with_column("sign_x", col("x").sign())
            >>> # Result: sign_x = [1, -1, 0]
        """
        return _create_pyarrow_compute_udf(pc.sign)(self)

    def power(self, exponent: Any) -> "UDFExpr":
        """Raise the expression to the given power.

        Args:
            exponent: The exponent to raise the expression to.

        Returns:
            A UDFExpr that computes the power operation.

        Example:
            >>> from ray.data.expressions import col, lit
            >>> import ray
            >>> ds = ray.data.from_items([{"x": 2}, {"x": 3}])
            >>> ds = ds.with_column("x_squared", col("x").power(2))
            >>> # Result: x_squared = [4, 9]
            >>> ds = ds.with_column("x_cubed", col("x").power(3))
            >>> # Result: x_cubed = [8, 27]
        """
        return _create_pyarrow_compute_udf(pc.power)(self, exponent)

    def abs(self) -> "UDFExpr":
        """Compute the absolute value of the expression.

        Returns:
            A UDFExpr that computes the absolute value.

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> ds = ray.data.from_items([{"x": 5}, {"x": -3}])
            >>> ds = ds.with_column("abs_x", col("x").abs())
            >>> # Result: abs_x = [5, 3]
        """
        return _create_pyarrow_compute_udf(pc.abs_checked)(self)

    #
    # Namespace accessors (each namespace module is imported on access)
    #
    @property
    def arr(self) -> "_ArrayNamespace":
        """Access array operations for this expression."""
        from ray.data.namespace_expressions.arr_namespace import _ArrayNamespace

        return _ArrayNamespace(self)

    @property
    def list(self) -> "_ListNamespace":
        """Access list operations for this expression.

        Returns:
            A _ListNamespace that provides list-specific operations for both
            PyArrow ``List`` and ``FixedSizeList`` columns.

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> ds = ray.data.from_items([
            ...     {"items": [1, 2, 3]},
            ...     {"items": [4, 5]}
            ... ])
            >>> ds = ds.with_column("num_items", col("items").list.len())
            >>> ds = ds.with_column("first_item", col("items").list[0])
            >>> ds = ds.with_column("slice", col("items").list[1:3])
        """
        from ray.data.namespace_expressions.list_namespace import _ListNamespace

        return _ListNamespace(self)

    @property
    def str(self) -> "_StringNamespace":
        """Access string operations for this expression.

        Returns:
            A _StringNamespace that provides string-specific operations.

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> ds = ray.data.from_items([
            ...     {"name": "Alice"},
            ...     {"name": "Bob"}
            ... ])
            >>> ds = ds.with_column("upper_name", col("name").str.upper())
            >>> ds = ds.with_column("name_len", col("name").str.len())
            >>> ds = ds.with_column("starts_a", col("name").str.starts_with("A"))
        """
        from ray.data.namespace_expressions.string_namespace import _StringNamespace

        return _StringNamespace(self)

    @property
    def struct(self) -> "_StructNamespace":
        """Access struct operations for this expression.

        Returns:
            A _StructNamespace that provides struct-specific operations.

        Example:
            >>> from ray.data.expressions import col
            >>> import ray
            >>> import pyarrow as pa
            >>> ds = ray.data.from_arrow(pa.table({
            ...     "user": pa.array([
            ...         {"name": "Alice", "age": 30}
            ...     ], type=pa.struct([
            ...         pa.field("name", pa.string()),
            ...         pa.field("age", pa.int32())
            ...     ]))
            ... }))
            >>> ds = ds.with_column("age", col("user").struct["age"])  # doctest: +SKIP
        """
        from ray.data.namespace_expressions.struct_namespace import _StructNamespace

        return _StructNamespace(self)

    @property
    def dt(self) -> "_DatetimeNamespace":
        """Access datetime operations for this expression."""
        from ray.data.namespace_expressions.dt_namespace import _DatetimeNamespace

        return _DatetimeNamespace(self)

    def _unalias(self) -> "Expr":
        """Return this expression itself (the base implementation is the identity)."""
        return self
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class ColumnExpr(Expr):
    """Expression node that refers to a dataset column by its name.

    Args:
        name: The name of the column to reference

    Example:
        >>> from ray.data.expressions import col
        >>> # Reference the "age" column
        >>> age_expr = col("age")  # Creates ColumnExpr(name="age")
    """

    _name: str
    # Not an ``__init__`` argument; every instance starts with ``DataType(object)``.
    data_type: DataType = field(default_factory=lambda: DataType(object), init=False)

    @property
    def name(self) -> str:
        """The referenced column's name."""
        return self._name

    def _rename(self, name: str) -> "Expr":
        """Wrap this column in an ``AliasExpr`` flagged as a rename.

        Args:
            name: The new name to expose the column under.
        """
        return AliasExpr(
            data_type=self.data_type, expr=self, _name=name, _is_rename=True
        )

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` is a ``ColumnExpr`` with the same name."""
        if not isinstance(other, ColumnExpr):
            return False
        return self.name == other.name
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class LiteralExpr(Expr):
    """Expression node holding a constant value.

    The wrapped value can be any Python object; its data type is inferred
    from the value when the node is constructed.

    Args:
        value: The constant value to represent

    Example:
        >>> from ray.data.expressions import lit
        >>> import numpy as np
        >>> # Create a literal value
        >>> five = lit(5)  # Creates LiteralExpr(value=5)
        >>> name = lit("John")  # Creates LiteralExpr(value="John")
        >>> numpy_val = lit(np.int32(42))  # Creates LiteralExpr with numpy type
    """

    value: Any
    # Filled in by ``__post_init__``; not an ``__init__`` argument.
    data_type: DataType = field(init=False)

    def __post_init__(self):
        """Infer ``data_type`` from ``value`` via ``DataType.infer_dtype``."""
        # The dataclass is frozen, so normal attribute assignment is blocked;
        # go through object.__setattr__ instead.
        object.__setattr__(self, "data_type", DataType.infer_dtype(self.value))

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` is a literal with an equal value of the same type."""
        if not isinstance(other, LiteralExpr):
            return False
        same_value = self.value == other.value
        # The exact-type check distinguishes values that compare equal but
        # have different types.
        return same_value and type(self.value) is type(other.value)
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class BinaryExpr(Expr):
    """Expression node combining two operand expressions with an operation.

    The operation is given by ``op``, a member of the ``Operation`` enum.

    Args:
        op: The operation to perform (from Operation enum)
        left: The left operand expression
        right: The right operand expression

    Example:
        >>> from ray.data.expressions import BinaryExpr, col, lit, Operation
        >>> # Manually create a binary expression (usually done via operators)
        >>> expr = BinaryExpr(Operation.ADD, col("x"), lit(5))
        >>> # This is equivalent to: col("x") + lit(5)
    """

    op: Operation
    left: Expr
    right: Expr
    # Not an ``__init__`` argument; every instance starts with ``DataType(object)``.
    data_type: DataType = field(default_factory=lambda: DataType(object), init=False)

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` has the same op and structurally equal operands."""
        if not isinstance(other, BinaryExpr) or self.op is not other.op:
            return False
        return self.left.structurally_equals(
            other.left
        ) and self.right.structurally_equals(other.right)
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class UnaryExpr(Expr):
    """Expression node applying an operation to a single operand.

    Examples of unary operations are logical NOT, IS NULL and IS NOT NULL.

    Args:
        op: The operation to perform (from Operation enum)
        operand: The operand expression

    Example:
        >>> from ray.data.expressions import col
        >>> # Check if a column is null
        >>> expr = col("age").is_null()  # Creates UnaryExpr(IS_NULL, col("age"))
        >>> # Logical not
        >>> expr = ~(col("active"))  # Creates UnaryExpr(NOT, col("active"))
    """

    op: Operation
    operand: Expr

    # The data type defaults to bool, matching unary operations such as
    # is_null() and NOT. Having it set lets chained calls like
    # col("x").is_not_null().alias("valid") work, because downstream
    # expressions (e.g. AliasExpr) read the data type of what they wrap.
    data_type: DataType = field(default_factory=lambda: DataType.bool(), init=False)

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` has the same op and a structurally equal operand."""
        if not isinstance(other, UnaryExpr) or self.op is not other.op:
            return False
        return self.operand.structurally_equals(other.operand)
- @dataclass(frozen=True)
- class _CallableClassSpec:
- """Specification for a callable class UDF.
- This dataclass captures the class type and constructor arguments needed
- to instantiate a callable class UDF on an actor. It consolidates the
- callable class metadata that was previously spread across multiple fields.
- Attributes:
- cls: The original callable class type
- args: Positional arguments for the constructor
- kwargs: Keyword arguments for the constructor
- _cached_key: Pre-computed key that survives serialization
- """
- cls: type
- args: Tuple[Any, ...] = ()
- kwargs: Dict[str, Any] = field(default_factory=dict)
- _cached_key: Optional[Tuple] = field(default=None, compare=False, repr=False)
- def __post_init__(self):
- """Pre-compute and cache the key at construction time.
- This ensures the same key survives serialization, since the cached
- key tuple (containing the already-computed repr strings) gets pickled
- and unpickled as-is.
- """
- if self._cached_key is None:
- class_id = f"{self.cls.__module__}.{self.cls.__qualname__}"
- try:
- key = (
- class_id,
- self.args,
- tuple(sorted(self.kwargs.items())),
- )
- # Verify the key is actually hashable (args may contain lists)
- hash(key)
- except TypeError:
- # Fallback for unhashable args/kwargs - use repr for comparison
- key = (class_id, repr(self.args), repr(self.kwargs))
- # Use object.__setattr__ since dataclass is frozen
- object.__setattr__(self, "_cached_key", key)
- def make_key(self) -> Tuple:
- """Return the pre-computed hashable key for UDF instance lookup.
- The key uniquely identifies a UDF by its class and constructor arguments.
- This ensures that the same class with different constructor args
- (e.g., Multiplier(2) vs Multiplier(3)) are treated as distinct UDFs.
- Returns:
- A hashable tuple that uniquely identifies this UDF configuration.
- """
- return self._cached_key
class _CallableClassUDF:
    """Function-like stand-in for a callable-class UDF.

    The wrapper remembers the user's class together with the constructor
    arguments, but does not build the object itself until ``init()`` is
    invoked (intended to happen at actor startup via ``init_fn``). After
    that, every call is forwarded to that single instance.

    What it takes care of:
        1. Holding the callable class and its constructor arguments.
        2. Offering ``init()`` so the instance can be created at startup.
        3. Routing calls through ``_call_udf_instance_with_async_bridge``
           so coroutine / async-generator UDFs are handled.
        4. Reusing one instance for all calls once it exists.

    Example:
        >>> @udf(return_dtype=DataType.int32())
        ... class AddOffset:
        ...     def __init__(self, offset=1):
        ...         self.offset = offset
        ...     def __call__(self, x):
        ...         return pc.add(x, self.offset)
        >>>
        >>> add_five = AddOffset(5)  # Creates _CallableClassUDF internally
        >>> expr = add_five(col("value"))  # Creates UDFExpr with fn=_CallableClassUDF
    """

    def __init__(
        self,
        cls: type,
        ctor_args: Tuple[Any, ...],
        ctor_kwargs: Dict[str, Any],
        return_dtype: DataType,
    ):
        """Record the class and constructor arguments; build nothing yet.

        Args:
            cls: The user's original callable class.
            ctor_args: Positional arguments to pass to ``cls`` later.
            ctor_kwargs: Keyword arguments to pass to ``cls`` later.
            return_dtype: Declared return data type of the UDF.
        """
        self._cls = cls
        self._ctor_args = ctor_args
        self._ctor_kwargs = ctor_kwargs
        self._return_dtype = return_dtype
        # Stays None until init() constructs the real object.
        self._instance = None
        # Built once so that `callable_class_spec` always hands out the
        # same spec object instead of a fresh one per access.
        self._callable_class_spec = _CallableClassSpec(
            cls=cls,
            args=ctor_args,
            kwargs=ctor_kwargs,
        )

    @property
    def __name__(self) -> str:
        """Name of the wrapped class (used in error messages)."""
        return self._cls.__name__

    @property
    def callable_class_spec(self) -> _CallableClassSpec:
        """Spec (class + constructor arguments) describing this UDF.

        Lets callers recognise the same UDF configuration when it shows
        up more than once in an expression tree.
        """
        return self._callable_class_spec

    def init(self) -> None:
        """Construct the wrapped instance if that has not happened yet.

        Meant to be called at actor startup via ``init_fn`` so the object
        exists before any block is processed. Calling it again once the
        instance exists does nothing.
        """
        if self._instance is not None:
            return
        self._instance = self._cls(*self._ctor_args, **self._ctor_kwargs)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Forward a call to the already-constructed UDF instance.

        Args:
            *args: Evaluated positional expression arguments.
            **kwargs: Evaluated keyword expression arguments.

        Returns:
            Whatever the wrapped instance returns for these arguments.

        Raises:
            RuntimeError: If ``init()`` has not been called yet.
        """
        instance = self._instance
        if instance is None:
            raise RuntimeError(
                f"_CallableClassUDF '{self._cls.__name__}' was not initialized. "
                f"init() must be called before __call__. This typically happens "
                f"via init_fn at actor startup."
            )

        # Imported at call time rather than at module import time.
        from ray.data.util.expression_utils import _call_udf_instance_with_async_bridge

        # The helper invokes the instance and deals with async UDFs.
        return _call_udf_instance_with_async_bridge(instance, *args, **kwargs)
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class UDFExpr(Expr):
    """Expression node for a call to a user-defined function.

    The node pairs the UDF with the expressions that produce its
    arguments, so UDF calls can sit inside larger expression trees.

    UDFs are batch-oriented: each column argument reaches the function as
    a PyArrow Array holding that column's values for the whole batch.

    Args:
        fn: The function to invoke. For callable classes this is a
            ``_CallableClassUDF`` wrapper around the user's class.
        args: Positional argument expressions.
        kwargs: Keyword argument expressions, keyed by parameter name.

    Example:
        >>> from ray.data.expressions import col, udf
        >>> import pyarrow as pa
        >>> import pyarrow.compute as pc
        >>> from ray.data.datatype import DataType
        >>>
        >>> @udf(return_dtype=DataType.int32())
        ... def add_one(x: pa.Array) -> pa.Array:
        ...     return pc.add(x, 1)
        >>>
        >>> # Use in expressions
        >>> expr = add_one(col("value"))

        >>> # Callable class example
        >>> @udf(return_dtype=DataType.int32())
        ... class AddOffset:
        ...     def __init__(self, offset=1):
        ...         self.offset = offset
        ...     def __call__(self, x: pa.Array) -> pa.Array:
        ...         return pc.add(x, self.offset)
        >>>
        >>> # Use callable class
        >>> add_five = AddOffset(5)
        >>> expr = add_five(col("value"))
    """

    fn: Callable[..., BatchColumn]  # plain function OR _CallableClassUDF wrapper
    args: List[Expr]
    kwargs: Dict[str, Expr]

    @property
    def callable_class_spec(self) -> Optional[_CallableClassSpec]:
        """Spec of the wrapped callable class, or None for plain functions.

        Kept so that code which checks ``callable_class_spec`` on the
        expression keeps working.
        """
        fn = self.fn
        return fn.callable_class_spec if isinstance(fn, _CallableClassUDF) else None

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` is the same UDF call structurally.

        Callable-class UDFs are matched by their specs, plain functions
        by ``!=`` on the function objects; arguments are then compared
        pairwise with ``structurally_equals``.
        """
        if not isinstance(other, UDFExpr):
            return False

        mine, theirs = self.fn, other.fn
        if isinstance(mine, _CallableClassUDF):
            # Two wrappers match when their class + ctor-arg specs match.
            if not isinstance(theirs, _CallableClassUDF):
                return False
            if mine.callable_class_spec != theirs.callable_class_spec:
                return False
        elif mine != theirs:
            return False

        # Positional arguments: same count, pairwise structural equality.
        if len(self.args) != len(other.args):
            return False
        for left, right in zip(self.args, other.args):
            if not left.structurally_equals(right):
                return False

        # Keyword arguments: same key set, per-key structural equality.
        if self.kwargs.keys() != other.kwargs.keys():
            return False
        for key in self.kwargs.keys():
            if not self.kwargs[key].structurally_equals(other.kwargs[key]):
                return False
        return True
def _create_udf_callable(
    fn: Callable[..., BatchColumn],
    return_dtype: DataType,
) -> Callable[..., UDFExpr]:
    """Build the expression factory that stands in for a UDF.

    Args:
        fn: The UDF to wrap: a regular function or a
            ``_CallableClassUDF`` instance (for callable classes).
        return_dtype: Data type attached to every produced ``UDFExpr``.

    Returns:
        A callable which, given expressions and/or plain values, returns
        a ``UDFExpr`` describing the call instead of executing ``fn``.
    """

    def _as_expr(value: Any) -> Expr:
        # Expressions pass through; any other value becomes a literal.
        return value if isinstance(value, Expr) else LiteralExpr(value)

    def udf_callable(*args, **kwargs) -> UDFExpr:
        return UDFExpr(
            fn=fn,
            args=[_as_expr(arg) for arg in args],
            kwargs={key: _as_expr(val) for key, val in kwargs.items()},
            data_type=return_dtype,
        )

    # Carry the wrapped function's metadata over to the factory.
    functools.update_wrapper(udf_callable, fn)

    # Keep a handle on the wrapped function for anyone who needs it.
    udf_callable._original_fn = fn

    return udf_callable
@PublicAPI(stability="alpha")
def udf(return_dtype: DataType) -> Callable[..., UDFExpr]:
    """
    Decorator that turns a function or callable class into an expression UDF.

    The decorated object no longer executes when called; calling it with
    expressions builds a ``UDFExpr`` carrying ``return_dtype``, which
    lets the UDF take part in schema inference and compose with other
    expressions.

    IMPORTANT: UDFs work on batches, not rows. Every column argument is
    handed to your UDF as a PyArrow Array containing that column's values
    for the batch; with several columns you receive one array per column.

    Args:
        return_dtype: The data type of the value the UDF returns.

    Returns:
        A decorator. Applied to a function it returns a callable that
        creates ``UDFExpr`` instances; applied to a callable class it
        returns a replacement class whose instances do the same.

    Example:
        >>> from ray.data.expressions import col, udf
        >>> from ray.data.datatype import DataType
        >>> import pyarrow as pa
        >>> import pyarrow.compute as pc
        >>> import ray
        >>>
        >>> # UDF that operates on a batch of values (PyArrow Array)
        >>> @udf(return_dtype=DataType.int32())
        ... def add_one(x: pa.Array) -> pa.Array:
        ...     return pc.add(x, 1)  # Vectorized operation on the entire Array
        >>>
        >>> # UDF that combines multiple columns (each as a PyArrow Array)
        >>> @udf(return_dtype=DataType.string())
        ... def format_name(first: pa.Array, last: pa.Array) -> pa.Array:
        ...     return pc.binary_join_element_wise(first, last, " ")  # Vectorized string concatenation
        >>>
        >>> # Callable class UDF
        >>> @udf(return_dtype=DataType.int32())
        ... class AddOffset:
        ...     def __init__(self, offset=1):
        ...         self.offset = offset
        ...     def __call__(self, x: pa.Array) -> pa.Array:
        ...         return pc.add(x, self.offset)
        >>>
        >>> # Use in dataset operations
        >>> ds = ray.data.from_items([
        ...     {"value": 5, "first": "John", "last": "Doe"},
        ...     {"value": 10, "first": "Jane", "last": "Smith"}
        ... ])
        >>>
        >>> # Single column transformation (operates on batches)
        >>> ds_incremented = ds.with_column("value_plus_one", add_one(col("value")))
        >>>
        >>> # Multi-column transformation (each column becomes a PyArrow Array)
        >>> ds_formatted = ds.with_column("full_name", format_name(col("first"), col("last")))
        >>>
        >>> # Callable class usage
        >>> add_five = AddOffset(5)
        >>> ds_with_offset = ds.with_column("value_plus_five", add_five(col("value")))
        >>>
        >>> # Can also be used in complex expressions
        >>> ds_complex = ds.with_column("doubled_plus_one", add_one(col("value")) * 2)
    """

    def decorator(
        func_or_class: Union[Callable[..., BatchColumn], Type[T]]
    ) -> Decorated:
        # A "callable class" is a class object whose instances are callable.
        is_callable_class = isinstance(func_or_class, type) and issubclass(
            func_or_class, Callable
        )
        if not is_callable_class:
            # Regular function: wrap it directly.
            return _create_udf_callable(func_or_class, return_dtype)

        # For classes we substitute a look-alike. Otherwise MyClass(args)
        # would build the real object on the driver, and instance(col(...))
        # would try to run it instead of producing an expression.
        class ExpressionAwareCallableClass:
            """Replacement class that defers creation of the user's object.

            Supports the natural spelling:
                add_five = AddOffset(5)
                ds.with_column("result", add_five(col("x")))

            Constructing it only records the arguments in a
            ``_CallableClassUDF``; calling it yields a ``UDFExpr`` whose
            ``fn`` is that wrapper. The real object is created later by
            the wrapper's ``init()``.
            """

            def __init__(self, *args, **kwargs):
                # Capture the constructor arguments; the user's class is
                # not instantiated here.
                self._expr_udf = _CallableClassUDF(
                    cls=func_or_class,
                    ctor_args=args,
                    ctor_kwargs=kwargs,
                    return_dtype=return_dtype,
                )

            def __call__(self, *call_args, **call_kwargs):
                # Produce a UDFExpr with fn=_CallableClassUDF.
                build_expr = _create_udf_callable(self._expr_udf, return_dtype)
                return build_expr(*call_args, **call_kwargs)

        # Present the user's class identity (name/module) in messages.
        ExpressionAwareCallableClass.__name__ = func_or_class.__name__
        ExpressionAwareCallableClass.__qualname__ = func_or_class.__qualname__
        ExpressionAwareCallableClass.__module__ = func_or_class.__module__

        return ExpressionAwareCallableClass

    return decorator
- def _create_pyarrow_wrapper(
- fn: Callable[..., BatchColumn]
- ) -> Callable[..., BatchColumn]:
- """Wrap a PyArrow compute function to auto-convert inputs to PyArrow format.
- This wrapper ensures that pandas Series and numpy arrays are converted to
- PyArrow Arrays before being passed to the function, enabling PyArrow compute
- functions to work seamlessly with any block format.
- Args:
- fn: The PyArrow compute function to wrap
- Returns:
- A wrapped function that handles format conversion
- """
- @functools.wraps(fn)
- def arrow_wrapper(*args, **kwargs):
- import numpy as np
- import pandas as pd
- import pyarrow as pa
- def to_arrow(val):
- """Convert a value to PyArrow Array if needed."""
- if isinstance(val, (pa.Array, pa.ChunkedArray)):
- return val, False
- elif isinstance(val, pd.Series):
- return pa.Array.from_pandas(val), True
- elif isinstance(val, np.ndarray):
- return pa.array(val), False
- else:
- return val, False
- # Convert inputs to PyArrow and track pandas flags
- args_results = [to_arrow(arg) for arg in args]
- kwargs_results = {k: to_arrow(v) for k, v in kwargs.items()}
- converted_args = [v[0] for v in args_results]
- converted_kwargs = {k: v[0] for k, v in kwargs_results.items()}
- input_was_pandas = any(v[1] for v in args_results) or any(
- v[1] for v in kwargs_results.values()
- )
- # Call function with converted inputs
- result = fn(*converted_args, **converted_kwargs)
- # Convert result back to pandas if input was pandas
- if input_was_pandas and isinstance(result, (pa.Array, pa.ChunkedArray)):
- result = result.to_pandas()
- return result
- return arrow_wrapper
@PublicAPI(stability="alpha")
def pyarrow_udf(return_dtype: DataType) -> Callable[..., UDFExpr]:
    """Decorator for PyArrow compute functions that adds input conversion.

    The decorated function is first wrapped by ``_create_pyarrow_wrapper``
    (pandas Series / numpy arrays are converted to PyArrow Arrays) and is
    then exposed as a ``UDFExpr`` factory, so it can be used whatever the
    underlying block format is (pandas, arrow, or items).

    Used internally by namespace methods (list, str, struct) that wrap
    PyArrow compute functions.

    Args:
        return_dtype: The data type of the return value.

    Returns:
        A decorator producing a callable that creates ``UDFExpr``
        instances backed by the converting wrapper.
    """

    def decorator(func: Callable[..., BatchColumn]) -> Callable[..., UDFExpr]:
        # Inner call adds the format conversion; outer call turns the
        # converting function into an expression factory.
        return _create_udf_callable(_create_pyarrow_wrapper(func), return_dtype)

    return decorator
def _create_pyarrow_compute_udf(
    pc_func: Callable[..., pyarrow.Array],
    return_dtype: DataType | None = None,
) -> Callable[..., "UDFExpr"]:
    """Create an expression UDF backed by a PyArrow compute function.

    Args:
        pc_func: The compute function. It is called with the evaluated
            column first, followed by the extra arguments given to the
            returned wrapper.
        return_dtype: Result data type. When falsy, the input
            expression's ``data_type`` is used instead.

    Returns:
        A function ``wrapper(expr, *positional, **kwargs)`` that returns
        a ``UDFExpr`` applying ``pc_func`` to ``expr``.
    """

    def wrapper(expr: "Expr", *positional: Any, **kwargs: Any) -> "UDFExpr":
        result_dtype = return_dtype or expr.data_type

        # The inner function keeps the name `udf`: its metadata is copied
        # onto the resulting callables by functools.wraps/update_wrapper.
        def udf(arr: pyarrow.Array) -> pyarrow.Array:
            return pc_func(arr, *positional, **kwargs)

        make_expr = pyarrow_udf(return_dtype=result_dtype)(udf)
        return make_expr(expr)

    return wrapper
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class DownloadExpr(Expr):
    """Expression node describing a download from a column of URIs."""

    # Name of the column that holds the URIs.
    uri_column_name: str
    # Filesystem to read with; defaults to None.
    filesystem: "pyarrow.fs.FileSystem" = None
    # Always binary; not settable through the constructor (init=False).
    data_type: DataType = field(default_factory=lambda: DataType.binary(), init=False)

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` downloads from the same URI column.

        NOTE(review): ``filesystem`` is not part of the comparison;
        confirm that this is intentional.
        """
        if not isinstance(other, DownloadExpr):
            return False
        return self.uri_column_name == other.uri_column_name
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class AliasExpr(Expr):
    """Expression that represents an alias for an expression."""

    # The expression being given a new name.
    expr: Expr
    # The alias; exposed read-only through the `name` property.
    _name: str
    # Rename flag; carried over unchanged when re-aliasing.
    _is_rename: bool

    @property
    def name(self) -> str:
        """Get the alias name."""
        return self._name

    def alias(self, name: str) -> "Expr":
        """Return a new alias of the wrapped expression under ``name``.

        The new ``AliasExpr`` wraps ``self.expr`` directly (not ``self``),
        so re-aliasing never nests aliases. The rename flag is preserved.
        """
        # Always unalias before creating new one
        return AliasExpr(
            self.expr.data_type, self.expr, _name=name, _is_rename=self._is_rename
        )

    def _unalias(self) -> "Expr":
        """Return the wrapped expression."""
        return self.expr

    def structurally_equals(self, other: Any) -> bool:
        """Return True when ``other`` is an equivalent alias.

        Equivalent means: ``other`` is an ``AliasExpr``, the wrapped
        expressions are structurally equal, the alias names match, and
        both have the same rename flag.
        """
        return (
            isinstance(other, AliasExpr)
            and self.expr.structurally_equals(other.expr)
            and self.name == other.name
            # Compare against `other`: the previous self-vs-self check
            # was always true and ignored a differing rename flag.
            and self._is_rename == other._is_rename
        )
@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class StarExpr(Expr):
    """Placeholder expression meaning "every column of the input".

    Used in projections to mark the position where all existing columns
    should be kept in the output. Operations such as with_column() and
    rename_columns() typically use it internally to retain the columns
    that are already there.

    Example:
        Calling with_column("new_col", expr) builds:
            Project(exprs=[star(), expr.alias("new_col")])

        i.e. keep all existing columns, then add/overwrite "new_col".
    """

    # TODO: Add UnresolvedExpr. Both StarExpr and UnresolvedExpr won't have a defined data_type.
    # Fixed placeholder type; not settable via the constructor (init=False).
    data_type: DataType = field(default_factory=lambda: DataType(object), init=False)

    def structurally_equals(self, other: Any) -> bool:
        """Any two ``StarExpr`` instances are structurally equal."""
        is_star = isinstance(other, StarExpr)
        return is_star
@PublicAPI(stability="beta")
def col(name: str) -> ColumnExpr:
    """
    Build an expression that refers to an existing column.

    This is the usual entry point for using a column inside an
    expression: the result stands for the values of the column called
    ``name``.

    Args:
        name: The name of the column to reference.

    Returns:
        A ColumnExpr for the given column name.

    Example:
        >>> from ray.data.expressions import col
        >>> # Reference columns in an expression
        >>> expr = col("price") * col("quantity")
        >>>
        >>> # Use with Dataset.with_column()
        >>> import ray
        >>> ds = ray.data.from_items([{"price": 10, "quantity": 2}])
        >>> ds = ds.with_column("total", col("price") * col("quantity"))
    """
    column_ref = ColumnExpr(name)
    return column_ref
@PublicAPI(stability="beta")
def lit(value: Any) -> LiteralExpr:
    """
    Wrap a constant in a literal expression.

    The resulting expression stands for a single constant scalar, which
    is broadcast to all rows when the expression is evaluated.

    Args:
        value: The constant to wrap. Can be any Python object
            (int, float, str, bool, etc.)

    Returns:
        A LiteralExpr holding ``value``.

    Example:
        >>> from ray.data.expressions import col, lit
        >>> # Create literals of different types
        >>> five = lit(5)
        >>> pi = lit(3.14159)
        >>> name = lit("Alice")
        >>> flag = lit(True)
        >>>
        >>> # Use in expressions
        >>> expr = col("age") + lit(1)  # Add 1 to age column
        >>>
        >>> # Use with Dataset.with_column()
        >>> import ray
        >>> ds = ray.data.from_items([{"age": 25}, {"age": 30}])
        >>> ds = ds.with_column("age_plus_one", col("age") + lit(1))
    """
    literal = LiteralExpr(value)
    return literal
# TODO remove
@DeveloperAPI(stability="alpha")
def star() -> StarExpr:
    """
    Build the "all input columns" expression.

    The result is the special projection marker that keeps every
    existing column. It is typically combined with other expressions by
    operations that add or modify columns while leaving the rest alone.

    Returns:
        A new StarExpr representing all input columns.
    """
    all_columns = StarExpr()
    return all_columns
@PublicAPI(stability="alpha")
def download(
    uri_column_name: str,
    *,
    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
) -> DownloadExpr:
    """
    Build an expression that downloads the content behind a column of URIs.

    The expression refers to a column holding URIs; when it is evaluated,
    the content at each URI is fetched and returned as bytes.

    Args:
        uri_column_name: The name of the column containing URIs to
            download from.
        filesystem: PyArrow filesystem to use for reading remote files.
            If None, the filesystem is auto-detected from the path scheme.

    Returns:
        A DownloadExpr for the given URI column and filesystem.

    Example:
        >>> from ray.data.expressions import download
        >>> import ray
        >>> # Create dataset with URIs
        >>> ds = ray.data.from_items([
        ...     {"uri": "s3://bucket/file1.jpg", "id": "1"},
        ...     {"uri": "s3://bucket/file2.jpg", "id": "2"}
        ... ])
        >>> # Add downloaded bytes column
        >>> ds_with_bytes = ds.with_column("bytes", download("uri"))
    """
    download_expr = DownloadExpr(
        uri_column_name=uri_column_name,
        filesystem=filesystem,
    )
    return download_expr
# ──────────────────────────────────────
# Public API for evaluation
# ──────────────────────────────────────
# Note: Implementation details are in _expression_evaluator.py
# NOTE(review): the previous comment here mentioned re-exporting eval_expr,
# but eval_expr is not listed in __all__ below; confirm whether it should be.

__all__ = [
    # Expression node types
    "Operation",
    "Expr",
    "ColumnExpr",
    "LiteralExpr",
    "BinaryExpr",
    "UnaryExpr",
    "UDFExpr",
    "DownloadExpr",
    "AliasExpr",
    "StarExpr",
    # Decorators and constructor helpers
    "pyarrow_udf",
    "udf",
    "col",
    "lit",
    "download",
    "star",
    # Namespace classes, resolved lazily by the module-level __getattr__
    "_ArrayNamespace",
    "_ListNamespace",
    "_StringNamespace",
    "_StructNamespace",
    "_DatetimeNamespace",
]
- def __getattr__(name: str):
- """Lazy import of namespace classes to avoid circular imports."""
- if name == "_ArrayNamespace":
- from ray.data.namespace_expressions.arr_namespace import _ArrayNamespace
- return _ArrayNamespace
- elif name == "_ListNamespace":
- from ray.data.namespace_expressions.list_namespace import _ListNamespace
- return _ListNamespace
- elif name == "_StringNamespace":
- from ray.data.namespace_expressions.string_namespace import _StringNamespace
- return _StringNamespace
- elif name == "_StructNamespace":
- from ray.data.namespace_expressions.struct_namespace import _StructNamespace
- return _StructNamespace
- elif name == "_DatetimeNamespace":
- from ray.data.namespace_expressions.dt_namespace import _DatetimeNamespace
- return _DatetimeNamespace
- raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
|