StatsCosmos

Wednesday, March 28, 2018


How to analyze TF Cosine-based Similarity measures for the Last.fm Social Tagging System with Apache Hive and Apache Spark SQL






*Photo by chuttersnap on Unsplash


This post is one in a series of posts designed for a joint installation of Apache Flink, Apache Hadoop, Apache Hive, MongoDB, Apache Pig, Apache Spark (pre-built for Hadoop) and Ubuntu. This post is designed for an installation of Apache Hive 2.1.1, Apache Hadoop 2.6.1, Apache Spark 2.2.0 (pre-built for Hadoop) and Ubuntu 16.04.2. The purpose of the illustrations in the posts is to show how one can construct content-based recommendation measures for the Last.fm social system using the GroupLens HetRec Last.fm dataset. The modelling framework for the similarity measure analysis is that outlined in Cantador, Bellogin and Vallet (2010). The post follows on from my previous post: How to summarize Last.fm Social Tagging System Profiles using Golang, Hadoop, MongoDB and Spark.

The calculation of the similarity measures for the analyses in the posts involves implementing seventeen MapReduces on the user_taggedartists.dat dataset. The first six MapReduces were implemented in the previous post.

The similarity measures considered (in the posts) are as follows:
  • TF-based Similarity
  • TF Cosine-based Similarity
  • TF-IDF Cosine-based Similarity
  • Okapi BM25-based Similarity
  • Okapi BM25 Cosine-based Similarity


In this post only the TF Cosine-based Similarity measure is considered. The methodology for the construction of the measures is considered in How to summarize Last.fm Social Tagging System Profiles using Golang, Hadoop, MongoDB and Spark.

The MapReduces in the post series are illustrated using piping and non-piping methods. The purpose of this approach is to provide a choice of methods for each setting. The advantage of the approach is that it inherently highlights the features available from each application programming interface (API). This, in turn, provides a practical portfolio of platform-specific methods to choose from when implementing the MapReduces. For example, one can implement a MapReduce using a Java (Apache) Spark pipe application, a Java Spark non-pipe application or a Java (Apache) Flink DataSet application. The three approaches inherently illustrate the kind of programming advantages that can be harnessed from the features of the Spark Java Resilient Distributed Dataset (RDD) and the Flink Java DataSet.


The Flink illustrations use non-piping methods. The Spark illustrations use a mixture of piping and non-piping methods. The (Apache) Hadoop, Hive and Pig parts of the illustrations use piping type methods. The Hadoop MapReduce illustration uses piping type methods through the Hadoop Streaming facility. The Hive illustration uses piping type methods through the Hive map and reduce commands. The Spark SQL part of the illustration uses piping type methods through the Hive2 transform command. The Pig part of the illustration uses piping type methods through the Pig stream facility.


In this post the piping type methods use mapper-reducer sets prepared in Java and Python. In the post series the MapReduces come in two main configurations, namely, three set and one set. In the case of the three set MapReduces, the three set mapper-reducer sets can be used with the: Hadoop Streaming facility; Hive map and reduce commands; Pig stream facility; Java Spark pipe facility; Scala Spark pipe facility; SparkR pipe facility; PySpark pipe facility and Spark SQL (using the Beeline interface to the Spark SQL Thrift Server) transform command.

The three set MapReduce piping type scripts are used to calculate the Cosine-based similarity measures (TF, TF-IDF, and Okapi BM25).
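
As a flavour of the piping approach, the following minimal PySpark sketch (with placeholder paths, and not one of the applications used later in the series) pipes the input through an external mapper script, groups the mapped records by key and pipes the grouped records through an external reducer script:

# Minimal PySpark pipe sketch (illustrative only; the paths are placeholders).
from pyspark import SparkContext

sc = SparkContext("local", "ThreeSetPipeSketch")
rdd = sc.textFile("<Local System Input Data Folder>/InputData.txt")

# Pipe each line through the external mapper script.
mapped = rdd.pipe("<Local System MapReduce Folder>/Threesetmapper.sh")

# Sort by key and bring the records together so the streaming reducer
# sees each key's records consecutively.
sorted_by_key = mapped.map(lambda line: (line.split("\t")[0], line)) \
                      .sortByKey() \
                      .values() \
                      .coalesce(1)

reduced = sorted_by_key.pipe("<Local System MapReduce Folder>/Threesetreducer.sh")
reduced.saveAsTextFile("<Local System Output Data Folder>")
sc.stop()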


1. Prepare the data



The implementation of similarity measure MapReduces one to four, which obtain the elements used in the proposed profile and recommendation models (Table one) in Cantador, Bellogin and Vallet (2010), was illustrated in the first post in the series.

In this post the calculation involves using the output datasets from the first two MapReduces. The output dataset for the users will have u_{m,l} as the index/key and tf_{u_m}(t_l) as the value. The output dataset for the items will have i_{n,l} as the index/key and tf_{i_n}(t_l) as the value.

From the {{user id, tag}, user tag frequency} key-value pairs (u_{m,l}, tf_{u_m}(t_l)) and the {{item id, tag}, item tag frequency} key-value pairs (i_{n,l}, tf_{i_n}(t_l)) in the output files from the first two MapReduces, create new combined key-value combinations ((u_m, i_n), tf_{u_m}(t_l), tf_{i_n}(t_l)) without the tag part of the uncombined key indices (i.e. keep the user index u_m and the item index i_n, respectively) for the similarity measure MapReduce.

In the MapReduce mapping phase the numerator entry values can be the cross-products tf_{u_m}(t_l)*tf_{i_n}(t_l) (i.e. the t_l entry must be the same in the product) and the denominator values can be the squares of the individual values, in the form (tf_{u_m}(t_l))^2 and (tf_{i_n}(t_l))^2. In the reduce phase the sums can be outputted by key for the numerator and the square roots of the sums can be outputted by key for the denominators.
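
For reference, the TF Cosine-based Similarity that these numerator and denominator terms feed into has the usual cosine form (written here in the notation of the post; see Cantador, Bellogin and Vallet (2010) for the formal definition):

cos_tf(u_m, i_n) = Σ_l [ tf_{u_m}(t_l) · tf_{i_n}(t_l) ] / ( sqrt(Σ_l (tf_{u_m}(t_l))^2) · sqrt(Σ_l (tf_{i_n}(t_l))^2) )

The MapReduce therefore outputs, per (u_m, i_n) key, the numerator sum and the two square-rooted denominator sums; the division itself is a small post-processing step.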

The operational aspects of the calculation in this illustration are as follows:

The combined tuple will initially take the form ((u_{m,l}, i_{n,l}), tf_{u_m}(t_l), tf_{i_n}(t_l)). Then, one changes the tuple to, say, ((u_m;i_n), tf_{u_m}(t_l), tf_{i_n}(t_l)) for the actual MapReduce. It is important to make sure that the {tf_{u_m}(t_l), tf_{i_n}(t_l)} part of the tuple always pertains to the same tag t_l during the data preparation. Hence, one simply has to make sure that all the t_l entries for each (u_m;i_n) key are obtained beforehand, because this information is lost during the mapping phase once the keys no longer include the tag t_l for each key-value pair. The products for the numerator can be programmed into the mapper. The squares for the denominators can also be programmed into the mapper.

In the reduce phase, the values of the numerator products can be summed and the totals outputted by key-value combination. In the case of the denominator entries, the square roots of the sums can be outputted for each key-value combination. This will generate the outputs required by the similarity measure formulae in the case of the three set MapReduces. This is how the three set MapReduces can be implemented.

The next step is to construct the mapper-reducer sets in order to implement the MapReduces for the similarity measure in Hive and Spark SQL.


2. Prepare the mapper-reducer sets



Java mapper-reducer set


The Java mapper-reducer set was prepared using the tutorial in this post. The mapper is as follows:


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.Math;

public class Threesetmapper {
    public static void main(String args[]) {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String input;
            String index = null;
            // While we have input on stdin
            while ((input = br.readLine()) != null) {
                String[] fields = input.split("\t");
                index = fields[0];
                float count1 = Float.parseFloat(fields[1]);
                float count2 = Float.parseFloat(fields[2]);
                // Numerator cross-product and squared denominator terms
                float valcp = count1 * count2;
                double val1sq = Math.pow(count1, 2);
                double val2sq = Math.pow(count2, 2);
                System.out.println(index + "\t" + valcp + "\t" + val1sq + "\t" + val2sq);
            }
        } catch (IOException io) {
            io.printStackTrace();
        }
    }
}

The reducer is as follows:


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.Math;

public class Threesetreducer {
    public static void main(String args[]) {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String input;
            String index = null;
            String currentIndex = null;
            double currentCount1 = 0;
            double currentCount2 = 0;
            double currentCount3 = 0;
            while ((input = br.readLine()) != null) {
                try {
                    String[] fields = input.split("\t");
                    index = fields[0];
                    double val1 = Double.parseDouble(fields[1]);
                    double val2 = Double.parseDouble(fields[2]);
                    double val3 = Double.parseDouble(fields[3]);
                    // We have sorted input, so check if we have the same index
                    if (currentIndex != null && currentIndex.equals(index)) {
                        currentCount1 += val1;
                        currentCount2 += val2;
                        currentCount3 += val3;
                    } else { // the index has changed
                        if (currentIndex != null) {
                            System.out.println(currentIndex + "\t" + currentCount1 + "\t" + Math.sqrt(currentCount2) + "\t" + Math.sqrt(currentCount3));
                        }
                        currentIndex = index;
                        currentCount1 = val1;
                        currentCount2 = val2;
                        currentCount3 = val3;
                    }
                } catch (NumberFormatException e) {
                    continue;
                }
            }
            if (currentIndex != null && currentIndex.equals(index)) {
                System.out.println(currentIndex + "\t" + currentCount1 + "\t" + Math.sqrt(currentCount2) + "\t" + Math.sqrt(currentCount3));
            }
        } catch (IOException io) {
            io.printStackTrace();
        }
    }
}

The next step is to compile the two files into classes with the javac command:

$JAVA_HOME/bin/javac -d <Local System Java class Folder> Threesetmapper.java
$JAVA_HOME/bin/javac -d <Local System Java class Folder> Threesetreducer.java

The Java classes can be run using shell scripts. The shell script to run the mapper:

#!/bin/bash
$JAVA_HOME/bin/java -cp <Local System Java class Folder>:. Threesetmapper

The shell script to run the reducer:


#!/bin/bash
$JAVA_HOME/bin/java -cp <Local System Java class Folder>:. Threesetreducer

The chmod command can be used to give the files (Java, Java classes, and Bash) execution permission:


chmod +x <Local System MapReduce Folder>/Threesetmapper.java
chmod +x <Local System MapReduce Folder>/Threesetreducer.java
chmod +x <Local System MapReduce Folder>/Threesetmapper.sh
chmod +x <Local System MapReduce Folder>/Threesetreducer.sh
chmod +x <Local System Java class Folder>/Threesetmapper.class
chmod +x <Local System Java class Folder>/Threesetreducer.class

Python mapper-reducer set


The Python mapper-reducer set was prepared using a framework outlined in this book and this post. The mapper is as follows:

#!/usr/bin/env python
import sys

if __name__ == "__main__":
    for line in sys.stdin:
        index, val1, val2 = line.split("\t")
        val1 = float(val1)
        val2 = float(val2)
        # Numerator cross-product and squared denominator terms
        cp = val1*val2
        val1sq = val1*val1
        val2sq = val2*val2
        if index is not None:
            sys.stdout.write("{}\t{}\t{}\t{}\n".format(index, cp, val1sq, val2sq))


The reducer is as follows:


#!/usr/bin/env python
import sys
import math

if __name__ == '__main__':
    curindex = None
    total1 = 0
    total2 = 0
    total3 = 0
    for line in sys.stdin:
        index, val1, val2, val3 = line.split("\t")
        val1 = float(val1)
        val2 = float(val2)
        val3 = float(val3)
        if index == curindex:
            total1 += val1
            total2 += val2
            total3 += val3
        else:
            if curindex is not None:
                sys.stdout.write("{}\t{}\t{}\t{}\n".format(curindex, total1, math.sqrt(total2), math.sqrt(total3)))
            curindex = index
            total1 = val1
            total2 = val2
            total3 = val3
    if curindex is not None:
        sys.stdout.write("{}\t{}\t{}\t{}\n".format(curindex, total1, math.sqrt(total2), math.sqrt(total3)))
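
Before wiring the Python set into Hive or Spark SQL, it can be smoke-tested locally. The following sketch (with invented sample values) pipes a few tab-separated records through Threesetmapper.py, sorts the mapped lines by key to mimic the shuffle and feeds them to Threesetreducer.py:

#!/usr/bin/env python
# Local smoke test for the Python mapper-reducer set (sample values are invented).
import subprocess

sample = "\n".join([
    "1007;913\t2\t3",   # key <TAB> tf_um(tl) <TAB> tf_in(tl)
    "1007;913\t1\t4",
    "1007;914\t5\t2",
]) + "\n"

mapped = subprocess.run(["python", "Threesetmapper.py"], input=sample,
                        stdout=subprocess.PIPE, universal_newlines=True).stdout
shuffled = "".join(sorted(mapped.splitlines(True)))
reduced = subprocess.run(["python", "Threesetreducer.py"], input=shuffled,
                         stdout=subprocess.PIPE, universal_newlines=True).stdout
print(reduced)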



The chmod command can be used to give the files execution permission:


chmod +x <Local System MapReduce Folder>/Threesetmapper.py
chmod +x <Local System MapReduce Folder>/Threesetreducer.py

The mapper and reducer files can be copied from the <Local System MapReduce Folder> folder to the <SPARK_HOME> folder for the Beeline processing.


3. Process the data in Hive



The three set MapReduces in the post series aim to introduce the different methods for calculating the three set (Cosine-based) Similarity measures using the map and reduce functions. The three set MapReduces in piping type form are implemented within the Hadoop MapReduce framework (using Hadoop Streaming, Pig stream command and Hive map/reduce commands) and the Spark in-memory framework (using the Spark pipe function and Hive2 transform command with SparkSQL).

The three set MapReduce can be implemented in Hive with the Bash based Java three set mapper-reducer set using the following script prepared according to the tutorial in this post and this post.


create external table table_1 (
`ind` string,
`val1` float,
`val2` float)
row format delimited fields terminated by '\t' stored as textfile
location '${hiveconf:inputdir}';
add file ${hiveconf:mapper};
add file ${hiveconf:reducer};
create external table if not exists table_2(ind string, val1 float, val2 float, val3 float);
from (from table_1
map table_1.ind,table_1.val1,table_1.val2
using '${hiveconf:mapper}'
as ind,val1,val2,val3
cluster by `ind`) map_output
insert overwrite table table_2
reduce map_output.ind,map_output.val1,map_output.val2,map_output.val3
using '${hiveconf:reducer}'
as ind,val1,val2,val3;
INSERT OVERWRITE DIRECTORY '${hiveconf:outputdir}'
row format delimited
fields terminated by '\t'
SELECT * FROM table_2;

The three set MapReduce can be implemented after making the following arrangements:

Input data: InputData.txt
Hadoop Distributed File System (HDFS) Input data folder: <HDFS Input Data Folder>
Local system Hive script folder: <Local System Hive script Folder>
Hive script: HiveThreesetscript.sql
Three set mapper: Threesetmapper.sh
Three set reducer: Threesetreducer.sh
Local system MapReduce folder for the mapper-reducer set: <Local System MapReduce Folder>
The HDFS output data folder: <HDFS Output Data Folder>

The script can be submitted to Hive using the following command:


hive -f <Local System Hive script Folder>/HiveThreesetscript.sql -hiveconf inputdir=<HDFS Input Data Folder> -hiveconf mapper=<Local System MapReduce Folder>/Threesetmapper.sh -hiveconf reducer=<Local System MapReduce Folder>/Threesetreducer.sh -hiveconf outputdir=<HDFS Output Data Folder>

This will yield the following output:


4. Check the results in Spark SQL



The results of the three set MapReduce in section three can be replicated with the Python mapper-reducer set using a Hive2 script in the Spark SQL Thrift Server submitted to the Beeline interface. The three set MapReduce can be implemented using the following Hive2 script prepared using the tutorial in this post and this post.


CREATE TABLE table_1 (
ind STRING,
val1 FLOAT,
val2 FLOAT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '<Local System Input Data Folder>/InputData.txt'
OVERWRITE INTO TABLE table_1;
SELECT COUNT(*)
FROM table_1;
CREATE TABLE table_2 (
ind STRING,
val1 FLOAT,
val2 FLOAT,
val3 FLOAT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
add FILE Threesetmapper.py;
add FILE Threesetreducer.py;
INSERT OVERWRITE TABLE table_2
SELECT
TRANSFORM (ind,val1,val2)
USING 'python Threesetmapper.py'
AS (ind,val1,val2,val3)
FROM table_1
CLUSTER BY ind;
SELECT ind,val1,val2,val3
FROM table_2 limit 10;
CREATE TABLE table_3 (
ind STRING,
val1 FLOAT,
val2 FLOAT,
val3 FLOAT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
INSERT OVERWRITE TABLE table_3
SELECT
TRANSFORM (ind, val1, val2, val3)
USING 'python Threesetreducer.py'
AS (ind,val1,val2,val3)
FROM table_2;
SELECT ind,val1,val2,val3
FROM table_3 limit 10;

In order to implement the three set MapReduce in Spark SQL using Beeline the following arrangements can be made:

Input data: InputData.txt
Local system Input data folder: <Local System Input Data Folder>
Local system Beeline script folder: <Local System Beeline script Folder>
Beeline script: BeelineThreesetscript.sql
Three set mapper: Threesetmapper.py
Three set reducer: Threesetreducer.py
Local system folder where the Python mapper-reducer set is saved: <SPARK_HOME>
In the <SPARK_HOME> folder one can run the following commands (to start the Thrift Server and submit the script to Beeline):


sudo ./sbin/start-thriftserver.sh
sudo ./bin/beeline -u jdbc:hive2://localhost:10000 -f <Local System Beeline script Folder>/BeelineThreesetscript.sql

This will yield the following output:


The next step is to stop the Thrift Server.


sudo ./sbin/stop-thriftserver.sh

The output data from the Hive query and the Spark SQL Thrift Server query through the Beeline interface yield independent results that can be used to check the analysis dataset.
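
Either output can then be turned into the similarity values themselves with one further division per key, for example with a small Python script along the following lines (the output file name is an assumption):

#!/usr/bin/env python
# Compute cos_tf = numerator / (denominator1 * denominator2) from the
# four-column MapReduce output (assumed to be a local tab-separated copy).
import sys

with open("part-00000") as f:
    for line in f:
        key, num, den1, den2 = line.rstrip("\n").split("\t")
        num, den1, den2 = float(num), float(den1), float(den2)
        if den1 > 0 and den2 > 0:
            sys.stdout.write("{}\t{}\n".format(key, num / (den1 * den2)))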



5. Brief analysis





The cosine-based similarity provides a measure of the angle between the user profile vector u_m = (u_{m,1}, ..., u_{m,L}) and the item profile vector i_n = (i_{n,1}, ..., i_{n,L}), and thus of the similarity between the two profiles. In the context of the modelling framework, items that have a large value of this measure for a given user are potential candidates to be included in the set of items that maximize the utility function g() for that user. These items can be recommended to the user.

In the output above, the cos_tf(u_m, i_n) value for userid 1007 and itemid 913 is 0.887638, which corresponds to an angle of 0.478606 radians (27.42214 degrees). This process can be used to find a bundle of items (whose measures are in the output dataset of the MapReduce) that would be best to recommend to the user with id 1007 in order to maximize the utility function g() over the available items in the system.
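
The angle reported above can be verified with a couple of lines of Python:

import math
sim = 0.887638
angle = math.acos(sim)                # approximately 0.478606 radians
print(angle, math.degrees(angle))     # approximately 27.42 degrees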


Conclusions


Essentially, as a recapitulation: for a totally ordered set R, there is a utility function g, g: U × I → R, which measures the usefulness of an item i_n to a user u_m. The aim of the analysis was, for each user u ∈ U, to find the items i_max,u ∈ I, unknown to the user, that maximize the utility function g():

∀ u ∈ U,  i_max,u = arg max_{i ∈ I} g(u, i).

The identified items can be recommended to the user.

The TF Cosine-based Similarity is easy to interpret and very useful for identifying items to recommend to users in a folksonomy like Last.fm. The similarity measures and elements used in the proposed profile and recommendation models in Cantador, Bellogin and Vallet (2010) provide a way to satisfy the aim of the analysis.





Interested in more Big data materials from the Stats Cosmos blog?


Check out my previous Big data posts


Or check out our statistics and e-learning services




Or check out our blog resources page



Sources

http://bit.ly/2G6CXNP
http://bit.ly/2G8gqjP
https://oreil.ly/2G4SuOg
http://bit.ly/2INOtzw
http://bit.ly/2DQjvD1
http://bit.ly/2FX4THU
http://bit.ly/2DQxJ6N
http://bit.ly/2pGKITQ
http://bit.ly/2ujdcbA
http://bit.ly/1SN27EA
http://bit.ly/2GtWmM9
http://bit.ly/2GcEXbv


Apache®, Apache Hadoop, Apache Hive, Apache Spark and the logos are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Monday, October 17, 2016


How to summarize the GroupLens MovieLens 10M dataset using Flink, Go, Hadoop, MongoDB, Perl, Pig, Python, Ruby and Spark






This post is designed for a joint installation of Apache Flink 1.1.2, Golang 1.6, Apache Hadoop 2.6.0, MongoDB 2.4.9, MRJob 0.5.6, Perl Hadoop::Streaming 0.143060, Apache Pig 0.16.0, Apache Spark 1.6.2 (pre-built for Hadoop), Ubuntu Server 14.04.5 Long-Term-Support (LTS) and Wukong-Hadoop 0.2.0.

In this illustration we will consider the MovieLens population from the GroupLens MovieLens 10M dataset (Harper and Konstan, 2005). The specific 10M MovieLens datasets (files) considered are the ratings (ratings.dat file) and the movies (movies.dat file).  The aim of this post is to illustrate how to generate quick summaries of the MovieLens population from the datasets.

The measures that can be generated are as follows.

  • Number of ratings made by each UserID
  • Number of ratings for each MovieID
  • Number of ratings in each Rating score category
  • Average rating for each MovieID
  • Number of ratings for each MovieID genre
  • Average rating for each MovieID genre

The computed measures can then be incorporated into more detailed statistical analyses of the population. 

The measures can be computed using the MapReduce programming model. The MapReduce model can be implemented on files created from appropriate columns of the ratings and movies files. The first, second, third and fifth MapReduces can be implemented using an adjusted version of the word count configuration of the MapReduce model. 

The first key adjustment to the word count programming involves making an allowance for the structure of the data (i.e. the data is in column form rather than free flowing text). The second key adjustment involves treating blocks of numbers or special characters that designate entity ids as single words. 

In the resulting adjusted word count configuration, the MapReduce model will essentially involve mapping each UserID, MovieID, Rating score category or Genre category (i.e. keys) to 1 and then reducing the mapped key-value pairs to obtain the overall key counts.

The advantage of this interpretation of the word count MapReduce is that one can treat JavaScript Object Notation (JSON)/Binary JavaScript Object Notation (BSON) based MongoDB MapReduces as adjusted word counts (i.e. while actually being BSON-based counts of key-value pairs).

In this illustration we will also consider how to implement the adjusted word count MapReduce model to generate the fourth and sixth measures. In Flink, the batch word count MapReduce is applied to the data using the modified word count configuration interpretation.

The fourth MapReduce (for the fourth summary measure) will involve creating MovieID-Rating key-value mappings from two column dataset tuples and then reducing the key-value pairs to calculate the average rating for each key (MovieID). In this illustration this is referred to as implementing the average configuration of the MapReduce Model.

The sixth MapReduce similarly involves creating a mapping of the Genre-Rating key-value pairs and then reducing the key-value pairs to calculate the average rating for each key (Genre). 
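
As a plain-Python sketch of this average configuration (for illustration only; the implementations later in the post use the MRJob library), a streaming-style reducer that reads sorted key-rating lines and emits the mean rating per key could look as follows:

#!/usr/bin/env python
# Average-configuration reducer sketch: reads sorted "key<TAB>rating" lines
# from stdin and prints the mean rating for each key.
import sys

current_key, total, count = None, 0.0, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t")
    if key != current_key:
        if current_key is not None:
            sys.stdout.write("{}\t{}\n".format(current_key, total / count))
        current_key, total, count = key, 0.0, 0
    total += float(value)
    count += 1
if current_key is not None:
    sys.stdout.write("{}\t{}\n".format(current_key, total / count))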


In this illustration the word count MapReduce is implemented using twenty-five different methods. The methods result from blending four Hadoop Streaming interfaces/libraries (DMRGo, MRJob, Perl Hadoop::Streaming, Wukong-Hadoop) and five Big Data frameworks/platforms (Flink, Hadoop, MongoDB, Pig and Spark).

In Hadoop, the word count is implemented using the Hadoop Streaming interface. In Flink, the batch word count is implemented in Java, Python and Scala. The Flink implementations will be illustrated in local mode. In Spark, the word count is implemented using a Scala Spark Pipe, PySpark Pipe, Java Spark Pipe and SparkR Pipe. The Spark implementations will also be illustrated in local mode. In Pig, the word count is implemented using Pig Streaming. In MongoDB, the (adjusted) word count is implemented using the JavaScript mapReduce() function in the MongoDB Shell and PyMongo (i.e. Code from bson.code in a PySpark application).  

The average MapReduce is implemented using a Python MRJob library program. The illustration will also consider how to construct query based summaries.


1. Prepare the data



The data is available in zip file format. The data from the ratings.dat file is arranged into the following four columns.

UserID   MovieID   Rating   Timestamp

The Timestamps variable is not used in this illustration. The data can be arranged into five files for the job, namely, a UserID file, MovieID file, MovieID-Rating file, Rating score file and Fused MovieID-Rating file.

The UserID, MovieID, MovieID-Rating, Rating score and Fused MovieID-Rating files can be generated from the original ratings.dat file as follows.


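A minimal Python sketch of one way to produce the five files is the following; it assumes the standard ratings.dat layout (UserID::MovieID::Rating::Timestamp), and the output file names and the underscore used to fuse the MovieID and Rating columns are illustrative assumptions rather than the exact conventions used in the rest of the post.

#!/usr/bin/env python
# Split ratings.dat (UserID::MovieID::Rating::Timestamp) into the five
# working files. File names and the fusing convention are assumptions.
with open("ratings.dat") as src, \
     open("UserID.txt", "w") as users, \
     open("MovieID.txt", "w") as movies, \
     open("MovieID_Rating.txt", "w") as movie_rating, \
     open("Rating.txt", "w") as rating_scores, \
     open("Fused_MovieID_Rating.txt", "w") as fused:
    for line in src:
        user_id, movie_id, rating, _timestamp = line.rstrip("\n").split("::")
        users.write(user_id + "\n")
        movies.write(movie_id + "\n")
        movie_rating.write(movie_id + "\t" + rating + "\n")
        rating_scores.write(rating + "\n")
        fused.write(movie_id + "_" + rating + "\n")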
The data from the movies.dat file can be arranged into the following three columns.

MovieID   Title   Genres

The movies.dat data can then be used, together with the files generated from the ratings file, to generate three new files. The first file that can be generated is the genres file or the genres counts source file. This can be generated by substituting each MovieID with its genre.

The second file that can be generated is the Genre-Rating file. This can be generated, analogously to the genres counts source file, by substituting the MovieID in the MovieID-Rating file with its genre.

The third file that can be generated is the Fused Genre-Rating file. This can be generated by fusing the two columns (Genre and Rating) in the Genre-Rating file.   
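
A similar Python sketch can produce the three genre-based files, assuming the standard movies.dat layout (MovieID::Title::Genres, with pipe-separated genres); writing one output row per listed genre, together with the file names and the fusing convention, are again illustrative assumptions.

#!/usr/bin/env python
# Build the genres counts source file, the Genre-Rating file and the
# Fused Genre-Rating file by substituting each MovieID with its genre(s).
genres_by_movie = {}
with open("movies.dat") as movies:
    for line in movies:
        movie_id, _title, genres = line.rstrip("\n").split("::")
        genres_by_movie[movie_id] = genres.split("|")

with open("MovieID_Rating.txt") as src, \
     open("Genres.txt", "w") as genre_counts_src, \
     open("Genre_Rating.txt", "w") as genre_rating, \
     open("Fused_Genre_Rating.txt", "w") as fused:
    for line in src:
        movie_id, rating = line.rstrip("\n").split("\t")
        for genre in genres_by_movie.get(movie_id, []):
            genre_counts_src.write(genre + "\n")
            genre_rating.write(genre + "\t" + rating + "\n")
            fused.write(genre + "_" + rating + "\n")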

In summary, the files are generated as follows.


Once the files have been created the next step is to prepare the mappers and reducers.


2. Prepare the mapper and reducer sets



Go programming language mapper and reducer in DMRGo (Word count MapReduce configuration)



The DMRGo (Go) mapper and reducer can be prepared using a Go word count application developed using the DMRGo library available from this gist.
The approach taken in this illustration is to create mapper and reducer files that are library based references to the Go programs housed in the application. The approach/method to perform this can be obtained from the tutorials in this manual, this post and this post. The first step involves creating this Mwordcount.go file (Mwordcount - Modified word count).

package main
import (
"flag"
"fmt"
"github.com/dgryski/dmrgo"
"log"
"os"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
)
// As example, just to show we can write our own custom protocols
type WordCountProto struct{}
func (p *WordCountProto) UnmarshalKVs(key string, values []string, k interface{}, vs interface{}) {
kptr := k.(*string)
*kptr = key
vsptr := vs.(*[]int)
v := make([]int, len(values))
for i, s := range values {
v[i], _ = strconv.Atoi(s)
}
*vsptr = v
}
func (p *WordCountProto) MarshalKV(key interface{}, value interface{}) *dmrgo.KeyValue {
ks := key.(string)
vi := value.(int)
if vi == 1 {
return &dmrgo.KeyValue{ks, "1"}
}
return &dmrgo.KeyValue{ks, strconv.Itoa(vi)}
}
type MRWordCount struct {
protocol dmrgo.StreamProtocol // overkill -- we would normally just inline the un/marshal calls
// mapper variables
mappedWords uint32
}
func NewWordCount(proto dmrgo.StreamProtocol) dmrgo.MapReduceJob {
mr := new(MRWordCount)
mr.protocol = proto
return mr
}
func (mr *MRWordCount) Map(key string, value string, emitter dmrgo.Emitter) {
val := string(value)
characters := strings.Map(func(r rune) rune {
if r != ' ' {
return r
}
return ' '
},
val)
trimmed := strings.TrimSpace(characters)
words := strings.Fields(trimmed)
w := uint32(0)
for _, word := range words {
w++
kv := mr.protocol.MarshalKV(word, 1)
emitter.Emit(kv.Key, kv.Value)
}
atomic.AddUint32(&mr.mappedWords, w)
}
func (mr *MRWordCount) MapFinal(emitter dmrgo.Emitter) {
dmrgo.Statusln("finished -- mapped ", mr.mappedWords)
dmrgo.IncrCounter("Program", "mapped words", int(mr.mappedWords))
}
func (mr *MRWordCount) Reduce(key string, values []string, emitter dmrgo.Emitter) {
counts := []int{}
mr.protocol.UnmarshalKVs(key, values, &key, &counts)
count := 0
for _, c := range counts {
count += c
}
kv := mr.protocol.MarshalKV(key, count)
emitter.Emit(kv.Key, kv.Value)
}
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
func main() {
var use_proto = flag.String("proto", "wc", "use protocol (json/wc/tsv)")
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
var proto dmrgo.StreamProtocol
if *use_proto == "json" {
proto = new(dmrgo.JSONProtocol)
} else if *use_proto == "wc" {
proto = new(WordCountProto)
} else if *use_proto == "tsv" {
proto = new(dmrgo.TSVProtocol)
} else {
fmt.Println("unknown proto=", *use_proto)
os.Exit(1)
}
wordCounter := NewWordCount(proto)
dmrgo.Main(wordCounter)
}


The Mwordcount.go file can be saved in local system folder called: <Local system Go word count application folder>. The program can be compiled into a Go word count application.

The following mapper and reducer Bash files can then be prepared.

#!/bin/bash
<Local system Go Wordcount application folder>/./Go --run=mapper


#!/bin/bash
<Local system Go Wordcount application folder>/./Go --run=reducer

The resulting files are the word count configuration MapReduce. The MapReduce can then be run in a choice of facilities using appropriate tools (i.e. Bash command-line interface submits in Ubuntu, shell programs or applications). The facilities that will be considered for this purpose in this illustration are essentially the Hadoop Streaming interface, Scala Spark Pipe, PySpark Pipe, SparkR Pipe, Java Spark Pipe and Pig Stream operator.

MRJob mapper and reducer (Word count MapReduce configuration)



The approach for constructing the Python MRJob (MRJob) mapper and reducer files is similar to the approach followed for the Go mapper and reducer files.

Essentially, one can first construct a Python MRJob word count MapReduce program. The next step is to create Bash files using MRJob mapper and reducer library based references to Python code housed in a Python MRJob word count program (mrjobwc_program.py). The approach/method to perform this can be obtained from the tutorials in this manual and this document.

#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class MRWordFreqCount(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]
    def mapper(self, _, line):
        for word in line.split():
            yield word, 1
    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFreqCount.run()

The mrjobwc_program.py file can be saved in local system folder <Local System Python MRJob WC program Folder>. One can then prepare the following mapper and reducer Bash files.



#!/bin/bash
python <Local System Python MRJob WC program Folder>/mrjobwc_program.py --step-num=0 --mapper


#!/bin/bash
python <Local System Python MRJob WC program Folder>/mrjobwc_program.py --step-num=0 --reducer

Perl Hadoop::Streaming mapper and reducer (Word count MapReduce configuration)



The Perl Hadoop::Streaming (Hadoop::Streaming) mapper and reducer can be created using the Perl Hadoop::Streaming library. The approach/method to perform this can be obtained from the tutorials in this document and this gist. The following mapper and reducer files can be prepared.

#!/usr/bin/env perl
package Wordcount::Mapper;
use Moo;
with 'Hadoop::Streaming::Mapper';
sub map {
my ($self, $line) = @_;
for (split /\s+/, $line) {
$self->emit( $_ => 1 );
}
}
package main;
Wordcount::Mapper->run;


#!/usr/bin/env perl
package WordCount::Reducer;
use Moo;
with qw/Hadoop::Streaming::Reducer/;
sub reduce {
my ($self, $key, $values) = @_;
my $count = 0;
while ( $values->has_next ) {
$count++;
$values->next;
}
$self->emit( $key => $count );
}
package main;
WordCount::Reducer->run;

Wukong-Hadoop mapper and reducer (Word count MapReduce configuration)



The approach for creating the Ruby Wukong-Hadoop (Wukong-Hadoop) mapper and reducer files follows analogously the approach for Go and MRJob wordcount.

The first step is to create a Hadoop-Wukong library word count program. One can then use the library based references to the Ruby code housed in a Ruby Wukong-Hadoop library word count program (wuword_count.rb) to create Bash mapper and reducer files. The approach/method to perform this can be obtained from the tutorial in this gist.

Wukong.processor(:mapper) do
field :min_length, Integer, :default => 1
field :max_length, Integer, :default => 256
field :split_on, Regexp, :default => /\s+/
field :remove, Regexp, :default => /[^a-zA-Z0-9\']+/
field :fold_case, :Boolean, :default => false
def process string
tokenize(string).each do |token|
yield token if acceptable?(token)
end
end
private
def tokenize string
string.split(split_on).map do |token|
stripped = token
fold_case ? stripped.downcase : stripped
end
end
def acceptable? token
(min_length..max_length).include?(token.length)
end
end
Wukong.processor(:reducer, Wukong::Processor::Accumulator) do
attr_accessor :count
def start record
self.count = 0
end
def accumulate record
self.count += 1
end
def finalize
yield [key, count].join("\t")
end
end

The Ruby word count program can be saved in local system folder <Local System Wu WC program Folder>. One can then prepare the following mapper and reducer Bash files.

#!/bin/bash
wu-local <Local System Wu WC program Folder>/wuword_count.rb --run=mapper


#!/bin/bash
wu-local <Local system Wu WC program Folder>/wuword_count.rb --run=reducer

MRJob mapper and reducer (Average MapReduce configuration)



The MRJob mapper and reducer for the Average MapReduce configuration are, analogously to the MRJob word count MapReduce case, library based references to Python code housed in a Python MRJob library average MapReduce program (mrjobavg_program.py). The approach/method to perform this can be obtained from the tutorials in this manual, this document and this post.

#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class MRWordFreqCount(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]
    def mapper(self, _, line):
        tokenid, value = line.split('\t')
        yield tokenid, float(value)
    def reducer(self, key, values):
        t = list(values)
        l = len(t)
        z = sum(t)
        yield key, z/l

if __name__ == '__main__':
    MRWordFreqCount.run()

The mrjobavg_program.py file can be saved in local system folder <Local System Python MRJobAvg program Folder>. One can then prepare the following mapper and reducer Bash files.

#!/bin/bash
python <Local System Python MRJobAvg program Folder>/mrjobavg_program.py --step-num=0 --mapper


#!/bin/bash
python <Local System Python MRJobAvg program Folder>/mrjobavg_program.py --step-num=0 --reducer

3. Process the data in Flink, Hadoop, MongoDB, Pig and Spark



The UserID MapReduce calculation is illustrated in Hadoop (Go word count), Pig (Go word count), MongoDB (JavaScript and PyMongo Code) and Java Flink (Batch word count).

The MovieID MapReduce calculation is illustrated in Scala Spark (Go word count), Java Spark (Hadoop::Streaming word count) and MongoDB (JavaScript).

The Rating category counts MapReduce calculation is illustrated in PySpark (MRJob word count), Hadoop (Wukong-Hadoop word count) and Java Flink (Batch word count).

The average MapReduce calculation is illustrated in seven versions for the MovieID-Rating key-value map using two dataset structures. The first structure is a tab separated key-value two column dataset (MovieID-Rating file). The second structure is a single column key dataset (Fused MovieID-Rating file). The resulting average MapReduces are illustrated in the following configurations.

  • MRJob average configuration
  • Go word count configuration
  • MRJob word count configuration
  • Hadoop::Streaming word count configuration
  • Wukong-Hadoop word count configuration
  • Python Flink batch word count configuration
  • Scala Flink batch word count configuration

The specific MovieID averages calculation illustrations are as follows.


  • Java Spark Pipe (MRJob average configuration)
  • Java Spark Pipe (Go word count configuration)
  • Scala Spark Pipe (Hadoop::Streaming word count configuration)
  • SparkR Pipe (MRJob word count configuration)
  • Pig Streaming (Wukong-Hadoop word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)

The Genre counts MapReduce calculation is illustrated in SparkR Pipe (Go word count), Scala Spark Pipe (Wukong-Hadoop word count), Pig Streaming (Hadoop::Streaming word count) and Java Flink (Batch word count).

The fundamental structure of the Genre averages calculation is the same as (or similar to) that of the MovieID-Rating averages. The specific Genre averages calculation illustrations are as follows.


  • PySpark Pipe (MRJob average configuration)
  • Java Spark Pipe (Wukong-Hadoop word count configuration)
  • Scala Spark (Hadoop::Streaming word count configuration)
  • PySpark (Go word count configuration)
  • Hadoop (MRJob word count configuration)
  • Python Flink (Batch word count configuration)
  • Scala Flink (Batch word count configuration)



UserID counts




UserID counts (Go Hadoop Streaming)




In order to implement the UserID MapReduce using the Hadoop Streaming facility and a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create this Bash file using the tutorial in this post and to save the file in local system folder: <Local System Bash  Hadoop Streaming Submit File Folder>.

#!/bin/bash
hadoop jar <Local System Hadoop Streaming jar Folder>/hadoop-streaming-2.6.0.jar \
-file $1 -mapper $1 -file $2 -reducer $2 -partitioner \
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -input $3 -output $4

The next step is to run the following command on Ubuntu Server 14.04.5 LTS (as outlined in this tutorial).

$ bash <Local System Bash Hadoop Streaming Submit File Folder>/HadoopStreamingSubmit.sh \
<Local System mapper Folder>/gomapper.sh \
<Local System reducer Folder>/goreducer.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS Output Data Folder>


This will generate an output file in HDFS with the following contents excerpt.


The next step is to generate counts of the UserIDs using a Go word count MapReduce in the Pig Streaming facility.



UserID counts (Pig Go Streaming Script)




In order to implement the UserID counts MapReduce using the Pig Streaming facility with a Go word count application the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigGoStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file using methods obtained from this book,  this wiki and this manual.

col = LOAD '<HDFS Input Data Folder>/InputData.txt' USING PigStorage() AS (id:chararray);
DEFINE gomapper `gomapper.sh` SHIP ('<Local System mapper Folder>/gomapper.sh');
DEFINE goreducer `goreducer.sh` SHIP ('<Local System reducer Folder>/goreducer.sh');
mcol = STREAM col THROUGH gomapper;
omcol = ORDER mcol by $0;
romcol = STREAM omcol THROUGH goreducer AS (key: chararray, count:float);
STORE romcol INTO '<HDFS Output Data Folder>';

The next step is to save the Pig script file in local system folder: <Local System Pig Script File Folder>. The Pig script can be run in Pig from the command line (using mapreduce mode).

$ pig <Local System Pig Script File Folder>/PigGoStream.pig


This will generate an output file in HDFS with the following contents excerpt.


UserID counts (Java Flink Batch word count Application)



In order to implement the UserID counts MapReduce using the Java Flink Batch word count examples jar file one can follow the guidelines in this post. The following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch word count examples jar file: WordCount.jar
Local system Java Flink Batch word count examples jar file folder:
<Local System Java Flink Batch word count examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in local system folder:
<Local System Java Flink Batch word count examples jar File Folder>. 
One can then run the application using the Flink command-line interface.

./bin/flink run <Local System Java Flink Batch word count examples jar File Folder>/WordCount.jar
--input file://<Local System Input Data Folder>/InputData.txt
--output file://<Local System Output Data Folder>/OutputData.txt


This will generate a local system output file with the following contents excerpt.


MovieID counts



MovieID counts (Scala Spark Go Pipe Application)


In order to implement the MovieID counts MapReduce using the Scala Spark Pipe facility with a Go word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Go Pipe application file using the guidelines in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, the aggregation section of the MongoDB manual, this gist and the PyMongo guide.

package scalapackage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
object scalaobject{
def main(args: Array[String]){
// Load up the contexts and the sqlContext implicits
val conf = new SparkConf().setAppName("SparkScalaGoPipeApp")
val sc =new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Check whether sufficient params were supplied
if (args.length < 2) {
println("Usage: ScalaWordCount ,input> <output>")
System.exit(1)
}
val rdd = sc.textFile(args(0))
val scriptPathm = "<Local System mapper Folder>/gomapper.sh"
val scriptPathr = "<Local System reducer Folder>/goreducer.sh"
val pipeRDD = rdd.pipe(scriptPathm).coalesce(1).sortBy[String]({a => a}, false).flatMap(line => line.split(","))
val pipe1RDD = pipeRDD.pipe(scriptPathr).flatMap(line =>line.split(","))
pipe1RDD.saveAsTextFile(args(1))
// Import Spark SQL implicits
import sqlContext.implicits._
// Spark SQL query
val bookmarks = pipe1RDD.map(_.split("\t")).map(p => (p(0), p(1).toFloat)).cache()
val bookmarks1 = bookmarks.toDF()
bookmarks1.show()
bookmarks1.describe.show()
bookmarks1.filter(bookmarks1("_2")>20).show()
bookmarks1.filter("_1 = 2").show()
// NoSQL query
import scala.sys.process._
// Next run query using rmongodb
val rmongodbquery =
"Rscript <Local System rmongodbqueryscript Folder>/rmongodbqueryscript.py".!!
print(rmongodbquery)
// Next run query using PyMongo
val pymongoquery =
"python <Local System PyMongoqueryscript Folder>/PyMongoqueryscript.py".!!
print(pymongoquery)
//Stop Spark context
sc.stop
}
}

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)


The next step is to export the file into a jar file and to save the jar file in local system folder: <Local System jar File Folder>. One can then run the Scala Spark Go Pipe application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
--class "scalapackage.scalaobject" \
<Local System jar File Folder>/ScalaGoPipeApp.jar \
<Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>




This will generate the following Spark SQL system output, MongoDB based NoSQL system output and a local system output file with the following contents excerpt.


MovieID counts (Java Spark Perl Pipe Application)



In order to implement the MovieID counts MapReduce using a Java Spark Pipe application and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system Java Spark Perl Pipe application folder: <Local System jar File Folder>
Java Spark Perl Pipe application java file: JavaSparkPerlPipeApp.java
Java Spark Perl Pipe class: JavaSparkPerlPipe
Java Spark Perl Pipe application jar file: JavaSparkPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration


The next step is to create the following Java Spark Hadoop::Streaming pipe application file (JavaSparkPerlPipeApp.java) using methods outlined in this book, this book, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Import factory methods provided by DataTypes
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import RowFactory
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
public class SparkJavaPerlPipeApp {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFolder = args[1];
// Create a Java Spark Context.
// Create a Java Spark SQLContext.
SparkConf conf = new SparkConf().setAppName("SparkJavaGoPipeApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// load out input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Pipe the data to external script
String mapperScript = "<Local System mapper Folder>/map.pl";
String mapperScriptName = "map.pl";
String reducerScript = "<Local System reducer Folder>/reduce.pl";
String reducerScriptName = "reduce.pl";
// Add the mapper
sc.addFile(mapperScript);
JavaRDD<String> pipeinputs = input;
JavaRDD<String> piperdd = pipeinputs.pipe(SparkFiles.get(mapperScriptName));
// Add the reducer
sc.addFile(reducerScript);
JavaPairRDD<String, Integer> sortpiperdd = piperdd.mapToPair(
new PairFunction <
String, // T <String, Integer>
String, // K key
Integer // V Integer
>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call (String x){
String[] record = x.split("\t");
String index = record[0];
Integer freq= new Integer(record[1]);
return new Tuple2<String,Integer>
(index,freq);
}
}).sortByKey().coalesce(2, true);
JavaRDD<String> pipeInputs2 = sortpiperdd.map(
new Function <Tuple2 <String, Integer>, String> () {
public String call(Tuple2<String, Integer> s) {
{return s._1() + "\t" +s._2();
}}});
JavaRDD<String> piperdd2 = pipeInputs2.pipe(SparkFiles.get(reducerScriptName));
piperdd2.saveAsTextFile(outputFolder);
// The schema is encoded in a string
String schemaString = "key freq";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (piperdd2) to Rows.
JavaRDD<Row> rowRDD = piperdd2.map(
new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Row call(String record) throws Exception {
String[] fields = record.split("\t");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD
DataFrame piperdd2DataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as table.
piperdd2DataFrame.registerTempTable("piperdd2");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq FROM piperdd2 WHERE key =2");
// The results of SQL queries are DataFrames and support all the normal RDD oprations
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public String call(Row row) {
return "MovieID: Counts: " +row.getString(0) + "\t " + row.getString(1);
}
}).collect();
for (String s: names) {
System.out.println(s);
}
// NoSQL Queries in RMongoDB and PyMongo
String x = null;
try {
// run the Unix "bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh" command
// using the Runtime exec method:
Process p = Runtime.getRuntime().exec("bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
// read the output from the command
System.out.println("Here is the standard output of the command:\n");
while ((x = stdInput.readLine()) != null) {
System.out.println(x);
}
// read any errors from the attempted command
System.out.println("Here is the standard error of the command (if any):\n");
while ((x = stdError.readLine()) != null) {
System.out.println(x);
}
System.exit(0);
}
catch (IOException e) {
System.out.println("exception happened - here's what I know: ");
e.printStackTrace();
System.exit(-1);
};
}
}

#!/bin/bash
Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R
python <Local System Query Scripts Folder>/pymongoqueryscript.py

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)

The next step is to save the Java Spark Perl Pipe application file in local system folder: <Local system Java Spark Perl Pipe Application Folder>

The next step is to run the application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
--class "SparkJavaPerlPipeApp" \
<Local System jar File Folder>/JavaSparkPerlPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate a local system output file with the following content excerpt, Spark SQL system output and (MongoDB-based) NoSQL system output.


MovieID counts (Java Flink batch word count application)



In order to implement the MovieID counts MapReduce using the Java Flink word count batch examples application jar, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Java Flink Batch wordcount examples jar file: WordCount.jar
Local system Java Flink Batch wordcount examples jar file folder:
<Local System Java Flink Batch wordcount examples jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to save the WordCount.jar file in the local system folder:
<Local System Java Flink Batch wordcount examples jar File Folder>. 

One can then run the application using the Flink run facility.

./bin/flink run <Local System Java Flink Batch word count examples jar File Folder>/WordCount.jar
--input file://<Local System Input Data Folder>/InputData.txt
--output file://<Local System Output Data Folder>/OutputData.txt


This will generate a local system output file with the following contents excerpt.


UserID counts and MovieID counts




UserID counts and MovieID counts (MongoDB Shell)




The UserID and MovieID counts can also be generated using the mapReduce() function in the mongo Shell. In order to implement the MapReduces in the mongo Shell the following arrangements/selections may be made.

Database: MLens
UserID Collection: UserID
MovieID Collection: MovieID
Output collection name for UserID MapReduce: UserID_Counts
Output collection name for MovieID MapReduce: MovieID_Counts

The next step is to start the mongo Shell, switch to the MLens database (with the use MLens command) and to view the MLens collections with the show collections command.


The next step is to run the following shell program, which will run the MapReduce to calculate the UserID counts and generate a specific BSON query for ID 2 using the db.UserID_Counts.find({"_id": 2}).pretty() command.

db.UserID.mapReduce(
function() { emit(this.UserID,1); },
function(key, values) {return Array.sum(values)}, {
out:"UserID_Counts" }
)
db.UserID_Counts.find({"_id": 2}).pretty()


The program commands will generate the following output.


The UserID_Counts can also be viewed with the general version db.UserID_Counts.find().pretty() command. The command will generate the following output.


The next step is to run the following program to run the MapReduce to calculate the MovieID counts.

db.MovieID.mapReduce(
function() { emit(this.MovieID,1); },
function(key, values) {return Array.sum(values)}, {
out:"MovieID_Counts" }
)


This will yield the following output for the MovieID counts MapReduce.





The individual JSON query for MovieID 20 using the db.MovieID_Counts.find({"_id": 20}).pretty() command will generate the following output:





The next step is to generate the counts for the Rating score categories in the Spark Pipe facility using a PySpark Pipe application. The PySpark Pipe application can also be used to implement the UserID counts MapReduce in MongoDB using PyMongo Code. 


Rating score counts and UserID counts (NoSQL query)



Rating score counts and UserID counts (PySpark MRJob Pipe Application)



In order to run the Rating score counts MapReduce with the UserID counts MapReduce using the PySpark Pipe facility, MRJob library (Word count configuration) and PyMongo, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system PySpark MRJob Pipe application folder: <Local System PySpark MRJob Pipe Application Folder>
PySpark MRJob Pipe application file: PySparkMRJobPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements as outlined in the MongoDB part of the illustration

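The mrjobwcmapper.sh and mrjobwcreducer.sh wrapper scripts are not reproduced in this post. As a rough sketch of the kind of MRJob job such wrappers could invoke (an assumption for illustration, not the scripts actually used here), a minimal MRJob word count job might look as follows. MRJob jobs can be run one step at a time over standard input with the --mapper and --reducer switches, and by default they emit tab-separated, JSON-encoded key/value pairs, which would explain the double-quoted keys queried in the Spark SQL steps below.

# wordcountmrjob.py - a minimal MRJob word count sketch (hypothetical file name)
from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # emit each whitespace-separated token with a count of 1
        for token in line.split():
            yield token, 1

    def reducer(self, key, values):
        # sum the counts for each token
        yield key, sum(values)

if __name__ == '__main__':
    MRWordCount.run()

A hypothetical mrjobwcmapper.sh would then simply run python wordcountmrjob.py --mapper over standard input, and the reducer wrapper would run python wordcountmrjob.py --reducer, which is all the Spark pipe() calls below require.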
The next step is to create the following PySpark MRJob application file (PySparkMRJobPipeApp.py) using methods outlined in this post, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post, this manual and the aggregation section of the MongoDB manual.

"""PysparkMRJobPipeApp.py"""
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pymongo
from pymongo import mongoClient
import subprocess
from bson.code import Code
sc = SparkContext("local", "PySparkMRJobPipeApp")
sqlContext = SQLContext(sc)
rdd = sc.textFile("Local System Input Data Folder>/InputData.txt")
def preparem(line):
"""Each line contains fields separated by a tab."""
return '\t'.join(line.split('\t')) + '\n'
rdd1 = rdd.map(lambda s: preparem(s)).pipe('<Local System mapper Folder>/mrjobwcmapper.sh').map(lambda x: x.split("\t")).sortByKey(False).cache()
def preparer(line):
"""Each line contains fields separated by a tab."""
return '\t'.join(line) + '\n'
rdd2 = rdd1.map(lambda s: preparer(s)).pipe('<Local System mapper Folder>/mrjobwcreducer.sh').map(lambda x: x.split("\t")).cache()
rdd2.saveAsTextFile("file://<Local System Output Data Folder>")
#sqlContext
#from pyspark.sql import Row
Rating= Row('UserID', 'Freq')
rdd3 = rdd2.map(lambda x: (x[0],x[1])).map(lambda r: Rating(*r)) .cache()
df = sqlContext.createDataFrame(rdd3)
df.show(25)
df.describe("Freq").show()
df.where(df.Rating == "\"5\"").collect()
# Generate the UserID MapReduce in MongoDB using PyMongo
#set up the environment
client = MongoClient()
#v connect to the database
db = client.MLens
# UserID counts
collection = db.UserID
map = Code("function() {"
" emit(this.UserID, 1);"
"}")
reduce = Code("function(key,values) {"
" var total = 0;"
" for (var i = 0; i < values.length; i++) {"
" total += values[i];"
" }"
" return total;"
"}")
result = collection.map_reduce(map, reduce, "PyMongoresults1")
for doc in result.find().limit(10):
print doc
x = db.PyMongoresults1.find_one()
print(x)
y = db.PyMongoresults1.find_one({"_id" : 2})
print(y)
# Query using rmongodb and subprocess
#Define command and arguements
command = 'Rscript'
path2script = '<Local System Query Scripts Folder>/rmongodbqueryscript.R'
#Build subprocess command
cmd =[command, path2script]
# check_output will run the command and store the result
x = subprocess.check_output(cmd, universal_newlines=True)
print('UserID and MovieID Counts subprocess rmongodb Query is:')
print(x)


#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)



One can then save the PySpark MRJob application file in local system folder: <Local System PySpark MRJob Pipe Application Folder> and run the application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
<Local System PySpark MRJob Pipe Application Folder>/PySparkMRJobPipeApp.py


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system file with the following contents excerpt.




Rating score counts (Wukong-Hadoop Hadoop Streaming)


In order to implement the Rating score counts MapReduce using Hadoop Streaming and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>

The next step is to save the following Bash file (HadoopStreamingSubmit.sh) in the local system folder:
<Local System Bash Hadoop Streaming Submit File Folder>.

#!/bin/bash
hadoop jar <Local System Hadoop Streaming jar File Folder>/hadoop-streaming-2.6.0.jar \
-file $1 -mapper $1 -file $2 -reducer $2 -partitioner \
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -input $3 -output $4

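The Wukong-Hadoop wumapper.sh and wureducer.sh scripts themselves are not reproduced in this post. For orientation, any pair of executables that read lines from standard input and write tab-separated key/value pairs to standard output can be dropped into the same submit script. The following single-file Python stand-in (an illustration under that assumption, not the Wukong-Hadoop scripts) plays both roles.

#!/usr/bin/env python
# streamingwordcount.py - a minimal Hadoop Streaming word count stand-in (hypothetical file name)
# run with the argument "map" for the mapper and "reduce" for the reducer
import sys

def mapper():
    # emit each whitespace-separated token with a count of 1
    for line in sys.stdin:
        for token in line.split():
            sys.stdout.write('%s\t1\n' % token)

def reducer():
    # Hadoop Streaming sorts the mapper output by key, so counts can be accumulated per key
    current, total = None, 0
    for line in sys.stdin:
        key, value = line.rstrip('\n').split('\t', 1)
        if key != current:
            if current is not None:
                sys.stdout.write('%s\t%d\n' % (current, total))
            current, total = key, 0
        total += int(value)
    if current is not None:
        sys.stdout.write('%s\t%d\n' % (current, total))

if __name__ == '__main__':
    (mapper if sys.argv[1] == 'map' else reducer)()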

The next step is to run the following command in Ubuntu.

$ bash <Local System Bash Hadoop Streaming Submit File Folder>/HadoopStreamingSubmit.sh \
<Local System mapper Folder>/wumapper.sh \
<Local System reducer Folder>/wureducer.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS Output Data Folder>


This will generate an output file in HDFS with the following contents excerpt.









MovieID ratings average



MovieID ratings average (Java Spark MRJob Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the MRJob library (Average configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local system mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe MRJob App jar: JavaSparkMRJobAvgPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: SparkJavaMRJobAvgPipeApp
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

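The mrjobavgmapper.sh and mrjobavgreducer.sh wrappers are likewise not reproduced in this post. As a rough sketch of the kind of MRJob job they could invoke in the Average configuration (an assumption for illustration, not the scripts actually used here), the following job keys on the first tab-separated field and averages the numeric second field; the wrappers would again run it with the --mapper and --reducer switches.

# averagemrjob.py - a minimal MRJob averaging sketch (hypothetical file name)
from mrjob.job import MRJob

class MRAverage(MRJob):

    def mapper(self, _, line):
        # assume tab-separated input lines of the form key<TAB>value
        fields = line.split('\t')
        if len(fields) >= 2:
            yield fields[0], float(fields[1])

    def reducer(self, key, values):
        # emit the mean of the numeric values observed for each key
        vals = list(values)
        yield key, sum(vals) / len(vals)

if __name__ == '__main__':
    MRAverage.run()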
The next step is to create the following Java file (SparkJavaMRJobAvgPipeApp.java).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Import factory methods provided by DataTypes
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import RowFactory
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
public class SparkJavaMRJobAvgPipeApp {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFolder = args[1];
// Create a Java Spark Context.
// Create a Java Spark SQLContext.
SparkConf conf = new SparkConf().setAppName("SparkJavaMRJobAvgPipeApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// load out input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Pipe the data to external script
String mapperScript = "<Local System mapper Folder>/mrjobavgmapper.sh";
String mapperScriptName = "mrjobavgmapper.sh";
String reducerScript = "<Local System reducer Folder>/mrjobavgreducer.sh";
String reducerScriptName = "mrjobavgreducer.sh";
// Add the mapper
sc.addFile(mapperScript);
JavaRDD<String> pipeinputs = input;
JavaRDD<String> piperdd = pipeinputs.pipe(SparkFiles.get(mapperScriptName));
// Add the reducer
sc.addFile(reducerScript);
JavaPairRDD<String, Integer> sortpiperdd = piperdd.mapToPair(
new PairFunction <
String, // T <String, Integer>
String, // K key
Integer // V Integer
>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call (String x){
String[] record = x.split("\t");
String index = record[0];
Integer freq= new Integer(record[1]);
return new Tuple2<String,Integer>
(index,freq);
}
}).sortByKey().coalesce(2, true);
JavaRDD<String> pipeInputs2 = sortpiperdd.map(
new Function <Tuple2 <String, Integer>, String> () {
public String call(Tuple2<String, Integer> s) {
{return s._1() + "\t" +s._2();
}}});
JavaRDD<String> piperdd2 = pipeInputs2.pipe(SparkFiles.get(reducerScriptName));
piperdd2.saveAsTextFile(outputFolder);
// The schema is encoded in a string
String schemaString = "key freq";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (piperdd2) to Rows.
JavaRDD<Row> rowRDD = piperdd2.map(
new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Row call(String record) throws Exception {
String[] fields = record.split("\t");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD
DataFrame piperdd2DataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as table.
piperdd2DataFrame.registerTempTable("piperdd2");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq FROM piperdd2 WHERE key = '\"2\"'");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public String call(Row row) {
return "MovieID: Counts: " + row.getString(0) + "\t " + row.getString(1);
}
}).collect();
for (String s: names) {
System.out.println(s);
}
// NoSQL Queries in RMongoDB and PyMongo
String x = null;
try {
// run the Unix "bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh" command
// using the Runtime exec method:
Process p = Runtime.getRuntime().exec("bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
// read the output from the command
System.out.println("Here is the standard output of the command:\n");
while ((x = stdInput.readLine()) != null) {
System.out.println(x);
}
// read any errors from the attempted command
System.out.println("Here is the standard error of the command (if any):\n");
while ((x = stdError.readLine()) != null) {
System.out.println(x);
}
System.exit(0);
}
catch (IOException e) {
System.out.println("exception happened - here's what I know: ");
e.printStackTrace();
System.exit(-1);
};
}
}


#!/bin/bash
Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R
python <Local System Query Scripts Folder>/pymongoqueryscript.py


#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)



The next step is to export the Java file into a jar file and save the jar in the local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SparkJavaMRJobAvgPipeApp" \
--master local[4] \
<Local System jar File Folder>/JavaSparkMRJobAvgPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate a local system output file with the following contents excerpt, Spark SQL system output and NoSQL system output.




MovieID ratings average (Java Spark Go Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Java Spark Pipe facility and the DMRGo library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system jar file folder: <Local System jar File Folder>
Java Spark Pipe Go App jar: JavaSparkGoPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Java class: SparkJavaGoPipeApp
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submits Bash file: JavaScriptsSubmits.sh
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
One can then create the following Java Spark Pipe application file.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Import factory methods provided by DataTypes
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import RowFactory
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
public class SparkJavaGoPipeApp {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFolder = args[1];
// Create a Java Spark Context.
// Create a Java Spark SQLContext.
SparkConf conf = new SparkConf().setAppName("SparkJavaGoPipeApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// load our input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Pipe the data to external script
String mapperScript = "<Local System mapper Folder>/gomapper.sh";
String mapperScriptName = "gomapper.sh";
String reducerScript = "<Local System mapper Folder>/goreducer.sh";
String reducerScriptName = "goreducer.sh";
// Add the mapper
sc.addFile(mapperScript);
JavaRDD<String> pipeinputs = input;
JavaRDD<String> piperdd = pipeinputs.pipe(SparkFiles.get(mapperScriptName));
// Add the reducer
sc.addFile(reducerScript);
JavaPairRDD<String, Integer> sortpiperdd = piperdd.mapToPair(
new PairFunction <
String, // T <String, Integer>
String, // K key
Integer // V Integer
>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call (String x){
String[] record = x.split("\t");
String index = record[0];
Integer freq= new Integer(record[1]);
return new Tuple2<String,Integer>
(index,freq);
}
}).sortByKey().coalesce(2, true);
JavaRDD<String> pipeInputs2 = sortpiperdd.map(
new Function <Tuple2 <String, Integer>, String> () {
public String call(Tuple2<String, Integer> s) {
{return s._1() + "\t" +s._2();
}}});
JavaRDD<String> piperdd2 = pipeInputs2.pipe(SparkFiles.get(reducerScriptName));
piperdd2.saveAsTextFile(outputFolder);
// The schema is encoded in a string
String schemaString = "key freq";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (piperdd2) to Rows.
JavaRDD<Row> rowRDD = piperdd2.map(
new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Row call(String record) throws Exception {
String[] fields = record.split("\t");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD
DataFrame piperdd2DataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as table.
piperdd2DataFrame.registerTempTable("piperdd2");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq FROM piperdd2 WHERE key = '3;3.5'");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public String call(Row row) {
return "MovieID: Counts: " + row.getString(0) + "\t " + row.getString(1);
}
}).collect();
for (String s: names) {
System.out.println(s);
}
// NoSQL Queries in RMongoDB and PyMongo
String x = null;
try {
// run the Unix "bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh" command
// using the Runtime exec method:
Process p = Runtime.getRuntime().exec("bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
// read the output from the command
System.out.println("Here is the standard output of the command:\n");
while ((x = stdInput.readLine()) != null) {
System.out.println(x);
}
// read any errors from the attempted command
System.out.println("Here is the standard error of the command (if any):\n");
while ((x = stdError.readLine()) != null) {
System.out.println(x);
}
System.exit(0);
}
catch (IOException e) {
System.out.println("exception happened - here's what I know: ");
e.printStackTrace();
System.exit(-1);
};
}
}

#!/bin/bash
Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R
python <Local System Query Scripts Folder>/pymongoqueryscript.py

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)

The next step is to export the Java file into a jar and to save the jar file in the local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
–class "SparkJavaGoPipeApp" \
–master local[4] \
<Local System jar File Folder>/ JavaSparkGoPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate the following Spark SQL system output, MongoDB-based NoSQL output and a local system file with the following contents excerpt.




MovieID ratings average (Scala Spark Perl Pipe Application)



In order to implement the MovieID ratings average MapReduce using the Scala Spark Pipe facility and the Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar File Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Pipe application file.

package scalapackage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
object scalaobject{
def main(args: Array[String]){
// Load up the contexts and the sqlContext implicits
val conf = new.SparkConf().setAppname("SparkScalaPerlPipeApp")
val sc =new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLCotext(sc)
// Check whether sufficient params were supplied
if (args.length < 2) {
println("Usage: ScalaWordCount ,input> <output>")
System.exit(1)
}
val rdd = sc.textFile(args(0))
val scriptPathm = "<Local System mapper Folder>/map.pl"
val scriptPathr = "<Local System reducer Folder>/reduce.pl"
val pipeRDD = rdd.pipe(scriptPathm).coalesce(1).sortBy[String]({a => a}, false).flatMap(line => line.split(","))
val pipe1RDD = pipeRDD.pipe(scriptPathr).flatMap(line =>line.split(","))
pipe1RDD.saveAsTextFile(args(1))
// Import Spark SQL implicits
import sqlContext.implicits._
// Spark SQL query
val bookmarks = pipe1RDD.map(_.split("\t")).map(p => (p(0), p(1).toFloat)).cache()
val bookmarks1 = bookmarks.toDF()
bookmarks1.show()
bookmarks1.describe().show()
bookmarks1.filter(bookmarks1("_2") > 20).show()
bookmarks1.filter(bookmarks1("_1").equalTo("\"2;5\"")).show()
// NoSQL query
import scala.sys.process._
// Next run query using rmongodb
val rmongodbquery =
"Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R".!!
print(rmongodbquery)
// Next run query using PyMongo
val pymongoquery =
"python <Local System Query Scripts Folder>/pymongoqueryscript.py".!!
print(pymongoquery)
//Stop Spark context
sc.stop
}
}

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)


The next step is to export the Scala file into a jar and save the jar file in the local system folder: <Local System jar File Folder>.

The next step is to run the application using the Spark-submit facility.

$ YOUR_SPARK_HOME/bin/spark-submit \
--class "scalapackage.scalaobject" \
--master local[4] \
<Local System jar File Folder>/ScalaPerlPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate a local system output file with the following contents excerpt, Spark SQL system output and MongoDB-based NoSQL system output.




MovieID ratings average (SparkR MRJob Application)



In order to implement the MapReduce using the SparkR Pipe facility and a MRJob word count application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system SparkR MRJob Pipe application folder: <Local System SparkR Application Folder>
SparkR MRJobPipe R application file: SparkRMRJobPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following SparkR Pipe application file (SparkRMRJobPipeApp.R) using methods outlined in this guide, this guide, this package reference, this package reference, this post, this post, the Spark 1.6.2 Quick Start, the Spark 1.6.2 SQL Programming Guide, this post and the aggregation section of the MongoDB manual.

#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE)
#load required packages and set up the contexts
library(SparkR)
library(magrittr)
library(HistogramTools)
library(rmongodb)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
# read in the data
lines_1 <- SparkR:::textFile(sc, args[1])
lines_2 <- SparkR:::textFile(sc, args[2])
lines_3 <- SparkR:::textFile(sc, args[3])
lines_4 <- SparkR:::textFile(sc, args[4])
rdd1_1<- SparkR:::flatMap(lines_1,function(line) { strsplit(line, " ")[[1]]})
rdd1_2<- SparkR:::flatMap(lines_2,function(line) { strsplit(line, " ")[[1]]})
rdd1_3<- SparkR:::flatMap(lines_3,function(line) { strsplit(line, " ")[[1]]})
rdd1_4<- SparkR:::flatMap(lines_4,function(line) { strsplit(line, " ")[[1]]})
# Pipe the RDD
mappipeArg<- c("<Local System mapper Folder>/mrjobwcmapper.sh")
rdd2_1 <-SparkR:::pipeRDD(rdd1_1,mappipeArg)
rdd2_2 <-SparkR:::pipeRDD(rdd1_2,mappipeArg)
rdd2_3 <-SparkR:::pipeRDD(rdd1_3,mappipeArg)
rdd2_4 <-SparkR:::pipeRDD(rdd1_4,mappipeArg)
# Sort by key
rdd3_1 <-SparkR:::sortByKey(rdd2_1)
rdd3_2 <-SparkR:::sortByKey(rdd2_2)
rdd3_3 <-SparkR:::sortByKey(rdd2_3)
rdd3_4 <-SparkR:::sortByKey(rdd2_4)
# Reduce by key
redpipeArg<- c("<Local System reducer Folder>/mrjobwcreducer.sh")
rdd4_1 <-cache(SparkR:::pipeRDD(rdd3_1,redpipeArg))
rdd4_2 <-cache(SparkR:::pipeRDD(rdd3_2,redpipeArg))
rdd4_3 <-cache(SparkR:::pipeRDD(rdd3_3,redpipeArg))
rdd4_4 <-cache(SparkR:::pipeRDD(rdd3_4,redpipeArg))
# Union RDD
rdd5_1<-SparkR:::unionRDD(rdd4_1,rdd4_2)
rdd5_2<-SparkR:::unionRDD(rdd4_3,rdd4_4)
rdd5_3<-SparkR:::unionRDD(rdd5_1,rdd5_2)
# Sort by key again
rdd5_4<-SparkR:::sortByKey(rdd5_3)
# Reduce by key
rdd6 <-SparkR:::coalesce(SparkR:::pipeRDD(rdd5_4,redpipeArg), 1L)
# Save as text file
SparkR:::saveAsTextFile(rdd6,"<Local System Output Data Folder>")
parseFields <-function(record) {
Sys.setlocale("LC_ALL", "C"); # necessary for strsplit() to work correctly
parts <- strsplit(record, "\t")[[1]];
list(parts[1],parts[2])
}
# SQL Context
parsedRDD <- SparkR:::lapply(rdd6, parseFields)
output <- collect(parsedRDD)
df <- createDataFrame(sqlContext, output)
head(df)
showDF(df)
showDF(df, numRows= 25)
subsetDF <- subset(df, df$"_1" %in% c('"\\"2;3.5\\""'), c(1,2))
subsetDF %>% showDF()
# connect to MongoDB
mongo = mongo.create(host ="localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db= "MLens")
#UserID Counts
bson<-mongo.find.one(mongo, "MLens.UserID_Counts", query= '{"_id": 1}')
bson
bson1<-mongo.find.one(mongo, "MLens.UserID_Counts", query= '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)
#Run the query using pymongo and system2
#command
command = "python"
#note the single + double quotes in the string (needed if paths have spaces)
path2script = '"<Local System Query Scripts folder>/pymongoqueryscript.py"'
output =system2(command,path2script,stdout=TRUE)
print(paste("UserID and MovieID system2 PyMongo Query is:", output))

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)


The next step is to save the R file in the local system folder: <Local System SparkR Application Folder>.

The next step is to run the application using the Spark-submit facility.

$ ./bin/spark-submit \
<Local System SparkR Application Folder>/SparkRMRJobPipeApp.R \
<Local System Input Data Folder>/InputDataFile1.txt \
<Local System Input Data Folder>/InputDataFile2.txt \
<Local System Input Data Folder>/InputDataFile3.txt \
<Local System Input Data Folder>/InputDataFile4.txt


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following contents excerpt.




MovieID ratings average (Pig Wukong-Hadoop Streaming Script)



In order to implement the MovieID ratings average MapReduce using the Pig Stream facility and the Wukong-Hadoop library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigWuStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file.  

col = LOAD '<HDFS Input Data Folder>/InputData.txt' USING PigStorage() AS (id:chararray);
DEFINE wumapper `wumapper.sh` SHIP ('<Local System mapper Folder>/wumapper.sh');
DEFINE wureducer `wureducer.sh` SHIP ('<Local System reducer Folder>/wureducer.sh');
mcol = STREAM col THROUGH wumapper;
omcol = ORDER mcol BY $0;
romcol = STREAM omcol THROUGH wureducer AS (key: chararray, count:float);
STORE romcol INTO '<HDFS Output Data Folder>';

The Pig script file can be saved in the local system folder: <Local System Pig Script File Folder>. One can then run the Pig script in mapreduce mode from the command line with the following command.

$ pig <Local System Pig Script File Folder>/PigWuStream.pig


This will generate an output file in HDFS with the following contents excerpt.



MovieID ratings average (Python Flink Application)



In order to implement the MapReduce using a Python Flink application the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file: WordCount.py
Local system Python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python word count application (WordCount.py) using methods outlined in this guide, this post and this post.

import sys
from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.plan.Constants import WriteMode
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))

class Adder(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count, word = iterator.next()
        count += sum([x[0] for x in iterator])
        collector.collect((count, word))

if __name__ == "__main__":
    output_file = 'file://<Local System Output Data Folder>/OutputData.txt'
    print('logging results to: %s' % (output_file, ))
    env = get_environment()
    data = env.read_text('file://<Local System Input Data Folder>/InputData.txt')
    result = data \
        .flat_map(Tokenizer()) \
        .group_by(1) \
        .reduce_group(Adder(), combinable=True)
    result.write_text(output_file, write_mode=WriteMode.OVERWRITE)
    env.execute(local=True)


The Python Flink wordcount application can be saved in the local system folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink program with the following command.

$ ./flink/build-target/bin/pyflink2.sh <Local System Python Flink Application Folder>/WordCount.py


This will generate a local system output file with the following contents excerpt.




MovieID ratings average (Scala Flink Application)



In order to implement the MovieID ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: FlinkScalaApp.scala
Local system Scala Flink Batch application jar file folder: <Local System Scala Flink Application jar File Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: Wordcount
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar

The next step is to create the following Scala application file (FlinkScalaApp.scala) using methods outlined in the Flink DataSet API programming guide and this post.

package scalapackage
import org.apache.flink.api.scala._
object Wordcount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment;
// get input data
val text = env.readTextFile("<Local System Input Data Folder>/InputData.txt");
val counts = text.flatmap(_.split(" ") filter {_.nonEmpty} }
.map{(_,1) }
.groupBy(0)
.sum(1);
counts.writeAsCsv("file://<Local System output Data Folder>/OutputData.txt","\n"," ");
env.execute("Scala WordCount Example");
}
}



The FlinkScalaApp.scala file can be exported into a jar file, which can be saved in the local system folder: <Local System Scala Flink Application jar File Folder>.

One can then run the Scala application with the following command.

$ ./bin/flink run --class scalapackage.Wordcount <Local System Scala Flink Application jar File Folder>/ScalaFlinkWordcountApplication.jar


This will generate a local system output file with the following contents excerpt.



Genre counts



Genre counts (SparkR Go Pipe Application)



In order to implement the Genre counts MapReduce using the SparkR application and a Go word count application, the following arrangements may be made.


Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system SparkR Go Pipe application folder: <Local System SparkR Go Pipe Application Folder>
SparkR Go Pipe application file: SparkRGoPipeApp.R
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following SparkR application file  (SparkRGoPipeApp.R).

#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE)
#load required packages and set up the contexts
library(SparkR)
library(magrittr)
library(HistogramTools)
library(rmongodb)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
# read in the data
lines_1 <- SparkR:::textFile(sc, args[1])
lines_2 <- SparkR:::textFile(sc, args[2])
lines_3 <- SparkR:::textFile(sc, args[3])
lines_4 <- SparkR:::textFile(sc, args[4])
rdd1_1<- SparkR:::flatMap(lines_1,function(line) { strsplit(line, " ")[[1]]})
rdd1_2<- SparkR:::flatMap(lines_2,function(line) { strsplit(line, " ")[[1]]})
rdd1_3<- SparkR:::flatMap(lines_3,function(line) { strsplit(line, " ")[[1]]})
rdd1_4<- SparkR:::flatMap(lines_4,function(line) { strsplit(line, " ")[[1]]})
# Pipe RDD
mappipeArg<- c("<Local System mapper Folder>/gomapper.sh")
rdd2_1 <-SparkR:::pipeRDD(rdd1_1,mappipeArg)
rdd2_2 <-SparkR:::pipeRDD(rdd1_2,mappipeArg)
rdd2_3 <-SparkR:::pipeRDD(rdd1_3,mappipeArg)
rdd2_4 <-SparkR:::pipeRDD(rdd1_4,mappipeArg)
# Sort by key
rdd3_1 <-SparkR:::sortByKey(rdd2_1)
rdd3_2 <-SparkR:::sortByKey(rdd2_2)
rdd3_3 <-SparkR:::sortByKey(rdd2_3)
rdd3_4 <-SparkR:::sortByKey(rdd2_4)
# Reduce by key
redpipeArg<- c("<Local System reducer Folder>/goreducer.sh")
rdd4_1 <-cache(SparkR:::pipeRDD(rdd3_1,redpipeArg))
rdd4_2 <-cache(SparkR:::pipeRDD(rdd3_2,redpipeArg))
rdd4_3 <-cache(SparkR:::pipeRDD(rdd3_3,redpipeArg))
rdd4_4 <-cache(SparkR:::pipeRDD(rdd3_4,redpipeArg))
# Union RDD
rdd5_1<-SparkR:::unionRDD(rdd4_1,rdd4_2)
rdd5_2<-SparkR:::unionRDD(rdd4_3,rdd4_4)
rdd5_3<-SparkR:::unionRDD(rdd5_1,rdd5_2)
# Sort by key again
rdd5_4<-SparkR:::sortByKey(rdd5_3)
# Reduce by key again
rdd6 <-SparkR:::coalesce(SparkR:::pipeRDD(rdd5_4,redpipeArg), 1L)
# Save as text file
SparkR:::saveAsTextFile(rdd6,"<Local System Output Data Folder>")
parseFields <-function(record) {
Sys.setlocale("LC_ALL", "C"); # necessary for strsplit() to work correctly
parts <- strsplit(record, "\t")[[1]];
list(parts[1],parts[2])
}
# SQL Context
parsedRDD <- SparkR:::lapply(rdd6, parseFields)
output <- collect(parsedRDD)
df <- createDataFrame(sqlContext, output)
head(df)
showDF(df)
showDF(df, numRows= 25)
subsetDF <- subset(df, df$"_1" %in% c("\"Comedy\""), c(1,2))
subsetDF %>% showDF()
p1<-collect(select(df,df$"_2"))
p2<-as.numeric(p1[,1])
dev.new(width=6,height=5);
png("<Local System Histogram output Folder>/Hist1.png", bg="transparent");
h<-hist(p2, plot=FALSE);PlotRelativeFrequency(h, main="Histogram for Movie Genre Counts",xlab="Rating Counts",border="blue",col="moccasin");
dev.off();
# connect to MongoDB
mongo = mongo.create(host ="localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db= "MLens")
#UserID Counts
bson<-mongo.find.one(mongo, "MLens.UserID_Counts", query= '{"_id": 1}')
bson
bson1<-mongo.find.one(mongo, "MLens.UserID_Counts", query= '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)
#Run the query using pymongo and system2
#command
command = "python"
#note the single + double quotes in the string (needed if paths have spaces)
path2script = '"<Local System Query Scripts folder>/pymongoqueryscript.py"'
output =system2(command,path2script,stdout=TRUE)
print(paste("UserID and MovieID system2 PyMongo Query is:", output))

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)

The R file can be saved in the local system folder: <Local System SparkR Go Pipe Application Folder>.

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
<Local System SparkR Go Pipe Application Folder>/SparkRGoPipeApp.R \
<Local System Input Data Folder>/InputData1.txt \
<Local System Input Data Folder>/InputData2.txt \
<Local System Input Data Folder>/InputData3.txt \
<Local System Input Data Folder>/InputData4.txt


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output, Relative Frequency Histogram Plot and a local system output file with the following contents excerpt.



Genre counts (Scala Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre counts MapReduce using the Scala Spark Pipe facility with the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Scala Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Scala Spark Wukong-Hadoop Pipe application jar file: ScalaSparkWukongHadoopPipeApp.jar
Scala package: scalapackage
Scala object: scalaobject
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following Scala Spark Wukong-Hadoop Pipe application Scala file (ScalaSparkWuPipeApp.scala).

package scalapackage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
object scalaobject{
def main(args: Array[String]){
// Load up the contexts and the sqlContext implicits
val conf = new.SparkConf().setAppname("SparkScalaWuPipeApp")
val sc =new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLCotext(sc)
// Check whether sufficient params were supplied
if (args.length < 2) {
println("Usage: ScalaWordCount ,input> <output>")
System.exit(1)
}
val rdd = sc.textFile(args(0))
val scriptPathm = "<Local System mapper Folder>/wumapper.sh"
val scriptPathr = "<Local System reducer Folder>/wureducer.sh"
val pipeRDD = rdd.pipe(scriptPathm).coalesce(1).sortBy[String]({a => a}, false).flatMap(line => line.split(","))
val pipe1RDD = pipeRDD.pipe(scriptPathr).flatMap(line =>line.split(","))
pipe1RDD.saveAsTextFile(args(1))
// Import Spark SQL implicits
import sqlContext.implicits._
// Spark SQL query
val bookmarks = pipe1RDD.map(_.split("\t")).map(p => (p(0), p(1).toFloat)).cache()
val bookmarks1 = bookmarks.toDF()
bookmarks1.show()
bookmarks1.describe().show()
bookmarks1.filter(bookmarks1("_2") > 20).show()
bookmarks1.filter(bookmarks1("_1").equalTo("\"2;5\"")).show()
// NoSQL query
import scala.sys.process._
// Next run query using rmongodb
val rmongodbquery =
"Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R".!!
print(rmongodbquery)
// Next run query using PyMongo
val pymongoquery =
"python <Local System Query Scripts Folder>/pymongoqueryscript.py".!!
print(pymongoquery)
//Stop Spark context
sc.stop
}
}

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)

The next step is to export the Scala file into a jar file (ScalaSparkWukongHadoopPipeApp.jar) and to save the jar file in the local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
--class "scalapackage.scalaobject" \
--master local[4] \
<Local System jar File Folder>/ScalaSparkWukongHadoopPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate the following Spark SQL system output, NoSQL system output and a local system output file with the following contents excerpt.




Genre counts (Pig Perl Hadoop::Streaming Script)




In order to implement the Genre counts MapReduce using the Pig Stream facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl Hadoop::Streaming mapper file: map.pl
Local system Perl Hadoop::Streaming reducer file: reduce.pl
Local system Pig Stream script file folder: <Local System Pig Script File Folder>
Pig Stream script file: PigPerlStream.pig
HDFS output data folder: <HDFS Output Data Folder>

The next step is to create the following Pig script file. 

col = LOAD '<HDFS Input Data Folder>/InputData.txt' USING PigStorage() AS (id:chararray);
DEFINE perlmapper `map.pl` SHIP ('<Local System mapper Folder>/map.pl');
DEFINE perlreducer `reduce.pl` SHIP ('<Local System reducer Folder>/reduce.pl');
mcol = STREAM col THROUGH perlmapper;
omcol = ORDER mcol BY $0;
romcol = STREAM omcol THROUGH perlreducer AS (key: chararray, count:float);
STORE romcol INTO '<HDFS Output Data Folder>';

The next step is to save the Pig script file in the local system folder: <Local System Pig Script File Folder> and to run the Pig script in mapreduce mode from the command line with the following command.

$ pig <Local System Pig Script File Folder>/PigPerlStream.pig


This will generate an output file in HDFS with the following contents excerpt.








Genre ratings average




Genre ratings average (PySpark MRJob Pipe Application)




The Genre ratings average MapReduce can be implemented using a PySpark application and the MRJob library (Average configuration). In order to do this, the following arrangements can be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobavgmapper.sh
Local system MRJob reducer file: mrjobavgreducer.sh
Local system PySpark MRJob Avg Pipe application file folder: <Local System PySpark MRJob Avg Pipe Application Folder>
PySpark MRJob Avg Pipe App file: PySparkMRJobAvgPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following Python file (PySparkMRJobAvgPipeApp.py).

"""PysparkMRJobAvgPipeApp.py"""
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pymongo
from pymongo import mongoClient
import subprocess
sc = SparkContext("local", "PySparkMRJobAvgPipeApp")
sqlContext = SQLContext(sc)
rdd = sc.textFile("<Local System Input Data Folder>/InputData.txt")
def preparem(line):
"""Each line contains fields separated by a tab."""
return '\t'.join(line.split('\t')) + '\n'
rdd1 = rdd.map(lambda s: preparem(s)).pipe('<Local System mapper Folder>/mrjobavgmapper.sh').map(lambda x: x.split("\t")).sortByKey(False).cache()
def preparer(line):
"""Each line contains fields separated by a tab."""
return '\t'.join(line) + '\n'
rdd2 = rdd1.map(lambda s: preparer(s)).pipe('<Local System reducer file Folder>/mrjobavgreducer.sh').map(lambda x: x.split("\t")).coalesce(1)
rdd2.saveAsTextFile("file://<Local System Output Data Folder>")
# Spark SQL query
Genre= Row('Genre', 'Freq')
rdd3 = rdd2.map(lambda x: (x[0],x[1])).map(lambda r: Genre(*r)) .cache()
df = sqlContext.createDataFrame(rdd3)
df.show(25)
df.describe("Freq").show()
df.where(df.Genre == '"\\"Comedy\\""').show()
# Generate the UserID MapReduce in MongoDB using rmongodb
#Define command and arguements
command = 'Rscript'
path2script = '<Local System Query Scripts Folder>/rmongodbqueryscript.R'
#Build subprocess command
cmd =[command, path2script]
# check_output will run the command and store the result
x = subprocess.check_output(cmd, universal_newlines=True)
print('UserID and MovieID Counts subprocess rmongodb Query is:')
print(x)

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)


The PySpark MRJob Avg Pipe application file can be saved in local system folder: <Local System PySpark MRJob Avg Pipe Application Folder>.

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
--master local[4] \
<Local System PySpark MRJob Avg Pipe Application Folder>/PySparkMRJobAvgPipeApp.py


This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with the following contents excerpt.




Genre ratings average (Scala Spark Perl Pipe Application)



In order to implement the Genre ratings average MapReduce using the Scala Spark Pipe facility and the Perl Hadoop::Streaming library (Word count configuration), the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Perl mapper file: map.pl
Local system Perl reducer file: reduce.pl
Local system jar file folder: <Local System jar file Folder>
Scala Spark Pipe App jar: ScalaPerlPipeApp2.jar
Local system output data folder: <Local System Output Data Folder>
Scala package: scalapackage
Scala object: scalaobject
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R

Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following Spark Perl Pipe application Scala file (SparkScalaPerlPipeApp2.scala).

package scalapackage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
object scalaobject{
def main(args: Array[String]){
// Load up the contexts and the sqlContext implicits
val conf = new.SparkConf().setAppname("SparkScalaPerlPipeApp2")
val sc =new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLCotext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import scala.sys.process._
// Check whether sufficient params were supplied
if (args.length < 2) {
println("Usage: ScalaWordCount ,input> <output>")
System.exit(1)
}
val rdd = sc.textFile(args(0))
val scriptPathm = "<Local System mapper Folder>/map.pl"
val scriptPathr = "<Local System reducer Folder>/reduce.pl"
val pipeRDD = rdd.pipe(scriptPathm).coalesce(1).sortBy[String]({a => a}, false).flatMap(line => line.split(","))
val pipe1RDD = pipeRDD.pipe(scriptPathr).flatMap(line =>line.split(","))
pipe1RDD.saveAsTextFile(args(1))
// Import Spark SQL implicits
import sqlContext.implicits._
// Spark SQL query
val bookmarks = pipe1RDD.map(_.split("\t")).map(p => (p(0), p(1).toFloat)).cache()
val bookmarks1 = bookmarks.toDF()
bookmarks1.show()
bookmarks1.describe().show()
bookmarks1.filter(bookmarks1("_2") > 20).show()
bookmarks1.filter(bookmarks1("_1").equalTo("\"Comedy;5\"")).show()
// NoSQL query
import scala.sys.process._
// Next run query using rmongodb
val rmongodbquery =
"Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R".!!
print(rmongodbquery)
// Next run query using PyMongo
val pymongoquery =
"python <Local System Query Scripts Folder>/pymongoqueryscript.py".!!
print(pymongoquery)
//Stop Spark context
sc.stop
}
}

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)

The next step is to export the Scala file into a jar and to save the jar in the local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
--class "scalapackage.scalaobject" \
--master local[4] \
<Local System jar File Folder>/ScalaPerlPipeApp2.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>

This will generate the following Spark SQL system output, NoSQL system output and a local system output file with the following contents excerpt.




Genre ratings average (Java Spark Wukong-Hadoop Pipe Application)



In order to implement the Genre ratings average MapReduce using the Java Spark Pipe facility and the Wukong-Hadoop library, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Wukong-Hadoop mapper file: wumapper.sh
Local system Wukong-Hadoop reducer file: wureducer.sh
Local system Java Spark Wukong-Hadoop Pipe application folder: <Local System jar File Folder>
Java Spark Wukong-Hadoop Pipe application java file: SparkJavaWuPipeApp.java
Java Spark Wukong-Hadoop Pipe class: SparkJavaWuPipeApp
Java Spark Wukong-Hadoop Pipe application jar file: JavaSparkWuPipeApp.jar
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system query scripts submit Bash file: JavaScriptsSubmit.sh
Local system rmongodb query script file: rmongodbqueryscript.R
Local system PyMongo query script file: pymongoqueryscript.py
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration
The next step is to create the following Java Spark Wukong-Hadoop Pipe application file (JavaSparkWuPipeApp.java).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Import factory methods provided by DataTypes
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import RowFactory
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
public class SparkJavaWuPipeApp {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFolder = args[1];
// Create a Java Spark Context.
// Create a Java Spark SQLContext.
SparkConf conf = new SparkConf().setAppName("SparkJavaWUPipeApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load our input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Pipe the data to external script
String mapperScript = "<Local System mapper Folder>/wumapper.sh";
String mapperScriptName = "wumapper.sh";
String reducerScript = "<Local System reducer Folder>/wureducer.sh";
String reducerScriptName = "wureducer.sh";
// Add the mapper
sc.addFile(mapperScript);
JavaRDD<String> pipeinputs = input;
JavaRDD<String> piperdd = pipeinputs.pipe(SparkFiles.get(mapperScriptName));
// Add the reducer
sc.addFile(reducerScript);
JavaPairRDD<String, Integer> sortpiperdd = piperdd.mapToPair(
new PairFunction <
String, // T <String, Integer>
String, // K key
Integer // V Integer
>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call (String x){
String[] record = x;
String index = record;
Integer freq = 1;
return new Tuple2<String,Integer>
(index,freq);
}
}).sortByKey().coalesce(2, true);
JavaRDD<String> pipeInputs2 = sortpiperdd.map(
new Function <Tuple2 <String, Integer>, String> () {
public String call(Tuple2<String, Integer> s) {
{return s._1();
}}});
JavaRDD<String> piperdd2 = pipeinputs2.pipe(SparkFiles.get(reducerScriptName));
piperdd2.saveAsTextFile(outputFolder);
// The schema is encoded in a string
String schemaString = "key freq";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (piperdd2) to Rows.
JavaRDD<Row> rowRDD = piperdd2.map(
new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Row call(String record) throws Exception {
String[] fields = record.split("\t");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD
DataFrame piperdd2DataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as table.
piperdd2DataFrame.registerTempTable("piperdd2");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq FROM piperdd2 WHERE key = '\"Comedy;3.5\"'");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public String call(Row row) {
return "Genre: Counts: " + row.getString(0) + "\t " + row.getString(1);
}
}).collect();
for (String s: names) {
System.out.println(s);
}
// NoSQL Queries in RMongoDB and PyMongo
String x = null;
try {
// run the Unix "bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh" command
// using the Runtime exec method:
Process p = Runtime.getRuntime().exec("bash <Local System Query Scripts Folder>/JavaScriptsSubmit.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
// read the output from the command
System.out.println("Here is the standard output of the command:\n");
while ((x = stdInput.readLine()) != null) {
System.out.println(x);
}
// read any errors from the attempted command
System.out.println("Here is the standard error of the command (if any):\n");
while ((x = stdError.readLine()) != null) {
System.out.println(x);
}
System.exit(0);
}
catch (IOException e) {
System.out.println("exception happened - here's what I know: ");
e.printStackTrace();
System.exit(-1);
};
}
}

#!/bin/bash
Rscript <Local System Query Scripts Folder>/rmongodbqueryscript.R
python <Local System Query Scripts Folder>/pymongoqueryscript.py

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)

#!/usr/bin/env python
import pymongo
from pymongo import MongoClient
# set up the environment
client = MongoClient()
# connect to the database
db=client.MLens
# UserID counts
collection = db.UserID_Counts
q = collection.find_one(1)
print(q)
# MovieID counts
collection= db.MovieID_Counts
r = collection.find_one(2)
print(r)


The next step is to export the file into a jar file and to save the jar file in local system folder: <Local System jar File Folder>.

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
--master local[4] \
--class "SparkJavaWuPipeApp" \
<Local System jar File Folder>/JavaSparkWuPipeApp.jar <Local System Input Data Folder>/InputData.txt <Local System Output Data Folder>


This will generate a local system file with the following file contents excerpt, Spark SQL system output and MongoDB-based NoSQL system output.

Genre average (PySpark Go Pipe Application)




In order to implement the Genre average MapReduce using the PySpark Pipe facility, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system Go mapper file: gomapper.sh
Local system Go reducer file: goreducer.sh
Local system PySpark Go Pipe application folder: <Local System PySpark Go Pipe Application Folder>
PySpark Go Pipe application file: PySparkGoPipeApp.py
Local system output data folder: <Local System Output Data Folder>
Local system query scripts folder: <Local System Query Scripts Folder>
Local system rmongodb query script file: rmongodbqueryscript.R
MongoDB: Have an instance of MongoDB running with the arrangements outlined in the MongoDB part of the illustration

The next step is to create the following PySpark Go Pipe application file (PySparkGoPipeApp.py).

"""PysparkGoMRJobPipeApp.py"""
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import subprocess
sc = SparkContext("local", "PySparkGoPipeApp")
sqlContext = SQLContext(sc)
rdd = sc.textFile("<Local System Input Data Folder>/InputData.txt")
def preparem(line):
"""Each line contains fields separated by a space."""
return ' '.join(line.split(' ')) + '\n'
rdd1 = rdd.map(lambda s: preparem(s)).pipe('<Local System mapper Folder>/gomapper.sh').map(lambda x: x.split("\t")).sortByKey(False).cache()
def preparer(line):
"""Each line contains fields separated by a tab."""
return '\t'.join(line) + '\n'
rdd2 = rdd1.map(lambda s: preparer(s)).pipe('<Local System reducer Folder>/goreducer.sh').map(lambda x: x.split("\t"))
rdd2.saveAsTextFile("file://<Local System Output Data Folder>")
# Spark SQL query
GCat= Row('GCat', 'Freq')
rdd3 = rdd2.map(lambda x: (x[0],x[1])).map(lambda r: GCat(*r)).cache()
df = sqlContext.createDataFrame(rdd3)
df.show(25)
df.describe("Freq").show()
df.filter("GCat == '\"Comedy;3.5\"'").show()
# NoSQL queries in MongoDB using rmongodb
#Define command and arguements
command = 'Rscript'
path2script = '<Local System Query Scripts Folder>/rmongodbqueryscript.R'
#Build subprocess command
cmd =[command, path2script]
# check_output will run the command and store the result
x = subprocess.check_output(cmd, universal_newlines=True)
print('UserID and MovieID subprocess rmongodb Query is:')
print(x)

#!/usr/bin/env Rscript
library(rmongodb)
# connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
mongo.get.databases(mongo)
mongo.get.database.collections(mongo, db="MLens")
# UserID counts
bson <-mongo.find.one(mongo, "MLens.UserID_Counts", query = '{"_id": 1}')
bson
bson1 <-mongo.find.one(mongo, "MLens.MovieID_Counts", query = '{"_id": 2}')
bson1
# Close connection
mongo.destroy(mongo)


The next step is to save the PySpark Go application file in local system folder: <Local System PySpark Go Pipe Application Folder>

One can then run the application using the Spark-submit facility.

$ ./bin/spark-submit \
--master local[4] \
<Local System PySpark Go Pipe Application Folder>/PySparkGoPipeApp.py

This will generate the following Spark SQL system output, MongoDB-based NoSQL system output and a local system output file with file contents excerpt.

Genre ratings average (MRJob Hadoop Streaming)



In order to implement the Genre ratings average MapReduce using Hadoop Streaming and the MRJob library (word count configuration), the following arrangements may be made.

Input data: InputData.txt
Hadoop Distributed File System (HDFS) input data folder: < HDFS Input Data Folder>
Local system mapper folder: <Local System mapper Folder>
Local system reducer folder: <Local System reducer Folder>
Local system MRJob mapper file: mrjobwcmapper.sh
Local system MRJob reducer file: mrjobwcreducer.sh
Local system Hadoop Streaming jar file folder: <Local System Hadoop Streaming jar File Folder>
Hadoop Streaming jar file:  hadoop-streaming-2.6.0.jar
HDFS output data folder: <HDFS Output Data Folder>
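
Although the streaming job pipes its data through the MRJob-based wrapper scripts referenced above (mrjobwcmapper.sh and mrjobwcreducer.sh), the underlying word-count logic can be sketched as a pair of plain-Python streaming scripts for orientation. These are only hypothetical stand-ins, not the MRJob wrappers themselves, and the one-record-per-line input with a tab-separated key/count output is an assumption.

#!/usr/bin/env python
# streamingmapper.py - hypothetical stand-in for the streaming mapper.
# Emits "key<TAB>1" for every non-empty input record on stdin.
import sys
for line in sys.stdin:
    key = line.strip()
    if key:
        print("%s\t%d" % (key, 1))

#!/usr/bin/env python
# streamingreducer.py - hypothetical stand-in for the streaming reducer.
# Assumes Hadoop Streaming delivers the mapper output sorted by key.
import sys
current_key, total = None, 0
for line in sys.stdin:
    key, count = line.rstrip("\n").split("\t")
    if key != current_key:
        if current_key is not None:
            print("%s\t%d" % (current_key, total))
        current_key, total = key, 0
    total += int(count)
if current_key is not None:
    print("%s\t%d" % (current_key, total))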

The next step is to save the following Bash Hadoop Streaming submit file in local system folder: <Local System Bash Hadoop Streaming Submit File Folder>.

#!/bin/bash
hadoop jar <Local System Hadoop Streaming jar Folder>/hadoop-streamig-2.6.0.jar \
-file $1 -mapper $1 -file $2 -reducer $2 -partitioner \
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -input $3 -output $4


The next step is to run the following command on Ubuntu Server 14.04.5 LTS.

bash <Local System Bash Hadoop Streaming Submit File Folder>/HadoopStreamingSubmit.sh \
<Local System mapper Folder>/mrjobwcmapper.sh \
<Local System reducer Folder>/mrjobwcreducer.sh \
<Local System Input Data Folder>/InputData.txt \
<HDFS Output Folder>


This will generate an output file in HDFS with the following contents excerpt.

Genre ratings average (Python Flink Application)




In order to implement the MapReduce using a Python Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Python Flink Batch wordcount application file:  WordCount.py
Local system python Flink application folder: <Local System Python Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>

The next step is to create the following Python Flink application file (WordCount.py).

import sys
from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.plan.Constants import WriteMode
from flink.functions.GroupReduceFunction \
    import GroupReduceFunction
class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))
class Adder(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count, word = iterator.next()
        count += sum([x[0] for x in iterator])
        collector.collect((count, word))
if __name__ == "__main__":
    output_file = 'file://<Local System Output Data Folder>/OutputData.txt'
    print('logging results to: %s' % (output_file, ))
    env = get_environment()
    data = env.read_text('file://<Local System Input Data Folder>/InputData.txt')
    result = data \
        .flat_map(Tokenizer()) \
        .group_by(1) \
        .reduce_group(Adder(), combinable=True)
    result.write_text(output_file, write_mode=WriteMode.OVERWRITE)
    env.execute(local=True)

The Python Flink wordcount application file can then be saved in the local system Python Flink application folder: <Local System Python Flink Application Folder>.

One can then run the Python Flink application with the following command.

$ ./flink/build-target/bin/pyflink2.sh <Local System Python Flink Application Folder>/WordCount.py


This will create a local system output file with the following file contents excerpt.

Genre ratings average (Scala Flink Application)



In order to implement the Genre ratings average MapReduce using a Scala Flink application, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Scala Flink Batch wordcount application file: WordCount.scala
Local system Scala Flink Batch application folder: <Local System Scala Flink Application Folder>
Local system output data file: OutputData.txt
Local system output data folder: <Local System Output Data Folder>
Scala Flink Batch wordcount application jar file: ScalaFlinkWordcountApplication.jar


The next step is to create the following Scala Flink application file (WordCount.scala).

package scalapackage
import org.apache.flink.api.scala._
object Wordcount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment;
// get input data
val text = env.readTextFile("<Local System Input Data Folder>/InputData.txt");
val counts = text.flatMap { _.split(" ") filter { _.nonEmpty } }
.map{(_,1) }
.groupBy(0)
.sum(1);
counts.writeAsCsv("file://<Local System Output Data Folder>/OutputData.txt","\n"," ");
env.execute("Scala WordCount Example");
}
}


The next step is to export the Wordcount.scala file into a jar file and to save the Scala Flink word count jar file in local system folder: <Local System Scala Flink Application jar File Folder>.

One can then run the Scala Flink application with the following command.

$ ./bin/flink run --class scalapackage.Wordcount <Local System Scala Flink Application jar File Folder>/ScalaFlinkWordcountApplication.jar


This will generate a local system output file with the following file contents excerpt.

4. Data query and data interaction


The processed data can be queried interactively using ShinyMongo and Shiny. ShinyMongo can be used to build web applications with an interface through which the user can interactively query the data from the MongoDB illustrations (generated in JavaScript and PyMongo) using JSON/BSON syntax.


Shiny can be used to create Shiny Applications that generate web based interactive histograms for the UserID, MovieID, Rating score and Genre counts.



ShinyMongo Application


The tutorial and the R scripts (server.R and ui.R) on how to generate interactive queries on data in MongoDB using ShinyMongo can be found in this gist. The method for creating the applications from the ShinyMongo gist can be found in this Shiny tutorial.

The application will generate the following web based interface, output and JSON/BSON queries for the MongoDB data.

Shiny Applications 


In order to create a Shiny application that generates an interactive web based histogram for the UserID_Counts variable, the following arrangements may be made.

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Shiny library: Install Shiny package in R

The next step is to create the following two files and save them in a local system folder as outlined in this tutorial.

#!/usr/bin/env Rscript
library(shiny)
data<-read.table(<Local System Input Data Folder>/InputData.txt",sep="\t")
UserID_Counts<-data$V2
#Define server logic required to draw a histogram
shinyServer(function(input, output) {
# expression that generates a histogram. The expression is
# wrapped in a call to renderPlot to indicate that:
#
# 1) It is "reactive" and therefore should re-execute automatically
# when inputs change
# 2) Its output is a plot
output$distPlot <- renderPlot({
x <- UserID_Counts # UserID data
bins <- seq(min(x), max(x), length.out = input$bins + 1)
# draw the histogram with the specified number of bins
hist(UserID_Counts, breaks = bins, col = 'blue', border ='white')
})
})

#!/usr/bin/env Rscript
library(shiny)
#Define UI for application that draws a histogram
shinyUI(fluidPage(
# Application title
titlePanel("Hello Shiny!"),
# Sidebar with a slider input for the number of bins
sidebarLayout(
sidebarPanel(
sliderInput("bins",
"number of bins:",
min = 1,
max = 50,
value = 30)
),
# Show a plot of the generated distribution
mainPanel(
plotOutput("distPlot")
)
)
))


The next step is to run the application using the runApp() command, which will result in the following web based interface.


The output for 30 Histogram bins is as follows.

The output for 15 bins is as follows.

The applications for the MovieID, Rating score and Genre counts variables can be generated analogously to the case of the UserID counts variable.


The web-based output for the MovieID counts variable and 30 bins is as follows.

The web-based output for the MovieID counts variable and 15 bins is as follows.

The web-based output for the Rating score counts variable and 30 bins is as follows.

The web-based output for the Rating score counts variable and 10 bins is as follows.

The web-based output for the Genre counts variable and 30 bins is as follows.

The web-based output for the Genre counts variable and 15 bins is as follows.


5. Summarize the data



Summary statistics


The summary statistics can be generated in R using the stat.desc() function from the pastecs package.
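
For readers working outside R, a rough Python/pandas analogue of these summary statistics can be sketched as follows. This is not the pastecs workflow used here; the tab-separated two-column input layout and the ID/Counts column names are assumptions for illustration.

#!/usr/bin/env python
# Hypothetical pandas analogue of the stat.desc() summary statistics.
# Assumes a tab-separated input file with an ID column and a Counts column.
import pandas as pd

data = pd.read_csv("<Local System Input Data Folder>/InputData.txt",
                   sep="\t", header=None, names=["ID", "Counts"])

# Location and spread measures broadly comparable to the stat.desc() output.
print(data["Counts"].describe())
print("variance:", data["Counts"].var())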


UserID variable counts

MovieID variable counts

Rating score counts

Genre variable counts

Histograms




The histograms can be generated in Relative Frequency form using the PlotRelativeFrequency() function from the HistogramTools package.


UserID variable counts




In order to generate the Relative Frequency Histogram for the UserID_Counts variable one can use the following R command sequence.

library(HistogramTools)
dev.new(width=6,height=5); h<-hist(UserID_Counts, plot=FALSE);PlotRelativeFrequency(h,
main="Relative Frequency Histogram for UserID Counts",
xlab="Rating Counts",
border="blue",
col="moccasin");
# Switch off graphics device
dev.off();

This will generate the following histogram for the UserID counts.

The other variables can be generated analogously.



MovieID variable counts

Rating score variable counts

Genre variable counts

The next step is to generate summary measures of the metrics of the data generated during the data processing in section 3 using the SAS software Proc SGPlot statement.


6. Analyze the data


The summary measures of the metrics can be depicted graphically using the SAS software PROC SGPlot statement. The bar graph categories can also be further analysed using Bash script grep decompositions in Ubuntu Server 14.04.5 LTS and Hadoop.



UserID counts


In order to generate the bar graph for the UserID variable, one can first create a Microsoft Excel file with two columns: UserID for the user-ids and Counts for the user-id counts.

The file can then be imported into a work.import dataset in the SAS software using the Proc Import statement. The following program can be prepared using the method outlined in this post and then be run in the SAS software.



Data UserID_counts;
set work.import;
run;
proc sort data= UserID_counts out=UserID_counts_sort;
by DESCENDING Counts ;
run;
data UserID_20counts_sort;
set UserID_counts_sort (firstobs=1 obs=20);
run;
title 'Top 20 UserID Rating Counts';
proc sgplot data=UserID_20counts_sort;
vbar UserID/ response=Counts dataskin=gloss datalabel
categoryorder=respasc nostatlabel;
xaxis grid display=(nolabel);
yaxis grid discreteorder=data display=(nolabel);
run;

This will generate the following plot.

The plots for the other variables can be generated analogously to the case for the UserID variable counts.
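
For readers without the SAS software, a rough Python analogue of the top-20 bar chart can be sketched with pandas and matplotlib. This is not the SAS workflow used above; the Excel file name is hypothetical, and the UserID/Counts column names follow the layout described above.

#!/usr/bin/env python
# Hypothetical pandas/matplotlib analogue of the top-20 UserID bar chart.
import pandas as pd
import matplotlib.pyplot as plt

# Assumes an Excel file with UserID and Counts columns, as described above.
data = pd.read_excel("<Local System Input Data Folder>/UserID_Counts.xlsx")

# Keep the 20 UserIDs with the highest rating counts, sorted ascending so
# the tallest bar sits on the right (similar in spirit to categoryorder=respasc).
top20 = data.sort_values("Counts", ascending=False).head(20).sort_values("Counts")

top20.plot(kind="bar", x="UserID", y="Counts", legend=False,
           title="Top 20 UserID Rating Counts")
plt.tight_layout()
plt.show()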



Rating score counts

The Rating score counts can be used to calculate the weighted mean and weighted variance of the ratings. 
In the SAS software this can be done with the PROC MEANS statement with the counts as the weights.
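
The calculation itself is simple: the weighted mean is the count-weighted sum of the rating values divided by the total count, and the weighted variance with the sum of the weights in the denominator (matching vardef=weight below) is the count-weighted sum of squared deviations divided by the total count. A minimal Python sketch, using hypothetical rating values and counts rather than the actual data, is as follows.

#!/usr/bin/env python
# Worked sketch of the weighted mean and weighted variance of the ratings.
# The values/counts below are hypothetical placeholders, not the actual data.
values = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]
counts = [100, 200, 150, 300, 250, 400, 350, 500, 300, 450]

total = float(sum(counts))
wmean = sum(v * c for v, c in zip(values, counts)) / total
# Sum of weights in the denominator, as with PROC MEANS vardef=weight.
wvar = sum(c * (v - wmean) ** 2 for v, c in zip(values, counts)) / total

print("weighted mean:", wmean)
print("weighted variance:", wvar)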

In the SAS software, one can run the following program with the values column named Var and the counts/weights column named Counts in a work.import dataset.

proc means data=import sum sumwgt mean var stddev
vardef= weight;
weight Counts;
var Var;
run;



This will result in the following output.

In R, one can use the Weighted.Desc.Stat package and the following functions.

The application of these functions to the data will lead to the following output.

The following illustrates how one can perform the calculation for the weighted (sample) mean and weighted (sample) variance using vector calculus in R.

MovieID counts 

MovieID Counts decomposition



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition for the rating counts of MovieID 296, one can take the counts from one of the output files (treating the output file partitions as one file) of the Fused MovieID-Rating MapReduce in section three (i.e. output files from one of Java Spark Pipe Go word count, Scala Spark Pipe Perl word count, SparkR Pipe MRJob word count, Pig Wu word count, Python Flink word count or Scala Flink word count) and treat it as an input file for the grep decomposition. If one selects, say, the Pig Wu Streaming output file as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID counts decomposition Bash file: BashMovieIDCountsDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>

The next step is to create the following Bash file.

#!/bin/bash
grep -E '"296;' <Local System Input Data Folder>/InputData.txt



The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Decomposition Folder>/BashMovieIDCountsDecomp.sh


This will generate the following Bash system output.

Hadoop grep

The equivalent output can be generated in Hadoop using the grep class from the hadoop-mapreduce-examples-2.6.0.jar and the FusedMovieID-Rating.txt file. 

In order to run the Hadoop grep decomposition of the rating counts for MovieID 296 in Hadoop the following arrangements/selections may be made.

Input data (i.e. Fused MovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>
HDFS output data folder: <HDFS output Folder>
Local system MovieID counts Hadoop decomposition Bash file:  HadoopMovieIDCountsDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.

#!/bin/bash
hadoop jar <Local System Hadoop mapreduce examples jar file Folder>/hadoop-mapreduce-examples-2.6.0.jar grep $1 $2 '^296;5|^296;4.5|^296;3.5|^296;3|^296;2.5|^296;2|^296;1.5|^296;1'

The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder>

One can then run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Hadoop Decomposition Folder>/HadoopMovieIDCountsDecomp.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS Output Folder>


This will generate an output file in HDFS with the following contents.

MovieID Rating averages

MovieID Rating averages decomposition



In the case of the averages decomposition, it is important to note that the decomposition of the rating counts also allows one to calculate the average rating for a MovieID. This means that one can retain the structure of the Bash and Hadoop decomposition scripts for the averages decompositions.
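
To see why, note that the per-rating counts returned by the decomposition are exactly the weights needed for the average: the mean rating is the sum of rating value times count divided by the total count. A small Python sketch is given below; it assumes (hypothetically) that each decomposition line holds a MovieID;Rating key and a count separated by a tab, and the exact layout depends on which MapReduce output is used.

#!/usr/bin/env python
# Worked sketch: recover the average rating for one MovieID from its
# rating-count decomposition. The input layout (key<TAB>count, with the
# key formatted as MovieID;Rating) is an assumption for illustration.
weighted_sum = 0.0
total_count = 0

with open("<Local System Input Data Folder>/DecompositionOutput.txt") as f:
    for line in f:
        key, count = line.strip().split("\t")
        rating = float(key.strip('"').split(";")[1])
        weighted_sum += rating * int(count)
        total_count += int(count)

print("average rating:", weighted_sum / total_count)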



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition of the average for MovieID 33264, one can take the counts from one of the output files (treating the output file partitions as one file) from the Fused MovieID-Rating MapReduce in section three (i.e. output files from one of Java Spark Pipe Go word count, Scala Spark Pipe Perl word count, SparkR Pipe MRJob word count, Pig Wu word count, Python Flink word count or Scala Flink word count) and treat it as an input file for the grep decomposition.

If one selects, say, the Pig Perl Streaming output as the input dataset, then one may make the following arrangements/selections.

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system MovieID average decomposition Bash file: BashMovieIDAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>

The next step is to create the following Bash file.

#!/bin/bash
grep -E '"33264;' <Local System Input Data Folder>/InputData.txt





The next step is to save the file in local system folder: <Bash Decomposition Folder>
and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Decomposition Folder>/BashMovieIDAvgDecomp.sh


This will generate the following output.





Hadoop grep


In order to run the Hadoop grep decomposition for the average rating of MovieID 33264, the following arrangements may be made.


Input data (i.e. FusedMovieID-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system MovieID average Hadoop decomposition Bash file:  BashMAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.

#!/bin/bash
hadoop jar <Local System Hadoop mapreduce examples jar file Folder>/hadoop-mapreduce-examples-2.6.0.jar grep $1 $2 '^33264;5|^33264;4.5|^33264;3.5|^33264;3|^33264;2.5|^33264;2|^33264;1.5|^33264;1'

The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Hadoop Decomposition Folder>/BashMAvgHadoopDecomp.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS output Folder>


This will generate an output file in HDFS with the following contents.





MovieID Genre counts

MovieID Genre counts decomposition




The Genre decompositions can be generated analogously to those of the MovieID ratings and MovieID ratings averages, using the output of the Fused Genre-Rating counts MapReduces for Ubuntu Server and the Fused Genre-Rating file for Hadoop.



Ubuntu Server 14.04.5 LTS grep


In order to generate a grep decomposition for the Genre Drama, one can take the counts from one of the output files (treating the output file partitions as one file) in the Fused Genre-Rating MapReduces in section three (i.e. output files from one of Scala Spark Pipe Perl word count, Java Spark Pipe Wu word count, PySpark Pipe MRJob word count, Hadoop MRJob word count, Python Flink word count or Scala Flink word count) and treat it as an input file for the grep decomposition.

One can select, say, the Spark Java Wu output files and bind them into a single file (adding the rows of the second file below the rows of the first one; a small sketch of this step follows the arrangements list below). If one uses the resulting file as the input dataset, then one can make the following arrangements/selections:

Input data: InputData.txt
Local system Bash script folder: <Bash Decomposition Folder>
Local system input data folder: <Local System Input Data Folder>
Local system Genre counts decomposition Bash file: BashCountsDramaDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>
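
As referenced above, binding the two Spark output part files into a single input file can be done with a short script (or simply with cat in the shell). A minimal Python sketch, using hypothetical part-file names, is as follows.

#!/usr/bin/env python
# Bind two MapReduce output part files into one input file for the grep
# decomposition. The part-file names below are hypothetical placeholders.
parts = ["<Local System Output Data Folder>/part-00000",
         "<Local System Output Data Folder>/part-00001"]
with open("<Local System Input Data Folder>/InputData.txt", "w") as out:
    for part in parts:
        with open(part) as f:
            for line in f:
                out.write(line)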

The next step is to create the following Bash file.

#!/bin/bash
grep -E '"Drama;' <Local System Input Data Folder>/InputData.txt


The next step is to save the file in local system folder: <Bash Decomposition Folder> and to run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Decomposition Folder>/BashCountsDramaDecomp.sh


This will generate the following output.

Hadoop grep



In order to run the Hadoop grep decomposition of the Drama Genre ratings the following arrangements may be made.


Input data (i.e. Fused Genre-Rating text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

Local system Genre counts Hadoop decomposition Bash file:  BashGenreDramaHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>

Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.

#!/bin/bash
hadoop jar <Local System Hadoop mapreduce examples jar file Folder>/hadoop-mapreduce-examples-2.6.0.jar grep $1 $2 '^Drama;5|^Drama;4.5|^Drama;3.5|^Drama;3|^Drama;2.5|^Drama;2|^Drama;1.5|^Drama;1'


The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Hadoop Decomposition Folder>/BashGenreDramaHadoopDecomp.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS output Folder>



This will generate an output file in HDFS with the following contents.

MovieID Genre averages

MovieID Genre averages decomposition



Ubuntu Server 14.04.5 LTS grep



In order to generate a grep decomposition of the rating average for the Genre Animation|IMAX|Sci-Fi, one can take the counts from one of the output files (treating the output file partitions as one file) from the Fused Genre-Rating MapReduces in section three (i.e. output files from one of Scala Spark Pipe Perl word count, Java Spark Pipe Wu word count, PySpark Pipe MRJob word count, Hadoop MRJob word count, Python Flink word count or Scala Flink word count) and treat it as an input file for the grep decomposition.


If one selects, say, the Spark Java Wu output files, binds them into a single file (i.e. adding the rows of the second file below the rows of the first file) and uses the resulting file as the input dataset for the MovieID Genre averages Ubuntu grep decomposition, then one may make the following arrangements/selections:

Input data: InputData.txt
Local system input data folder: <Local System Input Data Folder>
Local system Bash script folder: <Bash Decomposition Folder>
Local system Genre average decomposition bash file: BashGenreAvgDecomp.sh
Local system Bash decomposition file folder: <Bash Decomposition Folder>


The next step is to create the following Bash file.

#!/bin/bash
grep -E '"Animation\|IMAX\Sci-Fi;' <Local System Input Data Folder>/InputData.txt

The next step is to save the file in local system folder: <Bash Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Decomposition Folder>/BashGenreAvgDecomp.sh


This will generate the following output.

Hadoop grep



In order to run the Hadoop grep decomposition of the average rating for the Animation|IMAX|Sci-Fi Genre, the following arrangements may be made.

Input data (i.e. FusedGenre text file): InputData.txt
HDFS input data folder: <HDFS Input Data Folder>

HDFS output data folder: <HDFS output Folder>
Local system Genre average Hadoop decomposition bash file:  BashGenreAvgHadoopDecomp.sh
Local system Hadoop Bash decomposition file folder: <Bash Hadoop Decomposition Folder>
Local system Hadoop MapReduce examples jar file Folder: <Local System Hadoop mapreduce examples jar file Folder>
Hadoop MapReduce examples jar file: hadoop-mapreduce-examples-2.6.0.jar

The next step is to create the following Bash file.

#!/bin/bash
hadoop jar <Local System Hadoop mapreduce examples jar file Folder>/hadoop-mapreduce-examples-2.6.0.jar grep $1 $2 '^Animation\|IMAX\|Sci-Fi;5|^Animation\|IMAX\|Sci-Fi;4.5|^Animation\|IMAX\|Sci-Fi;3.5|^Animation\|IMAX\|Sci-Fi;3|^Animation\|IMAX\|Sci-Fi;2.5|^Animation\|IMAX\|Sci-Fi;2|^Animation\|IMAX\|Sci-Fi;1.5|^Animation\|IMAX\|Sci-Fi;1'

The next step is to save the file in local system folder: <Bash Hadoop Decomposition Folder> and run the following command in Ubuntu Server 14.04.5 LTS.

$ bash <Bash Hadoop Decomposition Folder>/BashGenreAvgHadoopDecomp.sh \
<HDFS Input Data Folder>/InputData.txt \
<HDFS output Folder>


This will generate an output file in HDFS with the following contents.

7. Conclusions




In the illustration we considered how to generate summary measures for the GroupLens MovieLens 10M ratings.dat and movies.dat datasets using the MapReduce programming model. The specific MapReduce configurations considered were the word count configuration and the average configuration.

The MapReduces were, in turn, constructed using four Hadoop Streaming libraries and two MongoDB interfaces. The MapReduce illustrations were implemented in ten (eleven, counting the two MongoDB interfaces separately) facilities, namely, Hadoop Streaming, Pig Streaming, Scala Spark Pipe, PySpark Pipe, Java Spark Pipe, SparkR Pipe, MongoDB (JavaScript), MongoDB (PyMongo), Java Flink, Python Flink and Scala Flink. The illustration list can be further generalized to more MapReduce configurations and facilities according to user preference.


The resulting output data sets were further summarized using Bash, Hadoop, R and the SAS software in order to illustrate the kind of information that can be mined from the data sets.



Interested in exploring more material on Big Data and Cloud Computing from the Stats Cosmos blog?

Check out my other posts

Subscribe via RSS to keep updated

Check out my course at Udemy College

Check out our Big Data and statistical Services

Sources

http://bit.ly/2dmK2hv
http://bit.ly/2dyzOYI
http://bit.ly/2dM5xsG
http://bit.ly/2dWSWlN
http://bit.ly/2dzV9kx
http://bit.ly/2ebNLfh
http://bit.ly/2e5yaBn
http://bit.ly/29Hbbwn
http://bit.ly/2e27GzK
http://bit.ly/1Uo1MH8
http://bit.ly/2efZi2p
http://bit.ly/2dV7LF8