/** * 模拟数据 数据格式如下: * * 日期 卡口ID 摄像头编号 车牌号 拍摄时间 车速 道路ID 区域ID * date monitor_id camera_id car action_time speed road_id area_id * * monitor_flow_action * monitor_camera_info * * @author Administrator */object MockData { /** * 获取n位随机数 * * @param index 位数 * @param random * @return */ def randomNum(index: Int, random: Random): String = { var str = "" for (i <- 0 until index) { str += random.nextInt(10) } str } /** * 时 分 秒 如果小于10,填充0 01——09 * 卡口:0001———0009 * @param random * @param num 随机范围 * @param index 填充位数 * @return */ def fillZero(random: Random, num: Int, index: Int): String = { val randomNum = random.nextInt(num) var randomNumStr = randomNum.toString if (randomNum < 10) { randomNumStr = ("%0" + index + "d").format(randomNum) } randomNumStr } /** * 初始化一个输出流 * @param path * @return */ def initFile(path: String): PrintWriter = { new PrintWriter(new File(path)) } /** * 往文件中写数据 * @param pw * @param content */ def writeDataToFile(pw: PrintWriter, content: String): Unit = { pw.write(content + "\n") } /** * 关闭文件流 * @param pw */ def closeFile(pw: PrintWriter): Unit = { pw.close() } /** * 初始化kafka生产者 * @return */ def initKafkaProducer(): KafkaProducer[String,String] = { val props = new Properties() props.put("bootstrap.servers", "wanghy:9092") props.put("acks", "all") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") new KafkaProducer[String, String](props) } /** * 写数据到kafka中 * @param producer * @param content */ def writeDataToKafka(producer: KafkaProducer[String,String], content: String): Unit = { producer.send(new ProducerRecord[String, String]("1711f", content)) } /** * 关闭kafka生产者 * @param producer */ def closeKafka(producer: KafkaProducer[String, String]): Unit = { producer.close() } /** * 模拟数据 */ def mock() { //初始化文件输出流 // val pw = initFile("d://monitor_action.log") //初始化kafka生产者 val producer = initKafkaProducer() val random = new Random() val locations = Array("鲁", "京", "京", "京", "沪", "京", "京", "深", "京", "京") //day :如:2020-06-06 val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) /** * 模拟3000个车辆 */ for (i <- 0 until 3000) { //模拟车牌号:如:京A00001 val car = locations(random.nextInt(10)) + (65 + random.nextInt(26)).asInstanceOf[Char] + randomNum(5, random) //模拟拍照时间 2020-06-06 11 var baseActionTime = day + " " + fillZero(random, 24, 2) /** * 这里的for循环模拟每辆车经过不同的卡口不同的摄像头 数据。 */ for (j <- 0 until random.nextInt(300)) { //模拟每个车辆每被30个摄像头拍摄后 时间上累计加1小时。这样做使数据更加真实。 if (j % 30 == 0 && j != 0) { var nextHour = "" val baseHour = baseActionTime.split(" ")(1) if (baseHour.startsWith("0")) { if (baseHour.endsWith("9")) { nextHour = "10" } else { nextHour = "0" + (baseHour.substring(1).toInt + 1).toString } } else if (baseHour == "23") { nextHour = fillZero(random, 24, 2) } else { nextHour = (baseHour.toInt + 1).toString } baseActionTime = day + " " + nextHour } val actionTime = baseActionTime + ":" + fillZero(random, 60, 2) + ":" + fillZero(random, 60, 2) val monitorId = fillZero(random, 10, 4) val speed = random.nextInt(200) + 1 //模拟车速 【1-200】 val roadId = random.nextInt(50) + 1 //模拟道路id 【1~50 个道路】 val cameraId = "0" + randomNum(4, random) //5位的摄像头id val areaId = fillZero(random, random.nextInt(8) + 1, 2) //模拟areaId 【一共8个区域:01-08】 //将数据写入到文件中 val content = day + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId // writeDataToFile(pw, content)//发送到kafka writeDataToKafka(producer, content) Thread.sleep(50) } } // closeFile(pw) closeKafka(producer) } def main(args: Array[String]): Unit = { mock() }}