Word segmentation and word-count statistics on text with Spark
package com.snailteam.simple_project
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import com.huaban.analysis.jieba.JiebaSegmenter
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
import scala.collection.JavaConversions
/**
 * When this runs on a Spark cluster, the jieba classes must implement Serializable,
 * because the segmenter is captured by the closures below (an alternative that avoids
 * this is sketched after the listing).
 *
 * Submit with:
 * ./bin/spark-submit --class com.snailteam.simple_project.App --jars lib/mysql-connector-java-5.1.18.jar
 */
object App {
  val url = "xx"
  val username = "xx"
  val password = "xx"

  def main(args: Array[String]): Unit = {
    // val conf = new SparkConf()
    //   .setMaster("spark://192.168.56.102:7077")
    //   .setAppName("mysql_test")
    // val sc = new SparkContext(conf)
    val sc = new SparkContext("local", "mysql")
    val seg = new JiebaSegmenter
    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection(url, username, password)
      },
      "select * from book limit ?,? ",
      1, 10000000, 4)
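    // Note: JdbcRDD splits the [1, 10000000] range across the 4 partitions and binds each
    // partition's lower/upper bound to the two "?" placeholders. With "limit ?,?" those
    // values act as offset and row count, so partitions may return overlapping rows;
    // JdbcRDD's intended usage is a keyed range predicate such as "where id >= ? and id <= ?"
    // (assuming the table has a numeric id column).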
    val result = rdd.flatMap { row =>
      // Segment the 9th column (index 8) and keep only verbs, nouns and adjectives.
      val tokens = JavaConversions.asScalaBuffer(seg.process(row(8).toString, SegMode.SEARCH))
      for {
        t <- tokens
        if t.word.getTokenType.contains("v") || t.word.getTokenType.contains("n") || t.word.getTokenType.contains("a")
      } yield t.word.getToken
    }.map { word => (word, 1) }
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(100)

    result.foreach(println)
    sc.stop()
  }
}
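An alternative to patching the jieba classes to implement Serializable is to build the segmenter inside each partition, so it is instantiated on the executors and never shipped from the driver. Below is a minimal sketch of that approach, reusing the rdd, the column index and the patched getToken/getTokenType accessors from the listing above (those accessors are an assumption carried over from the original code, not part of stock jieba-analysis).

// Sketch: one JiebaSegmenter per partition, so nothing jieba-related needs to be serialized.
val top100 = rdd.mapPartitions { rows =>
  val localSeg = new JiebaSegmenter // created on the executor, not on the driver
  rows.flatMap { row =>
    val tokens = JavaConversions.asScalaBuffer(localSeg.process(row(8).toString, SegMode.SEARCH))
    for {
      t <- tokens
      if Seq("v", "n", "a").exists(pos => t.word.getTokenType.contains(pos))
    } yield t.word.getToken
  }
}.map(word => (word, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false).take(100)

top100.foreach(println)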