Implementing join queries on Elasticsearch with Spark

Elasticsearch is a distributed search engine that can answer queries over hundreds of millions of documents in seconds. Under the hood it stores data in Lucene, wrapping Lucene in a distributed layer, and it writes a translog as data is ingested so that failover is possible.

When Elasticsearch is used as a database, join operations inevitably come up.

This article presents one way to implement joins: using Spark SQL.

Spark is a general-purpose distributed processing framework covering, among other things, reading and writing data and stream processing. With Spark, your own business logic is distributed across multiple machines and executed by the framework.

Starting with version 2.1, Elasticsearch provides support for Spark (through the elasticsearch-hadoop connector, which supplies the JavaEsSpark and JavaEsSparkSQL classes used below).
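The connector is configured through properties on SparkConf. Below is a minimal sketch of the usual connection settings, assuming a cluster reachable on localhost:9200; the class name, host and port are placeholders for your own setup:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EsSparkConfig {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("EsSparkConfig").setMaster("local");
        // Where the Elasticsearch cluster lives (placeholder values).
        conf.set("es.nodes", "localhost");
        conf.set("es.port", "9200");
        // Allow the connector to create target indices that do not exist yet.
        conf.set("es.index.auto.create", "true");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... read or write through JavaEsSpark / JavaEsSparkSQL, as in the examples below ...
        jsc.close();
    }
}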

Below are some read and write examples; most of the content is taken from the official documentation listed in the references.

Note that Spark itself is written in Scala, while Elasticsearch is written in Java. The examples in this article call the Spark libraries from Java; the official documentation has more concise examples written in Scala.

1. Reading and writing Elasticsearch with Spark

import java.util.Map;  
import java.util.Map.Entry;  
  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.Function;  
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;  
  
import com.google.common.collect.ImmutableList;  
import com.google.common.collect.ImmutableMap;  
  
public class WriteToEs {  
  
    public static void main(String[] args) {  
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");  
        conf.set("es.index.auto.create", "true");  
  
        JavaSparkContext jsc = new JavaSparkContext(conf);  
  
        // Write to Elasticsearch
        {  
            Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);  
            Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO",  
                    "San Fran");  
  
            JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(  
                    numbers, airports));  
            JavaEsSpark.saveToEs(javaRDD, "spark/docs");  
        }  
  
        // Read back from Elasticsearch as (document id, document body) pairs
        {  
            JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(  
                    jsc, "spark/docs");  
  
            for (Entry<String, Map<String, Object>> entry : esRDD  
                    .collectAsMap().entrySet()) {  
                System.out.println(entry.getKey());  
                System.out.println(entry.getValue());  
            }  
        }  
  
        // Read from Elasticsearch with a query, then filter the results again on the Spark side
        {  
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,  
                    "spark/docs", "?q=one:1").values();  
            // JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc,  
            // "spark/docs").values();  
  
            Function<Map<String, Object>, Boolean> filter = new Function<Map<String, Object>, Boolean>() {  
                public Boolean call(Map<String, Object> map) throws Exception {  
                    return map.containsKey("one");  
                }  
            };  
            JavaRDD<Map<String, Object>> filtered = esRDD.filter(filter);  
            for (Map<String, Object> map : filtered.collect()) {  
                System.out.println(map);  
            }  
        }  
  
        jsc.close();  
    }  
}  
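In the example above, Elasticsearch assigns document IDs itself. When each record already has a natural key, the connector also offers JavaEsSpark.saveToEsWithMeta, which takes a pair RDD and uses the key of each pair as the document ID. A minimal sketch, assuming the airport code should become the _id; the index name and field names here are just illustrative:

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import scala.Tuple2;

public class WriteToEsWithId {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        conf.set("es.index.auto.create", "true");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Each pair is (document id, document body); the key becomes the _id in Elasticsearch.
        Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
        Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

        JavaPairRDD<String, Map<String, ?>> pairs = jsc.parallelizePairs(ImmutableList.of(
                new Tuple2<String, Map<String, ?>>("OTP", otp),
                new Tuple2<String, Map<String, ?>>("SFO", sfo)));

        JavaEsSpark.saveToEsWithMeta(pairs, "spark/airports");

        jsc.close();
    }
}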

2. Reading and writing Elasticsearch with Spark SQL

This example exists only to demonstrate a join and has no practical meaning. Note that it loads an index spark/people which the code itself never writes; that index must already contain documents with a name field for the join to return rows (a minimal way to seed it is sketched below).
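For the join to return anything, spark/people needs documents whose name values overlap with the spark/person data that the example writes. A minimal sketch for seeding it, reusing the JavaEsSpark API from section 1; the class name and id values are just illustrative:

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class SeedPeopleIndex {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Seed").setMaster("local");
        conf.set("es.index.auto.create", "true");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Two documents whose "name" values match the spark/person data written below,
        // so the INNER JOIN on name has rows on both sides.
        Map<String, ?> p1 = ImmutableMap.of("id", 10, "name", "Micheal");
        Map<String, ?> p2 = ImmutableMap.of("id", 20, "name", "Flowaters");

        JavaRDD<Map<String, ?>> rdd = jsc.parallelize(ImmutableList.of(p1, p2));
        JavaEsSpark.saveToEs(rdd, "spark/people");

        jsc.close();
    }
}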

import java.io.Serializable;  
import java.util.ArrayList;  
import java.util.List;  
  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.sql.DataFrame;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.SQLContext;  
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;  
  
public class SparkSql implements Serializable {  
  
    private static final long serialVersionUID = -8843264837345502547L;  
  
    public static class People {  
        private int id;  
        private String name;  
        private String surname;  
        private int age;  
  
        public People(int id, String name, String surname, int age) {  
            this.id = id;  
            this.name = name;  
            this.surname = surname;  
            this.age = age;  
        }  
  
        public People() {  
        }  
  
        public int getId() {  
            return id;  
        }  
  
        public void setId(int id) {  
            this.id = id;  
        }  
  
        public String getName() {  
            return name;  
        }  
  
        public void setName(String name) {  
            this.name = name;  
        }  
  
        public String getSurname() {  
            return surname;  
        }  
  
        public void setSurname(String surname) {  
            this.surname = surname;  
        }  
  
        public int getAge() {  
            return age;  
        }  
  
        public void setAge(int age) {  
            this.age = age;  
        }  
  
    }  
  
    public static void main(String[] args) {  
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");  
        conf.set("es.index.auto.create", "true");  
  
        JavaSparkContext jsc = new JavaSparkContext(conf);  
  
        // Write: build a DataFrame of People objects and save it to Elasticsearch
        {  
            SQLContext sc = new SQLContext(jsc);  
  
            List<People> data = new ArrayList<People>();  
            data.add(new People(1, "Micheal", "Mike", 18));  
            data.add(new People(2, "Flowaters", "fw", 18));  
  
            DataFrame people = sc.createDataFrame(data, People.class);  
  
            JavaEsSparkSQL.saveToEs(people, "spark/person");  
        }  
  
        // Read: load both indices, register them as temporary tables, and join them
        {  
            SQLContext sql = new SQLContext(jsc);  
  
            // Register the people table
            DataFrame people = sql.read().format("es").load("spark/people");  
            people.registerTempTable("people");  
  
            // Register the person table
            DataFrame person = sql.read().format("es").load("spark/person");  
            person.registerTempTable("person");  
  
            // Inspect the table schema
            // df.printSchema();  
  
            // Run the SQL join
            DataFrame df = sql  
                    .sql("SELECT ps.id, p.name FROM people as p INNER JOIN person as ps ON p.name = ps.name");  
  
            // Print the query results
            for (Row row : df.collectAsList()) {  
                System.out.println(row.toString());  
            }  
        }  
  
        jsc.close();  
    }  
}  
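The same join can also be expressed with the DataFrame API instead of an SQL string, which avoids registering temporary tables. A sketch of this alternative, assuming the two indices have been populated as above; the class name is illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class DataFrameJoin {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sql = new SQLContext(jsc);

        // Load both indices as DataFrames through the "es" data source.
        DataFrame people = sql.read().format("es").load("spark/people");
        DataFrame person = sql.read().format("es").load("spark/person");

        // Inner join on the name column, projecting the same two columns as the SQL version.
        DataFrame joined = people.join(person, people.col("name").equalTo(person.col("name")))
                .select(person.col("id"), people.col("name"));

        for (Row row : joined.collectAsList()) {
            System.out.println(row.toString());
        }

        jsc.close();
    }
}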

References:

1. Elasticsearch official website: https://www.elastic.co/

2. Official Elasticsearch support for Spark: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
