Coursera Assignment: Simple Join in Spark

SIMPLE JOIN

1)  If you’ve made it this far in the course, you’ll have a couple of files in your HDFS directory. We need these files to perform our simple join. To make use of these files, you need to load them as an RDD in Spark, which can be achieved with this code:

fileA = sc.textFile("input/join1_FileA.txt")

2)  Since RDDs are evaluated lazily, we should do a quick spot check to see if the file has been loaded correctly. Either of the following options will help us, so take your pick and make sure it produces some output. If you don’t get any output, there’s a good chance that the files are not in your HDFS directory or that the file path you selected above is incorrect.  If that doesn’t help, look for some hints in the “Name Error” line in the error output.

fileA.collect()  # returns everything from the RDD
fileA.take(4)  # returns the first four records (or however many you ask for)
Out[]: [u'able,991', u'about,11', u'burger,15', u'actor,22']

3)  As you can see in the output above, each record is stored as a unicode string.  See the “u” in front of each record? That means unicode. And strings are indicated by single (or double) quotation marks.  

4)  Now, let’s load the other file needed for this exercise and check for output as well. Based on the output, it looks like we have some more unicode strings to work with.

fileB = sc.textFile("input/join1_FileB.txt")
fileB.collect()
Out[]: 
[u'Jan-01 able,5',
 u'Feb-02 about,3',
 u'Mar-03 about,8 ',
 u'Apr-04 able,13',
 u'Feb-22 actor,3',
 u'Feb-23 burger,5',
 u'Mar-08 burger,2',
 u'Dec-15 able,100']


MAPPER FOR fileA

1)  Since our records in both fileA and fileB are strings, we need to break the strings up to get at their individual elements and eventually join them. The output for each of the files is a little different, so we’ll need two different functions to parse them. For the first function, we are asked to split up the string and convert the number (currently represented as a string) to an integer. Here’s one way to do that:

def split_fileA(line):
    # Break 'word,count' apart at the comma
    split = line.split(',')
    # Keep the word as-is and convert the count to an integer
    word, count = split[0], int(split[1])
    return (word, count)

Python was new for some of my classmates, so let’s go through this function step-by-step. The first line of the function body defines a new variable called “split” that holds the line broken at the comma into a list of two strings (e.g. ‘able’ and ‘991’) that we can extract and manipulate.

Now we can assign our parsed elements to the “word” and “count” variables by indexing into that list. Since Python uses zero-based indexing, split[0] gives us the first element (‘able’) and split[1] gives us the second (‘991’). The other thing we need to do is convert our number represented as a string (‘991’) to an integer. This is easy to do by wrapping our second element in the int( ) function. Lastly, we return our two elements as a tuple, which should be ‘able’ and 991 now.
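
To see each step in isolation, here is a minimal plain-Python sketch that needs no Spark context. The sample line is taken from the fileA output above; the variable name parts is only for illustration.

```python
# Walk through the body of split_fileA one step at a time on a single line.
line = "able,991"

parts = line.split(",")   # ['able', '991'] -- both pieces are still strings
word = parts[0]           # 'able' (index 0 is the first element)
count = int(parts[1])     # 991, converted from the string '991'

print((word, count))
```

Printing the intermediate list is a handy way to confirm that the delimiter you chose really matches the file.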

2)  You may have some hesitation about your new function and whether it will actually work, so let’s do a quick test to put you at ease. First, we create a test line that resembles the lines in your file, then we call our function on that test line.

test_line = "able, 991"
split_fileA(test_line)
Out[]: ('able', 991)

3)  Looks good! Now we can run this as a map transformation on our full file. The map function is nice because it takes the function you just wrote, which is built to parse only a single string, and applies it to every record in the RDD. So, let’s do the transformation and check our output.

fileA_data = fileA.map(split_fileA)
fileA_data.collect()
Out[]: [(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)]
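
If map is unfamiliar, Python’s built-in map behaves in a similar way on an ordinary list, which makes for a rough analogy. This is plain Python rather than Spark, and the sample lines are copied from the fileA output above.

```python
def split_fileA(line):
    """Parse 'word,count' into a (word, count) tuple with an integer count."""
    parts = line.split(",")
    return (parts[0], int(parts[1]))

lines = ["able,991", "about,11", "burger,15", "actor,22"]

# map applies split_fileA to every element; list() forces the result,
# loosely as collect() does for an RDD.
parsed = list(map(split_fileA, lines))
print(parsed)
```

The key idea is the same in both settings: you write a function for one record and let map handle the iteration.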


MAPPER FOR fileB 

 

1) The mapper for fileB is a little trickier because we need to extract three different elements and reorder them in the output. But once you read through the function we use to do this, you’ll realize it’s probably easier than you anticipated.

def split_fileB(line):
    first_split = line.split()
    date = first_split[0]
    second_split = first_split[1].split(',')
    word, count_string = second_split[0], second_split[1]
    return (word, date + " " + count_string)

I’m sure there are simpler ways to do this (and please do share if you have another approach!), but this gets the job done. As we did with the first function, let’s quickly step through this function to understand what it’s doing. So after defining the function, we take a line and split it with the split( ) function and use the default whitespace delimiter, rather than specifying it as a comma like in the previous function. That splits our line that used to look like this:

'Jan-01 able,5'

And turns it into this:

'Jan-01' 'able,5'

Awesome. Now we can grab the date from the output by subsetting, which is shown in the third line in the function. The second part of our output has the word and count bound together in a string. So, on the fourth line of code, we use subsetting first to point Python at the second part of the output, and then we split again — this time using the comma delimiter — to break up the word and count elements. If we were to stop the function here, we could access some output that would look like this:

'Jan-01' 'able' '5'

That’s not bad, but remember that we need to reorder our output. So, in the fifth line of code, we create the word and count_string variables by pulling out the two elements of second_split. And lastly, we return our output in the order defined in the assignment: the word first, followed by the date and count joined into a single string. If you’re not sure this will work as you hope, use a test line like we did above. Your output should look like this:

('able', 'Jan-01 5')
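
A quick check along those lines might look like the sketch below. It repeats the function so it can run on its own, without Spark; the names test_line and result are just for illustration.

```python
def split_fileB(line):
    first_split = line.split()                # split on whitespace: date, 'word,count'
    date = first_split[0]
    second_split = first_split[1].split(',')  # split 'word,count' on the comma
    word, count_string = second_split[0], second_split[1]
    return (word, date + " " + count_string)

test_line = "Jan-01 able,5"
result = split_fileB(test_line)
print(result)
```

Because split( ) with no arguments splits on any run of whitespace and ignores leading and trailing whitespace, a stray space at the end of a line does not end up in the parsed count.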

Nice. Just as we did with the earlier mapper, we can run this as a map transformation on fileB so that we parse and reorder the data as required, then check the output again with the collect function.

fileB_data = fileB.map(split_fileB)

fileB_data.collect()
Out[]: 
[(u'able', u'Jan-01 5'),
 (u'about', u'Feb-02 3'),
 (u'about', u'Mar-03 8'),
 (u'able', u'Apr-04 13'),
 (u'actor', u'Feb-22 3'),
 (u'burger', u'Feb-23 5'),
 (u'burger', u'Mar-08 2'),
 (u'able', u'Dec-15 100')]

JOIN TWO OUTPUT FILES

Assuming the output from your two mappers is in good shape, the next step of joining the files is easy and can be done with the following code:

fileB_joined_fileA = fileB_data.join(fileA_data)

This performs a join transformation in Spark on two RDDs of (key, value) pairs and returns a data set with one record for each pair of matching keys, carrying the values from both sides. For our two files, this means our key will be the word (e.g. u'able'), and the values associated with the key will be the date and count from fileB (e.g. u'Jan-01 5') and the count from fileA (e.g. 991). The value from fileB comes first because fileB_data is the RDD we call join on; fileA_data is the one being joined to it.

The important thing to keep in mind here is that the output is not a single flat tuple like this: (key, value, value). Rather, the output comes in the form of a tuple within a tuple (or nested tuple), which looks like this: (key, (value, value)). Understanding this will be important for the next assignment on advanced joins with PySpark.
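
To make the nested shape concrete, here is a rough plain-Python sketch of an inner join on lists of (key, value) pairs. It is an illustration only and not how Spark implements join; simple_join and the two sample lists are hypothetical names, with the sample values taken from the mapper outputs above.

```python
def simple_join(left, right):
    """Inner-join two lists of (key, value) pairs into (key, (left, right)) records."""
    # Index the right-hand values by key so each left record can look them up.
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)

    joined = []
    for key, left_value in left:
        for right_value in right_by_key.get(key, []):
            joined.append((key, (left_value, right_value)))
    return joined

fileB_sample = [("able", "Jan-01 5"), ("actor", "Feb-22 3")]
fileA_sample = [("able", 991), ("actor", 22)]

joined = simple_join(fileB_sample, fileA_sample)
print(joined)

# Unpacking a record has to mirror the nesting: key, then the pair of values.
key, (b_value, a_value) = joined[0]
```

Note that a real Spark join makes no promise about the order of the records it returns, so don’t rely on the ordering you see in this sketch.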


SUBMIT YOUR ONE-LINER FOR GRADING

Nice job! Now all you need to do is run the collect function on your RDD, grab the line with the ‘actor’ key, and submit it for grading. I tried doing this in a couple of different text editors but my result was rejected by the grader. A workaround was highlighted on the Coursera discussion board for the class, but your best bet is to use the gedit Text Editor available in the Cloudera VM. So, open the text editor, paste in the line below, save the file in a convenient location and upload it to the grader for your 100/100 score. Great work!

 

(u'actor', (u'Feb-22 3', 22))
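
If you’d rather not hunt for that line by eye, one option is to filter the collected records by key. The sketch below uses a small hand-built list as a stand-in for part of what collect( ) returns (the values come from the two input files above), and the name actor_rows is hypothetical.

```python
# Hand-built stand-in for part of the joined output; in the notebook you
# would start from fileB_joined_fileA.collect() instead.
joined = [
    ("able", ("Jan-01 5", 991)),
    ("actor", ("Feb-22 3", 22)),
    ("burger", ("Feb-23 5", 15)),
]

# Keep only the records whose key is 'actor'.
actor_rows = [row for row in joined if row[0] == "actor"]
print(actor_rows)

# Assumed PySpark equivalent, filtering before collecting:
# fileB_joined_fileA.filter(lambda row: row[0] == "actor").collect()
```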