关联规则二项集Hadoop实现
近期看mahout的关联规则源码,颇为头痛。本来打算写一个系列来分析关联规则的源码,但是后面发现内容有点乱,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则。
算法的思想还是参考上次的图片:
这里实现分为五个步骤:
- 针对原始输入计算每个项目出现的次数;
- 按出现次数从大到小排序(排除出现次数小于阈值的项目)生成frequency list file;
- 针对原始输入的事务按frequency list file进行排序并剪枝;
- 生成二项集规则;
- 计算二项集规则出现的次数,并删除小于阈值的二项集规则;
第一步的实现:包括步骤1和步骤2,代码如下:
GetFlist.java:
package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Pattern;

// NOTE: the original had "org.apache.Hadoop.conf.Configuration" (capital H),
// which does not compile; Java package names are case-sensitive.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Orders "item&lt;splitter&gt;count" strings by count descending, breaking ties by
 * item name ascending, so a PriorityQueue of such strings yields the most
 * frequent item first.
 */
class MyComparator implements Comparator<String> {
    private String splitter = ",";

    public MyComparator(String splitter) {
        this.splitter = splitter;
    }

    @Override
    public int compare(String o1, String o2) {
        // Each entry looks like "item<splitter>count"; field [1] is the count.
        String[] str1 = o1.split(splitter);
        String[] str2 = o2.split(splitter);
        int num1 = Integer.parseInt(str1[1]);
        int num2 = Integer.parseInt(str2[1]);
        if (num1 != num2) {
            // Reversed argument order => higher counts sort first (descending).
            return Integer.compare(num2, num1);
        }
        // Equal counts: fall back to lexicographic item order for determinism.
        return str1[0].compareTo(str2[0]);
    }
}
- publicclass GetFList {
- /**
- * the program is based on the picture
- */
- // Mapper
- publicstaticclass MapperGF extends Mapper<LongWritable ,Text ,Text,IntWritable>{
- private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
- privatefinal IntWritable newvalue=new IntWritable(1);
- publicvoid map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
- String [] items=splitter.split(value.toString());
- for(String item:items){
- context.write(new Text(item), newvalue);
- }
- }
- }
- // Reducer
- publicstaticclass ReducerGF extends Reducer<Text,IntWritable,Text ,IntWritable>{
- publicvoid reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
- int temp=0;
- for(IntWritable v:value){
- temp+=v.get();
- }
- context.write(key, new IntWritable(temp));
- }
- }
- publicstaticvoid main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // TODO Auto-generated method stub
- if(args.length!=3){
- System.out.println("Usage: <input><output><min_support>");
- System.exit(1);
- }
- String input=args[0];
- String output=args[1];
- int minSupport=0;
- try {
- minSupport=Integer.parseInt(args[2]);
- } catch (NumberFormatException e) {
- // TODO Auto-generated catch block
- minSupport=3;
- }
- Configuration conf=new Configuration();
- String temp=args[1]+"_temp";
- Job job=new Job(conf,"the get flist job");
- job.setJarByClass(GetFList.class);
- job.setMapperClass(MapperGF.class);
- job.setCombinerClass(ReducerGF.class);
- job.setReducerClass(ReducerGF.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(temp));
- boolean succeed=job.waitForCompletion(true);
- if(succeed){
- // read the temp output and write the data to the final output
- List<String> list=readFList(temp+"/part-r-00000",minSupport);
- System.out.println("the frequence list has generated ... ");
- // generate the frequence file
- generateFList(list,output);
- System.out.println("the frequence file has generated ... ");
- }else{
- System.out.println("the job is failed");
- System.exit(1);
- }
- }
- // read the temp_output and return the frequence list
- publicstatic List<String> readFList(String input,int minSupport) throws IOException{
- // read the hdfs file
- Configuration conf=new Configuration();
- Path path=new Path(input);
- FileSystem fs=FileSystem.get(path.toUri(),conf);
- FSDataInputStream in1=fs.open(path);
- PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));
- InputStreamReader isr1=new InputStreamReader(in1);
- BufferedReader br=new BufferedReader(isr1);
- String line;
- while((line=br.readLine())!=null){
- int num=0;
- try {
- num=Integer.parseInt(line.split("\t")[1]);
- } catch (NumberFormatException e) {
- // TODO Auto-generated catch block
- num=0;
- }
- if(num>minSupport){
- queue.add(line);
- }
- }
- br.close();
- isr1.close();
- in1.close();
- List<String> list=new ArrayList<String>();
- while(!queue.isEmpty()){
- list.add(queue.poll());
- }
- return list;
- }
- // generate the frequence file
- publicstaticvoid generateFList(List<String> list,String output) throws IOException{
- Configuration conf=new Configuration();
- Path path=new Path(output);
- FileSystem fs=FileSystem.get(path.toUri(),conf);
- FSDataOutputStream writer=fs.create(path);
- Iterator<String> i=list.iterator();
- while(i.hasNext()){
- writer.writeBytes(i.next()+"\n");// in the last line add a \n which is not supposed to exist
- }
- writer.close();
- }
- }