FP-Growth is an algorithm commonly used in association analysis to mine frequent itemsets. Compared with the Apriori algorithm, FP-Growth represents the data as a prefix tree (the FP-tree), which reduces the number of scans over the transaction database, and it mines frequent itemsets by recursively building conditional FP-trees. Reference [1] analyzes this process in detail. In practice, however, on large data sets the FP-tree built by FP-Growth can become too large to fit in memory, and the number of frequent itemsets mined can be exponential. This article analyzes how to parallelize FP-Growth and walks through the source code of the parallel FP-Growth implementation in Mahout.
1. Parallelizing FP-Growth
There are many ways to parallelize FP-Growth. The earliest proposal to parallelize FP-Growth with MapReduce appears to come from Haoyuan Li et al. of Google Beijing Research (see reference [2]). They use three MapReduce jobs to parallelize FP-Growth, and the whole process can be roughly divided into five steps:
Step 1: Sharding
To balance read and write load across the cluster, the transaction database is split into a number of shards, which are stored on P nodes.
Step 2: Parallel Counting
Similar to WordCount, a single MapReduce job computes the support of every item. Concretely, each mapper reads several shards of the transaction database from HDFS, so the mapper input is <key, value=Ti>, where Ti is one transaction in a shard. For every item aj in Ti, the mapper emits <key=aj, value=1>. After all mappers in the cluster have finished, all key-value pairs with key=aj are sent to the same reducer, so the reducer input is <key=aj, value={1, 1, ..., 1}>. The reducer simply sums the values and emits <key=aj, value=sum{1, 1, ..., 1}>. The final result is a list of items sorted by decreasing support, called the F-List:
Figure 1
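As a concrete, single-machine illustration of Steps 1 and 2 (not Mahout code; the transactions and item names are made up for the example), the following sketch counts each item at most once per transaction and sorts the counts by decreasing support to obtain an F-List:

import java.util.*;

// Sketch only: simulate the Parallel Counting step locally. In the real job
// the per-transaction counting happens in mappers and the summing in reducers.
public class ParallelCountingSketch {
  public static void main(String[] args) {
    List<List<String>> transactions = Arrays.asList(
        Arrays.asList("milk", "eggs", "bread", "chips"),
        Arrays.asList("eggs", "bread"),
        Arrays.asList("milk", "bread"));

    Map<String,Integer> support = new HashMap<String,Integer>();
    for (List<String> txn : transactions) {
      for (String item : new HashSet<String>(txn)) {   // count an item once per transaction
        Integer c = support.get(item);
        support.put(item, c == null ? 1 : c + 1);
      }
    }

    List<Map.Entry<String,Integer>> fList =
        new ArrayList<Map.Entry<String,Integer>>(support.entrySet());
    Collections.sort(fList, new Comparator<Map.Entry<String,Integer>>() {
      @Override
      public int compare(Map.Entry<String,Integer> a, Map.Entry<String,Integer> b) {
        return b.getValue() - a.getValue();             // decreasing support
      }
    });
    System.out.println(fList);                          // e.g. [bread=3, milk=2, eggs=2, chips=1]
  }
}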
Step 3: Grouping Items
The items in the F-List are divided into Q groups, each with a unique group-id. The mapping from every item to its group-id is called the G-List.
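A minimal sketch of one possible grouping scheme is to cut the support-ordered F-List into runs of consecutive items, so that group-id = position / maxPerGroup; this mirrors how Mahout computes maxPerGroup and getGroup, as shown in Section 2. The item names below are illustrative only:

import java.util.*;

// Sketch only: build a G-List by assigning consecutive positions in the
// support-ordered F-List to the same group.
public class GroupingSketch {
  static Map<String,Integer> buildGList(List<String> fList, int numGroups) {
    int maxPerGroup = (fList.size() + numGroups - 1) / numGroups; // ceiling division
    Map<String,Integer> gList = new LinkedHashMap<String,Integer>();
    for (int i = 0; i < fList.size(); i++) {
      gList.put(fList.get(i), i / maxPerGroup);                   // group-id of item i
    }
    return gList;
  }

  public static void main(String[] args) {
    // 5 items split into Q = 3 groups => maxPerGroup = 2
    System.out.println(buildGList(Arrays.asList("chips", "bread", "eggs", "milk", "beer"), 3));
    // prints {chips=0, bread=0, eggs=1, milk=1, beer=2}
  }
}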
Step 4: Parallel FP-Growth
This step is the key to parallelizing FP-Growth and the part of the algorithm that is hardest to understand. It uses one MapReduce job. Each mapper's input again comes from the shards produced in Step 1, so the mapper input is <key, value=Ti>. Before processing the shards, the mapper loads the G-List produced in Step 3. The G-List is essentially a hash map whose keys are items and whose values are the corresponding group-ids; it is usually small enough to fit in memory. The mapper scans Ti from its last item towards the first, i.e. from right to left. Whenever the group-id of the current item aL is seen for the first time, the mapper outputs {a0, a1, ..., aL}; otherwise it outputs nothing. Taking the data shown in Figure 1 as an example, with a support threshold of 1 and Q = 3, the resulting G-List is:
Figure 2
Here the third column is the group-id. Suppose a mapper's input is {milk, eggs, bread, chips}. Scanning from the last item, the mapper outputs <key=1, value={milk, eggs, bread, chips}>. The next two items, bread and eggs, have the same group-id as chips, so nothing is output for them. The first item, milk, has a group-id that has not been seen yet, so the mapper outputs <key=2, value={milk}>.
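The same right-to-left scan can be sketched as follows (an illustration, not Mahout code); the item names and group-ids follow the example above, and the print statement stands in for what a real mapper would write to its context:

import java.util.*;

// Sketch of the Step 4 mapper logic: emit one group-dependent transaction per
// group-id, the first time that group-id is encountered in the scan.
public class GroupDependentShardingSketch {

  static void mapTransaction(List<String> txn, Map<String,Integer> gList) {
    Set<Integer> seenGroups = new HashSet<Integer>();
    for (int j = txn.size() - 1; j >= 0; j--) {
      int groupId = gList.get(txn.get(j));
      if (seenGroups.add(groupId)) {
        // emit <key=groupId, value=prefix a0..aj>
        System.out.println(groupId + " -> " + txn.subList(0, j + 1));
      }
    }
  }

  public static void main(String[] args) {
    Map<String,Integer> gList = new HashMap<String,Integer>();
    gList.put("chips", 1);
    gList.put("bread", 1);
    gList.put("eggs", 1);
    gList.put("milk", 2);
    // prints "1 -> [milk, eggs, bread, chips]" and "2 -> [milk]"
    mapTransaction(Arrays.asList("milk", "eggs", "bread", "chips"), gList);
  }
}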
All records with the same group-id are pushed to the same reducer, so the reducer input is <key=group-id, value={{ValueList1}, {ValueList2}, ..., {ValueListN}}>. The reducer builds a local FP-tree and then, just like the traditional FP-Growth algorithm, recursively builds conditional FP-trees and mines frequent patterns. Unlike traditional FP-Growth, the reducer does not emit the mined frequent patterns directly; instead it inserts them into a max-heap of size K ordered by support, and finally emits the K frequent patterns with the highest support: <key=item, value={the Top K Frequent Patterns containing that item}>.
Step 5: Aggregating
The Top K Frequent Patterns mined in the previous step already contain all the frequent patterns, but because that MapReduce job partitioned the data by group-id, the patterns for a given key=item may be scattered across the reducers of several different group-ids. To merge all key-value pairs with key=item and present the results in a cleaner form, the mined patterns are post-processed using the fact that MapReduce sorts output by key by default: each item of a Top K Frequent Pattern is used in turn as the key, and the pattern containing that key is output along with it. So each mapper emits <key=item, value={the frequent patterns on this node that contain the item}>, and the reducer merges the outputs of all mappers and emits the final result <key=item, value={all frequent patterns containing the item}>.
2. Parallel FP-Growth Source Code Analysis
Mahout provides implementations of a number of classic machine-learning algorithms. Versions after Mahout 0.9 have removed the Parallel FP-Growth algorithm; this article analyzes the Parallel FP-Growth source code in Mahout 0.8.
Figure 3
FPGrowthDriver.java
FPGrowthDriver is the driver class of the FPGrowth algorithm and extends AbstractJob. Hadoop jobs are usually launched by running the bin/hadoop script from the command line and passing in arguments. The GenericOptionsParser used inside ToolRunner extracts these command-line arguments. AbstractJob wraps methods such as addInputOption, addOutputOption, addOption, and parseArguments, which help with parsing the command line. The params object stores all parameters needed by the algorithm. Depending on the command-line arguments, FPGrowthDriver either calls the runFPGrowth method in this file for sequential execution, or the runPFPGrowth method in PFPGrowth.java for parallel execution.
public final class FPGrowthDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class);

  private FPGrowthDriver() {
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner's static run() method uses a GenericOptionsParser internally;
    // GenericOptionsParser.getRemainingArgs() retrieves the command-line arguments.
    // ToolRunner.run() then calls FPGrowthDriver.run().
    ToolRunner.run(new Configuration(), new FPGrowthDriver(), args);
  }

  /**
   * Run TopK FPGrowth given the input file,
   */
  @Override
  public int run(String[] args) throws Exception {
    addInputOption();  // add the default input directory option
    addOutputOption(); // add the default output directory option

    addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present."
              + " Default Value: 3", "3"); // minimum support threshold
    addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items."
              + " Default value: 50", "50"); // size of the max-heap (top K)
    addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version."
              + " Doesn't work in sequential version Default Value:" + PFPGrowth.NUM_GROUPS_DEFAULT,
              Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); // number of groups g
    addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into"
              + " itemsets. Default value splits comma separated itemsets. Default Value:"
              + " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); // item separator
    addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate"
              + " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory, "
              + "so keep this value small, but big enough to prevent duplicate tree building. "
              + "Default Value:5 Recommended Values: [5-10]", "5");
    addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); // sequential or parallel execution
    addOption("encoding", "e", "(Optional) The file encoding.  Default value: UTF-8", "UTF-8"); // file encoding
    addFlag("useFPG2", "2", "Use an alternate FPG implementation");

    // exit if command-line argument parsing fails
    if (parseArguments(args) == null) {
      return -1;
    }

    Parameters params = new Parameters();

    if (hasOption("minSupport")) {
      String minSupportString = getOption("minSupport");
      params.set("minSupport", minSupportString);
    }
    if (hasOption("maxHeapSize")) {
      String maxHeapSizeString = getOption("maxHeapSize");
      params.set("maxHeapSize", maxHeapSizeString);
    }
    if (hasOption("numGroups")) {
      String numGroupsString = getOption("numGroups");
      params.set("numGroups", numGroupsString);
    }

    if (hasOption("numTreeCacheEntries")) {
      String numTreeCacheString = getOption("numTreeCacheEntries");
      params.set("treeCacheSize", numTreeCacheString);
    }

    if (hasOption("splitterPattern")) {
      String patternString = getOption("splitterPattern");
      params.set("splitPattern", patternString);
    }

    String encoding = "UTF-8";
    if (hasOption("encoding")) {
      encoding = getOption("encoding");
    }
    params.set("encoding", encoding);

    if (hasOption("useFPG2")) {
      params.set(PFPGrowth.USE_FPG2, "true");
    }

    Path inputDir = getInputPath();
    Path outputDir = getOutputPath();

    params.set("input", inputDir.toString());
    params.set("output", outputDir.toString());

    String classificationMethod = getOption("method");
    if ("sequential".equalsIgnoreCase(classificationMethod)) {
      runFPGrowth(params);
    } else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
      Configuration conf = new Configuration();
      HadoopUtil.delete(conf, outputDir);
      PFPGrowth.runPFPGrowth(params);
    }

    return 0;
  }
PFPGrowth.java
PFPGrowth is the driver class of the parallel FP-Growth algorithm. The runPFPGrowth(params) method initializes a Configuration object and then calls runPFPGrowth(params, conf), which covers the five key steps of the parallel algorithm. startParallelCounting(params, conf) corresponds to Step 1 and Step 2: it counts the support of every item in WordCount-like fashion, and its output is used by readFList() and saveFList() to build the F-List. Next, the number of items per group is computed from the user-supplied NUM_GROUPS parameter and stored in params. startParallelFPGrowth(params, conf) corresponds to Step 3 and Step 4, and startAggregating(params, conf) corresponds to Step 5.
public static void runPFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization");

  startParallelCounting(params, conf); // corresponds to Step 1 and Step 2

  // save feature list to dcache
  List<Pair<String,Long>> fList = readFList(params);
  saveFList(fList, params, conf);

  // set param to control group size in MR jobs
  int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT);
  int maxPerGroup = fList.size() / numGroups;
  if (fList.size() % numGroups != 0) {
    maxPerGroup++;
  }
  params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup));

  startParallelFPGrowth(params, conf); // corresponds to Step 3 and Step 4

  startAggregating(params, conf); // corresponds to Step 5
}
The startParallelCounting method initializes a Job object that runs ParallelCountingMapper and ParallelCountingReducer to compute the supports.
/**
 * Count the frequencies of various features in parallel using Map/Reduce
 */
public static void startParallelCounting(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());

  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");

  String input = params.get(INPUT);
  Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
  job.setJarByClass(PFPGrowth.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);

  FileInputFormat.addInputPath(job, new Path(input));
  Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelCountingMapper.class);
  job.setCombinerClass(ParallelCountingReducer.class);
  job.setReducerClass(ParallelCountingReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }

}
ParallelCountingMapper.java
The inputs to the map method of ParallelCountingMapper are the byte offset and one line (one transaction) of the transaction database. Items that appear more than once in a single input line are counted only once, so the split items are stored in a HashSet. The output of the map method is <key=item, value=one>.
public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final LongWritable ONE = new LongWritable(1);

  private Pattern splitter;

  @Override
  protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());
    Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items));
    for (String item : uniqueItems) {
      if (item.trim().isEmpty()) {
        continue;
      }
      context.setStatus("Parallel Counting Mapper: " + item);
      context.write(new Text(item), ONE);
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
  }
}
ParallelCountingReducer.java
The input of the reduce method of ParallelCountingReducer is <key=item, value={one, one, ..., one}>. All key-value pairs with key=item are sent to the same machine, so summing over the values yields the support of that item.
public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
                                                                                   InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      context.setStatus("Parallel Counting Reducer :" + key);
      sum += value.get();
    }
    context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
    context.write(key, new LongWritable(sum));

  }
}
PFPGrowth.java
The output path of ParallelCountingReducer is obtained from the OUTPUT parameter in params. The readFList method uses a few data structures. Pair implements the Comparable and Serializable interfaces; its members first and second hold an item and that item's support. PriorityQueue is a priority queue implemented as a binary min-heap; if a Comparator is supplied, its elements are ordered by that Comparator, otherwise by their natural Comparable ordering. Here the PriorityQueue is constructed with a Comparator that orders Pairs by their second element (the support) in descending order and breaks ties on the first element (the item). A SequenceFileDirIterable iterates over the output of the previous MapReduce job, and each Pair is added to the PriorityQueue, which keeps the ordering as elements are inserted. Finally the elements are polled from the queue one by one into fList, so fList is a list sorted by decreasing support.
/**
 * read the feature frequency List which is built at the end of the Parallel counting job
 *
 * @return Feature Frequency List
 */
public static List<Pair<String,Long>> readFList(Parameters params) {
  int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
  Configuration conf = new Configuration();

  Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);

  PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
      new Comparator<Pair<String,Long>>() {
        @Override
        public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
          int ret = o2.getSecond().compareTo(o1.getSecond());
          if (ret != 0) {
            return ret;
          }
          return o1.getFirst().compareTo(o2.getFirst());
        }
      });

  for (Pair<Text,LongWritable> record
       : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN),
                                                         PathType.GLOB, null, null, true, conf)) {
    long value = record.getSecond().get();
    if (value >= minSupport) {
      queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
    }
  }
  List<Pair<String,Long>> fList = Lists.newArrayList();
  while (!queue.isEmpty()) {
    fList.add(queue.poll());
  }
  return fList;
}
Now that the fList has been built, saveFList first deletes any existing file at the fList output path and then writes the fList to HDFS. For files stored on HDFS, DistributedCache provides file caching: before the slave nodes start computing, the cached file can be copied from HDFS onto those nodes.
/**
 * Serializes the fList and returns the string representation of the List
 */
public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)
  throws IOException {
  Path flistPath = new Path(params.get(OUTPUT), F_LIST);
  FileSystem fs = FileSystem.get(flistPath.toUri(), conf);
  flistPath = fs.makeQualified(flistPath);
  HadoopUtil.delete(conf, flistPath);
  SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
  try {
    for (Pair<String,Long> pair : flist) {
      writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));
    }
  } finally {
    writer.close();
  }
  DistributedCache.addCacheFile(flistPath.toUri(), conf);
}
The startParallelFPGrowth method initializes a Job object that runs ParallelFPGrowthMapper and ParallelFPGrowthReducer to implement Step 3 and Step 4.
/**
 * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
 */
public static void startParallelFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());
  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");
  Path input = new Path(params.get(INPUT));
  Job job = new Job(conf, "PFP Growth Driver running over input" + input);
  job.setJarByClass(PFPGrowth.class);

  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(TransactionTree.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(TopKStringPatterns.class);

  FileInputFormat.addInputPath(job, input);
  Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelFPGrowthMapper.class);
  job.setCombinerClass(ParallelFPGrowthCombiner.class);
  job.setReducerClass(ParallelFPGrowthReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
ParallelFPGrowthMapper.java
The setup method of ParallelFPGrowthMapper runs before the map method. It calls a readFList method; note that this readFList takes different parameters than the one analyzed above, so they are two entirely different methods. This readFList uses HadoopUtil.getCachedFiles(conf) to obtain the cached flist file and stores it in fMap, with the item as key and the item's position index within the flist as value; for example, the first item of the flist is stored in fMap as <key=item, value=0>. This is done because the position index is needed later when dividing the items into Q groups. In the map method, the input is the byte offset and one line of the transaction database, which is split with the user-specified splitter. To filter out infrequent items, fMap.containsKey(item) checks whether the item appears in the fList; if it does, its position index is added to itemSet, otherwise it is discarded. itemArr copies the contents of itemSet and sorts them by increasing position index, i.e. by decreasing support. The following for loop walks itemArr from its last element towards the first; whenever the corresponding groupID is not yet in groups, a TransactionTree is created containing itemArr[0], itemArr[1], ..., itemArr[j]. Computing the groupID is simple: divide the position index by maxPerGroup. TransactionTree implements the Writable and Iterable<Pair<IntArrayList, Long>> interfaces; the constructor used here stores its arguments in the member List<Pair<IntArrayList, Long>> transactionSet, and the Pair holds the list of position indices and the count 1.
/**
 * maps each transaction to all unique items groups in the transaction. mapper
 * outputs the group id as key and the transaction as value
 *
 */
public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> {

  private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
  private Pattern splitter;
  private int maxPerGroup;
  private final IntWritable wGroupID = new IntWritable();

  @Override
  protected void map(LongWritable offset, Text input, Context context)
    throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());

    OpenIntHashSet itemSet = new OpenIntHashSet();

    for (String item : items) {
      if (fMap.containsKey(item) && !item.trim().isEmpty()) {
        itemSet.add(fMap.get(item));
      }
    }

    IntArrayList itemArr = new IntArrayList(itemSet.size());
    itemSet.keys(itemArr);
    itemArr.sort();

    OpenIntHashSet groups = new OpenIntHashSet();
    for (int j = itemArr.size() - 1; j >= 0; j--) {
      // generate group dependent shards
      int item = itemArr.get(j);
      int groupID = PFPGrowth.getGroup(item, maxPerGroup);

      if (!groups.contains(groupID)) {
        IntArrayList tempItems = new IntArrayList(j + 1);
        tempItems.addAllOfFromTo(itemArr, 0, j);
        context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
        wGroupID.set(groupID);
        context.write(wGroupID, new TransactionTree(tempItems, 1L));
      }
      groups.add(groupID);
    }

  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);

    int i = 0;
    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      fMap.put(e.getFirst(), i++);
    }

    Parameters params =
      new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN,
                                          PFPGrowth.SPLITTER.toString()));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
  }
}
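The mapper above calls PFPGrowth.getGroup, and the reducer below calls PFPGrowth.getGroupMembers; neither is listed in this article. Based on the description (group-id = position index divided by maxPerGroup), they are presumably close to the following sketch; treat it as a hedged reconstruction rather than the verbatim Mahout source:

// Hedged reconstruction of the two group helpers in PFPGrowth.
public static int getGroup(int itemId, int maxPerGroup) {
  return itemId / maxPerGroup; // consecutive position indices share a group-id
}

public static IntArrayList getGroupMembers(int groupId, int maxPerGroup, int numFeatures) {
  IntArrayList ret = new IntArrayList();
  int start = groupId * maxPerGroup;
  int end = Math.min(start + maxPerGroup, numFeatures);
  for (int i = start; i < end; i++) {
    ret.add(i); // all position indices that map to this group-id
  }
  return ret;
}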
ParallelFPGrowthReducer.java
The input of ParallelFPGrowthReducer is <key=groupID, value={TransactionTree1, TransactionTree2, ..., TransactionTreeN}>. The setup method reads params and uses PFPGrowth.readFList(conf) to fetch the cached flist file, storing the frequent items in featureReverseMap and their supports in freqList. As analyzed above, the TransactionTree emitted by ParallelFPGrowthMapper is really a List<Pair<IntArrayList, Long>> transactionSet. ParallelFPGrowthReducer creates another TransactionTree; although it is the same class as before, this instance is a tree backed by two-dimensional arrays. For reasons of space, the tree-building process is not analyzed here. Once the tree has been built, cTree.generateFList traverses it and returns a Map<Integer, MutableLong> frequencyList. The traversal itself is not analyzed in detail either, but the call chain is worth noting: TransactionTree overrides the iterator method of the Iterable<Pair<IntArrayList, Long>> interface, and generateFList uses that iterator to walk the whole tree; iterator() returns a TransactionTreeIterator, which extends AbstractIterator<Pair<IntArrayList, Long>> and implements the traversal of the TransactionTree. localFList merges the result of generateFList and sorts it by decreasing support. There are two ways to generate the frequent patterns: the user can choose to call either FPGrowthIds.generateTopKFrequentPatterns or fpGrowth.generateTopKFrequentPatterns; this article analyzes the latter. ParallelFPGrowthReducer also contains an IteratorAdapter class, a textbook application of the adapter design pattern that decouples two iterators of different types. The output of ParallelFPGrowthReducer is <key=item, value={Top K Frequent Patterns}>.
/**
 * takes each group of transactions and runs Vanilla FPGrowth on it and
 * outputs the the Top K frequent Patterns for each group.
 *
 */
public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> {

  private final List<String> featureReverseMap = Lists.newArrayList();
  private final LongArrayList freqList = new LongArrayList();
  private int maxHeapSize = 50;
  private int minSupport = 3;
  private int numFeatures;
  private int maxPerGroup;
  private boolean useFP2;

  private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> {
    private final Iterator<Pair<IntArrayList,Long>> innerIter;

    private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) {
      innerIter = transactionIter;
    }

    @Override
    public boolean hasNext() {
      return innerIter.hasNext();
    }

    @Override
    public Pair<List<Integer>,Long> next() {
      Pair<IntArrayList,Long> innerNext = innerIter.next();
      return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException {
    TransactionTree cTree = new TransactionTree();
    for (TransactionTree tr : values) {
      for (Pair<IntArrayList,Long> p : tr) {
        cTree.addPattern(p.getFirst(), p.getSecond());
      }
    }

    List<Pair<Integer,Long>> localFList = Lists.newArrayList();
    for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) {
      localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong()));
    }

    Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>());

    if (useFP2) {
      FPGrowthIds.generateTopKFrequentPatterns(
          cTree.iterator(),
          freqList,
          minSupport,
          maxHeapSize,
          PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
    } else {
      FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
      fpGrowth.generateTopKFrequentPatterns(
          new IteratorAdapter(cTree.iterator()),
          localFList,
          minSupport,
          maxHeapSize,
          Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(),
                                                    maxPerGroup,
                                                    numFeatures).toList()),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {

    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      featureReverseMap.add(e.getFirst());
      freqList.add(e.getSecond());
    }

    maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
    minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3"));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
    numFeatures = featureReverseMap.size();
    useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2));
  }
TransactionTree.java
Before analyzing fpGrowth.generateTopKFrequentPatterns, let us look at the addPattern method used while building the tree. The code below lists the data members of TransactionTree and the addPattern method. addPattern starts at the root and compares the items of myList against the tree. childWithAttribute returns the child of temp whose attribute matches attributeValue, or -1 if there is no such child. If there is none, addCountMode is set to false and the remaining items of myList are appended to the tree as new nodes; if there is, addCount increases that child's support count. This tree-building approach is exactly the same as in traditional FP-Growth.
private int[] attribute;       // attribute (item) id stored at each node
private int[] childCount;      // number of children of each node
private int[][] nodeChildren;  // two-dimensional array recording the children of each node
private long[] nodeCount;      // support count of each node
private int nodes;
private boolean representedAsList; // true: represented as a list; false: represented as a tree
private List<Pair<IntArrayList,Long>> transactionSet;

public int addPattern(IntArrayList myList, long addCount) {
  int temp = ROOTNODEID;
  int ret = 0;
  boolean addCountMode = true;
  for (int idx = 0; idx < myList.size(); idx++) {
    int attributeValue = myList.get(idx);
    int child;
    if (addCountMode) {
      child = childWithAttribute(temp, attributeValue);
      if (child == -1) {
        addCountMode = false;
      } else {
        addCount(child, addCount);
        temp = child;
      }
    }
    if (!addCountMode) {
      child = createNode(temp, attributeValue, addCount);
      temp = child;
      ret++;
    }
  }
  return ret;
}
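The helpers childWithAttribute, addCount, and createNode that addPattern relies on are not listed above. Ignoring the array-resizing logic of the real implementation, a sketch consistent with the data members shown could look like the following; this is a reconstruction for readability, not the verbatim Mahout source:

// Hedged sketch of the TransactionTree helpers used by addPattern; the actual
// Mahout code additionally grows the backing arrays when they run out of space.
private int childWithAttribute(int nodeId, int childAttribute) {
  for (int i = 0; i < childCount[nodeId]; i++) {
    int child = nodeChildren[nodeId][i];
    if (attribute[child] == childAttribute) {
      return child;   // an existing child already carries this item
    }
  }
  return -1;          // no such child: addPattern switches to create mode
}

private void addCount(int nodeId, long count) {
  nodeCount[nodeId] += count;   // bump the support along the shared prefix
}

private int createNode(int parentId, int attributeValue, long count) {
  int child = nodes++;                      // next free slot in the arrays
  attribute[child] = attributeValue;
  nodeCount[child] = count;
  childCount[child] = 0;
  nodeChildren[parentId][childCount[parentId]++] = child;  // attach under the parent
  return child;
}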
FPGrowth.java
The parameters of generateTopKFrequentPatterns are transactionStream, frequencyList, minSupport, k, Collection<A> returnableFeatures, OutputCollector<A, List<Pair<List<A>, Long>>> output, and StatusUpdater updater. transactionStream is built from the cTree for the current key=groupID, i.e. from values of type Pair<List<A>, Long> whose first element is a list of position indices and whose second element is the associated count; frequencyList is the localFList from ParallelFPGrowthReducer, whose first element is a position index and whose second element is a support count; Collection<A> returnableFeatures is the set of position indices belonging to the current key=group-id.
attributeIdMapping filters out the infrequent items in transactionStream and assigns a new id to each frequent item, mapping <key=position index, value=id>. reverseMapping inverts that mapping. attributeFrequency records the support of the item whose index is id. The loop over returnableFeatures filters out infrequent position indices, and returnFeatures keeps the remaining frequent ones. The overloaded generateTopKFrequentPatterns is then called to build the local FP-tree and header table and to traverse the FP-tree to output frequent patterns. Reference [1] analyzes this process in detail, so it is not repeated here; note, however, that in Mahout the FP-tree is stored as arrays.
/**
 * Generate Top K Frequent Patterns for every feature in returnableFeatures
 * given a stream of transactions and the minimum support
 *
 * @param transactionStream
 *          Iterator of transaction
 * @param frequencyList
 *          list of frequent features and their support value
 * @param minSupport
 *          minimum support of the transactions
 * @param k
 *          Number of top frequent patterns to keep
 * @param returnableFeatures
 *          set of features for which the frequent patterns are mined. If the
 *          set is empty or null, then top K patterns for every frequent item (an item
 *          whose support> minSupport) is generated
 * @param output
 *          The output collector to which the the generated patterns are
 *          written
 * @throws IOException
 */
public final void generateTopKFrequentPatterns(Iterator<Pair<List<A>,Long>> transactionStream,
                                                Collection<Pair<A, Long>> frequencyList,
                                                long minSupport,
                                                int k,
                                                Collection<A> returnableFeatures,
                                                OutputCollector<A,List<Pair<List<A>,Long>>> output,
                                                StatusUpdater updater) throws IOException {

  Map<Integer,A> reverseMapping = Maps.newHashMap();
  Map<A,Integer> attributeIdMapping = Maps.newHashMap();

  int id = 0;
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency >= minSupport) {
      attributeIdMapping.put(attrib, id);
      reverseMapping.put(id++, attrib);
    }
  }

  long[] attributeFrequency = new long[attributeIdMapping.size()];
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency < minSupport) {
      break;
    }
    attributeFrequency[attributeIdMapping.get(attrib)] = frequency;
  }

  log.info("Number of unique items {}", frequencyList.size());

  Collection<Integer> returnFeatures = Sets.newHashSet();
  if (returnableFeatures != null && !returnableFeatures.isEmpty()) {
    for (A attrib : returnableFeatures) {
      if (attributeIdMapping.containsKey(attrib)) {
        returnFeatures.add(attributeIdMapping.get(attrib));
        log.info("Adding Pattern {}=>{}", attrib, attributeIdMapping.get(attrib));
      }
    }
  } else {
    for (int j = 0; j < attributeIdMapping.size(); j++) {
      returnFeatures.add(j);
    }
  }

  log.info("Number of unique pruned items {}", attributeIdMapping.size());
  generateTopKFrequentPatterns(new TransactionIterator<A>(transactionStream,
      attributeIdMapping), attributeFrequency, minSupport, k, reverseMapping.size(),
      returnFeatures, new TopKPatternsOutputConverter<A>(output, reverseMapping), updater);

}
AggregatorMapper.java
The input of AggregatorMapper is <key, value=TopKStringPatterns>. TopKStringPatterns is a list of Pair<List<String>, Long> elements: the List<String> element records a frequent pattern for key=item, and the Long element records its support.
/**
 *
 * outputs the pattern for each item in the pattern, so that reducer can group them
 * and select the top K frequent patterns
 *
 */
public class AggregatorMapper extends Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  @Override
  protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
                                                                            InterruptedException {
    for (Pair<List<String>,Long> pattern : values.getPatterns()) {
      for (String item : pattern.getFirst()) {
        List<Pair<List<String>,Long>> patternSingularList = Lists.newArrayList();
        patternSingularList.add(pattern);
        context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
        context.write(new Text(item), new TopKStringPatterns(patternSingularList));
      }
    }

  }
}
AggregatorReducer.java
AggregatorReducer collects all patterns for the same item key, sorts them by decreasing support, and finally outputs the top K frequent patterns.
/**
 *
 * groups all Frequent Patterns containing an item and outputs the top K patterns
 * containing that particular item
 *
 */
public class AggregatorReducer extends Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  private int maxHeapSize = 50;

  @Override
  protected void reduce(Text key, Iterable<TopKStringPatterns> values, Context context) throws IOException,
                                                                                         InterruptedException {
    TopKStringPatterns patterns = new TopKStringPatterns();
    for (TopKStringPatterns value : values) {
      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
      patterns = patterns.merge(value, maxHeapSize);
    }
    context.write(key, patterns);

  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get("pfp.parameters", ""));
    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));

  }
}
3. Discussion
Parallel FP-Growth removes the performance bottleneck that traditional FP-Growth hits on large data sets. Besides parallelization, there are many other ways to improve FP-Growth, for example taking load balancing into account when parallelizing it, or representing the frequent patterns with maximal frequent itemsets and closed frequent itemsets.
A maximal frequent itemset is a frequent itemset none of whose immediate supersets is frequent. The maximal frequent itemsets form the smallest collection of itemsets from which all frequent itemsets can be derived, but they do not carry the support information of their subsets.
An itemset is a closed frequent itemset if its support is at least the minimum support threshold and none of its immediate supersets has the same support as it does. Closed frequent itemsets provide a minimal representation of the frequent itemsets that does not lose support information.
4. References
[1] 關聯分析:FP-Growth算法. Mark Lin. datahunter. 2014. [Link]
[2] PFP: Parallel FP-Growth for Query Recommendation. Haoyuan Li et al. RecSys '08: Proceedings of the 2008 ACM Conference on Recommender Systems. 2008. [PDF]