Thursday, May 23, 2013

Apache Mahout Setup with Different Configurations on a Hadoop Cluster

Setting Up Mahout for Classification:

Step 1: Download and extract Mahout as the Hadoop user ("hduser" or another user), as described in the previous Hadoop setup post.

Step 2: Start Hadoop (if you run into any issue, refer to the previous Hadoop setup post). Set the HADOOP_HOME environment variable to the Hadoop installation path:
export HADOOP_HOME=/home/hduser/hadoop/
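To make this setting survive new shell sessions, it can also be appended to hduser's ~/.bashrc, for example:
echo 'export HADOOP_HOME=/home/hduser/hadoop/' >> ~/.bashrc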

Step 3: Copy the classification data from the local file system to HDFS:

/home/hduser/hadoop/bin/hadoop fs -put  /home/hduser/train-data  train-data

We can check whether the data was copied to HDFS by listing the HDFS home directory (where the relative destination train-data was created):
/home/hduser/hadoop/bin/hadoop fs -ls

Step 4: Convert this data to a sequence file (Hadoop's SequenceFile format):
/home/hduser/mahout/bin/mahout seqdirectory -i train-data -o train-seq
(This assumes Mahout is installed at /home/hduser/mahout.)

Step 5: Convert the sequence file to sparse vectors (this produces, among other outputs, the tfidf-vectors directory used in the next step):
/home/hduser/mahout/bin/mahout seq2sparse -i train-seq -o complete-vectors

Step 6: Split the vectors into a training part and a testing part:

/home/hduser/mahout/bin/mahout split -i complete-vectors/tfidf-vectors --trainingOutput train-vectors --testOutput test-vectors --randomSelectionPct 40 --overwrite --sequenceFiles -xm sequential
This command splits the data into 60% for training and 40% for testing: the --randomSelectionPct 40 option sets the percentage that goes to testing, and the rest is used for training.
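For example, with 1,000 labelled documents, --randomSelectionPct 40 sends roughly 400 vectors to test-vectors and the remaining roughly 600 to train-vectors.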

Step 7: Train a model on the training data. Mahout supports several algorithms; this example uses Naive Bayes:
 /home/hduser/mahout/bin/mahout trainnb -i train-vectors -el -li labelindex -o model -ow -c

Step 8: Test the model with the testing data:
/home/hduser/mahout/bin/mahout testnb -i test-vectors -m model -l labelindex -ow -o op-testing -c 2>&1 | tee Result.txt

It also prints the confusion matrix; the 2>&1 | tee Result.txt part shows the output on the console and saves a copy of it (including the confusion matrix) in Result.txt.

Tips for Apache Mahout
1) Apache Mahout runs on the Hadoop cluster only when MAHOUT_LOCAL is not set in the environment. If MAHOUT_LOCAL is set to true, Mahout runs on the local node instead.
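For example, to force local execution:
export MAHOUT_LOCAL=true
and to go back to running on the Hadoop cluster:
unset MAHOUT_LOCAL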

2) Apache Mahout only provides the "seqdirectory" option, which converts a directory of plain text files into a sequence file. It does not provide an option to convert one single large text file into a sequence file for classification.

For that text-file-to-sequence-file conversion, we have to write Java code explicitly and then put the resulting sequence file on HDFS:

/home/hduser/hadoop/bin/hadoop fs -put  /home/hduser/train-seq  train-seq 


Then start the process from Step 5.

Java code for converting a text file (.txt/.csv etc.) to a sequence file:



import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ConvertTextToSeq {

    /**
     * Converts a plain text file (one record per line) into a Hadoop
     * SequenceFile of <Text, Text> pairs that Mahout's seq2sparse can consume.
     *
     * @param args args[0] = input text file, args[1] = output sequence file directory
     */
    public static void main(String[] args) {
        try {
            System.out.println("Start time " + System.currentTimeMillis());
            if (args.length != 2) {
                System.err.println("Arguments: [input text file] [output sequence file directory]");
                return;
            }
            String inputFileName = args[0];
            String outputDirName = args[1];

            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(configuration);
            SequenceFile.Writer writer = new SequenceFile.Writer(fs, configuration,
                    new Path(outputDirName + "/chunk-0"), Text.class, Text.class);

            int count = 0;
            BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
            Text key = new Text();
            Text value = new Text();
            String line;
            while ((line = reader.readLine()) != null) {
                // Expected line format: <category[##extra]> <label> <message...>
                String[] strArr = line.split(" ");
                int len = strArr[0].length() + strArr[1].length() + 2;

                String category = strArr[0].split("##")[0];
                String id = strArr[1];
                String message = line.substring(len);

                // Key of the form "/<label>/<category>"; the value is the message text.
                key.set("/" + id + "/" + category);
                value.set(message);
                writer.append(key, value);
                count++;
            }
            reader.close();
            writer.close();
            System.out.println("Records written: " + count);
            System.out.println("End time " + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Run the program (with the Hadoop jars on the classpath) as:
java ConvertTextToSeq /home/hduser/mahout/train.csv /home/hduser/mahout/train-seq

In this example the CSV file has 3 columns: the first is the category, the second is the label and the third is the description. In the sequence file conversion, the label (and category) columns go into the key and the description column becomes the value.
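For illustration (the values below are made up), an input line such as:
cat01##extra label01 this is the message text
would be written to the sequence file with key /label01/cat01 and value "this is the message text".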
Then continue with all the steps from Step 5 onward.


3) In the steps above we used --randomSelectionPct 40 to split the data into training and testing sets. To provide our own training and testing data explicitly and compute the accuracy ourselves instead of taking it from Apache Mahout, follow the steps below and use the Java code at the end.


#################################################################################

Step A: If the input data is a directory, use Steps 1 to 4 above; otherwise convert the text file to a sequence file with the ConvertTextToSeq code above and put it on HDFS.
After Step A, the train-seq data should be on HDFS.

Step B: Convert the sequence file to vectors:
/home/hduser/mahout/bin/mahout seq2sparse -i train-seq -o train-vectors

Step C: Train a model on the training data with any algorithm provided by Mahout. The following example uses Naive Bayes:

/home/hduser/mahout/bin/mahout trainnb -i train-vectors/tfidf-vectors -el -li labelindex -o model -ow -c

Step D: After this, get the following files from HDFS onto the local file system:
/home/hduser/hadoop/bin/hadoop fs -get labelindex /home/hduser/mahout-work/labelindex
/home/hduser/hadoop/bin/hadoop fs -get model /home/hduser/mahout-work/model
/home/hduser/hadoop/bin/hadoop fs -get train-vectors/dictionary.file-0 /home/hduser/mahout-work/dictionary.file-0
/home/hduser/hadoop/bin/hadoop fs -getmerge train-vectors/df-count /home/hduser/mahout-work/df-count

Step E: Run the Java code below, passing the Step D files as command-line arguments:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
import org.apache.mahout.classifier.naivebayes.StandardNaiveBayesClassifier;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.vectorizer.DefaultAnalyzer;
import org.apache.mahout.vectorizer.TFIDF;

import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Multiset;


public class ClassificationForMahout {

    // Reads dictionary.file-0: maps each term to its integer id.
    public static Map<String, Integer> readDictionnary(Configuration conf, Path dictionnaryPath) {
        Map<String, Integer> dictionnary = new HashMap<String, Integer>();
        for (Pair<Text, IntWritable> pair : new SequenceFileIterable<Text, IntWritable>(dictionnaryPath, true, conf)) {
            dictionnary.put(pair.getFirst().toString(), pair.getSecond().get());
        }
        return dictionnary;
    }

    // Reads df-count: maps each term id to its document frequency.
    // The total number of training documents is stored under key -1.
    public static Map<Integer, Long> readDocumentFrequency(Configuration conf, Path documentFrequencyPath) {
        Map<Integer, Long> documentFrequency = new HashMap<Integer, Long>();
        for (Pair<IntWritable, LongWritable> pair : new SequenceFileIterable<IntWritable, LongWritable>(documentFrequencyPath, true, conf)) {
            documentFrequency.put(pair.getFirst().get(), pair.getSecond().get());
        }
        return documentFrequency;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Start time :" + System.currentTimeMillis());
        if (args.length < 5) {
            System.out.println("Arguments: [model] [label index] [dictionnary] [document frequency] [test file]");
            return;
        }
        String modelPath = args[0];
        String labelIndexPath = args[1];
        String dictionaryPath = args[2];
        String documentFrequencyPath = args[3];
        String testFilePath = args[4];

        Configuration configuration = new Configuration();

        // model is a matrix (wordId, labelId) => probability score
        NaiveBayesModel model = NaiveBayesModel.materialize(new Path(modelPath), configuration);
        StandardNaiveBayesClassifier classifier = new StandardNaiveBayesClassifier(model);

        // labels is a map labelId => label name
        Map<Integer, String> labels = BayesUtils.readLabelIndex(configuration, new Path(labelIndexPath));
        Map<String, Integer> dictionary = readDictionnary(configuration, new Path(dictionaryPath));
        Map<Integer, Long> documentFrequency = readDocumentFrequency(configuration, new Path(documentFrequencyPath));

        // analyzer used to extract words from the test message
        Analyzer analyzer = new DefaultAnalyzer();

        int labelCount = labels.size();
        int documentCount = documentFrequency.get(-1).intValue();

        System.out.println("Number of labels: " + labelCount);
        System.out.println("Number of documents in training set: " + documentCount);

        BufferedReader reader = new BufferedReader(new FileReader(testFilePath));
        String outputFile = "/home/hduser/result.txt";
        BufferedWriter out = new BufferedWriter(new FileWriter(outputFile, true));
        int correctCounter = 0;
        int totalCounter = 0;

        String line;
        while ((line = reader.readLine()) != null) {
            // Test line format: <category> <label> <message...>
            String[] arr = line.split(" ");
            String catId = arr[0];
            String label = arr[1];
            String msg = line.substring(arr[0].length() + arr[1].length() + 2);

            Multiset<String> words = ConcurrentHashMultiset.create();

            // extract words from the message
            TokenStream ts = analyzer.reusableTokenStream("text", new StringReader(msg));
            CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
            ts.reset();
            int wordCount = 0;
            while (ts.incrementToken()) {
                if (termAtt.length() > 0) {
                    String word = ts.getAttribute(CharTermAttribute.class).toString();
                    Integer wordId = dictionary.get(word);
                    // if the word is not in the dictionary, skip it
                    if (wordId != null) {
                        words.add(word);
                        wordCount++;
                    }
                }
            }

            // create vector wordId => weight using tfidf
            // (vector cardinality = vocabulary size)
            Vector vector = new RandomAccessSparseVector(dictionary.size());
            TFIDF tfidf = new TFIDF();
            for (Multiset.Entry<String> entry : words.entrySet()) {
                String word = entry.getElement();
                int count = entry.getCount();
                Integer wordId = dictionary.get(word);
                Long freq = documentFrequency.get(wordId);
                double tfIdfValue = tfidf.calculate(count, freq.intValue(), wordCount, documentCount);
                vector.setQuick(wordId, tfIdfValue);
            }

            // With the classifier, we get one score for each label.
            // The label with the highest score is the one the message is most
            // likely to belong to.
            Vector resultVector = classifier.classifyFull(vector);
            double bestScore = -Double.MAX_VALUE;
            int bestCategoryId = -1;
            StringBuilder resultStr = new StringBuilder(catId);
            for (Element element : resultVector) {
                int categoryId = element.index();
                double score = element.get();
                if (score > bestScore) {
                    bestScore = score;
                    bestCategoryId = categoryId;
                }
                resultStr.append("  ").append(labels.get(categoryId)).append(" ").append(score);
            }

            // write the per-label scores, then the actual and predicted label
            out.write(resultStr.toString());
            out.write("\n");
            out.write(label + " => " + labels.get(bestCategoryId));
            out.write("\n");

            totalCounter++;
            if (label.equalsIgnoreCase(labels.get(bestCategoryId))) {
                correctCounter++;
                System.out.println("correctCounter : " + correctCounter);
            }
        }

        System.out.println("correctCounter : " + correctCounter + " TotalCounter :" + totalCounter);
        System.out.println("End time :" + System.currentTimeMillis());
        System.out.println("Accuracy : " + (double) correctCounter / totalCounter);
        reader.close();
        out.close();
    }
}

There are 5 command-line arguments required to run this code:

Argument 1 : model
Argument 2 :  labelindex
Argument 3 :  dictionary.file-0
Argument 4 :  df-count
Argument 5 :  testing file

It generates result.txt, which contains the score of each label for every test description; the label with the highest score is the one assigned to that description.
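For example, assuming the compiled class and the required Mahout, Hadoop and Lucene jars are on the classpath, and the test file is at /home/hduser/test-data.txt (an illustrative path), it can be run as:
java ClassificationForMahout /home/hduser/mahout-work/model /home/hduser/mahout-work/labelindex /home/hduser/mahout-work/dictionary.file-0 /home/hduser/mahout-work/df-count /home/hduser/test-data.txt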




