Wednesday, March 28, 2018

How to analyze TF Cosine-based Similarity measures for the Last.fm Social Tagging System with Apache Hive and Apache Spark SQL






























*Photo by chuttersnap on Unsplash


This post is one in a series of posts designed for a joint installation of Apache Flink, Apache Hadoop, Apache Hive, MongoDB, Apache Pig, Apache Spark (pre-built for Hadoop) and Ubuntu. This post is designed for an installation of Apache Hive 2.1.1, Apache Hadoop 2.6.1, Apache Spark 2.2.0 (pre-built for Hadoop) and Ubuntu 16.04.2. The purpose of the illustrations in the posts is to show how one can construct content-based recommendation measures for the Last.fm social system using the GroupLens HetRec Last.fm dataset. The modelling framework for the similarity measure analysis is that outlined in Cantador, Bellogin and Vallet (2010). The post follows on from my previous post: How to summarize Last.fm Social Tagging System Profiles using Golang, Hadoop, MongoDB and Spark.

The calculation of the similarity measures for the analyses in the posts involves implementing seventeen MapReduces on the user_taggedartists.dat dataset. The first six MapReduces were implemented in the previous post.

The similarity measures considered (in the posts) are as follows:
  • TF-based Similarity
  • TF Cosine-based Similarity
  • TF-IDF Cosine-based Similarity
  • Okapi BM25-based Similarity
  • Okapi BM25 Cosine-based Similarity


In this post only the TF Cosine-based Similarity measure is considered. The methodology for the construction of the measures is considered in How to summarize Last.fm Social Tagging System Profiles using Golang, Hadoop, MongoDB and Spark.

The MapReduces in the post series are illustrated using piping and non-piping methods. The purpose of the approach is to provide a choice of methods according to each setting. The advantage of this approach is that it inherently highlights the features available from each application programming interface (API). This, in turn, provides a casual portfolio of platform specific methods to choose from to implement the MapReduces. For example, one can implement the MapReduce using a Java (Apache) Spark pipe application, Java Spark non-pipe application or a Java (Apache) Flink Dataset application. The three approaches inherently illustrate the kind of programming advantages that can be harnessed from the features of the Spark Java Resilient Distributed Dataset (RDD) and the Flink Java Dataset.


The Flink illustrations use non-piping methods. The Spark illustrations use a mixture of piping and non-piping methods. The (Apache) Hadoop, Hive and Pig parts of the illustrations use piping type methods. The Hadoop MapReduce illustration uses piping type methods through the Hadoop Streaming facility. The Hive illustration uses piping type methods through the Hive map and reduce commands. The Spark SQL part of the illustration uses piping type methods through the Hive2 transform. The Pig part of the illustration uses piping type methods through the Pig stream facility.


In this post the piping type methods use mapper-reducer sets prepared in Java and Python. In the post series the MapReduces come in two main forms, namely, three set and one set. In the case of the three set MapReduces, the mapper-reducer sets can be used with the Hadoop Streaming facility; Hive map and reduce commands; Pig stream facility; Java Spark pipe facility; Scala Spark pipe facility; SparkR pipe facility; PySpark pipe facility; and Spark SQL transform command (using the Beeline interface to the Spark SQL Thrift Server).
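As a rough illustration of what a piping type method does, the sketch below pushes records through a mapper command and a reducer command over standard input and output, with a sort in between standing in for the shuffle. The function name run_pipe and the identity command are hypothetical, introduced only for this example; they are not part of Hadoop Streaming, Hive, Pig or Spark.

```python
import subprocess
import sys


def run_pipe(lines, mapper_cmd, reducer_cmd):
    """Pipe lines through mapper_cmd, sort the output, then pipe it through reducer_cmd."""
    text = "".join(line + "\n" for line in lines)
    mapped = subprocess.run(
        mapper_cmd, input=text, capture_output=True, text=True, check=True
    ).stdout
    # Sorting stands in for the shuffle/sort phase between map and reduce.
    shuffled = "".join(sorted(mapped.splitlines(keepends=True)))
    reduced = subprocess.run(
        reducer_cmd, input=shuffled, capture_output=True, text=True, check=True
    ).stdout
    return reduced.splitlines()


# Hypothetical identity command, used only to demonstrate the plumbing.
identity = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]
result = run_pipe(["b\t1", "a\t1"], identity, identity)
```

In practice the two commands would be the mapper and reducer scripts of a mapper-reducer set; the same scripts can then be handed unchanged to any of the piping facilities listed above.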

The three set MapReduce piping type scripts are used to calculate the Cosine-based similarity measures (TF, TF-IDF, and Okapi BM25).


1. Prepare the data



The implementation of similarity measure MapReduces one to four to obtain the elements used in the proposed profile and recommendation models (Table one) in Cantador, Bellogin and Vallet (2010) was illustrated in the first post in the series.

In this post the calculation uses the output datasets from the first two MapReduces. The output dataset for the users has $u_{m,l}$ as the index/key and $tf_{u_m}(t_l)$ as the value. The output dataset for the items has $i_{n,l}$ as the index/key and $tf_{i_n}(t_l)$ as the value.

From the {{user id, tag}, user tag frequency} key-value pair $(u_{m,l}, tf_{u_m}(t_l))$ and the {{item id, tag}, item tag frequency} key-value pair $(i_{n,l}, tf_{i_n}(t_l))$ in the output files from the first two MapReduces, create new combined key-value combinations $(\{u_m, i_n\}, tf_{u_m}(t_l), tf_{i_n}(t_l))$ without the tag part of the uncombined key indices (i.e. keep only the user index $u_m$ and the item index $i_n$) for the similarity measure MapReduce.

In the MapReduce mapping phase the numerator entry values can be the cross-products $tf_{u_m}(t_l) \cdot tf_{i_n}(t_l)$ (i.e. the tag $t_l$ must be the same in both factors) and the denominator values can be the squares of the individual values, $(tf_{u_m}(t_l))^2$ and $(tf_{i_n}(t_l))^2$. In the reduce phase the sums can be output by key for the numerator and the square roots of the sums can be output by key for the denominators.

The operational aspects of the calculation in this illustration are as follows:

The combined tuple will initially take the form $\{(u_{m,l}, i_{n,l}), tf_{u_m}(t_l), tf_{i_n}(t_l)\}$. Then, one will change the tuple to, say, $\{(u_m; i_n), tf_{u_m}(t_l), tf_{i_n}(t_l)\}$ for the actual MapReduce. It is important to make sure that the $\{tf_{u_m}(t_l), tf_{i_n}(t_l)\}$ part of the tuple always pertains to the same $l$ during the data preparation. Hence, one simply has to make sure that all the $l$'s for each $(u_m; i_n)$ key are obtained, because the keys used in the actual MapReduce no longer carry the tag $l$ and this information is therefore lost in the mapping phase. The products for the numerator can be programmed into the mapper. The squares for the denominators can also be programmed into the mapper.

In the reduce phase, the values of the numerator products can be summed and the totals output for each key. In the case of the denominator entries, the square roots of the sums can be output for each key. This will generate the outputs required by the similarity measure formulae in the case of the three set MapReduces. This is how the three set MapReduces can be implemented.
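The map and reduce steps described above can be sketched in Python as follows. This is a minimal sketch, not the mapper-reducer set used in the post: the function names, the tab-separated input layout (user, item, user tag frequency, item tag frequency, one line per tag) and the toy values are assumptions made for the example. It also assumes that the data preparation has already aligned both frequencies on the same tag, with a zero where a tag is missing from one profile.

```python
import math
from collections import defaultdict


def three_set_mapper(lines):
    """Emit the key with the cross-product and the two squares for each record."""
    for line in lines:
        user, item, tf_u, tf_i = line.rstrip("\n").split("\t")
        tf_u, tf_i = float(tf_u), float(tf_i)
        # Key without the tag: only the user and item indices are kept.
        yield f"{user};{item}", tf_u * tf_i, tf_u ** 2, tf_i ** 2


def three_set_reducer(mapped):
    """Sum the three sets by key and combine them into the cosine value."""
    sums = defaultdict(lambda: [0.0, 0.0, 0.0])
    for key, product, square_u, square_i in mapped:
        acc = sums[key]
        acc[0] += product
        acc[1] += square_u
        acc[2] += square_i
    return {
        key: num / (math.sqrt(sum_u) * math.sqrt(sum_i))
        for key, (num, sum_u, sum_i) in sums.items()
        if sum_u > 0 and sum_i > 0  # skip keys with an all-zero profile
    }


# Toy records (hypothetical values): two tags for one user-item pair.
records = ["u1\ti1\t3\t4", "u1\ti1\t4\t3"]
scores = three_set_reducer(three_set_mapper(records))
```

For the toy records the numerator is 3*4 + 4*3 = 24 and each denominator term is the square root of 25, so the value for the key u1;i1 is 24/25 = 0.96.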

The next step is to construct the mapper-reducer sets in order to implement the MapReduces for the similarity measure in Hive and Spark SQL.


2. Prepare the mapper-reducer sets



Java mapper-reducer set


The Java mapper-reducer set was prepared using the tutorial in this post. The mapper is as follows:



The reducer is as follows:



The next step is to compile the two files into classes with the javac command:


The Java classes can be run using shell scripts. The shell script to run the mapper is as follows:


The shell script to run the reducer:



The chmod command can be used to give the files (Java, Java classes, and Bash) execution permission:



Python mapper-reducer set


The Python mapper-reducer set was prepared using a framework outlined in this book and this post. The mapper is as follows:



The reducer is as follows:





The chmod command can be used to give the files execution permission:



The mapper and reducer files can be copied from the <Local System MapReduce Folder> folder to the <SPARK_HOME> folder for the Beeline processing.


3. Process the data in Hive



The three set MapReduces in the post series aim to introduce the different methods for calculating the three set (Cosine-based) Similarity measures using the map and reduce functions. The three set MapReduces in piping type form are implemented within the Hadoop MapReduce framework (using Hadoop Streaming, the Pig stream command and the Hive map/reduce commands) and the Spark in-memory framework (using the Spark pipe function and the Hive2 transform command with Spark SQL).

The three set MapReduce can be implemented in Hive with the Bash based Java three set mapper-reducer set using the following script prepared according to the tutorial in this post and this post.



The three set MapReduce can be implemented after making the following arrangements:

Input data: InputData.txt
Hadoop Distributed File System (HDFS) Input data folder: <HDFS Input Data Folder>
Local system Hive script folder: <Local System Hive script Folder>
Hive script: HiveThreesetscript.sql
Three set mapper: Threesetmapper.sh
Three set reducer: Threesetreducer.sh
Local system MapReduce folder for the mapper-reducer set: <Local System MapReduce Folder>
The HDFS output data folder: <HDFS Output Data Folder>

The script can be submitted to Hive using the following command:



This will yield the following output:



















4. Check the results in Spark SQL



The results of the three set MapReduce in section three can be replicated with the Python mapper-reducer set using a Hive2 script in the Spark SQL Thrift Server submitted to the Beeline interface. The three set MapReduce can be implemented using the following Hive2 script prepared using the tutorial in this post and this post.



In order to implement the three set MapReduce in Spark SQL using Beeline the following arrangements can be made:

Input data: InputData.txt
Local system Input data folder: <Local System Input Data Folder>
Local system Beeline script folder: <Local System Beeline script Folder>
Beeline script: BeelineThreesetscript.sql
Three set mapper: Threesetmapper.py
Three set reducer: Threesetreducer.py
Local system folder where the Python mapper-reducer set is saved: <SPARK_HOME>
In the <SPARK_HOME> folder one can run the following commands (to start the Thrift server and submit the script to Beeline):



This will yield the following output:





























































































The next step is to stop the Thrift Server.



The output data from the Hive query and the Spark SQL Thrift Server query through the Beeline interface yield independent results that can be used to check the analysis dataset.
























































5. Brief analysis





The cosine-based similarity provides a measure that gives an indication of the angle between the user profile vector $u_m = \{u_{m,1}, \ldots, u_{m,L}\}$ and the item profile vector $i_n = \{i_{n,1}, \ldots, i_{n,L}\}$, thus providing a measure of the similarity. In the context of the modelling framework, items that have a large value for this measure between user and item are potential candidates to be included in the set of items that maximize the utility function g() for the user. These items can be recommended to the user.
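Written out with the quantities from the data preparation section, the measure can be sketched as follows. The numerator is the sum of the cross-products and each denominator factor is the square root of a sum of squares, as described for the map and reduce phases; the sum is assumed to run over all tags $l = 1, \ldots, L$.

```latex
\cos_{tf}(u_m, i_n) =
  \frac{\sum_{l} tf_{u_m}(t_l)\, tf_{i_n}(t_l)}
       {\sqrt{\sum_{l} \left(tf_{u_m}(t_l)\right)^2}\;
        \sqrt{\sum_{l} \left(tf_{i_n}(t_l)\right)^2}}
```

Because tag frequencies are non-negative, the value lies between 0 and 1, and a value closer to 1 corresponds to a smaller angle between the two profile vectors.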

In the output above, the $cos_{tf}(u_m, i_n)$ for userid 1007 and itemid 913 is 0.887638, which yields an angle of 0.478606 radians (27.42214 degrees). This process can be used to find a bundle of items (whose measures are in the output dataset of the MapReduce) that would be best to recommend to the user with id 1007 in order to maximize the utility function g() for the available items in the system.
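The conversion from the cosine value to an angle can be checked with a few lines of Python. This is only a convenience sketch using the standard math module; the variable names are chosen for the example.

```python
import math

cos_tf = 0.887638  # value reported above for userid 1007 and itemid 913

angle_rad = math.acos(cos_tf)        # angle in radians
angle_deg = math.degrees(angle_rad)  # same angle in degrees

print(f"{angle_rad:.6f} rad, {angle_deg:.5f} deg")
```

The inverse cosine is decreasing on this range, so ranking items by the largest cosine value is the same as ranking them by the smallest angle.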


Conclusions


In recapitulation, let R be a totally ordered set and let g: U × I → R be a utility function that measures the gain of usefulness of an item $i_n$ to a user $u_m$. The aim of the analysis was, for each user u ∈ U, to find the items $i_{\max,u}$ ∈ I, unknown to the user, that maximize the utility function g():

$$\forall u \in U, \quad i_{\max,u} = \arg\max_{i \in I} g(u, i).$$

The identified items can be recommended to the user.
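The selection step can be sketched in Python as follows, taking the similarity values as the utility g. This is an illustrative sketch only: the function name, the dictionary layout and all values other than the 0.887638 reported for userid 1007 and itemid 913 are hypothetical.

```python
def best_items(scores, known):
    """Return, for each user, the unknown item with the largest utility value."""
    best = {}
    for (user, item), g in scores.items():
        if item in known.get(user, set()):
            continue  # only items unknown to the user are candidates
        if user not in best or g > best[user][1]:
            best[user] = (item, g)
    return {user: item for user, (item, _g) in best.items()}


# Toy utilities: the items "A" and "B" and their values are made up.
scores = {("1007", "913"): 0.887638, ("1007", "A"): 0.5, ("1007", "B"): 0.95}
known = {"1007": {"B"}}  # items the user already knows (hypothetical)

recommended = best_items(scores, known)
```

Item "B" has the largest value but is excluded because the user already knows it, so item 913 is selected for user 1007 in this toy example.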

The TF Cosine-based Similarity is easy to interpret and very useful for identifying items to recommend to users in a folksonomy like Last.fm. The similarity measures and elements used in the proposed profile and recommendation models in Cantador, Bellogin and Vallet (2010) provide a way to satisfy the aim of the analysis.








Sources

http://bit.ly/2G6CXNP
http://bit.ly/2G8gqjP
https://oreil.ly/2G4SuOg
http://bit.ly/2INOtzw
http://bit.ly/2DQjvD1
http://bit.ly/2FX4THU
http://bit.ly/2DQxJ6N
http://bit.ly/2pGKITQ
http://bit.ly/2ujdcbA
http://bit.ly/1SN27EA
http://bit.ly/2GtWmM9
http://bit.ly/2GcEXbv
http://bit.ly/1SN27EA


Apache®, Apache Hadoop, Apache Hive, Apache Spark and the logos are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Monday, October 17, 2016

How to summarize the GroupLens MovieLens 10M dataset using Flink, Go, Hadoop, MongoDB, Perl, Pig, Python, Ruby and Spark






This post is designed for a joint installation of Apache Flink 1.1.2, Golang 1.6, Apache Hadoop 2.6.0, MongoDB 2.4.9, MRJob 0.5.6, Perl Hadoop::Streaming 0.143060, Apache Pig 0.16.0, Apache Spark 1.6.2 (pre-built for Hadoop), Ubuntu Server 14.04.5 Long-Term-Support (LTS) and Wukong-Hadoop 0.2.0.

In this illustration we will consider the MovieLens population from the GroupLens MovieLens 10M dataset (Harper and Konstan, 2005). The specific 10M MovieLens datasets (files) considered are the ratings (ratings.dat file) and the movies (movies.dat file).  The aim of this post is to illustrate how to generate quick summaries of the MovieLens population from the datasets.

The measures that can be generated are as follows.

  • Number of ratings made by each UserID
  • Number of ratings for each MovieID
  • Number of ratings in each Rating score category
  • Average rating for each MovieID
  • Number of ratings for each MovieID genre
  • Average rating for each MovieID genre

The computed measures can then be incorporated into more detailed statistical analyses of the population. 

The measures can be computed using the MapReduce programming model. The MapReduce model can be implemented on files created from appropriate columns of the ratings and movies files. The first, second, third and fifth MapReduces can be implemented using an adjusted version of the word count configuration of the MapReduce model. 

The first key adjustment to the word count programming involves making an allowance for the structure of the data (i.e. the data is in column form rather than free flowing text). The second key adjustment involves treating blocks of numbers or special characters that designate entity ids as single words. 

In the resulting adjusted word count configuration, the MapReduce model will essentially involve mapping each UserID, MovieID, Rating score category or Genre category (i.e. keys) to 1 and then reducing the mapped key-value pairs to obtain the overall key counts.
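The adjusted word count can be sketched in plain Python as follows. The function names and the toy UserID column are assumptions made for the example; each id is treated as a single word, mapped to 1 and then summed by key.

```python
from collections import Counter


def count_mapper(lines, column=0, sep="\t"):
    """Map the chosen column of each record to the pair (key, 1)."""
    for line in lines:
        fields = line.rstrip("\n").split(sep)
        yield fields[column], 1


def count_reducer(pairs):
    """Sum the mapped values by key to obtain the overall key counts."""
    counts = Counter()
    for key, value in pairs:
        counts[key] += value
    return dict(counts)


# Toy single column UserID file (hypothetical ids).
user_ids = ["1", "1", "2", "1"]
user_counts = count_reducer(count_mapper(user_ids))
```

The same pair of functions applies to the MovieID, Rating score and Genre files, since only the content of the key column changes.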

The advantage of this interpretation of the word count MapReduce is that one can treat JavaScript Object Notation (JSON)/Binary JavaScript Object Notation (BSON) based MongoDB MapReduces as adjusted word counts (i.e. while actually being BSON-based counts of key-value pairs).

In this illustration we will also consider how to implement the adjusted word count MapReduce model to generate the fourth and sixth measures. In Flink, the batch word count MapReduce is applied to the data using the modified word count configuration interpretation.

The fourth MapReduce (for the fourth summary measure) will involve creating MovieID-Rating key-value mappings from two column dataset tuples and then reducing the key-value pairs to calculate the average rating for each key (MovieID). In this illustration this is referred to as implementing the average configuration of the MapReduce Model.

The sixth MapReduce similarly involves creating a mapping of the Genre-Rating key-value pairs and then reducing the key-value pairs to calculate the average rating for each key (Genre). 
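The average configuration can be sketched in the same style. This is a minimal plain Python sketch rather than the MRJob program used in the illustration; the function names and the toy tab separated key-rating records are assumptions.

```python
from collections import defaultdict


def rating_mapper(lines, sep="\t"):
    """Map each two column record to the pair (key, rating)."""
    for line in lines:
        key, rating = line.rstrip("\n").split(sep)
        yield key, float(rating)


def average_reducer(pairs):
    """Reduce the pairs to the average rating for each key."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of ratings, count]
    for key, rating in pairs:
        totals[key][0] += rating
        totals[key][1] += 1
    return {key: total / n for key, (total, n) in totals.items()}


# Toy MovieID-Rating records (hypothetical values).
records = ["122\t5", "122\t3", "185\t4.5"]
averages = average_reducer(rating_mapper(records))
```

Replacing the MovieID key with a Genre key gives the sixth measure without any change to the reducer.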


In this illustration the word count MapReduce is implemented using twenty-five different methods. The methods result from blending four Hadoop Streaming interfaces/libraries (DMRGo, MRJob, Perl Hadoop::Streaming, Wukong-Hadoop) and five Big Data frameworks/platforms (Flink, Hadoop, MongoDB, Pig and Spark).

In Hadoop, the word count is implemented using the Hadoop Streaming interface. In Flink, the batch word count is implemented in Java, Python and Scala. The Flink implementations will be illustrated in local mode. In Spark, the word count is implemented using a Scala Spark Pipe, PySpark Pipe, Java Spark Pipe and SparkR Pipe. The Spark implementations will also be illustrated in local mode. In Pig, the word count is implemented using Pig Streaming. In MongoDB, the (adjusted) word count is implemented using the JavaScript mapReduce() function in the MongoDB Shell and PyMongo (i.e. Code from bson.code in a PySpark application).  

The average MapReduce is implemented using a Python MRJob library program. The illustration will also consider how to construct query based summaries.


1. Prepare the data



The data is available in zip file format. The data from the ratings.dat file can be arranged into the following four columns.
















The Timestamps variable is not used in this illustration. The data can be arranged into five files for the job, namely, a UserID file, MovieID file, MovieID-Rating file, Rating score file and Fused MovieID-Rating file.

The UserID, MovieID, MovieID-Rating, Rating score and Fused MovieID-Rating files can be generated from the original ratings.dat file as follows.
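One possible way to derive the five outputs is sketched below in Python. It assumes the "::" field separator of the MovieLens ratings.dat file (UserID::MovieID::Rating::Timestamp) and a tab between the MovieID and Rating columns; the function name and the underscore used to fuse the two columns into a single key are assumptions for this example.

```python
def split_rating_line(line, fuse_sep="_"):
    """Split one ratings.dat line into the entries of the five job files."""
    user_id, movie_id, rating, _timestamp = line.rstrip("\n").split("::")
    return {
        "user_id": user_id,                        # UserID file
        "movie_id": movie_id,                      # MovieID file
        "movie_rating": f"{movie_id}\t{rating}",   # MovieID-Rating file
        "rating": rating,                          # Rating score file
        "fused": f"{movie_id}{fuse_sep}{rating}",  # Fused MovieID-Rating file
    }


# Illustrative line in the ratings.dat layout.
row = split_rating_line("1::122::5::838985046")
```

Writing each entry of the returned dictionary to its own file, one line per rating, yields the five files; the timestamp is read but discarded.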













The data from the movies.dat file can be arranged into the following three columns.













The movies data can then be combined with the files generated from the ratings file to produce three new files. The first file that can be generated is the genres file or the genres counts source file. This can be generated by substituting each MovieID with its genre.

The second file that can be generated is the Genre-Rating file. This can be generated, analogously to the genres counts source file, by substituting the MovieID in the MovieID-Rating file with its genre.

The third file that can be generated is the Fused Genre-Rating file. This can be generated by fusing the two columns (Genre and Rating) in the Genre-Rating file.   
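The substitution can be sketched in Python as a simple lookup. The sketch assumes the "::" separated movies.dat layout (MovieID::Title::Genres) and keeps the genre field exactly as it appears in the file; the function names, the underscore fuse separator and the toy records are hypothetical.

```python
def load_genres(movie_lines):
    """Build a MovieID -> genre lookup from movies.dat style lines."""
    genres = {}
    for line in movie_lines:
        parts = line.rstrip("\n").split("::")
        genres[parts[0]] = parts[-1]  # first field is the id, last is the genre
    return genres


def to_genre_rating(movie_rating_lines, genres):
    """Substitute the MovieID in each MovieID-Rating record with its genre."""
    for line in movie_rating_lines:
        movie_id, rating = line.rstrip("\n").split("\t")
        yield f"{genres[movie_id]}\t{rating}"


# Toy records (hypothetical movie and rating).
genres = load_genres(["10::Example Movie (2000)::Comedy"])
genre_rating = list(to_genre_rating(["10\t4"], genres))
fused = [line.replace("\t", "_") for line in genre_rating]
```

Dropping the rating column from the substituted records gives the genres counts source file, and the fused list corresponds to the Fused Genre-Rating file.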

In summary, the files are generated as follows.









Once the files have been created the next step is to prepare the mappers and reducers.


2. Prepare the mapper and reducer sets



Go programming language mapper and reducer in DMRGo (Word count MapReduce configuration)



The DMRGo (Go) mapper and reducer can be prepared using a Go word count application developed using the DMRGo library available from this gist.
The approach taken in this illustration is to create mapper and reducer files that are library based references to the Go programs housed in the application. The approach/method to perform this can be obtained from the tutorials in this manual, this post and this post. The first step involves creating this Mwordcount.go file (Mwordcount - Modified word count).



The Mwordcount.go file can be saved in the local system folder <Local system Go word count application folder>. The program can be compiled into a Go word count application.

The following mapper and reducer Bash files can then be prepared.




The resulting files make up the word count configuration MapReduce. The MapReduce can then be run in a choice of facilities using appropriate tools (i.e. Bash command-line interface submits in Ubuntu, shell programs or applications). The facilities that will be considered for this purpose in this illustration are essentially the Hadoop Streaming interface, Scala Spark Pipe, PySpark Pipe, SparkR Pipe, Java Spark Pipe and Pig Stream operator.

MRJob mapper and reducer (Word count MapReduce configuration)



The approach for constructing the Python MRJob (MRJob) mapper and reducer files is similar to the approach followed for the Go mapper and reducer files.

Essentially, one can first construct a Python MRJob word count MapReduce program. The next step is to create Bash files using MRJob mapper and reducer library based references to Python code housed in a Python MRJob word count program (mrjobwc_program.py). The approach/method to perform this can be obtained from the tutorials in this manual and this document.


The mrjobwc_program.py file can be saved in the local system folder <Local System Python MRJob WC program Folder>. One can then prepare the following mapper and reducer Bash files.






Perl Hadoop::Streaming mapper and reducer (Word count MapReduce configuration)



The Perl Hadoop::Streaming (Hadoop::Streaming) mapper and reducer can be created using the Perl Hadoop::Streaming library. The approach/method to perform this can be obtained from the tutorials in this document and this gist. The following mapper and reducer files can be prepared.




Wukong-Hadoop mapper and reducer (Word count MapReduce configuration)



The approach for creating the Ruby Wukong-Hadoop (Wukong-Hadoop) mapper and reducer files follows analogously the approach for Go and MRJob wordcount.

The first step is to create a Wukong-Hadoop library word count program. One can then use the library based references to the Ruby code housed in a Ruby Wukong-Hadoop library word count program (wuword_count.rb) to create Bash mapper and reducer files. The approach/method to perform this can be obtained from the tutorial in this gist.


The Ruby word count program can be saved in the local system folder <Local System Wu WC program Folder>. One can then prepare the following mapper and reducer Bash files.




MRJob mapper and reducer (Average MapReduce configuration)



The MRJob mapper and reducer for the Average MapReduce configuration are, analogously to the MRJob word count MapReduce case, library based references to Python code housed in a Python MRJob library average MapReduce program (mrjobavg_program.py). The approach/method to perform this can be obtained from the tutorials in this manual, this document and this post.


The mrjobavg_program.py file can be saved in the local system folder <Local System Python MRJobAvg program Folder>. One can then prepare the following mapper and reducer Bash files.




3. Process the data in Flink, Hadoop, MongoDB, Pig and Spark



The UserID MapReduce calculation is illustrated in Hadoop (Go word count), Pig (Go word count), MongoDB (JavaScript and PyMongo Code) and Java Flink (Batch word count).

The MovieID MapReduce calculation is illustrated in Scala Spark (Go word count), Java Spark (Hadoop::Streaming word count) and MongoDB (JavaScript).

The Rating category counts MapReduce calculation is illustrated in PySpark (MRJob word count), Hadoop (Wukong-Hadoop word count) and Java Flink (Batch word count).

The average MapReduce calculation is illustrated in seven versions for the MovieID-Rating key-value map using two dataset structures. The first structure is a tab separated key-value two column dataset (MovieID-Rating file). The second structure is a single column key dataset (Fused MovieID-Rating file). The resulting average MapReduces are illustrated in the following configurations.

  • MRJob average configuration
  • Go word count configuration
  • MRJob word count configuration
  • Hadoop::Streaming word count configuration
  • Wukong-Hadoop word count configuration
  • Python Flink batch word count configuration
  • Scala Flink batch word count configuration

The specific MovieID averages calculation illustrations are as follows.


  • Java Spark Pipe (MRJob average configuration)
  • Java Spark Pipe (Go word count configuration)
  • Scala Spark Pipe (Hadoop::Streaming word count configuration)
  • SparkR Pipe (MRJob word count configuration)
  • Pig Streaming (Wukong-Hadoop word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)

The Genre counts MapReduce calculation is illustrated in SparkR Pipe (Go word count), Scala Spark Pipe (Wukong-Hadoop word count), Pig Streaming (Hadoop::Streaming word count) and Java Flink (Batch word count).

The fundamental structure of the Genre averages calculation is the same as (or similar to) that of the MovieID-Rating averages. The specific Genre averages calculation illustrations are as follows.


  • PySpark Pipe (MRJob average configuration)
  • Java Spark Pipe (Wukong-Hadoop word count configuration)
  • Scala Spark (Hadoop::Streaming word count configuration)
  • PySpark (Go word count configuration)
  • Hadoop (MRJob word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)



UserID counts




UserID counts (Go Hadoop Streaming)




In order to implement the UserID MapReduce using the Hadoop Streaming facility and a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create this Bash file using the tutorial in this post and to save the file in local system folder: <Local System Bash  Hadoop Streaming Submit File Folder>.


The next step is to run the following command on Ubuntu Server 14.04.5 LTS (as outlined in this tutorial).



This will generate an output file in HDFS with the following contents excerpt.



















The next step is to generate counts of the UserIDs using a Go word count MapReduce in the Pig Streaming facility.



UserID counts (Pig Go Streaming Script)




In order to implement the UserID counts MapReduce using the Pig Streaming facility with a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigGoStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file using methods obtained from this book,  this wiki and this manual.


The next step is to save the Pig script file in local system folder: <Local System Pig Script File Folder>. The Pig script can be run in Pig from the command line (using mapreduce mode).



This will generate an output file in HDFS with the following contents excerpt.

















UserID counts (Java Flink Batch word count Application)



In order to implement the UserID counts MapReduce using the Java Flink Batch word count examples jar file one can follow the guidelines in this post. The following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch word count examples jar file: WordCount.jar
Local system Java Flink Batch word count examples jar file folder: <Local System Java Flink Batch word count examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in the local system folder: <Local System Java Flink Batch word count examples jar File Folder>. One can then run the application using the Flink command-line interface.



This will generate a local system output file with the following contents excerpt.


















MovieID counts



MovieID counts (Scala Spark Go Pipe Application)


In order to implement the MovieID counts MapReduce using the Scala Spark Pipe facility with a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Go Pipe application file using the guidelines in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, the aggregation section of the MongoDB manual, this gist and the PyMongo guide.





The next step is to export the file into a jar file and to save the jar file in local system folder: <Local System Scala jar File Folder>.  One can then run the Scala Spark Go Pipe application using the Spark-submit facility.





This will generate the following Spark SQL system output, MongoDB based NoSQL system output and a local system output file with the following contents excerpt.





























































MovieID counts (Java Spark Perl Pipe Application)



In order to implement the MovieID counts MapReduce using a Java Spark Pipe application and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system Java Spark Perl Pipe application folder: <Local System jar File Folder>
Java Spark Perl Pipe application java file: JavaSparkPerlPipeApp.java
Java Spark Perl Pipe class: JavaSparkPerlPipe
Java Spark Perl Pipe application jar file: JavaSparkPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R
Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration


The next step is to create the following Java Spark Hadoop::Streaming pipe application file (JavaSparkPerlPipeApp.java) using methods outlined in this book, this book, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.





The next step is to save the Java Spark Perl Pipe application file in the local system folder: <Local system Java Spark Perl Pipe Application Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following content excerpt, Spark SQL system output and (MongoDB-based) NoSQL system output.





























MovieID counts (Java Flink batch word count application)



In order to implement the MovieID counts MapReduce using the Java Flink word count batch examples application jar, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch wordcount examples jar file: WordCount.jar
Local system Java Flink Batch wordcount examples jar file folder:
<Local System Java Flink Batch wordcount examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in the local system folder:
<Local System Java Flink Batch wordcount examples jar File Folder>. 

One can then run the application using the Flink run facility.



This will generate a local system output file with the following contents excerpt.


















UserID counts and MovieID counts




UserID counts and MovieID counts (MongoDB Shell)




The UserID and MovieID counts can also be generated using the mapReduce() function in the mongo Shell. In order to implement the MapReduces in the mongo Shell the following arrangements/selections may be made.

Database: MLens
UserID Collection: UserID
MovieID Collection: MovieID
Output collection name for UserID MapReduce: UserID_Counts
Output collection name for MovieID MapReduce: MovieID_Counts
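
As a rough illustration of what a counting MapReduce does, the logic can be sketched in plain Python without a MongoDB connection. This is a minimal sketch and not the shell program used in the post: the document field name UserID, the sample documents and the helper names map_user, reduce_counts and map_reduce are all assumptions made for the example. The output mimics the {"_id": key, "value": result} layout that mapReduce() writes to its output collection.

```python
from collections import defaultdict

def map_user(doc):
    """Map step: emit (key, 1) per document (field name 'UserID' is assumed)."""
    yield doc["UserID"], 1

def reduce_counts(key, values):
    """Reduce step: sum all values emitted for one key."""
    return sum(values)

def map_reduce(docs, mapper, reducer):
    """Group the mapper output by key, then apply the reducer to each group."""
    grouped = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):
            grouped[key].append(value)
    # Mimic the {"_id": ..., "value": ...} layout of a mapReduce output collection
    return [{"_id": k, "value": reducer(k, v)} for k, v in sorted(grouped.items())]

# Hypothetical sample documents standing in for the UserID collection
docs = [{"UserID": 2}, {"UserID": 1}, {"UserID": 2}, {"UserID": 2}]
user_counts = map_reduce(docs, map_user, reduce_counts)

# Equivalent in spirit to a find({"_id": 2}) lookup on the output collection
count_for_2 = next(d for d in user_counts if d["_id"] == 2)
```

The same three functions apply to the MovieID counts by swapping the field that the map step reads.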

The next step is to start the mongo Shell, switch to the MLens database (with the use MLens command) and to view the MLens collections with the show collections command.









The next step is to run the following shell program, which runs the MapReduce to calculate the UserID counts and then queries the result for ID 2 using the db.UserID_Counts.find({"_id":2}).pretty() command.



The program commands will generate the following output.













The full UserID_Counts collection can also be viewed with the general db.UserID_Counts.find().pretty() command. The command will generate the following output.
















The next step is to run the following program to run the MapReduce to calculate the MovieID counts.



This will yield the following output for the MovieID counts MapReduce.





The individual query for MovieID 20 using the db.MovieID_Counts.find({"_id":20}).pretty() command will generate the following output:





The next step is to generate the counts for the Rating score categories in the Spark Pipe facility using a PySpark Pipe application. The PySpark Pipe application can also be used to implement the UserID counts MapReduce in MongoDB using PyMongo Code. 


Rating score counts and UserID counts (NoSQL query)



Rating score counts and UserID counts (PySpark MRJob Pipe Application)



In order to run the Rating score counts MapReduce with the UserID counts MapReduce using the PySpark Pipe facility, MRJob library (Word count configuration) and PyMongo, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system PySpark MRJob Pipe application folder: <Local System PySpark MRJob Pipe Application Folder>
PySpark MRJob Pipe application file: PySparkMRJobPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements as outlined in the MongoDB part of the illustration

The next step is to create the following PySpark MRJob application file (PySparkMRJobPipeApp.py) using methods outlined in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.






One can then save the PySpark MRJob application file in local system folder: <Local System PySpark MRJob Pipe Application Folder> and run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system file with the following contents excerpt.



































Rating score counts (Wukong-Hadoop Hadoop Streaming)


In order to implement the Rating counts MapReduce using Hadoop Streaming and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file: hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>
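
For orientation, the word count pattern that a streaming mapper and reducer follow can be sketched in Python. This is only a sketch under stated assumptions and not the Wukong-Hadoop scripts used in the post: it assumes one rating score per input line, tab-separated key and value pairs between the two steps, and reducer input sorted by key, and the function names and sample ratings are hypothetical.

```python
from itertools import groupby

def mapper(lines):
    """Emit 'token<TAB>1' for every whitespace-separated token."""
    for line in lines:
        for token in line.split():
            yield f"{token}\t1"

def reducer(sorted_lines):
    """Sum the counts of consecutive lines that share a key (input must be sorted)."""
    pairs = (line.split("\t") for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, sum(int(value) for _, value in group)

# Hypothetical rating scores, one per input line
ratings = ["3", "5", "4", "5", "3", "5"]
shuffled = sorted(mapper(ratings))      # stands in for the shuffle/sort phase
rating_counts = dict(reducer(shuffled))
```

Sorting between the two steps is what lets the reducer process one key at a time without holding all keys in memory.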

The next step is to save the following Bash file in local system folder:
<Bash Hadoop Streaming Submit File Folder>.



The next step is to run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents excerpt.









MovieID ratings average



MovieID ratings average (Java Spark MRJob Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the MRJob library (Average configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe MRJob App jar: JavaSparkMRJobAvgPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: JavaSparkMRJobAvgPipe
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
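
The averaging step itself can be sketched in Python as follows. This is a minimal sketch, not the MRJob mapper and reducer used in the post: it assumes input lines of the form MovieID, a tab, then the rating, and the function name average_by_key and the sample lines are hypothetical.

```python
from collections import defaultdict

def average_by_key(lines, sep="\t"):
    """Return {key: mean of values} for lines shaped like 'key<sep>value'."""
    totals = defaultdict(lambda: [0.0, 0])   # key -> [running sum, running count]
    for line in lines:
        key, value = line.rstrip("\n").split(sep)
        totals[key][0] += float(value)
        totals[key][1] += 1
    return {key: total / n for key, (total, n) in totals.items()}

# Hypothetical 'MovieID<TAB>rating' lines
sample = ["20\t4.0", "20\t3.0", "31\t5.0"]
movie_avgs = average_by_key(sample)
```

Keeping a running sum and count per key, rather than a list of ratings, is what allows the same logic to be split across partial reducers.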

The next step is to create the following java file (JavaSparkMRJobAvgPipeApp.java).









The next step is to export the java file into a jar file and save the jar in local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following contents excerpt, Spark SQL system output and NoSQL system output.
























MovieID ratings average (Java Spark Go Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the DMRGo library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe Go App jar: JavaSparkGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: JavaSparkGoPipe
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

One can then create the following Java Spark Pipe application file.





The next step is to export the java file into a jar and to save the jar file in local system folder: <Local System Java jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL output and a local system file with following file contents excerpt.
























MovieID ratings average (Scala Perl Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Scala Spark Pipe facility and the Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Pipe application file.





The next step is to export the scala file into a jar and save the jar file in local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate a local system output file with the following contents excerpt, Spark SQL system output and MongoDB-based NoSQL system output.







































MovieID ratings average (SparkR MRJob Application)



In order to implement the MovieID ratings average MapReduce using the SparkR Pipe facility and an MRJob word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system SparkR MRJob Pipe application folder: <Local System SparkR Application Folder>
SparkR MRJobPipe R application file: SparkRMRJobPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following SparkR Pipe application file (SparkRMRJobPipeApp.R) using methods outlined in this guide, this guide, this package reference, this package reference, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post and the aggregation section of the MongoDB manual.




The next step is to save the R file in local system folder: <Local System SparkR Application Folder>.

The next step is to run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following contents excerpt.

















































MovieID ratings average (Pig Hadoop-Wukong Streaming Script)



In order to implement the MovieID ratings average MapReduce using the Pig Streaming facility and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigWuStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file.  


The Pig script file can be saved in local system folder: <Local System Pig Script File Folder>. One can then run the Pig script in mapreduce mode from the command line using the following command.



This will generate an output file in HDFS with the following contents excerpt.

















MovieID ratings average (Python Flink Application)



In order to implement the MovieID ratings average MapReduce using a Python Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file: WordCount.py
Local system Python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python word count application (WordCount.py) using methods outlined in this guide, this post and this post.



The Python Flink wordcount application can be saved in local system folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink program with the following command.



This will generate a local system output file with the following contents excerpt.
















MovieID ratings average (Scala Flink Application)



In order to implement the MovieID ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: FlinkScalaApp.scala
Local system Scala Flink Batch application folder: <Local System Scala Flink Wordcount Application jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: Wordcount
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar

The next step is to create the following Scala application file (FlinkScalaApp.scala) using methods outlined in the Flink DataSet API programming guide and this post.




The FlinkScalaApp.scala file can be exported into a jar file, which can be saved in local system folder: <Local System Scala Flink Application jar File Folder>.

One can then run the Scala application with the following command.



This will generate a local system output file with the following contents excerpt.


















Genre counts



Genre counts (SparkR Go Pipe Application)



In order to implement the Genre counts MapReduce using the SparkR application and a Go word count application, the following arrangements may be made.


Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system SparkR Go Pipe application folder: <Local System SparkR Go Pipe Application Folder>
SparkR Go Pipe application file: SparkRGoPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following SparkR application file (SparkRGoPipeApp.R).



The R file can be saved in local system folder: <Local System R file Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output, Relative Frequency Histogram Plot and a local system output file with file contents excerpt.



















Genre counts (Scala Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre counts MapReduce using the Scala Spark Pipe facility with the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Scala Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Scala Spark Wukong-Hadoop Pipe application jar file: ScalaSparkWukongHadoopPipeApp.jar
Scala package: scalapackage
Scala object: scalaobject
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Wukong-Hadoop Pipe application scala file (ScalaSparkWuPipeApp.scala).



The next step is to export the scala file into a jar file (ScalaSparkWuPipeApp.jar) and to save the jar file in local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, NoSQL system output and local system output file with file content excerpt.





































Genre counts (Pig Perl Hadoop::Streaming Streaming Script)




In order to implement the Genre counts MapReduce using the Pig Stream facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl Hadoop::Streaming mapper file: map.pl
Local system Perl Hadoop::Streaming reducer file: reduce.pl
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigPerlStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file. 


The next step is to save the Pig script file in local system folder: <Local System Pig Script File Folder> and run the Pig script in mapreduce mode from the command line using the following command.



This will generate a local system output file with the following file content excerpt.








Genre ratings average




Genre ratings average (PySpark MRJob Pipe Application)




The Genre ratings average MapReduce can be implemented using a PySpark application and the MRJob library (Average configuration). In order to do this, the following arrangements can be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system PySpark MRJob Avg Pipe application file folder: <Local System PySpark MRJob Avg Pipe Application Folder>
PySpark MRJob Avg Pipe App file: PySparkMRJobAvgPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Python file (PySparkMRJobAvgPipeApp.py).




The PySpark MRJob Avg Pipe application file can be saved in local system folder: <Local System PySpark MRJob Avg Pipe Application Folder>.

One can then run the application using the Spark-submit facility.



This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with file contents excerpt.















































Genre ratings average (Scala Spark Perl Pipe Application)



In order to implement the Genre ratings average MapReduce using the Scala Spark Pipe facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar file Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp2.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Spark Perl Pipe application Scala file (SparkScalaPerlPipeApp2.scala).




The next step is to export the scala file into a jar and to save the jar in local system folder: <Local System jar file Folder>.

One can then run the application using the Spark-submit facility.


This will generate the following Spark SQL system output, NoSQL system output and a local system output file with file contents excerpt.







































Genre ratings average (Java Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre ratings average MapReduce using the Java Spark Pipe facility and the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Java Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Java Spark Wukong-Hadoop Pipe application java file: SparkJavaWuPipeApp.java
Java Spark Wukong-Hadoop Pipe class: SparkJavaWuPipeApp
Java Spark Wukong-Hadoop Pipe application jar file: JavaSparkWuPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R
Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Java Spark Wukong-Hadoop Pipe application file (JavaSparkWuPipeApp.java).






The next step is to export the file into a jar file and save the jar file in local system folder: <Local system Java Spark Wu Pipe Application Folder>

One can then run the application using the Spark-submit facility.



This will generate a local system file with the following file contents excerpt, Spark SQL system output and MongoDB-based NoSQL system output.



























Genre average (PySpark Go Pipe Application)




In order to implement the Genre average MapReduce using the PySpark Pipe facility, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system PySpark Go Pipe application folder: <Local System PySpark Go Pipe Application Folder>
PySpark Go Pipe application file: PySparkGoPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following PySpark Go application file (PySparkGoPipeApp.py).




The next step is to save the PySpark Go application file in local system folder: <Local System PySpark Go Pipe Application Folder>

One can then run the application using the Spark-submit facility.


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with file contents excerpt.




















































Genre ratings average (MRJob Hadoop Streaming)



In order to implement the Genre ratings average MapReduce using Hadoop Streaming and the MRJob library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file: hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>

The next step is to save the following Bash Hadoop Streaming submit file in local system folder: <Local System Bash Hadoop Streaming Submit File Folder>.



The next step is to run the following command on Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents excerpt.



















Genre ratings average (Python Flink Application)




In order to implement the Genre ratings average MapReduce using a Python Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file:  WordCount.py
Local system python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python Flink application file (WordCount.py).


The Python Flink wordcount application file can then be saved in local system folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink application with the following command.



This will create a local system output file with the following file contents excerpt.

















Genre ratings average (Scala Flink Application)



In order to implement the Genre ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: WordCount.scala
Local system Scala Flink Batch application folder: <Local System Scala Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar


The next step is to create the following Scala Flink application file (WordCount.scala).



The next step is to export the WordCount.scala file into a jar file and to save the Scala Flink word count jar file in local system folder: <Local System Scala Flink Application jar File Folder>.

One can then run the Scala Flink application with the following command.



This will generate a local system output file with the following file contents excerpt.



















4. Data query and data interaction


The processed data can be queried interactively using ShinyMongo and Shiny. ShinyMongo can be used to create web applications that include an interface that can be used by the user to query the data in the MongoDB illustrations (generated in JavaScript and PyMongo) interactively using JSON/BSON syntax.


Shiny can be used to create Shiny Applications that generate web-based interactive histograms for the UserID, MovieID, Rating score and Genre counts.



ShinyMongo Application


The tutorial and the R scripts (server.R and ui.R) on how to generate interactive queries on data in MongoDB using ShinyMongo can be found in this gist. The method that can be used to create the applications from the ShinyMongo gist can be found in this Shiny tutorial.

The application will generate the following web based interface, output and JSON/BSON queries for the MongoDB data.





















































































Shiny Applications 


In order to generate a Shiny application that generates an interactive web based histogram for the UserID_Counts variable the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Shiny library: Install Shiny package in R

The next step is to create the following two files and save them in a local system folder as outlined in this tutorial.




The next step is to run the application using the runApp() command, which will result in the following web-based interface.


The output for 30 Histogram bins is as follows.















The output for 15 bins is as follows.













The applications for the MovieID, Rating score and Genre counts variables can be generated analogously to the case of the UserID counts variable.


The web-based output for the MovieID counts variable and 30 bins is as follows.















The web-based output for the MovieID counts variable and 15 bins is as follows.













The web-based output for the Rating score counts variable and 30 bins is as follows.














The web-based output for the Rating score counts variable and 10 bins is as follows.















The web-based output for the Genre counts variable and 30 bins is as follows.














The web-based output for the Genre counts variable and 15 bins is as follows.














5. Summarize the data



Summary statistics


The summary statistics can be generated in R using the stat.desc() function from the pastecs package.


UserID variable counts










MovieID variable counts










Rating score counts









Genre variable counts












Histograms




The histograms can be generated in Relative Frequency form using the PlotRelativeFrequency() function from the HistogramTools package.
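
To make the relative frequency form concrete, the calculation behind such a histogram can be sketched in Python: each bin count is divided by the total number of observations, so the bar heights sum to one. This is an illustrative sketch and not the R code used in the post. The equal-width binning rule, the function name and the sample counts are assumptions, and the bin edges chosen by R may differ.

```python
def relative_frequency_histogram(values, bins):
    """Return the share of observations in each of `bins` equal-width bins."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / bins or 1.0          # guard against a zero-width range
    counts = [0] * bins
    for v in values:
        # The maximum value is placed in the last bin
        idx = min(int((v - lo) / width), bins - 1)
        counts[idx] += 1
    total = len(values)
    return [c / total for c in counts]

# Hypothetical counts variable
sample_counts = [1, 2, 2, 3, 3, 3, 4, 10]
rel_freq = relative_frequency_histogram(sample_counts, bins=3)
```

Changing the bins argument corresponds to changing the number of histogram bins in the Shiny applications of section 4.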


UserID variable counts




In order to generate the Relative Frequency Histogram for the UserID_Counts variable one can use the following R command sequence.










This will generate the following histogram for the UserID counts.

























The histograms for the other variables can be generated analogously.



MovieID variable counts



























Rating score variable counts




























Genre variable counts



























The next step is to generate summary measures of the metrics produced during the data processing in section 3 using PROC SGPLOT in the SAS software.


6. Analyze the data


The summary measures of the metrics can be depicted graphically using the SAS software PROC SGPlot statement. The bar graph categories can also be further analysed using Bash script grep decompositions in Ubuntu Server 14.04.5 LTS and Hadoop.



UserID counts


In order to generate the bar graph for the UserID variable, one can first create a Microsoft Excel file with two columns: UserID for the user-ids and Counts for the user-id counts.

The file can then be imported into a work.import dataset in the SAS software using the Proc Import statement. The following program can be prepared using the method outlined in this post and then be run in the SAS software.




This will generate the following plot.




















The plots for the other variables can be generated analogously to the case for the UserID variable counts.



Rating score counts




















The Rating score counts can be used to calculate the weighted mean and weighted variance of the ratings. In the SAS software this can be done with PROC MEANS, using the counts as the weights.

In the SAS software, one can run the following program with the values column named Var and the counts/weights column named Counts in a work.import dataset.




This will result in the following output.









In R, one can use the Weighted.Desc.Stat package and the following functions.

The application of these functions to the data will lead to the following output.


















The following illustrates how one can perform the calculation for the weighted (sample) mean and weighted (sample) variance using vector arithmetic in R.
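For readers who want to cross-check the arithmetic outside R, the same calculation can be sketched in Python. This sketch assumes the counts are frequency weights, so the weighted sample variance divides by the sum of the weights minus one. SAS PROC MEANS and the Weighted.Desc.Stat functions may use a different divisor depending on their options, so their output need not match this sketch exactly. The rating values and counts below are hypothetical.

```python
def weighted_mean(values, weights):
    """Weighted mean: sum(w * x) / sum(w)."""
    total = sum(weights)
    return sum(v * w for v, w in zip(values, weights)) / total


def weighted_sample_variance(values, weights):
    """Frequency-weighted sample variance.

    Assumption: weights are counts, so the divisor is sum(w) - 1.
    """
    total = sum(weights)
    mean = weighted_mean(values, weights)
    squared_dev = sum(w * (v - mean) ** 2 for v, w in zip(values, weights))
    return squared_dev / (total - 1)


if __name__ == "__main__":
    # Hypothetical rating scores and their counts.
    ratings = [1.0, 2.0, 3.0]
    counts = [1, 2, 1]
    print(weighted_mean(ratings, counts))
    print(weighted_sample_variance(ratings, counts))
```

With frequency weights, the result is the same as expanding each rating by its count and computing the ordinary sample mean and variance on the expanded list.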
















MovieID counts 





















MovieID counts decomposition



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition for the rating counts of MovieID 296, one can take the counts from one of the output files of the FusedMovieID-Rating MapReduce in section three (treating the output file partitions as one file) and use it as the input file for the grep decomposition. The candidate output files are those from the Java Spark Pipe Go word count, Scala Spark Pipe Perl word count, SparkR Pipe MRJob word count, Pig Wu word count, Python Flink word count or Scala Flink word count. If one selects, say, the Pig Wu Streaming output file as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID counts decomposition Bash file: BashMovieIDCountsDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>
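As a rough illustration of what such a grep decomposition does, the following Python sketch keeps only the lines of the fused MovieID-Rating counts that belong to one MovieID. The layout of the fused key is not shown in this section, so the form `<MovieID>_<rating><TAB><count>` used here is an assumption, as are the function name and the sample lines.

```python
import re


def grep_decomposition(lines, movie_id, sep="_"):
    """Keep the lines whose fused key starts with the given MovieID.

    Assumption: each line looks like '<MovieID><sep><rating>\t<count>'.
    The pattern is anchored at the start of the line and includes the
    separator, so MovieID 296 does not also match 2296 or 2960.
    """
    pattern = re.compile("^" + re.escape(str(movie_id)) + re.escape(sep))
    return [line for line in lines if pattern.search(line)]


if __name__ == "__main__":
    # Hypothetical fused MovieID-Rating count lines.
    sample_lines = [
        "296_5\t10",
        "2296_5\t3",
        "296_4.5\t7",
        "2960_3\t1",
    ]
    for line in grep_decomposition(sample_lines, 296):
        print(line)
```

The anchoring is the main design point: an unanchored search for 296 would also pick up other MovieIDs that merely contain those digits.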

The next step is to create the following Bash file.




The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following Bash system output.










Hadoop grep

The equivalent output can be generated in Hadoop using the grep class from the hadoop-mapreduce-examples-2.6.0.jar and the FusedMovieID-Rating.txt file. 

In order to run the Hadoop grep decomposition of the rating counts for MovieID 296 in Hadoop the following arrangements/selections may be made.

Input data (i.e. Fused MovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>
HDFS output data folder: <HDFS output Folder>
Local system MovieID counts Hadoop decomposition Bash file:  HadoopMovieIDCountsDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder>

One can then run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.










MovieID Rating averages






















MovieID Rating averages decomposition



In the case of the averages decomposition, it is important to note that the decomposition of the rating counts also allows one to calculate the average rating for a MovieID. This means that one can retain the structure of the Bash and Hadoop decomposition scripts for the averages decompositions.
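The link between the two decompositions can be sketched in Python: once the counts per rating value are known for a MovieID, the average rating is the count-weighted mean of those rating values. The function name and the rating/count pairs below are hypothetical and are not taken from the MovieLens output.

```python
def average_from_decomposition(rating_counts):
    """Average rating from (rating, count) pairs of one MovieID."""
    pairs = list(rating_counts)
    total = sum(count for _, count in pairs)
    if total == 0:
        raise ValueError("no ratings in the decomposition")
    return sum(rating * count for rating, count in pairs) / total


if __name__ == "__main__":
    # Hypothetical decomposition: rating value -> number of ratings.
    decomposition = [(5.0, 2), (4.0, 1), (3.0, 1)]
    print(average_from_decomposition(decomposition))
```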



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of the average for MovieID 33264, one can take the counts from one of the output files of the Fused MovieID-Rating MapReduce in section three (treating the output file partitions as one file) and use it as the input file for the grep decomposition. The candidate output files are those from the Java Spark Pipe Go word count, Scala Spark Pipe Perl word count, SparkR Pipe MRJob word count, Pig Wu word count, Python Flink word count or Scala Flink word count.

If one selects, say, the Pig Perl Streaming output as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID average decomposition Bash file: BashMovieIDAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>

The next step is to create the following Bash file.






The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.





Hadoop grep


In order to run the Hadoop grep decomposition for the average rating of MovieID 33264, the following arrangements may be made.


Input data (i.e. FusedMovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system MovieID average Hadoop decomposition Bash file:  BashMAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.





MovieID Genre counts
























MovieID Genre counts decomposition




The Genre decompositions can be generated analogously to those of the MovieID ratings and MovieID ratings averages, using the output of the Fused Genre-Rating counts MapReduces for Ubuntu Server and the Fused Genre-Rating file for Hadoop.



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of the Genre Drama, one can take the counts from one of the output files of the Fused Genre-Rating MapReduces in section three (treating the output file partitions as one file) and use it as the input file for the grep decomposition. The candidate output files are those from the Scala Spark Pipe Perl word count, Java Spark Pipe Wu word count, PySpark Pipe MRJob word count, Hadoop MRJob word count, Python Flink word count or Scala Flink word count.

If one selects, say, the Spark Java Wu output files, binds them into a single file (adding the rows of the second file below the rows of the first one) and uses the resulting file as the input dataset, then one can make the following arrangements/selections:

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system Genre counts decomposition Bash file: BashCountsDramaDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>
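The binding of the output partitions into a single input file, as described above, can be sketched in Python as follows. The function name and the partition file names in the usage example are hypothetical; the actual names depend on how the Spark job wrote its output.

```python
from pathlib import Path


def bind_partitions(part_paths, out_path):
    """Append the rows of each partition file, in order, to one file."""
    with open(out_path, "w", encoding="utf-8") as out:
        for part in part_paths:
            text = Path(part).read_text(encoding="utf-8")
            out.write(text)
            # Make sure the next partition starts on a new row.
            if text and not text.endswith("\n"):
                out.write("\n")
    return out_path


if __name__ == "__main__":
    import os
    import tempfile

    # Hypothetical partition files created only for this demonstration.
    folder = tempfile.mkdtemp()
    first = os.path.join(folder, "part-00000")
    second = os.path.join(folder, "part-00001")
    Path(first).write_text("Drama_5\t10\n", encoding="utf-8")
    Path(second).write_text("Drama_4\t7\n", encoding="utf-8")
    bound = bind_partitions([first, second], os.path.join(folder, "InputData.txt"))
    print(Path(bound).read_text(encoding="utf-8"))
```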

The next step is to create the following Bash file.



The next step is to save the file in local system folder: <Bash Decomposition Folder> and to run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.


















Hadoop grep



In order to run the Hadoop grep decomposition of the Drama Genre ratings the following arrangements may be made.


Input data (i.e. Fused Genre-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

Local system Genre counts Hadoop decomposition Bash file:  BashGenreDramaHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.



The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.




This will generate an output file in HDFS with the following contents.














MovieID Genre averages























MovieID Genre averages decomposition



Ubuntu Server 14.04.5 LTS grep



In order to generate a grep decomposition of the rating average for the Genre Animation|IMAX|Sci-Fi, one can take the counts from one of the output files of the Fused Genre-Rating MapReduces in section three (treating the output file partitions as one file) and use it as the input file for the grep decomposition. The candidate output files are those from the Scala Spark Pipe Perl word count, Java Spark Pipe Wu word count, PySpark Pipe MRJob word count, Hadoop MRJob word count, Python Flink word count or Scala Flink word count.

If one selects, say, the Spark Java Wu output files, binds them into a single file (i.e. adding the rows of the second file below the rows of the first file) and uses the resulting file as the input dataset for the MovieID Genre averages Ubuntu grep decomposition, then one may make the following arrangements/selections:

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system Bash script folder: <Bash Decomposition Folder>
Local system Genre average decomposition Bash file: BashGenreAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>


The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate the following output.








Hadoop grep



In order to run the Hadoop grep decomposition of the average rating for the Animation|IMAX|Sci-Fi Genre, the following arrangements may be made.

Input data (i.e. FusedGenre text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system Genre average Hadoop decomposition Bash file:  BashGenreAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.



This will generate an output file in HDFS with the following contents.








7. Conclusions




In the illustration we considered how to generate summary measures for the GroupLens MovieLens 10M ratings.dat and movies.dat datasets using the MapReduce programming model. The specific MapReduce configurations considered were the word count configuration and the average configuration.

The MapReduces were, in turn, constructed using four Hadoop Streaming libraries and two MongoDB interfaces. The MapReduce illustrations were implemented in ten (eleven) facilities, namely, Hadoop Streaming, Pig Streaming, Scala Spark Pipe, PySpark Pipe, Java Spark Pipe, SparkR Pipe, MongoDB (JavaScript), MongoDB (PyMongo), Java Flink, Python Flink and Scala Flink. The list of illustrations can be extended to further MapReduce configurations and facilities according to user preference.


The resulting output data sets were further summarized using Bash, Hadoop, R and the SAS software in order to illustrate the kind of information that can be mined from the data sets.


