Monday, October 17, 2016

How to summarize the GroupLens MovieLens 10M dataset using Flink, Go, Hadoop, MongoDB, Perl, Pig, Python, Ruby and Spark






This post is designed for a joint installation of Apache Flink 1.1.2, Golang 1.6, Apache Hadoop 2.6.0, MongoDB 2.4.9, MRJob 0.5.6, Perl Hadoop::Streaming 0.143060, Apache Pig 0.16.0, Apache Spark 1.6.2 (pre-built for Hadoop), Ubuntu Server 14.04.5 Long-Term-Support (LTS) and Wukong-Hadoop 0.2.0.

In this illustration we will consider the MovieLens population from the GroupLens MovieLens 10M dataset (Harper and Konstan, 2005). The specific 10M MovieLens datasets (files) considered are the ratings (ratings.dat file) and the movies (movies.dat file).  The aim of this post is to illustrate how to generate quick summaries of the MovieLens population from the datasets.

The measures that can be generated are as follows.

  • Number of ratings made by each UserID
  • Number of ratings for each MovieID
  • Number of ratings in each Rating score category
  • Average rating for each MovieID
  • Number of ratings for each MovieID genre
  • Average rating for each MovieID genre

The computed measures can then be incorporated into more detailed statistical analyses of the population. 

The measures can be computed using the MapReduce programming model. The MapReduce model can be implemented on files created from appropriate columns of the ratings and movies files. The first, second, third and fifth MapReduces can be implemented using an adjusted version of the word count configuration of the MapReduce model. 

The first key adjustment to the word count programming involves making an allowance for the structure of the data (i.e. the data is in column form rather than free flowing text). The second key adjustment involves treating blocks of numbers or special characters that designate entity ids as single words. 

In the resulting adjusted word count configuration, the MapReduce model will essentially involve mapping each UserID, MovieID, Rating score category or Genre category (i.e. keys) to 1 and then reducing the mapped key-value pairs to obtain the overall key counts.
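The adjusted word count can be sketched in plain Python, independently of any of the frameworks used in this post. The sketch below is a minimal single-process version. It assumes each input line holds exactly one key (for example one UserID per line, as in the UserID file). The function names are hypothetical, and this is not the DMRGo, MRJob, Perl or Wukong-Hadoop code used later.

```python
from collections import defaultdict


def map_phase(lines):
    """Map step: emit (key, 1) for every non-blank line.

    The whole stripped line is treated as one "word", so ids such as
    "122" or genre strings such as "Comedy|Romance" are never split.
    """
    for line in lines:
        key = line.strip()
        if key:
            yield key, 1


def reduce_phase(pairs):
    """Reduce step: add up the 1s emitted for each key."""
    counts = defaultdict(int)
    for key, one in pairs:
        counts[key] += one
    return dict(counts)


def adjusted_word_count(lines):
    return reduce_phase(map_phase(lines))


if __name__ == "__main__":
    user_ids = ["1", "1", "2", "1", "3", "2"]
    print(sorted(adjusted_word_count(user_ids).items()))
    # [('1', 3), ('2', 2), ('3', 1)]
```

The only difference from a textbook word count is the tokenizer. A line is not split on whitespace or punctuation, so each column value counts as a single word.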

The advantage of this interpretation of the word count MapReduce is that one can treat JavaScript Object Notation (JSON)/Binary JavaScript Object Notation (BSON)-based MongoDB MapReduces as adjusted word counts (even though they are, strictly speaking, BSON-based counts of key-value pairs).

In this illustration we will also consider how to implement the adjusted word count MapReduce model to generate the fourth and sixth measures. In Flink, the batch word count MapReduce is applied to the data using the modified word count configuration interpretation.

The fourth MapReduce (for the fourth summary measure) will involve creating MovieID-Rating key-value mappings from two-column dataset tuples and then reducing the key-value pairs to calculate the average rating for each key (MovieID). In this illustration this is referred to as implementing the average configuration of the MapReduce model.

The sixth MapReduce similarly involves creating a mapping of the Genre-Rating key-value pairs and then reducing the key-value pairs to calculate the average rating for each key (Genre). 
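The average configuration keeps a running sum and count per key and divides at the end. The minimal Python sketch below assumes tab-separated "key<TAB>rating" lines, which matches the two-column MovieID-Rating and Genre-Rating files. The function names are hypothetical, and this is not the MRJob program used later in the post.

```python
from collections import defaultdict


def map_key_rating(lines, sep="\t"):
    """Map step: turn each 'key<TAB>rating' line into (key, float rating)."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key, rating = line.split(sep, 1)
        yield key, float(rating)


def reduce_average(pairs):
    """Reduce step: accumulate (sum, count) per key, then divide."""
    totals = defaultdict(lambda: [0.0, 0])
    for key, rating in pairs:
        totals[key][0] += rating
        totals[key][1] += 1
    return {key: total / n for key, (total, n) in totals.items()}


if __name__ == "__main__":
    movie_ratings = ["122\t5", "122\t3", "185\t4.5"]
    print(reduce_average(map_key_rating(movie_ratings)))
    # {'122': 4.0, '185': 4.5}
```

The same two functions apply unchanged to Genre-Rating lines, because only the key changes.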


In this illustration the word count MapReduce is implemented using twenty-five different methods. The methods result from blending four Hadoop Streaming interfaces/libraries (DMRGo, MRJob, Perl Hadoop::Streaming, Wukong-Hadoop) and five Big Data frameworks/platforms (Flink, Hadoop, MongoDB, Pig and Spark). 

In Hadoop, the word count is implemented using the Hadoop Streaming interface. In Flink, the batch word count is implemented in Java, Python and Scala. The Flink implementations will be illustrated in local mode. In Spark, the word count is implemented using a Scala Spark Pipe, PySpark Pipe, Java Spark Pipe and SparkR Pipe. The Spark implementations will also be illustrated in local mode. In Pig, the word count is implemented using Pig Streaming. In MongoDB, the (adjusted) word count is implemented using the JavaScript mapReduce() function in the MongoDB Shell and PyMongo (i.e. the Code class from bson.code in a PySpark application).
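A Spark Pipe writes each element of a partition to an external command's standard input as one line and turns each line of the command's standard output into a new element. The Python sketch below emulates that contract for a single partition using subprocess. It is not Spark code. The inline MAPPER and REDUCER programs are hypothetical stand-ins for the Bash mapper and reducer wrappers, and the list sort stands in for the shuffle.

```python
import subprocess
import sys

# Hypothetical stand-ins for the Bash mapper/reducer wrappers: tiny Python
# programs that follow the same line-based stdin/stdout protocol.
MAPPER = (
    "import sys\n"
    "for l in sys.stdin:\n"
    "    k = l.strip()\n"
    "    if k: print(k + '\\t1')\n"
)
REDUCER = (
    "import sys\n"
    "from itertools import groupby\n"
    "rows = (l.rstrip('\\n').split('\\t', 1) for l in sys.stdin if l.strip())\n"
    "for k, g in groupby(rows, key=lambda r: r[0]):\n"
    "    print(k + '\\t' + str(sum(int(v) for _, v in g)))\n"
)


def pipe(lines, command):
    """Emulate a pipe over one partition: elements in as lines, lines out as elements."""
    result = subprocess.run(
        command,
        input="".join(line + "\n" for line in lines),
        stdout=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return result.stdout.splitlines()


def pipe_word_count(lines):
    mapped = pipe(lines, [sys.executable, "-c", MAPPER])
    mapped.sort()  # stands in for the shuffle: equal keys become adjacent
    return pipe(mapped, [sys.executable, "-c", REDUCER])


if __name__ == "__main__":
    print(pipe_word_count(["1", "2", "1", "3"]))
```

In a distributed run, all lines for a given key must land in the same partition before they are piped through the reducer. This single-partition sketch does not address that requirement.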

The average MapReduce is implemented using a Python MRJob library program. The illustration will also consider how to construct query based summaries.
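The query-based summaries in this post are produced with Spark SQL and the MongoDB aggregation framework. To show the general shape of such a query without a cluster, the sketch below swaps in Python's built-in sqlite3 module. The table and column names are hypothetical, and this is not the author's query.

```python
import sqlite3


def rating_summary(rows):
    """rows: iterable of (movie_id, rating) tuples.

    Returns (movie_id, number_of_ratings, average_rating) per MovieID,
    ordered by MovieID, computed with a GROUP BY aggregation.
    """
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE ratings (movie_id INTEGER, rating REAL)")
        con.executemany("INSERT INTO ratings VALUES (?, ?)", rows)
        return con.execute(
            "SELECT movie_id, COUNT(*), AVG(rating) FROM ratings "
            "GROUP BY movie_id ORDER BY movie_id"
        ).fetchall()
    finally:
        con.close()


if __name__ == "__main__":
    print(rating_summary([(122, 5), (122, 3), (185, 4.5)]))
    # [(122, 2, 4.0), (185, 1, 4.5)]
```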


1. Prepare the data



The data is available in zip file format. The data from the ratings.dat file can be arranged into the following four columns.
















The Timestamps variable is not used in this illustration. The data can be arranged into five files for the job, namely, a UserID file, MovieID file, MovieID-Rating file, Rating score file and Fused MovieID-Rating file.
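One way to derive the five files is sketched below in Python. It assumes the MovieLens 10M ratings.dat layout UserID::MovieID::Rating::Timestamp. The underscore used to fuse MovieID and Rating is a hypothetical choice, because the post does not state the separator in this text. The sample lines are illustrative only.

```python
def split_ratings(lines, fuse_sep="_"):
    """Split ratings.dat lines (UserID::MovieID::Rating::Timestamp) into the
    five job files, returned as lists of output lines.

    fuse_sep is a hypothetical separator for the Fused MovieID-Rating file.
    """
    files = {
        "userid": [],
        "movieid": [],
        "movieid_rating": [],        # two columns: MovieID<TAB>Rating
        "rating": [],
        "fused_movieid_rating": [],  # one column: MovieID<sep>Rating
    }
    for line in lines:
        line = line.strip()
        if not line:
            continue
        user_id, movie_id, rating, _timestamp = line.split("::")  # Timestamp unused
        files["userid"].append(user_id)
        files["movieid"].append(movie_id)
        files["movieid_rating"].append(movie_id + "\t" + rating)
        files["rating"].append(rating)
        files["fused_movieid_rating"].append(movie_id + fuse_sep + rating)
    return files


if __name__ == "__main__":
    sample = ["1::122::5::838985046", "2::185::4.5::838983525"]
    for name, rows in split_ratings(sample).items():
        print(name, rows)
```

Each returned list can then be written to its own file, one entry per line.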

The UserID, MovieID, MovieID-Rating, Rating score and Fused MovieID-Rating files can be generated from the original ratings.dat file as follows.













The data from the movies.dat file can be arranged into the following three columns.













The movies data can then be combined with the files generated from the ratings file to produce three new files. The first file that can be generated is the genres file or the genres counts source file. This can be generated by substituting each MovieID with its genre.

The second file that can be generated is the Genre-Rating file. This can be generated, analogously to the genres counts source file, by substituting the MovieID in the MovieID-Rating file with its genre.

The third file that can be generated is the Fused Genre-Rating file. This can be generated by fusing the two columns (Genre and Rating) in the Genre-Rating file.   
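The MovieID-to-genre substitution can be sketched as a dictionary lookup. The sketch assumes the movies.dat layout MovieID::Title::Genres and keeps the full pipe-separated Genres string (for example "Comedy|Romance") as the genre key. Whether the post splits multi-genre entries is not stated here, so that detail is an assumption. The fuse separator is hypothetical, and the genres counts source is built from MovieID-Rating lines, which have one row per rating just like the MovieID file.

```python
def load_genres(movie_lines):
    """Build a MovieID -> Genres lookup from movies.dat lines
    (MovieID::Title::Genres, genres separated by '|')."""
    lookup = {}
    for line in movie_lines:
        line = line.strip()
        if not line:
            continue
        movie_id, rest = line.split("::", 1)
        _title, genres = rest.rsplit("::", 1)  # robust to ':' inside titles
        lookup[movie_id] = genres
    return lookup


def genre_files(movieid_rating_lines, lookup, fuse_sep="_"):
    """Substitute each MovieID in 'MovieID<TAB>Rating' lines with its genres.

    Returns the genres counts source, Genre-Rating and Fused Genre-Rating rows.
    """
    genres_counts, genre_rating, fused_genre_rating = [], [], []
    for line in movieid_rating_lines:
        line = line.strip()
        if not line:
            continue
        movie_id, rating = line.split("\t")
        genre = lookup[movie_id]  # KeyError if the MovieID is not in movies.dat
        genres_counts.append(genre)
        genre_rating.append(genre + "\t" + rating)
        fused_genre_rating.append(genre + fuse_sep + rating)
    return genres_counts, genre_rating, fused_genre_rating


if __name__ == "__main__":
    movies = ["122::Boomerang (1992)::Comedy|Romance"]  # illustrative sample line
    print(genre_files(["122\t5", "122\t3"], load_genres(movies)))
```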

In summary, the files are generated as follows.









Once the files have been created the next step is to prepare the mappers and reducers.


2. Prepare the mapper and reducer sets



Go programming language mapper and reducer in DMRGo (Word count MapReduce configuration)



The DMRGo (Go) mapper and reducer can be prepared using a Go word count application developed using the DMRGo library available from this gist
The approach taken in this illustration is to create mapper and reducer files that are library based references to the Go programs housed in the application. The approach/method to perform this can be obtained from the tutorials in this manual, this post and this post. The first step involves creating this Mwordcount.go file (Mwordcount - Modified word count).



The Mwordcount.go file can be saved in local system folder called: <Local system Go word count application folder>. The program can be compiled into a Go word count application.

The following mapper and reducer Bash files can then be prepared.




The resulting files constitute the word count configuration MapReduce. The MapReduce can then be run in a choice of facilities using appropriate tools (i.e. Bash command-line submissions in Ubuntu, shell programs or applications). The facilities that will be considered for this purpose in this illustration are essentially the Hadoop Streaming interface, Scala Spark Pipe, PySpark Pipe, SparkR Pipe, Java Spark Pipe and Pig Stream operator.
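All of these facilities talk to the mapper and reducer through the same streaming contract. Key/value lines are tab separated, they flow over standard input and output, and the reducer receives its input sorted by key. The Python sketch below follows that contract as a stand-in for the Bash-wrapped Go, MRJob, Perl and Ruby programs. It is not any of those programs, and the script name wc_stream.py is hypothetical.

```python
import sys
from itertools import groupby


def mapper(lines):
    """Streaming mapper: write 'key<TAB>1' for every non-blank input line."""
    for line in lines:
        key = line.strip()
        if key:
            yield key + "\t1"


def reducer(lines):
    """Streaming reducer: input arrives sorted by key, so equal keys form
    consecutive runs that groupby can sum without holding every key in memory."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key + "\t" + str(sum(int(value) for _, value in group))


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python wc_stream.py map   or   python wc_stream.py reduce
    step = {"map": mapper, "reduce": reducer}[sys.argv[1]]
    for out in step(sys.stdin):
        print(out)
```

A quick local check could be `cat InputData.txt | python wc_stream.py map | LC_ALL=C sort | python wc_stream.py reduce`. Forcing the C locale keeps the sort byte-ordered, so lines with equal keys stay adjacent.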

MRJob mapper and reducer (Word count MapReduce configuration)



The approach for constructing the Python MRJob (MRJob) mapper and reducer files is similar to the approach followed for the Go mapper and reducer files.

Essentially, one can first construct a Python MRJob word count MapReduce program. The next step is to create Bash files using MRJob mapper and reducer library based references to Python code housed in a Python MRJob word count program (mrjobwc_program.py). The approach/method to perform this can be obtained from the tutorials in this manual and this document.


The mrjobwc_program.py file can be saved in local system folder <Local System Python MRJob WC program Folder>. One can then prepare the following mapper and reducer Bash files.






Perl Hadoop::Streaming mapper and reducer (Word count MapReduce configuration)



The Perl Hadoop::Streaming (Hadoop::Streaming) mapper and reducer can be created using the Perl Hadoop::Streaming library. The approach/method to perform this can be obtained from the tutorials in this document and this gist. The following mapper and reducer files can be prepared.




Wukong-Hadoop mapper and reducer (Word count MapReduce configuration)



The approach for creating the Ruby Wukong-Hadoop (Wukong-Hadoop) mapper and reducer files is analogous to the approach for the Go and MRJob word count files.

The first step is to create a Wukong-Hadoop library word count program. One can then use the library based references to the Ruby code housed in a Ruby Wukong-Hadoop library word count program (wuword_count.rb) to create Bash mapper and reducer files. The approach/method to perform this can be obtained from the tutorial in this gist.


The Ruby word count program can be saved in local system folder <Local System Wu WC program Folder>. One can then prepare the following mapper and reducer Bash files.




MRJob mapper and reducer (Average MapReduce configuration)



The MRJob mapper and reducer for the Average MapReduce configuration are, analogously to the MRJob word count MapReduce case, library based references to Python code housed in a Python MRJob library average MapReduce program (mrjobavg_program.py). The approach/method to perform this can be obtained from the tutorials in this manual, this document and this post.
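In streaming terms, the average configuration's map side can simply pass "MovieID<TAB>Rating" lines through, and the reducer averages each run of equal keys. The Python sketch below is a stand-in for the reducer half. It is not the mrjobavg_program.py code, and it assumes its input has already been sorted by key.

```python
import sys
from itertools import groupby


def average_reducer(lines):
    """Reduce sorted 'key<TAB>rating' lines to 'key<TAB>average' lines."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        ratings = [float(value) for _, value in group]
        yield key + "\t" + str(sum(ratings) / len(ratings))


if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "reduce":
    # The map side can pass 'MovieID<TAB>Rating' lines through unchanged (e.g. cat).
    for out in average_reducer(sys.stdin):
        print(out)
```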


The mrjobavg_program.py file can be saved in local system folder <Local System Python MRJobAvg program Folder>. One can then prepare the following mapper and reducer Bash files.




3. Process the data in Flink, Hadoop, MongoDB, Pig and Spark



The UserID MapReduce calculation is illustrated in Hadoop (Go word count), Pig (Go word count), MongoDB (JavaScript and PyMongo Code) and Java Flink (Batch word count).

The MovieID MapReduce calculation is illustrated in Scala Spark (Go word count), Java Spark (Hadoop::Streaming word count) and MongoDB (JavaScript).

The Rating category counts MapReduce calculation is illustrated in PySpark (MRJob word count), Hadoop (Wukong-Hadoop word count) and Java Flink (Batch word count).

The average MapReduce calculation is illustrated in seven versions for the MovieID-Rating key-value map using two dataset structures. The first structure is a tab separated key-value two column dataset (MovieID-Rating file). The second structure is a single column key dataset (Fused MovieID-Rating file). The resulting average MapReduces are illustrated in the following configurations.

  • MRJob average configuration
  • Go word count configuration
  • MRJob word count configuration
  • Hadoop::Streaming word count configuration
  • Wukong-Hadoop word count configuration
  • Python Flink batch word count configuration
  • Scala Flink batch word count configuration
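A word count over the single-column Fused MovieID-Rating file yields how many times each (MovieID, Rating) combination occurs. The average for a MovieID then follows as the sum of rating times count, divided by the sum of the counts. The post's own post-processing step is not shown in this text, so the Python sketch below is an assumption. It also assumes an underscore fuse separator and tab-separated "key<TAB>count" word count output.

```python
from collections import defaultdict


def averages_from_fused_counts(count_lines, fuse_sep="_"):
    """Convert word count output lines '<MovieID><sep><Rating><TAB><count>'
    into per-MovieID averages: sum(rating * count) / sum(count)."""
    weighted = defaultdict(float)
    totals = defaultdict(int)
    for line in count_lines:
        line = line.strip()
        if not line:
            continue
        fused_key, count = line.split("\t")
        movie_id, rating = fused_key.rsplit(fuse_sep, 1)  # rating is the last field
        weighted[movie_id] += float(rating) * int(count)
        totals[movie_id] += int(count)
    return {movie_id: weighted[movie_id] / totals[movie_id] for movie_id in totals}


if __name__ == "__main__":
    counts = ["122_5\t2", "122_3\t1", "185_4.5\t3"]
    print(averages_from_fused_counts(counts))
```

Splitting on the last separator keeps the sketch usable for fused Genre-Rating keys, provided the separator does not occur inside the rating.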

The specific MovieID averages calculation illustrations are as follows.


  • Java Spark Pipe (MRJob average configuration)
  • Java Spark Pipe (Go word count configuration)
  • Scala Spark Pipe (Hadoop::Streaming word count configuration)
  • SparkR Pipe (MRJob word count configuration)
  • Pig Streaming (Wukong-Hadoop word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)

The Genre counts MapReduce calculation is illustrated in SparkR Pipe (Go word count), Scala Spark Pipe (Wukong-Hadoop word count), Pig Streaming (Hadoop::Streaming word count) and Java Flink (Batch word count).

The fundamental structure of the Genre averages calculation is the same as (or similar to) that of the MovieID-Rating averages. The specific Genre averages calculation illustrations are as follows.


  • PySpark Pipe (MRJob average configuration)
  • Java Spark Pipe (Wukong-Hadoop word count configuration)
  • Scala Spark (Hadoop::Streaming word count configuration)
  • PySpark (Go word count configuration)
  • Hadoop (MRJob word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)



UserID counts




UserID counts (Go Hadoop Streaming)




In order to implement the UserID MapReduce using the Hadoop Streaming facility and a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>
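As a rough illustration of what a Hadoop Streaming submit file contains, the Python sketch below assembles the argument list for such a job. All paths in it are hypothetical placeholders. The generic -files option is placed before the streaming options and ships the mapper and reducer scripts to the tasks, where they are referred to by their base names. The actual Bash file from the linked tutorial may differ.

```python
import os
import shlex


def streaming_command(jar, hdfs_input, hdfs_output, mapper, reducer):
    """Assemble a Hadoop Streaming submission as an argument list.

    -files is a generic option, so it precedes the streaming options; it
    copies the local mapper/reducer scripts into each task's working directory.
    """
    return [
        "hadoop", "jar", jar,
        "-files", mapper + "," + reducer,
        "-input", hdfs_input,
        "-output", hdfs_output,  # must not already exist in HDFS
        "-mapper", os.path.basename(mapper),
        "-reducer", os.path.basename(reducer),
    ]


if __name__ == "__main__":
    # All paths below are hypothetical placeholders.
    cmd = streaming_command(
        "/opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar",
        "/user/hduser/input", "/user/hduser/output",
        "/home/hduser/mapper/gomapper.sh", "/home/hduser/reducer/goreducer.sh")
    print(" ".join(shlex.quote(arg) for arg in cmd))
```

The mapper and reducer Bash files need to be executable (chmod +x). The job will also fail if the HDFS output folder is left over from a previous run.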

The next step is to create this Bash file using the tutorial in this post and to save the file in local system folder: <Local System Bash Hadoop Streaming Submit File Folder>.


The next step is to run the following command on Ubuntu Server 14.04.5 LTS (as outlined in this tutorial).



This will generate an output file in HDFS with the following contents excerpt.



















The next step is to generate counts of the UserIDs using a Go word count MapReduce in the Pig Streaming facility.



UserID counts (Pig Go Streaming Script)




In order to implement the UserID counts MapReduce using the Pig Streaming facility with a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigGoStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file using methods obtained from this book, this wiki and this manual.


The next step is to save the Pig script file in local system folder: <Local System Pig Script File Folder>. The Pig script can be run in Pig from the command line (using mapreduce mode).



This will generate an output file in HDFS with the following contents excerpt.

















UserID counts (Java Flink Batch word count Application)



In order to implement the UserID counts MapReduce using the Java Flink Batch word count examples jar file one can follow the guidelines in this post. The following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch word count examples jar file: WordCount.jar
Local system Java Flink Batch word count examples jar file folder:
<Local System Java Flink Batch word count examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in local system folder:
<Local System Java Flink Batch word count examples jar File Folder>. 
One can then run the application using the Flink command-line interface.



This will generate a local system output file with the following contents excerpt.


















MovieID counts



MovieID counts (Scala Spark Go Pipe Application)


In order to implement the MovieID counts MapReduce using the Scala Spark Pipe facility with a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Go Pipe application file using the guidelines in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, the aggregation section of the MongoDB manual, this gist and the PyMongo guide.





The next step is to export the file into a jar file and to save the jar file in local system folder: <Local System jar File Folder>. One can then run the Scala Spark Go Pipe application using the Spark-submit facility.





This will generate the following Spark SQL system output, MongoDB based NoSQL system output and a local system output file with the following contents excerpt.





























































MovieID counts (Java Spark Perl Pipe Application)



In order to implement the MovieID counts MapReduce using a Java Spark Pipe application and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system Java Spark Perl Pipe application folder: <Local System jar File Folder>
Java Spark Perl Pipe application java file: JavaSparkPerlPipeApp.java
Java Spark Perl Pipe class: JavaSparkPerlPipe
Java Spark Perl Pipe application jar file: JavaSparkPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration


The next step is to create the following Java Spark Hadoop::Streaming pipe application file (JavaSparkPerlPipeApp.java) using methods outlined in this book, this book, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.





The next step is to save the Java Spark Perl Pipe application file in local system folder: <Local system Java Spark Perl Pipe Application Folder>

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following content excerpt, Spark SQL system output and (MongoDB-based) NoSQL system output.





























MovieID counts (Java Flink batch word count application)



In order to implement the MovieID counts MapReduce using the Java Flink word count batch examples application jar, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch wordcount examples jar file: WordCount.jar
Local system Java Flink Batch wordcount examples jar file folder:
<Local System Java Flink Batch wordcount examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in the local system folder:
<Local System Java Flink Batch wordcount examples jar File Folder>. 

One can then run the application using the Flink run facility.



This will generate a local system output file with the following contents excerpt.


















UserID counts and MovieID counts




UserID counts and MovieID counts (MongoDB Shell)




The UserID and MovieID counts can also be generated using the mapReduce() function in the mongo Shell. In order to implement the MapReduces in the mongo Shell the following arrangements/selections may be made.

Database: MLens
UserID Collection: UserID
MovieID Collection: MovieID
Output collection name for UserID MapReduce: UserID_Counts
Output collection name for MovieID MapReduce: MovieID_Counts
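The behaviour of the mongo Shell mapReduce() function can be emulated in Python to show what it produces. The sketch below is not the shell program used in the post. The document field name UserID and the JavaScript functions quoted in the comments are assumptions. The sketch mirrors three documented points: output documents have the form {"_id": key, "value": ...}, reduce is not called for keys with a single emitted value, and reduce may be applied again to its own partial results, so it has to be associative.

```python
from collections import defaultdict


def map_fn(doc):
    # Counterpart of a map function such as:
    #   function () { emit(this.UserID, 1); }
    yield doc["UserID"], 1


def reduce_fn(key, values):
    # Counterpart of a reduce function such as:
    #   function (key, values) { return Array.sum(values); }
    return sum(values)


def map_reduce(docs, map_fn, reduce_fn, batch_size=2):
    """Emulate mapReduce output documents {"_id": key, "value": v}.

    Values are reduced in small batches and the partial results are reduced
    again, which only works if reduce is associative and returns a value of
    the same shape as the emitted values.
    """
    emitted = defaultdict(list)
    for doc in docs:
        for key, value in map_fn(doc):
            emitted[key].append(value)
    results = []
    for key, values in emitted.items():
        if len(values) == 1:
            value = values[0]  # reduce is skipped for single-value keys
        else:
            partials = [reduce_fn(key, values[i:i + batch_size])
                        for i in range(0, len(values), batch_size)]
            value = partials[0] if len(partials) == 1 else reduce_fn(key, partials)
        results.append({"_id": key, "value": value})
    return results


if __name__ == "__main__":
    docs = [{"UserID": 1}, {"UserID": 2}, {"UserID": 1}, {"UserID": 1}]
    out = map_reduce(docs, map_fn, reduce_fn)
    # Rough analogue of db.UserID_Counts.find({"_id": 2})
    print([d for d in out if d["_id"] == 2])
```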

The next step is to start the mongo Shell, switch to the MLens database (with the use MLens command) and to view the MLens collections with the show collections command.









The next step is to run the following shell program, which runs the MapReduce that calculates the UserID counts and then queries the result for UserID 2 using the db.UserID_Counts.find({"_id":2}).pretty() command.



The program commands will generate the following output.













The UserID_Counts can also be viewed with the general version db.UserID_Counts.find().pretty() command. The command will generate the following output.
















The next step is to run the following program, which runs the MapReduce that calculates the MovieID counts.



This will yield the following output for the MovieID counts MapReduce.





The individual query for MovieID 20, using the db.MovieID_Counts.find({"_id":20}).pretty() command, will generate the following output:





The next step is to generate the counts for the Rating score categories in the Spark Pipe facility using a PySpark Pipe application. The PySpark Pipe application can also be used to implement the UserID counts MapReduce in MongoDB using PyMongo Code. 


Rating score counts and UserID counts (NoSQL query)



Rating score counts and UserID counts (PySpark MRJob Pipe Application)



In order to run the Rating score counts MapReduce with the UserID counts MapReduce using the PySpark Pipe facility, MRJob library (Word count configuration) and PyMongo, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system PySpark MRJob Pipe application folder: <Local System PySpark MRJob Pipe Application Folder>
PySpark MRJob Pipe application file: PySparkMRJobPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements as outlined in the MongoDB part of the illustration

The next step is to create the following PySpark MRJob application file (PySparkMRJobPipeApp.py) using methods outlined in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.






One can then save the PySpark MRJob application file in local system folder: <Local System PySpark MRJob Pipe Application Folder> and run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system file with the following contents excerpt.



































Rating score counts (Wukong-Hadoop Hadoop Streaming)


In order to implement the Rating counts MapReduce using Hadoop Streaming and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>

The next step is to save the following Bash file in local system folder:
<Bash Hadoop Streaming Submit File Folder>.



The next step is to run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents excerpt.









MovieID ratings average



MovieID ratings average (Java Spark MRJob Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the MRJob library (Average configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local system mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe MRJob App jar: JavaSparkMRJobAvgPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: JavaSparkMRJobAvgPipe
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following java file (JavaSparkMRJobAvgPipeApp.java).









The next step is to export the java file into a jar file and save the jar in local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following contents excerpt, Spark SQL system output and NoSQL system output.
























MovieID ratings average (Java Spark Go Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the DMRGo library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe Go App jar: JavaSparkGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: JavaSparkGoPipe
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
One can then create the following Java Spark Pipe application file.





The next step is to export the java file into a jar and to save the jar file in local system folder: <Local System Java jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL output and a local system file with the following contents excerpt.
























MovieID ratings average (Scala Perl Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Scala Spark Pipe facility and the Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Pipe application file.





The next step is to export the scala file into a jar and save the jar file in local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following contents excerpt, Spark SQL system output and MongoDB-based NoSQL system output.







































MovieID ratings average (SparkR MRJob Application)



In order to implement the MovieID ratings average MapReduce using the SparkR Pipe facility and an MRJob word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system SparkR MRJob Pipe application folder: <Local System SparkR Application Folder>
SparkR MRJob Pipe R application file: SparkRMRJobPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following SparkR Pipe application file (SparkRMRJobPipeApp.R) using methods outlined in this guide, this guide, this package reference, this package reference, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post and the aggregation section of the MongoDB manual.




The next step is to save the R file in local system folder: <Local System SparkR Application Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following contents excerpt.

















































MovieID ratings average (Pig Hadoop-Wukong Streaming Script)



In order to implement the MovieID ratings average MapReduce using the Pig Streaming facility and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigWuStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file.  


The Pig script file can be saved in local system folder: <Local System Pig Script File Folder>. One can then run the Pig script in mapreduce mode from the command line.



This will generate an output file in HDFS with the following contents excerpt.

















MovieID ratings average (Python Flink Application)



In order to implement the MovieID ratings average MapReduce using a Python Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file: WordCount.py
Local system Python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python word count application (WordCount.py) using methods outlined in this guide, this post and this post.



The Python Flink wordcount application can be saved in local system folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink program with the following command.



This will generate a local system output file with the following contents excerpt.
















MovieID ratings average (Scala Flink Application)



In order to implement the MovieID ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: FlinkScalaApp.scala
Local system Scala Flink Batch application folder: <Local System Scala Flink Wordcount Application jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: Wordcount
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar

The next step is to create the following Scala application file (FlinkScalaApp.scala) using methods outlined in the Flink DataSet API programming guide and this post.




The FlinkScalaApp.scala file can be exported into a jar file (ScalaFlinkWordcountApplication.jar), which can be saved in local system folder: <Local System Scala Flink Wordcount Application jar File Folder>.

One can then run the Scala application with the following command.



This will generate a local system output file with the following contents excerpt.


















Genre counts



Genre counts (SparkR Go Pipe Application)



In order to implement the Genre counts MapReduce using a SparkR Pipe application and a Go word count application, the following arrangements may be made.


Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system SparkR Go Pipe application folder: <Local System SparkR Go Pipe Application Folder>
SparkR Go Pipe application file: SparkRGoPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
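The Pipe applications in this section rely on the same idea: Spark streams the elements of each partition, one per line, to an external program's standard input and collects that program's standard output lines as the new dataset. The sketch below imitates this for a single partition using Python's subprocess module. It is not SparkR or PySpark code, and the inline mapper (a hypothetical stand-in for gomapper.sh) is launched with the current Python interpreter only so that the example is self-contained.

```python
import subprocess
import sys
from typing import List

# Hypothetical stand-in for an external mapper script such as gomapper.sh:
# it reads lines from stdin and writes "<key>\t1" for every token.
MAPPER_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    for token in line.split():\n"
    "        print(token + '\\t1')\n"
)


def pipe_partition(lines: List[str], command: List[str]) -> List[str]:
    """Write one element per line to the command's stdin and return its stdout lines.

    This mirrors the idea behind Spark's pipe operation for one partition.
    """
    result = subprocess.run(
        command,
        input="\n".join(lines) + "\n",
        capture_output=True,
        text=True,
        check=True,  # raise if the external program fails
    )
    return result.stdout.splitlines()


if __name__ == "__main__":
    partition = ["Drama", "Comedy|Romance", "Drama"]
    for out_line in pipe_partition(partition, [sys.executable, "-c", MAPPER_CODE]):
        print(out_line)
```

Because the external program only sees text lines, any language (Go, Perl, Ruby, Python) can supply the mapper and reducer.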
The next step is to create the following SparkR Go Pipe application file (SparkRGoPipeApp.R).



The R file can be saved in local system folder: <Local System SparkR Go Pipe Application Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output, Relative Frequency Histogram Plot and a local system output file with the following file contents excerpt.



















Genre counts (Scala Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre counts MapReduce using the Scala Spark Pipe facility with the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Scala Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Scala Spark Wukong-Hadoop Pipe application jar file: ScalaSparkWukongHadoopPipeApp.jar
Scala package: scalapackage
Scala object: scalaobject
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Wukong-Hadoop Pipe application Scala file (ScalaSparkWuPipeApp.scala).



The next step is to export the Scala file into a jar file (ScalaSparkWukongHadoopPipeApp.jar) and to save the jar file in local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following file contents excerpt.





































Genre counts (Pig Perl Hadoop::Streaming Streaming Script)




In order to implement the Genre counts MapReduce using the Pig Streaming facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl Hadoop::Streaming mapper file: map.pl
Local system Perl Hadoop::Streaming reducer file: reduce.pl
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigPerlStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file. 


The next step is to save the Pig script file in local system folder: <Local System Pig Script File Folder> and run the Pig script in mapreduce mode from the command line using the following command.



This will generate an output file in HDFS with the following file contents excerpt.








Genre ratings average




Genre ratings average (PySpark MRJob Pipe Application)




The Genre ratings average MapReduce can be implemented using a PySpark application and the MRJob library (Average configuration). In order to do this, the following arrangements can be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system PySpark MRJob Avg Pipe application file folder: <Local System PySpark MRJob Avg Pipe Application Folder>
PySpark MRJob Avg Pipe App file: PySparkMRJobAvgPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
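The average configuration differs from the word count configuration only in what travels from the mapper to the reducer. A common way to write it, assumed here rather than taken from the author's mrjobavgmapper.sh and mrjobavgreducer.sh, is to emit (rating, 1) pairs so that sums and counts, not partial averages, are combined. Averaging partial averages would give the wrong answer whenever the partial groups have different sizes.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple


def map_ratings(records: Iterable[Tuple[str, float]]) -> Iterator[Tuple[str, Tuple[float, int]]]:
    """Map step: emit (genre, (rating, 1)) for each rating record."""
    for genre, rating in records:
        yield genre, (rating, 1)


def reduce_averages(pairs: Iterable[Tuple[str, Tuple[float, int]]]) -> Dict[str, float]:
    """Reduce step: add up rating sums and counts per key, then divide.

    The same function accepts pre-combined (sum, count) pairs, which is why
    a combiner can safely run between the map and reduce steps.
    """
    sums: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for genre, (total, n) in pairs:
        sums[genre] += total
        counts[genre] += n
    return {genre: sums[genre] / counts[genre] for genre in sums}


if __name__ == "__main__":
    records = [("Drama", 4.0), ("Drama", 3.0), ("Comedy", 5.0), ("Drama", 5.0)]
    print(reduce_averages(map_ratings(records)))
```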
The next step is to create the following Python file (PySparkMRJobAvgPipeApp.py).




The PySpark MRJob Avg Pipe application file can be saved in local system folder: <Local System PySpark MRJob Avg Pipe Application Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following file contents excerpt.















































Genre ratings average (Scala Spark Perl Pipe Application)



In order to implement the Genre ratings average MapReduce using the Scala Spark Pipe facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar file Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp2.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following Spark Perl Pipe application Scala file (SparkScalaPerlPipeApp2.scala).




The next step is to export the Scala file into a jar file (ScalaPerlPipeApp2.jar) and to save the jar file in local system folder: <Local System jar file Folder>.

One can then run the application using the Spark-submit facility.


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following file contents excerpt.







































Genre ratings average (Java Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre ratings average MapReduce using the Java Spark Pipe facility and the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Java Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Java Spark Wukong-Hadoop Pipe application java file: SparkJavaWuPipeApp.java
Java Spark Wukong-Hadoop Pipe class: SparkJavaWuPipeApp
Java Spark Wukong-Hadoop Pipe application jar file: JavaSparkWuPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R
Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following Java Spark Wukong-Hadoop Pipe application file (SparkJavaWuPipeApp.java).






The next step is to export the file into a jar file (JavaSparkWuPipeApp.jar) and save the jar file in local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.



This will generate a local system output file with the following file contents excerpt, followed by the Spark SQL system output and the MongoDB-based NoSQL system output.



























Genre ratings average (PySpark Go Pipe Application)




In order to implement the Genre ratings average MapReduce using the PySpark Pipe facility and a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system PySpark Go Pipe application folder: <Local System PySpark Go Pipe Application Folder>
PySpark Go Pipe application file: PySparkGoPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following PySpark Go application file (PySparkGoPipeApp.py).




The next step is to save the PySpark Go application file in local system folder: <Local System PySpark Go Pipe Application Folder>

One can then run the application using the Spark-submit facility.


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with file contents excerpt.




















































Genre ratings average (MRJob Hadoop Streaming)



In order to implement the Genre ratings average MapReduce using Hadoop Streaming and the MRJob library (word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>
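Hadoop Streaming passes records between the mapper and the reducer as tab-separated key/value lines and sorts them by key in between. The sketch below is a plain Python reducer, not the MRJob scripts listed above, and shows why that sort matters: the reducer only has to sum runs of identical keys. If the input were not sorted, the same key would be emitted more than once.

```python
from itertools import groupby
from typing import Iterable, Iterator, Tuple


def parse(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Split each 'key<TAB>count' line written by the mapper."""
    for line in lines:
        key, _, value = line.rstrip("\n").partition("\t")
        yield key, int(value)


def streaming_reduce(lines: Iterable[str]) -> Iterator[str]:
    """Sum the values of consecutive lines that share a key.

    Hadoop Streaming sorts the mapper output by key before the reducer reads it,
    so every line for one key arrives in a single run and groupby can sum it.
    """
    for key, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield f"{key}\t{sum(value for _, value in group)}"


if __name__ == "__main__":
    # In a real streaming reducer the lines would come from sys.stdin.
    sorted_input = ["Comedy\t1\n", "Drama\t1\n", "Drama\t1\n"]
    for out_line in streaming_reduce(sorted_input):
        print(out_line)
```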

The next step is to save the following Bash Hadoop Streaming submit file in local system folder: <Local System Bash Hadoop Streaming Submit File Folder>.



The next step is to run the following command on Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents excerpt.



















Genre ratings average (Python Flink Application)




In order to implement the Genre ratings average MapReduce using a Python Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file:  WordCount.py
Local system Python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python Flink application file (WordCount.py).


The Python Flink wordcount application file can then be saved in local system folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink application with the following command.



This will create a local system output file with the following file contents excerpt.

















Genre ratings average (Scala Flink Application)



In order to implement the Genre ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: WordCount.scala
Local system Scala Flink Batch application folder: <Local System Scala Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar


The next step is to create the following Scala Flink application file (WordCount.scala).



The next step is to export the WordCount.scala file into a jar file (ScalaFlinkWordcountApplication.jar) and to save the Scala Flink word count jar file in local system folder: <Local System Scala Flink Application jar File Folder>.

One can then run the Scala Flink application with the following command.



This will generate a local system output file with the following file contents excerpt.



















4. Data query and data interaction


The processed data can be queried interactively using ShinyMongo and Shiny. ShinyMongo can be used to create web applications with an interface through which the user can interactively query the MongoDB data from the illustrations (generated with JavaScript and PyMongo) using JSON/BSON syntax.


Shiny can be used to create Shiny applications that generate web-based interactive histograms for the UserID, MovieID, Rating score and Genre counts.



ShinyMongo Application


The tutorial and the R scripts (server.R and ui.R) for generating interactive queries on data in MongoDB using ShinyMongo can be found in this gist. The method for creating the applications from the ShinyMongo gist can be found in this Shiny tutorial.

The application will generate the following web based interface, output and JSON/BSON queries for the MongoDB data.





















































































Shiny Applications 


In order to generate a Shiny application that produces an interactive web-based histogram for the UserID_Counts variable, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Shiny library: Install the shiny package in R

The next step is to create the following two files and save them in a local system folder as outlined in this tutorial.




The next step is to run the application using the runApp() command, which will produce the following web-based interface.


The output for 30 histogram bins is as follows.















The output for 15 bins is as follows.













The applications for the MovieID, Rating score and Genre counts variables can be generated analogously to the case of the UserID counts variable.


The web-based output for the MovieID counts variable and 30 bins is as follows.















The web-based output for the MovieID counts variable and 15 bins is as follows.













The web-based output for the Rating score counts variable and 30 bins is as follows.














The web-based output for the Rating score counts variable and 10 bins is as follows.















The web-based output for the Genre counts variable and 30 bins is as follows.














The web-based output for the Genre counts variable and 15 bins is as follows.














5. Summarize the data



Summary statistics


The summary statistics can be generated in R using the stat.desc() function from the pastecs package.
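For readers working outside R, the core measures that stat.desc() reports can be reproduced with Python's standard library. This sketch computes only a subset (it omits, for example, nbr.null, nbr.na and CI.mean.0.95) and uses the sample variance with divisor n - 1, as R's var() does. It is an illustration, not a replacement for the pastecs output shown below.

```python
import math
import statistics
from typing import Dict, Sequence


def describe(values: Sequence[float]) -> Dict[str, float]:
    """Compute a subset of the measures reported by pastecs::stat.desc().

    Needs at least two values, because the sample variance divides by n - 1.
    """
    n = len(values)
    mean = statistics.mean(values)
    var = statistics.variance(values)  # sample variance, divisor n - 1
    std = math.sqrt(var)
    return {
        "nbr.val": n,
        "min": min(values),
        "max": max(values),
        "range": max(values) - min(values),
        "sum": sum(values),
        "median": statistics.median(values),
        "mean": mean,
        "SE.mean": std / math.sqrt(n),  # standard error of the mean
        "var": var,
        "std.dev": std,
        "coef.var": std / mean,  # coefficient of variation
    }


if __name__ == "__main__":
    for name, value in describe([2, 4, 4, 4, 5, 5, 7, 9]).items():
        print(f"{name}: {value}")
```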


UserID variable counts










MovieID variable counts










Rating score counts









Genre variable counts












Histograms




The histograms can be generated in Relative Frequency form using the PlotRelativeFrequency() function from the HistogramTools package.
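A relative frequency histogram divides each bin count by the total number of observations, so the bar heights sum to 1; the number of bins (30 or 15 in the Shiny illustrations) only changes how finely the range is cut. The sketch below uses simple equal-width, left-closed bins. R's hist() and the HistogramTools functions choose "pretty" break points and right-closed bins by default, so the exact boundaries will differ from this illustration.

```python
from typing import List, Sequence, Tuple


def relative_frequency_histogram(
    values: Sequence[float], bins: int
) -> List[Tuple[float, float, float]]:
    """Return (left edge, right edge, relative frequency) for equal-width bins.

    Relative frequency is the bin count divided by the number of values,
    so the heights sum to 1. The last bin also includes its right edge.
    """
    if bins < 1 or not values:
        raise ValueError("need at least one bin and one value")
    lo, hi = min(values), max(values)
    width = (hi - lo) / bins or 1.0  # avoid zero width when all values are equal
    counts = [0] * bins
    for v in values:
        index = min(int((v - lo) / width), bins - 1)
        counts[index] += 1
    n = len(values)
    return [(lo + i * width, lo + (i + 1) * width, c / n) for i, c in enumerate(counts)]


if __name__ == "__main__":
    for left, right, rel in relative_frequency_histogram([1, 2, 2, 3, 4], bins=3):
        print(f"[{left:.1f}, {right:.1f}): {rel:.2f}")
```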


UserID variable counts




In order to generate the Relative Frequency Histogram for the UserID_Counts variable one can use the following R command sequence.










This will generate the following histogram for the UserID counts.

























The other variables can be generated analogously.



MovieID variable counts



























Rating score variable counts




























Genre variable counts



























The next step is to depict the summary measures of the metrics generated during the data processing in section 3 using the SAS software PROC SGPLOT statement.


6. Analyze the data


The summary measures of the metrics can be depicted graphically using the SAS software PROC SGPLOT statement. The bar graph categories can also be further analyzed using Bash script grep decompositions in Ubuntu Server 14.04.5 LTS and Hadoop.



UserID counts


In order to generate the bar graph for the UserID variable, one can first create a Microsoft Excel file with two columns: UserID for the user-ids and Counts for the user-id counts.

The file can then be imported into a work.import dataset in the SAS software using the Proc Import statement. The following program can be prepared using the method outlined in this post and then be run in the SAS software.




This will generate the following plot.




















The plots for the other variables can be generated analogously to the case for the UserID variable counts.



Rating score counts




















The Rating score counts can be used to calculate the weighted mean and weighted variance of the ratings. 
In the SAS software this can be done with the PROC MEANS statement with the counts as the weights.
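When each rating score is paired with its count, the counts act as frequency weights: the weighted mean is the sum of score times count divided by the total count N, and the sample variance divides the weighted sum of squared deviations by N - 1, exactly as if every rating had been written out individually. The sketch below assumes this frequency-weight interpretation. Other tools may use a different divisor (for example N, giving the population variance) or a different weight convention, so small differences from the SAS or Weighted.Desc.Stat output are possible.

```python
from typing import Sequence, Tuple


def weighted_mean_var(values: Sequence[float], counts: Sequence[int]) -> Tuple[float, float]:
    """Frequency-weighted mean and sample variance.

    Each value is treated as if it occurred counts[i] times, so the variance
    uses the divisor N - 1, where N is the total count.
    """
    if len(values) != len(counts):
        raise ValueError("values and counts must have the same length")
    total = sum(counts)
    mean = sum(v * c for v, c in zip(values, counts)) / total
    ss = sum(c * (v - mean) ** 2 for v, c in zip(values, counts))
    return mean, ss / (total - 1)


if __name__ == "__main__":
    # Toy data: score 1 seen once, score 2 twice, score 3 once.
    print(weighted_mean_var([1, 2, 3], [1, 2, 1]))
```

Working from the rating score counts avoids expanding the data back to one row per rating.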

In the SAS software, one can run the following program with the values column named Var and the counts/weights column named Counts in a work.import dataset.




This will result in the following output.









In R, one can use the Weighted.Desc.Stat package and the following functions.

The application of these functions to the data will lead to the following output.


















The following illustrates how one can perform the calculation for the weighted (sample) mean and weighted (sample) variance using vector operations in R.
















MovieID counts 





















MovieID Counts decomposition



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of the rating counts for MovieID 296, one can take the counts from one of the output files of the FusedMovieID-Rating MapReduce in section three (treating the partitions of one output as a single file), i.e. the output of one of the Java Spark Pipe Go, Scala Spark Pipe Perl, SparkR Pipe MRJob, Pig Wu, Python Flink or Scala Flink word counts, and use it as the input file for the grep decomposition. If one selects, say, the Pig Wu Streaming output file as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID counts decomposition Bash file: BashMovieIDCountsDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>
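One detail worth getting right in any grep decomposition by MovieID is anchoring: an unanchored search for 296 also matches 2960 or 1296. The sketch below assumes (hypothetically) that each line of InputData.txt starts with the MovieID followed by a non-digit separator, as in `296-5.0<TAB>count`; it is not the author's BashMovieIDCountsDecomp.sh. In Bash, an anchored pattern such as `grep -E '^296[^0-9]' InputData.txt` expresses the same idea.

```python
import re
from typing import Iterable, List


def grep_movie(lines: Iterable[str], movie_id: int) -> List[str]:
    """Return lines whose leading MovieID equals movie_id.

    The pattern is anchored at the start of the line and must be followed by
    a non-digit, so 296 does not also match 2960 or 1296.
    """
    pattern = re.compile(rf"^{movie_id}\D")
    return [line for line in lines if pattern.match(line)]


if __name__ == "__main__":
    sample = ["296-5.0\t10", "2960-4.0\t3", "1296-3.0\t2", "296-4.5\t7"]
    for line in grep_movie(sample, 296):
        print(line)
```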

The next step is to create the following Bash file.




The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following Bash system output.










Hadoop grep

The equivalent output can be generated in Hadoop using the grep class from the hadoop-mapreduce-examples-2.6.0.jar and the FusedMovieID-Rating.txt file. 
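The Grep example in hadoop-mapreduce-examples is invoked roughly as `hadoop jar hadoop-mapreduce-examples-2.6.0.jar grep <input> <output> <regex> [<group>]`; it counts every match of the regular expression and writes `count<TAB>match` lines sorted by decreasing count. Applied to a fused file with one MovieID-Rating key per rating, the match counts are therefore the rating counts. The sketch below imitates that behaviour locally; the key format `296-5.0` is hypothetical, and ties are broken alphabetically here only to make the output deterministic.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple


def hadoop_style_grep(lines: Iterable[str], regex: str, group: int = 0) -> List[Tuple[int, str]]:
    """Count every regex match (or capture group) and sort by count, largest first."""
    pattern = re.compile(regex)
    counts: Counter = Counter()
    for line in lines:
        for match in pattern.finditer(line):
            counts[match.group(group)] += 1
    return sorted(((c, m) for m, c in counts.items()), key=lambda cm: (-cm[0], cm[1]))


if __name__ == "__main__":
    fused = ["296-5.0", "296-5.0", "296-4.0", "2960-5.0", "33264-3.0"]
    for count, match in hadoop_style_grep(fused, r"^296-\S+"):
        print(f"{count}\t{match}")
```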

In order to run the Hadoop grep decomposition of the rating counts for MovieID 296, the following arrangements/selections may be made.

Input data (i.e. Fused MovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>
HDFS output data folder: <HDFS output Folder>
Local system MovieID counts Hadoop decomposition Bash file:  HadoopMovieIDCountsDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder>

One can then run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.










MovieID Rating averages






















MovieID Rating averages decomposition



In the case of the averages decomposition, it is important to note that the decomposition of the rating counts also allows one to calculate the average rating for a MovieID. This means that one can retain the structure of the Bash and Hadoop decomposition scripts for the averages decompositions.
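Concretely, once the decomposition gives the number of times each rating score occurs for a MovieID, the average is the count-weighted mean of the scores. A minimal sketch, assuming the per-score counts have already been extracted into a dictionary (the numbers in the example are toy values, not MovieLens results):

```python
from typing import Dict


def average_from_counts(rating_counts: Dict[float, int]) -> float:
    """Average rating given how many times each rating score occurred."""
    total = sum(rating_counts.values())
    if total == 0:
        raise ValueError("no ratings")
    return sum(score * n for score, n in rating_counts.items()) / total


if __name__ == "__main__":
    # Toy decomposition: two 5.0 ratings, one 4.0 and one 3.0.
    print(average_from_counts({5.0: 2, 4.0: 1, 3.0: 1}))
```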



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of the average for MovieID 33264, one can take the counts from one of the output files of the Fused MovieID-Rating MapReduce in section three (treating the partitions of one output as a single file), i.e. the output of one of the Java Spark Pipe Go, Scala Spark Pipe Perl, SparkR Pipe MRJob, Pig Wu, Python Flink or Scala Flink word counts, and use it as the input file for the grep decomposition.

If one selects, say, the Pig Perl Streaming output as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID average decomposition Bash file: BashMovieIDAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>

The next step is to create the following Bash file.






The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.





Hadoop grep


In order to run the Hadoop grep decomposition for the average rating of MovieID 33264, the following arrangements may be made.


Input data (i.e. FusedMovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system MovieID average Hadoop decomposition Bash file:  BashMAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.





MovieID Genre counts
























MovieID Genre counts decomposition




The Genre decompositions can be generated analogously to those of the MovieID ratings and MovieID ratings averages, using the output of the Fused Genre-Rating counts MapReduces for Ubuntu Server and the Fused Genre-Rating file for Hadoop.



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of Genre Drama, one can take the counts from one of the output files of the Fused Genre-Rating MapReduces in section three (treating the partitions of one output as a single file), i.e. the output of one of the Scala Spark Pipe Perl, Java Spark Pipe Wu, PySpark Pipe MRJob, Hadoop MRJob, Python Flink or Scala Flink word counts, and use it as the input file for the grep decomposition.

If one selects, say, the Spark Java Wu output files and binds them into a single file (adding the rows of the second file below the rows of the first one), then one can use the resulting file as the input dataset and make the following arrangements/selections:

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system Genre counts decomposition Bash file: BashCountsDramaDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>
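Binding the output partitions into one input file simply means concatenating them in order, which in Bash is `cat part-00000 part-00001 > InputData.txt`. The sketch below does the same in Python; it assumes the partitions follow the usual `part-NNNNN` naming (which also skips marker files such as `_SUCCESS`), and the folder and row contents in the demo are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def bind_files(parts: Iterable[Path], target: Path) -> int:
    """Append the rows of each part file, in the given order, to one target file.

    Returns the number of lines written.
    """
    lines = 0
    with target.open("w") as out:
        for part in parts:
            with part.open() as src:
                for line in src:
                    if not line.endswith("\n"):
                        line += "\n"  # keep rows separate if a part lacks a final newline
                    out.write(line)
                    lines += 1
    return lines


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp)
        (folder / "part-00000").write_text("Drama-4.0\t3\n")
        (folder / "part-00001").write_text("Drama-5.0\t2\n")
        written = bind_files(sorted(folder.glob("part-*")), folder / "InputData.txt")
        print(written, "lines written")
```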

The next step is to create the following Bash file.



The next step is to save the file in local system folder: <Bash Decomposition Folder> and to run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.


















Hadoop grep



In order to run the Hadoop grep decomposition of the Drama Genre ratings, the following arrangements may be made.


Input data (i.e. Fused Genre-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

Local system Genre counts Hadoop decomposition Bash file:  BashGenreDramaHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.



The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.




This will generate an output file in HDFS with the following contents.














MovieID Genre averages























MovieID Genre averages decomposition



Ubuntu Server 14.04.5 LTS grep



In order to generate a grep decomposition of the rating average for the Genre Animation|IMAX|Sci-Fi, one can take the counts from one of the output files of the Fused Genre-Rating MapReduces in section three (treating the partitions of one output as a single file), i.e. the output of one of the Scala Spark Pipe Perl, Java Spark Pipe Wu, PySpark Pipe MRJob, Hadoop MRJob, Python Flink or Scala Flink word counts, and use it as the input file for the grep decomposition.


If one selects, say, the Spark Java Wu output files, binds them into a single file (i.e. adds the rows of the second file below the rows of the first file) and uses the resulting file as the input dataset for the MovieID Genre averages Ubuntu grep decomposition, then one may make the following arrangements/selections:

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system Bash script folder: <Bash Decomposition Folder>
Local system Genre average decomposition bash file: BashGenreAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>


The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.








Hadoop grep



In order to run the Hadoop grep decomposition of the average rating for the Animation|IMAX|Sci-Fi Genre, the following arrangements may be made.

Input data (i.e. Fused Genre-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system Genre average Hadoop decomposition Bash file: BashGenreAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar
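Combined genre keys such as Animation|IMAX|Sci-Fi need care, because `|` means alternation in Java regular expressions (used by the Hadoop grep example) and in `grep -E`, so an unescaped pattern would match any line containing Animation, IMAX or Sci-Fi. (In GNU grep's default basic syntax a bare `|` is literal.) The sketch below escapes the genre before matching; the line format `<Genre>-<Rating><TAB><count>` is a hypothetical assumption, and requiring a digit after the separator keeps genres that contain hyphens, such as Sci-Fi, from matching a shorter prefix.

```python
import re
from typing import Iterable, List


def grep_genre(lines: Iterable[str], genre: str) -> List[str]:
    """Return lines whose leading key is exactly `genre` followed by '-<digit>'.

    re.escape turns the '|' characters in a combined genre such as
    'Animation|IMAX|Sci-Fi' into literal pipes instead of alternation.
    """
    pattern = re.compile(re.escape(genre) + r"-\d")
    return [line for line in lines if pattern.match(line)]


if __name__ == "__main__":
    sample = [
        "Animation|IMAX|Sci-Fi-4.5\t3",
        "Animation-4.0\t9",
        "Action|Sci-Fi-3.0\t1",
        "Animation|IMAX|Sci-Fi-5.0\t2",
    ]
    for line in grep_genre(sample, "Animation|IMAX|Sci-Fi"):
        print(line)
```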

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.








7. Conclusions




In this illustration we considered how to generate summary measures for the GroupLens MovieLens 10M ratings.dat and movies.dat datasets using the MapReduce programming model. The specific MapReduce configurations considered were the word count configuration and the average configuration.

The MapReduces were, in turn, constructed using four Hadoop Streaming libraries and two MongoDB interfaces. The MapReduce illustrations were implemented in eleven facilities (ten if the two MongoDB interfaces are counted as one), namely Hadoop Streaming, Pig Streaming, Scala Spark Pipe, PySpark Pipe, Java Spark Pipe, SparkR Pipe, MongoDB (JavaScript), MongoDB (PyMongo), Java Flink, Python Flink and Scala Flink. The list of illustrations can be extended to further MapReduce configurations and facilities according to user preference.


The resulting output data sets were further summarized using Bash, Hadoop, R and the SAS software in order to illustrate the kind of information that can be mined from the data sets.



Sources

http://bit.ly/2dmK2hv
http://bit.ly/2dyzOYI
http://bit.ly/2dM5xsG
http://bit.ly/2dWSWlN
http://bit.ly/2dzV9kx
http://bit.ly/2ebNLfh
http://bit.ly/2e5yaBn
http://bit.ly/29Hbbwn
http://bit.ly/2e27GzK
http://bit.ly/1Uo1MH8
http://bit.ly/2efZi2p
http://bit.ly/2dV7LF8