read_api.py 189 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533
  1. import collections
  2. import logging
  3. import warnings
  4. from typing import (
  5. TYPE_CHECKING,
  6. Any,
  7. Callable,
  8. Dict,
  9. List,
  10. Literal,
  11. Optional,
  12. Set,
  13. Tuple,
  14. TypeVar,
  15. Union,
  16. )
  17. import numpy as np
  18. from packaging.version import parse as parse_version
  19. import ray
  20. from ray._private.auto_init_hook import wrap_auto_init
  21. from ray.data._internal.compute import TaskPoolStrategy
  22. from ray.data._internal.datasource.audio_datasource import AudioDatasource
  23. from ray.data._internal.datasource.avro_datasource import AvroDatasource
  24. from ray.data._internal.datasource.bigquery_datasource import BigQueryDatasource
  25. from ray.data._internal.datasource.binary_datasource import BinaryDatasource
  26. from ray.data._internal.datasource.clickhouse_datasource import ClickHouseDatasource
  27. from ray.data._internal.datasource.csv_datasource import CSVDatasource
  28. from ray.data._internal.datasource.databricks_credentials import (
  29. DatabricksCredentialProvider,
  30. )
  31. from ray.data._internal.datasource.delta_sharing_datasource import (
  32. DeltaSharingDatasource,
  33. )
  34. from ray.data._internal.datasource.hudi_datasource import HudiDatasource
  35. from ray.data._internal.datasource.image_datasource import (
  36. ImageDatasource,
  37. ImageFileMetadataProvider,
  38. )
  39. from ray.data._internal.datasource.json_datasource import (
  40. JSON_FILE_EXTENSIONS,
  41. ArrowJSONDatasource,
  42. PandasJSONDatasource,
  43. )
  44. from ray.data._internal.datasource.kafka_datasource import (
  45. KafkaAuthConfig,
  46. KafkaDatasource,
  47. )
  48. from ray.data._internal.datasource.lance_datasource import LanceDatasource
  49. from ray.data._internal.datasource.mcap_datasource import MCAPDatasource, TimeRange
  50. from ray.data._internal.datasource.mongo_datasource import MongoDatasource
  51. from ray.data._internal.datasource.numpy_datasource import NumpyDatasource
  52. from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
  53. from ray.data._internal.datasource.range_datasource import RangeDatasource
  54. from ray.data._internal.datasource.sql_datasource import SQLDatasource
  55. from ray.data._internal.datasource.text_datasource import TextDatasource
  56. from ray.data._internal.datasource.tfrecords_datasource import TFRecordDatasource
  57. from ray.data._internal.datasource.torch_datasource import TorchDatasource
  58. from ray.data._internal.datasource.uc_datasource import UnityCatalogConnector
  59. from ray.data._internal.datasource.video_datasource import VideoDatasource
  60. from ray.data._internal.datasource.webdataset_datasource import WebDatasetDatasource
  61. from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
  62. from ray.data._internal.logical.interfaces import LogicalPlan
  63. from ray.data._internal.logical.operators import (
  64. FromArrow,
  65. FromBlocks,
  66. FromItems,
  67. FromNumpy,
  68. FromPandas,
  69. Read,
  70. )
  71. from ray.data._internal.plan import ExecutionPlan
  72. from ray.data._internal.remote_fn import cached_remote_fn
  73. from ray.data._internal.stats import DatasetStats
  74. from ray.data._internal.tensor_extensions.utils import _create_possibly_ragged_ndarray
  75. from ray.data._internal.util import (
  76. _autodetect_parallelism,
  77. get_table_block_metadata_schema,
  78. merge_resources_to_ray_remote_args,
  79. ndarray_to_block,
  80. pandas_df_to_arrow_block,
  81. )
  82. from ray.data._internal.utils.arrow_utils import get_pyarrow_version
  83. from ray.data.block import (
  84. Block,
  85. BlockExecStats,
  86. BlockMetadataWithSchema,
  87. )
  88. from ray.data.context import DataContext
  89. from ray.data.dataset import Dataset, MaterializedDataset
  90. from ray.data.datasource import (
  91. Connection,
  92. Datasource,
  93. PathPartitionFilter,
  94. )
  95. from ray.data.datasource.datasource import Reader
  96. from ray.data.datasource.file_based_datasource import (
  97. FileShuffleConfig,
  98. _validate_shuffle_arg,
  99. )
  100. from ray.data.datasource.file_meta_provider import (
  101. DefaultFileMetadataProvider,
  102. )
  103. from ray.data.datasource.partitioning import Partitioning
  104. from ray.types import ObjectRef
  105. from ray.util.annotations import DeveloperAPI, PublicAPI
  106. from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
  107. if TYPE_CHECKING:
  108. import daft
  109. import dask
  110. import datasets
  111. import mars
  112. import modin
  113. import pandas
  114. import pyarrow
  115. import pymongoarrow.api
  116. import pyspark
  117. import tensorflow as tf
  118. import torch
  119. from pyiceberg.expressions import BooleanExpression
  120. from tensorflow_metadata.proto.v0 import schema_pb2
  121. from ray.data._internal.datasource.tfrecords_datasource import TFXReadOptions
  122. T = TypeVar("T")
  123. logger = logging.getLogger(__name__)
  124. @DeveloperAPI
  125. def from_blocks(blocks: List[Block]):
  126. """Create a :class:`~ray.data.Dataset` from a list of blocks.
  127. This method is primarily used for testing. Unlike other methods like
  128. :func:`~ray.data.from_pandas` and :func:`~ray.data.from_arrow`, this method
  129. gaurentees that it won't modify the number of blocks.
  130. Args:
  131. blocks: List of blocks to create the dataset from.
  132. Returns:
  133. A :class:`~ray.data.Dataset` holding the blocks.
  134. """
  135. block_refs = [ray.put(block) for block in blocks]
  136. meta_with_schema = [BlockMetadataWithSchema.from_block(block) for block in blocks]
  137. from_blocks_op = FromBlocks(block_refs, meta_with_schema)
  138. execution_plan = ExecutionPlan(
  139. DatasetStats(metadata={"FromBlocks": meta_with_schema}, parent=None),
  140. DataContext.get_current().copy(),
  141. )
  142. logical_plan = LogicalPlan(from_blocks_op, execution_plan._context)
  143. return MaterializedDataset(
  144. execution_plan,
  145. logical_plan,
  146. )
  147. @PublicAPI
  148. def from_items(
  149. items: List[Any],
  150. *,
  151. parallelism: int = -1,
  152. override_num_blocks: Optional[int] = None,
  153. ) -> MaterializedDataset:
  154. """Create a :class:`~ray.data.Dataset` from a list of local Python objects.
  155. Use this method to create small datasets from data that fits in memory. The column
  156. name defaults to "item".
  157. Examples:
  158. >>> import ray
  159. >>> ds = ray.data.from_items([1, 2, 3, 4, 5])
  160. >>> ds # doctest: +ELLIPSIS
  161. shape: (5, 1)
  162. ╭───────╮
  163. │ item │
  164. │ --- │
  165. │ int64 │
  166. ╞═══════╡
  167. │ 1 │
  168. │ 2 │
  169. │ 3 │
  170. │ 4 │
  171. │ 5 │
  172. ╰───────╯
  173. (Showing 5 of 5 rows)
  174. >>> ds.schema()
  175. Column Type
  176. ------ ----
  177. item int64
  178. Args:
  179. items: List of local Python objects.
  180. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  181. override_num_blocks: Override the number of output blocks from all read tasks.
  182. By default, the number of output blocks is dynamically decided based on
  183. input data size and available resources. You shouldn't manually set this
  184. value in most cases.
  185. Returns:
  186. A :class:`~ray.data.Dataset` holding the items.
  187. """
  188. import builtins
  189. parallelism = _get_num_output_blocks(parallelism, override_num_blocks)
  190. if parallelism == 0:
  191. raise ValueError(f"parallelism must be -1 or > 0, got: {parallelism}")
  192. detected_parallelism, _, _ = _autodetect_parallelism(
  193. parallelism,
  194. ray.util.get_current_placement_group(),
  195. DataContext.get_current(),
  196. )
  197. # Truncate parallelism to number of items to avoid empty blocks.
  198. detected_parallelism = min(len(items), detected_parallelism)
  199. if detected_parallelism > 0:
  200. block_size, remainder = divmod(len(items), detected_parallelism)
  201. else:
  202. block_size, remainder = 0, 0
  203. # NOTE: We need to explicitly use the builtins range since we override range below,
  204. # with the definition of ray.data.range.
  205. blocks: List[ObjectRef[Block]] = []
  206. meta_with_schema: List[BlockMetadataWithSchema] = []
  207. for i in builtins.range(detected_parallelism):
  208. stats = BlockExecStats.builder()
  209. builder = DelegatingBlockBuilder()
  210. # Evenly distribute remainder across block slices while preserving record order.
  211. block_start = i * block_size + min(i, remainder)
  212. block_end = (i + 1) * block_size + min(i + 1, remainder)
  213. for j in builtins.range(block_start, block_end):
  214. item = items[j]
  215. if not isinstance(item, collections.abc.Mapping):
  216. item = {"item": item}
  217. builder.add(item)
  218. block = builder.build()
  219. blocks.append(ray.put(block))
  220. meta_with_schema.append(
  221. BlockMetadataWithSchema.from_block(block, stats=stats.build())
  222. )
  223. from_items_op = FromItems(blocks, meta_with_schema)
  224. execution_plan = ExecutionPlan(
  225. DatasetStats(metadata={"FromItems": meta_with_schema}, parent=None),
  226. DataContext.get_current().copy(),
  227. )
  228. logical_plan = LogicalPlan(from_items_op, execution_plan._context)
  229. return MaterializedDataset(
  230. execution_plan,
  231. logical_plan,
  232. )
  233. @PublicAPI
  234. def range(
  235. n: int,
  236. *,
  237. parallelism: int = -1,
  238. concurrency: Optional[int] = None,
  239. override_num_blocks: Optional[int] = None,
  240. ) -> Dataset:
  241. """Creates a :class:`~ray.data.Dataset` from a range of integers [0..n).
  242. This function allows for easy creation of synthetic datasets for testing or
  243. benchmarking :ref:`Ray Data <data>`. The column name defaults to "id".
  244. Examples:
  245. >>> import ray
  246. >>> ds = ray.data.range(10000)
  247. >>> ds # doctest: +ELLIPSIS
  248. shape: (10000, 1)
  249. ╭───────╮
  250. │ id │
  251. │ --- │
  252. │ int64 │
  253. ╰───────╯
  254. (Dataset isn't materialized)
  255. >>> ds.map(lambda row: {"id": row["id"] * 2}).take(4)
  256. [{'id': 0}, {'id': 2}, {'id': 4}, {'id': 6}]
  257. Args:
  258. n: The upper bound of the range of integers.
  259. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  260. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  261. to control number of tasks to run concurrently. This doesn't change the
  262. total number of tasks run or the total number of output blocks. By default,
  263. concurrency is dynamically decided based on the available resources.
  264. override_num_blocks: Override the number of output blocks from all read tasks.
  265. By default, the number of output blocks is dynamically decided based on
  266. input data size and available resources. You shouldn't manually set this
  267. value in most cases.
  268. Returns:
  269. A :class:`~ray.data.Dataset` producing the integers from the range 0 to n.
  270. .. seealso::
  271. :meth:`~ray.data.range_tensor`
  272. Call this method for creating synthetic datasets of tensor data.
  273. """
  274. datasource = RangeDatasource(n=n, block_format="arrow", column_name="id")
  275. return read_datasource(
  276. datasource,
  277. parallelism=parallelism,
  278. concurrency=concurrency,
  279. override_num_blocks=override_num_blocks,
  280. )
  281. @PublicAPI
  282. def range_tensor(
  283. n: int,
  284. *,
  285. shape: Tuple = (1,),
  286. parallelism: int = -1,
  287. concurrency: Optional[int] = None,
  288. override_num_blocks: Optional[int] = None,
  289. ) -> Dataset:
  290. """Creates a :class:`~ray.data.Dataset` tensors of the provided shape from range
  291. [0...n].
  292. This function allows for easy creation of synthetic tensor datasets for testing or
  293. benchmarking :ref:`Ray Data <data>`. The column name defaults to "data".
  294. Examples:
  295. >>> import ray
  296. >>> ds = ray.data.range_tensor(1000, shape=(2, 2))
  297. >>> ds # doctest: +ELLIPSIS
  298. shape: (1000, 1)
  299. ╭──────────────────────────────────────────╮
  300. │ data │
  301. │ --- │
  302. │ ArrowTensorTypeV2(shape=(2, 2), dtype=i… │
  303. ╰──────────────────────────────────────────╯
  304. (Dataset isn't materialized)
  305. >>> ds.map_batches(lambda row: {"data": row["data"] * 2}).take(2)
  306. [{'data': array([[0, 0],
  307. [0, 0]])}, {'data': array([[2, 2],
  308. [2, 2]])}]
  309. Args:
  310. n: The upper bound of the range of tensor records.
  311. shape: The shape of each tensor in the dataset.
  312. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  313. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  314. to control number of tasks to run concurrently. This doesn't change the
  315. total number of tasks run or the total number of output blocks. By default,
  316. concurrency is dynamically decided based on the available resources.
  317. override_num_blocks: Override the number of output blocks from all read tasks.
  318. By default, the number of output blocks is dynamically decided based on
  319. input data size and available resources. You shouldn't manually set this
  320. value in most cases.
  321. Returns:
  322. A :class:`~ray.data.Dataset` producing the tensor data from range 0 to n.
  323. .. seealso::
  324. :meth:`~ray.data.range`
  325. Call this method to create synthetic datasets of integer data.
  326. """
  327. datasource = RangeDatasource(
  328. n=n, block_format="tensor", column_name="data", tensor_shape=tuple(shape)
  329. )
  330. return read_datasource(
  331. datasource,
  332. parallelism=parallelism,
  333. concurrency=concurrency,
  334. override_num_blocks=override_num_blocks,
  335. )
  336. @PublicAPI
  337. @wrap_auto_init
  338. def read_datasource(
  339. datasource: Datasource,
  340. *,
  341. parallelism: int = -1,
  342. num_cpus: Optional[float] = None,
  343. num_gpus: Optional[float] = None,
  344. memory: Optional[float] = None,
  345. ray_remote_args: Dict[str, Any] = None,
  346. concurrency: Optional[int] = None,
  347. override_num_blocks: Optional[int] = None,
  348. **read_args,
  349. ) -> Dataset:
  350. """Read a stream from a custom :class:`~ray.data.Datasource`.
  351. Args:
  352. datasource: The :class:`~ray.data.Datasource` to read data from.
  353. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  354. num_cpus: The number of CPUs to reserve for each parallel read worker.
  355. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  356. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  357. worker.
  358. memory: The heap memory in bytes to reserve for each parallel read worker.
  359. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  360. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  361. to control number of tasks to run concurrently. This doesn't change the
  362. total number of tasks run or the total number of output blocks. By default,
  363. concurrency is dynamically decided based on the available resources.
  364. override_num_blocks: Override the number of output blocks from all read tasks.
  365. By default, the number of output blocks is dynamically decided based on
  366. input data size and available resources. You shouldn't manually set this
  367. value in most cases.
  368. **read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
  369. implementation.
  370. Returns:
  371. :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
  372. """ # noqa: E501
  373. parallelism = _get_num_output_blocks(parallelism, override_num_blocks)
  374. ctx = DataContext.get_current()
  375. if ray_remote_args is None:
  376. ray_remote_args = {}
  377. if not datasource.supports_distributed_reads:
  378. ray_remote_args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
  379. ray.get_runtime_context().get_node_id(),
  380. soft=False,
  381. )
  382. if "scheduling_strategy" not in ray_remote_args:
  383. ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy
  384. ray_remote_args = merge_resources_to_ray_remote_args(
  385. num_cpus,
  386. num_gpus,
  387. memory,
  388. ray_remote_args,
  389. )
  390. datasource_or_legacy_reader = _get_datasource_or_legacy_reader(
  391. datasource,
  392. ctx,
  393. read_args,
  394. )
  395. cur_pg = ray.util.get_current_placement_group()
  396. requested_parallelism, _, _ = _autodetect_parallelism(
  397. parallelism,
  398. ctx.target_max_block_size,
  399. DataContext.get_current(),
  400. datasource_or_legacy_reader,
  401. placement_group=cur_pg,
  402. )
  403. # TODO(hchen/chengsu): Remove the duplicated get_read_tasks call here after
  404. # removing LazyBlockList code path.
  405. read_tasks = datasource_or_legacy_reader.get_read_tasks(requested_parallelism)
  406. stats = DatasetStats(
  407. metadata={"Read": [read_task.metadata for read_task in read_tasks]},
  408. parent=None,
  409. )
  410. read_op = Read(
  411. datasource,
  412. datasource_or_legacy_reader,
  413. parallelism=parallelism,
  414. num_outputs=len(read_tasks) if read_tasks else 0,
  415. ray_remote_args=ray_remote_args,
  416. compute=TaskPoolStrategy(concurrency),
  417. )
  418. execution_plan = ExecutionPlan(
  419. stats,
  420. DataContext.get_current().copy(),
  421. )
  422. logical_plan = LogicalPlan(read_op, execution_plan._context)
  423. return Dataset(
  424. plan=execution_plan,
  425. logical_plan=logical_plan,
  426. )
  427. @PublicAPI(stability="alpha")
  428. def read_audio(
  429. paths: Union[str, List[str]],
  430. *,
  431. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  432. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  433. partition_filter: Optional[PathPartitionFilter] = None,
  434. partitioning: Optional[Partitioning] = None,
  435. include_paths: bool = False,
  436. ignore_missing_paths: bool = False,
  437. file_extensions: Optional[List[str]] = AudioDatasource._FILE_EXTENSIONS,
  438. shuffle: Union[Literal["files"], None] = None,
  439. concurrency: Optional[int] = None,
  440. override_num_blocks: Optional[int] = None,
  441. num_cpus: Optional[float] = None,
  442. num_gpus: Optional[float] = None,
  443. memory: Optional[float] = None,
  444. ray_remote_args: Optional[Dict[str, Any]] = None,
  445. ):
  446. """Creates a :class:`~ray.data.Dataset` from audio files.
  447. The column names default to "amplitude" and "sample_rate".
  448. Examples:
  449. >>> import ray
  450. >>> path = "s3://anonymous@air-example-data-2/6G-audio-data-LibriSpeech-train-clean-100-flac/train-clean-100/5022/29411/5022-29411-0000.flac"
  451. >>> ds = ray.data.read_audio(path)
  452. >>> ds.schema()
  453. Column Type
  454. ------ ----
  455. amplitude ArrowTensorTypeV2(shape=(1, 191760), dtype=float)
  456. sample_rate int64
  457. Args:
  458. paths: A single file or directory, or a list of file or directory paths.
  459. A list of paths can contain both files and directories.
  460. filesystem: The pyarrow filesystem
  461. implementation to read from. These filesystems are specified in the
  462. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  463. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  464. you need to provide specific configurations to the filesystem. By default,
  465. the filesystem is automatically selected based on the scheme of the paths.
  466. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  467. arrow_open_stream_args: kwargs passed to
  468. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  469. python/generated/pyarrow.fs.FileSystem.html\
  470. #pyarrow.fs.FileSystem.open_input_file>`_.
  471. when opening input files to read.
  472. partition_filter: A
  473. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
  474. with a custom callback to read only selected partitions of a dataset.
  475. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  476. that describes how paths are organized. Defaults to ``None``.
  477. include_paths: If ``True``, include the path to each image. File paths are
  478. stored in the ``'path'`` column.
  479. ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
  480. that are not found. Defaults to False.
  481. file_extensions: A list of file extensions to filter files by.
  482. shuffle: If ``"files"``, randomly shuffle input files order before read.
  483. Defaults to ``None``.
  484. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  485. to control number of tasks to run concurrently. This doesn't change the
  486. total number of tasks run or the total number of output blocks. By default,
  487. concurrency is dynamically decided based on the available resources.
  488. override_num_blocks: Override the number of output blocks from all read tasks.
  489. By default, the number of output blocks is dynamically decided based on
  490. input data size and available resources. You shouldn't manually set this
  491. value in most cases.
  492. num_cpus: The number of CPUs to reserve for each parallel read worker.
  493. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  494. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  495. worker.
  496. memory: The heap memory in bytes to reserve for each parallel read worker.
  497. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
  498. Returns:
  499. A :class:`~ray.data.Dataset` containing audio amplitudes and associated
  500. metadata.
  501. """ # noqa: E501
  502. datasource = AudioDatasource(
  503. paths,
  504. filesystem=filesystem,
  505. open_stream_args=arrow_open_stream_args,
  506. partition_filter=partition_filter,
  507. partitioning=partitioning,
  508. ignore_missing_paths=ignore_missing_paths,
  509. shuffle=shuffle,
  510. include_paths=include_paths,
  511. file_extensions=file_extensions,
  512. )
  513. return read_datasource(
  514. datasource,
  515. ray_remote_args=ray_remote_args,
  516. num_cpus=num_cpus,
  517. num_gpus=num_gpus,
  518. memory=memory,
  519. concurrency=concurrency,
  520. override_num_blocks=override_num_blocks,
  521. )
  522. @PublicAPI(stability="alpha")
  523. def read_videos(
  524. paths: Union[str, List[str]],
  525. *,
  526. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  527. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  528. partition_filter: Optional[PathPartitionFilter] = None,
  529. partitioning: Optional[Partitioning] = None,
  530. include_paths: bool = False,
  531. include_timestamps: bool = False,
  532. ignore_missing_paths: bool = False,
  533. file_extensions: Optional[List[str]] = VideoDatasource._FILE_EXTENSIONS,
  534. shuffle: Union[Literal["files"], None] = None,
  535. concurrency: Optional[int] = None,
  536. override_num_blocks: Optional[int] = None,
  537. num_cpus: Optional[float] = None,
  538. num_gpus: Optional[float] = None,
  539. memory: Optional[float] = None,
  540. ray_remote_args: Optional[Dict[str, Any]] = None,
  541. ):
  542. """Creates a :class:`~ray.data.Dataset` from video files.
  543. Each row in the resulting dataset represents a video frame. The column names default
  544. to "frame", "frame_index" and "frame_timestamp".
  545. Examples:
  546. >>> import ray
  547. >>> path = "s3://anonymous@ray-example-data/basketball.mp4"
  548. >>> ds = ray.data.read_videos(path)
  549. >>> ds.schema()
  550. Column Type
  551. ------ ----
  552. frame ArrowTensorTypeV2(shape=(720, 1280, 3), dtype=uint8)
  553. frame_index int64
  554. Args:
  555. paths: A single file or directory, or a list of file or directory paths.
  556. A list of paths can contain both files and directories.
  557. filesystem: The pyarrow filesystem
  558. implementation to read from. These filesystems are specified in the
  559. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  560. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  561. you need to provide specific configurations to the filesystem. By default,
  562. the filesystem is automatically selected based on the scheme of the paths.
  563. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  564. arrow_open_stream_args: kwargs passed to
  565. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  566. python/generated/pyarrow.fs.FileSystem.html\
  567. #pyarrow.fs.FileSystem.open_input_file>`_.
  568. when opening input files to read.
  569. partition_filter: A
  570. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
  571. with a custom callback to read only selected partitions of a dataset.
  572. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  573. that describes how paths are organized. Defaults to ``None``.
  574. include_paths: If ``True``, include the path to each image. File paths are
  575. stored in the ``'path'`` column.
  576. include_timestamps: If ``True``, include the frame timestamps from the video
  577. as a ``'frame_timestamp'`` column.
  578. ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
  579. that are not found. Defaults to False.
  580. file_extensions: A list of file extensions to filter files by.
  581. shuffle: If ``"files"``, randomly shuffle input files order before read.
  582. Defaults to ``None``.
  583. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  584. to control number of tasks to run concurrently. This doesn't change the
  585. total number of tasks run or the total number of output blocks. By default,
  586. concurrency is dynamically decided based on the available resources.
  587. override_num_blocks: Override the number of output blocks from all read tasks.
  588. By default, the number of output blocks is dynamically decided based on
  589. input data size and available resources. You shouldn't manually set this
  590. value in most cases.
  591. num_cpus: The number of CPUs to reserve for each parallel read worker.
  592. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  593. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  594. worker.
  595. memory: The heap memory in bytes to reserve for each parallel read worker.
  596. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
  597. Returns:
  598. A :class:`~ray.data.Dataset` containing video frames from the video files.
  599. """
  600. datasource = VideoDatasource(
  601. paths,
  602. filesystem=filesystem,
  603. open_stream_args=arrow_open_stream_args,
  604. partition_filter=partition_filter,
  605. partitioning=partitioning,
  606. ignore_missing_paths=ignore_missing_paths,
  607. shuffle=shuffle,
  608. include_paths=include_paths,
  609. include_timestamps=include_timestamps,
  610. file_extensions=file_extensions,
  611. )
  612. return read_datasource(
  613. datasource,
  614. ray_remote_args=ray_remote_args,
  615. num_cpus=num_cpus,
  616. num_gpus=num_gpus,
  617. memory=memory,
  618. concurrency=concurrency,
  619. override_num_blocks=override_num_blocks,
  620. )
  621. @PublicAPI(stability="alpha")
  622. def read_mongo(
  623. uri: str,
  624. database: str,
  625. collection: str,
  626. *,
  627. pipeline: Optional[List[Dict]] = None,
  628. schema: Optional["pymongoarrow.api.Schema"] = None,
  629. parallelism: int = -1,
  630. num_cpus: Optional[float] = None,
  631. num_gpus: Optional[float] = None,
  632. memory: Optional[float] = None,
  633. ray_remote_args: Dict[str, Any] = None,
  634. concurrency: Optional[int] = None,
  635. override_num_blocks: Optional[int] = None,
  636. **mongo_args,
  637. ) -> Dataset:
  638. """Create a :class:`~ray.data.Dataset` from a MongoDB database.
  639. The data to read from is specified via the ``uri``, ``database`` and ``collection``
  640. of the MongoDB. The dataset is created from the results of executing
  641. ``pipeline`` against the ``collection``. If ``pipeline`` is None, the entire
  642. ``collection`` is read.
  643. .. tip::
  644. For more details about these MongoDB concepts, see the following:
  645. - URI: https://www.mongodb.com/docs/manual/reference/connection-string/
  646. - Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/
  647. - Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
  648. To read the MongoDB in parallel, the execution of the pipeline is run on partitions
  649. of the collection, with a Ray read task to handle a partition. Partitions are
  650. created in an attempt to evenly distribute the documents into the specified number
  651. of partitions. The number of partitions is determined by ``parallelism`` which can
  652. be requested from this interface or automatically chosen if unspecified (see the
  653. ``parallelism`` arg below).
  654. Examples:
  655. >>> import ray
  656. >>> from pymongoarrow.api import Schema # doctest: +SKIP
  657. >>> ds = ray.data.read_mongo( # doctest: +SKIP
  658. ... uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501
  659. ... database="my_db",
  660. ... collection="my_collection",
  661. ... pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501
  662. ... schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
  663. ... override_num_blocks=10,
  664. ... )
  665. Args:
  666. uri: The URI of the source MongoDB where the dataset is
  667. read from. For the URI format, see details in the `MongoDB docs <https:/\
  668. /www.mongodb.com/docs/manual/reference/connection-string/>`_.
  669. database: The name of the database hosted in the MongoDB. This database
  670. must exist otherwise ValueError is raised.
  671. collection: The name of the collection in the database. This collection
  672. must exist otherwise ValueError is raised.
  673. pipeline: A `MongoDB pipeline <https://www.mongodb.com/docs/manual/core\
  674. /aggregation-pipeline/>`_, which is executed on the given collection
  675. with results used to create Dataset. If None, the entire collection will
  676. be read.
  677. schema: The schema used to read the collection. If None, it'll be inferred from
  678. the results of pipeline.
  679. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  680. num_cpus: The number of CPUs to reserve for each parallel read worker.
  681. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  682. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  683. worker.
  684. memory: The heap memory in bytes to reserve for each parallel read worker.
  685. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  686. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  687. to control number of tasks to run concurrently. This doesn't change the
  688. total number of tasks run or the total number of output blocks. By default,
  689. concurrency is dynamically decided based on the available resources.
  690. override_num_blocks: Override the number of output blocks from all read tasks.
  691. By default, the number of output blocks is dynamically decided based on
  692. input data size and available resources. You shouldn't manually set this
  693. value in most cases.
  694. **mongo_args: kwargs passed to `aggregate_arrow_all() <https://mongo-arrow\
  695. .readthedocs.io/en/latest/api/api.html#pymongoarrow.api\
  696. aggregate_arrow_all>`_ in pymongoarrow in producing
  697. Arrow-formatted results.
  698. Returns:
  699. :class:`~ray.data.Dataset` producing rows from the results of executing the pipeline on the specified MongoDB collection.
  700. Raises:
  701. ValueError: if ``database`` doesn't exist.
  702. ValueError: if ``collection`` doesn't exist.
  703. """
  704. datasource = MongoDatasource(
  705. uri=uri,
  706. database=database,
  707. collection=collection,
  708. pipeline=pipeline,
  709. schema=schema,
  710. **mongo_args,
  711. )
  712. return read_datasource(
  713. datasource,
  714. num_cpus=num_cpus,
  715. num_gpus=num_gpus,
  716. memory=memory,
  717. parallelism=parallelism,
  718. ray_remote_args=ray_remote_args,
  719. concurrency=concurrency,
  720. override_num_blocks=override_num_blocks,
  721. )
  722. @PublicAPI(stability="alpha")
  723. def read_bigquery(
  724. project_id: str,
  725. dataset: Optional[str] = None,
  726. query: Optional[str] = None,
  727. *,
  728. parallelism: int = -1,
  729. num_cpus: Optional[float] = None,
  730. num_gpus: Optional[float] = None,
  731. memory: Optional[float] = None,
  732. ray_remote_args: Dict[str, Any] = None,
  733. concurrency: Optional[int] = None,
  734. override_num_blocks: Optional[int] = None,
  735. ) -> Dataset:
  736. """Create a dataset from BigQuery.
  737. The data to read from is specified via the ``project_id``, ``dataset``
  738. and/or ``query`` parameters. The dataset is created from the results of
  739. executing ``query`` if a query is provided. Otherwise, the entire
  740. ``dataset`` is read.
  741. For more information about BigQuery, see the following concepts:
  742. - Project id: `Creating and Managing Projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_
  743. - Dataset: `Datasets Intro <https://cloud.google.com/bigquery/docs/datasets-intro>`_
  744. - Query: `Query Syntax <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax>`_
  745. This method uses the BigQuery Storage Read API which reads in parallel,
  746. with a Ray read task to handle each stream. The number of streams is
  747. determined by ``parallelism`` which can be requested from this interface
  748. or automatically chosen if unspecified (see the ``parallelism`` arg below).
  749. .. warning::
  750. The maximum query response size is 10GB.
  751. Examples:
  752. .. testcode::
  753. :skipif: True
  754. import ray
  755. # Users will need to authenticate beforehand (https://cloud.google.com/sdk/gcloud/reference/auth/login)
  756. ds = ray.data.read_bigquery(
  757. project_id="my_project",
  758. query="SELECT * FROM `bigquery-public-data.samples.gsod` LIMIT 1000",
  759. )
  760. Args:
  761. project_id: The name of the associated Google Cloud Project that hosts the dataset to read.
  762. For more information, see `Creating and Managing Projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_.
  763. dataset: The name of the dataset hosted in BigQuery in the format of ``dataset_id.table_id``.
  764. Both the dataset_id and table_id must exist otherwise an exception will be raised.
  765. query: The SQL query to execute. `query` and `dataset` are mutually exclusive.
  766. If `query` is provided, the query result is read as the dataset.
  767. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  768. num_cpus: The number of CPUs to reserve for each parallel read worker.
  769. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  770. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  771. worker.
  772. memory: The heap memory in bytes to reserve for each parallel read worker.
  773. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  774. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  775. to control number of tasks to run concurrently. This doesn't change the
  776. total number of tasks run or the total number of output blocks. By default,
  777. concurrency is dynamically decided based on the available resources.
  778. override_num_blocks: Override the number of output blocks from all read tasks.
  779. By default, the number of output blocks is dynamically decided based on
  780. input data size and available resources. You shouldn't manually set this
  781. value in most cases.
  782. Returns:
  783. Dataset producing rows from the results of executing the query (or reading the entire dataset)
  784. on the specified BigQuery dataset.
  785. """ # noqa: E501
  786. datasource = BigQueryDatasource(project_id=project_id, dataset=dataset, query=query)
  787. return read_datasource(
  788. datasource,
  789. num_cpus=num_cpus,
  790. num_gpus=num_gpus,
  791. memory=memory,
  792. parallelism=parallelism,
  793. ray_remote_args=ray_remote_args,
  794. concurrency=concurrency,
  795. override_num_blocks=override_num_blocks,
  796. )
  797. @PublicAPI
  798. def read_parquet(
  799. paths: Union[str, List[str]],
  800. *,
  801. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  802. columns: Optional[List[str]] = None,
  803. parallelism: int = -1,
  804. num_cpus: Optional[float] = None,
  805. num_gpus: Optional[float] = None,
  806. memory: Optional[float] = None,
  807. ray_remote_args: Dict[str, Any] = None,
  808. tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
  809. partition_filter: Optional[PathPartitionFilter] = None,
  810. partitioning: Optional[Partitioning] = Partitioning("hive"),
  811. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  812. include_paths: bool = False,
  813. file_extensions: Optional[List[str]] = ParquetDatasource._FILE_EXTENSIONS,
  814. concurrency: Optional[int] = None,
  815. override_num_blocks: Optional[int] = None,
  816. **arrow_parquet_args,
  817. ) -> Dataset:
  818. """Creates a :class:`~ray.data.Dataset` from parquet files.
  819. Examples:
  820. Read a file in remote storage.
  821. >>> import ray
  822. >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
  823. >>> ds.schema()
  824. Column Type
  825. ------ ----
  826. sepal.length double
  827. sepal.width double
  828. petal.length double
  829. petal.width double
  830. variety string
  831. Read a directory in remote storage.
  832. >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris-parquet/")
  833. Read multiple local files.
  834. >>> ray.data.read_parquet(
  835. ... ["local:///path/to/file1", "local:///path/to/file2"]) # doctest: +SKIP
  836. Specify a schema for the parquet file.
  837. >>> import pyarrow as pa
  838. >>> fields = [("sepal.length", pa.float32()),
  839. ... ("sepal.width", pa.float32()),
  840. ... ("petal.length", pa.float32()),
  841. ... ("petal.width", pa.float32()),
  842. ... ("variety", pa.string())]
  843. >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet",
  844. ... schema=pa.schema(fields))
  845. >>> ds.schema()
  846. Column Type
  847. ------ ----
  848. sepal.length float
  849. sepal.width float
  850. petal.length float
  851. petal.width float
  852. variety string
  853. The Parquet reader also supports projection and filter pushdown, allowing column
  854. selection and row filtering to be pushed down to the file scan.
  855. .. testcode::
  856. import pyarrow as pa
  857. # Create a Dataset by reading a Parquet file, pushing column selection and
  858. # row filtering down to the file scan.
  859. ds = ray.data.read_parquet(
  860. "s3://anonymous@ray-example-data/iris.parquet",
  861. columns=["sepal.length", "variety"],
  862. filter=pa.dataset.field("sepal.length") > 5.0,
  863. )
  864. ds.show(2)
  865. .. testoutput::
  866. {'sepal.length': 5.1, 'variety': 'Setosa'}
  867. {'sepal.length': 5.4, 'variety': 'Setosa'}
  868. For further arguments you can pass to PyArrow as a keyword argument, see the
  869. `PyArrow API reference <https://arrow.apache.org/docs/python/generated/\
  870. pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.
  871. Args:
  872. paths: A single file path or directory, or a list of file paths. Multiple
  873. directories are not supported.
  874. filesystem: The PyArrow filesystem
  875. implementation to read from. These filesystems are specified in the
  876. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  877. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  878. you need to provide specific configurations to the filesystem. By default,
  879. the filesystem is automatically selected based on the scheme of the paths.
  880. For example, if the path begins with ``s3://``, the ``S3FileSystem`` is
  881. used. If ``None``, this function uses a system-chosen implementation.
  882. columns: A list of column names to read. Only the specified columns are
  883. read during the file scan.
  884. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  885. num_cpus: The number of CPUs to reserve for each parallel read worker.
  886. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  887. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  888. worker.
  889. memory: The heap memory in bytes to reserve for each parallel read worker.
  890. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  891. tensor_column_schema: A dict of column name to PyArrow dtype and shape
  892. mappings for converting a Parquet column containing serialized
  893. tensors (ndarrays) as their elements to PyArrow tensors. This function
  894. assumes that the tensors are serialized in the raw
  895. NumPy array format in C-contiguous order (e.g., via
  896. `arr.tobytes()`).
  897. partition_filter: A
  898. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
  899. with a custom callback to read only selected partitions of a dataset.
  900. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  901. that describes how paths are organized. Defaults to HIVE partitioning.
  902. shuffle: If setting to "files", randomly shuffle input files order before read.
  903. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  904. shuffle the input files. Defaults to not shuffle with ``None``.
  905. include_paths: If ``True``, include the path to each file. File paths are
  906. stored in the ``'path'`` column.
  907. file_extensions: A list of file extensions to filter files by.
  908. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  909. to control number of tasks to run concurrently. This doesn't change the
  910. total number of tasks run or the total number of output blocks. By default,
  911. concurrency is dynamically decided based on the available resources.
  912. override_num_blocks: Override the number of output blocks from all read tasks.
  913. By default, the number of output blocks is dynamically decided based on
  914. input data size and available resources. You shouldn't manually set this
  915. value in most cases.
  916. **arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
  917. set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/\
  918. python/generated/pyarrow.dataset.Scanner.html\
  919. #pyarrow.dataset.Scanner.from_fragment>`_
  920. Returns:
  921. :class:`~ray.data.Dataset` producing records read from the specified parquet
  922. files.
  923. """
  924. _validate_shuffle_arg(shuffle)
  925. # Check for deprecated filter parameter
  926. if "filter" in arrow_parquet_args:
  927. warnings.warn(
  928. "The `filter` argument is deprecated and will not be supported in a future release. "
  929. "Use `dataset.filter(expr=expr)` instead to filter rows.",
  930. DeprecationWarning,
  931. stacklevel=2,
  932. )
  933. arrow_parquet_args = _resolve_parquet_args(
  934. tensor_column_schema,
  935. **arrow_parquet_args,
  936. )
  937. dataset_kwargs = arrow_parquet_args.pop("dataset_kwargs", None)
  938. _block_udf = arrow_parquet_args.pop("_block_udf", None)
  939. schema = arrow_parquet_args.pop("schema", None)
  940. datasource = ParquetDatasource(
  941. paths,
  942. columns=columns,
  943. dataset_kwargs=dataset_kwargs,
  944. to_batch_kwargs=arrow_parquet_args,
  945. _block_udf=_block_udf,
  946. filesystem=filesystem,
  947. schema=schema,
  948. partition_filter=partition_filter,
  949. partitioning=partitioning,
  950. shuffle=shuffle,
  951. include_paths=include_paths,
  952. file_extensions=file_extensions,
  953. )
  954. return read_datasource(
  955. datasource,
  956. num_cpus=num_cpus,
  957. num_gpus=num_gpus,
  958. memory=memory,
  959. parallelism=parallelism,
  960. ray_remote_args=ray_remote_args,
  961. concurrency=concurrency,
  962. override_num_blocks=override_num_blocks,
  963. )
  964. @PublicAPI(stability="beta")
  965. def read_images(
  966. paths: Union[str, List[str]],
  967. *,
  968. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  969. parallelism: int = -1,
  970. num_cpus: Optional[float] = None,
  971. num_gpus: Optional[float] = None,
  972. memory: Optional[float] = None,
  973. ray_remote_args: Dict[str, Any] = None,
  974. arrow_open_file_args: Optional[Dict[str, Any]] = None,
  975. partition_filter: Optional[PathPartitionFilter] = None,
  976. partitioning: Partitioning = None,
  977. size: Optional[Tuple[int, int]] = None,
  978. mode: Optional[str] = None,
  979. include_paths: bool = False,
  980. ignore_missing_paths: bool = False,
  981. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  982. file_extensions: Optional[List[str]] = ImageDatasource._FILE_EXTENSIONS,
  983. concurrency: Optional[int] = None,
  984. override_num_blocks: Optional[int] = None,
  985. ) -> Dataset:
  986. """Creates a :class:`~ray.data.Dataset` from image files.
  987. The column name defaults to "image".
  988. Examples:
  989. >>> import ray
  990. >>> path = "s3://anonymous@ray-example-data/batoidea/JPEGImages/"
  991. >>> ds = ray.data.read_images(path)
  992. >>> ds.schema()
  993. Column Type
  994. ------ ----
  995. image ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)
  996. If you need image file paths, set ``include_paths=True``.
  997. >>> ds = ray.data.read_images(path, include_paths=True)
  998. >>> ds.schema()
  999. Column Type
  1000. ------ ----
  1001. image ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)
  1002. path string
  1003. >>> ds.take(1)[0]["path"]
  1004. 'ray-example-data/batoidea/JPEGImages/1.jpeg'
  1005. If your images are arranged like:
  1006. .. code::
  1007. root/dog/xxx.png
  1008. root/dog/xxy.png
  1009. root/cat/123.png
  1010. root/cat/nsdf3.png
  1011. Then you can include the labels by specifying a
  1012. :class:`~ray.data.datasource.partitioning.Partitioning`.
  1013. >>> import ray
  1014. >>> from ray.data.datasource.partitioning import Partitioning
  1015. >>> root = "s3://anonymous@ray-example-data/image-datasets/dir-partitioned"
  1016. >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
  1017. >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning)
  1018. >>> ds.schema()
  1019. Column Type
  1020. ------ ----
  1021. image ArrowTensorTypeV2(shape=(224, 224, 3), dtype=uint8)
  1022. class string
  1023. Args:
  1024. paths: A single file or directory, or a list of file or directory paths.
  1025. A list of paths can contain both files and directories.
  1026. filesystem: The pyarrow filesystem
  1027. implementation to read from. These filesystems are specified in the
  1028. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  1029. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1030. you need to provide specific configurations to the filesystem. By default,
  1031. the filesystem is automatically selected based on the scheme of the paths.
  1032. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1033. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1034. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1035. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1036. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1037. worker.
  1038. memory: The heap memory in bytes to reserve for each parallel read worker.
  1039. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  1040. arrow_open_file_args: kwargs passed to
  1041. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1042. python/generated/pyarrow.fs.FileSystem.html\
  1043. #pyarrow.fs.FileSystem.open_input_file>`_.
  1044. when opening input files to read.
  1045. partition_filter: A
  1046. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
  1047. with a custom callback to read only selected partitions of a dataset.
  1048. By default, this filters out any file paths whose file extension does not
  1049. match ``*.png``, ``*.jpg``, ``*.jpeg``, ``*.tiff``, ``*.bmp``, or ``*.gif``.
  1050. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1051. that describes how paths are organized. Defaults to ``None``.
  1052. size: The desired height and width of loaded images. If unspecified, images
  1053. retain their original shape.
  1054. mode: A `Pillow mode <https://pillow.readthedocs.io/en/stable/handbook/concepts\
  1055. .html#modes>`_
  1056. describing the desired type and depth of pixels. If unspecified, image
  1057. modes are inferred by
  1058. `Pillow <https://pillow.readthedocs.io/en/stable/index.html>`_.
  1059. include_paths: If ``True``, include the path to each image. File paths are
  1060. stored in the ``'path'`` column.
  1061. ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
  1062. that are not found. Defaults to False.
  1063. shuffle: If setting to "files", randomly shuffle input files order before read.
  1064. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1065. shuffle the input files. Defaults to not shuffle with ``None``.
  1066. file_extensions: A list of file extensions to filter files by.
  1067. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1068. to control number of tasks to run concurrently. This doesn't change the
  1069. total number of tasks run or the total number of output blocks. By default,
  1070. concurrency is dynamically decided based on the available resources.
  1071. override_num_blocks: Override the number of output blocks from all read tasks.
  1072. By default, the number of output blocks is dynamically decided based on
  1073. input data size and available resources. You shouldn't manually set this
  1074. value in most cases.
  1075. Returns:
  1076. A :class:`~ray.data.Dataset` producing tensors that represent the images at
  1077. the specified paths. For information on working with tensors, read the
  1078. :ref:`tensor data guide <working_with_tensors>`.
  1079. Raises:
  1080. ValueError: if ``size`` contains non-positive numbers.
  1081. ValueError: if ``mode`` is unsupported.
  1082. """
  1083. datasource = ImageDatasource(
  1084. paths,
  1085. size=size,
  1086. mode=mode,
  1087. include_paths=include_paths,
  1088. filesystem=filesystem,
  1089. meta_provider=ImageFileMetadataProvider(),
  1090. open_stream_args=arrow_open_file_args,
  1091. partition_filter=partition_filter,
  1092. partitioning=partitioning,
  1093. ignore_missing_paths=ignore_missing_paths,
  1094. shuffle=shuffle,
  1095. file_extensions=file_extensions,
  1096. )
  1097. return read_datasource(
  1098. datasource,
  1099. num_cpus=num_cpus,
  1100. num_gpus=num_gpus,
  1101. memory=memory,
  1102. parallelism=parallelism,
  1103. ray_remote_args=ray_remote_args,
  1104. concurrency=concurrency,
  1105. override_num_blocks=override_num_blocks,
  1106. )
  1107. @PublicAPI
  1108. def read_json(
  1109. paths: Union[str, List[str]],
  1110. *,
  1111. lines: bool = False,
  1112. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1113. parallelism: int = -1,
  1114. num_cpus: Optional[float] = None,
  1115. num_gpus: Optional[float] = None,
  1116. memory: Optional[float] = None,
  1117. ray_remote_args: Dict[str, Any] = None,
  1118. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1119. partition_filter: Optional[PathPartitionFilter] = None,
  1120. partitioning: Partitioning = Partitioning("hive"),
  1121. include_paths: bool = False,
  1122. ignore_missing_paths: bool = False,
  1123. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1124. file_extensions: Optional[List[str]] = JSON_FILE_EXTENSIONS,
  1125. concurrency: Optional[int] = None,
  1126. override_num_blocks: Optional[int] = None,
  1127. **arrow_json_args,
  1128. ) -> Dataset:
  1129. """Creates a :class:`~ray.data.Dataset` from JSON and JSONL files.
  1130. For JSON file, the whole file is read as one row.
  1131. For JSONL file, each line of file is read as separate row.
  1132. Examples:
  1133. Read a JSON file in remote storage.
  1134. >>> import ray
  1135. >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/log.json")
  1136. >>> ds.schema()
  1137. Column Type
  1138. ------ ----
  1139. timestamp timestamp[...]
  1140. size int64
  1141. Read a JSONL file in remote storage.
  1142. >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/train.jsonl", lines=True)
  1143. >>> ds.schema()
  1144. Column Type
  1145. ------ ----
  1146. input <class 'object'>
  1147. Read multiple local files.
  1148. >>> ray.data.read_json( # doctest: +SKIP
  1149. ... ["local:///path/to/file1", "local:///path/to/file2"])
  1150. Read multiple directories.
  1151. >>> ray.data.read_json( # doctest: +SKIP
  1152. ... ["s3://bucket/path1", "s3://bucket/path2"])
  1153. By default, :meth:`~ray.data.read_json` parses
  1154. `Hive-style partitions <https://athena.guide/articles/\
  1155. hive-style-partitioning/>`_
  1156. from file paths. If your data adheres to a different partitioning scheme, set
  1157. the ``partitioning`` parameter.
  1158. >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/year=2022/month=09/sales.json")
  1159. >>> ds.take(1)
  1160. [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]
  1161. Args:
  1162. paths: A single file or directory, or a list of file or directory paths.
  1163. A list of paths can contain both files and directories.
  1164. lines: [Experimental] If ``True``, read files assuming line-delimited JSON.
  1165. If set, will ignore the ``filesystem``, ``arrow_open_stream_args``, and
  1166. ``arrow_json_args`` parameters.
  1167. filesystem: The PyArrow filesystem
  1168. implementation to read from. These filesystems are specified in the
  1169. `PyArrow docs <https://arrow.apache.org/docs/python/api/\
  1170. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1171. you need to provide specific configurations to the filesystem. By default,
  1172. the filesystem is automatically selected based on the scheme of the paths.
  1173. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1174. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1175. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1176. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1177. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1178. worker.
  1179. memory: The heap memory in bytes to reserve for each parallel read worker.
  1180. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  1181. arrow_open_stream_args: kwargs passed to
  1182. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1183. python/generated/pyarrow.fs.FileSystem.html\
  1184. #pyarrow.fs.FileSystem.open_input_stream>`_.
  1185. when opening input files to read.
  1186. partition_filter: A
  1187. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1188. Use with a custom callback to read only selected partitions of a
  1189. dataset.
  1190. By default, this filters out any file paths whose file extension does not
  1191. match "*.json" or "*.jsonl".
  1192. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1193. that describes how paths are organized. By default, this function parses
  1194. `Hive-style partitions <https://athena.guide/articles/\
  1195. hive-style-partitioning/>`_.
  1196. include_paths: If ``True``, include the path to each file. File paths are
  1197. stored in the ``'path'`` column.
  1198. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1199. found. Defaults to False.
  1200. shuffle: If setting to "files", randomly shuffle input files order before read.
  1201. If setting to ``FileShuffleConfig``, you can pass a random seed to shuffle
  1202. the input files, e.g. ``FileShuffleConfig(seed=42)``.
  1203. Defaults to not shuffle with ``None``.
  1204. file_extensions: A list of file extensions to filter files by.
  1205. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1206. to control number of tasks to run concurrently. This doesn't change the
  1207. total number of tasks run or the total number of output blocks. By default,
  1208. concurrency is dynamically decided based on the available resources.
  1209. override_num_blocks: Override the number of output blocks from all read tasks.
  1210. By default, the number of output blocks is dynamically decided based on
  1211. input data size and available resources. You shouldn't manually set this
  1212. value in most cases.
  1213. **arrow_json_args: JSON read options to pass to `pyarrow.json.read_json <https://\
  1214. arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html#pyarrow.\
  1215. json.read_json>`_.
  1216. Returns:
  1217. :class:`~ray.data.Dataset` producing records read from the specified paths.
  1218. """ # noqa: E501
  1219. if lines:
  1220. incompatible_params = {
  1221. "filesystem": filesystem,
  1222. "arrow_open_stream_args": arrow_open_stream_args,
  1223. "arrow_json_args": arrow_json_args,
  1224. }
  1225. for param, value in incompatible_params.items():
  1226. if value:
  1227. raise ValueError(f"`{param}` is not supported when `lines=True`. ")
  1228. file_based_datasource_kwargs = dict(
  1229. filesystem=filesystem,
  1230. open_stream_args=arrow_open_stream_args,
  1231. meta_provider=DefaultFileMetadataProvider(),
  1232. partition_filter=partition_filter,
  1233. partitioning=partitioning,
  1234. ignore_missing_paths=ignore_missing_paths,
  1235. shuffle=shuffle,
  1236. include_paths=include_paths,
  1237. file_extensions=file_extensions,
  1238. )
  1239. if lines:
  1240. target_output_size_bytes = (
  1241. ray.data.context.DataContext.get_current().target_max_block_size
  1242. )
  1243. datasource = PandasJSONDatasource(
  1244. paths,
  1245. target_output_size_bytes=target_output_size_bytes,
  1246. **file_based_datasource_kwargs,
  1247. )
  1248. else:
  1249. datasource = ArrowJSONDatasource(
  1250. paths,
  1251. arrow_json_args=arrow_json_args,
  1252. **file_based_datasource_kwargs,
  1253. )
  1254. return read_datasource(
  1255. datasource,
  1256. num_cpus=num_cpus,
  1257. num_gpus=num_gpus,
  1258. memory=memory,
  1259. parallelism=parallelism,
  1260. ray_remote_args=ray_remote_args,
  1261. concurrency=concurrency,
  1262. override_num_blocks=override_num_blocks,
  1263. )
  1264. @PublicAPI
  1265. def read_csv(
  1266. paths: Union[str, List[str]],
  1267. *,
  1268. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1269. parallelism: int = -1,
  1270. num_cpus: Optional[float] = None,
  1271. num_gpus: Optional[float] = None,
  1272. memory: Optional[float] = None,
  1273. ray_remote_args: Dict[str, Any] = None,
  1274. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1275. partition_filter: Optional[PathPartitionFilter] = None,
  1276. partitioning: Partitioning = Partitioning("hive"),
  1277. include_paths: bool = False,
  1278. ignore_missing_paths: bool = False,
  1279. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1280. file_extensions: Optional[List[str]] = None,
  1281. concurrency: Optional[int] = None,
  1282. override_num_blocks: Optional[int] = None,
  1283. **arrow_csv_args,
  1284. ) -> Dataset:
  1285. """Creates a :class:`~ray.data.Dataset` from CSV files.
  1286. Examples:
  1287. Read a file in remote storage.
  1288. >>> import ray
  1289. >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
  1290. >>> ds.schema()
  1291. Column Type
  1292. ------ ----
  1293. sepal length (cm) double
  1294. sepal width (cm) double
  1295. petal length (cm) double
  1296. petal width (cm) double
  1297. target int64
  1298. Read multiple local files.
  1299. >>> ray.data.read_csv( # doctest: +SKIP
  1300. ... ["local:///path/to/file1", "local:///path/to/file2"])
  1301. Read a directory from remote storage.
  1302. >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris-csv/")
  1303. Read files that use a different delimiter. For more uses of ParseOptions see
  1304. https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html # noqa: #501
  1305. >>> from pyarrow import csv
  1306. >>> parse_options = csv.ParseOptions(delimiter="\\t")
  1307. >>> ds = ray.data.read_csv(
  1308. ... "s3://anonymous@ray-example-data/iris.tsv",
  1309. ... parse_options=parse_options)
  1310. >>> ds.schema()
  1311. Column Type
  1312. ------ ----
  1313. sepal.length double
  1314. sepal.width double
  1315. petal.length double
  1316. petal.width double
  1317. variety string
  1318. Convert a date column with a custom format from a CSV file. For more uses of ConvertOptions see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html # noqa: #501
  1319. >>> from pyarrow import csv
  1320. >>> convert_options = csv.ConvertOptions(
  1321. ... timestamp_parsers=["%m/%d/%Y"])
  1322. >>> ds = ray.data.read_csv(
  1323. ... "s3://anonymous@ray-example-data/dow_jones.csv",
  1324. ... convert_options=convert_options)
  1325. By default, :meth:`~ray.data.read_csv` parses
  1326. `Hive-style partitions <https://athena.guide/\
  1327. articles/hive-style-partitioning/>`_
  1328. from file paths. If your data adheres to a different partitioning scheme, set
  1329. the ``partitioning`` parameter.
  1330. >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/year=2022/month=09/sales.csv")
  1331. >>> ds.take(1)
  1332. [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]
  1333. By default, :meth:`~ray.data.read_csv` reads all files from file paths. If you want to filter
  1334. files by file extensions, set the ``file_extensions`` parameter.
  1335. Read only ``*.csv`` files from a directory.
  1336. >>> ray.data.read_csv("s3://anonymous@ray-example-data/different-extensions/",
  1337. ... file_extensions=["csv"])
  1338. Dataset(num_rows=?, schema=Unknown schema)
  1339. Args:
  1340. paths: A single file or directory, or a list of file or directory paths.
  1341. A list of paths can contain both files and directories.
  1342. filesystem: The PyArrow filesystem
  1343. implementation to read from. These filesystems are specified in the
  1344. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  1345. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1346. you need to provide specific configurations to the filesystem. By default,
  1347. the filesystem is automatically selected based on the scheme of the paths.
  1348. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1349. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1350. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1351. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1352. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1353. worker.
  1354. memory: The heap memory in bytes to reserve for each parallel read worker.
  1355. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  1356. arrow_open_stream_args: kwargs passed to
  1357. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1358. python/generated/pyarrow.fs.FileSystem.html\
  1359. #pyarrow.fs.FileSystem.open_input_stream>`_.
  1360. when opening input files to read.
  1361. partition_filter: A
  1362. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1363. Use with a custom callback to read only selected partitions of a
  1364. dataset. By default, no files are filtered.
  1365. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1366. that describes how paths are organized. By default, this function parses
  1367. `Hive-style partitions <https://athena.guide/articles/\
  1368. hive-style-partitioning/>`_.
  1369. include_paths: If ``True``, include the path to each file. File paths are
  1370. stored in the ``'path'`` column.
  1371. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1372. found. Defaults to False.
  1373. shuffle: If setting to "files", randomly shuffle input files order before read.
  1374. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1375. shuffle the input files. Defaults to not shuffle with ``None``.
  1376. file_extensions: A list of file extensions to filter files by.
  1377. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1378. to control number of tasks to run concurrently. This doesn't change the
  1379. total number of tasks run or the total number of output blocks. By default,
  1380. concurrency is dynamically decided based on the available resources.
  1381. override_num_blocks: Override the number of output blocks from all read tasks.
  1382. By default, the number of output blocks is dynamically decided based on
  1383. input data size and available resources. You shouldn't manually set this
  1384. value in most cases.
  1385. **arrow_csv_args: CSV read options to pass to
  1386. `pyarrow.csv.open_csv <https://arrow.apache.org/docs/python/generated/\
  1387. pyarrow.csv.open_csv.html#pyarrow.csv.open_csv>`_
  1388. when opening CSV files.
  1389. Returns:
  1390. :class:`~ray.data.Dataset` producing records read from the specified paths.
  1391. """
  1392. datasource = CSVDatasource(
  1393. paths,
  1394. arrow_csv_args=arrow_csv_args,
  1395. filesystem=filesystem,
  1396. open_stream_args=arrow_open_stream_args,
  1397. meta_provider=DefaultFileMetadataProvider(),
  1398. partition_filter=partition_filter,
  1399. partitioning=partitioning,
  1400. ignore_missing_paths=ignore_missing_paths,
  1401. shuffle=shuffle,
  1402. include_paths=include_paths,
  1403. file_extensions=file_extensions,
  1404. )
  1405. return read_datasource(
  1406. datasource,
  1407. num_cpus=num_cpus,
  1408. num_gpus=num_gpus,
  1409. memory=memory,
  1410. parallelism=parallelism,
  1411. ray_remote_args=ray_remote_args,
  1412. concurrency=concurrency,
  1413. override_num_blocks=override_num_blocks,
  1414. )
  1415. @PublicAPI
  1416. def read_text(
  1417. paths: Union[str, List[str]],
  1418. *,
  1419. encoding: str = "utf-8",
  1420. drop_empty_lines: bool = True,
  1421. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1422. parallelism: int = -1,
  1423. num_cpus: Optional[float] = None,
  1424. num_gpus: Optional[float] = None,
  1425. memory: Optional[float] = None,
  1426. ray_remote_args: Optional[Dict[str, Any]] = None,
  1427. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1428. partition_filter: Optional[PathPartitionFilter] = None,
  1429. partitioning: Partitioning = None,
  1430. include_paths: bool = False,
  1431. ignore_missing_paths: bool = False,
  1432. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1433. file_extensions: Optional[List[str]] = None,
  1434. concurrency: Optional[int] = None,
  1435. override_num_blocks: Optional[int] = None,
  1436. ) -> Dataset:
  1437. """Create a :class:`~ray.data.Dataset` from lines stored in text files.
  1438. The column name default to "text".
  1439. Examples:
  1440. Read a file in remote storage.
  1441. >>> import ray
  1442. >>> ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
  1443. >>> ds.schema()
  1444. Column Type
  1445. ------ ----
  1446. text string
  1447. Read multiple local files.
  1448. >>> ray.data.read_text( # doctest: +SKIP
  1449. ... ["local:///path/to/file1", "local:///path/to/file2"])
  1450. Args:
  1451. paths: A single file or directory, or a list of file or directory paths.
  1452. A list of paths can contain both files and directories.
  1453. encoding: The encoding of the files (e.g., "utf-8" or "ascii").
  1454. drop_empty_lines: If ``True``, drop empty lines from the dataset.
  1455. Defaults to ``True``.
  1456. filesystem: The PyArrow filesystem
  1457. implementation to read from. These filesystems are specified in the
  1458. `PyArrow docs <https://arrow.apache.org/docs/python/api/\
  1459. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1460. you need to provide specific configurations to the filesystem. By default,
  1461. the filesystem is automatically selected based on the scheme of the paths.
  1462. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1463. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1464. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1465. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1466. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1467. worker.
  1468. memory: The heap memory in bytes to reserve for each parallel read worker.
  1469. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks and
  1470. in the subsequent text decoding map task.
  1471. arrow_open_stream_args: kwargs passed to
  1472. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1473. python/generated/pyarrow.fs.FileSystem.html\
  1474. #pyarrow.fs.FileSystem.open_input_stream>`_.
  1475. when opening input files to read.
  1476. partition_filter: A
  1477. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1478. Use with a custom callback to read only selected partitions of a
  1479. dataset. By default, no files are filtered.
  1480. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1481. that describes how paths are organized. Defaults to ``None``.
  1482. include_paths: If ``True``, include the path to each file. File paths are
  1483. stored in the ``'path'`` column.
  1484. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1485. found. Defaults to False.
  1486. shuffle: If setting to "files", randomly shuffle input files order before read.
  1487. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1488. shuffle the input files. Defaults to not shuffle with ``None``.
  1489. file_extensions: A list of file extensions to filter files by.
  1490. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1491. to control number of tasks to run concurrently. This doesn't change the
  1492. total number of tasks run or the total number of output blocks. By default,
  1493. concurrency is dynamically decided based on the available resources.
  1494. override_num_blocks: Override the number of output blocks from all read tasks.
  1495. By default, the number of output blocks is dynamically decided based on
  1496. input data size and available resources. You shouldn't manually set this
  1497. value in most cases.
  1498. Returns:
  1499. :class:`~ray.data.Dataset` producing lines of text read from the specified
  1500. paths.
  1501. """
  1502. datasource = TextDatasource(
  1503. paths,
  1504. drop_empty_lines=drop_empty_lines,
  1505. encoding=encoding,
  1506. filesystem=filesystem,
  1507. open_stream_args=arrow_open_stream_args,
  1508. meta_provider=DefaultFileMetadataProvider(),
  1509. partition_filter=partition_filter,
  1510. partitioning=partitioning,
  1511. ignore_missing_paths=ignore_missing_paths,
  1512. shuffle=shuffle,
  1513. include_paths=include_paths,
  1514. file_extensions=file_extensions,
  1515. )
  1516. return read_datasource(
  1517. datasource,
  1518. num_cpus=num_cpus,
  1519. num_gpus=num_gpus,
  1520. memory=memory,
  1521. parallelism=parallelism,
  1522. ray_remote_args=ray_remote_args,
  1523. concurrency=concurrency,
  1524. override_num_blocks=override_num_blocks,
  1525. )
  1526. @PublicAPI
  1527. def read_avro(
  1528. paths: Union[str, List[str]],
  1529. *,
  1530. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1531. parallelism: int = -1,
  1532. num_cpus: Optional[float] = None,
  1533. num_gpus: Optional[float] = None,
  1534. memory: Optional[float] = None,
  1535. ray_remote_args: Optional[Dict[str, Any]] = None,
  1536. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1537. partition_filter: Optional[PathPartitionFilter] = None,
  1538. partitioning: Partitioning = None,
  1539. include_paths: bool = False,
  1540. ignore_missing_paths: bool = False,
  1541. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1542. file_extensions: Optional[List[str]] = None,
  1543. concurrency: Optional[int] = None,
  1544. override_num_blocks: Optional[int] = None,
  1545. ) -> Dataset:
  1546. """Create a :class:`~ray.data.Dataset` from records stored in Avro files.
  1547. Examples:
  1548. Read an Avro file in remote storage or local storage.
  1549. >>> import ray
  1550. >>> ds = ray.data.read_avro("s3://anonymous@ray-example-data/mnist.avro")
  1551. >>> ds.schema()
  1552. Column Type
  1553. ------ ----
  1554. features list<item: int64>
  1555. label int64
  1556. dataType string
  1557. >>> ray.data.read_avro( # doctest: +SKIP
  1558. ... ["local:///path/to/file1", "local:///path/to/file2"])
  1559. Args:
  1560. paths: A single file or directory, or a list of file or directory paths.
  1561. A list of paths can contain both files and directories.
  1562. filesystem: The PyArrow filesystem
  1563. implementation to read from. These filesystems are specified in the
  1564. `PyArrow docs <https://arrow.apache.org/docs/python/api/\
  1565. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1566. you need to provide specific configurations to the filesystem. By default,
  1567. the filesystem is automatically selected based on the scheme of the paths.
  1568. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1569. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1570. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1571. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1572. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1573. worker.
  1574. memory: The heap memory in bytes to reserve for each parallel read worker.
  1575. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks and
  1576. in the subsequent text decoding map task.
  1577. arrow_open_stream_args: kwargs passed to
  1578. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1579. python/generated/pyarrow.fs.FileSystem.html\
  1580. #pyarrow.fs.FileSystem.open_input_stream>`_.
  1581. when opening input files to read.
  1582. partition_filter: A
  1583. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1584. Use with a custom callback to read only selected partitions of a
  1585. dataset. By default, no files are filtered.
  1586. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1587. that describes how paths are organized. Defaults to ``None``.
  1588. include_paths: If ``True``, include the path to each file. File paths are
  1589. stored in the ``'path'`` column.
  1590. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1591. found. Defaults to False.
  1592. shuffle: If setting to "files", randomly shuffle input files order before read.
  1593. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1594. shuffle the input files. Defaults to not shuffle with ``None``.
  1595. file_extensions: A list of file extensions to filter files by.
  1596. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1597. to control number of tasks to run concurrently. This doesn't change the
  1598. total number of tasks run or the total number of output blocks. By default,
  1599. concurrency is dynamically decided based on the available resources.
  1600. override_num_blocks: Override the number of output blocks from all read tasks.
  1601. By default, the number of output blocks is dynamically decided based on
  1602. input data size and available resources. You shouldn't manually set this
  1603. value in most cases.
  1604. Returns:
  1605. :class:`~ray.data.Dataset` holding records from the Avro files.
  1606. """
  1607. datasource = AvroDatasource(
  1608. paths,
  1609. filesystem=filesystem,
  1610. open_stream_args=arrow_open_stream_args,
  1611. meta_provider=DefaultFileMetadataProvider(),
  1612. partition_filter=partition_filter,
  1613. partitioning=partitioning,
  1614. ignore_missing_paths=ignore_missing_paths,
  1615. shuffle=shuffle,
  1616. include_paths=include_paths,
  1617. file_extensions=file_extensions,
  1618. )
  1619. return read_datasource(
  1620. datasource,
  1621. num_cpus=num_cpus,
  1622. num_gpus=num_gpus,
  1623. memory=memory,
  1624. parallelism=parallelism,
  1625. ray_remote_args=ray_remote_args,
  1626. concurrency=concurrency,
  1627. override_num_blocks=override_num_blocks,
  1628. )
  1629. @PublicAPI
  1630. def read_numpy(
  1631. paths: Union[str, List[str]],
  1632. *,
  1633. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1634. parallelism: int = -1,
  1635. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1636. partition_filter: Optional[PathPartitionFilter] = None,
  1637. partitioning: Partitioning = None,
  1638. include_paths: bool = False,
  1639. ignore_missing_paths: bool = False,
  1640. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1641. file_extensions: Optional[List[str]] = NumpyDatasource._FILE_EXTENSIONS,
  1642. concurrency: Optional[int] = None,
  1643. override_num_blocks: Optional[int] = None,
  1644. **numpy_load_args,
  1645. ) -> Dataset:
  1646. """Create an Arrow dataset from numpy files.
  1647. The column name defaults to "data".
  1648. Examples:
  1649. Read a directory of files in remote storage.
  1650. >>> import ray
  1651. >>> ray.data.read_numpy("s3://bucket/path") # doctest: +SKIP
  1652. Read multiple local files.
  1653. >>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP
  1654. Read multiple directories.
  1655. >>> ray.data.read_numpy( # doctest: +SKIP
  1656. ... ["s3://bucket/path1", "s3://bucket/path2"])
  1657. Args:
  1658. paths: A single file/directory path or a list of file/directory paths.
  1659. A list of paths can contain both files and directories.
  1660. filesystem: The filesystem implementation to read from.
  1661. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1662. arrow_open_stream_args: kwargs passed to
  1663. `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`_.
  1664. partition_filter: Path-based partition filter, if any. Can be used
  1665. with a custom callback to read only selected partitions of a dataset.
  1666. By default, this filters out any file paths whose file extension does not
  1667. match "*.npy*".
  1668. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1669. that describes how paths are organized. Defaults to ``None``.
  1670. include_paths: If ``True``, include the path to each file. File paths are
  1671. stored in the ``'path'`` column.
  1672. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1673. found. Defaults to False.
  1674. shuffle: If setting to "files", randomly shuffle input files order before read.
  1675. if setting to ``FileShuffleConfig``, the random seed can be passed toshuffle the
  1676. input files, i.e. ``FileShuffleConfig(seed = 42)``.
  1677. Defaults to not shuffle with ``None``.
  1678. file_extensions: A list of file extensions to filter files by.
  1679. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1680. to control number of tasks to run concurrently. This doesn't change the
  1681. total number of tasks run or the total number of output blocks. By default,
  1682. concurrency is dynamically decided based on the available resources.
  1683. override_num_blocks: Override the number of output blocks from all read tasks.
  1684. By default, the number of output blocks is dynamically decided based on
  1685. input data size and available resources. You shouldn't manually set this
  1686. value in most cases.
  1687. **numpy_load_args: Other options to pass to np.load.
  1688. Returns:
  1689. Dataset holding Tensor records read from the specified paths.
  1690. """ # noqa: E501
  1691. datasource = NumpyDatasource(
  1692. paths,
  1693. numpy_load_args=numpy_load_args,
  1694. filesystem=filesystem,
  1695. open_stream_args=arrow_open_stream_args,
  1696. meta_provider=DefaultFileMetadataProvider(),
  1697. partition_filter=partition_filter,
  1698. partitioning=partitioning,
  1699. ignore_missing_paths=ignore_missing_paths,
  1700. shuffle=shuffle,
  1701. include_paths=include_paths,
  1702. file_extensions=file_extensions,
  1703. )
  1704. return read_datasource(
  1705. datasource,
  1706. parallelism=parallelism,
  1707. concurrency=concurrency,
  1708. override_num_blocks=override_num_blocks,
  1709. )
  1710. @PublicAPI(stability="alpha")
  1711. def read_tfrecords(
  1712. paths: Union[str, List[str]],
  1713. *,
  1714. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1715. parallelism: int = -1,
  1716. num_cpus: Optional[float] = None,
  1717. num_gpus: Optional[float] = None,
  1718. memory: Optional[float] = None,
  1719. ray_remote_args: Dict[str, Any] = None,
  1720. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  1721. partition_filter: Optional[PathPartitionFilter] = None,
  1722. include_paths: bool = False,
  1723. ignore_missing_paths: bool = False,
  1724. tf_schema: Optional["schema_pb2.Schema"] = None,
  1725. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1726. file_extensions: Optional[List[str]] = None,
  1727. concurrency: Optional[int] = None,
  1728. override_num_blocks: Optional[int] = None,
  1729. tfx_read_options: Optional["TFXReadOptions"] = None,
  1730. ) -> Dataset:
  1731. """Create a :class:`~ray.data.Dataset` from TFRecord files that contain
  1732. `tf.train.Example <https://www.tensorflow.org/api_docs/python/tf/train/Example>`_
  1733. messages.
  1734. .. tip::
  1735. Using the ``tfx-bsl`` library is more performant when reading large
  1736. datasets (for example, in production use cases). To use this
  1737. implementation, you must first install ``tfx-bsl``:
  1738. 1. `pip install tfx_bsl --no-dependencies`
  1739. 2. Pass tfx_read_options to read_tfrecords, for example:
  1740. `ds = read_tfrecords(path, ..., tfx_read_options=TFXReadOptions())`
  1741. .. warning::
  1742. This function exclusively supports ``tf.train.Example`` messages. If a file
  1743. contains a message that isn't of type ``tf.train.Example``, then this function
  1744. fails.
  1745. Examples:
  1746. >>> import ray
  1747. >>> ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
  1748. Dataset(num_rows=?, schema=Unknown schema)
  1749. We can also read compressed TFRecord files, which use one of the
  1750. `compression types supported by Arrow <https://arrow.apache.org/docs/python/\
  1751. generated/pyarrow.CompressedInputStream.html>`_:
  1752. >>> ray.data.read_tfrecords(
  1753. ... "s3://anonymous@ray-example-data/iris.tfrecords.gz",
  1754. ... arrow_open_stream_args={"compression": "gzip"},
  1755. ... )
  1756. Dataset(num_rows=?, schema=Unknown schema)
  1757. Args:
  1758. paths: A single file or directory, or a list of file or directory paths.
  1759. A list of paths can contain both files and directories.
  1760. filesystem: The PyArrow filesystem
  1761. implementation to read from. These filesystems are specified in the
  1762. `PyArrow docs <https://arrow.apache.org/docs/python/api/\
  1763. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  1764. you need to provide specific configurations to the filesystem. By default,
  1765. the filesystem is automatically selected based on the scheme of the paths.
  1766. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  1767. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1768. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1769. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1770. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  1771. worker.
  1772. memory: The heap memory in bytes to reserve for each parallel read worker.
  1773. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  1774. arrow_open_stream_args: kwargs passed to
  1775. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  1776. python/generated/pyarrow.fs.FileSystem.html\
  1777. #pyarrow.fs.FileSystem.open_input_stream>`_.
  1778. when opening input files to read. To read a compressed TFRecord file,
  1779. pass the corresponding compression type (e.g., for ``GZIP`` or ``ZLIB``),
  1780. use ``arrow_open_stream_args={'compression': 'gzip'}``).
  1781. partition_filter: A
  1782. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1783. Use with a custom callback to read only selected partitions of a
  1784. dataset.
  1785. include_paths: If ``True``, include the path to each file. File paths are
  1786. stored in the ``'path'`` column.
  1787. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1788. found. Defaults to False.
  1789. tf_schema: Optional TensorFlow Schema which is used to explicitly set the schema
  1790. of the underlying Dataset.
  1791. shuffle: If setting to "files", randomly shuffle input files order before read.
  1792. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1793. shuffle the input files. Defaults to not shuffle with ``None``.
  1794. file_extensions: A list of file extensions to filter files by.
  1795. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1796. to control number of tasks to run concurrently. This doesn't change the
  1797. total number of tasks run or the total number of output blocks. By default,
  1798. concurrency is dynamically decided based on the available resources.
  1799. override_num_blocks: Override the number of output blocks from all read tasks.
  1800. By default, the number of output blocks is dynamically decided based on
  1801. input data size and available resources. You shouldn't manually set this
  1802. value in most cases.
  1803. tfx_read_options: Specifies read options when reading TFRecord files with TFX.
  1804. When no options are provided, the default version without tfx-bsl will
  1805. be used to read the tfrecords.
  1806. Returns:
  1807. A :class:`~ray.data.Dataset` that contains the example features.
  1808. Raises:
  1809. ValueError: If a file contains a message that isn't a ``tf.train.Example``.
  1810. """
  1811. import platform
  1812. tfx_read = False
  1813. if tfx_read_options and platform.processor() != "arm":
  1814. try:
  1815. import tfx_bsl # noqa: F401
  1816. tfx_read = True
  1817. except ModuleNotFoundError:
  1818. # override the tfx_read_options given that tfx-bsl is not installed
  1819. tfx_read_options = None
  1820. logger.warning(
  1821. "Please install tfx-bsl package with"
  1822. " `pip install tfx_bsl --no-dependencies`."
  1823. " This can help speed up the reading of large TFRecord files."
  1824. )
  1825. datasource = TFRecordDatasource(
  1826. paths,
  1827. tf_schema=tf_schema,
  1828. filesystem=filesystem,
  1829. open_stream_args=arrow_open_stream_args,
  1830. meta_provider=DefaultFileMetadataProvider(),
  1831. partition_filter=partition_filter,
  1832. ignore_missing_paths=ignore_missing_paths,
  1833. shuffle=shuffle,
  1834. include_paths=include_paths,
  1835. file_extensions=file_extensions,
  1836. tfx_read_options=tfx_read_options,
  1837. )
  1838. ds = read_datasource(
  1839. datasource,
  1840. parallelism=parallelism,
  1841. ray_remote_args=ray_remote_args,
  1842. num_cpus=num_cpus,
  1843. num_gpus=num_gpus,
  1844. memory=memory,
  1845. concurrency=concurrency,
  1846. override_num_blocks=override_num_blocks,
  1847. )
  1848. if (
  1849. tfx_read_options
  1850. and tfx_read_options.auto_infer_schema
  1851. and tfx_read
  1852. and not tf_schema
  1853. ):
  1854. from ray.data._internal.datasource.tfrecords_datasource import (
  1855. _infer_schema_and_transform,
  1856. )
  1857. return _infer_schema_and_transform(ds)
  1858. return ds
  1859. @PublicAPI(stability="alpha")
  1860. def read_mcap(
  1861. paths: Union[str, List[str]],
  1862. *,
  1863. topics: Optional[Union[List[str], Set[str]]] = None,
  1864. time_range: Optional[Union[Tuple[int, int], TimeRange]] = None,
  1865. message_types: Optional[Union[List[str], Set[str]]] = None,
  1866. include_metadata: bool = True,
  1867. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  1868. parallelism: int = -1,
  1869. num_cpus: Optional[float] = None,
  1870. num_gpus: Optional[float] = None,
  1871. memory: Optional[float] = None,
  1872. ray_remote_args: Optional[Dict[str, Any]] = None,
  1873. partition_filter: Optional[PathPartitionFilter] = None,
  1874. partitioning: Partitioning = None,
  1875. include_paths: bool = False,
  1876. ignore_missing_paths: bool = False,
  1877. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  1878. file_extensions: Optional[List[str]] = None,
  1879. concurrency: Optional[int] = None,
  1880. override_num_blocks: Optional[int] = None,
  1881. ) -> Dataset:
  1882. """Create a :class:`~ray.data.Dataset` from MCAP (Message Capture) files.
  1883. MCAP is a format commonly used in robotics and autonomous systems for storing
  1884. ROS2 messages and other time-series data. This reader provides predicate pushdown
  1885. optimization for efficient filtering by topics, time ranges, and message types.
  1886. Examples:
  1887. :noindex:
  1888. Read all MCAP files in a directory.
  1889. >>> import ray
  1890. >>> ds = ray.data.read_mcap("s3://bucket/mcap-data/") # doctest: +SKIP
  1891. >>> ds.schema() # doctest: +SKIP
  1892. Read with filtering for specific topics and time range.
  1893. >>> from ray.data.datasource import TimeRange # doctest: +SKIP
  1894. >>> ds = ray.data.read_mcap( # doctest: +SKIP
  1895. ... "s3://bucket/mcap-data/", # doctest: +SKIP
  1896. ... topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
  1897. ... time_range=TimeRange(start_time=1000000000, end_time=5000000000), # doctest: +SKIP
  1898. ... message_types={"sensor_msgs/Image", "sensor_msgs/PointCloud2"} # doctest: +SKIP
  1899. ... ) # doctest: +SKIP
  1900. Alternatively, use a tuple for time range (backwards compatible).
  1901. >>> ds = ray.data.read_mcap( # doctest: +SKIP
  1902. ... "s3://bucket/mcap-data/", # doctest: +SKIP
  1903. ... topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
  1904. ... time_range=(1000000000, 5000000000), # doctest: +SKIP
  1905. ... ) # doctest: +SKIP
  1906. Read multiple local files with include_paths.
  1907. >>> ray.data.read_mcap( # doctest: +SKIP
  1908. ... ["local:///path/to/file1.mcap", "local:///path/to/file2.mcap"], # doctest: +SKIP
  1909. ... include_paths=True # doctest: +SKIP
  1910. ... ) # doctest: +SKIP
  1911. Read with topic filtering and metadata inclusion.
  1912. >>> ds = ray.data.read_mcap( # doctest: +SKIP
  1913. ... "data.mcap", # doctest: +SKIP
  1914. ... topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
  1915. ... include_metadata=True, # doctest: +SKIP
  1916. ... include_paths=True # doctest: +SKIP
  1917. ... ) # doctest: +SKIP
  1918. Args:
  1919. paths: A single file or directory, or a list of file or directory paths.
  1920. A list of paths can contain both files and directories.
  1921. topics: Optional list or set of topic names to include. If specified, only
  1922. messages from these topics will be read.
  1923. time_range: Optional time range for filtering messages by timestamp. Can be either
  1924. a tuple of (start_time, end_time) in nanoseconds (for backwards compatibility)
  1925. or a TimeRange object. Both values must be non-negative and start_time < end_time.
  1926. message_types: Optional list or set of message type names (schema names) to
  1927. include. Only messages with matching schema names will be read.
  1928. include_metadata: Whether to include MCAP metadata fields in the output.
  1929. Defaults to True. When True, includes schema, channel, and message metadata.
  1930. filesystem: The PyArrow filesystem implementation to read from.
  1931. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  1932. num_cpus: The number of CPUs to reserve for each parallel read worker.
  1933. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  1934. example, specify `num_gpus=1` to request 1 GPU for each parallel read worker.
  1935. memory: The heap memory in bytes to reserve for each parallel read worker.
  1936. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  1937. partition_filter: A :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  1938. Use with a custom callback to read only selected partitions of a dataset.
  1939. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  1940. that describes how paths are organized. Defaults to ``None``.
  1941. include_paths: If ``True``, include the path to each file. File paths are
  1942. stored in the ``'path'`` column.
  1943. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  1944. found. Defaults to False.
  1945. shuffle: If setting to "files", randomly shuffle input files order before read.
  1946. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  1947. shuffle the input files. Defaults to not shuffle with ``None``.
  1948. file_extensions: A list of file extensions to filter files by.
  1949. Defaults to ``["mcap"]``.
  1950. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  1951. to control number of tasks to run concurrently. This doesn't change the
  1952. total number of tasks run or the total number of output blocks. By default,
  1953. concurrency is dynamically decided based on the available resources.
  1954. override_num_blocks: Override the number of output blocks from all read tasks.
  1955. By default, the number of output blocks is dynamically decided based on
  1956. input data size and available resources. You shouldn't manually set this
  1957. value in most cases.
  1958. Returns:
  1959. :class:`~ray.data.Dataset` producing records read from the specified MCAP files.
  1960. """
  1961. _validate_shuffle_arg(shuffle)
  1962. if file_extensions is None:
  1963. file_extensions = ["mcap"]
  1964. # Convert tuple time_range to TimeRange for backwards compatibility
  1965. if time_range is not None and isinstance(time_range, tuple):
  1966. if len(time_range) != 2:
  1967. raise ValueError(
  1968. "Time range must be a tuple of (start_time, end_time): got "
  1969. f"{time_range}"
  1970. )
  1971. time_range = TimeRange(start_time=time_range[0], end_time=time_range[1])
  1972. datasource = MCAPDatasource(
  1973. paths,
  1974. topics=topics,
  1975. time_range=time_range,
  1976. message_types=message_types,
  1977. include_metadata=include_metadata,
  1978. filesystem=filesystem,
  1979. meta_provider=DefaultFileMetadataProvider(),
  1980. partition_filter=partition_filter,
  1981. partitioning=partitioning,
  1982. ignore_missing_paths=ignore_missing_paths,
  1983. shuffle=shuffle,
  1984. include_paths=include_paths,
  1985. file_extensions=file_extensions,
  1986. )
  1987. return read_datasource(
  1988. datasource,
  1989. parallelism=parallelism,
  1990. num_cpus=num_cpus,
  1991. num_gpus=num_gpus,
  1992. memory=memory,
  1993. ray_remote_args=ray_remote_args,
  1994. concurrency=concurrency,
  1995. override_num_blocks=override_num_blocks,
  1996. )
  1997. @PublicAPI(stability="alpha")
  1998. def read_webdataset(
  1999. paths: Union[str, List[str]],
  2000. *,
  2001. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  2002. parallelism: int = -1,
  2003. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  2004. partition_filter: Optional[PathPartitionFilter] = None,
  2005. decoder: Optional[Union[bool, str, callable, list]] = True,
  2006. fileselect: Optional[Union[list, callable]] = None,
  2007. filerename: Optional[Union[list, callable]] = None,
  2008. suffixes: Optional[Union[list, callable]] = None,
  2009. verbose_open: bool = False,
  2010. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  2011. include_paths: bool = False,
  2012. file_extensions: Optional[List[str]] = None,
  2013. concurrency: Optional[int] = None,
  2014. override_num_blocks: Optional[int] = None,
  2015. expand_json: bool = False,
  2016. ) -> Dataset:
  2017. """Create a :class:`~ray.data.Dataset` from
  2018. `WebDataset <https://github.com/webdataset/webdataset>`_ files.
  2019. Args:
  2020. paths: A single file/directory path or a list of file/directory paths.
  2021. A list of paths can contain both files and directories.
  2022. filesystem: The filesystem implementation to read from.
  2023. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  2024. arrow_open_stream_args: Key-word arguments passed to
  2025. `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`_.
  2026. To read a compressed TFRecord file,
  2027. pass the corresponding compression type (e.g. for ``GZIP`` or ``ZLIB``, use
  2028. ``arrow_open_stream_args={'compression': 'gzip'}``).
  2029. partition_filter: Path-based partition filter, if any. Can be used
  2030. with a custom callback to read only selected partitions of a dataset.
  2031. decoder: A function or list of functions to decode the data.
  2032. fileselect: A callable or list of glob patterns to select files.
  2033. filerename: A function or list of tuples to rename files prior to grouping.
  2034. suffixes: A function or list of suffixes to select for creating samples.
  2035. verbose_open: Whether to print the file names as they are opened.
  2036. shuffle: If setting to "files", randomly shuffle input files order before read.
  2037. if setting to ``FileShuffleConfig``, the random seed can be passed toshuffle the
  2038. input files, i.e. ``FileShuffleConfig(seed = 42)``.
  2039. Defaults to not shuffle with ``None``.
  2040. include_paths: If ``True``, include the path to each file. File paths are
  2041. stored in the ``'path'`` column.
  2042. file_extensions: A list of file extensions to filter files by.
  2043. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2044. to control number of tasks to run concurrently. This doesn't change the
  2045. total number of tasks run or the total number of output blocks. By default,
  2046. concurrency is dynamically decided based on the available resources.
  2047. override_num_blocks: Override the number of output blocks from all read tasks.
  2048. By default, the number of output blocks is dynamically decided based on
  2049. input data size and available resources. You shouldn't manually set this
  2050. value in most cases.
  2051. expand_json: If ``True``, expand JSON objects into individual samples.
  2052. Defaults to ``False``.
  2053. Returns:
  2054. A :class:`~ray.data.Dataset` that contains the example features.
  2055. Raises:
  2056. ValueError: If a file contains a message that isn't a `tf.train.Example`_.
  2057. .. _tf.train.Example: https://www.tensorflow.org/api_docs/python/tf/train/Example
  2058. """ # noqa: E501
  2059. datasource = WebDatasetDatasource(
  2060. paths,
  2061. decoder=decoder,
  2062. fileselect=fileselect,
  2063. filerename=filerename,
  2064. suffixes=suffixes,
  2065. verbose_open=verbose_open,
  2066. filesystem=filesystem,
  2067. open_stream_args=arrow_open_stream_args,
  2068. meta_provider=DefaultFileMetadataProvider(),
  2069. partition_filter=partition_filter,
  2070. shuffle=shuffle,
  2071. include_paths=include_paths,
  2072. file_extensions=file_extensions,
  2073. expand_json=expand_json,
  2074. )
  2075. return read_datasource(
  2076. datasource,
  2077. parallelism=parallelism,
  2078. concurrency=concurrency,
  2079. override_num_blocks=override_num_blocks,
  2080. )
  2081. @PublicAPI
  2082. def read_binary_files(
  2083. paths: Union[str, List[str]],
  2084. *,
  2085. include_paths: bool = False,
  2086. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  2087. parallelism: int = -1,
  2088. num_cpus: Optional[float] = None,
  2089. num_gpus: Optional[float] = None,
  2090. memory: Optional[float] = None,
  2091. ray_remote_args: Dict[str, Any] = None,
  2092. arrow_open_stream_args: Optional[Dict[str, Any]] = None,
  2093. partition_filter: Optional[PathPartitionFilter] = None,
  2094. partitioning: Partitioning = None,
  2095. ignore_missing_paths: bool = False,
  2096. shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
  2097. file_extensions: Optional[List[str]] = None,
  2098. concurrency: Optional[int] = None,
  2099. override_num_blocks: Optional[int] = None,
  2100. ) -> Dataset:
  2101. """Create a :class:`~ray.data.Dataset` from binary files of arbitrary contents.
  2102. Examples:
  2103. Read a file in remote storage.
  2104. >>> import ray
  2105. >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf"
  2106. >>> ds = ray.data.read_binary_files(path)
  2107. >>> ds.schema()
  2108. Column Type
  2109. ------ ----
  2110. bytes binary
  2111. Read multiple local files.
  2112. >>> ray.data.read_binary_files( # doctest: +SKIP
  2113. ... ["local:///path/to/file1", "local:///path/to/file2"])
  2114. Read a file with the filepaths included as a column in the dataset.
  2115. >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf"
  2116. >>> ds = ray.data.read_binary_files(path, include_paths=True)
  2117. >>> ds.take(1)[0]["path"]
  2118. 'ray-example-data/pdf-sample_0.pdf'
  2119. Args:
  2120. paths: A single file or directory, or a list of file or directory paths.
  2121. A list of paths can contain both files and directories.
  2122. include_paths: If ``True``, include the path to each file. File paths are
  2123. stored in the ``'path'`` column.
  2124. filesystem: The PyArrow filesystem
  2125. implementation to read from. These filesystems are specified in the
  2126. `PyArrow docs <https://arrow.apache.org/docs/python/api/\
  2127. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  2128. you need to provide specific configurations to the filesystem. By default,
  2129. the filesystem is automatically selected based on the scheme of the paths.
  2130. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
  2131. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  2132. num_cpus: The number of CPUs to reserve for each parallel read worker.
  2133. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  2134. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  2135. worker.
  2136. memory: The heap memory in bytes to reserve for each parallel read worker.
  2137. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  2138. arrow_open_stream_args: kwargs passed to
  2139. `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
  2140. python/generated/pyarrow.fs.FileSystem.html\
  2141. #pyarrow.fs.FileSystem.open_input_stream>`_.
  2142. partition_filter: A
  2143. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
  2144. Use with a custom callback to read only selected partitions of a
  2145. dataset. By default, no files are filtered.
  2146. By default, this does not filter out any files.
  2147. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  2148. that describes how paths are organized. Defaults to ``None``.
  2149. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
  2150. found. Defaults to False.
  2151. shuffle: If setting to "files", randomly shuffle input files order before read.
  2152. If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
  2153. shuffle the input files. Defaults to not shuffle with ``None``.
  2154. file_extensions: A list of file extensions to filter files by.
  2155. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2156. to control number of tasks to run concurrently. This doesn't change the
  2157. total number of tasks run or the total number of output blocks. By default,
  2158. concurrency is dynamically decided based on the available resources.
  2159. override_num_blocks: Override the number of output blocks from all read tasks.
  2160. By default, the number of output blocks is dynamically decided based on
  2161. input data size and available resources. You shouldn't manually set this
  2162. value in most cases.
  2163. Returns:
  2164. :class:`~ray.data.Dataset` producing rows read from the specified paths.
  2165. """
  2166. datasource = BinaryDatasource(
  2167. paths,
  2168. include_paths=include_paths,
  2169. filesystem=filesystem,
  2170. open_stream_args=arrow_open_stream_args,
  2171. meta_provider=DefaultFileMetadataProvider(),
  2172. partition_filter=partition_filter,
  2173. partitioning=partitioning,
  2174. ignore_missing_paths=ignore_missing_paths,
  2175. shuffle=shuffle,
  2176. file_extensions=file_extensions,
  2177. )
  2178. return read_datasource(
  2179. datasource,
  2180. num_cpus=num_cpus,
  2181. num_gpus=num_gpus,
  2182. memory=memory,
  2183. parallelism=parallelism,
  2184. ray_remote_args=ray_remote_args,
  2185. concurrency=concurrency,
  2186. override_num_blocks=override_num_blocks,
  2187. )
  2188. @PublicAPI(stability="alpha")
  2189. def read_sql(
  2190. sql: str,
  2191. connection_factory: Callable[[], Connection],
  2192. *,
  2193. sql_params: Optional[Any] = None,
  2194. shard_keys: Optional[list[str]] = None,
  2195. shard_hash_fn: str = "MD5",
  2196. parallelism: int = -1,
  2197. num_cpus: Optional[float] = None,
  2198. num_gpus: Optional[float] = None,
  2199. memory: Optional[float] = None,
  2200. ray_remote_args: Optional[Dict[str, Any]] = None,
  2201. concurrency: Optional[int] = None,
  2202. override_num_blocks: Optional[int] = None,
  2203. ) -> Dataset:
  2204. """Read from a database that provides a
  2205. `Python DB API2-compliant <https://peps.python.org/pep-0249/>`_ connector.
  2206. .. note::
  2207. Parallelism is supported by databases that support sharding. This means
  2208. that the database needs to support all of the following operations:
  2209. ``MOD``, ``ABS``, and ``CONCAT``.
  2210. You can use ``shard_hash_fn`` to specify the hash function to use for sharding.
  2211. The default is ``MD5``, but other common alternatives include ``hash``,
  2212. ``unicode``, and ``SHA``.
  2213. If the database does not support sharding, the read operation will be
  2214. executed in a single task.
  2215. Examples:
  2216. For examples of reading from larger databases like MySQL and PostgreSQL, see
  2217. :ref:`Reading from SQL Databases <reading_sql>`.
  2218. .. testcode::
  2219. import sqlite3
  2220. import ray
  2221. # Create a simple database
  2222. connection = sqlite3.connect("example.db")
  2223. connection.execute("CREATE TABLE movie(title, year, score)")
  2224. connection.execute(
  2225. \"\"\"
  2226. INSERT INTO movie VALUES
  2227. ('Monty Python and the Holy Grail', 1975, 8.2),
  2228. ("Monty Python Live at the Hollywood Bowl", 1982, 7.9),
  2229. ("Monty Python's Life of Brian", 1979, 8.0),
  2230. ("Rocky II", 1979, 7.3)
  2231. \"\"\"
  2232. )
  2233. connection.commit()
  2234. connection.close()
  2235. def create_connection():
  2236. return sqlite3.connect("example.db")
  2237. # Get all movies
  2238. ds = ray.data.read_sql("SELECT * FROM movie", create_connection)
  2239. # Get movies after the year 1980
  2240. ds = ray.data.read_sql(
  2241. "SELECT title, score FROM movie WHERE year >= 1980", create_connection
  2242. )
  2243. # Get the number of movies per year
  2244. ds = ray.data.read_sql(
  2245. "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
  2246. )
  2247. .. testcode::
  2248. :hide:
  2249. import os
  2250. os.remove("example.db")
  2251. Args:
  2252. sql: The SQL query to execute.
  2253. connection_factory: A function that takes no arguments and returns a
  2254. Python DB API2
  2255. `Connection object <https://peps.python.org/pep-0249/#connection-objects>`_.
  2256. sql_params: Parameters to bind to the SQL query. Use the placeholder style
  2257. required by your database connector (per Python DB API2).
  2258. shard_keys: The keys to shard the data by.
  2259. shard_hash_fn: The hash function string to use for sharding. Defaults to "MD5".
  2260. For other databases, common alternatives include "hash" and "SHA".
  2261. This is applied to the shard keys.
  2262. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  2263. num_cpus: The number of CPUs to reserve for each parallel read worker.
  2264. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  2265. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  2266. worker.
  2267. memory: The heap memory in bytes to reserve for each parallel read worker.
  2268. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  2269. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2270. to control number of tasks to run concurrently. This doesn't change the
  2271. total number of tasks run or the total number of output blocks. By default,
  2272. concurrency is dynamically decided based on the available resources.
  2273. override_num_blocks: Override the number of output blocks from all read tasks.
  2274. This is used for sharding when shard_keys is provided.
  2275. By default, the number of output blocks is dynamically decided based on
  2276. input data size and available resources. You shouldn't manually set this
  2277. value in most cases.
  2278. Returns:
  2279. A :class:`Dataset` containing the queried data.
  2280. """
  2281. datasource = SQLDatasource(
  2282. sql=sql,
  2283. sql_params=sql_params,
  2284. shard_keys=shard_keys,
  2285. shard_hash_fn=shard_hash_fn,
  2286. connection_factory=connection_factory,
  2287. )
  2288. if override_num_blocks and override_num_blocks > 1:
  2289. if shard_keys is None:
  2290. raise ValueError("shard_keys must be provided when override_num_blocks > 1")
  2291. if not datasource.supports_sharding(override_num_blocks):
  2292. raise ValueError(
  2293. "Database does not support sharding. Please set override_num_blocks to 1."
  2294. )
  2295. return read_datasource(
  2296. datasource,
  2297. num_cpus=num_cpus,
  2298. num_gpus=num_gpus,
  2299. memory=memory,
  2300. parallelism=parallelism,
  2301. ray_remote_args=ray_remote_args,
  2302. concurrency=concurrency,
  2303. override_num_blocks=override_num_blocks,
  2304. )
  2305. @PublicAPI(stability="alpha")
  2306. def read_snowflake(
  2307. sql: str,
  2308. connection_parameters: Dict[str, Any],
  2309. *,
  2310. shard_keys: Optional[list[str]] = None,
  2311. parallelism: int = -1,
  2312. num_cpus: Optional[float] = None,
  2313. num_gpus: Optional[float] = None,
  2314. memory: Optional[float] = None,
  2315. ray_remote_args: Dict[str, Any] = None,
  2316. concurrency: Optional[int] = None,
  2317. override_num_blocks: Optional[int] = None,
  2318. ) -> Dataset:
  2319. """Read data from a Snowflake data set.
  2320. Example:
  2321. .. testcode::
  2322. :skipif: True
  2323. import ray
  2324. connection_parameters = dict(
  2325. user=...,
  2326. account="ABCDEFG-ABC12345",
  2327. password=...,
  2328. database="SNOWFLAKE_SAMPLE_DATA",
  2329. schema="TPCDS_SF100TCL"
  2330. )
  2331. ds = ray.data.read_snowflake("SELECT * FROM CUSTOMERS", connection_parameters)
  2332. Args:
  2333. sql: The SQL query to execute.
  2334. connection_parameters: Keyword arguments to pass to
  2335. ``snowflake.connector.connect``. To view supported parameters, read
  2336. https://docs.snowflake.com/developer-guide/python-connector/python-connector-api#functions.
  2337. shard_keys: The keys to shard the data by.
  2338. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  2339. num_cpus: The number of CPUs to reserve for each parallel read worker.
  2340. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  2341. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  2342. worker.
  2343. memory: The heap memory in bytes to reserve for each parallel read worker.
  2344. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  2345. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2346. to control number of tasks to run concurrently. This doesn't change the
  2347. total number of tasks run or the total number of output blocks. By default,
  2348. concurrency is dynamically decided based on the available resources.
  2349. override_num_blocks: Override the number of output blocks from all read tasks.
  2350. This is used for sharding when shard_keys is provided.
  2351. By default, the number of output blocks is dynamically decided based on
  2352. input data size and available resources. You shouldn't manually set this
  2353. value in most cases.
  2354. Returns:
  2355. A ``Dataset`` containing the data from the Snowflake data set.
  2356. """ # noqa: E501
  2357. import snowflake.connector
  2358. def snowflake_connection_factory():
  2359. return snowflake.connector.connect(**connection_parameters)
  2360. return ray.data.read_sql(
  2361. sql,
  2362. connection_factory=snowflake_connection_factory,
  2363. shard_keys=shard_keys,
  2364. shard_hash_fn="hash",
  2365. parallelism=parallelism,
  2366. num_cpus=num_cpus,
  2367. num_gpus=num_gpus,
  2368. memory=memory,
  2369. ray_remote_args=ray_remote_args,
  2370. concurrency=concurrency,
  2371. override_num_blocks=override_num_blocks,
  2372. )
  2373. @PublicAPI(stability="alpha")
  2374. def read_databricks_tables(
  2375. *,
  2376. warehouse_id: str,
  2377. table: Optional[str] = None,
  2378. query: Optional[str] = None,
  2379. catalog: Optional[str] = None,
  2380. schema: Optional[str] = None,
  2381. credential_provider: Optional[DatabricksCredentialProvider] = None,
  2382. parallelism: int = -1,
  2383. num_cpus: Optional[float] = None,
  2384. num_gpus: Optional[float] = None,
  2385. memory: Optional[float] = None,
  2386. ray_remote_args: Optional[Dict[str, Any]] = None,
  2387. concurrency: Optional[int] = None,
  2388. override_num_blocks: Optional[int] = None,
  2389. ) -> Dataset:
  2390. """Read a Databricks unity catalog table or Databricks SQL execution result.
  2391. Before calling this API, set the ``DATABRICKS_TOKEN`` environment
  2392. variable to your Databricks warehouse access token.
  2393. .. code-block:: console
  2394. export DATABRICKS_TOKEN=...
  2395. If you're not running your program on the Databricks runtime, also set the
  2396. ``DATABRICKS_HOST`` environment variable.
  2397. .. code-block:: console
  2398. export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
  2399. Alternatively, you can provide a custom credential provider for more advanced
  2400. authentication scenarios (e.g., token refresh, dynamic credentials). Create a
  2401. subclass of ``DatabricksCredentialProvider`` and pass it via the
  2402. ``credential_provider`` parameter.
  2403. .. note::
  2404. This function is built on the
  2405. `Databricks statement execution API <https://docs.databricks.com/api/workspace/statementexecution>`_.
  2406. Examples:
  2407. Read using environment variables:
  2408. .. testcode::
  2409. :skipif: True
  2410. import ray
  2411. ds = ray.data.read_databricks_tables(
  2412. warehouse_id='...',
  2413. catalog='catalog_1',
  2414. schema='db_1',
  2415. query='select id from table_1 limit 750000',
  2416. )
  2417. Read using a custom credential provider:
  2418. .. testcode::
  2419. :skipif: True
  2420. from ray.data._internal.datasource.databricks_credentials import (
  2421. DatabricksCredentialProvider,
  2422. )
  2423. class MyCredentialProvider(DatabricksCredentialProvider):
  2424. def get_token(self) -> str:
  2425. return "my-token" # Fetch token from custom source
  2426. def get_host(self) -> str:
  2427. return "my-host.databricks.com"
  2428. def invalidate(self) -> None:
  2429. pass # Clear cached credentials if applicable
  2430. ds = ray.data.read_databricks_tables(
  2431. warehouse_id='...',
  2432. catalog='catalog_1',
  2433. schema='db_1',
  2434. query='select id from table_1 limit 750000',
  2435. credential_provider=MyCredentialProvider(),
  2436. )
  2437. Args:
  2438. warehouse_id: The ID of the Databricks warehouse. The query statement is
  2439. executed on this warehouse.
  2440. table: The name of UC table you want to read. If this argument is set,
  2441. you can't set ``query`` argument, and the reader generates query
  2442. of ``select * from {table_name}`` under the hood.
  2443. query: The query you want to execute. If this argument is set,
  2444. you can't set ``table_name`` argument.
  2445. catalog: (Optional) The default catalog name used by the query.
  2446. schema: (Optional) The default schema used by the query.
  2447. credential_provider: (Optional) A custom credential provider for
  2448. authentication. Must be a subclass of ``DatabricksCredentialProvider``
  2449. implementing ``get_token()``, ``get_host()``, and ``invalidate()``.
  2450. The provider must be picklable (serializable) as it is sent to Ray
  2451. workers for distributed execution. If provided, the provider is used
  2452. exclusively and environment variables are ignored.
  2453. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  2454. num_cpus: The number of CPUs to reserve for each parallel read worker.
  2455. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  2456. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  2457. worker.
  2458. memory: The heap memory in bytes to reserve for each parallel read worker.
  2459. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  2460. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2461. to control number of tasks to run concurrently. This doesn't change the
  2462. total number of tasks run or the total number of output blocks. By default,
  2463. concurrency is dynamically decided based on the available resources.
  2464. override_num_blocks: Override the number of output blocks from all read tasks.
  2465. By default, the number of output blocks is dynamically decided based on
  2466. input data size and available resources. You shouldn't manually set this
  2467. value in most cases.
  2468. Returns:
  2469. A :class:`Dataset` containing the queried data.
  2470. """ # noqa: E501
  2471. # Resolve credential provider (single source of truth for token and host)
  2472. from ray.data._internal.datasource.databricks_credentials import (
  2473. resolve_credential_provider,
  2474. )
  2475. from ray.data._internal.datasource.databricks_uc_datasource import (
  2476. DatabricksUCDatasource,
  2477. )
  2478. resolved_provider = resolve_credential_provider(
  2479. credential_provider=credential_provider
  2480. )
  2481. if not catalog:
  2482. from ray.util.spark.utils import get_spark_session
  2483. catalog = get_spark_session().sql("SELECT CURRENT_CATALOG()").collect()[0][0]
  2484. if not schema:
  2485. from ray.util.spark.utils import get_spark_session
  2486. schema = get_spark_session().sql("SELECT CURRENT_DATABASE()").collect()[0][0]
  2487. if query is not None and table is not None:
  2488. raise ValueError("Only one of 'query' and 'table' arguments can be set.")
  2489. if table:
  2490. query = f"select * from {table}"
  2491. if query is None:
  2492. raise ValueError("One of 'query' and 'table' arguments should be set.")
  2493. datasource = DatabricksUCDatasource(
  2494. warehouse_id=warehouse_id,
  2495. catalog=catalog,
  2496. schema=schema,
  2497. query=query,
  2498. credential_provider=resolved_provider,
  2499. )
  2500. return read_datasource(
  2501. datasource=datasource,
  2502. parallelism=parallelism,
  2503. num_cpus=num_cpus,
  2504. num_gpus=num_gpus,
  2505. memory=memory,
  2506. ray_remote_args=ray_remote_args,
  2507. concurrency=concurrency,
  2508. override_num_blocks=override_num_blocks,
  2509. )
  2510. @PublicAPI(stability="alpha")
  2511. def read_hudi(
  2512. table_uri: str,
  2513. *,
  2514. query_type: str = "snapshot",
  2515. filters: Optional[List[Tuple[str, str, str]]] = None,
  2516. hudi_options: Optional[Dict[str, str]] = None,
  2517. storage_options: Optional[Dict[str, str]] = None,
  2518. num_cpus: Optional[float] = None,
  2519. num_gpus: Optional[float] = None,
  2520. memory: Optional[float] = None,
  2521. ray_remote_args: Optional[Dict[str, Any]] = None,
  2522. concurrency: Optional[int] = None,
  2523. override_num_blocks: Optional[int] = None,
  2524. ) -> Dataset:
  2525. """
  2526. Create a :class:`~ray.data.Dataset` from an
  2527. `Apache Hudi table <https://hudi.apache.org>`_.
  2528. Examples:
  2529. >>> import ray
  2530. >>> ds = ray.data.read_hudi( # doctest: +SKIP
  2531. ... table_uri="/hudi/trips",
  2532. ... query_type="snapshot",
  2533. ... filters=[("city", "=", "san_francisco")],
  2534. ... )
  2535. >>> ds = ray.data.read_hudi( # doctest: +SKIP
  2536. ... table_uri="/hudi/trips",
  2537. ... query_type="incremental",
  2538. ... hudi_options={
  2539. ... "hoodie.read.file_group.start_timestamp": "20230101123456789",
  2540. ... "hoodie.read.file_group.end_timestamp": "20230201123456789",
  2541. ... },
  2542. ... )
  2543. Args:
  2544. table_uri: The URI of the Hudi table to read from. Local file paths, S3, and GCS are supported.
  2545. query_type: The Hudi query type to use. Supported values are ``snapshot`` and ``incremental``.
  2546. filters: Optional list of filters to apply to the Hudi table when the
  2547. ``query_type`` is ``snapshot``. Each filter is a tuple of the form
  2548. ``(column_name, operator, value)``. The operator can be
  2549. one of ``"="``, ``"!="``, ``"<"``, ``"<="``, ``">"``, ``">="``.
  2550. Currently, only filters on partition columns will be effective.
  2551. hudi_options: A dictionary of Hudi options to pass to the Hudi reader.
  2552. storage_options: Extra options that make sense for a particular storage
  2553. connection. This is used to store connection parameters like credentials,
  2554. endpoint, etc. See more explanation
  2555. `here <https://github.com/apache/hudi-rs?tab=readme-ov-file#work-with-cloud-storage>`_.
  2556. num_cpus: The number of CPUs to reserve for each parallel read worker.
  2557. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  2558. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  2559. worker.
  2560. memory: The heap memory in bytes to reserve for each parallel read worker.
  2561. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  2562. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  2563. to control number of tasks to run concurrently. This doesn't change the
  2564. total number of tasks run or the total number of output blocks. By default,
  2565. concurrency is dynamically decided based on the available resources.
  2566. override_num_blocks: Override the number of output blocks from all read tasks.
  2567. By default, the number of output blocks is dynamically decided based on
  2568. input data size and available resources. You shouldn't manually set this
  2569. value in most cases.
  2570. Returns:
  2571. A :class:`~ray.data.Dataset` producing records read from the Hudi table.
  2572. """ # noqa: E501
  2573. datasource = HudiDatasource(
  2574. table_uri=table_uri,
  2575. query_type=query_type,
  2576. filters=filters,
  2577. hudi_options=hudi_options,
  2578. storage_options=storage_options,
  2579. )
  2580. return read_datasource(
  2581. datasource=datasource,
  2582. ray_remote_args=ray_remote_args,
  2583. num_cpus=num_cpus,
  2584. num_gpus=num_gpus,
  2585. memory=memory,
  2586. concurrency=concurrency,
  2587. override_num_blocks=override_num_blocks,
  2588. )
  2589. @PublicAPI
  2590. def from_daft(df: "daft.DataFrame") -> Dataset:
  2591. """Create a :class:`~ray.data.Dataset` from a `Daft DataFrame <https://docs.getdaft.io/en/stable/api/dataframe/>`_.
  2592. .. warning::
  2593. This function only works with PyArrow 13 or lower. For more details, see
  2594. https://github.com/ray-project/ray/issues/53278.
  2595. Args:
  2596. df: A Daft DataFrame
  2597. Returns:
  2598. A :class:`~ray.data.Dataset` holding rows read from the DataFrame.
  2599. """
  2600. pyarrow_version = get_pyarrow_version()
  2601. assert pyarrow_version is not None
  2602. if pyarrow_version >= parse_version("14.0.0"):
  2603. raise RuntimeError(
  2604. "`from_daft` only works with PyArrow 13 or lower. For more details, see "
  2605. "https://github.com/ray-project/ray/issues/53278."
  2606. )
  2607. # NOTE: Today this returns a MaterializedDataset. We should also integrate Daft such
  2608. # that we can stream object references into a Ray dataset. Unfortunately this is
  2609. # very tricky today because of the way Ray Datasources are implemented with a fully-
  2610. # materialized `list` of ReadTasks, rather than an iterator which can lazily return
  2611. # these tasks.
  2612. return df.to_ray_dataset()
  2613. @PublicAPI
  2614. def from_dask(df: "dask.dataframe.DataFrame") -> MaterializedDataset:
  2615. """Create a :class:`~ray.data.Dataset` from a
  2616. `Dask DataFrame <https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.html#dask.dataframe.DataFrame>`_.
  2617. Args:
  2618. df: A `Dask DataFrame`_.
  2619. Returns:
  2620. A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
  2621. """ # noqa: E501
  2622. import dask
  2623. from ray.util.dask import ray_dask_get
  2624. partitions = df.to_delayed()
  2625. persisted_partitions = dask.persist(*partitions, scheduler=ray_dask_get)
  2626. import pandas
  2627. def to_ref(df):
  2628. if isinstance(df, pandas.DataFrame):
  2629. return ray.put(df)
  2630. elif isinstance(df, ray.ObjectRef):
  2631. return df
  2632. else:
  2633. raise ValueError(
  2634. f"Expected a Ray object ref or a Pandas DataFrame, got {type(df)}"
  2635. )
  2636. ds = from_pandas_refs(
  2637. [to_ref(next(iter(part.dask.values()))) for part in persisted_partitions],
  2638. )
  2639. return ds
  2640. @PublicAPI
  2641. def from_mars(df: "mars.dataframe.DataFrame") -> MaterializedDataset:
  2642. """Create a :class:`~ray.data.Dataset` from a
  2643. `Mars DataFrame <https://mars-project.readthedocs.io/en/latest/reference/dataframe/index.html>`_.
  2644. Args:
  2645. df: A `Mars DataFrame`_, which must be executed by Mars-on-Ray.
  2646. Returns:
  2647. A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
  2648. """ # noqa: E501
  2649. import mars.dataframe as md
  2650. ds: Dataset = md.to_ray_dataset(df)
  2651. return ds
  2652. @PublicAPI
  2653. def from_modin(df: "modin.pandas.dataframe.DataFrame") -> MaterializedDataset:
  2654. """Create a :class:`~ray.data.Dataset` from a
  2655. `Modin DataFrame <https://modin.readthedocs.io/en/stable/flow/modin/pandas/dataframe.html>`_.
  2656. Args:
  2657. df: A `Modin DataFrame`_, which must be using the Ray backend.
  2658. Returns:
  2659. A :class:`~ray.data.MaterializedDataset` rows read from the DataFrame.
  2660. """ # noqa: E501
  2661. from modin.distributed.dataframe.pandas.partitions import unwrap_partitions
  2662. parts = unwrap_partitions(df, axis=0)
  2663. ds = from_pandas_refs(parts)
  2664. return ds
  2665. @PublicAPI
  2666. def from_pandas(
  2667. dfs: Union["pandas.DataFrame", List["pandas.DataFrame"]],
  2668. override_num_blocks: Optional[int] = None,
  2669. ) -> MaterializedDataset:
  2670. """Create a :class:`~ray.data.Dataset` from a list of pandas dataframes.
  2671. Examples:
  2672. >>> import pandas as pd
  2673. >>> import ray
  2674. >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
  2675. >>> ray.data.from_pandas(df) # doctest: +ELLIPSIS
  2676. shape: (3, 2)
  2677. ╭───────┬───────╮
  2678. │ a ┆ b │
  2679. │ --- ┆ --- │
  2680. │ int64 ┆ int64 │
  2681. ╞═══════╪═══════╡
  2682. │ 1 ┆ 4 │
  2683. │ 2 ┆ 5 │
  2684. │ 3 ┆ 6 │
  2685. ╰───────┴───────╯
  2686. (Showing 3 of 3 rows)
  2687. Create a Ray Dataset from a list of Pandas DataFrames.
  2688. >>> ray.data.from_pandas([df, df]) # doctest: +ELLIPSIS
  2689. shape: (6, 2)
  2690. ╭───────┬───────╮
  2691. │ a ┆ b │
  2692. │ --- ┆ --- │
  2693. │ int64 ┆ int64 │
  2694. ╞═══════╪═══════╡
  2695. │ 1 ┆ 4 │
  2696. │ 2 ┆ 5 │
  2697. │ 3 ┆ 6 │
  2698. │ 1 ┆ 4 │
  2699. │ 2 ┆ 5 │
  2700. │ 3 ┆ 6 │
  2701. ╰───────┴───────╯
  2702. (Showing 6 of 6 rows)
  2703. Args:
  2704. dfs: A pandas dataframe or a list of pandas dataframes.
  2705. override_num_blocks: Override the number of output blocks from all read tasks.
  2706. By default, the number of output blocks is dynamically decided based on
  2707. input data size and available resources. You shouldn't manually set this
  2708. value in most cases.
  2709. Returns:
  2710. :class:`~ray.data.Dataset` holding data read from the dataframes.
  2711. """
  2712. import pandas as pd
  2713. if isinstance(dfs, pd.DataFrame):
  2714. dfs = [dfs]
  2715. if override_num_blocks is not None:
  2716. if len(dfs) > 1:
  2717. # I assume most users pass a single DataFrame as input. For simplicity, I'm
  2718. # concatenating DataFrames, even though it's not efficient.
  2719. ary = pd.concat(dfs, axis=0)
  2720. else:
  2721. ary = dfs[0]
  2722. dfs = np.array_split(ary, override_num_blocks)
  2723. from ray.data.util.data_batch_conversion import (
  2724. _cast_ndarray_columns_to_tensor_extension,
  2725. )
  2726. context = DataContext.get_current()
  2727. if context.enable_tensor_extension_casting:
  2728. dfs = [_cast_ndarray_columns_to_tensor_extension(df.copy()) for df in dfs]
  2729. return from_pandas_refs([ray.put(df) for df in dfs])
  2730. @DeveloperAPI
  2731. def from_pandas_refs(
  2732. dfs: Union[ObjectRef["pandas.DataFrame"], List[ObjectRef["pandas.DataFrame"]]],
  2733. ) -> MaterializedDataset:
  2734. """Create a :class:`~ray.data.Dataset` from a list of Ray object references to
  2735. pandas dataframes.
  2736. Examples:
  2737. >>> import pandas as pd
  2738. >>> import ray
  2739. >>> df_ref = ray.put(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}))
  2740. >>> ray.data.from_pandas_refs(df_ref) # doctest: +ELLIPSIS
  2741. shape: (3, 2)
  2742. ╭───────┬───────╮
  2743. │ a ┆ b │
  2744. │ --- ┆ --- │
  2745. │ int64 ┆ int64 │
  2746. ╞═══════╪═══════╡
  2747. │ 1 ┆ 4 │
  2748. │ 2 ┆ 5 │
  2749. │ 3 ┆ 6 │
  2750. ╰───────┴───────╯
  2751. (Showing 3 of 3 rows)
  2752. Create a Ray Dataset from a list of Pandas Dataframes references.
  2753. >>> ray.data.from_pandas_refs([df_ref, df_ref]) # doctest: +ELLIPSIS
  2754. shape: (6, 2)
  2755. ╭───────┬───────╮
  2756. │ a ┆ b │
  2757. │ --- ┆ --- │
  2758. │ int64 ┆ int64 │
  2759. ╞═══════╪═══════╡
  2760. │ 1 ┆ 4 │
  2761. │ 2 ┆ 5 │
  2762. │ 3 ┆ 6 │
  2763. │ 1 ┆ 4 │
  2764. │ 2 ┆ 5 │
  2765. │ 3 ┆ 6 │
  2766. ╰───────┴───────╯
  2767. (Showing 6 of 6 rows)
  2768. Args:
  2769. dfs: A Ray object reference to a pandas dataframe, or a list of
  2770. Ray object references to pandas dataframes.
  2771. Returns:
  2772. :class:`~ray.data.Dataset` holding data read from the dataframes.
  2773. """
  2774. if isinstance(dfs, ray.ObjectRef):
  2775. dfs = [dfs]
  2776. elif isinstance(dfs, list):
  2777. for df in dfs:
  2778. if not isinstance(df, ray.ObjectRef):
  2779. raise ValueError(
  2780. f"Expected list of Ray object refs, got list containing {type(df)}"
  2781. )
  2782. else:
  2783. raise ValueError(
  2784. f"Expected Ray object ref or list of Ray object refs, got {type(df)}"
  2785. )
  2786. context = DataContext.get_current()
  2787. if context.enable_pandas_block:
  2788. get_metadata_schema = cached_remote_fn(get_table_block_metadata_schema)
  2789. metadata_schema = ray.get([get_metadata_schema.remote(df) for df in dfs])
  2790. execution_plan = ExecutionPlan(
  2791. DatasetStats(metadata={"FromPandas": metadata_schema}, parent=None),
  2792. DataContext.get_current().copy(),
  2793. )
  2794. logical_plan = LogicalPlan(
  2795. FromPandas(dfs, metadata_schema), execution_plan._context
  2796. )
  2797. return MaterializedDataset(
  2798. execution_plan,
  2799. logical_plan,
  2800. )
  2801. df_to_block = cached_remote_fn(pandas_df_to_arrow_block, num_returns=2)
  2802. res = [df_to_block.remote(df) for df in dfs]
  2803. blocks, metadata_schema = map(list, zip(*res))
  2804. metadata_schema = ray.get(metadata_schema)
  2805. execution_plan = ExecutionPlan(
  2806. DatasetStats(metadata={"FromPandas": metadata_schema}, parent=None),
  2807. DataContext.get_current().copy(),
  2808. )
  2809. logical_plan = LogicalPlan(
  2810. FromPandas(blocks, metadata_schema), execution_plan._context
  2811. )
  2812. return MaterializedDataset(
  2813. execution_plan,
  2814. logical_plan,
  2815. )
  2816. @PublicAPI
  2817. def from_numpy(ndarrays: Union[np.ndarray, List[np.ndarray]]) -> MaterializedDataset:
  2818. """Creates a :class:`~ray.data.Dataset` from a list of NumPy ndarrays.
  2819. The column name defaults to "data".
  2820. Examples:
  2821. >>> import numpy as np
  2822. >>> import ray
  2823. >>> arr = np.array([1])
  2824. >>> ray.data.from_numpy(arr) # doctest: +ELLIPSIS
  2825. shape: (1, 1)
  2826. ╭───────╮
  2827. │ data │
  2828. │ --- │
  2829. │ int64 │
  2830. ╞═══════╡
  2831. │ 1 │
  2832. ╰───────╯
  2833. (Showing 1 of 1 rows)
  2834. Create a Ray Dataset from a list of NumPy arrays.
  2835. >>> ray.data.from_numpy([arr, arr]) # doctest: +ELLIPSIS
  2836. shape: (2, 1)
  2837. ╭───────╮
  2838. │ data │
  2839. │ --- │
  2840. │ int64 │
  2841. ╞═══════╡
  2842. │ 1 │
  2843. │ 1 │
  2844. ╰───────╯
  2845. (Showing 2 of 2 rows)
  2846. Args:
  2847. ndarrays: A NumPy ndarray or a list of NumPy ndarrays.
  2848. Returns:
  2849. :class:`~ray.data.Dataset` holding data from the given ndarrays.
  2850. """
  2851. if isinstance(ndarrays, np.ndarray):
  2852. ndarrays = [ndarrays]
  2853. return from_numpy_refs([ray.put(ndarray) for ndarray in ndarrays])
  2854. @DeveloperAPI
  2855. def from_numpy_refs(
  2856. ndarrays: Union[ObjectRef[np.ndarray], List[ObjectRef[np.ndarray]]],
  2857. ) -> MaterializedDataset:
  2858. """Creates a :class:`~ray.data.Dataset` from a list of Ray object references to
  2859. NumPy ndarrays.
  2860. The column name defaults to "data".
  2861. Examples:
  2862. >>> import numpy as np
  2863. >>> import ray
  2864. >>> arr_ref = ray.put(np.array([1]))
  2865. >>> ray.data.from_numpy_refs(arr_ref) # doctest: +ELLIPSIS
  2866. shape: (1, 1)
  2867. ╭───────╮
  2868. │ data │
  2869. │ --- │
  2870. │ int64 │
  2871. ╞═══════╡
  2872. │ 1 │
  2873. ╰───────╯
  2874. (Showing 1 of 1 rows)
  2875. Create a Ray Dataset from a list of NumPy array references.
  2876. >>> ray.data.from_numpy_refs([arr_ref, arr_ref]) # doctest: +ELLIPSIS
  2877. shape: (2, 1)
  2878. ╭───────╮
  2879. │ data │
  2880. │ --- │
  2881. │ int64 │
  2882. ╞═══════╡
  2883. │ 1 │
  2884. │ 1 │
  2885. ╰───────╯
  2886. (Showing 2 of 2 rows)
  2887. Args:
  2888. ndarrays: A Ray object reference to a NumPy ndarray or a list of Ray object
  2889. references to NumPy ndarrays.
  2890. Returns:
  2891. :class:`~ray.data.Dataset` holding data from the given ndarrays.
  2892. """
  2893. if isinstance(ndarrays, ray.ObjectRef):
  2894. ndarrays = [ndarrays]
  2895. elif isinstance(ndarrays, list):
  2896. for ndarray in ndarrays:
  2897. if not isinstance(ndarray, ray.ObjectRef):
  2898. raise ValueError(
  2899. "Expected list of Ray object refs, "
  2900. f"got list containing {type(ndarray)}"
  2901. )
  2902. else:
  2903. raise ValueError(
  2904. f"Expected Ray object ref or list of Ray object refs, got {type(ndarray)}"
  2905. )
  2906. ctx = DataContext.get_current()
  2907. ndarray_to_block_remote = cached_remote_fn(ndarray_to_block, num_returns=2)
  2908. res = [ndarray_to_block_remote.remote(ndarray, ctx) for ndarray in ndarrays]
  2909. blocks, metadata_schema = map(list, zip(*res))
  2910. metadata_schema = ray.get(metadata_schema)
  2911. execution_plan = ExecutionPlan(
  2912. DatasetStats(metadata={"FromNumpy": metadata_schema}, parent=None),
  2913. DataContext.get_current().copy(),
  2914. )
  2915. logical_plan = LogicalPlan(
  2916. FromNumpy(blocks, metadata_schema), execution_plan._context
  2917. )
  2918. return MaterializedDataset(
  2919. execution_plan,
  2920. logical_plan,
  2921. )
  2922. @PublicAPI
  2923. def from_arrow(
  2924. tables: Union["pyarrow.Table", bytes, List[Union["pyarrow.Table", bytes]]],
  2925. *,
  2926. override_num_blocks: Optional[int] = None,
  2927. ) -> MaterializedDataset:
  2928. """Create a :class:`~ray.data.Dataset` from a list of PyArrow tables.
  2929. Examples:
  2930. >>> import pyarrow as pa
  2931. >>> import ray
  2932. >>> table = pa.table({"x": [1]})
  2933. >>> ray.data.from_arrow(table) # doctest: +ELLIPSIS
  2934. shape: (1, 1)
  2935. ╭───────╮
  2936. │ x │
  2937. │ --- │
  2938. │ int64 │
  2939. ╞═══════╡
  2940. │ 1 │
  2941. ╰───────╯
  2942. (Showing 1 of 1 rows)
  2943. Create a Ray Dataset from a list of PyArrow tables.
  2944. >>> ray.data.from_arrow([table, table]) # doctest: +ELLIPSIS
  2945. shape: (2, 1)
  2946. ╭───────╮
  2947. │ x │
  2948. │ --- │
  2949. │ int64 │
  2950. ╞═══════╡
  2951. │ 1 │
  2952. │ 1 │
  2953. ╰───────╯
  2954. (Showing 2 of 2 rows)
  2955. Args:
  2956. tables: A PyArrow table, or a list of PyArrow tables,
  2957. or its streaming format in bytes.
  2958. override_num_blocks: Override the number of output blocks from all read tasks.
  2959. By default, the number of output blocks is dynamically decided based on
  2960. input data size and available resources. You shouldn't manually set this
  2961. value in most cases.
  2962. Returns:
  2963. :class:`~ray.data.Dataset` holding data from the PyArrow tables.
  2964. """
  2965. import builtins
  2966. import pyarrow as pa
  2967. if isinstance(tables, (pa.Table, bytes)):
  2968. tables = [tables]
  2969. if override_num_blocks is not None:
  2970. if override_num_blocks <= 0:
  2971. raise ValueError("override_num_blocks must be > 0")
  2972. combined_table = pa.concat_tables(tables) if len(tables) > 1 else tables[0]
  2973. total_rows = len(combined_table)
  2974. if total_rows == 0:
  2975. # Handle empty table case
  2976. tables = [
  2977. combined_table.slice(0, 0) for _ in builtins.range(override_num_blocks)
  2978. ]
  2979. else:
  2980. batch_size = (total_rows + override_num_blocks - 1) // override_num_blocks
  2981. slices = []
  2982. for i in builtins.range(override_num_blocks):
  2983. start = i * batch_size
  2984. if start >= total_rows:
  2985. break
  2986. length = min(batch_size, total_rows - start)
  2987. slices.append(combined_table.slice(start, length))
  2988. # Pad with empty slices if needed
  2989. if len(slices) < override_num_blocks:
  2990. empty_table = combined_table.slice(0, 0)
  2991. slices.extend([empty_table] * (override_num_blocks - len(slices)))
  2992. tables = slices
  2993. return from_arrow_refs([ray.put(t) for t in tables])
  2994. @DeveloperAPI
  2995. def from_arrow_refs(
  2996. tables: Union[
  2997. ObjectRef[Union["pyarrow.Table", bytes]],
  2998. List[ObjectRef[Union["pyarrow.Table", bytes]]],
  2999. ],
  3000. ) -> MaterializedDataset:
  3001. """Create a :class:`~ray.data.Dataset` from a list of Ray object references to
  3002. PyArrow tables.
  3003. Examples:
  3004. >>> import pyarrow as pa
  3005. >>> import ray
  3006. >>> table_ref = ray.put(pa.table({"x": [1]}))
  3007. >>> ray.data.from_arrow_refs(table_ref) # doctest: +ELLIPSIS
  3008. shape: (1, 1)
  3009. ╭───────╮
  3010. │ x │
  3011. │ --- │
  3012. │ int64 │
  3013. ╞═══════╡
  3014. │ 1 │
  3015. ╰───────╯
  3016. (Showing 1 of 1 rows)
  3017. Create a Ray Dataset from a list of PyArrow table references
  3018. >>> ray.data.from_arrow_refs([table_ref, table_ref]) # doctest: +ELLIPSIS
  3019. shape: (2, 1)
  3020. ╭───────╮
  3021. │ x │
  3022. │ --- │
  3023. │ int64 │
  3024. ╞═══════╡
  3025. │ 1 │
  3026. │ 1 │
  3027. ╰───────╯
  3028. (Showing 2 of 2 rows)
  3029. Args:
  3030. tables: A Ray object reference to Arrow table, or list of Ray object
  3031. references to Arrow tables, or its streaming format in bytes.
  3032. Returns:
  3033. :class:`~ray.data.Dataset` holding data read from the tables.
  3034. """
  3035. if isinstance(tables, ray.ObjectRef):
  3036. tables = [tables]
  3037. get_metadata_schema = cached_remote_fn(get_table_block_metadata_schema)
  3038. metadata_schema = ray.get([get_metadata_schema.remote(t) for t in tables])
  3039. execution_plan = ExecutionPlan(
  3040. DatasetStats(metadata={"FromArrow": metadata_schema}, parent=None),
  3041. DataContext.get_current().copy(),
  3042. )
  3043. logical_plan = LogicalPlan(
  3044. FromArrow(tables, metadata_schema), execution_plan._context
  3045. )
  3046. return MaterializedDataset(
  3047. execution_plan,
  3048. logical_plan,
  3049. )
  3050. @PublicAPI(stability="alpha")
  3051. def read_delta_sharing_tables(
  3052. url: str,
  3053. *,
  3054. limit: Optional[int] = None,
  3055. version: Optional[int] = None,
  3056. timestamp: Optional[str] = None,
  3057. json_predicate_hints: Optional[str] = None,
  3058. ray_remote_args: Optional[Dict[str, Any]] = None,
  3059. num_cpus: Optional[float] = None,
  3060. num_gpus: Optional[float] = None,
  3061. memory: Optional[float] = None,
  3062. concurrency: Optional[int] = None,
  3063. override_num_blocks: Optional[int] = None,
  3064. ) -> Dataset:
  3065. """
  3066. Read data from a Delta Sharing table.
  3067. Delta Sharing projct https://github.com/delta-io/delta-sharing/tree/main
  3068. This function reads data from a Delta Sharing table specified by the URL.
  3069. It supports various options such as limiting the number of rows, specifying
  3070. a version or timestamp, and configuring concurrency.
  3071. Before calling this function, ensure that the URL is correctly formatted
  3072. to point to the Delta Sharing table you want to access. Make sure you have
  3073. a valid delta_share profile in the working directory.
  3074. Examples:
  3075. .. testcode::
  3076. :skipif: True
  3077. import ray
  3078. ds = ray.data.read_delta_sharing_tables(
  3079. url=f"your-profile.json#your-share-name.your-schema-name.your-table-name",
  3080. limit=100000,
  3081. version=1,
  3082. )
  3083. Args:
  3084. url: A URL under the format
  3085. "<profile-file-path>#<share-name>.<schema-name>.<table-name>".
  3086. Example can be found at
  3087. https://github.com/delta-io/delta-sharing/blob/main/README.md#quick-start
  3088. limit: A non-negative integer. Load only the ``limit`` rows if the
  3089. parameter is specified. Use this optional parameter to explore the
  3090. shared table without loading the entire table into memory.
  3091. version: A non-negative integer. Load the snapshot of the table at
  3092. the specified version.
  3093. timestamp: A timestamp to specify the version of the table to read.
  3094. json_predicate_hints: Predicate hints to be applied to the table. For more
  3095. details, see:
  3096. https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering.
  3097. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  3098. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3099. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  3100. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  3101. worker.
  3102. memory: The heap memory in bytes to reserve for each parallel read worker.
  3103. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  3104. to control the number of tasks to run concurrently. This doesn't change the
  3105. total number of tasks run or the total number of output blocks. By default,
  3106. concurrency is dynamically decided based on the available resources.
  3107. override_num_blocks: Override the number of output blocks from all read tasks.
  3108. By default, the number of output blocks is dynamically decided based on
  3109. input data size and available resources. You shouldn't manually set this
  3110. value in most cases.
  3111. Returns:
  3112. A :class:`Dataset` containing the queried data.
  3113. Raises:
  3114. ValueError: If the URL is not properly formatted or if there is an issue
  3115. with the Delta Sharing table connection.
  3116. """
  3117. datasource = DeltaSharingDatasource(
  3118. url=url,
  3119. json_predicate_hints=json_predicate_hints,
  3120. limit=limit,
  3121. version=version,
  3122. timestamp=timestamp,
  3123. )
  3124. # DeltaSharing limit is at the add_files level, it will not return
  3125. # exactly the limit number of rows but it will return less files and rows.
  3126. return ray.data.read_datasource(
  3127. datasource=datasource,
  3128. ray_remote_args=ray_remote_args,
  3129. num_cpus=num_cpus,
  3130. num_gpus=num_gpus,
  3131. memory=memory,
  3132. concurrency=concurrency,
  3133. override_num_blocks=override_num_blocks,
  3134. )
  3135. @PublicAPI
  3136. def from_spark(
  3137. df: "pyspark.sql.DataFrame",
  3138. *,
  3139. parallelism: Optional[int] = None,
  3140. override_num_blocks: Optional[int] = None,
  3141. ) -> MaterializedDataset:
  3142. """Create a :class:`~ray.data.Dataset` from a
  3143. `Spark DataFrame <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html>`_.
  3144. Args:
  3145. df: A `Spark DataFrame`_, which must be created by RayDP (Spark-on-Ray).
  3146. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  3147. override_num_blocks: Override the number of output blocks from all read tasks.
  3148. By default, the number of output blocks is dynamically decided based on
  3149. input data size and available resources. You shouldn't manually set this
  3150. value in most cases.
  3151. Returns:
  3152. A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
  3153. """ # noqa: E501
  3154. import raydp
  3155. parallelism = _get_num_output_blocks(parallelism, override_num_blocks)
  3156. return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)
  3157. @PublicAPI
  3158. def from_huggingface(
  3159. dataset: Union["datasets.Dataset", "datasets.IterableDataset"],
  3160. parallelism: int = -1,
  3161. concurrency: Optional[int] = None,
  3162. override_num_blocks: Optional[int] = None,
  3163. ) -> Union[MaterializedDataset, Dataset]:
  3164. """Read a Hugging Face Dataset into a Ray Dataset.
  3165. Creates a :class:`~ray.data.MaterializedDataset` from a
  3166. `Hugging Face Datasets Dataset <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.Dataset/>`_
  3167. or a :class:`~ray.data.Dataset` from a `Hugging Face Datasets IterableDataset <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.IterableDataset/>`_.
  3168. It is recommended to use :func:`~ray.data.read_parquet` with the ``HfFileSystem``
  3169. filesystem to read Hugging Face datasets rather than ``from_huggingface``.
  3170. See :ref:`Loading Hugging Face datasets <loading_huggingface_datasets>` for more details.
  3171. Args:
  3172. dataset: A `Hugging Face Datasets Dataset`_ or `Hugging Face Datasets IterableDataset`_.
  3173. `DatasetDict <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.DatasetDict/>`_
  3174. and `IterableDatasetDict <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.IterableDatasetDict/>`_
  3175. are not supported.
  3176. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  3177. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  3178. to control number of tasks to run concurrently. This doesn't change the
  3179. total number of tasks run or the total number of output blocks. By default,
  3180. concurrency is dynamically decided based on the available resources.
  3181. override_num_blocks: Override the number of output blocks from all read tasks.
  3182. By default, the number of output blocks is dynamically decided based on
  3183. input data size and available resources. You shouldn't manually set this
  3184. value in most of cases.
  3185. Returns:
  3186. A :class:`~ray.data.Dataset` holding rows from the `Hugging Face Datasets Dataset`_.
  3187. """ # noqa: E501
  3188. import datasets
  3189. from aiohttp.client_exceptions import ClientResponseError
  3190. from ray.data._internal.datasource.huggingface_datasource import (
  3191. HuggingFaceDatasource,
  3192. )
  3193. if isinstance(dataset, (datasets.IterableDataset, datasets.Dataset)):
  3194. try:
  3195. # Attempt to read data via Hugging Face Hub parquet files. If the
  3196. # returned list of files is empty, attempt read via other methods.
  3197. file_urls = HuggingFaceDatasource.list_parquet_urls_from_dataset(dataset)
  3198. if len(file_urls) > 0:
  3199. # Resolve HTTP 302 redirects
  3200. import requests
  3201. resolved_urls = []
  3202. for url in file_urls:
  3203. try:
  3204. resp = requests.head(url, allow_redirects=True, timeout=5)
  3205. if resp.status_code == 200:
  3206. resolved_urls.append(resp.url)
  3207. else:
  3208. logger.warning(
  3209. f"Unexpected status {resp.status_code} resolving {url} from "
  3210. f"Hugging Face Hub parquet files"
  3211. )
  3212. except requests.RequestException as e:
  3213. logger.warning(
  3214. f"Failed to resolve {url}: {e} from Hugging Face Hub parquet files"
  3215. )
  3216. if not resolved_urls:
  3217. raise FileNotFoundError(
  3218. "No resolvable Parquet URLs found from Hugging Face Hub parquet files"
  3219. )
  3220. # If file urls are returned, the parquet files are available via API
  3221. # TODO: Add support for reading from http filesystem in
  3222. # FileBasedDatasource. GH Issue:
  3223. # https://github.com/ray-project/ray/issues/42706
  3224. import fsspec.implementations.http
  3225. http = fsspec.implementations.http.HTTPFileSystem()
  3226. return read_parquet(
  3227. resolved_urls,
  3228. parallelism=parallelism,
  3229. filesystem=http,
  3230. concurrency=concurrency,
  3231. override_num_blocks=override_num_blocks,
  3232. # The resolved HTTP URLs might not contain a `.parquet` suffix. So,
  3233. # we override the default file extension filter and allow all files.
  3234. file_extensions=None,
  3235. ray_remote_args={
  3236. "retry_exceptions": [FileNotFoundError, ClientResponseError]
  3237. },
  3238. )
  3239. except (FileNotFoundError, ClientResponseError):
  3240. logger.warning(
  3241. "Distributed read via Hugging Face Hub parquet files failed, "
  3242. "falling back on single node read."
  3243. )
  3244. if isinstance(dataset, datasets.IterableDataset):
  3245. # For an IterableDataset, we can use a streaming implementation to read data.
  3246. return read_datasource(
  3247. HuggingFaceDatasource(dataset=dataset),
  3248. parallelism=parallelism,
  3249. concurrency=concurrency,
  3250. override_num_blocks=override_num_blocks,
  3251. )
  3252. if isinstance(dataset, datasets.Dataset):
  3253. # To get the resulting Arrow table from a Hugging Face Dataset after
  3254. # applying transformations (e.g., train_test_split(), shard(), select()),
  3255. # we create a copy of the Arrow table, which applies the indices
  3256. # mapping from the transformations.
  3257. hf_ds_arrow = dataset.with_format("arrow")
  3258. ray_ds = from_arrow(hf_ds_arrow[:], override_num_blocks=override_num_blocks)
  3259. return ray_ds
  3260. if isinstance(dataset, (datasets.DatasetDict, datasets.IterableDatasetDict)):
  3261. available_keys = list(dataset.keys())
  3262. raise DeprecationWarning(
  3263. "You provided a Hugging Face DatasetDict or IterableDatasetDict, "
  3264. "which contains multiple datasets, but `from_huggingface` now "
  3265. "only accepts a single Hugging Face Dataset. To convert just "
  3266. "a single Hugging Face Dataset to a Ray Dataset, specify a split. "
  3267. "For example, `ray.data.from_huggingface(my_dataset_dictionary"
  3268. f"['{available_keys[0]}'])`. "
  3269. f"Available splits are {available_keys}."
  3270. )
  3271. else:
  3272. raise TypeError(
  3273. f"`dataset` must be a `datasets.Dataset`, but got {type(dataset)}"
  3274. )
  3275. @PublicAPI
  3276. def from_tf(
  3277. dataset: "tf.data.Dataset",
  3278. ) -> MaterializedDataset:
  3279. """Create a :class:`~ray.data.Dataset` from a
  3280. `TensorFlow Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset/>`_.
  3281. This function is inefficient. Use it to read small datasets or prototype.
  3282. .. warning::
  3283. If your dataset is large, this function may execute slowly or raise an
  3284. out-of-memory error. To avoid issues, read the underyling data with a function
  3285. like :meth:`~ray.data.read_images`.
  3286. .. note::
  3287. This function isn't parallelized. It loads the entire dataset into the local
  3288. node's memory before moving the data to the distributed object store.
  3289. Examples:
  3290. >>> import ray
  3291. >>> import tensorflow_datasets as tfds
  3292. >>> dataset, _ = tfds.load('cifar10', split=["train", "test"]) # doctest: +SKIP
  3293. >>> ds = ray.data.from_tf(dataset) # doctest: +SKIP
  3294. >>> ds # doctest: +SKIP
  3295. MaterializedDataset(
  3296. num_blocks=...,
  3297. num_rows=50000,
  3298. schema={
  3299. id: binary,
  3300. image: ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8),
  3301. label: int64
  3302. }
  3303. )
  3304. >>> ds.take(1) # doctest: +SKIP
  3305. [{'id': b'train_16399', 'image': array([[[143, 96, 70],
  3306. [141, 96, 72],
  3307. [135, 93, 72],
  3308. ...,
  3309. [ 96, 37, 19],
  3310. [105, 42, 18],
  3311. [104, 38, 20]],
  3312. ...,
  3313. [[195, 161, 126],
  3314. [187, 153, 123],
  3315. [186, 151, 128],
  3316. ...,
  3317. [212, 177, 147],
  3318. [219, 185, 155],
  3319. [221, 187, 157]]], dtype=uint8), 'label': 7}]
  3320. Args:
  3321. dataset: A `TensorFlow Dataset`_.
  3322. Returns:
  3323. A :class:`MaterializedDataset` that contains the samples stored in the `TensorFlow Dataset`_.
  3324. """ # noqa: E501
  3325. # FIXME: `as_numpy_iterator` errors if `dataset` contains ragged tensors.
  3326. return from_items(list(dataset.as_numpy_iterator()))
  3327. @PublicAPI
  3328. def from_torch(
  3329. dataset: "torch.utils.data.Dataset",
  3330. local_read: bool = False,
  3331. ) -> Dataset:
  3332. """Create a :class:`~ray.data.Dataset` from a
  3333. `Torch Dataset <https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset/>`_.
  3334. The column name defaults to "data".
  3335. .. note::
  3336. The input dataset can either be map-style or iterable-style, and can have arbitrarily large amount of data.
  3337. The data will be sequentially streamed with one single read task.
  3338. Examples:
  3339. >>> import ray
  3340. >>> from torchvision import datasets
  3341. >>> dataset = datasets.MNIST("data", download=True) # doctest: +SKIP
  3342. >>> ds = ray.data.from_torch(dataset) # doctest: +SKIP
  3343. >>> ds # doctest: +SKIP
  3344. MaterializedDataset(num_blocks=..., num_rows=60000, schema={item: object})
  3345. >>> ds.take(1) # doctest: +SKIP
  3346. {"item": (<PIL.Image.Image image mode=L size=28x28 at 0x...>, 5)}
  3347. Args:
  3348. dataset: A `Torch Dataset`_.
  3349. local_read: If ``True``, perform the read as a local read.
  3350. Returns:
  3351. A :class:`~ray.data.Dataset` containing the Torch dataset samples.
  3352. """ # noqa: E501
  3353. # Files may not be accessible from all nodes, run the read task on current node.
  3354. ray_remote_args = {}
  3355. if local_read:
  3356. ray_remote_args = {
  3357. "scheduling_strategy": NodeAffinitySchedulingStrategy(
  3358. ray.get_runtime_context().get_node_id(),
  3359. soft=False,
  3360. ),
  3361. # The user might have initialized Ray to have num_cpus = 0 for the head
  3362. # node. For a local read we expect the read task to be executed on the
  3363. # head node, so we should set num_cpus = 0 for the task to allow it to
  3364. # run regardless of the user's head node configuration.
  3365. "num_cpus": 0,
  3366. }
  3367. return read_datasource(
  3368. TorchDatasource(dataset=dataset),
  3369. ray_remote_args=ray_remote_args,
  3370. # Only non-parallel, streaming read is currently supported
  3371. override_num_blocks=1,
  3372. )
  3373. @PublicAPI
  3374. def read_iceberg(
  3375. *,
  3376. table_identifier: str,
  3377. row_filter: Union[str, "BooleanExpression"] = None,
  3378. parallelism: int = -1,
  3379. selected_fields: Tuple[str, ...] = ("*",),
  3380. snapshot_id: Optional[int] = None,
  3381. scan_kwargs: Optional[Dict[str, str]] = None,
  3382. catalog_kwargs: Optional[Dict[str, str]] = None,
  3383. ray_remote_args: Optional[Dict[str, Any]] = None,
  3384. num_cpus: Optional[float] = None,
  3385. num_gpus: Optional[float] = None,
  3386. memory: Optional[float] = None,
  3387. override_num_blocks: Optional[int] = None,
  3388. ) -> Dataset:
  3389. """Create a :class:`~ray.data.Dataset` from an Iceberg table.
  3390. The table to read from is specified using a fully qualified ``table_identifier``.
  3391. Using PyIceberg, any intended row filters, selection of specific fields and
  3392. picking of a particular snapshot ID are applied, and the files that satisfy
  3393. the query are distributed across Ray read tasks.
  3394. The number of output blocks is determined by ``override_num_blocks``
  3395. which can be requested from this interface or automatically chosen if
  3396. unspecified.
  3397. .. tip::
  3398. For more details on PyIceberg, see
  3399. - URI: https://py.iceberg.apache.org/
  3400. Examples:
  3401. >>> import ray
  3402. >>> from ray.data.expressions import col #doctest: +SKIP
  3403. >>> # Read the table and apply filters using Ray Data expressions
  3404. >>> ds = ray.data.read_iceberg( #doctest: +SKIP
  3405. ... table_identifier="db_name.table_name",
  3406. ... catalog_kwargs={"name": "default", "type": "glue"}
  3407. ... ).filter(col("column_name") == "literal_value")
  3408. >>> # Select specific columns
  3409. >>> ds = ds.select_columns(["col1", "col2"]) #doctest: +SKIP
  3410. Args:
  3411. table_identifier: Fully qualified table identifier (``db_name.table_name``)
  3412. row_filter: **Deprecated**. Use ``.filter()`` method on the dataset instead.
  3413. A PyIceberg :class:`~pyiceberg.expressions.BooleanExpression`
  3414. to use to filter the data *prior* to reading.
  3415. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  3416. selected_fields: **Deprecated**. Use ``.select_columns()`` method on the dataset instead.
  3417. Which columns from the data to read, passed directly to
  3418. PyIceberg's load functions. Should be an tuple of string column names.
  3419. snapshot_id: Optional snapshot ID for the Iceberg table, by default the latest
  3420. snapshot is used
  3421. scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan() function
  3422. (e.g., case_sensitive, limit, etc.)
  3423. catalog_kwargs: Optional arguments to pass to PyIceberg's catalog.load_catalog()
  3424. function (e.g., name, type, etc.). For the function definition, see
  3425. `pyiceberg catalog
  3426. <https://py.iceberg.apache.org/reference/pyiceberg/catalog/\
  3427. #pyiceberg.catalog.load_catalog>`_.
  3428. ray_remote_args: Optional arguments to pass to :func:`ray.remote` in the
  3429. read tasks.
  3430. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3431. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  3432. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  3433. worker.
  3434. memory: The heap memory in bytes to reserve for each parallel read worker.
  3435. override_num_blocks: Override the number of output blocks from all read tasks.
  3436. By default, the number of output blocks is dynamically decided based on
  3437. input data size and available resources, and capped at the number of
  3438. physical files to be read. You shouldn't manually set this value in most
  3439. cases.
  3440. Returns:
  3441. :class:`~ray.data.Dataset` with rows from the Iceberg table.
  3442. """
  3443. from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource
  3444. # Deprecation warning for row_filter parameter
  3445. if row_filter is not None:
  3446. warnings.warn(
  3447. "The 'row_filter' parameter is deprecated and will be removed in a "
  3448. "future release. Use the .filter() method on the dataset instead. "
  3449. "For example: ds = ray.data.read_iceberg(...).filter(col('column') > 5)",
  3450. DeprecationWarning,
  3451. stacklevel=2,
  3452. )
  3453. # Deprecation warning for selected_fields parameter
  3454. if selected_fields != ("*",):
  3455. warnings.warn(
  3456. "The 'selected_fields' parameter is deprecated and will be removed in a "
  3457. "future release. Use the .select_columns() method on the dataset instead. "
  3458. "For example: ds = ray.data.read_iceberg(...).select_columns(['col1', 'col2'])",
  3459. DeprecationWarning,
  3460. stacklevel=2,
  3461. )
  3462. # Setup the Datasource
  3463. datasource = IcebergDatasource(
  3464. table_identifier=table_identifier,
  3465. row_filter=row_filter,
  3466. selected_fields=selected_fields,
  3467. snapshot_id=snapshot_id,
  3468. scan_kwargs=scan_kwargs,
  3469. catalog_kwargs=catalog_kwargs,
  3470. )
  3471. dataset = read_datasource(
  3472. datasource=datasource,
  3473. parallelism=parallelism,
  3474. num_cpus=num_cpus,
  3475. num_gpus=num_gpus,
  3476. memory=memory,
  3477. override_num_blocks=override_num_blocks,
  3478. ray_remote_args=ray_remote_args,
  3479. )
  3480. return dataset
  3481. @PublicAPI
  3482. def read_lance(
  3483. uri: str,
  3484. *,
  3485. version: Optional[Union[int, str]] = None,
  3486. columns: Optional[List[str]] = None,
  3487. filter: Optional[str] = None,
  3488. storage_options: Optional[Dict[str, str]] = None,
  3489. scanner_options: Optional[Dict[str, Any]] = None,
  3490. ray_remote_args: Optional[Dict[str, Any]] = None,
  3491. num_cpus: Optional[float] = None,
  3492. num_gpus: Optional[float] = None,
  3493. memory: Optional[float] = None,
  3494. concurrency: Optional[int] = None,
  3495. override_num_blocks: Optional[int] = None,
  3496. ) -> Dataset:
  3497. """
  3498. Create a :class:`~ray.data.Dataset` from a
  3499. `Lance Dataset <https://lance-format.github.io/lance-python-doc/dataset.html>`_.
  3500. Examples:
  3501. >>> import ray
  3502. >>> ds = ray.data.read_lance( # doctest: +SKIP
  3503. ... uri="./db_name.lance",
  3504. ... columns=["image", "label"],
  3505. ... filter="label = 2 AND text IS NOT NULL",
  3506. ... )
  3507. Args:
  3508. uri: The URI of the Lance dataset to read from. Local file paths, S3, and GCS
  3509. are supported.
  3510. version: Load a specific version of the Lance dataset. This can be an
  3511. integer version number or a string tag. By default, the
  3512. latest version is loaded.
  3513. columns: The columns to read. By default, all columns are read.
  3514. filter: A string that is a valid SQL WHERE clause. Read returns
  3515. only the rows matching the filter. See
  3516. `Lance filter push-down <https://lance.org/guide/read_and_write/#filter-push-down>`_
  3517. for valid SQL expressions. By default, no filter is applied.
  3518. storage_options: Extra options that make sense for a particular storage
  3519. connection. This is used to store connection parameters like credentials,
  3520. endpoint, etc. For more information, see `Object Store Configuration <https://lance.org/guide/object_store/>`_.
  3521. scanner_options: Additional options to configure the `LanceDataset.scanner()`
  3522. method, such as `batch_size`. For more information,
  3523. see `Lance Python API doc <https://lance-format.github.io/lance-python-doc/all-modules.html#lance.dataset.LanceDataset.scanner>`_
  3524. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  3525. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3526. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  3527. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  3528. worker.
  3529. memory: The heap memory in bytes to reserve for each parallel read worker.
  3530. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  3531. to control number of tasks to run concurrently. This doesn't change the
  3532. total number of tasks run or the total number of output blocks. By default,
  3533. concurrency is dynamically decided based on the available resources.
  3534. override_num_blocks: Override the number of output blocks from all read tasks.
  3535. By default, the number of output blocks is dynamically decided based on
  3536. input data size and available resources. You shouldn't manually set this
  3537. value in most cases.
  3538. Returns:
  3539. A :class:`~ray.data.Dataset` producing records read from the Lance dataset.
  3540. """ # noqa: E501
  3541. datasource = LanceDatasource(
  3542. uri=uri,
  3543. version=version,
  3544. columns=columns,
  3545. filter=filter,
  3546. storage_options=storage_options,
  3547. scanner_options=scanner_options,
  3548. )
  3549. return read_datasource(
  3550. datasource=datasource,
  3551. ray_remote_args=ray_remote_args,
  3552. num_cpus=num_cpus,
  3553. num_gpus=num_gpus,
  3554. memory=memory,
  3555. concurrency=concurrency,
  3556. override_num_blocks=override_num_blocks,
  3557. )
  3558. @PublicAPI(stability="alpha")
  3559. def read_clickhouse(
  3560. *,
  3561. table: str,
  3562. dsn: str,
  3563. columns: Optional[List[str]] = None,
  3564. filter: Optional[str] = None,
  3565. order_by: Optional[Tuple[List[str], bool]] = None,
  3566. client_settings: Optional[Dict[str, Any]] = None,
  3567. client_kwargs: Optional[Dict[str, Any]] = None,
  3568. ray_remote_args: Optional[Dict[str, Any]] = None,
  3569. num_cpus: Optional[float] = None,
  3570. num_gpus: Optional[float] = None,
  3571. memory: Optional[float] = None,
  3572. concurrency: Optional[int] = None,
  3573. override_num_blocks: Optional[int] = None,
  3574. ) -> Dataset:
  3575. """
  3576. Create a :class:`~ray.data.Dataset` from a ClickHouse table or view.
  3577. Examples:
  3578. >>> import ray
  3579. >>> ds = ray.data.read_clickhouse( # doctest: +SKIP
  3580. ... table="default.table",
  3581. ... dsn="clickhouse+http://username:password@host:8124/default",
  3582. ... columns=["timestamp", "age", "status", "text", "label"],
  3583. ... filter="age > 18 AND status = 'active'",
  3584. ... order_by=(["timestamp"], False),
  3585. ... )
  3586. Args:
  3587. table: Fully qualified table or view identifier (e.g.,
  3588. "default.table_name").
  3589. dsn: A string in standard DSN (Data Source Name) HTTP format (e.g.,
  3590. "clickhouse+http://username:password@host:8124/default").
  3591. For more information, see `ClickHouse Connection String doc
  3592. <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
  3593. columns: Optional list of columns to select from the data source.
  3594. If no columns are specified, all columns will be selected by default.
  3595. filter: Optional SQL filter string that will be used in the WHERE statement
  3596. (e.g., "label = 2 AND text IS NOT NULL"). The filter string must be valid for use in
  3597. a ClickHouse SQL WHERE clause. Please Note: Parallel reads are not currently supported
  3598. when a filter is set. Specifying a filter forces the parallelism to 1 to ensure
  3599. deterministic and consistent results. For more information, see `ClickHouse SQL WHERE Clause doc
  3600. <https://clickhouse.com/docs/en/sql-reference/statements/select/where>`_.
  3601. order_by: Optional tuple containing a list of columns to order by and a boolean indicating whether the order
  3602. should be descending (True for DESC, False for ASC). Please Note: order_by is required to support
  3603. parallelism. If not provided, the data will be read in a single task. This is to ensure
  3604. that the data is read in a consistent order across all tasks.
  3605. client_settings: Optional ClickHouse server settings to be used with the session/every request.
  3606. For more information, see `ClickHouse Client Settings
  3607. <https://clickhouse.com/docs/en/integrations/python#settings-argument>`_.
  3608. client_kwargs: Optional additional arguments to pass to the ClickHouse client. For more information,
  3609. see `ClickHouse Core Settings <https://clickhouse.com/docs/en/integrations/python#additional-options>`_.
  3610. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  3611. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3612. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  3613. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  3614. worker.
  3615. memory: The heap memory in bytes to reserve for each parallel read worker.
  3616. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  3617. to control number of tasks to run concurrently. This doesn't change the
  3618. total number of tasks run or the total number of output blocks. By default,
  3619. concurrency is dynamically decided based on the available resources.
  3620. override_num_blocks: Override the number of output blocks from all read tasks.
  3621. By default, the number of output blocks is dynamically decided based on
  3622. input data size and available resources. You shouldn't manually set this
  3623. value in most cases.
  3624. Returns:
  3625. A :class:`~ray.data.Dataset` producing records read from the ClickHouse table or view.
  3626. """ # noqa: E501
  3627. datasource = ClickHouseDatasource(
  3628. table=table,
  3629. dsn=dsn,
  3630. columns=columns,
  3631. filter=filter,
  3632. order_by=order_by,
  3633. client_settings=client_settings,
  3634. client_kwargs=client_kwargs,
  3635. )
  3636. return read_datasource(
  3637. datasource=datasource,
  3638. ray_remote_args=ray_remote_args,
  3639. num_cpus=num_cpus,
  3640. num_gpus=num_gpus,
  3641. memory=memory,
  3642. concurrency=concurrency,
  3643. override_num_blocks=override_num_blocks,
  3644. )
  3645. @PublicAPI(stability="alpha")
  3646. def read_unity_catalog(
  3647. table: str,
  3648. url: Optional[str] = None,
  3649. token: Optional[str] = None,
  3650. *,
  3651. credential_provider: Optional["DatabricksCredentialProvider"] = None,
  3652. data_format: Optional[str] = None,
  3653. region: Optional[str] = None,
  3654. reader_kwargs: Optional[dict] = None,
  3655. ) -> Dataset:
  3656. """Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending,
  3657. with automatic short-lived cloud credential handoff for secure, parallel, distributed access from external engines.
  3658. This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege
  3659. credentials for the cloud storage location backing the requested table or data files. It authenticates via the Unity Catalog
  3660. REST API (Unity Catalog credential vending for external system access, `Databricks Docs <https://docs.databricks.com/en/external-access/credential-vending.html>`_),
  3661. ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request.
  3662. The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet.
  3663. .. note::
  3664. This function is experimental and under active development.
  3665. Examples:
  3666. Read a Unity Catalog Delta table:
  3667. >>> import ray
  3668. >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP
  3669. ... table="main.sales.transactions",
  3670. ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
  3671. ... token="dapi...",
  3672. ... region="us-west-2"
  3673. ... )
  3674. >>> ds.show(3) # doctest: +SKIP
  3675. Read using a custom credential provider:
  3676. >>> from ray.data._internal.datasource.databricks_credentials import ( # doctest: +SKIP
  3677. ... StaticCredentialProvider,
  3678. ... )
  3679. >>> provider = StaticCredentialProvider( # doctest: +SKIP
  3680. ... token="dapi...",
  3681. ... host="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
  3682. ... )
  3683. >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP
  3684. ... table="main.sales.transactions",
  3685. ... credential_provider=provider,
  3686. ... region="us-west-2"
  3687. ... )
  3688. Args:
  3689. table: Unity Catalog table path in format ``catalog.schema.table``.
  3690. url: Databricks workspace URL (e.g., ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``).
  3691. Required if ``credential_provider`` is not specified. Please prefer to use
  3692. credential_provider instead of url and token parameters. This parameter will be
  3693. deprecated in a future release.
  3694. token: Databricks Personal Access Token with ``EXTERNAL USE SCHEMA`` permission.
  3695. Required if ``credential_provider`` is not specified. Please prefer to use
  3696. credential_provider instead of url and token parameters. This parameter will be
  3697. deprecated in a future release.
  3698. credential_provider: (Optional) A custom credential provider for
  3699. authentication. Must be a subclass of ``DatabricksCredentialProvider``
  3700. implementing ``get_token()``, ``get_host()``, and ``invalidate()``.
  3701. The provider must be picklable (serializable) as it is sent to Ray
  3702. workers for distributed execution. If provided, the provider is used
  3703. exclusively and ``url``/``token`` parameters are ignored.
  3704. data_format: Data format (``"delta"`` or ``"parquet"``). If not specified, inferred from table metadata.
  3705. region: AWS region for S3 access (e.g., ``"us-west-2"``). Required for AWS, not needed for Azure/GCP.
  3706. reader_kwargs: Additional arguments passed to the underlying Ray Data reader.
  3707. Returns:
  3708. A :class:`~ray.data.Dataset` containing the data from Unity Catalog.
  3709. """ # noqa: E501
  3710. from ray.data._internal.datasource.databricks_credentials import (
  3711. StaticCredentialProvider,
  3712. resolve_credential_provider,
  3713. )
  3714. # Resolve credentials: either from credential_provider or from url/token
  3715. if credential_provider is not None:
  3716. resolved_provider = resolve_credential_provider(credential_provider)
  3717. elif url is not None and token is not None:
  3718. # Backwards compatible: create provider from url/token
  3719. resolved_provider = StaticCredentialProvider(token=token, host=url)
  3720. else:
  3721. raise ValueError(
  3722. "Either 'credential_provider' or both 'url' and 'token' must be provided."
  3723. )
  3724. connector = UnityCatalogConnector(
  3725. table_full_name=table,
  3726. credential_provider=resolved_provider,
  3727. data_format=data_format,
  3728. region=region,
  3729. reader_kwargs=reader_kwargs,
  3730. )
  3731. return connector.read()
  3732. @PublicAPI(stability="alpha")
  3733. def read_delta(
  3734. path: Union[str, List[str]],
  3735. version: Optional[int] = None,
  3736. *,
  3737. filesystem: Optional["pyarrow.fs.FileSystem"] = None,
  3738. columns: Optional[List[str]] = None,
  3739. parallelism: int = -1,
  3740. num_cpus: Optional[float] = None,
  3741. num_gpus: Optional[float] = None,
  3742. memory: Optional[float] = None,
  3743. ray_remote_args: Optional[Dict[str, Any]] = None,
  3744. partition_filter: Optional[PathPartitionFilter] = None,
  3745. partitioning: Optional[Partitioning] = Partitioning("hive"),
  3746. shuffle: Union[Literal["files"], None] = None,
  3747. include_paths: bool = False,
  3748. concurrency: Optional[int] = None,
  3749. override_num_blocks: Optional[int] = None,
  3750. **arrow_parquet_args,
  3751. ):
  3752. """Creates a :class:`~ray.data.Dataset` from Delta Lake files.
  3753. Examples:
  3754. >>> import ray
  3755. >>> ds = ray.data.read_delta("s3://bucket@path/to/delta-table/") # doctest: +SKIP
  3756. Args:
  3757. path: A single file path for a Delta Lake table. Multiple tables are not yet
  3758. supported.
  3759. version: The version of the Delta Lake table to read. If not specified, the latest version is read.
  3760. filesystem: The PyArrow filesystem
  3761. implementation to read from. These filesystems are specified in the
  3762. `pyarrow docs <https://arrow.apache.org/docs/python/api/\
  3763. filesystems.html#filesystem-implementations>`_. Specify this parameter if
  3764. you need to provide specific configurations to the filesystem. By default,
  3765. the filesystem is automatically selected based on the scheme of the paths.
  3766. For example, if the path begins with ``s3://``, the ``S3FileSystem`` is
  3767. used. If ``None``, this function uses a system-chosen implementation.
  3768. columns: A list of column names to read. Only the specified columns are
  3769. read during the file scan.
  3770. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
  3771. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3772. num_gpus: The number of GPUs to reserve for each parallel read worker. For
  3773. example, specify `num_gpus=1` to request 1 GPU for each parallel read
  3774. worker.
  3775. memory: The heap memory in bytes to reserve for each parallel read worker.
  3776. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
  3777. partition_filter: A
  3778. :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
  3779. with a custom callback to read only selected partitions of a dataset.
  3780. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
  3781. that describes how paths are organized. Defaults to HIVE partitioning.
  3782. shuffle: If setting to "files", randomly shuffle input files order before read.
  3783. Defaults to not shuffle with ``None``.
  3784. include_paths: If ``True``, include the path to each file. File paths are
  3785. stored in the ``'path'`` column.
  3786. concurrency: The maximum number of Ray tasks to run concurrently. Set this
  3787. to control number of tasks to run concurrently. This doesn't change the
  3788. total number of tasks run or the total number of output blocks. By default,
  3789. concurrency is dynamically decided based on the available resources.
  3790. override_num_blocks: Override the number of output blocks from all read tasks.
  3791. By default, the number of output blocks is dynamically decided based on
  3792. input data size and available resources. You shouldn't manually set this
  3793. value in most cases.
  3794. **arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
  3795. set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/\
  3796. python/generated/pyarrow.dataset.Scanner.html\
  3797. #pyarrow.dataset.Scanner.from_fragment>`_
  3798. Returns:
  3799. :class:`~ray.data.Dataset` producing records read from the specified parquet
  3800. files.
  3801. """
  3802. # Modified from ray.data._internal.util._check_import, which is meant for objects,
  3803. # not functions. Move to _check_import if moved to a DataSource object.
  3804. import importlib
  3805. package = "deltalake"
  3806. try:
  3807. importlib.import_module(package)
  3808. except ImportError:
  3809. raise ImportError(
  3810. f"`ray.data.read_delta` depends on '{package}', but '{package}' "
  3811. f"couldn't be imported. You can install '{package}' by running `pip "
  3812. f"install {package}`."
  3813. )
  3814. from deltalake import DeltaTable
  3815. # This seems reasonable to keep it at one table, even Spark doesn't really support
  3816. # multi-table reads, it's usually up to the developer to keep it in one table.
  3817. if not isinstance(path, str):
  3818. raise ValueError("Only a single Delta Lake table path is supported.")
  3819. # Get the parquet file paths from the DeltaTable
  3820. paths = DeltaTable(path, version=version).file_uris()
  3821. return read_parquet(
  3822. paths,
  3823. filesystem=filesystem,
  3824. columns=columns,
  3825. parallelism=parallelism,
  3826. ray_remote_args=ray_remote_args,
  3827. partition_filter=partition_filter,
  3828. partitioning=partitioning,
  3829. shuffle=shuffle,
  3830. include_paths=include_paths,
  3831. concurrency=concurrency,
  3832. override_num_blocks=override_num_blocks,
  3833. **arrow_parquet_args,
  3834. )
  3835. @PublicAPI(stability="alpha")
  3836. def read_kafka(
  3837. topics: Union[str, List[str]],
  3838. *,
  3839. bootstrap_servers: Union[str, List[str]],
  3840. trigger: Literal["once"] = "once",
  3841. start_offset: Union[int, Literal["earliest"]] = "earliest",
  3842. end_offset: Union[int, Literal["latest"]] = "latest",
  3843. kafka_auth_config: Optional[KafkaAuthConfig] = None,
  3844. num_cpus: Optional[float] = None,
  3845. num_gpus: Optional[float] = None,
  3846. memory: Optional[float] = None,
  3847. ray_remote_args: Optional[Dict[str, Any]] = None,
  3848. override_num_blocks: Optional[int] = None,
  3849. timeout_ms: int = 10000,
  3850. ) -> Dataset:
  3851. """Read data from Kafka topics.
  3852. This function supports bounded reads from Kafka topics, reading messages
  3853. between a start and end offset. Only the "once" trigger is
  3854. supported for now, which performs a single bounded read. Currently we only
  3855. have one read task for each partition.
  3856. Examples:
  3857. .. testcode::
  3858. :skipif: True
  3859. import ray
  3860. # Read from a single topic with offset range
  3861. ds = ray.data.read_kafka(
  3862. topics="my-topic",
  3863. bootstrap_servers="localhost:9092",
  3864. start_offset=0,
  3865. end_offset=1000,
  3866. )
  3867. Args:
  3868. topics: Kafka topic name(s) to read from. Can be a single topic name
  3869. or a list of topic names.
  3870. bootstrap_servers: Kafka broker addresses. Can be a single string or
  3871. a list of strings.
  3872. trigger: Trigger mode for reading. Only "once" is supported, which
  3873. performs a single bounded read.
  3874. start_offset: Starting position for reading. Can be:
  3875. - int: Offset number
  3876. - str: "earliest"
  3877. end_offset: Ending position for reading (exclusive). Can be:
  3878. - int: Offset number
  3879. - str: "latest"
  3880. kafka_auth_config: Authentication configuration. See KafkaAuthConfig for details.
  3881. num_cpus: The number of CPUs to reserve for each parallel read worker.
  3882. num_gpus: The number of GPUs to reserve for each parallel read worker.
  3883. memory: The heap memory in bytes to reserve for each parallel read worker.
  3884. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
  3885. override_num_blocks: Override the number of output blocks from all read tasks.
  3886. By default, the number of output blocks is dynamically decided based on
  3887. input data size and available resources. You shouldn't manually set this
  3888. value in most cases.
  3889. timeout_ms: Timeout in milliseconds for every read task to poll until reaching end_offset (default 10000ms).
  3890. If the read task does not reach end_offset within the timeout, it will stop polling and return the messages
  3891. it has read so far.
  3892. Returns:
  3893. A :class:`~ray.data.Dataset` containing Kafka messages with the following schema:
  3894. - offset: int64 - Message offset within partition
  3895. - key: binary - Message key as raw bytes
  3896. - value: binary - Message value as raw bytes
  3897. - topic: string - Topic name
  3898. - partition: int32 - Partition ID
  3899. - timestamp: int64 - Message timestamp in milliseconds
  3900. - timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime
  3901. - headers: map<string, binary> - Message headers (keys as strings, values as bytes)
  3902. Raises:
  3903. ValueError: If invalid parameters are provided.
  3904. ImportError: If kafka-python is not installed.
  3905. """ # noqa: E501
  3906. if trigger != "once":
  3907. raise ValueError(f"Only trigger='once' is supported. Got trigger={trigger!r}")
  3908. return ray.data.read_datasource(
  3909. KafkaDatasource(
  3910. topics=topics,
  3911. bootstrap_servers=bootstrap_servers,
  3912. start_offset=start_offset,
  3913. end_offset=end_offset,
  3914. kafka_auth_config=kafka_auth_config,
  3915. timeout_ms=timeout_ms,
  3916. ),
  3917. parallelism=-1,
  3918. num_cpus=num_cpus,
  3919. num_gpus=num_gpus,
  3920. memory=memory,
  3921. ray_remote_args=ray_remote_args,
  3922. override_num_blocks=override_num_blocks,
  3923. )
  3924. def _get_datasource_or_legacy_reader(
  3925. ds: Datasource,
  3926. ctx: DataContext,
  3927. kwargs: dict,
  3928. ) -> Union[Datasource, Reader]:
  3929. """Generates reader.
  3930. Args:
  3931. ds: Datasource to read from.
  3932. ctx: Dataset config to use.
  3933. kwargs: Additional kwargs to pass to the legacy reader if
  3934. `Datasource.create_reader` is implemented.
  3935. Returns:
  3936. The datasource or a generated legacy reader.
  3937. """
  3938. DataContext._set_current(ctx)
  3939. if ds.should_create_reader:
  3940. warnings.warn(
  3941. "`create_reader` has been deprecated in Ray 2.9. Instead of creating a "
  3942. "`Reader`, implement `Datasource.get_read_tasks` and "
  3943. "`Datasource.estimate_inmemory_data_size`.",
  3944. DeprecationWarning,
  3945. )
  3946. datasource_or_legacy_reader = ds.create_reader(**kwargs)
  3947. else:
  3948. datasource_or_legacy_reader = ds
  3949. return datasource_or_legacy_reader
  3950. def _resolve_parquet_args(
  3951. tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
  3952. **arrow_parquet_args,
  3953. ) -> Dict[str, Any]:
  3954. if tensor_column_schema is not None:
  3955. existing_block_udf = arrow_parquet_args.pop("_block_udf", None)
  3956. def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table":
  3957. from ray.data.extensions import ArrowTensorArray
  3958. for tensor_col_name, (dtype, shape) in tensor_column_schema.items():
  3959. # NOTE(Clark): We use NumPy to consolidate these potentially
  3960. # non-contiguous buffers, and to do buffer bookkeeping in
  3961. # general.
  3962. np_col = _create_possibly_ragged_ndarray(
  3963. [
  3964. np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype)
  3965. for buf in block.column(tensor_col_name)
  3966. ]
  3967. )
  3968. block = block.set_column(
  3969. block._ensure_integer_index(tensor_col_name),
  3970. tensor_col_name,
  3971. ArrowTensorArray.from_numpy(np_col, column_name=tensor_col_name),
  3972. )
  3973. if existing_block_udf is not None:
  3974. # Apply UDF after casting the tensor columns.
  3975. block = existing_block_udf(block)
  3976. return block
  3977. arrow_parquet_args["_block_udf"] = _block_udf
  3978. return arrow_parquet_args
  3979. def _get_num_output_blocks(
  3980. parallelism: int = -1,
  3981. override_num_blocks: Optional[int] = None,
  3982. ) -> int:
  3983. if parallelism != -1:
  3984. logger.warning(
  3985. "The argument ``parallelism`` is deprecated in Ray 2.10. Please specify "
  3986. "argument ``override_num_blocks`` instead."
  3987. )
  3988. elif override_num_blocks is not None:
  3989. parallelism = override_num_blocks
  3990. return parallelism