Word segmentation and word-count statistics on text with Spark
package com.snailteam.simple_project

import java.sql.DriverManager

import scala.collection.JavaConversions

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.huaban.analysis.jieba.JiebaSegmenter
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode

/**
 * When running this on a Spark cluster, make the jieba classes implement
 * Serializable, otherwise the segmenter captured in the closure cannot be
 * shipped to the executors.
 *
 * Submit with:
 * ./bin/spark-submit --class com.snailteam.simple_project.App --jars lib/mysql-connector-java-5.1.18.jar
 */
object App {
  val url = "xx"
  val username = "xx"
  val password = "xx"

  def main(args: Array[String]): Unit = {
    // For a real cluster, build a SparkConf instead of using the local master:
    // val conf = new SparkConf()
    //   .setMaster("spark://192.168.56.102:7077")
    //   .setAppName("mysql_test")
    // val sc = new SparkContext(conf)
    val sc = new SparkContext("local", "mysql")
    val seg = new JiebaSegmenter

    // Read the book table in parallel; JdbcRDD substitutes the two ?'s with
    // each partition's lower/upper bound (here 1 .. 10000000 over 4 partitions).
    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection(url, username, password)
      },
      "select * from book limit ?,? ",
      1, 10000000, 4)

    // Segment column 8 of each row with jieba, keep only verbs ("v"),
    // nouns ("n") and adjectives ("a"), then count tokens and take the
    // 100 most frequent ones.
    val result = rdd.flatMap { x =>
      val ts = seg.process(x(8).toString(), SegMode.SEARCH)
      for (t <- JavaConversions.asScalaBuffer(ts)
           if t.word.getTokenType.contains("v") ||
              t.word.getTokenType.contains("n") ||
              t.word.getTokenType.contains("a"))
        yield t.word.getToken
    }.map { word => (word, 1) }
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .take(100)

    result.foreach(println)
    sc.stop()
  }
}
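The doc comment above notes that the jieba classes have to implement Serializable before the job can run on a cluster, because the JiebaSegmenter created in the driver is captured by the flatMap closure. A common alternative that avoids patching jieba is to create the segmenter inside mapPartitions, so each executor builds its own instance and nothing jieba-related ever crosses the wire. The following is a minimal sketch of that variant, not part of the original post; it assumes the same rdd, the same table layout (text in column 8), and the same token API as the listing above, and the name topWords is only for illustration.

    // Sketch: build the segmenter per partition instead of in the driver,
    // so JiebaSegmenter never has to be serialized.
    val topWords = rdd.mapPartitions { rows =>
      val seg = new JiebaSegmenter            // one local instance per partition
      rows.flatMap { row =>
        JavaConversions.asScalaBuffer(seg.process(row(8).toString, SegMode.SEARCH))
          .map(_.word.getToken)
      }
    }.map((_, 1))
      .reduceByKey(_ + _)
      .top(100)(Ordering.by(_._2))            // top-N by count without a full sortBy

Per-partition instantiation is the standard workaround when a third-party class is not serializable; whether it is worth it here depends on how expensive constructing a JiebaSegmenter is. Using top with an Ordering on the count also skips the full sort that sortBy + take performs.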