Sunday, February 28, 2016


How to set up Hadoop Streaming to analyze MovieLens data

This post is designed for an Apache Hadoop 2.6.0 single-node cluster installation. The job uses Hadoop Streaming with mappers and reducers written in C++, Ruby and Python. The MapReduce configuration is the standard one, with no tuning of the streaming settings, mappers or reducers for enhanced performance. The dataset used is the GroupLens MovieLens 1M dataset; the files used are the ratings file and the README.txt file.



1. Prepare the data




The data for the analysis is relatively easy to prepare. After extracting the zip archive, the ratings file can be imported into Microsoft Excel and arranged into four columns (whose metadata can be obtained from the README.txt file packaged with the data). These are:

UserIDs range between 1 and 6040  
MovieIDs range between 1 and 3952
Ratings are made on a 5-star scale (whole-star ratings only)
Timestamp is represented in seconds since the epoch as returned by time(2).

There are a number of MapReduce jobs that can be set up on the dataset. The one selected for this illustration is to determine the following metrics:

Number of ratings made by each UserID
Number of ratings for each MovieID
Average rating for each MovieID
Number of ratings in each score category in the list {1,2,3,4,5}.


The data can be arranged into four files for the job. The UserID column can be extracted into a text file for analysis using the wordcount MapReduce pattern. The rationale for the approach is that the number of times a UserID occurs in the column is equal to the number of ratings made by that UserID. The scheme for generating the number of ratings for each movie follows analogously for a MovieID column text file.

The average rating calculation outlined here uses all four columns of the data in CSV format; however, the Python mapper/reducer code can easily be adapted to use only the MovieID and Ratings columns. The number of ratings in each score category can be determined (analogously to the UserID and MovieID approach) by treating the ratings as words rather than numbers. The Timestamp variable is not used in this illustration.

Once the four files have been prepared they can be loaded into the Hadoop Distributed File System (HDFS) for the job. The next step is to prepare the mapper-reducer sets for the job.



2. Prepare the Mappers and Reducers




In this illustration the mapper-reducer sets for the four jobs were prepared in three languages (C++, Ruby and Python).



C++ mapper-reducer set




The mapper-reducer set for the UserID wordcount MapReduce job was prepared in C++ according to the tutorial in this blog post.

C++ Mapper
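The original listing is not reproduced here; the following is a minimal sketch of a word-count streaming mapper in C++, in the spirit of the referenced tutorial. It reads lines from standard input and emits a tab-separated (word, 1) pair for every token.

// mapper.cpp - minimal word-count streaming mapper (sketch)
#include <iostream>
#include <sstream>
#include <string>

int main() {
    std::string line;
    while (std::getline(std::cin, line)) {
        std::istringstream tokens(line);
        std::string word;
        while (tokens >> word) {
            // Emit "word<TAB>1" for the streaming framework to sort by key
            std::cout << word << "\t" << 1 << "\n";
        }
    }
    return 0;
}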



C++ Reducer
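A minimal sketch of the matching reducer, which relies on the streaming framework delivering the mapper output sorted by key:

// reducer.cpp - minimal word-count streaming reducer (sketch)
#include <iostream>
#include <sstream>
#include <string>

int main() {
    std::string line, current_key;
    long current_count = 0;
    bool have_key = false;
    while (std::getline(std::cin, line)) {
        std::istringstream fields(line);
        std::string key;
        long count = 0;
        if (!(fields >> key >> count)) continue;  // skip malformed lines
        if (have_key && key != current_key) {
            // key boundary reached: emit the total for the previous key
            std::cout << current_key << "\t" << current_count << "\n";
            current_count = 0;
        }
        current_key = key;
        have_key = true;
        current_count += count;
    }
    if (have_key) {
        std::cout << current_key << "\t" << current_count << "\n";
    }
    return 0;
}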



The mapper.cpp file and reducer.cpp file have to be compiled into *.out files using the following commands on Ubuntu 14.04.3 LTS.
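The exact commands depend on the compiler installed; with the default g++ toolchain on Ubuntu 14.04.3 LTS something along the following lines should work.

g++ mapper.cpp -o mapper.out
g++ reducer.cpp -o reducer.out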


These will generate a mapper.out file and a reducer.out file which can be used in the Hadoop Streaming job.



Ruby mapper-reducer set 




The mapper-reducer set for the MovieID and Rating category wordcount MapReduce jobs was prepared in Ruby according to the tutorial in this blog post.

Ruby Mapper
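A minimal sketch of a Ruby word-count mapper in the spirit of the referenced tutorial (the file name mapper.rb is an assumption):

#!/usr/bin/env ruby
# mapper.rb - minimal word-count streaming mapper (sketch)
# Emits "word<TAB>1" for every whitespace-separated token read from stdin.
STDIN.each_line do |line|
  line.split.each do |word|
    puts "#{word}\t1"
  end
end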



Ruby Reducer
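And a minimal sketch of the matching Ruby reducer (file name reducer.rb assumed), which sums the counts for each key in the sorted mapper output:

#!/usr/bin/env ruby
# reducer.rb - minimal word-count streaming reducer (sketch)
current_key = nil
current_count = 0

STDIN.each_line do |line|
  key, count = line.strip.split("\t")
  next if key.nil? || count.nil?
  if current_key && key != current_key
    # key boundary reached: emit the total for the previous key
    puts "#{current_key}\t#{current_count}"
    current_count = 0
  end
  current_key = key
  current_count += count.to_i
end

puts "#{current_key}\t#{current_count}" if current_key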





Python mapper-reducer set




The mapper-reducer set for the Average Ratings MapReduce job was prepared in Python according to the tutorial in this blog post.


Python Mapper
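The original KeyAvgmapper.py is not reproduced here; the following is a minimal sketch assuming the CSV input columns are ordered UserID, MovieID, Rating, Timestamp. It emits (MovieID, Rating) pairs.

#!/usr/bin/env python
# KeyAvgmapper.py - minimal key-average streaming mapper (sketch)
# Assumes CSV input lines of the form: UserID,MovieID,Rating,Timestamp
import sys

for line in sys.stdin:
    fields = line.strip().split(",")
    if len(fields) < 4:
        continue  # skip malformed lines
    user_id, movie_id, rating, timestamp = fields[:4]
    # Emit "MovieID<TAB>Rating" so the reducer can average per movie
    print("%s\t%s" % (movie_id, rating))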



Python Reducer
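A minimal sketch of KeyAvgreducer.py, which averages the ratings for each MovieID in the sorted mapper output:

#!/usr/bin/env python
# KeyAvgreducer.py - minimal key-average streaming reducer (sketch)
import sys

current_key = None
rating_sum = 0.0
rating_count = 0

for line in sys.stdin:
    try:
        key, value = line.strip().split("\t")
        rating = float(value)
    except ValueError:
        continue  # skip malformed lines
    if current_key is not None and key != current_key:
        # key boundary reached: emit the average for the previous MovieID
        print("%s\t%.4f" % (current_key, rating_sum / rating_count))
        rating_sum, rating_count = 0.0, 0
    current_key = key
    rating_sum += rating
    rating_count += 1

if current_key is not None and rating_count > 0:
    print("%s\t%.4f" % (current_key, rating_sum / rating_count))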



It is important to run chmod on each mapper and reducer file created above so that it is executable (see the example below). The mapper-reducer sets can then be saved in appropriate folders on the local file system. The next step is to process the data in Hadoop using streaming.
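For example, with the script file names assumed above:

chmod +x mapper.out reducer.out mapper.rb reducer.rb KeyAvgmapper.py KeyAvgreducer.py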



3. Process the data in Hadoop




UserID file


The assumption is that the UserID input dataset in HDFS is called InputData.txt and has been successfully loaded into the HDFS folder <HDFS input folder>. In terms of the MapReduce Streaming code, the further assumptions are that the hadoop-streaming-2.6.0.jar file is located in the local system folder <hadoop-streaming-2.6.0.jar local folder>, the C++ mapper is located in the local mapper folder <mapper folder>, the C++ reducer is located in the local reducer folder <reducer folder>, and the HDFS output folder name has been chosen as <HDFS output folder>. The next step is to run the following command in Hadoop.
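A sketch of the streaming command with those placeholders (the exact invocation may differ slightly depending on the installation):

hadoop jar <hadoop-streaming-2.6.0.jar local folder>/hadoop-streaming-2.6.0.jar \
    -input <HDFS input folder>/InputData.txt \
    -output <HDFS output folder> \
    -mapper <mapper folder>/mapper.out \
    -reducer <reducer folder>/reducer.out \
    -file <mapper folder>/mapper.out \
    -file <reducer folder>/reducer.out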




This is an excerpt of my UserID rating counts output (6040 in total).




MovieID file


The assumption is that the MovieID input dataset in HDFS is called InputData.txt and has been successfully loaded into the HDFS folder <HDFS input folder>. In terms of the MapReduce Streaming code, the further assumptions are that the hadoop-streaming-2.6.0.jar file is located in the local system folder <hadoop-streaming-2.6.0.jar local folder>, the Ruby mapper is located in the local mapper folder <mapper folder>, the Ruby reducer is located in the local reducer folder <reducer folder>, and the HDFS output folder name has been chosen as <HDFS output folder>. The next step is to run the following command in Hadoop.
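A sketch of the command, this time shipping the Ruby scripts (file names as assumed above):

hadoop jar <hadoop-streaming-2.6.0.jar local folder>/hadoop-streaming-2.6.0.jar \
    -input <HDFS input folder>/InputData.txt \
    -output <HDFS output folder> \
    -mapper <mapper folder>/mapper.rb \
    -reducer <reducer folder>/reducer.rb \
    -file <mapper folder>/mapper.rb \
    -file <reducer folder>/reducer.rb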




This is an excerpt of my MovieID rating counts output (3706 in total).


MovieID Ratings file


The assumption is that the MovieID Ratings input dataset in HDFS is called InputData.csv and has been successfully loaded into the HDFS folder <HDFS input folder>. In terms of the MapReduce Streaming code, the further assumptions are that the hadoop-streaming-2.6.0.jar file is located in the local system folder <hadoop-streaming-2.6.0.jar local folder>, the Python mapper (KeyAvgmapper.py) is located in the local mapper folder <mapper folder>, the Python reducer (KeyAvgreducer.py) is located in the local reducer folder <reducer folder>, and the HDFS output folder name has been chosen as <HDFS output folder>. The next step is to run the following command in Hadoop.
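A sketch of the command for the average-rating job (note the CSV input file):

hadoop jar <hadoop-streaming-2.6.0.jar local folder>/hadoop-streaming-2.6.0.jar \
    -input <HDFS input folder>/InputData.csv \
    -output <HDFS output folder> \
    -mapper <mapper folder>/KeyAvgmapper.py \
    -reducer <reducer folder>/KeyAvgreducer.py \
    -file <mapper folder>/KeyAvgmapper.py \
    -file <reducer folder>/KeyAvgreducer.py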




This is an excerpt of my MovieID average ratings output (3706 in total).






Score Rating (category) counts file



The assumption is that the score rating (category) counts input dataset in HDFS is called InputData.txt and has been loaded into the HDFS folder <HDFS input folder>, and that the Ruby mapper and reducer are in the same local folders as before. The next step is to run the same streaming command as for the MovieID file, with <HDFS input folder> and <HDFS output folder> pointing at this dataset.




This is an excerpt of my category counts output.


4. Highlights from the results



The results from the MapReduce job were very interesting. I give a very brief summary produced with the SAS PROC SGPLOT procedure, using techniques from the tutorial in this post, supplemented with results from SAS PROC UNIVARIATE. First, the top 20 User_ID rating volumes (counts).





The basic summary measures provide useful statistics for analyzing the User_ID rating volumes and also serve as brief checks on the MapReduce job: the number of User_IDs that rated is 6040, and the sum of the observations equals 1,000,209 (the total number of ratings).




The top 20 Movie_ID rating volumes.



The brief checks for the MapReduce job and summary statistics of the Movie_ID rating volumes.


The top 20 Movie_ID average ratings. The highest average rating was 5, which was the average rating for each of the top ten movies.


The brief check for the MapReduce job (only the count of 3706 Movie_IDs in this case) and summary statistics of the Movie_ID rating averages.


The category counts are consistent with the summary statistics of the movie averages.






Summary




The MapReduce programming model provides a very powerful method to process data. The Apache Hadoop Streaming facility provides a way to customize the processing according to programming style and cluster resources.

The MovieLens dataset has a lot of information about user ratings. The MapReduce jobs considered in this post provide a simple way to begin to analyze the dataset.

I hope this post proves useful to you in applying MapReduce and analyzing the MovieLens dataset(s).


Interested in finding out more information about simple approaches to Hadoop Streaming and cloud computing?


  

Monday, February 22, 2016


How to incorporate Python and R into a Hadoop 2.6.0 MapReduce job using Hadoop Streaming

This setup guide is designed for an Apache Hadoop 2.6.0 installation. Hadoop Streaming is a utility that allows one to create and run MapReduce jobs with any executable or script as the mapper and/or reducer; the functionality is part of the Hadoop distribution. A detailed explanation of Hadoop Streaming and Hadoop 2.6.0 can be found on the Apache Hadoop project website. In this post I will explain how to execute the Hadoop 2.6.0 MapReduce examples word count, word mean and word standard deviation, all of which ship with the Hadoop distribution. In this scheme, Hadoop Streaming is used for the word count MapReduce instead of the distribution's word count example (implemented in my previous blog post).


The first part of the post gives the setup and execution of word count using Hadoop Streaming MapReduce. The Hadoop Streaming setup has one mapper/reducer set written in Python and one written in R. The second part of the post gives the setup and execution of the word mean and word standard deviation using the standard Hadoop MapReduce.


The MapReduce job is designed to analyze four sets of aggregates. These are the 2014 global population, 2014 global internet user population, 2012 Facebook population and the spatial time series variance-covariance matrix (annual steps) for the global internet user population between the years 2008 to 2014. The analysis of the first three sets of aggregates in Hadoop was for testing purposes and the last set was the main analysis.


In the scheme the word count MapReduce job was implemented using Python for all the sets. The word count MapReduce job using R script was implemented only for the global internet user population spatial time series variance-covariance matrix aggregates (the fourth set). The word mean and word standard deviation standard MapReduce jobs were also only implemented for the fourth set of aggregates.




1. Prepare the data




A detailed account of the aggregates can be found in my previous blog post: 5 matrix decompositions for visualizing the global internet user population spatial time series variance-covariance matrix. The data preparation essentially involved categorizing the aggregates into decile categories (classes). The decile classes were then given word values whose length gives an indication of the size of the figure; for example, the first decile class, decile_one, has a shorter word length than the second decile class, decile_two_. The naming convention is designed to facilitate the word mean and word standard deviation parts of the analysis.


Decile classes for the 2014 Global population aggregates



The decile classes for the 2014 global population aggregates are shown in the following table.





Decile classes for the 2014 Global internet user population aggregates



The decile classes for the 2014 global internet user population aggregates are shown in the following table.






Decile classes for the 2012 Facebook user population aggregates



The decile classes for the 2012 Facebook user population aggregates are shown in the following table.





Decile classes for the Global internet user population (2008 to 2014) spatial time series variance-covariance matrix aggregates 



In processing the matrix, the first step is to take the absolute value of all the entries (in order to handle negative covariances). It is also worth noting that variance-covariance matrices are symmetric, so one half of the off-diagonal elements could be omitted from the processing. There are advantages (fewer values to process) and disadvantages (the final Hadoop results would have to be transformed before being presented, and the extra processing of a matrix of this size introduces more complex procedures and more opportunities for error) to doing so. In the present analysis, however, all of the off-diagonal elements were retained in the processing because of these disadvantages.

The decile classes for the 2008 to 2014 annual global internet user population aggregates are shown in the following table.





The classified aggregates were then read into the Hadoop Distributed File System (HDFS) in preparation for the MapReduce job. The procedure for loading data into the HDFS can be found in the Apache Hadoop project website.



2. Prepare the Mappers and Reducers for Hadoop Streaming




The next step is to prepare the mapper and reducer scripts that will be used in the streaming job.


Python mapper and reducer



The Python mapper and reducer for the word count jobs were obtained and prepared according to the tutorial in this post. The improved Python mapper and reducer combination was selected. The Python mapper is as follows:
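A sketch in the spirit of the improved, generator-based mapper from that tutorial (the file name mapper.py is an assumption):

#!/usr/bin/env python
# mapper.py - word-count streaming mapper (generator-based sketch)
import sys

def read_input(stream):
    # Yield the list of whitespace-separated tokens for each input line
    for line in stream:
        yield line.split()

def main(separator="\t"):
    for words in read_input(sys.stdin):
        for word in words:
            # Emit "word<TAB>1" for the streaming framework to sort by key
            print("%s%s%d" % (word, separator, 1))

if __name__ == "__main__":
    main()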





The Python reducer is as follows:
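A matching sketch of the improved, groupby-based reducer (file name reducer.py assumed):

#!/usr/bin/env python
# reducer.py - word-count streaming reducer (groupby-based sketch)
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(stream, separator="\t"):
    for line in stream:
        yield line.rstrip().split(separator, 1)

def main(separator="\t"):
    data = read_mapper_output(sys.stdin, separator=separator)
    # The streaming framework sorts mapper output by key, so groupby works here
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            pass  # the count was not a number; silently discard this group

if __name__ == "__main__":
    main()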




R script mapper and reducer



The R script mapper and reducer for the word count jobs were obtained and prepared according to this post. The R script mapper is as follows:
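A minimal sketch of an R word-count mapper (the file name mapper.R is an assumption):

#!/usr/bin/env Rscript
# mapper.R - minimal word-count streaming mapper (sketch)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  # Split the line on whitespace and emit "word<TAB>1" for each token
  words <- unlist(strsplit(line, "[[:space:]]+"))
  words <- words[words != ""]
  for (w in words) {
    cat(w, "\t1\n", sep = "")
  }
}
close(con)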




The R script reducer is as follows:
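And a minimal sketch of the matching R reducer (file name reducer.R assumed):

#!/usr/bin/env Rscript
# reducer.R - minimal word-count streaming reducer (sketch)
con <- file("stdin", open = "r")
current_key <- NA
current_count <- 0
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(line, "\t"))
  if (length(fields) < 2) next
  key <- fields[1]
  count <- as.integer(fields[2])
  if (!is.na(current_key) && key != current_key) {
    # key boundary reached: emit the total for the previous key
    cat(current_key, "\t", current_count, "\n", sep = "")
    current_count <- 0
  }
  current_key <- key
  current_count <- current_count + count
}
if (!is.na(current_key)) {
  cat(current_key, "\t", current_count, "\n", sep = "")
}
close(con)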






3. Analyze the data in Hadoop


The next step is to execute the jobs in Hadoop.


2014 Global population word count job (Python)



In this section the first assumption is that the 2014 global population data in HDFS is called InputData.txt and has been successfully loaded into the HDFS input folder <HDFS input folder>. The further assumptions are that the hadoop-streaming-2.6.0.jar file is located in a local system folder <hadoop-streaming-2.6.0.jar local folder>, the Python mapper is located in a local system folder <Python mapper folder>, the Python reducer is located in a local system folder <Python reducer folder>, and the HDFS output folder name has been chosen as <HDFS output folder>. The next step is to run the following command in Hadoop.
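A sketch of the streaming command with those placeholders (the exact invocation may differ slightly depending on the installation):

hadoop jar <hadoop-streaming-2.6.0.jar local folder>/hadoop-streaming-2.6.0.jar \
    -input <HDFS input folder>/InputData.txt \
    -output <HDFS output folder> \
    -mapper <Python mapper folder>/mapper.py \
    -reducer <Python reducer folder>/reducer.py \
    -file <Python mapper folder>/mapper.py \
    -file <Python reducer folder>/reducer.py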




In my case I obtained the following category counts.


2014 Global internet user population word count job (Python)



In this section the assumptions are the same as for the 2014 global population job, except that the input dataset in <HDFS input folder> is now the 2014 global internet user population data (again called InputData.txt in HDFS). The next step is to run the same streaming command as above, with <HDFS input folder> and <HDFS output folder> pointing at this dataset.






In my case I obtained the following category counts.


2012 Facebook user population word count job (Python)



In this section the assumptions are the same as for the previous jobs, except that the input dataset in <HDFS input folder> is now the 2012 Facebook user population data (again called InputData.txt in HDFS). The next step is to run the same streaming command as above, with <HDFS input folder> and <HDFS output folder> pointing at this dataset.




In my case I obtained the following category counts.





2008 to 2014 Global internet user population spatial time series variance-covariance matrix word count job (Python)



In this section the assumptions are the same as for the previous jobs, except that the input dataset in <HDFS input folder> is now the 2008 to 2014 global internet user population spatial time series variance-covariance matrix data (again called InputData.txt in HDFS). The next step is to run the same streaming command as above, with <HDFS input folder> and <HDFS output folder> pointing at this dataset.




In my case I obtained the following category counts.


2008 to 2014 Global internet user population spatial time series variance-covariance matrix word count job (R script)




In this section the first assumption is that the 2008 to 2014 global internet user population spatial time series variance-covariance matrix data in HDFS is called InputData.txt and has been loaded into the HDFS input folder <HDFS input folder>. The further assumptions are that the hadoop-streaming-2.6.0.jar file is located in a local system folder <hadoop-streaming-2.6.0.jar local folder>, the R script mapper is located in a local system folder <R script mapper folder>, the R script reducer is located in a local system folder <R script reducer folder>, and the HDFS output folder name has been chosen as <HDFS output folder>. The next step is to run the following command in Hadoop.
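A sketch of the command using the R scripts (file names as assumed above):

hadoop jar <hadoop-streaming-2.6.0.jar local folder>/hadoop-streaming-2.6.0.jar \
    -input <HDFS input folder>/InputData.txt \
    -output <HDFS output folder> \
    -mapper <R script mapper folder>/mapper.R \
    -reducer <R script reducer folder>/reducer.R \
    -file <R script mapper folder>/mapper.R \
    -file <R script reducer folder>/reducer.R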





In my case I obtained the following category counts.


2008 to 2014 Global internet user population spatial time series variance-covariance matrix word mean and word standard deviation jobs



In this section the first assumption is that the 2008 to 2014 global internet user population spatial time series variance-covariance matrix data in HDFS is called InputData.txt and has been loaded into the HDFS input folder <HDFS input folder>. In terms of the standard Hadoop 2.6.0 MapReduce code, the further assumptions are that the hadoop-mapreduce-examples-2.6.0.jar file is located in the local system folder <hadoop-mapreduce-examples-2.6.0.jar folder> and that the HDFS output folder name has been chosen as <HDFS output folder>. The word mean is obtained by running the following command in Hadoop.
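A sketch of the command (wordmean is one of the examples bundled in the hadoop-mapreduce-examples jar):

hadoop jar <hadoop-mapreduce-examples-2.6.0.jar folder>/hadoop-mapreduce-examples-2.6.0.jar wordmean \
    <HDFS input folder>/InputData.txt <HDFS output folder>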




In my case I obtained the following decile class based word mean value.





The word mean value (i.e. mean of the decile class data) is a function of the mean of the original data (quantitative values).

The word standard deviation is obtained by running the following command in Hadoop.
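A sketch of the command (the example is named wordstandarddeviation in the examples jar):

hadoop jar <hadoop-mapreduce-examples-2.6.0.jar folder>/hadoop-mapreduce-examples-2.6.0.jar wordstandarddeviation \
    <HDFS input folder>/InputData.txt <HDFS output folder>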




In my case I obtained the following decile class based word standard deviation value.




The word standard deviation (i.e. the standard deviation of the decile class data) is a function of the standard deviation of the original data (quantitative values). The scheme can be refined to use percentile divisions with a finer granularity, for example 5% percentile interval cut-offs, 2.5% and so on.



Summary 



In this post I outlined how to set up MapReduce jobs that can be used to generate summaries of a big annual spatial time series variance-covariance matrix of the global internet user population between 2008 and 2014. The summaries can be used to drive more specific/elegant/specialized analyses of the spatial time series variance-covariance matrix.


The procedure is simple to execute in the sense that essentially only three sets of MapReduce jobs/procedures were run: the R script procedure replicates the Python MapReduce job, and a Python word count procedure with the same structure was run on four different data sets. This essentially reduces the jobs to a Hadoop Streaming Python MapReduce word count job, a standard Hadoop MapReduce word mean job and a standard Hadoop MapReduce word standard deviation job.


I hope this post proves useful for your own analyses. Check out my other related blog posts for more context on how you can use the procedure.