Apache Mahout的Taste基于Hadoop实现协同过滤推荐引擎的代码分析
转自:http://hi.baidu.com/dmuyy/blog/item/2a0090e73c434334b83820fd.html
Taste 是 Apache Mahout 提供的一个协同过滤算法的高效实现,它是一个基于Java实现的可扩展的高效的推荐引擎。 该推荐引擎是用<userid,itemid,preference>这样简单的数据格式表达用户对物品的偏好。 以此为输入数据,计算后就可以得到为每个user推荐的items列表。 他提供了方便的单机版的编程接口,也提供了基于hadoop的分布式的实现。 单机版的编程接口主要适用于写demo和做算法的评估,若处理大规模数据,还是需分布式的实现。 以下是对org.apache.mahout.cf.taste.hadoop.item.RecommenderJob的各MapReduce步骤的一个解读。 Taste实现一个分布式的协同过滤推荐共经历了如下12个MapReduce步骤。 以下分析了各步骤的Mapper和Reducer都做了哪些工作,并有什么格式的数据输出。 代码分析: 1、计算item的itemid_index和最小itemid值 1.1、ItemIDIndexMapper.class,VarIntWritable.class,VarLongWritable.class, 用原始输入,将userid,itemid,pref数据转成itemid_index,itemid 1.2、ItemIDIndexReducer.class,VarIntWritable.class,VarLongWritable.class, 在itemid_index,Iterator<itemid>中找最小的itemid,输出itemid_index,minimum_itemid 此处只是保存一个int型的itemid_index索引和对应的long型的itemid的映射 2、计算各user的item偏好向量,即Vector<item,pref> 2.1、ToItemPrefsMapper.class,VarLongWritable.class,booleanData?VarLongWritable.class:EntityPrefWritable.class, 用原始输入,读入偏好数据,得到userid,<itemid,pref> 2.2、ToUserVectorReducer.class,VarLongWritable.class,VectorWritable.class, 将userid,Iterator<itemid,pref>中的itemid变成itemid_index,得到userid,Vector<itemid_index,pref>,后者用RandomAccessSparseVector来存。 3、统计数据中有多少个user 3.1、CountUsersMapper.class,CountUsersKeyWritable.class,VarLongWritable.class, 用步骤2的输出,统计独立userid数目,先转换数据为userid,userid 3.2、CountUsersReducer.class,VarIntWritable.class,NullWritable.class, 通过CountUsersPartitioner将所有数据发到一个区,一个Reducer来处理 由于userid都已排序,所以可以用极简单的方式来统计出独立userid数 输出只有一个值,即用户数 4、计算item的user偏好向量,即Vector<userid,pref>,也即拿步骤2的结果做矩阵的修剪和转置 4.1、MaybePruneRowsMapper.class,IntWritable.class,DistributedRowMatrix.MatrixEntryWritable.class, 用步骤2的输出,按指定的maxCooccurrences参数值来修剪Vector的数目,目的是控制计算的规模,减少计算量 然后转为以userid_index为列号、itemid_index为行号、pref为值的矩阵,用MatrixEntryWritable表示矩阵。 输出为itemid_index,Matrix<userid_index,itemid_index,pref> 4.2、ToItemVectorsReducer.class,IntWritable.class,VectorWritable.class, 输出为itemid_index,Vector<userid_index,pref>,相当于对步骤2的结果进行了矩阵的转置, 有了偏好矩阵数据,接下来会调用RowSimilarityJob来计算行的相似度 此处的行是item,所以默认是item-base的CF。 但其实可以通过传入是否转置的参数来对步骤1进行调整,将userid和itemid转换,就可以实现user-base的CF。 此处也可以通过similarityClassname参数来指定用哪种算法来计算相似度。 RowSimilarityJob将通过接下来的3个步骤来实现: 5、用相似度算法给向量赋权 5.1、RowWeightMapper.class,VarIntWritable.class,WeightedOccurrence.class, 用相应的相似度算法来计算步骤4的输出,计算每个itemid_index所对应的Vector<userid_index,pref>的weight。 输出为userid_index,WeightedOccurrence<itemid_index,pref,weight>,WeightedOccurrence是一个简单的数据封装类。 5.2、WeightedOccurrencesPerColumnReducer.class,VarIntWritable.class,WeightedOccurrenceArray.class, 将Iterator<WeightedOccurrence>简单变为WeightedOccurrenceArray,后者只是简单继承了ArrayWritable。 最后输出结果为userid_index,WeightedOccurrenceArray,数组的数据项是WeightedOccurrence<itemid_index,pref,weight> 6、用相似度算法计算相似度,得到相似度矩阵 6.1、CooccurrencesMapper.class,WeightedRowPair.class,Cooccurrence.class, 取出步骤5的结果,将WeightedOccurrenceArray的数据双重循环,拼装如下的KV数据结构 WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>,Cooccurrence<userid_index,prefA,prefB> 6.2、SimilarityReducer.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class, 此步骤的Map输出,也即Reduce的输入是WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>,Iterator<Cooccurrence<userid_index,prefA,prefB>> 也即itemA和itemB的weight,以及不同user对itemA和itemB的pref。 相应的Similarity实例就可以利用以上数据计算itemA与itemB的相似度评分similarityValue 输出结果为SimilarityMatrixEntryKey<itemid_indexA,similarityValue>,Matrix<itemid_indexA,itemid_indexB,similarityValue> 也就是不同item和itemA的俩俩相似度,得到一个相似度矩阵 7、将相似度矩阵转为向量存储 7.1、Mapper.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class, 将步骤6的结果简单读入,item相似度矩阵 7.2、EntriesToVectorsReducer.class,IntWritable.class,VectorWritable.class, 输出为itemid_indexA,Vector<itemid_indexX,similarityValue>,Vector用SequentialAccessSparseVector存储。 也就是输出为不同的其他item与itemA之间的相似度值 8、PartialMultiply的预处理1,填充vector部分的数据 8.1、SimilarityMatrixRowWrapperMapper.class,VarIntWritable.class,VectorOrPrefWritable.class, 用步骤7的相似度数据,输出itemid_index,VectorOrPrefWritable(vector,null,null) 8.2、Reducer.class,VarIntWritable.class,VectorOrPrefWritable.class, 默认Reducer,直接输出Mapper的输出 9、PartialMultiply的预处理2,填充userid和pref部分的数据 9.1、UserVectorSplitterMapper.class,VarIntWritable.class,VectorOrPrefWritable.class, 如果提供了一个userid列表文件,Mapper初始化时会先读入该文件到FastIDSet<userid>中 如果userid不在这个Set中,则会直接return,也就是只会为该列表中的user做推荐 用步骤2的用户对各item的偏好数据,输出itemid_index,VectorOrPrefWritable(null,userid,pref) 9.2、Reducer.class,VarIntWritable.class,VectorOrPrefWritable.class, 默认Reducer,直接输出Mapper的输出 10、拼装两个PartialMultiply预处理的数据 10.1、Mapper.class,VarIntWritable.class,VectorOrPrefWritable.class, 用FileInputFormat.setInputPaths指定多个路径,将步骤8和9的输出同时作为输入 10.2、ToVectorAndPrefReducer.class,VarIntWritable.class,VectorAndPrefsWritable.class, 将VectorOrPrefWritable(vector,null,null)和VectorOrPrefWritable(null,userid,pref) 变为VectorAndPrefsWritable(vector,List<userid>,List<pref>) 最后的输出是itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>) 11、如果设置了item过滤文件则读取,作为黑名单 11.1、ItemFilterMapper.class,VarLongWritable.class,VarLongWritable.class, 简单读入item过滤文件,输出为itemid,userid,这相当于“黑”名单,用于后面推荐结果的过滤。 11.2、ItemFilterAsVectorAndPrefsReducer.class,VarIntWritable.class,VectorAndPrefsWritable.class, 输出为itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>) 其中vector的值为vector(itemid_index,Double.NaN),pref的值都用1.0f来填充。 注意,vector的第二项数据,也即similarityValue被设置为Double.NaN,后面将会用这个来判断这是否是黑名单。 12、用相似度矩阵的PartialMultiply做推荐计算 12.1、PartialMultiplyMapper.class,VarLongWritable.class,PrefAndSimilarityColumnWritable.class, 如果步骤11存在,则用FileInputFormat.setInputPaths指定多个路径,将步骤10和11的输出同时作为输入 也即输入为itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>),其中vector的值为Vector<itemid_index,similarityValue> 输出为userid,PrefAndSimilarityColumnWritable(pref,vector<itemid_index,similarityValue>) 12.2、AggregateAndRecommendReducer.class,VarLongWritable.class,RecommendedItemsWritable.class, 初始化时,会读入步骤1的结果,是一个HashMap<itemid_index,itemid>,也即index和itemid的映射 若设置了item白名单文件,则初始化时也会读入文件到FastIDSet<itemid>,推荐结果必须在这里边。和步骤11的黑名单相反。 Reducer在处理时会区分是否是booleanData而用不同的处理逻辑,此处我们主要讨论非booleanData,也即有实际pref数据的情况而不是默认用1.0f来填充的pref。 Reducer中进行PartialMultiply,按乘积得到的推荐度的大小取出最大的几个item。 处理的过程中需要将itemid_index通过HashMap转换回itemid,并且用“黑”“白”名单进行过滤。 白名单很容易理解,用集合是否为空和集合的contains(); 黑名单是判断Float.isNaN(similarityValue),因为此前在步骤11的输出时黑名单的similarityValue被设置为了Double.NaN。 对于非booleanData,是用pref和相似度矩阵的PartialMultiply得到推荐度的值来进行排序。 而booleanData的pref值都是1.0f,所以去计算矩阵相乘的过程没有意义,直接累加相似度的值即可。 用这个数据排序就可得到推荐结果。 输出为userid,RecommendedItemsWritable,后者实际是List<RecommendedItem<itemid,pref>>, 这里的pref是相似度矩阵的PartialMultiply或是相似度累加计算出来的值而非实际值。 后注: 以上提到的FastIDSet,SequentialAccessSparseVector,RandomAccessSparseVector等等数据结构, 是Mahout提供的一些大数据量存储和处理的一些高效实现, 针对数据的特点而做的有针对性的优化,同时解决性能和空间的问题。 在MahoutinAction的讨论CF和Cluster等的“数据的表达”章节中有专门的阐述,此处不再详细解释。 |