Importing data into HDFS with Sqoop and querying it with Spark SQL
1. Start HDFS:
cd /usr/local/hadoop-2.7.7/sbin
./start-dfs.sh
2. Start YARN:
cd /usr/local/hadoop-2.7.7/sbin
./start-yarn.sh
3. Start Spark (the master address 192.168.96.128 must match the URL the worker and the Spark application connect to):
cd /usr/local/spark-2.3.3-bin-hadoop2.7/sbin
./start-master.sh -h 192.168.96.128
./start-slave.sh spark://192.168.96.128:7077
4. Create the Sqoop import job (an incremental import in lastmodified mode: rows whose my_time column is newer than --last-value are appended to the target directory):
./sqoop-job \
--meta-connect jdbc:hsqldb:hsql://192.168.96.128:16000/sqoop \
--create t_order_increment_job \
-- import --connect jdbc:mysql://192.168.96.1:3306/demo_ds_0?serverTimezone=Asia/Shanghai \
--username root -P \
--append \
--table t_order_increment \
--incremental lastmodified \
--check-column my_time \
--last-value '2019-08-30 21:36:16' \
--target-dir /increment/t_order_increment
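The --meta-connect URL above assumes a shared Sqoop metastore is already listening on 192.168.96.128:16000. If it is not, it can be enabled with settings along these lines in sqoop-site.xml (the database path below is only an example) and then started with the sqoop-metastore command from the Sqoop bin directory:

```xml
<!-- sqoop-site.xml on the metastore host; the db path is illustrative -->
<property>
  <name>sqoop.metastore.server.location</name>
  <value>/usr/local/sqoop/metastore/shared.db</value>
</property>
<property>
  <name>sqoop.metastore.server.port</name>
  <value>16000</value>
</property>
```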
5. Execute the import job (since -P was specified, Sqoop prompts for the MySQL password):
./sqoop-job \
--meta-connect jdbc:hsqldb:hsql://192.168.96.128:16000/sqoop \
--exec t_order_increment_job
6. Java code for querying the imported data with Spark SQL:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementApplication {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkApplication")
                .config("spark.master", "spark://192.168.96.128:7077")
                .config("spark.jars", "/usr/local/workspace/spark-test-1.0-SNAPSHOT-shaded.jar")
                .getOrCreate();

        // Sqoop writes each row as a comma-separated text line:
        // order_id,user_id,status,my_time
        JavaRDD<Order> orderRdd = spark.read()
                .text("hdfs://192.168.96.128:9000/increment/t_order_increment/")
                .javaRDD()
                .map(line -> {
                    Order order = new Order();
                    String[] items = line.getString(0).split(",");
                    order.setOrderId(Integer.valueOf(items[0]));
                    order.setUserId(Integer.valueOf(items[1]));
                    order.setStatus(items[2]);
                    return order;
                });

        Dataset<Row> orderDataSet = spark.createDataFrame(orderRdd, Order.class);
        orderDataSet.createOrReplaceTempView("order");
        // Backticks are needed because ORDER is a SQL keyword
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM `order`");
        sqlDF.show();
    }
}
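The map step above assumes each imported file line is a plain comma-separated record. The splitting logic can be sanity-checked in isolation against a made-up sample line (the field values below are hypothetical, not real Sqoop output):

```java
// Standalone sketch of the CSV-splitting logic used in the map() above.
public class ParseOrderLine {
    public static void main(String[] args) {
        // Hypothetical record in the shape order_id,user_id,status,my_time
        String line = "1,100,INIT,2019-08-30 21:36:16";
        String[] items = line.split(",");
        Integer orderId = Integer.valueOf(items[0]);
        Integer userId = Integer.valueOf(items[1]);
        String status = items[2];
        System.out.println(orderId + "," + userId + "," + status);
    }
}
```

Note that split(",") would misparse fields containing embedded commas; for these simple columns it is sufficient.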
Appendix:
Command to delete the imported files from HDFS:
cd /usr/local/hadoop-2.7.7/bin
./hdfs dfs -rm -r /increment/*