Post

Ray-Core 初见

Ray-Core 初见

什么是Ray

首先基因很重要,所以我们先需要探查下Ray最初是为了解决什么问题而产生的。Ray的论文显示,它最早是为了解决增强学习的挑战而设计的。增强学习的难点在于它是一个需要边学习,边做实时做预测的应用场景,这意味会有不同类型的tasks同时运行,并且他们之间存在复杂的依赖关系,tasks会在运行时动态产生产生新的tasks,现有的一些计算模型肯定是没办法解决的。如果Ray只是为了解决RL事情可能没有那么复杂,但是作者希望它不仅仅能跑增强学习相关的,希望是一个通用的分布式机器学习框架,这就意味着Ray必然要进行分层抽象了,也就是至少要分成系统层应用层

系统层面

既然是分布式的应用,那么肯定需要有一个应用内的resource/task调度和管理。首先是Yarn,K8s等资源调度框架是应用程序级别的的调度,Ray作为一个为了解决具体业务问题的应用,应该要跑在他们上面而不是取代他们,而像Spark/Flink虽然也是基于task级别的资源调度框架,但是因为他们在设计的时候是为了解决一个比较具体的抽象问题,所以系统对task/资源都做了比较高的封装,一般用户是面向业务编程,很难直接操控task以及对应的resource。

我们以Spark为例,用户定义好了数据处理逻辑,至于如何将这些逻辑分成多少个Job,Stage,Task,最后占用多少Resource(CPU,GPU,Memory,Disk) 等等,都是由框架自行决定,而用户无法染指。这也是我一直诟病Spark的地方。所以Ray在系统层面,是一个通用的以task为调度级别的,同时可以针对每个task控制资源粒度的一个通用的分布式task执行系统。

记住,在Ray里,你需要明确定义Task以及Task的依赖,并且为每个task指定合适(数量,资源类型)的资源。比如你需要用三个task处理一份数据,那么你就需要自己启动三个task,并且指定这些task需要的资源(GPU,CPU)以及数量(可以是小数或者整数)。而在Spark,Flink里这是不大可能的。Ray为了让我们做这些事情,默认提供了Python的语言接口,你可以像使用Numpy那样去使用Ray。实际上,也已经有基于Ray做Backend的numpy实现了,当然它属于应用层面的东西了。

Ray系统层面很简单,也是典型的master-worker模式。类似spark的driver-executor模式,不同的是,Ray的worker类似yarn的worker,是负责Resource管理的,具体任务它会启动Python worker去执行你的代码,而spark的executor虽然也会启动Python worker执行python代码,但是对应的executor也执行业务逻辑,和python worker有数据交换和传输。

应用层面

你可以基于Ray的系统进行编程,因为Ray默认提供了Python的编程接口,所以你可以自己实现增强学习库(RLLib),也可以整合已有的算法框架,比如tensorflow,让tensorflow成为Ray上的一个应用,并且轻松实现分布式。我记得知乎上有人说Ray其实就是一个Python的分布式RPC框架,这么说是对的,但是显然会有误导,因为这很可能让人以为他只是“Python分布式RPC框架”。

如何和Spark协作

根据前面我讲述的,我们是可以完全基于Ray实现Spark的大部分API的,只是是Ray backend而非Spark core backend。实际上Ray目前正在做流相关的功能,他们现在要做的就是要兼容Flink的API。虽然官方宣称Ray是一个新一代的机器学习分布式框架,但是他完全可以cover住当前大数据和AI领域的大部分事情,但是任重道远,还需要大量的事情。所以对我而言,我看中的是它良好的Python支持,以及系统层面对资源和task的控制,这使得:

  • 我们可以轻易的把我们的单机Python算法库在Ray里跑起来(虽然算法自身不是分布式的),但是我们可以很好的利用Ray的资源管理和调度功能,从而解决AI平台的资源管理问题。

  • Ray官方提供了大量的机器学习算法的实现,以及对当前机器学习框架如Tensorflow,Pytorch的整合,而分布式能力则比这些库原生提供的模式更靠谱和易用。毕竟对于这些框架而言,支持他们分布式运行的那些辅助库(比如TensorFlow提供parameter servers)相当简陋。

