SpringBoot整合Elasticsearch7

SpringBoot连接ElasticSearch有以下种方式,

  • TransportClient,9300端口,在 7.x 中已经被弃用,据说在8.x 中将完全删除
  • restClient,9200端口,
  • high level client,新推出的连接方式,基于restClient。使用的版本需要保持和ES服务端的版本一致。

Spring boot 2的spring-boot-starter-data-elasticsearch支持的Elasticsearch版本是2.X,

Elasticsearch已迭代到7.X.X版本,建议使用high-level-client进行链接。

pom.xml

需要指定版本号

<!-- elasticsearch -->
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>7.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>elasticsearch-rest-high-level-client</artifactId>
			<version>7.7.0</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.56</version>
		</dependency>

存储的对象

package com.ah.es.pojo;

public class Book {
	private Integer bookId;
	private String name;

	public Book() {
	}

	public Book(Integer bookId, String name) {
		this.bookId = bookId;
		this.name = name;
	}

	@Override
	public String toString() {
		return "Book [bookId=" + bookId + ", name=" + name + "]";
	}

	public Integer getBookId() {
		return bookId;
	}

	public void setBookId(Integer bookId) {
		this.bookId = bookId;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}

EsEntity·Es保存的对象

package com.ah.es.util;

public final class EsEntity<T> {

	private String id;
	private T data;

	public EsEntity() {
	}

	public EsEntity(String id, T data) {
		this.data = data;
		this.id = id;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public T getData() {
		return data;
	}

	public void setData(T data) {
		this.data = data;
	}
}

INDEX内容

src/main/resources/es.txt

{
  "properties": {
    "id":{
      "type":"integer"
    },
    "bookId":{
      "type":"integer"
    },
    "name":{
      "type":"text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_smart"
    }
  }
}

EsUtil·工具类

package com.ah.es.util;

import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.*;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.*;
import org.elasticsearch.search.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.*;
import java.util.*;
@Component
public class EsUtil {
	@Value("192.168.16.128")
	public String host;
	@Value("9200")
	public int port;
	@Value("http")
	public String scheme;
	public static final String INDEX_NAME = "book-index";
	public static String CREATE_INDEX;
	public static RestHighLevelClient restClient = null;

	private static String readFileToString(String filePath) {
		File file = new File(filePath);
		System.out.println(file.getAbsolutePath());
		try (FileReader reader = new FileReader(file)) {
			BufferedReader bReader = new BufferedReader(reader);
			StringBuilder sb = new StringBuilder();
			String s = "";
			while ((s = bReader.readLine()) != null) {
				sb.append(s + "\n");
			}
			return sb.toString();
		} catch (IOException e1) {
			e1.printStackTrace();
		}
		return "";
	}

