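Building a clustering setup with Mahout and Hadoop
Mahout is a data mining library built on top of Hadoop. With Hadoop underneath, mining very large data sets becomes much simpler. However, an algorithm has to be expressible as MapReduce (M/R) to run this way, so not every common data mining algorithm is supported. This article shows how to put together a simple clustering tool with Hadoop + Mahout.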
Step 1: set up the Hadoop platform.
I use Ubuntu 11.04. If you do not have a development environment on Ubuntu yet, see my post "Ubuntu 10.10 Java development environment".
#1 Create a group and a user on Ubuntu
beneo@ubuntu:~$ sudo addgroup hadoop
beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser
#2 Install the SSH server and create a passphrase-less key for hduser
beneo@ubuntu:~$ sudo apt-get install ssh
beneo@ubuntu:~$ su - hduser
hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
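#3 Verify the SSH connection
hduser@ubuntu:~$ ssh localhost
The first time you run ssh localhost, answer yes to the host-key prompt. If you get a shell without being asked for a password, you can go on to install Hadoop.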
#4 Set JAVA_HOME
Edit conf/hadoop-env.sh so that JAVA_HOME points to the directory where your JDK is installed.
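If you are not sure where your JDK lives, one way to get a candidate value is to resolve the java binary on your PATH and strip the trailing /bin/java. This is only a sketch: the helper name java_home_from_binary is made up for illustration, and on some installs the result ends in /jre, so check that the directory really contains bin/java before using it.

```shell
# Hypothetical helper: turn the full path of a java executable
# into a candidate value for JAVA_HOME.
java_home_from_binary() {
    # ${1%/bin/java} removes a trailing "/bin/java" from the argument
    printf '%s\n' "${1%/bin/java}"
}

# Example use (commented out so the snippet has no side effects):
# java_home_from_binary "$(readlink -f "$(command -v java)")"
```

Whatever path you end up with goes into conf/hadoop-env.sh as an export JAVA_HOME=... line.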
#5 Edit the following configuration files
conf/core-site.xml:
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
conf/hdfs-site.xml:
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
conf/mapred-site.xml:
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>
#6 Format a new distributed filesystem:
$ bin/hadoop namenode -format
#7 Start the Hadoop daemons:
$ bin/start-all.sh
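#8 Check that everything started
$ jps
Count the processes listed. There should be 6: NameNode, SecondaryNameNode, DataNode, JobTracker, TaskTracker, plus Jps itself. If any are missing, delete the files under logs/ and start again from #6.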
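Counting by eye is easy to get wrong, so here is a small sketch that names the daemons that are missing instead. It assumes the five daemon names that start-all.sh brings up on a single-node install of this Hadoop generation; the function name missing_daemons is my own.

```shell
# Read `jps` output on stdin and print every expected Hadoop daemon
# that does not appear in it. Prints nothing when all five are running.
missing_daemons() {
    jps_output="$(cat)"
    for daemon in NameNode SecondaryNameNode DataNode JobTracker TaskTracker; do
        # jps prints "<pid> <name>"; compare the whole second field so that
        # NameNode is not confused with SecondaryNameNode
        if ! printf '%s\n' "$jps_output" |
            awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }'; then
            echo "$daemon"
        fi
    done
}
```

Use it as jps | missing_daemons. An empty result means all five daemons are up.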
#9 Don't panic. Open the web interfaces first; if they do not load yet, wait and try again.
NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/
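The waiting can be scripted. Below is a minimal retry helper, with a name and argument order I picked for this example: it runs a command until it succeeds or the attempts run out.

```shell
# retry <tries> <delay-seconds> <command...>
# Runs the command up to <tries> times, sleeping <delay-seconds> between
# failed attempts. Returns 0 as soon as the command succeeds, 1 otherwise.
retry() {
    tries=$1
    delay=$2
    shift 2
    attempt=1
    while [ "$attempt" -le "$tries" ]; do
        if "$@"; then
            return 0
        fi
        if [ "$attempt" -lt "$tries" ]; then
            sleep "$delay"
        fi
        attempt=$((attempt + 1))
    done
    return 1
}
```

For example, assuming curl is installed, retry 30 2 curl -sf -o /dev/null http://localhost:50070/ polls the NameNode page every 2 seconds for up to a minute.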
That completes step 1, the Hadoop setup.
Step 2: configure Mahout
#1 Download Mahout and unpack it
#2 Set HADOOP_HOME in .bash_profile
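As an illustration, assuming Hadoop was unpacked to /usr/local/hadoop (a made-up location, substitute your own), the lines in ~/.bash_profile could look like this:

```shell
# Assumed install location; replace with the directory where you unpacked Hadoop
export HADOOP_HOME=/usr/local/hadoop
# Optional convenience: put the hadoop scripts on the PATH
export PATH="$PATH:$HADOOP_HOME/bin"
```

Run source ~/.bash_profile or log in again so that the mahout launcher script can see the variable.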
#3 Run mahout/bin/mahout and check what it prints
Step 3: a clustering demo
My pipeline is text -> Lucene index -> Mahout -> clustering dumper
An alternative is SequenceFile -> Mahout -> clustering dumper
I will just paste the code. It is written in Groovy and may not be pretty.
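#1 text -> Lucene index
import org.apache.lucene.document.Document
import org.apache.lucene.document.Field
import org.apache.lucene.index.IndexWriter
import org.apache.lucene.store.FSDirectory
import org.apache.lucene.store.NoLockFactory
import org.wltea.analyzer.lucene.IKAnalyzer

// Build a Lucene document: "label" is stored verbatim and later becomes the
// vector name; "content" is analyzed and keeps term vectors, which Mahout needs
def assembleDoc = { label, content ->
    assert !label.toString().trim().empty
    assert !content.toString().trim().empty
    def doc = new Document()
    doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))
    doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, Field.TermVector.YES))
    doc
}

// Read the sample text, one entry per line
def mockContent = {
    def set = []
    new File('/home/beneo/text.txt').eachLine { String line -> set << line }
    set
}

def mockExpandoSet = {
    def lst = []
    mockContent()?.each { content ->
        // Strip every non-Chinese character
        def line = content.replaceAll("[^\u4e00-\u9fa5]+", "")
        // Skip lines left with fewer than 3 characters
        if (line.length() > 2) {
            println(content)
            def expando = new Expando()
            expando.label = content
            expando.content = line
            lst << expando
        }
    }
    lst
}

// Open the index directory
def directory = FSDirectory.open(new File('/home/beneo/index'), NoLockFactory.getNoLockFactory())
// IK Analyzer does the Chinese word segmentation
def analyzer = new IKAnalyzer()
// Create the IndexWriter that produces the index (true = overwrite an existing index)
def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)
// Index the text read from the local file
mockExpandoSet().each { expando ->
    indexWriter.addDocument(assembleDoc(expando.label, expando.content))
}
indexWriter.commit()
indexWriter.close()
directory.close()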
#2 Lucene index -> Mahout vectors
mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2
#3 Mahout vectors -> canopy
mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow
#4 canopy -> k-means
mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl
#5 k-means -> inspecting the results
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Writable
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.clustering.WeightedVectorWritable
import org.apache.mahout.math.NamedVector

String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"
String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"

def conf = new Configuration()
FileSystem fs = new Path(seqFileDir).getFileSystem(conf)
Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), conf)

for (FileStatus seqFile : fs.globStatus(new Path(seqFileDir, "part-*"))) {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqFile.getPath(), conf)
    Writable key = reader.getKeyClass().asSubclass(Writable.class).newInstance()
    Writable value = reader.getValueClass().asSubclass(Writable.class).newInstance()
    while (reader.next(key, value)) {
        Cluster cluster = (Cluster) value
        List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId())
        // Only print clusters that contain more than 4 points
        if (points != null && points.size() > 4) {
            for (WeightedVectorWritable point : points) {
                // The vector name is the "label" field passed to lucene.vector with -i
                println(((NamedVector) point.getVector()).getName())
            }
            println "======================================"
        }
    }
    reader.close()
}

// Read clusteredPoints into a map: cluster id -> points assigned to that cluster
private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf) throws IOException {
    Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>()
    FileSystem fs = pointsPathDir.getFileSystem(conf)
    // Skip checksum files and bookkeeping entries such as _logs
    FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            String name = path.getName()
            return !(name.endsWith(".crc") || name.startsWith("_"))
        }
    })
    for (FileStatus file : children) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf)
        IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance()
        WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance()
        while (reader.next(key, value)) {
            // key is the cluster id; value is a point (weighted vector) assigned to it
            List<WeightedVectorWritable> pointList = result.get(key.get())
            if (pointList == null) {
                pointList = new ArrayList<WeightedVectorWritable>()
                result.put(key.get(), pointList)
            }
            pointList.add(value)
            // Allocate a fresh object so the stored point is not overwritten by the next read
            value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance()
        }
        reader.close()
    }
    return result
}
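The script above hard-codes clusters-2, but which clusters-N directory is the last one depends on how many iterations k-means needed before it converged. Here is a small sketch for picking the highest-numbered one from a directory listing. It assumes the output directories are named clusters-<number>, as in the paths above, and the function name latest_clusters_dir is my own.

```shell
# Read directory names on stdin (one per line) and print the
# clusters-<number> entry with the highest number.
latest_clusters_dir() {
    # keep only names of the form clusters-<number>, sort numerically
    # on the part after the dash, and keep the last line
    grep -E '^clusters-[0-9]+$' | sort -t- -k2,2n | tail -n 1
}
```

For output on the local filesystem, ls /home/hduser/tmp/kmeans | latest_clusters_dir prints the name to use for seqFileDir.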
I won't show the output here.