但是,我们知道,数据处理它自身有一个很大的生态,比如你的用户画像数据都在数据湖里,你需要把这些数据进行非常复杂的计算才能作为特征喂给你的机器学习算法。而如果这个时候,你还要面向资源编程(或者使用一个还不够成熟的上层应用)而不是面向“业务”编程,这就显得很难受了,比如我就想用SQL处理数据,我只关注处理的业务逻辑,这个当前Ray以及之上的应用显然还是做不到如Spark那么便利的(毕竟Spark就是为了数据处理而生的),所以最好的方式是,数据的获取和加工依然是在Spark之上,但是数据准备好了就应该丢给用户基于Ray写的代码处理了。Ray可以通过Arrow项目读取HDFS上Spark已经处理好的数据,然后进行训练,然后将模型保存会HDFS。当然对于预测,Ray可以自己消化掉或者丢给其他系统完成。我们知道Spark 在整合Python生态方面做出了非常多的努力,比如他和Ray一样,也提供了python 编程接口,所以spark也较为容易的整合譬如Tensorflow等框架,但是没办法很好的管控资源(比如GPU),而且,spark 的executor 会在他所在的服务器上启动python worker,而spark一般而言是跑在yarn上的,这就对yarn造成了很大的管理麻烦,而且通常yarn 和hdfs之类的都是在一起的,python环境还有资源(CPU/GPU)除了管理难度大以外,还有一个很大的问题是可能会对yarn的集群造成比较大的稳定性风险。

所以最好的模式是按如下步骤开发一个机器学习应用:

  • 写一个python脚本

  • 在数据处理部分,使用pyspark

  • 在程序的算法训练部分,使用ray

  • spark 运行在yarn(k8s)上

  • ray运行在k8s里

好处显而易见:用户完全无感值他的应用其实是跑在两个集群里的,对他来说就是一个普通python脚本。

从架构角度来讲,复杂的python环境管理问题都可以丢给ray集群来完成,spark只要能跑基本的pyspark相关功能即可,数据衔接通过数据湖里的表(其实就是一堆parquet文件)即可。当然,如果最后结果数据不大,也可以直接通过client完成pyspark到ray的交互。

和MLSQL的整合

MLSQL 整合Ray也是非常简单,基本不需要做任何额外的工作就可以很好的整合,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
_- python ray代码_
**set** py_train**=**'''
import ray
ray.init()
@ray.remote
def f(x): return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
''';
**load** script.**`**py_train**`** **as** py_train;
_- 设置需要的python环境描述_
**set** py_env**=**'''
''';
**load** script.**`**py_env**`** **as** py_env;
_- 加载hive的表_
**load** hive.**`**db1.table1**`** **as** table1;
**select** features,label **from** table1 **as** **data**;
_- 做训练_
train **data** **as** PythonAlg**`/**tmp**/**tf**/**model**`where** scripts**=**"py_train"
**and** entryPoint**=**"py_train"
**and** condaFile**=**"py_env"
**and** keepVersion**=**"true"
**and** fitParam.0.fileFormat**=**"json" _- 还可以是parquet_
**and** **`**fitParam.0.psNum**`=**"1"
**and** PYTHON_ENV**=**"streamingpro-spark-2.4.x";

延展

大数据处理方面,有 Intel 设计的 RayDP,将 Spark 无缝集成到 Ray 中,通过 Ray 的 Actor 拉起 Spark 的 executor,利用 Ray 的分布式调度实现资源细粒度的调控。这样做的好处在于以 Spark 为大数据引擎的机器学习应用中,通过 Ray 可以将 Spark 产生的 dataframe 以 ML Dataset 的形式直接从内存传给下游的机器学习框架,比如 PyTorch。

Ray 另外的优势在于

**高效的数据传递和存储:**Ray 通过共享内存实现了一个轻量级的 plasma 分布式 object store。数据通过 Apache Arrow 格式存储。

**分布式调度:**Ray 的调度是 decentralized,每个节点上的 raylet 都可以进行调度;raylet 通过向 gcs 发送 heart beat 获取全局信息,在本地优先调度不能满足的情况下,快速让位给周边 raylet 进行调度。

**多语言的支持:**目前已经支持的语言包括:Python, Java, C++。后续 go 的支持以及更通用的多语言架构设计也在进行中。

