
Tuesday, June 21, 2016


How to apply MapReduce to the Delicious dataset using Hadoop, MongoDB and Spark (Spark-shell, PySpark, Spark Applications, SparkR and SparkSQL) – Part Two























This post is designed for a joint installation of Hadoop 2.6.0 (single cluster), MongoDB 2.4.9, Spark 1.5.1 (pre-built for Hadoop) and Ubuntu 14.04.3. The illustration builds on the steps covered in part one of the post, which applied the MapReduce programming model to the GroupLens HetRec 2011 Delicious dataset. The procedure involves applying seventeen MapReduces to the dataset; the first six were outlined in part one. The underlying mathematical model is outlined in Cantador, Bellogin and Vallet (2010).


1. Model 



The model starts with a social tagging system with a set of users U, items I, annotations A and tags T, constituting a folksonomy, F.

The users can be assigned profiles based on their tag assignments and the items can be assigned profiles based on the tags used on them. The user profile provides a reflection of the user's tastes, interests and needs. The item’s profile provides a reflection of its contents. 

The illustration rests on two key assumptions. The first is that users annotate items that are relevant to them, so the tags they provide can be assumed to describe their interests, tastes and needs. The second is that the tags assigned to an item usually describe its contents. The first follow-on assumption is that the more a user uses a particular tag, the more important the tag is to that user. The second follow-on assumption is that the more an item is annotated with a tag, the better the tag describes its contents. A limitation of these assumptions is that tags used to annotate many items may not be useful for discerning user preferences and item features.

The recommendation problem, as formulated in Adomavicius and Tuzhilin (2005), is then, for a given set of users U = {u1,....,uM} and items I = {i1,....,iN}, to define a utility function g: U × I → ℜ, where ℜ is a totally ordered set, such that g(um,in) measures the usefulness of item in to user um. Then, for each user u ∈ U, the aim is to choose a set of items imax,u ∈ I, unknown to the user, which maximize the function g: ∀ u ∈ U, imax,u = arg max i∈I g(u,i). In content-based recommendation analyses g() can be formulated as:

g(um,in) = sim(ContentbasedUserProfile(um), Content(in)) ∈ ℜ, where
ContentbasedUserProfile(um) = um = (um,1,....,um,K) ∈ ℝK is the vector of content-based preferences of user um, and
Content(in) = in = (in,1,....,in,K) ∈ ℝK is the vector of content-based features of item in.

ContentbasedUserProfile(um) and Content(in) can  usually be represented as vectors of real numbers where each vector component measures the "importance" of the corresponding feature in the user and item representations. 

The sim() function measures the similarity between the user profile and the item profile in the content feature space.

The key to the MapReduce constructs is the assignment set, A = {(um,tl,in)} ⊆ U × T × I, of each tag tl to item in by each user um. This is available as the assignment dataset if the bookmarked URLs are defined to be the items in the model.


Essentially, a folksonomy can then be defined as a tuple F = {T, U, I, A}, where T = {t1,....,tL} is the set of tags, U = {u1,....,uM} is the set of users that annotate, I = {i1,....,iN} is the set of items that are annotated and A = {(um,tl,in)} is the set of annotations. This notation allows one to define a simple profile for user um as a vector um = (um,1,....,um,L), where um,l = |{(um,tl,i) ∈ A | i ∈ I}| is the number of times user um has annotated items with tag tl. The profile for item in can be defined as the vector in = (in,1,....,in,L), where in,l = |{(u,tl,in) ∈ A | u ∈ U}| is the number of times item in has been annotated with tag tl.
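
As a concrete illustration of these two profile definitions, the following minimal Python sketch counts um,l and in,l directly from the assignment triples. The file name, column order and tab separator are illustrative assumptions, not the actual HetRec file layout.

# Minimal sketch: build the simple TF profiles u_{m,l} and i_{n,l} from
# assignment triples (user, tag, item). File name, column order and the
# tab separator are assumptions for illustration only.
from collections import Counter

user_profile = Counter()  # key: (user, tag) -> u_{m,l}
item_profile = Counter()  # key: (item, tag) -> i_{n,l}

with open("assignments.tsv") as f:
    for line in f:
        user, tag, item = line.rstrip("\n").split("\t")
        user_profile[(user, tag)] += 1   # times user u_m used tag t_l
        item_profile[(item, tag)] += 1   # times item i_n was tagged with t_l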

In the part one illustration, each of the social tagging system components and the TF-based similarity measures were explored. The relevant constructs provided an illustration of how a particular solution to the social tagging system problem could be formulated using the MapReduce programming model.

The aim of this part two post is to build on the solution (with its constructs) in order to generate other solutions and constructs using the MapReduce programming model.

The core constructs of the MapReduces are the six quantities described in the following table from Cantador, Bellogin and Vallet (2010).

























The constructs are then the inputs with which to formulate the profile models. The Profile models are the following.


TF Profile model













TF-IDF Profile model













Okapi BM25 Profile model




















where b and k1 are the standard values 0.75 and 2, respectively.
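
The profile-model formulas above are shown as images in the original post and are not reproduced here. As a hedged LaTeX sketch of the standard weightings they draw on (the exact notation in Cantador, Bellogin and Vallet (2010) may differ), with M users, N items, and m_l (respectively n_l) the number of users (items) whose profiles contain tag t_l:

% TF profile: the raw frequencies u_{m,l} and i_{n,l} defined earlier.
% TF-IDF profile (sketch):
\mathrm{tfidf}_{u_m}(t_l) = u_{m,l}\,\log\frac{M}{m_l},
\qquad
\mathrm{tfidf}_{i_n}(t_l) = i_{n,l}\,\log\frac{N}{n_l}
% Okapi BM25 profile (standard BM25 form, assumed here), with b = 0.75, k_1 = 2:
\mathrm{bm25}_{u_m}(t_l) =
  \frac{u_{m,l}\,(k_1 + 1)}{u_{m,l} + k_1\left(1 - b + b\,\frac{|u_m|}{\mathrm{avg}\,|u|}\right)}
  \,\log\frac{M - m_l + 0.5}{m_l + 0.5}
% and analogously for \mathrm{bm25}_{i_n}(t_l) with the item quantities.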



The models allow for the formulation of the similarity measures. The similarity measures are the following.


TF-based Similarity measures












TF Cosine-based  Similarity measure














TF-IDF Cosine-based Similarity measure











Okapi BM25-based Similarity measures














Okapi BM25 Cosine-based Similarity measure














The key point to keep in mind in the construction of the core components and the profile models is how one defines the key, value pairs for the MapReduce processing. For example, for the TF Profile model, um,l defines the tag frequency for tag tl by user um and in,l the tag frequency for tag tl on item in.

The formula g(um,in) in the TF-based Similarity measure essentially means that one can take the tfu(m)(tl) = um,l and tfi(n)(tl) = in,l terms, attach the {um,in} key to the appropriate frequencies in the model (i.e. tfu(m)(tl) for non-zero frequency tags in in's profile in the first measure and tfi(n)(tl) for non-zero frequency tags in um's profile in the second measure) and conduct a MapReduce in order to generate the required numerator sums. In part one the illustration showed how the relevant key indices could be constructed from the A set of the data (i.e. the assignment dataset).
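
A minimal sketch of this key construction follows. The helper and its inputs are illustrative, reusing the Counter profiles from the earlier sketch rather than the actual core-measure output files; the "user;item;" key format mirrors the keys queried later in the post.

# Sketch: emit ("user;item;", tf_u, tf_i) records for tags with non-zero
# frequency in the item's profile (the first TF-based measure); swapping
# the roles of the two profiles gives the second measure.
def emit_tf_similarity_records(user, item, user_profile, item_profile):
    records = []
    for (i, tag), tf_item in item_profile.items():
        if i != item:
            continue
        tf_user = user_profile.get((user, tag), 0)
        if tf_user > 0:
            records.append(("%s;%s;" % (user, item), tf_user, tf_item))
    return records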

The data compilation component of this illustration involves constructing the appropriate MapReduce keys for the constructs and profile model terms (namely, unweighted and weighted frequency terms). The keys and values can be compiled into datasets that can be processed. The keys and the profile model terms can then be processed using MapReduce to quantify the Similarity measures using the Similarity measure formulae.

The best way to illustrate this process is to begin, as was shown in part one, from the A set data and the core measures table from Cantador, Bellogin and Vallet (2010). In terms of the core measures in the table, the first measure can be constructed using an index created from the first and third columns of the A set. The second measure can be constructed using an index created from the second and third columns of the A set. The fifth measure can be constructed using an index created from the first column of the A set. The sixth measure can be constructed using an index created from the second column of the A set. The number of observations in the output file of the fifth MapReduce and the outputs of the first MapReduce can be used to construct the third measure. The number of observations in the output file of the sixth MapReduce and the outputs of the second MapReduce can be used to construct the fourth measure.
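
For instance, the inverse user and item frequencies can be sketched from the profile counts and the user/item totals. The log(M/m_l) form used below is an assumption based on the paper; the post itself relies on the core measures table.

# Sketch: derive the inverse user/item frequency measures. M and N
# correspond to the row counts of the fifth and sixth MapReduce outputs;
# m and n count how many distinct users/items used each tag, which is
# recoverable from the first and second MapReduce outputs.
import math
from collections import Counter

M = len({u for (u, t) in user_profile})
N = len({i for (i, t) in item_profile})
m = Counter(t for (u, t) in user_profile)   # users per tag
n = Counter(t for (i, t) in item_profile)   # items per tag

iuf = {t: math.log(float(M) / m[t]) for t in m}
iif = {t: math.log(float(N) / n[t]) for t in n}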

The profile model formulae can then be used to construct the profile model frequencies (namely, weighted frequencies). The Similarity measure formulae can be used to construct the model Similarity measures (as was shown in part one for the TF-based Similarity).

The remaining eleven MapReduces can be implemented in Hadoop, MongoDB and Spark. The Hadoop and Spark MapReduces can be implemented using mapper-reducer sets prepared in Perl, Python, R and Ruby. The MongoDB MapReduces can be implemented in the MongoDB 2.4.9 shell using mapper and reducer functions.

The MapReduces can be arranged into two categories, namely, one set and three set. 
The three set MapReduces can be implemented in Hadoop and Spark using mapper-reducer sets prepared in R and Python. The one set MapReduces can be implemented in Hadoop, MongoDB and Spark. The non-MongoDB one set MapReduces can be implemented using mapper-reducer sets prepared in Perl, Python and Ruby. 

In the case of the three set MapReduces, the Python mapper-reducer set is compiled for the MapReduces in Hadoop Streaming, the Spark Pipe facility in a PySpark application (and shell) and Spark Pipe facility in a SparkR application (and shell). The R mapper-reducer set is designed for the Spark Pipe facility in a Java application.
  
The one set MapReduces are designed for use in Hadoop Streaming, Spark Pipe in the Scala Spark-shell, Spark Pipe in a PySpark application, and Spark Pipe in a SparkR application. In the scheme, the one set MapReduce configuration is designed for the calculation of the Okapi BM25 Similarity measures (MapReduce nine and ten) from the core measures and profile model terms.


2. Prepare the data


The Similarity measure MapReduces can be referenced as one to thirteen, the first two being the TF-based Similarity measures from part one.


Similarity measure MapReduce three, four and five (TF Cosine-based Similarity)


In compiling the data, one can begin with the output files from the core measures and profile model calculations. The next step is to take from each user and item frequency (i.e. um,l and in,l in the output files of the core measures) the first part of the index (i.e. the user um and item in, respectively) and create two key, value combinations (i.e. the user um with tfu(m)(tl) and the item in with tfi(n)(tl), respectively) for the Similarity measure MapReduce.

In the mapping phase, the numerator values can be the cross-products (tfu(m)(tl) · tfi(n)(tl)), and the denominator values can be the squares of the individual values (i.e. (tfu(m)(tl))2 and (tfi(n)(tl))2). In the reduce phase, the values of the numerator cross-products can be summed and the totals output by key. In the case of the denominator entries, the square roots of the sums can be output for each key. This generates the outputs required by the Similarity measure formulae. This is essentially how the input files for the three set MapReduces can be constructed.
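
Once the reducer has produced, for each key, the summed cross-product and the two square roots, the final cosine value is a single division. A small sketch of that last step, operating on one tab-separated reducer output line, is shown below; the function name and line layout are assumptions matching the reducer described here.

# Sketch: turn one three-set reducer output line
# "key<TAB>sum(tfu*tfi)<TAB>sqrt(sum(tfu^2))<TAB>sqrt(sum(tfi^2))"
# into the cosine similarity for that (user, item) key.
def cosine_from_reducer_line(line):
    key, num, denom_u, denom_i = line.rstrip("\n").split("\t")
    return key, float(num) / (float(denom_u) * float(denom_i))

# e.g. cosine_from_reducer_line("8;1;\t6.0\t2.0\t3.0") returns ("8;1;", 1.0)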


Similarity measure MapReduce six, seven and eight (TF-IDF Cosine-based Similarity)


The approach for the Similarity measure MapReduce six, seven and eight input file creation is similar to that of Similarity measure MapReduce three, four and five, except that one will first create tfu(m)(tl) * iuf(tl) and tfi(n)(tl) * iif(tl), for the user um and item in, respectively, according to the TF-IDF Profile model. The rest of the steps follow analogously to those of Similarity measure MapReduce three, four and five.


Similarity measure MapReduce nine and ten (Okapi BM25-based Similarity)


The approach for the Similarity measure MapReduce nine and ten input file creation is similar to that of MapReduce three, four and five. 

In the case of Similarity measure MapReduce nine, the approach will be to take the first part of the indices (i.e. the user um and item in) from the Okapi BM25 Profile model calculations and create a new key (um,in) for the Similarity measure MapReduce. The next step is to identify from the item's profile the tags that have non-zero (item) weighted frequencies (i.e. in,l = bm25i(n)(tl) in the Okapi BM25 Profile model) and allocate the user's weighted tag frequencies (i.e. um,l = bm25u(m)(tl) from the Okapi BM25 Profile model) for each such tag tl. This will generate the key, value input file for Similarity measure MapReduce nine.

In the case of Similarity measure MapReduce ten the process is similar, but instead one makes use of the user's profile and the item's frequencies. Hence, after the keys have been created in the same manner as in the case of MapReduce nine, one identifies from the user's profile the tags that have non-zero (user) weighted frequencies. The next step is to allocate the item's weighted tag frequencies (i.e. in,l = bm25i(n)(tl) from the Okapi BM25 Profile model) for each such tag tl. This will generate the key, value input file for Similarity measure MapReduce ten.
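
A minimal sketch of the input construction for both one set MapReduces is given below. The bm25_user and bm25_item dictionaries are assumed to hold the weighted frequencies from the Okapi BM25 Profile model calculations, keyed by (user, tag) and (item, tag); the helper itself is illustrative.

# Sketch: build key,value input records for the one set BM25 MapReduces.
def one_set_records(user, item, bm25_user, bm25_item):
    key = "%s;%s;" % (user, item)
    # MapReduce nine: the user's weights for tags with non-zero item weights
    nine = [(key, bm25_user.get((user, t), 0.0))
            for (i, t), w in bm25_item.items() if i == item and w != 0]
    # MapReduce ten: the item's weights for tags with non-zero user weights
    ten = [(key, bm25_item.get((item, t), 0.0))
           for (u, t), w in bm25_user.items() if u == user and w != 0]
    return nine, ten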

Similarity measure MapReduce eleven, twelve and thirteen (Okapi BM25 Cosine-based Similarity)


The approach for the compilation of the input file for the Similarity measure MapReduce eleven, twelve and thirteen is similar to that of MapReduce three, four and five. The first part will, however, involve creating the key value pairs using the weighted frequencies bm25u(m)(tl) and bm25i(n)(tl), for the user um and item in, respectively, according to the Okapi BM25 Profile model. The rest of the steps follow analogously to those of Similarity measure MapReduce three, four and five.


3. Prepare the mapper-reducer sets


Three set mapper-reducer sets



Python mapper-reducer set



The Python mapper-reducer set can be prepared according to the tutorials in this post, this post and this post.


Mapper

#!/usr/bin/python
import re
import sys
import math
from signal import signal, SIGPIPE, SIG_DFL
# Ignore SIGPIPE and don't throw exceptions on it
signal(SIGPIPE, SIG_DFL)
# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    line = line.split("\t")
    if len(line) >= 3:
        Key = line[0]
        val1 = float(line[1])
        val2 = float(line[2])
        valcp = (val1 * val2)
        val1sq = (val1 * val1)
        val2sq = (val2 * val2)
        print '%s\t%s\t%s\t%s' % (Key, valcp, val1sq, val2sq)


Reducer

#!/usr/bin/python
import re
import sys
import math
from signal import signal, SIGPIPE, SIG_DFL
# Ignore SIGPIPE and don't throw exceptions on it
signal(SIGPIPE, SIG_DFL)
lastKey = None
theSum1 = 0
theSum2 = 0
theSum3 = 0
for line in sys.stdin:
    parts = line.split("\t")
    key = parts[0]
    val1 = parts[1]
    val2 = parts[2]
    val3 = parts[3]
    if lastKey is None or lastKey != key:
        if lastKey is not None:
            print '%s\t%s\t%s\t%s' % (lastKey, theSum1, math.sqrt(theSum2), math.sqrt(theSum3))
        lastKey = key
        theSum1 = 0
        theSum2 = 0
        theSum3 = 0
    theSum1 += float(val1)
    theSum2 += float(val2)
    theSum3 += float(val3)
if lastKey:
    print '%s\t%s\t%s\t%s' % (lastKey, theSum1, math.sqrt(theSum2), math.sqrt(theSum3))
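
Before wiring the pair into Hadoop Streaming or the Spark Pipe facility, it can be smoke-tested locally. The sketch below assumes the two scripts above are saved as executable files named PythonThreesetMapper.py and PythonThreesetReducer.py next to a sample InputData.txt; the names are illustrative.

# Hypothetical local smoke test of the three-set mapper/reducer pair.
import subprocess

with open("InputData.txt") as f:
    mapped = subprocess.check_output(["./PythonThreesetMapper.py"], stdin=f)
# Hadoop Streaming sorts the map output by key before the reduce phase,
# so the local simulation has to sort as well.
sorted_lines = b"".join(sorted(mapped.splitlines(True)))
reducer = subprocess.Popen(["./PythonThreesetReducer.py"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = reducer.communicate(sorted_lines)
print(out)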



R mapper-reducer set



The R mapper-reducer set can be prepared according to the tutorials in this gist, this post and this post.


Mapper

#!/usr/bin/env Rscript
library("magrittr")
options(warn = -1)
sink("/dev/null")
input <- file("stdin", "r")
while (length(currentLine <- readLines(input, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(currentLine, "\t"))
  valone <- as.numeric(fields[2])
  valoneS <- valone * valone
  valtwo <- as.numeric(fields[3])
  valtwoS <- valtwo * valtwo
  valc <- valone * valtwo
  sink()
  cat(fields[1], valc, valoneS, valtwoS, "\n", sep = "\t")
  sink("/dev/null")
}
close(input)


Reducer

#!/usr/bin/env Rscript
library(magrittr)
options(warn = -1)
# Strip the parentheses that Spark adds when piping tuple records
removeParens <- function(line) gsub("\\(|\\)", "", line)
valTotal <- as.numeric(0)
valTotal1 <- as.numeric(0)
valTotal2 <- as.numeric(0)
oldKey <- ""
# Loop over the key-value records on stdin
input <- file("stdin", "r")
while (length(currentLine <- readLines(input, n = 1, warn = FALSE)) > 0) {
  currentLine1 <- currentLine %>% removeParens()
  data_mapped <- unlist(strsplit(currentLine1, ","))
  if (length(data_mapped) != 4) {
    # Something has gone wrong. However, we can do nothing.
  }
  thisKey <- data_mapped[1]
  thisVal <- as.numeric(data_mapped[2])
  thisVal1 <- as.numeric(data_mapped[3])
  thisVal2 <- as.numeric(data_mapped[4])
  if (!identical(oldKey, "") && !identical(oldKey, thisKey)) {
    cat(oldKey, valTotal, valTotal1 %>% sqrt(), valTotal2 %>% sqrt(), "\n", sep = "\t")
    valTotal <- 0
    valTotal1 <- 0
    valTotal2 <- 0
  }
  oldKey <- thisKey
  valTotal <- valTotal + thisVal
  valTotal1 <- valTotal1 + thisVal1
  valTotal2 <- valTotal2 + thisVal2
}
if (!identical(oldKey, "")) {
  cat(oldKey, valTotal, valTotal1 %>% sqrt(), valTotal2 %>% sqrt(), "\n", sep = "\t")
}
close(input)



One set mapper-reducer sets



The one set mapper-reducer sets can be prepared in Perl, Python and Ruby.


Perl mapper-reducer set



The Perl mapper and reducer sets can be prepared according to the tutorials in this post and this post.


Mapper


#!/usr/bin/env perl
while (<>) {                              # read stdin
  chomp;                                  # remove the trailing newline from $_
  my ($key, $value) = split(/\t/, $_);    # extract key and value
  print $key . "\t" . $value . "\n";      # write to stdout
}

Reducer


#!/usr/bin/env perl
my %hashTab;
foreach (<>)
{
  chomp;
  my @data = split(/\t/, $_);
  if ($#data == 1)
  {
    my ($key, $val) = @data;
    if (exists($hashTab{$key}))
    {
      $hashTab{$key} += $val;
    }
    else
    {
      $hashTab{$key} = $val;
    }
  }
}
foreach (keys(%hashTab))
{
  print "$_\t$hashTab{$_}\n";
}

Python mapper-reducer set



The Python mapper and reducer sets can be prepared according to the tutorials in this post and this post.


Mapper


#!/usr/bin/env python
import sys
from signal import signal, SIGPIPE, SIG_DFL
# Ignore SIGPIPE and don't throw exceptions on it
signal(SIGPIPE, SIG_DFL)
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into key value pairs
    parts = line.split("\t")
    key = parts[0]
    Val = float(parts[1])
    # write the results to STDOUT (standard output);
    # what we output here will be the input for the
    # Reduce step, i.e. the input for Pythonreducer.py
    # tab-delimited
    print '%s\t%s' % (key, Val)

Reducer


#!/usr/bin/env python
from operator import itemgetter
import sys
from signal import signal, SIGPIPE, SIG_DFL
# Ignore SIGPIPE and don't throw exceptions on it
signal(SIGPIPE, SIG_DFL)
current_key = None
current_count = 0
key = None
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from Pythonmapper.py
    key, count = line.split('\t')
    # convert count (currently a string) into a float
    try:
        count = float(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key before it is passed to the reducer
    if current_key == key:
        current_count += count
    else:
        if current_key:
            # write result to STDOUT
            print '%s\t%s' % (current_key, current_count)
        current_count = count
        current_key = key
# do not forget to output the last key if needed!
if current_key == key:
    print '%s\t%s' % (current_key, current_count)


Ruby mapper-reducer set



The Ruby mapper-reducer set can be prepared according to the tutorials in this post, this post and this post.


Mapper


#!/usr/bin/env ruby
Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
while line = gets
  parts = line.split("\t")
  key = parts[0].strip
  val = parts[1].strip.to_f
  puts "#{key}\t#{val}"
end

Reducer


#!/usr/bin/env ruby
Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
# Create an empty key hash
keyhash = {}
# Our input comes from STDIN, operating on each line
STDIN.each_line do |line|
  # Each line will represent key and count
  key, count = line.strip.split
  # If we have the key in the hash, add the count to it, otherwise
  # create a new one.
  if keyhash.has_key?(key)
    keyhash[key] += count.to_f
  else
    keyhash[key] = count.to_f
  end
end
# Iterate through and emit the key counters
keyhash.each { |record, count| puts "#{record}\t#{count}" }


4. Process the data in Hadoop, MongoDB and Spark



Three set MapReduce



Hadoop Streaming



In order to implement the first three set MapReduce using the Hadoop Streaming facility the following arrangements need to be made.

Input data file: InputData.txt (tab-separated)
Local system Hadoop Streaming jar file folder: <Local System hadoop streaming jar file folder>
Local system mapper file folder: <Local System mapper File Folder>
Local system reducer file folder:<Local System reducer File Folder>
Hadoop Distributed File System (HDFS) input data folder: <HDFS Input Data Folder>
HDFS output data folder: <HDFS Output Data Folder>


The similarity measure MapReduce three, four and five can be conducted in the Hadoop Streaming facility using the following command on Ubuntu 14.04.3:


$ hadoop jar
<Local System Hadoop Streaming jar File Folder>/hadoop-streaming-2.6.0.jar
-file <Local System mapper File Folder>/PythonThreesetMapper.py
-mapper <Local System mapper File Folder>/PythonThreesetMapper.py
-file <Local System reducer File Folder>/PythonThreesetReducer.py
-reducer <Local System reducer File Folder>/PythonThreesetReducer.py
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-input <HDFS Input Data Folder>/InputData.txt
-output <HDFS Output Data Folder>


These are the contents of the resulting output file.




















PySpark application



The similarity measure MapReduce six, seven and eight can be conducted in the PySpark Pipe facility. In order to implement the MapReduce using the PySpark Pipe facility the following arrangements need to be made.

Input data file: InputData.txt (tab-separated)
Local system input data folder: <Local System Input Data Folder>
Local system mapper file folder: <Local System mapper File Folder>
Local system reducer file folder:<Local System reducer File Folder>
Local system output data folder: <Local System Output Data Folder>

The next step is to prepare the following application, using the tutorials in this book, this post, this post, this post, this guide, this guide, this post, and this post.

"""PySparkThreesetPipeApp.py"""
import sys
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
sc = SparkContext("local", "PySparkThreesetPipeApp")
sqlContext = SQLContext(sc)
rdd = sc.textFile("<Local System Input Data Folder>/InputData.txt")
funcm = '<Local System mapper Folder>/PythonThreesetMapper.py'
funcr = '<Local System reducer Folder>/PythonThreesetreducer.py'
def preparem(line):
"""Each line contains numbers separated by a tab."""
return '\t'.join(line.split('\t')) + '\n'
sc.addFile(funcm)
mappedrdd = rdd.map(lambda s: preparem(s)).pipe(SparkFiles.get(funcm)).coalesce(1)
rdd1 = mappedrdd.map(lambda x: x.split("\t"))
rdd2 = rdd1.map(lambda x: (x[0],x[1],x[2],x[3]))
p = rdd2.count()
rdd3 = rdd2.takeOrdered(p)
rdd4 = sc.parallelize(rdd3)
rdd5 = rdd4.map(lambda (w,x,y,z): w + "\t" + x + "\t" + y + "\t" + z + "\n")
def preparer(line):
"""Each line contains numbers separated by a tab."""
return '\t'.join(line.split('\t')) + '\n'
sc.addFile(funcr)
rdd6= rdd5.map(lambda s: preparer(s)).pipe(SparkFiles.get(funcr))
rdd6.saveAsTextFile(“<Local System Output Data Folder>”)
#sqlContext Query
def string_to_float(x): return float(x)
udfstring_to_float = udf(string_to_float, StringType())
rdd7 = rdd6.map(lambda x: x.split("\t"))
rdd8 = rdd7.map(lambda x: ((x[0]).strip(),string_to_float([x[1]),string_to_float(x[2]),string_to_float(x[3])))
df = sqlContext.createDataFrame(rdd8)
df.show(20)
The next step is to save the application in a Python file (i.e. PySparkThreesetPipeApp.py) in a local system folder and use spark-submit to run the application.

$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
<Local System Application Folder>/PySparkThreesetPipeApp.py

This will result in the following SQL query (of the contents of the output file).














Java Spark application


The similarity measure MapReduce eleven, twelve and thirteen can be conducted using the Pipe facility in a Java Spark application. In order to implement the MapReduce using the Java Pipe facility the following arrangements need to be made.

Input data file: InputData.txt (tab-separated)
Local system input data folder: <Local System Input Data Folder>
Local system mapper file folder: <Local System mapper File Folder>
Local system reducer file folder:<Local System reducer File Folder>
Local system output data folder: <Local System Output Data Folder>


The MapReduce can be implemented using the following application, prepared using the tutorials in this website, this guide, this book, this book, this programming guide, this post, the SparkSQL website, this website and this post.

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Import factory methods provided by DataTypes
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import RowFactory
import org.apache.spark.sql.RowFactory;
import scala.Tuple2;
import scala.Tuple3;
public class SimpleThreesetPipeApp {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFolder = args[1];
// Create a Java Spark Context.
// Create a Java Spark SQLContext.
SparkConf conf = new SparkConf().setAppName("SimpleThreesetPipeApp");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// load our input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Pipe the data to external script
String mapperScript = "<Local System mapper file Folder>/RThreesetMapper.R";
String mapperScriptName = "RThreesetMapper.R";
String reducerScript = "<Local System reducer file Folder>/RThreesetReducer.R";
String reducerScriptName = "RThreesetReducer.R";
// Add the mapper
sc.addFile(mapperScript);
JavaRDD<String> pipeInputs = input;
JavaRDD<String> pipeRDD1 = pipeInputs.pipe(SparkFiles.get(mapperScriptName));
// Add the reducer
sc.addFile(reducerScript);
JavaPairRDD<String, Tuple3<Float,Float,Float>> pipeInputs2 = pipeRDD1.mapToPair(
new PairFunction <
String, //T (<String, Float,Float,Float>
String, //K key
Tuple3<Float,Float,Float> // V <Float,Float,Float>
>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public Tuple2<String, Tuple3<Float,Float,Float>> call (String s){
String[] record = s.split("\t");
String key = record[0];
Float Freq1 = new Float(record[1]);
Float Freq2 = new Float(record[2]);
Float Freq3 = new Float(record[3]);
Tuple3<Float,Float,Float> Frequencies =
new Tuple3<Float,Float,Float> (Freq1,Freq2,Freq3);
return new Tuple2<String,Tuple3<Float,Float,Float>>
(key,Frequencies);
}
}).sortByKey().coalesce(2, true);
JavaRDD<String> pipeRDD2 = pipeInputs2.pipe(SparkFiles.get(reducerScriptName));
pipeRDD2.saveAsTextFile(outputFolder);
// The schema is encoded in a string
String schemaString = "key freq1 freq2 freq3";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (pipeRDD2) to Rows.
JavaRDD<Row> rowRDD = pipeRDD2.map(
new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Row call(String record) throws Exception {
String[] fields = record.split("\t");
return RowFactory.create(fields[0], fields[1].trim(), fields[2].trim(), fields[3].trim());
}
});
// Apply the schema to the RDD
DataFrame pipeRDD2DataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as table.
pipeRDD2DataFrame.registerTempTable("pipeRDD2");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq1,freq2,freq3 FROM pipeRDD2");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>(){
/**
*
*/
private static final long serialVersionUID = 1L;
public String call(Row row) {
return "ItemUser: Freq1: Freq2: Freq3: " + row.getString(0) + "\t " + row.getString(1)
+ "\t " + row.getString(2) + "\t " + row.getString(3);
}
}).collect();
for (String s: names) {
System.out.println(s);
}
}
}


The next step is to save the application in a Java file matching the public class name (i.e. SimpleThreesetPipeApp.java) in a local system folder, export it to a jar file JavaSparkThreesetPipeApp.jar (in a local system folder) and use bin/spark-submit to run the application.

This will result in the following SQL query.



















One set MapReduce


The next step is to implement similarity measure MapReduce nine and ten. These can be implemented in MongoDB, the Spark Pipe facility using a SparkR application and the Spark Pipe facility using a Scala Spark-shell program.


MongoDB-shell

The one-set MapReduces for the Okapi BM25-based User and Item similarity measures can be prepared using programs in the MongoDB 2.4.9 shell.

The first step is to read the data into the MongoDB database <MongoDB database> (in this illustration DeliciousMR) and collection <MongoDB collection>.

In this illustration the collection for the BM25 User collection is BM25UserSimilarity and the collection for the BM25 Item collection is BM25ItemSimilarity. The MapReduce collections are map_reduce_BM25UserSimilarity for the BM25-based User Similarity measure and map_reduce_BM25ItemSimilarity for the BM25-based Item Similarity measure.


The use db command can be used to switch to the DeliciousMR database.
use DeliciousMR






The db.BM25UserSimilarity.find().pretty() command can be used to view the BM25-based User Similarity measure collection.

















The next step is to run the following program for the MapReduce prepared using the tutorials in this post and this post.

var mapFunction = function() {
  emit(this.MRIndex, this.BM25);
};
var reduceFunction = function(keyMRIndex, valuesBM25) {
  return Array.sum(valuesBM25);
};
db.BM25UserSimilarity.mapReduce(
  mapFunction,
  reduceFunction,
  { out: "map_reduce_BM25UserSimilarity" }
)
This will generate the following output.













The db.collection.find().pretty() command in the Mongo shell will generate the following output for the BM25-based User similarity.





The db.BM25ItemSimilarity.find().pretty() command can be used to view the BM25-based Item Similarity measure collection.


















The MapReduce procedure can then be implemented for the BM25-based Item similarity. 
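
This step is presumably shown as a mongo shell screenshot in the original post. An equivalent hedged sketch using PyMongo, with the same mapper and reducer functions and the collection names set up above (pymongo 2.x API, as used elsewhere in the post), would be:

# Sketch: run the same map/reduce over the BM25 item collection from PyMongo
# instead of the mongo shell.
from pymongo import Connection
from bson.code import Code

db = Connection().DeliciousMR
map_func = Code("function() { emit(this.MRIndex, this.BM25); }")
reduce_func = Code("function(keyMRIndex, valuesBM25) { return Array.sum(valuesBM25); }")
# Writes the reduced results to the map_reduce_BM25ItemSimilarity collection
db.BM25ItemSimilarity.map_reduce(map_func, reduce_func, "map_reduce_BM25ItemSimilarity")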





The find().pretty() command will generate the following output.
















The results can then be queried in the Scala Spark-shell and in Spark applications.


SparkR Application



In order to implement the first one set MapReduce using a Spark Pipe SparkR application and query the results in MongoDB (RMongoDB and PyMongo) the following arrangements should be made.


Input data file: InputData.txt (tab-separated)
Local system input data folder: < Local System Input Data Folder>
Local system mapper file folder: < Local System mapper File Folder >
Local system reducer file folder: < Local System reducer File Folder >
Local system output data file folder: < Local System Output Data Folder>
MongoDB instance: Have an instance of MongoDB with the arrangements in the MongoDB illustration


The MapReduce (and query) can be implemented using an application and a supporting script file. Both can be prepared using the tutorials in this post, this post, this post, this document, this post, this post, this post, this post, this post, this post, this post, this post and this post. The next step is to save the following SparkR application and SparkRApplicationScript files in appropriate local system folders.


library(SparkR)
library(magrittr)
library(rmongodb)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
lines <- SparkR:::textFile(sc, "<Local System Input Data Folder>/InputData.txt")
rdd1<- SparkR:::flatMap(lines,function(line) { strsplit(line, " ")[[1]]})
mapperpipeArg<- c("<Local System mapper Folder>/PythonOnesetMapper.py")
rdd2 <-SparkR:::pipeRDD(rdd1,mapperpipeArg)
#Need a sort by key
rdd2sort <-SparkR:::sortByKey(rdd2)
reducerpipeArg<- c("<Local System reducer Folder>/PythonOnesetReducer.py")
rdd3<-SparkR:::pipeRDD(rdd2sort,reducerpipeArg)
parseFields <-function(record) {
Sys.setlocale("LC_ALL", "C"); # necessary for strsplit() to work correctly
parts <- strsplit(record, "\t")[[1]];
list(parts[1],parts[2])
}
parsedRDD <- SparkR:::lapply(rdd3, parseFields)
SparkR:::saveAsTextFile(rdd3, "<Local System Output Data Folder>")
output <- collect(parsedRDD)
df <- createDataFrame(sqlContext, output)
df %>% head()
df %>% showDF()
fsummary<-(SparkR:::describe(df,"_2")) %>% collect()
fsummary
subsetDF <- subset(df, df$"_1" %in% c("8;1;"), c(1,2))
subsetDF %>% showDF()
#Connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
#User Similarity
bson <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25UserSimilarity", query = '{"_id":"8;1;"}')
bson
#Item Similarity
bson1 <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25ItemSimilarity", query = '{"_id":"8;1;"}')
bson1
#Run the query using pymongo and system2
#command
command = "python"
#note the single + double quotes in the string (needed if paths have spaces)
path2script = '"<Local System SparkRApplicationScript Folder>/SparkROneSetAppScript.py"'
output =system2(command,path2script,stdout=TRUE)
print(paste("BM25-based Similarity system2 Mongo Query is:", output))

from pymongo import Connection
#Set up the environment
connection = Connection()
#connect to the database
db = connection.DeliciousMR
#User Similarity
collection = db.map_reduce_BM25UserSimilarity
q = collection.find_one("8;1;")
print(q)
#Item Similarity
collection = db.map_reduce_BM25ItemSimilarity
r = collection.find_one("8;1;")
print(r)

The application can be run using the bin/spark-submit script.
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
<Local System SparkR Application Folder>/SparkROneSetPipeApp.R



These are the contents of the resulting output file/SQL query/NoSQL query.










Scala Spark-shell



The similarity measure MapReduce nine can be conducted using the Spark Pipe facility in a Scala Spark-shell program. The program can be complemented with MongoDB queries using RMongoDB and PyMongo. The first step is to make the following arrangements.


Input data file: InputData.txt (tab-separated)
Local system input data folder: < Local System Input Data Folder>
Local system mapper file folder: < Local System mapper File Folder >
Local system reducer file folder: < Local System reducer File Folder >
Local system output data file folder: < Local System Output Data Folder>
MongoDB instance: Have an instance of MongoDB with the arrangements in the MongoDB illustration

The next step is to save the following scripts in appropriate local system folders.
  
library(rmongodb)
#connect to mongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
# User Similarity
bson <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25UserSimilarity", query ='{"_id": "8;1;"}')
bson
# Item Similarity
bson1 <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25ItemSimilarity", query ='{"_id": "8;1;"}')
bson1
#Close connection
mongo.destroy(mongo)


from pymongo import Connection
#Set up the environment
connection = Connection()
#connect to the database
db = connection.DeliciousMR
#User Similarity
collection = db.map_reduce_BM25UserSimilarity
q = collection.find_one("8;1;")
print(q)
#Item Similarity
collection = db.map_reduce_BM25ItemSimilarity
r = collection.find_one("8;1;")
print(r)
The next step is to run the following program prepared using the tutorials in this post, this post, this post, this post, this post, this guide and this post.



// Import the implicits and sys.process
import sqlContext.implicits._
import scala.sys.process._
val data = sc.textFile("<Local System Input Data Folder>/InputData.txt")
val scriptPathm = "<Local System mapper Folder>/PerlOnesetMapper.pl"
val scriptPathr = "<Local System reducer Folder>/PerlOnesetReducer.pl"
val pipeRDD = data.pipe(scriptPathm).coalesce(1)
val pipeSortRDD = pipeRDD.sortBy[String]({a => a}, false)
val data1RDD = pipeSortRDD.flatMap(line => line.split(","))
val pipe1RDD = data1RDD.pipe(scriptPathr)
val pipe2RDD = pipe1RDD.flatMap(line => line.split(","))
pipe2RDD.saveAsTextFile("<Local System Output Folder>")
// First define a case class for the RDD/data/Pipe
case class Item(ItemUser: String, Freq: Float)
val bookmarkuser = pipe2RDD.map(_.split("\t")).map(p => Item(p(0), p(1).toFloat)).toDF()
bookmarkuser.show()
bookmarkuser.filter(bookmarkuser("Freq")>0).show()
bookmarkuser.filter(bookmarkuser("Freq")>3500).show()
bookmarkuser.describe().show()
// Query for user 8 and item 1
bookmarkuser.filter("ItemUser = '8;1;'").show()
//Next run a query on MongoDB using RMongoDB
val rmongoquery ="Rscript <Local System RMongoDB Script Folder>/ScalaSpark-shellRMongoScript.R".!!
print(rmongoquery)
//Next run a query on MongoDB using PyMongo
val pymongoquery = "python <Local System PyMongoDB Script Folder>/ScalaSpark-shellPyMongoScript.py".!!
print(pymongoquery)



These are the contents of the resulting output file/SQL query/NoSQL query.






































5. Query/Analyze the results


Once all the output data has been generated, one can conduct queries using MongoDB, a PySpark application, a SparkR application and a Java application. The BM25-based measures were calculated for user 8 and item 1. The queries using MongoDB, the Spark Pipe facility in a SparkR application (including RMongoDB) and the Spark Pipe facility in a Scala Spark-shell program were shown in the last section.

The three set TF-IDF Cosine-based Similarity measure query for user 8 and item 1 can be generated using a SparkR application prepared using the tutorials in this post, this post, this post, this document, this post, this post, this guide, this guide, this post, this post, this post and this post. The query can also be complemented with the one set MapReduce queries in MongoDB and PyMongo.

The first step is to make the following arrangements.

Input data file: InputData.txt (tab-separated)
Local system input data folder: < Local System Input Data Folder>
Local system mapper file folder: < Local System mapper File Folder >
Local system reducer file folder: < Local System reducer File Folder >
Local system output data file folder: < Local System Output Data Folder>
MongoDB instance: Have an instance of MongoDB with the arrangements in the MongoDB illustration

In order to generate a query the following application file and application script file can be saved in local system folders.

#!/usr/bin/env Rscript
library(SparkR)
library(magrittr)
library(rmongodb)
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
lines <- SparkR:::textFile(sc, "<Local System Input Data Folder>/InputData.txt")
rdd<- SparkR:::flatMap(lines,function(line) { strsplit(line, " ")[[1]]})
mapperpipeArg<- c("<Local System mapper Folder>/PythonThreesetMapper.py")
piperdd <-SparkR:::pipeRDD(rdd,mapperpipeArg)
#Need a sort by key
piperddsort <-SparkR:::sortByKey(piperdd)
reducerpipeArg<- c("<Local System reducer Folder>/PythonThreesetReducer.py")
pipe2rdd<-SparkR:::pipeRDD(piperddsort,reducerpipeArg)
SparkR:::saveAsTextFile(pipe2rdd, "<Local System Output Data Folder>")
#Now we need to parse our RDD
parseFields <-function(record) {
Sys.setlocale("LC_ALL", "C"); # necessary for strsplit() to work correctly
parts <- strsplit(record, "\t")[[1]];
list(parts[1],parts[2],parts[3],parts[4])
}
parsedRDD <- SparkR:::lapply(pipe2rdd, parseFields)
output <- collect(parsedRDD)
df <- createDataFrame(sqlContext, output)
df %>% showDF()
#SQL query
subsetDF <- subset(df, df$"_1" %in% c("8;1;"), c(1,2,3,4))
subsetDF %>% showDF()
#No-SQl query
#Connect to MongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
#User Similarity
bson <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25UserSimilarity", query = '{"_id":"8;1;"}')
bson
#Item Similarity
bson1 <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25ItemSimilarity", query = '{"_id":"8;1;"}')
bson1
#Run the query using pymongo and system2
#command
command = "python"
#note the single + double quotes in the string (needed if paths have spaces)
path2script = '"<Local System SparkRApplication Folder>/SparkRThreeSetAppScript.py"'
output =system2(command,path2script,stdout=TRUE)
print(paste("BM25-based Similarity system2 Mongo Query is:", output))

from pymongo import Connection
#Set up the environment
connection = Connection()
#connect to the database
db = connection.DeliciousMR
#User Similarity
collection = db.map_reduce_BM25UserSimilarity
q = collection.find_one("8;1;")
print(q)
#Item Similarity
collection = db.map_reduce_BM25ItemSimilarity
r = collection.find_one("8;1;")
print(r)

The application can be run using the bin/spark-submit script.
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
<Local System SparkR Application Folder>/SparkRThreesetPipeApp.R


This will generate the following output.


























The three set TF Cosine-based Similarity measure query for user 8 and item 1 can be generated using a Java application. This can be done by first amending line 110 and line 111 of the Java application code from the last section as follows.
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT key, freq1,freq2,freq3 FROM pipeRDD2 WHERE key ='8;1;'");



The next step is to run the application (using the bin/spark-submit) with the input file for Similarity measure MapReduce three, four and five. This will generate the following output.








The three set TF-IDF Cosine-based Similarity measure query for user 8 and item 1 can be generated using a Java application with the appended code and using the input file for Similarity measure MapReduce six, seven and eight. The bin/spark-submit run will generate the following output.








The three set Okapi BM25 Cosine-based Similarity measure query for user 8 and item 1 can be generated using a Java application with the appended code and using the input file for Similarity measure MapReduce eleven, twelve and thirteen. The bin/spark-submit run will generate the following output.








The one set Okapi BM25-based Similarity measure for user 8 and item 1 can be generated using a PySpark application. In order to implement the first one set MapReduce using a Spark Pipe PySpark application and query the results in MongoDB (using RMongoDB and PyMongo) the following arrangements should be made.

Input data file: InputData.txt (tab-separated)
Local system input data folder: < Local System Input Data Folder>
Local system mapper file folder: < Local System mapper File Folder >
Local system reducer file folder: < Local System reducer File Folder >
Local system output data file folder: < Local System Output Data Folder>
MongoDB instance: Have an instance of MongoDB with the arrangements in the MongoDB illustration

The PySpark application script (and supporting file script) can be prepared using the tutorials in this post, this post, this post, this post, this post, this guide, this guide, this post and this post. The following PySpark application file (PySparkPipeOnesetApp.py) and PySpark application script file (PySparkOneSetAppScript.R) can be saved in local system folders.

"""PySparkPipeOnesetApp.py"""
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import import *
from time import time
from pymongo import Connection
#Set up the environment
connection = Connection()
sc = SparkContext("local", "SimplePipeApp2")
sqlContext = SQLContext(sc)
#Obtain the input data
rdd = sc.textFile("<Local System Input Data Folder>/InputData.txt")
#Obtain the mapper and reducer
funcm = '<Local System mapper Folder>/RubyOnesetMapper.rb'
funcr = '<Local System reducer Folder>/RubyOneseteducer.rb'
#Prepare the data for processing
def preparem(line):
"""Each line contains numbers separated by a tab."""
return ' '.join(line.split(' ')) + '\n'
sc.addFile(funcm)
rddpipe = rdd.map(lambda s: preparem(s)).pipe(SparkFiles.get(funcm)).coalesce(1)
#Sort the data
rddpipesplit = rddpipe.map(lambda x: x.split("\t"))
rddpipesplit2 = rddpipesplit.map(lambda x: (x[0],x[1]))
p = rddpipesplit2.count()
listSort = rddpipesplit2.takeOrdered(p)
rddlistSort = sc.parallelize(listSort)
rddpipeSorted = rddlistSort.map(lambda (x,y): x + "\t" + y)
#Prepare the data for processing
def preparer(line):
"""Each line contains numbers separated by a space."""
return '\t'.join(line.split('\t')) + '\n'
sc.addFile(funcr)
rddpipe1 = rddpipeSorted.map(lambda s: preparer(s)).pipe(SparkFiles.get(funcr))
rddpipe1.saveAsTextFile("<Local System Output Data Folder>")
#prepare the data
rddpipe1split = rddpipe1.map(lambda x: x.split("\t"))
rddpipe2 = rddpipe1split.map(lambda x: (x[0],x[1]))
df = sqlContext.createDataFrame(rddpipe2)
t0 - time()
df.show(25)
tt = time() - t0
print "Query performed in {} seconds".format(round(tt,3))
#""establish the connection and query the data from MongoDb
#The schema is encoded in a string
schemaString = "ItemUser Freq1'
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)
#Apply the schema to the data
df1 = sqlContext.createDataFrame(rddpipe2,schema)
# Register the DataFrame as a table
df1.registerTemptable("Similarity")
results = sqlContext.sql("SELECT ItemUser, Freq1 FROM Similarity")
results.show()
df2= results.filter("ItemUser in ('8;1;')").collect()
print(df2)
#connect to the database
db = connection.DeliciousMR
#Obtain the data from MongoDB
#User Similarity
collection = db.map_reduce_BM25Similarity
q = collection.find_one("8;1;")
print(q)
#Item Similarity
collection = db.map_reduce_BM25ItemSimilarity
r = collection.find_one("8;1;')
print(r)
#Query using rmongodb and subprocess
#Define command and arguments
command - 'Rscript'
path2script = '<Local System PySparkOnesetApplicationScript Folder>/PySparkOneSetAppScript.R'
#Build subprocess command
cmd =[command,path2script]
#check_output will run the command and store to result
x = subprocess.check_output(cmd,universal_newlines=True)
print('BM25-based Similarity subprocess mongo Query is:')
print(x)

library(rmongodb)
#connect to mongoDB
mongo = mongo.create(host = "localhost")
mongo.is.connected(mongo)
# User Similarity
bson <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25UserSimilarity", query ='{"_id": "8;1;"}')
bson
# Item Similarity
bson1 <- mongo.find.one(mongo, "DeliciousMR.map_reduce_BM25ItemSimilarity", query ='{"_id": "8;1;"}')
bson1
#Close connection
mongo.destroy(mongo)
The next step is to use spark-submit to run the application.
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
<Local System Application Folder>/PySparkPipeOnesetApp.py


This will generate the following output.

























6. Conclusions


The post provided an illustration of how to apply the MapReduce programming model to the GroupLens HetRec 2011 Delicious dataset using the methodology outlined in Cantador, Bellogin and Vallet (2010). The approach can be further fine-tuned to conduct the other analyses outlined in the paper.





Sources


http://bit.ly/1M7MAYL
http://bit.ly/1tusyEE
http://bit.ly/1Y07BPl
http://bit.ly/1rDzezL
http://bit.ly/1V5cK57
http://bit.ly/1RythtQ
http://bit.ly/1TAOH9I
http://bit.ly/1YxRLtN
http://bit.ly/1WSfBB5
http://bit.ly/1rtVawg
http://bit.ly/231JrU3
http://bit.ly/1Q5vX1t
http://bit.ly/1omcG4d
http://bit.ly/1TiHqjD
http://bit.ly/1Y08OWK
http://bit.ly/1M0oCUO
http://bit.ly/1PFIW8p
http://bit.ly/21TGWAx
http://bit.ly/1Uo1MH8
http://bit.ly/262B8Mv
http://bit.ly/1UTyEr2
http://bit.ly/1RKV5dQ
http://bit.ly/1W4xED9
http://bit.ly/21rwvVv
http://bit.ly/1OuZA19
http://bit.ly/1UFeOgy
http://bit.ly/262BQcE
http://bit.ly/1T76xr7
http://bit.ly/1UeusA6
http://bit.ly/268y3qY
http://bit.ly/1Ueuwjj
http://bit.ly/1sKO0oT
http://bit.ly/1Y07GTd
http://bit.ly/1QcuOe5
http://bit.ly/1UFf3IF
http://bit.ly/1Qc7Gc8
http://bit.ly/1WlCMnp
http://bit.ly/1NoLZIf
http://bit.ly/1tutIQq
http://bit.ly/1sKOIlR
http://bit.ly/1rtWuzr
http://ibm.co/1T3h0ml
http://bit.ly/1SZPyVw
http://bit.ly/1UgJ47t
http://bit.ly/21rwXDt
http://bit.ly/1Y09F9Z
http://bit.ly/1SN27EA
http://bit.ly/1WlCMnp
http://bit.ly/1Y08WFT
http://bit.ly/24WQCvF
http://bit.ly/1UBhVdw
http://bit.ly/1tuuFIF
http://bit.ly/1UezwnY
http://bit.ly/268CGRV
http://bit.ly/1UFi8bv
http://bit.ly/1tuxwkL
http://bit.ly/1Qcz518
http://bit.ly/1Ueuwjj
http://bit.ly/1QcuOe5
http://bit.ly/1sKRrMc
http://bit.ly/1UBka0r
