Ray Crash Course - Ray Parallel Iterators (Now deprecated in Ray 1.7)¶
NOTE: As of Ray release 1.7 ray.util.iter
has been deprecated
© 2019-2021, Anyscale. All Rights Reserved
This lesson explores ray.util.iter
(documentation), which provides a parallel iterator API for simple data ingest and processing. It can be thought of as syntactic sugar around Ray actors and ray.wait
loops.
Parallel iterators are lazy, so they can operate over infinite sequences of items. Iterator transformations are only executed when the user calls next()
to fetch the next output item from the iterator.
So, parallel iterators provide a simple, yet powerful API for stream processing.
Tip: For more about Ray, see ray.io or the Ray documentation.
import ray, time, sys, os
import numpy as np
ray.init(ignore_reinit_error=True)
The Ray Dashboard, if you are running this notebook on a local machine:
Parallel Iterators¶
You can create a ParallelIterator
object from an existing set of items, range of numbers, set of iterators, or set of worker actors (reference documentation).
Ray will create a worker actor that produces the data for each shard of the iterator. Here are some examples:
# Create an iterator with 2 worker actors over the list [1, 2, 3, 4].
it1 = ray.util.iter.from_items([1, 2, 3, 4], num_shards=2)
it1
# Create an iterator with 8 worker actors over range(1000000).
it2 = ray.util.iter.from_range(1000000, num_shards=8)
it2
# Create an iterator over three range(10) generators. How many shards are created?
it3 = ray.util.iter.from_iterators([range(10), range(10), range(10)])
it3.shards()
You can also use Actors, but they must subclass ParallelIteratorWorker.
from ray.util.iter import ParallelIteratorWorker
@ray.remote
class IterableWorker(ParallelIteratorWorker):
# Must call the parent constructor as shown (see also the docs linked above)
def __init__(self):
super().__init__(item_generator = lambda: np.random.randint(255, 100), repeat = True)
# Create an iterator from existing worker actors. These actors must
# implement the ParallelIteratorWorker interface.
# Using comprehension, create a list of four instances of
# IterableWorker
iws = [IterableWorker.remote() for _ in range(4)]
# Create a Parallel iterator from list of IterableWorkers
it4 = ray.util.iter.from_actors(iws)
it4.shards()
We'll use some of these iterators in subsequent cells.
Local Iterators¶
To read elements from a parallel iterator, it has to be converted to a LocalIterator by calling gather_sync or gather_async. These correspond to ray.get
and ray.wait
loops over the actors respectively:
# Gather items synchronously (deterministic round robin across shards).
# Use the it3 = ray.util.iter.from_iterators([range(10), range(10), range(10)]) iterator above:
local_it3 = it3.gather_sync()
local_it3
Local iterators can be used like any other Python iterator:
local_it3.take(5)
local_it3.take(5)
Can you understand what's happening in the two previous cells? To get 10 total elements, the iterator returns elements from every shard, cycling through them as required. Hence, four are returned from one shard and three from the other two shards. We'll discuss ordering and semantics in more detail below.
Transformations¶
Simple transformations can be chained on the iterator, such as mapping, filtering, and batching. These will be executed in parallel on the workers:
# Apply a "map" transformation to each element of the iterator.
it1_foreach = it1.for_each(lambda x: x ** 2)
it1_foreach
Notice the information printed for it1_foreach
. We'll keep printing these iterators going forward so you can see how each one is different.
it1_foreach.gather_sync().take(10) # 10 is too high, there are only 4, but that's fine.
Sometimes it's useful to batch elements together:
# Batch together items into a lists of 5 elements.
it2_batch5 = it2.batch(5)#.for_each(lambda list: "|".join(list))
it2_batch5
it2_batch5.gather_sync().take(10)
The order used to sequence operations matters. Notice what's different about the next two pairs of cells?
it2_batch5a = it2.batch(5).for_each(lambda list: sum(list))
it2_batch5a
it2_batch5a.gather_sync().take(10)
it2_batch5b = it2.for_each(lambda x: 2*x).batch(5)
it2_batch5b
it2_batch5b.gather_sync().take(10)
You can filter values. Next we filter to keep even values.
it2_evens = it2.filter(lambda x: x % 2 == 0)
it2_evens
it2_evens.batch(5).gather_sync().take(10)
Notes:
- Transformations used before the call to
gather_sync()
run in parallel on shards using theParallelIterator
.- Transformations used after the call to
gather_sync()
run in the current process.
The async gather, gather_async
, can be used for better performance, but it is non-deterministic.
it_async = ray.util.iter.from_range(100, 4).gather_async()
it_async.take(10)
Passing Iterators to Remote Functions¶
Both ParallelIterator
and LocalIterator
are serializable. They can be passed to any Ray remote function. However, note that each shard should only be read by one process at a time:
First, you can get local iterators representing the shards of a ParallelIterator
:
it = ray.util.iter.from_range(1000, 3)
[s0, s1, s2] = it.shards()
[s0, s1, s2]
The iterator shards can be passed to remote functions:
@ray.remote
def do_sum(it):
return sum(it)
ray.get([do_sum.remote(s) for s in it.shards()])
More on Semantic Guarantees¶
The parallel iterator API guarantees the following semantics:
Fetch Ordering¶
When using it.gather_sync().for_each(fn)
, it.gather_async().for_each(fn)
, or any other transformation after a gather_*sync
, for a sequence of elements served by a source actor, the function fn
will be called on those elements in order. In particular, it will be called for element i in the sequence before the next element, i+1, is fetched from the source actor. This is useful if you need to update the source actor between iterator steps. Note that for async gather, this ordering only applies per shard.
Operator State¶
Operator state is preserved for each shard. This means that you can pass a stateful callable to for_each()
, as in the following example:
class CumulativeList:
def __init__(self):
self.list = []
def __call__(self, x):
self.list.append(x)
return (x, self.list)
it = ray.util.iter.from_range(5, 1)
for x in it.for_each(CumulativeList()).gather_sync():
print(x)
For more details on Parallel Iterators, see the API reference.
@ray.remote
def train(data_shard):
for batch in data_shard:
# simulate performing a model update with a batch of data
print("train on", batch)
def train_model(range, batch, num_shards, repeat):
train_iter = (
ray.util.iter.from_range(range, num_shards=num_shards, repeat=repeat)
.batch(batch)
.for_each(np.array)
)
work = [train.remote(shard) for shard in train_iter.shards()]
ray.get(work)
train_model(range=1000, batch=250, num_shards=4, repeat=False)
Streaming Word Count¶
Word count was called the "Hello World" of Hadoop programming, because it's a conceptually simple algorithm that illustrates the map/reduce paradigm of Hadoop programming very well and it was often the first program written by developers learning about Hadoop.
Word count is tedious to write in the original MapReduce
API, but elegant in Apache Spark, which replaced MapReduce
. Our implementation here is similar to how you would implement it in Spark. The key is to have a powerful, composable set of operations, sometimes called "operators" (in the spirit of addition, multiplication, etc.) or combinators.
In word count, a corpus of documents is read in parallel processes, where each document is tokenized into words, and the occurrences of each word are counted, then combined into a global word-count dictionary. Usually the results are sorted by frequency of occurrence, descending. The idea is that the most common words are indicative of the major themes of the corpus. Slightly more sophisticated algorithms are n-grams, which count the short n
-word phrases, and inverted index, which builds a dictionary of words and the locations the words were found, the basis of a search engine!
Note: This code is also available as a complete, standalone example in word-count.py.
import glob, gzip, re
class WordCount:
"Wraps a dictionary of words and counts."
def __init__(self):
self.counts = {}
def __call__(self, word, increment):
count = increment
if word in self.counts:
count = self.counts[word]+increment
self.counts[word] = count
return (word, count)
def sort_counts(self, descending=True):
"Returns a generator of word-count pairs sorted by count."
return (wc for wc in sorted(self.counts.items(), key = lambda wc: wc[1], reverse=descending))
def unzip(f):
if f.endswith(".gz"):
return gzip.open(f)
else:
return open(f, 'r')
def count_words(file_globs, top_n = 100, batch_window = 1024):
# The working directory of this application may be _different_
# than the Ray cluster's working directory. (In a real cluster,
# the files available will be different, too, but we'll ignore
# the problem here.) So, we need to pass absolute paths or our
# ray.util.iter.from_items won't find the files!
globs = [g for f in file_globs for g in glob.glob(f)]
file_list = list(map(lambda f: os.path.abspath(f), globs))
print(f'Processing {len(file_list)} files: {file_list}')
# See also the combine operation, which is for_each(...).flatten(...).
word_count = (
ray.util.iter.from_items(file_list, num_shards=4)
.for_each(lambda f: unzip(f).readlines())
.flatten() # converts one record per file with all lines to one record per line.
.for_each(lambda line: re.split('\W+', line)) # split into words
.flatten() # flatten lists of words into one word per record
.for_each(lambda word: (word, 1))
.batch(batch_window)
)
# Combine the dictionaries of counts across shards with a sliding window
# of "batch_window" lines.
wordCount = WordCount()
for shard_counts in word_count.gather_async():
for word, count in shard_counts:
wordCount(word, count)
sorted_list_iterator = wordCount.sort_counts()
return [sorted_list_iterator.__next__() for i in range(top_n)]
Note that the current working directory used is actually the root of the project, so we have to use absolute paths or correct relative paths!
%time words_counts = count_words(['*.ipynb'], top_n=100)
words_counts
Exercises¶
Let's explore word count. Do the first two exercises, then any of the rest of them that interest you.
The solutions are in the solutions notebook.
As you do these exercises, pay attention to any perceived performance changes, better or worse.
Exercise 1¶
Replace the combinations of for_each().flatten()
with combine()
(documentation). In functional programming languages, this "operator" is usually called flatmap
, because for_each
as used here is called map
, where the output of each application of for_each/map
on a single record returns a collection of new records, and the resulting collection of collections is flattened to a collection of new records.
Exercise 2¶
The same word may appear with different capitalization, e.g., at the beginning of a sentence. We don't want these occurrences counted separately (although this makes less sense when you are scanning source code!). Convert all words to lower case. When in the pipeline should this be done?
Exercise 3¶
The most frequent "words" are actually single letters, like n
, which is the most numerous across the notebooks in this directory. Can you figure out why there are so many n
s? Hint: try this code:
f=open('./01-Ray-Tasks.ipynb', 'r')
[f.readline() for _ in range(10)]
In natural language processing, the term stop words is used for "tokens" that are not very useful for understanding, so they are filtered out. Add a filter
step (documentation) to remove a set of stop words that you define. Where is the best place to put this step? How would you represent the set of stop words and how would you filter them out?
Once the stop words are removed, what are the most common words?
Exercise 4¶
Try running word count on some of your own files. Adjust your list of stop words as needed.
"Homework"¶
Solutions are not provided in the solutions notebook for these suggested exercises.
Better Tokenization¶
Our "tokenization" technique is naïve; we simply split on and discard non-alphanumeric characters. Try using a more sophisticated tokenizer. See this Stack Overflow page for ideas.
N-Grams¶
Try implementing the N-grams algorithm, which counts the short n
-word phrases. Why are they useful?
Inverted Index¶
Try implementing the Inverted index algorithm, which builds a dictionary of words and the locations the were found, the basis of a search engine! Takeover Google...
ray.shutdown() # "Undo ray.init()". Terminate all the processes started in this notebook.
Ray parallel iterators isn't a full-featured replacement for powerful systems like Apache Spark, but it handles a lot of scenarios without having to leave Ray.
The next lesson, Exploring Ray API Calls explores the other Ray API calls for more advanced scenarios, including keyword arguments you can pass to the API calls already learned are explored.