Spark 写 MySQL

import java.sql.{DriverManager, PreparedStatement}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Demo job: generates 100 sample log rows and inserts them into the MySQL
  * table `syslog` over plain JDBC, using one connection per RDD partition.
  *
  * Example submission:
  * {{{
  * spark-submit --master local[*] --jars /root/sparkjob/mysql-connector-java-5.1.38.jar
  * --class com.zxb.sparkapplication.readwrite.SparkWriteMysql /root/sparkjob/original-scalatest-1.0-SNAPSHOT.jar
  * }}}
  */

object SparkWriteMysql {

  /** One `syslog` row: (action, event, host, insertTime, userName, update_Time). */
  private type LogRow = (String, String, String, String, String, String)

  /**
    * Entry point. Builds the sample rows on the driver, distributes them as an
    * RDD and writes every partition to MySQL.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // setIfMissing keeps "local" as the fallback for IDE runs but lets a
    // `--master` flag passed to spark-submit take effect; an explicit
    // setMaster(...) would take precedence over the flag.
    val conf = new SparkConf()
      .setAppName("spark write mysql")
      .setIfMissing("spark.master", "local")

    val sc = new SparkContext(conf)

    try {
      // MySQL connection settings (hard-coded demo values).
      val driverClassName = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://192.168.242.20:3306/test?characterEncoding=utf8&useSSL=false"
      val user = "root"
      val password = "123456"

      val sql = "insert into syslog(action, event, host, insertTime, userName, update_Time) values(?,?,?,?,?,?)"

      // HH is hour-of-day (0-23). The 12-hour `hh` pattern without an am/pm
      // marker would render 15:30 as "03:30".
      val timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

      // Build the sample rows. The timestamp is taken once per row so that
      // insertTime and update_Time always agree.
      val logRows: Seq[LogRow] = (1 to 100).map { i =>
        val now = LocalDateTime.now().format(timestampFormat)
        (s"写$i", s"测试$i", s"localhost$i", now, "spark", now)
      }

      val logRDD: RDD[LogRow] = sc.makeRDD(logRows)

      // Iterate per partition so that each partition opens a single connection.
      logRDD.foreachPartition { logData =>
        // Skip empty partitions instead of opening a connection for nothing.
        if (logData.hasNext) {
          Class.forName(driverClassName)
          val connection = DriverManager.getConnection(url, user, password)
          // Nested try/finally: the connection is closed even when
          // prepareStatement or statement.close() throws.
          try {
            val statement: PreparedStatement = connection.prepareStatement(sql)
            try {
              logData.foreach { case (action, event, host, insertTime, userName, updateTime) =>
                statement.setString(1, action)
                statement.setString(2, event)
                statement.setString(3, host)
                statement.setString(4, insertTime)
                statement.setString(5, userName)
                statement.setString(6, updateTime)

                statement.executeUpdate()
              }
            } finally {
              statement.close()
            }
          } finally {
            connection.close()
          }
        }
      }
    } finally {
      // Always release the SparkContext, even when the write fails.
      sc.stop()
    }
  }
}

相关推荐