Implementing join queries on Elasticsearch with Spark
Elasticsearch is a distributed search engine that can answer queries over hundreds of millions of documents in seconds. Under the hood it stores data in Lucene and adds a distributed layer on top of it; incoming writes are also recorded in a translog so that failover is possible.
When Elasticsearch is used as a database, join operations inevitably come up.
This article presents one way to implement joins using Spark SQL.
Spark is a general-purpose distributed processing framework whose capabilities include, but are not limited to, reading data, writing data, and stream processing. With Spark, the framework distributes your business logic across multiple machines for execution.
Starting with version 2.1, Elasticsearch offers Spark support through the elasticsearch-hadoop (elasticsearch-spark) connector.
The read/write examples below are taken almost entirely from the official documentation listed in the references.
Note that Spark itself is written in Scala, while Elasticsearch is written in Java. The examples in this article call the Spark API from Java; the official documentation has more concise Scala versions.
1. Reading and writing Elasticsearch with Spark
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class WriteToEs {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        conf.set("es.index.auto.create", "true");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Write two documents to the spark/docs index
        {
            Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
            Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
            JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
            JavaEsSpark.saveToEs(javaRDD, "spark/docs");
        }

        // Read back from Elasticsearch as (document id, source) pairs
        {
            JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "spark/docs");
            for (Entry<String, Map<String, Object>> entry : esRDD.collectAsMap().entrySet()) {
                System.out.println(entry.getKey());
                System.out.println(entry.getValue());
            }
        }

        // Read from Elasticsearch with a query string, then filter again on the Spark side
        {
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "spark/docs", "?q=one:1").values();
            // JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "spark/docs").values();
            Function<Map<String, Object>, Boolean> filter = new Function<Map<String, Object>, Boolean>() {
                public Boolean call(Map<String, Object> map) throws Exception {
                    return map.containsKey("one");
                }
            };
            JavaRDD<Map<String, Object>> filtered = esRDD.filter(filter);
            for (Map<String, Object> map : filtered.collect()) {
                System.out.println(map);
            }
        }

        jsc.close();
    }
}
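The example above talks to an Elasticsearch instance on localhost with the default settings. Below is a minimal sketch of how the same kind of write can target a remote cluster and control the document _id, using the connector's es.nodes, es.port and es.mapping.id settings; the host name and class name here are made-up placeholders.

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class WriteToRemoteEs {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        // Point the connector at a remote cluster (hypothetical host and port)
        conf.set("es.nodes", "es-host.example.com");
        conf.set("es.port", "9200");
        conf.set("es.index.auto.create", "true");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Map<String, ?> doc = ImmutableMap.of("id", "1", "name", "Otopeni");
        JavaRDD<Map<String, ?>> rdd = jsc.parallelize(ImmutableList.of(doc));

        // Use the "id" field of each document as the Elasticsearch _id
        JavaEsSpark.saveToEs(rdd, "spark/docs",
                ImmutableMap.of("es.mapping.id", "id"));

        jsc.close();
    }
}

With es.mapping.id set, re-running the job overwrites the same documents instead of creating new ones with generated ids.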
2. Reading and writing Elasticsearch with Spark SQL
This example exists only to demonstrate a join and has no practical meaning.
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SparkSql implements Serializable {

    private static final long serialVersionUID = -8843264837345502547L;

    public static class People {
        private int id;
        private String name;
        private String surname;
        private int age;

        public People(int id, String name, String surname, int age) {
            this.id = id;
            this.name = name;
            this.surname = surname;
            this.age = age;
        }

        public People() {
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getSurname() { return surname; }
        public void setSurname(String surname) { this.surname = surname; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        conf.set("es.index.auto.create", "true");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Write a small DataFrame to the spark/person index
        {
            SQLContext sc = new SQLContext(jsc);
            List<People> data = new ArrayList<People>();
            data.add(new People(1, "Micheal", "Mike", 18));
            data.add(new People(2, "Flowaters", "fw", 18));
            DataFrame people = sc.createDataFrame(data, People.class);
            JavaEsSparkSQL.saveToEs(people, "spark/person");
        }

        // Read both indices back as DataFrames and join them with Spark SQL
        {
            SQLContext sql = new SQLContext(jsc);

            // Register the spark/people index (assumed to exist already) as table "people"
            DataFrame people = sql.read().format("es").load("spark/people");
            people.registerTempTable("people");

            // Register the spark/person index as table "person"
            DataFrame person = sql.read().format("es").load("spark/person");
            person.registerTempTable("person");

            // Inspect the schema if needed:
            // people.printSchema();

            // Run the join
            DataFrame df = sql.sql(
                "SELECT ps.id, p.name FROM people AS p INNER JOIN person AS ps ON p.name = ps.name");

            // Print the result rows
            for (Row row : df.collectAsList()) {
                System.out.println(row.toString());
            }
        }

        jsc.close();
    }
}
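Note that the join itself runs inside Spark: both indices are read into DataFrames and the rows are matched by Spark, not by Elasticsearch, so it pays to restrict each side before joining. Below is a small sketch, assuming the jsc context, imports and spark/person index from the example above; es.query is the connector setting for pushing a query string down to Elasticsearch, while column filters and projections are applied by Spark (simple ones may also be pushed down by the connector).

SQLContext sql = new SQLContext(jsc);

// Read only documents matching a query instead of pulling the whole index into Spark
DataFrame person = sql.read().format("es")
        .option("es.query", "?q=age:18")
        .load("spark/person");

// Filter and project on the Spark side before registering the table used in the join
DataFrame slimmed = person.filter("age >= 18").select("id", "name");
slimmed.registerTempTable("person");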
References:
1. Elasticsearch official site: https://www.elastic.co/
2. Official Spark support in elasticsearch-hadoop: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html