	@PostConstruct
	public void init() {
		CREATE_INDEX = readFileToString("src/main/resources/es.txt");
		System.out.println("CREATE_INDEX = " + CREATE_INDEX);
		try {
			if (restClient != null) {
				restClient.close();
			}
			restClient = new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, scheme)));
			if (this.indexExist(INDEX_NAME)) {
				return;
			}
			CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME);
			request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));
			request.mapping(CREATE_INDEX, XContentType.JSON);
			CreateIndexResponse res = restClient.indices().create(request, RequestOptions.DEFAULT);
			if (!res.isAcknowledged()) {
				throw new RuntimeException("初始化失败");
			}
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(0);
		}
	}

	public boolean indexExist(String index) throws Exception {
		GetIndexRequest request = new GetIndexRequest(index);
		request.local(false);
		request.humanReadable(true);
		request.includeDefaults(false);
		return restClient.indices().exists(request, RequestOptions.DEFAULT);
	}

	public IndexResponse insertOrUpdateOne(String index, EsEntity entity) {
		IndexRequest request = new IndexRequest(index);
		request.id(entity.getId());
		request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
		try {
			return restClient.index(request, RequestOptions.DEFAULT);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public BulkResponse insertBatch(String index, List<EsEntity> list) {
		BulkRequest request = new BulkRequest();
		for (EsEntity item : list) {
			String _json = JSON.toJSONString(item.getData());
			String _id = item.getId();
			IndexRequest indexRequest = new IndexRequest(index).id(_id).source(_json, XContentType.JSON);
			request.add(indexRequest);
		}
		try {
			return restClient.bulk(request, RequestOptions.DEFAULT);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public <T> List<T> search(String index, SearchSourceBuilder searchSourceBuilder, Class<T> resultClass) {
		SearchRequest request = new SearchRequest(index);
		request.source(searchSourceBuilder);
		try {
			SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);

			SearchHits hits1 = response.getHits();
			SearchHit[] hits2 = hits1.getHits();
			List<T> retList = new ArrayList<>(hits2.length);
			for (SearchHit hit : hits2) {
				String strJson = hit.getSourceAsString();
				retList.add(JSON.parseObject(strJson, resultClass));
			}
			return retList;
		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

	public AcknowledgedResponse deleteIndex(String index) {
		try {
			IndicesClient indicesClient = restClient.indices();
			DeleteIndexRequest request = new DeleteIndexRequest(index);
			AcknowledgedResponse response = indicesClient.delete(request, RequestOptions.DEFAULT);
			return response;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public BulkByScrollResponse deleteByQuery(String index, QueryBuilder builder) {
		DeleteByQueryRequest request = new DeleteByQueryRequest(index);
		request.setQuery(builder);
		request.setBatchSize(10000);
		request.setConflicts("proceed");
		try {
			BulkByScrollResponse response = restClient.deleteByQuery(request, RequestOptions.DEFAULT);
			return response;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public <T> BulkResponse deleteBatch(String index, Collection<T> idList) {
		BulkRequest request = new BulkRequest();
		for (T t : idList) {
			request.add(new DeleteRequest(index, t.toString()));
		}
		try {
			BulkResponse response = restClient.bulk(request, RequestOptions.DEFAULT);
			return response;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

ES调用方

package com.ah.es;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ah.es.pojo.Book;
import com.ah.es.util.*;

import java.util.*;

@Component
public class EsService {

	@Autowired
	private EsUtil esUtil;

	public List<Book> getAll() {
		return esUtil.search(EsUtil.INDEX_NAME, new SearchSourceBuilder(), Book.class);
	}

	public Book getByBookId(int bookId) {
		SearchSourceBuilder builder = new SearchSourceBuilder();
		builder.query(new TermQueryBuilder("bookId", bookId));
		List<Book> res = esUtil.search(EsUtil.INDEX_NAME, builder, Book.class);
		if (res.size() > 0) {
			return res.get(0);
		} else {
			return null;
		}
	}

	public List<Book> searchByKey(String key) {
		BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
		boolQueryBuilder.must(QueryBuilders.matchQuery("name", key));
		SearchSourceBuilder builder = new SearchSourceBuilder();
		builder.size(10).query(boolQueryBuilder);
		return esUtil.search(EsUtil.INDEX_NAME, builder, Book.class);
	}

	public IndexResponse putOne(Book book) {
		EsEntity<Book> entity = new EsEntity<>(book.getBookId() + "", book);
		return esUtil.insertOrUpdateOne(EsUtil.INDEX_NAME, entity);
	}

	public BulkResponse putBatch(List<Book> books) {
		List<EsEntity> list = new ArrayList<>();
		books.forEach(item -> list.add(new EsEntity<>(item.getBookId() + "", item)));
		return esUtil.insertBatch(EsUtil.INDEX_NAME, list);
	}

	public BulkByScrollResponse deleteById(int id) {
		return esUtil.deleteByQuery(EsUtil.INDEX_NAME, new TermQueryBuilder("bookId", id));
	}

	public BulkResponse deleteBatch(List<Integer> list) {
		return esUtil.deleteBatch(EsUtil.INDEX_NAME, list);
	}

}

测试类

package com.ah.es;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.ah.es.pojo.Book;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class EsTest {

	@Autowired
	private EsService bookService;
	
	@Test
	public void testAll() throws InterruptedException {
		t1AddOne();
		t2AddBatch();
		Thread.sleep(1000);
		t3FindAll();
		t4search();
		t5deleteOne();
		t6deleteBatch();
		Thread.sleep(1000);
		t7FindAll();
		
	}

	@Test
	public void t1AddOne() {
		IndexResponse putOne = bookService.putOne(new Book(1, "西游记"));
		System.out.println("【1】putOne:" + putOne);
	}

	@Test
	public void t2AddBatch() {
		List<Book> list = new ArrayList<>();
		list.add(new Book(2, "水浒传"));
		list.add(new Book(3, "三国演义"));
		BulkResponse putBatch = bookService.putBatch(list);
		System.out.println("【2】putBatch:" + putBatch.status());
	}

	@Test
	public void t3FindAll() {
		System.out.println("【3】");
		List<Book> res = bookService.getAll();
		System.out.println("↓↓↓findAll");
		res.forEach(System.out::println);
		System.out.println("↑↑↑findAll");
	}

	@Test
	public void t4search() {
		System.out.println("【4】");
		List<Book> searchByKey = bookService.searchByKey("水传");
		searchByKey.forEach(System.out::println);

		Book book = bookService.getByBookId(2);
		System.out.println("【4】getByBookId:" + book);
	}

	@Test
	public void t5deleteOne() {
		BulkByScrollResponse deleteById = bookService.deleteById(1);
		System.out.println("【5】deleteById:" + deleteById.getStatus());
	}

	@Test
	public void t6deleteBatch() {
		List<Integer> ids = new ArrayList<>();
		ids.add(2);
		ids.add(3);
		BulkResponse deleteBatch = bookService.deleteBatch(ids);
		System.out.println("【6】deleteBatch:" + deleteBatch.status());
	}

	@Test
	public void t7FindAll() {
		System.out.println("【7】");
		List<Book> res = bookService.getAll();
		System.out.println("↓↓↓findAll");
		res.forEach(System.out::println);
		System.out.println("↑↑↑findAll");
	}
}

运行结果:

【1】putOne:IndexResponse[index=book-index,type=_doc,id=1,version=5,result=created,seqNo=51,primaryTerm=1,shards={"total":3,"successful":1,"failed":0}]
【2】putBatch:OK
【3】
↓↓↓findAll
Book [bookId=2, name=水浒传]
Book [bookId=3, name=三国演义]
Book [bookId=1, name=西游记]
↑↑↑findAll
【4】
Book [bookId=2, name=水浒传]
【4】getByBookId:Book [bookId=2, name=水浒传]
【5】deleteById:BulkIndexByScrollResponse[sliceId=null,updated=0,created=0,deleted=1,batches=1,versionConflicts=0,noops=0,retries=0,throttledUntil=0s]
【6】deleteBatch:OK
【7】
↓↓↓findAll
↑↑↑findAll

相关推荐