Ray: A Distributed Framework for Emerging AI Applications
TL;DR Summary
Ray is a distributed framework designed for emerging AI applications, especially in reinforcement learning, providing a unified interface for task-parallel and actor-based computations, achieving over 1.8 million tasks per second with superior performance.
Abstract
The next generation of AI applications will continuously interact with the environment and learn from these interactions. These applications impose new and demanding systems requirements, both in terms of performance and flexibility. In this paper, we consider these requirements and present Ray---a distributed system to address them. Ray implements a unified interface that can express both task-parallel and actor-based computations, supported by a single dynamic execution engine. To meet the performance requirements, Ray employs a distributed scheduler and a distributed and fault-tolerant store to manage the system's control state. In our experiments, we demonstrate scaling beyond 1.8 million tasks per second and better performance than existing specialized systems for several challenging reinforcement learning applications.
Mind Map
In-depth Reading
English Analysis
1. Bibliographic Information
1.1. Title
Ray: A Distributed Framework for Emerging AI Applications
1.2. Authors
Philipp Moritz, Robert Nishihara, Stephanie Wang, Alexey Tumanov, Richard Liaw, Eric Liang, Melih Elibol, Zongheng Yang, William Paul, Michael I. Jordan, Ion Stoica
- Affiliation: University of California, Berkeley (RISELab)
1.3. Journal/Conference
Published at the 13th USENIX Symposium on Operating Systems Design and Implementation (OSDI '18).
- Comment: OSDI is one of the premier and most prestigious conferences in the field of computer systems, known for publishing high-impact work on operating systems, distributed systems, and networking.
1.4. Publication Year
2018 (ArXiv version submitted Dec 2017)
1.5. Abstract
The paper addresses the system requirements for emerging AI applications, particularly Reinforcement Learning (RL), which demand a tight coupling of simulation, training, and serving. Existing systems support these individually but struggle to unify them with the necessary performance and flexibility. Ray is proposed as a distributed system solving this by implementing a unified interface for task-parallel and actor-based computations. It employs a Global Control Store (GCS) to decouple control state from execution, allowing the system to scale stateless components independently. It uses a bottom-up distributed scheduler to handle millions of tasks per second with millisecond-level latency. Experiments demonstrate scaling to 1.8 million tasks/s and outperforming specialized systems in RL workloads.
1.6. Original Source Link
- Official PDF: https://arxiv.org/pdf/1712.05889v2.pdf
- Status: Published (OSDI '18)
2. Executive Summary
2.1. Background & Motivation
The landscape of data analysis has evolved from batch processing (Hadoop/Spark) to streaming and graph processing. Recently, the focus has shifted to AI and Machine Learning (ML), specifically Deep Learning (DL) and Reinforcement Learning (RL).
- The Problem: RL applications differ significantly from standard supervised learning. They require a continuous loop of:
- Simulation: Interacting with an environment to generate data (trajectories). This requires fine-grained, dynamic, and often heterogeneous computation (e.g., varying duration).
- Training: Updating policies using data, often requiring stateful computations (e.g., parameter servers) and high-throughput GPU usage.
- Serving: Using the policy to take actions in real-time, requiring extremely low latency.
- The Gap: Existing frameworks are specialized:
- Dataflow systems (Spark, MapReduce): Good for batch, bad for fine-grained dynamic simulation and low-latency serving.
- DL Frameworks (TensorFlow, PyTorch): Good for training, but lack native support for general-purpose distributed simulation and serving logic.
- Serving Systems (Clipper): Good for serving, but don't handle training or simulation.
- The Consequence: Researchers forced to stitch together disparate systems (high latency/complexity) or build custom one-off systems (high engineering burden, no fault tolerance).
2.2. Main Contributions / Findings
Ray introduces a general-purpose cluster-computing framework designed to unify these workloads.
- Unified Abstraction: Ray combines Tasks (stateless functions for load balancing and recovery) and Actors (stateful processes for maintaining state like model weights or simulator environments).
- System Architecture Innovations:
- Global Control Store (GCS): A centralized, sharded metadata store that holds the entire control state (lineage, object locations). This allows all other components (schedulers, workers) to be stateless and horizontally scalable.
- Bottom-Up Distributed Scheduler: A two-level hierarchy where tasks are scheduled locally first for low latency, and forwarded to a global scheduler only when necessary (e.g., resource constraints).
- Performance:
-
Ray scales linearly to over 1.8 million tasks per second.
-
It provides transparent fault tolerance for both actors and tasks.
-
It outperforms specialized custom systems for RL algorithms like Evolution Strategies (ES) and Proximal Policy Optimization (PPO).
The following figure (Figure 1 from the original paper) illustrates the typical loop of an RL system that Ray aims to support:
该图像是一个示意图,展示了强化学习(RL)系统的结构。左侧为智能体(Agent),包括训练和服务两部分,培训中使用如SGD的策略改进方法;右侧为环境(Environment),呈现动作、状态和奖励的交互机制。轨迹表示为 。
-
3. Prerequisite Knowledge & Related Work
3.1. Foundational Concepts
- Reinforcement Learning (RL): A type of machine learning where an Agent learns to make decisions by performing Actions in an Environment to maximize a cumulative Reward.
- Policy: The strategy or function the agent uses to decide the next action based on the current state.
- Rollout: A sequence of interactions (simulation steps) typically generated to evaluate a policy.
- Task-Parallelism: A computing model where a problem is broken down into discrete tasks that can run concurrently. In Ray, these are stateless functions.
- Actor Model: A model of concurrent computation where "Actors" are the universal primitives. An actor maintains its own private state and interacts with others only by sending messages. In Ray, these are stateful classes.
- Lineage: In distributed systems (like Spark), lineage is a record of the graph of operations used to create a dataset. If data is lost, the system uses the lineage to recompute it.
- Parameter Server: A design pattern common in distributed ML where a server holds the global model parameters (weights) and workers pull them to compute gradients, then push updates back.
3.2. Previous Works
The paper contrasts Ray with several categories of existing systems:
- Bulk-Synchronous Parallel (BSP) Systems (MapReduce, Spark):
- Concept: Execution is divided into supersteps separated by global barriers.
- Limitation for RL: Too rigid. RL simulations are dynamic (finish at different times, spawn new tasks conditionally). The overhead of launching tasks in Spark is too high for the millisecond-latency requirements of RL serving/simulation.
- Task-Parallel Systems (CIEL, Dask):
- Concept: Support dynamic task graphs.
- Limitation: CIEL uses a centralized master (bottleneck). Dask (at the time) lacked the robust actor support needed for stateful training components like parameter servers.
- Actor Systems (Orleans, Akka):
- Concept: pure actor models.
- Limitation: Typically offer "at-most-once" or "at-least-once" messaging but lack the "exactly-once" semantics and automatic data reconstruction (lineage) that dataflow systems provide for fault tolerance. Ray aims to provide the best of both.
- Deep Learning Frameworks (TensorFlow, MXNet):
- Concept: Static graphs of tensor operations.
- Limitation: Not designed for general-purpose distributed logic (like managing a simulator fleet).
3.3. Technological Evolution
- Big Data Era: Focus on processing static data volumes (Hadoop).
- In-Memory Analytics: Focus on speed and iteration (Spark).
- Deep Learning: Focus on tensor computation and GPU acceleration (TensorFlow).
- Emerging AI (Ray's Niche): Focus on dynamic interaction with environments, requiring a fusion of high-throughput simulation, distributed training, and low-latency serving in a single closed loop.
4. Methodology
4.1. Principles
Ray is built on the principle of a Dynamic Task Graph.
- Dynamic: The computation graph is not defined in advance (like in early TensorFlow) but is generated on the fly as the program runs.
- Hybrid Abstraction: It unifies:
-
Tasks (Remote Functions): Stateless, idempotent, fine-grained. Good for load balancing and recovery via re-execution.
-
Actors (Remote Classes): Stateful. Good for encapsulating mutable state (e.g., simulators, neural network weights).
The following figure (Figure 5 from the original paper) presents the overall architecture of Ray, showing the separation between the Application Layer and the System Layer:
该图像是图示,展示了Ray的体系结构,包括应用层和系统层。应用层实现API和计算模型,系统层负责任务调度和数据管理,以满足性能和容错要求。
-
4.2. Architecture In-Depth
Ray's architecture is unique because it decouples the control plane (metadata) from the data plane (execution and object storage).
4.2.1. Application Layer
The application layer exposes the API to the user. It consists of three types of processes:
- Driver: The process executing the main user program (the script).
- Worker: A stateless process that executes tasks (remote functions). Workers are managed automatically.
- Actor: A stateful process that executes methods invoked on it. Explicitly instantiated.
4.2.2. System Layer: The Global Control Store (GCS)
This is the most critical innovation. In traditional systems (e.g., Spark), the "Master" node holds the metadata (who is running what, where data is). In Ray, this state is pushed into the GCS.
- Function: Stores the entire control state:
- Object Table: Where objects are located.
- Task Table: Status of tasks and lineage.
- Function Table: Code definitions.
- Implementation: A sharded key-value store (using Redis) with chain replication for fault tolerance.
- Benefit: Since the metadata is in GCS, the Scheduler and Object Store can be stateless. If a scheduler fails, a new one can spin up and read the state from GCS. This enables massive horizontal scalability.
4.2.3. System Layer: Bottom-Up Distributed Scheduler
To handle millions of tasks per second, a centralized scheduler is a bottleneck. Ray uses a two-level hierarchy:
- Local Scheduler: Each node has one.
- Tasks submitted by a driver/worker on Node A go to Node A's Local Scheduler first.
- If Node A has resources (CPUs/GPUs) and the task requirements are met, it schedules locally. This ensures millisecond-level latency.
- If not, it forwards the task to the Global Scheduler.
- Global Scheduler:
- Receives tasks that local schedulers cannot handle.
- Makes decisions based on global load and data location information (fetched from GCS).
- Algorithm: It selects the node that provides the lowest estimated waiting time.
Scheduling Logic: The Global Scheduler estimates the waiting time at a candidate node for a task as:
- Symbol Explanation:
-
: The estimated time the task will wait in the queue at node . This is calculated as: .
-
: The estimated time to move task 's inputs to node . This is calculated as: .
-
The scheduler maintains these averages using exponential averaging.
The following figure (Figure 6 from the original paper) visualizes this bottom-up flow:
该图像是一个图表,展示了底向上分布式调度器的结构。任务从驱动程序和工作节点提交到本地调度器,必要时再转发至全局调度器。箭头的粗细与请求速率成正比。
-
4.2.4. System Layer: In-Memory Distributed Object Store
- Design: Each node has a shared-memory object store (using Apache Arrow).
- Mechanism:
- Zero-Copy: Tasks on the same node can read data (e.g., large numpy arrays) without copying, via shared memory mapping.
- Object Transfer: If a task needs an object not on its node, the Object Store replicates it from the remote node.
- Immutability: Objects in the store are immutable. This simplifies consistency; no locks or complex update protocols are needed.
4.3. Computation Model & API
Ray provides a Python API that translates directly to the underlying dynamic task graph.
4.3.1. Core API
The following table (Table 1 from the original paper) summarizes the key API methods:
| Name | Description |
|---|---|
futures = f.remote(args) |
Execute function f remotely. Returns futures (IDs) immediately. Non-blocking. |
objects = ray.get(futures) |
Block and retrieve the actual values associated with the futures. |
ready_futures = ray.wait(futures, k, timeout) |
Return futures that have completed. Returns as soon as k are ready. Crucial for handling heterogeneous task durations. |
actor = Class.remote(args)futures = actor.method.remote(args) |
Instantiate a remote actor and call methods on it. Non-blocking. |
4.3.2. Graph Construction
-
Data Edges: Dependencies between objects and tasks (e.g., output of Task A is input to Task B).
-
Control Edges: Dependencies from nested tasks (e.g., Task A calls Task B).
-
Stateful Edges: Dependencies between method calls on the same Actor. This ensures serial execution within an actor and maintains state consistency.
The following figure (Figure 4 from the original paper) shows how code translates to a task graph with stateful edges (between and ):
该图像是图表,展示了调用 train_policy.remote() 后的任务图。图中包括多个角色和方法的调用,表示任务之间的数据依赖和状态共享关系。其中,控制边和状态边分别用不同的线条表示,反映了任务执行的逻辑结构。
4.4. Fault Tolerance Strategy
Ray uses Lineage Re-execution for recovery.
- For Tasks: If a node fails, the system looks up the lineage of the lost objects in the GCS and re-executes the tasks that produced them. Because tasks are deterministic and side-effect-free, this is safe.
- For Actors:
-
Actor methods have stateful edges in the lineage graph.
-
To recover an actor, Ray can re-execute the chain of methods called on that actor from the beginning (or from a checkpoint).
-
Checkpointing: To avoid replaying infinite history, users can snapshot actor state.
The following figure (Figure 7 from the original paper) demonstrates the end-to-end flow of an
add(a, b)operation, illustrating the interaction between the Driver, Scheduler, GCS, and Object Store:
该图像是示意图,展示了一个远程任务执行和结果返回的过程。图中分为两部分:(a) 在节点 N1 上注册并调用函数 add(),然后在节点 N2 上执行该函数;(b) 节点 N1 通过 c = \text{ray.get}(id_c)获取 add() 的结果。实线表示数据平面操作,虚线表示控制平面操作。此过程涉及全局控制存储(GCS)和对象存储的交互。
-
5. Experimental Setup
5.1. Datasets & Tasks
The paper evaluates Ray on both microbenchmarks and end-to-end RL applications.
- Microbenchmarks:
- Task Throughput: Executing empty tasks to measure overhead.
- Allreduce: A communication primitive used in distributed training.
- Object Store I/O: Measuring write throughput and IOPS.
- RL Applications:
- Evolution Strategies (ES): Tested on the
Humanoid-v1task from OpenAI Gym. - Proximal Policy Optimization (PPO): Tested on
Humanoid-v1. - Training: ResNet-101 image classification model (TensorFlow benchmark).
- Evolution Strategies (ES): Tested on the
5.2. Evaluation Metrics
- Throughput:
- Definition: The number of tasks the system can complete per unit of time.
- Unit: Tasks per second.
- Latency:
- Definition: The time delay between submitting a task and its execution start, or the time for a complete primitive (like allreduce).
- Unit: Milliseconds (ms).
- Completion Time:
- Definition: Time taken to reach a target accuracy or reward score.
- Unit: Minutes or Seconds.
5.3. Baselines
- MPI (OpenMPI): Represents high-performance, low-level message passing. Hard to program, no fault tolerance, but theoretically fast.
- Horovod: A specialized system for distributed deep learning training (optimized ring-allreduce).
- Clipper: A specialized system for model serving.
- Reference Implementations: Highly optimized custom code provided by OpenAI for ES and PPO algorithms.
6. Results & Analysis
6.1. Core Results Analysis: Scalability
Ray demonstrates near-linear scalability.
-
Throughput: As the number of nodes increases to 60, Ray reaches 1 million tasks/s. At 100 nodes, it exceeds 1.8 million tasks/s.
-
Latency: The bottom-up scheduler enables this. Most tasks are scheduled locally, avoiding the global scheduler bottleneck.
The following figure (Figure 8 from the original paper) shows the linear scalability (b) and the benefit of locality-aware placement (a):
该图像是图表,展示了Ray的任务调度和可扩展性的性能。左侧(a)显示了在不同对象大小下,本地敏感调度与无感调度的平均任务延迟对比(单位为秒),而右侧(b)则展示了随着节点数增加,Ray能够达到的任务处理能力,以每秒百万任务为单位。
6.2. System Component Performance
-
Object Store: Achieves write throughput exceeding 15 GB/s (for large objects) and 18K IOPS (for small objects) on a single node. This confirms the efficiency of the shared memory design.
-
GCS Fault Tolerance: Experiments show that when a GCS shard replica fails, the delay observed by the client is under 30ms, proving the robustness of the chain replication.
The following figure (Figure 9 from the original paper) details the object store performance:
该图像是图表,展示了对象存储的写入吞吐量和每秒输入输出操作数(IOPS)。随着对象大小的变化,吞吐量超过15GB/s,IOPS达到18K,图中使用的线程数分别为1、2、4、8和16,结果为5次试验的平均值。
6.3. Comparison with Specialized Systems
6.3.1. Distributed Training (ResNet-101)
Ray was used to implement a parameter server for training a ResNet-101 model.
-
Result: Ray matches the performance of Horovod and is within 10% of distributed TensorFlow.
-
Significance: This proves that a general-purpose system (Ray) can match specialized systems in their specific domain while offering broader flexibility.
The following figure (Figure 13 from the original paper) compares Ray's training throughput (Images/sec) against Horovod and Distributed TF:
该图像是图表,展示了在不同 GPU 数量下,采用 Horovod + TF、Distributed TF 和 Ray + TF 训练 ResNet-101 模型时每秒处理的平均图像数量。随着 GPU 数量的增加,Ray + TF 的性能表现优于其他方法。
6.3.2. Serving
Ray was compared to Clipper for embedded serving (serving a policy to a simulator on the same machine).
- Result: Ray achieved significantly higher throughput (e.g., 6900 vs 290 queries/s for larger inputs).
- Reason: Ray uses shared memory (zero-copy), whereas Clipper uses REST (RPC overhead).
6.3.3. Simulation
Comparison between MPI and Ray for the Pendulum-v0 simulator.
The following table (Table 4 from the original paper) shows the results:
| System, programming model | 1 CPU | 16 CPUs | 256 CPUs |
|---|---|---|---|
| MPI, bulk synchronous | 22.6K | 208K | 2.16M |
| Ray, asynchronous tasks | 22.3K | 290K | 4.03M |
- Analysis: Ray outperforms MPI at scale (256 CPUs) by nearly 2x. This is because MPI (in BSP mode) waits for the slowest simulation to finish, whereas Ray's asynchronous tasks utilize resources continuously.
6.4. RL Application Performance (ES & PPO)
Ray implementations were compared against highly optimized reference implementations.
- Evolution Strategies (ES): Ray scaled to 8192 cores and reached the solution in 3.7 minutes (half the time of the best published result). The reference system failed to scale beyond 1024 cores.
- PPO: Ray outperformed the specialized MPI implementation while using fewer GPUs.
-
Reason: Ray handles heterogeneity better. It can schedule CPU-only tasks on CPU nodes and GPU tasks on GPU nodes. The MPI implementation forced a symmetric structure (1 GPU per 8 CPUs), wasting resources.
The following figure (Figure 14 from the original paper) illustrates these RL benchmarks:
该图像是图表,展示了在Humanoidv1任务中,不同CPU数量下Ray ES与参考ES的平均解决时间,以及不同GPU配置下Ray PPO与MPI PPO的平均解决时间。左侧(a)显示Evolution Strategies的结果,右侧(b)显示PPO的结果,均以分钟为单位。
-
7. Conclusion & Reflections
7.1. Conclusion Summary
Ray successfully introduces a unified framework for the emerging class of AI applications that require tight integration of simulation, training, and serving. By combining Tasks and Actors on a dynamic graph, and supporting this with a Global Control Store and Bottom-Up Scheduler, Ray achieves:
- High Scalability: 1.8M+ tasks/s.
- Low Latency: Millisecond-level task execution suitable for serving.
- Flexibility: Supporting heterogeneous hardware (CPU/GPU) and dynamic computation patterns better than rigid BSP or static graph systems.
7.2. Limitations & Future Work
- Garbage Collection: Storing lineage for every task in the GCS can consume infinite memory. The paper notes that effective garbage collection strategies for lineage are an active area of development (GCS flushing is discussed as a mitigation).
- Scheduling Complexity: The scheduler makes decisions without full knowledge of the computation graph (since it's dynamic). This might lead to suboptimal decisions compared to static graph schedulers (like TensorFlow's XLA) that can optimize the entire DAG globally.
7.3. Personal Insights & Critique
- Architectural Shift: The GCS is the most profound idea here. Moving the "brain" (metadata) out of the "master" process allows the master to effectively become a distributed entity. This addresses the single-point-of-failure and bottleneck issues inherent in architectures like Hadoop/Spark master nodes.
- Unification Power: The ability to pass handles of Actors into Tasks (and vice versa) allows for extremely complex distributed patterns (like tree-aggregation or parameter servers) to be written in standard Python code. This lowers the barrier to entry significantly compared to writing MPI code.
- Relevance Today: Ray has grown massive since this paper (2018). It is now the backbone for many LLM training (e.g., portions of GPT training pipelines) and serving infrastructures (Ray Serve). This paper lays the foundational theory—Actor/Task unification and GCS—that enabled this growth. The decision to support "stateful" computations (Actors) as a first-class citizen was the key differentiator from serverless (Lambda) and dataflow (Spark) systems.
Similar papers
Recommended via semantic vector search.