大数据之 Spark:模拟数据(本地文件和 Kafka 两种方式运行)

/**
  * Generator of mock traffic-monitoring records.
  *
  * Every record is a single tab-separated line with these columns, in order:
  *
  * {{{
  * date  monitor_id  camera_id  car  action_time  speed  road_id  area_id
  * }}}
  *
  * i.e. date, checkpoint (monitor) id, camera id, licence plate, capture time,
  * vehicle speed, road id and area id.
  *
  * The original notes also mention `monitor_flow_action` and
  * `monitor_camera_info` (presumably the downstream tables -- TODO confirm).
  *
  * Records can be written either to a local file (`initFile` /
  * `writeDataToFile` / `closeFile`) or to Kafka (`initKafkaProducer` /
  * `writeDataToKafka` / `closeKafka`); `mock` currently uses Kafka.
  *
  * @author Administrator
  */
object MockData {

  /** Kafka broker list used by [[initKafkaProducer]]. */
  private val BootstrapServers = "wanghy:9092"

  /** Kafka topic that [[writeDataToKafka]] publishes to. */
  private val Topic = "1711f"

  /** Number of simulated vehicles. */
  private val CarCount = 3000

  /** Exclusive upper bound of the number of records generated per vehicle. */
  private val MaxRecordsPerCar = 300

  /** After this many records the simulated capture hour of a vehicle advances. */
  private val RecordsPerHour = 30

  /** Pause between two sent records, in milliseconds. */
  private val SendIntervalMs = 50L

  /**
    * Left-pads a non-negative number with zeros.
    *
    * @param value number to render
    * @param width minimum number of digits; values `<= 0` disable padding
    * @return `value` as a decimal string of at least `width` characters
    */
  private def pad(value: Int, width: Int): String =
    if (width <= 0) value.toString else s"%0${width}d".format(value)

  /**
    * Builds a string of random decimal digits.
    *
    * @param index  number of digits to generate; `<= 0` yields an empty string
    * @param random source of randomness
    * @return a string made of `index` random digits
    */
  def randomNum(index: Int, random: Random): String =
    (0 until index).map(_ => random.nextInt(10)).mkString

  /**
    * Draws a random number in `[0, num)` and left-pads it with zeros, e.g.
    * hours/minutes/seconds as `00`..`59` or checkpoint ids as `0000`..`0009`.
    *
    * The result is always padded to `index` digits (the previous version only
    * padded single-digit values, which left e.g. `10` unpadded for `index = 4`).
    *
    * @param random source of randomness
    * @param num    exclusive upper bound of the random number (must be positive)
    * @param index  minimum number of digits of the result
    * @return the zero-padded random number
    */
  def fillZero(random: Random, num: Int, index: Int): String =
    pad(random.nextInt(num), index)

  /**
    * Opens a writer for the given file.
    *
    * The file is written as UTF-8 so that the Chinese province prefixes of the
    * licence plates do not depend on the platform default charset.
    *
    * @param path path of the file to create or truncate
    * @return a writer the caller must close with [[closeFile]]
    */
  def initFile(path: String): PrintWriter =
    new PrintWriter(new File(path), "UTF-8")

  /**
    * Appends one record followed by a `\n` line terminator.
    *
    * @param pw      writer obtained from [[initFile]]
    * @param content record to write
    */
  def writeDataToFile(pw: PrintWriter, content: String): Unit =
    pw.write(content + "\n")

  /**
    * Closes the file writer.
    *
    * @param pw writer to close
    */
  def closeFile(pw: PrintWriter): Unit =
    pw.close()

  /**
    * Creates a Kafka producer with string keys and values that waits for
    * acknowledgement from all replicas (`acks=all`).
    *
    * @return a producer the caller must close with [[closeKafka]]
    */
  def initKafkaProducer(): KafkaProducer[String, String] = {
    val stringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new Properties()
    props.put("bootstrap.servers", BootstrapServers)
    props.put("acks", "all")
    props.put("key.serializer", stringSerializer)
    props.put("value.serializer", stringSerializer)
    new KafkaProducer[String, String](props)
  }

  /**
    * Sends one record (without key) to the mock-data topic.
    *
    * The result of `send` is not inspected here, so delivery failures are not
    * reported by this method.
    *
    * @param producer producer obtained from [[initKafkaProducer]]
    * @param content  record to send
    */
  def writeDataToKafka(producer: KafkaProducer[String, String], content: String): Unit = {
    producer.send(new ProducerRecord[String, String](Topic, content))
  }

  /**
    * Closes the Kafka producer.
    *
    * @param producer producer to close
    */
  def closeKafka(producer: KafkaProducer[String, String]): Unit =
    producer.close()

  /**
    * Generates the mock records and sends them to Kafka, one every 50 ms.
    *
    * For each of the 3000 simulated vehicles a random number (0 to 299) of
    * records is produced. All records carry today's date; the capture hour of a
    * vehicle advances by one after every 30 records to make the data look more
    * realistic.
    *
    * To write to a local file instead, open a writer with
    * `initFile("d://monitor_action.log")`, call `writeDataToFile(pw, content)`
    * in place of `writeDataToKafka` and finish with `closeFile(pw)`.
    */
  def mock(): Unit = {
    val producer = initKafkaProducer()
    // try/finally so the producer is released even if the loop is interrupted.
    try {
      val random = new Random()
      // Province prefixes; "京" is repeated on purpose so most plates are from Beijing.
      val locations = Array("鲁", "京", "京", "京", "沪", "京", "京", "深", "京", "京")
      // Today's date, e.g. 2020-06-06.
      val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

      for (_ <- 0 until CarCount) {
        // Licence plate: province + one letter A-Z + five digits, e.g. 京A00001.
        val car = locations(random.nextInt(locations.length)) +
          ('A' + random.nextInt(26)).toChar +
          randomNum(5, random)

        // Hour of the first capture of this vehicle (0..23).
        var hour = random.nextInt(24)

        // Each iteration is one capture of this vehicle by some checkpoint camera.
        for (j <- 0 until random.nextInt(MaxRecordsPerCar)) {
          if (j % RecordsPerHour == 0 && j != 0) {
            // The date never rolls over, so after 23h a fresh random hour is
            // drawn instead of moving on to the next day.
            hour = if (hour == 23) random.nextInt(24) else hour + 1
          }

          val actionTime =
            s"$day ${pad(hour, 2)}:${fillZero(random, 60, 2)}:${fillZero(random, 60, 2)}"
          val monitorId = fillZero(random, 10, 4)   // checkpoint id 0000..0009
          val speed = random.nextInt(200) + 1       // speed 1..200
          val roadId = random.nextInt(50) + 1       // road id 1..50
          val cameraId = "0" + randomNum(4, random) // 5-character camera id
          // Area id 01..08, uniformly distributed (the nested random used
          // before produced a skewed 00..07).
          val areaId = pad(random.nextInt(8) + 1, 2)

          val content =
            Seq(day, monitorId, cameraId, car, actionTime, speed, roadId, areaId).mkString("\t")

          writeDataToKafka(producer, content)
          // Throttle the stream so consumers see a steady flow of records.
          Thread.sleep(SendIntervalMs)
        }
      }
    } finally {
      closeKafka(producer)
    }
  }

  /** Entry point: runs [[mock]]. */
  def main(args: Array[String]): Unit = {
    mock()
  }
}