Analyzing Wikipedia Text with pySpark

Spark improves usability by offering a rich set of APIs and making it easy for developers to write code. Spark programs are often around 5x smaller than their MapReduce equivalents. The Spark Python API (PySpark) exposes the Spark programming model to Python. To learn the basics of Spark, read through the Scala programming guide; it should be easy to follow even if you don’t know Scala. PySpark provides an easy-to-use programming abstraction and parallel runtime; we can think of it as – “Here’s an operation, run it on all of the data”.

To use Spark, developers write a driver program that implements the high-level control flow of their application and launches various operations in parallel on the nodes of the cluster.

The typical life cycle of a Spark program is –

  • Create RDDs from some external data source or parallelize a collection in your driver program.
  • Lazily transform the base RDDs into new RDDs using transformations.
  • Cache some of those RDDs for future reuse.
  • Perform actions to execute parallel computation and to produce results.

So we have the master-slave, parallel-processing paradigm: we write commands on the master machine, it sends tasks to the workers, and the workers compute and send the results back, while also persisting in memory any RDDs they constructed along the way. The next time we query, the RDDs persisted in memory are hit and the results are computed much faster. Thus, by chaining transformations and actions, we can write programs much as we would with MapReduce.
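As a minimal sketch of that life cycle (assuming a SparkContext named sc is already available, e.g. in the pyspark shell or a notebook; the sample lines are made up for illustration):

# 1. Create an RDD, here by parallelizing a small collection in the driver.
linesRDD = sc.parallelize(["spark makes parallel code simple",
                           "spark keeps data in memory"])

# 2. Lazily transform it into new RDDs; nothing executes yet.
wordsRDD = linesRDD.flatMap(lambda line: line.split(" "))

# 3. Cache the RDD we expect to reuse.
wordsRDD.cache()

# 4. Perform actions; only now does Spark schedule work on the workers.
print(wordsRDD.count())   # total number of words
print(wordsRDD.take(3))   # first few elements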


The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data. Let’s look at code that analyzes the words in a complete text dump of Wikipedia.

import re

def removePunctuation(text):
    text = text.lower().strip()
    text = re.sub('[^0-9a-zA-Z ]', '', text)
    return text

def wordCount(wordListRDD):
    # Build a (word, 1) pair per word, then sum the counts for each word.
    return wordListRDD.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

removePunctuation : A Python function to remove punctuation, change the text to lower case, and strip leading and trailing spaces.

wordCount : A Python function to create a pair RDD of words and their occurrence counts from an RDD of words. Here we first apply the map function to create a pair (word, 1) for each occurrence of a word in the text. We then use the reduceByKey() transformation on the RDD of (K, V) pairs, which returns an RDD of (K, V) pairs where the values for each key are aggregated using the given reduce function func. E.g., if the map step creates n pairs for the word xyz as (xyz, 1), …, (xyz, 1), reduceByKey() returns a single pair with the aggregated value for that key (here (xyz, n)).
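For instance, applying wordCount to a toy RDD (a made-up example, assuming the SparkContext sc created below and the wordCount function defined above) behaves exactly as described:

# Three occurrences of 'xyz' and one of 'abc'.
toyWordsRDD = sc.parallelize(["xyz", "abc", "xyz", "xyz"])

# map         -> [("xyz", 1), ("abc", 1), ("xyz", 1), ("xyz", 1)]
# reduceByKey -> [("xyz", 3), ("abc", 1)]
print(wordCount(toyWordsRDD).collect())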

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access the cluster. To create a SparkContext you first need to build a SparkConf object that contains information about the application.

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
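For example, to run locally using all available cores (the application name and the local[*] master URL are illustrative placeholders; on a real cluster the master would instead be a URL such as spark://host:7077 or yarn):

from pyspark import SparkConf, SparkContext

# Local mode: the driver and workers run as threads on this machine.
conf = SparkConf().setAppName("WikipediaWordCount").setMaster("local[*]")
sc = SparkContext(conf=conf)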

Next, we create the base RDD by reading a text file (the Wikipedia dump) using the SparkContext object, asking Spark to split it into 8 partitions. When we do that, lazy evaluation kicks in: no execution happens yet. All that happens is that Spark records how to create the RDD from that text file, so nothing actually runs until we perform an action; we are still only constructing a recipe. The textFile() method takes an optional second argument for controlling the number of partitions of the file (here 8).

By default, Spark creates one partition for each block of the file (blocks being 64 MB by default in HDFS), but you can ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. The file can also live in any supported storage system, such as Amazon S3 or HDFS. Applying map(func) returns a new distributed dataset formed by passing each element of the source through the function func. So here we clean the text by applying the removePunctuation function to each line.

wikipediaRDD = (sc
                .textFile("…fileName…", 8)
                .map(removePunctuation))
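Because the RDD is still just a recipe at this point, a cheap sanity check that does not trigger a job is to ask for the number of partitions:

# getNumPartitions() only inspects RDD metadata; no job is run.
print(wikipediaRDD.getNumPartitions())   # 8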

We now create the words RDD by splitting each line of the lines RDD on spaces and then removing the blank elements using filter().

wikipediaWordsRDD = wikipediaRDD.flatMap(lambda x: x.split(' '))
wikiWordsRDD = wikipediaWordsRDD.filter(lambda x: x != '')

Remember that until now we have only applied transformations, so no execution has happened; Spark has only stored the sequence of transformations it needs to apply to create the final RDD. We now create the pair RDD of the form (word, count) for each unique word from the words RDD and apply the takeOrdered() action, which causes Spark to run all the transformations it has captured so far and return the final results. The takeOrdered() action returns the first n elements of the RDD as a list, using either their natural order or a custom comparator. Here it returns the top 15 words from the pair RDD, with a Python lambda function as the custom comparator.

top15WordsAndCounts = wordCount(wikiWordsRDD).takeOrdered(15, key=lambda kv: -kv[1])
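Since wikiWordsRDD is reused below for a second query, it is also a natural candidate for caching (step 3 of the life cycle above), so that the Wikipedia dump is not re-read and re-cleaned; a one-line sketch:

# Keep the cleaned words in memory across queries.
wikiWordsRDD.cache()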

In this way, we can easily use Spark to perform data exploration and ask various questions of a huge unstructured data set (here the Wikipedia text). E.g., we can find the occurrences of number words in the Wikipedia text using the operations below.

numbersRDD = wikiWordsRDD.filter(lambda x: x in ("zero", "one", "two", "three", "four",
                                                 "five", "six", "seven", "eight", "nine"))
wordCount(numbersRDD).takeOrdered(100, key=lambda kv: -kv[1])

Notice that if the number of elements in the RDD is less than the value of n used in the takeOrdered() action, instead of returning an error it simply returns all the elements it has, ordered by the specified comparator function (here we used n=100 in takeOrdered() when there are only 10 possible elements in numbersRDD).
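A toy illustration of this behaviour (assuming the same sc; the data is made up):

# Only 3 elements exist, but we ask for 10; takeOrdered() returns all 3.
smallRDD = sc.parallelize([("one", 5), ("two", 2), ("three", 9)])
print(smallRDD.takeOrdered(10, key=lambda kv: -kv[1]))
# [('three', 9), ('one', 5), ('two', 2)]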

We can even process this text using ordinary UNIX tools, as shown below. However, this is not distributed processing like Spark’s; it will be very slow and may even fail for such a huge amount of data.

cat wikipedia.txt | tr -d '[:punct:]' | tr -s '[:space:]' '\n' | tr '[:upper:]' '[:lower:]' | sort | uniq -c | sort -nr | head -n 15