每日一题（为了工作）2020-05-08 第六十六题
package spark.action.factory; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import java.util.*; /** * * @author 雪瞳 * @Slogan 时钟尚且前行,人怎能就此止步! * @Function 模拟数据并创建DataFrame * */ public class MockData { public static void main(String[] args) { String master = "local"; String appname = "dataFrame"; SparkSession session = SparkSession.builder().master(master).appName(appname).getOrCreate(); JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext()); List<Row> dataList = new ArrayList<>(); Random random = new Random(); String[] locations = new String[]{"鲁","京","冀","鄂","粤","沪","京","深","蒙","川"}; String date = DateUtils.getTodayDate(); for (int i=0 ; i < 3000 ; i++){ //车牌号 String car = locations[random.nextInt(locations.length)]+ (char)(65+random.nextInt(26))+ StringUtils.fullFillNumBites(5,String.valueOf(random.nextInt(10000))); //模拟24小时 yyyyMMdd HH String baseActionTime = date+" "+ StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(24))); //模拟一辆车被多少摄像头拍摄 for (int j=0; j< random.nextInt(300)+1 ;j++){ //每30个摄像头 小时+1 if (j % 30==0 && j!=0){ int tmp = Integer.parseInt( baseActionTime.split(" ")[1]) + 1; baseActionTime = date+ " "+ StringUtils.fullFillTwoBites(String.valueOf(tmp)); } //模拟区域ID 1-8 String areaId = StringUtils.fullFillNumBites(2, String.valueOf(random.nextInt(8)+1)); //模拟道路ID 1-50 String roadId = String.valueOf(random.nextInt(50)+1); //模拟路口数 String monitorId = StringUtils.fullFillNumBites(4, String.valueOf(random.nextInt(9)+1)); //模拟车辆被多少个摄像头拍摄 String cameraId = StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(100000)+1)); //模拟经过此路口开始时间 ,如:2018-01-01 20:09:10 String actionTime = baseActionTime+ 
StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60)))+ StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60))); //模拟车速 String speed = String.valueOf(random.nextInt(260)+1); // Row row = RowFactory.create(date, monitorId, cameraId, car, actionTime, speed, roadId, areaId); dataList.add(row); } } //将list序列化成row类型的javaRDD JavaRDD<Row> rowJavaRDD = jsc.parallelize(dataList); //动态创建schema方式创建DataFrame StructType structType = DataTypes.createStructType(Arrays.asList( DataTypes.createStructField("date", DataTypes.StringType, true), DataTypes.createStructField("monitor_id", DataTypes.StringType, true), DataTypes.createStructField("camera_id", DataTypes.StringType, true), DataTypes.createStructField("car", DataTypes.StringType, true), DataTypes.createStructField("action_time", DataTypes.StringType, true), DataTypes.createStructField("speed", DataTypes.StringType, true), DataTypes.createStructField("road_id", DataTypes.StringType, true), DataTypes.createStructField("area_id", DataTypes.StringType, true) )); //创建DataFrame Dataset<Row> dataFrame = session.createDataFrame(rowJavaRDD, structType); //打印数据 System.err.println("车辆信息数据"); dataFrame.show(50); dataFrame.registerTempTable("monitor_flow_action"); //生成路口号与摄像头的对应表 Map<String,Set<String>> monitorAndCameras = new HashMap<>(); int index = 0; for (Row row : dataList){ String monitorId = row.getString(1); Set<String> sets = monitorAndCameras.get(monitorId); if (sets == null){ sets = new HashSet<>(); monitorAndCameras.put(monitorId,sets); } index ++; if (index % 1000 == 0){ sets.add(StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(100000)))); } String cameraId = row.getString(2); sets.add(cameraId); } //创建路口号与摄像头对应的dataFrame dataList.clear(); Set<Map.Entry<String, Set<String>>> entrySet = monitorAndCameras.entrySet(); for (Map.Entry<String, Set<String>> entry:entrySet){ String monitorId = entry.getKey(); Set<String> cameraIds = entry.getValue(); Row row = null; for (String cameraId : cameraIds){ 
row = RowFactory.create(monitorId,cameraId); dataList.add(row); } } //动态创建schema StructType structTypeTwo = DataTypes.createStructType(Arrays.asList( DataTypes.createStructField("monitor_id", DataTypes.StringType, true), DataTypes.createStructField("camera_id", DataTypes.StringType, true) )); JavaRDD<Row> parallelize = jsc.parallelize(dataList); Dataset<Row> dataFrameTwo = session.createDataFrame(parallelize, structTypeTwo); dataFrameTwo.registerTempTable("monitor_camera_info"); System.err.println("路口与摄像头"); dataFrameTwo.show(50); } }
相关推荐
hanwentan 2020-07-21
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26