metrics

在 train_epoch 中,同样有需要特殊处理的地方。RaySGD 默认支持的 metrics 只包括 loss 等。RTC 中,用户主要关心的指标包括 accuracy, precision, recall 以及 f1 measure 等。

Ray组件一览

1. Ray Core

  • 基本功能:

    • 提供基于任务(task)和 actor 的分布式调度与执行模型。

    • 支持跨节点并行计算,解决 CPU、GPU 等资源调度问题。

  • 应用场景:

    • 通用并行计算: 大规模数值计算、并行化批量任务(如大规模矩阵运算)。

    • 分布式系统模拟: 模拟现实世界中复杂系统(如交通调度、物流网络、供应链管理),每个 actor 可代表一个独立实体。

2. Ray Serve

  • 主要功能:

    • 构建高并发、可扩展的在线推理服务。

    • 支持 HTTP 或直接通过 actor 调用,不必局限于传统的 API 服务。

  • 应用场景:

    • 在线模型推理: 部署机器学习模型(如推荐系统、图像分类、自然语言处理)以实时响应请求。

    • 微服务架构: 将各个推理模块封装为独立服务,实现服务拆分和异步调用。

    • 实时数据处理: 结合流式数据处理,将推理结果实时反馈给业务系统。

3. Ray Tune

  • 主要功能:

    • 自动化超参数调优,支持分布式并行搜索。

    • 内置多种搜索算法(如随机搜索、贝叶斯优化、HyperBand)。

  • 应用场景:

    • 模型优化: 自动寻找最佳超参数组合,用于深度学习、强化学习等任务。

    • 系统性能调优: 除了机器学习,也可以用来调节其他系统或算法参数(例如调度算法的参数优化)。

4. Ray RLlib

  • 主要功能:

    • 分布式强化学习框架,支持多种 RL 算法。

    • 提供从单机到大规模集群的灵活扩展能力。

  • 应用场景:

    • 游戏 AI: 训练游戏中的 NPC 或对战机器人,实现复杂策略。

    • 机器人控制: 在仿真环境中训练机器人,后续可迁移至真实设备。

    • 自动驾驶和金融交易: 在高维、连续空间中进行策略优化和决策制定。

5. Ray Datasets

  • 主要功能:

    • 分布式数据加载与处理,支持大规模数据集的转换、过滤和聚合。

    • 提供类似 Pandas 的 API,但底层实现支持跨节点操作。

  • 应用场景:

    • 大数据 ETL: 实时或批量处理海量日志、传感器数据或社交媒体数据。

    • 数据清洗和特征工程: 并行执行数据预处理,提升数据科学实验的效率。

    • 分布式统计分析: 快速计算统计指标、生成数据报告等。

6. Ray Train (原 Ray SGD)

  • 主要功能:

    • 分布式训练框架,集成了主流深度学习库(如 PyTorch、TensorFlow)。

    • 支持多 GPU/多节点协同训练,简化分布式训练代码。

  • 应用场景:

    • 大规模模型训练: 训练图像识别、语音识别、自然语言处理等深度学习模型。

    • 多模态学习: 同时处理文本、图像、视频等多种数据源,实现联合训练。

7. Ray Workflow

  • 主要功能:

    • 构建容错、可重试的分布式工作流,管理任务之间的依赖关系。

    • 支持任务失败后的自动重试和进度恢复。

  • 应用场景:

    • 复杂数据管道: 将数据提取、转换、加载(ETL)与模型推理、结果汇总串联成完整工作流。

    • 业务流程自动化: 例如订单处理、用户行为分析、风险评估等需要跨多个步骤的数据处理场景。

    • 实验流水线: 自动化机器学习实验、数据科学项目的端到端流程管理。

8. Ray Air

  • 主要功能:

    • 提供一个统一的 API 框架,从数据预处理、模型训练到推理部署实现无缝衔接。

    • 简化了分布式机器学习整个流程的管理。

  • 应用场景:

    • 端到端机器学习平台: 快速搭建实验、部署模型、监控和优化整个 ML 流程。

    • 跨团队协作: 为数据工程师、数据科学家、运维人员提供一致的工作接口,降低沟通成本。

This post is licensed under CC BY 4.0 by the author.