SpringBoot整合Elasticsearch7
SpringBoot连接ElasticSearch有以下种方式,
- TransportClient,9300端口,在 7.x 中已经被弃用,据说在8.x 中将完全删除
- restClient,9200端口,
- high level client,新推出的连接方式,基于restClient。使用的版本需要保持和ES服务端的版本一致。
Spring boot 2的spring-boot-starter-data-elasticsearch支持的Elasticsearch版本是2.X,
Elasticsearch已迭代到7.X.X版本,建议使用high-level-client进行链接。
pom.xml
需要指定版本号
<!-- elasticsearch --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.7.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.7.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency>
存储的对象
package com.ah.es.pojo; public class Book { private Integer bookId; private String name; public Book() { } public Book(Integer bookId, String name) { this.bookId = bookId; this.name = name; } @Override public String toString() { return "Book [bookId=" + bookId + ", name=" + name + "]"; } public Integer getBookId() { return bookId; } public void setBookId(Integer bookId) { this.bookId = bookId; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
EsEntity·Es保存的对象
package com.ah.es.util; public final class EsEntity<T> { private String id; private T data; public EsEntity() { } public EsEntity(String id, T data) { this.data = data; this.id = id; } public String getId() { return id; } public void setId(String id) { this.id = id; } public T getData() { return data; } public void setData(T data) { this.data = data; } }
INDEX内容
src/main/resources/es.txt
{ "properties": { "id":{ "type":"integer" }, "bookId":{ "type":"integer" }, "name":{ "type":"text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" } } }
EsUtil·工具类
package com.ah.es.util; import com.alibaba.fastjson.JSON; import org.apache.http.HttpHost; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.*; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.*; import org.elasticsearch.client.indices.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.reindex.*; import org.elasticsearch.search.*; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.*; import java.util.*; @Component public class EsUtil { @Value("192.168.16.128") public String host; @Value("9200") public int port; @Value("http") public String scheme; public static final String INDEX_NAME = "book-index"; public static String CREATE_INDEX; public static RestHighLevelClient restClient = null; private static String readFileToString(String filePath) { File file = new File(filePath); System.out.println(file.getAbsolutePath()); try (FileReader reader = new FileReader(file)) { BufferedReader bReader = new BufferedReader(reader); StringBuilder sb = new StringBuilder(); String s = ""; while ((s = bReader.readLine()) != null) { sb.append(s + "\n"); } return sb.toString(); } catch (IOException e1) { e1.printStackTrace(); } return ""; } @PostConstruct public void init() { CREATE_INDEX = readFileToString("src/main/resources/es.txt"); System.out.println("CREATE_INDEX = " + CREATE_INDEX); try { if (restClient != null) { restClient.close(); } restClient = new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, scheme))); if (this.indexExist(INDEX_NAME)) { return; } CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME); request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2)); request.mapping(CREATE_INDEX, XContentType.JSON); CreateIndexResponse res = restClient.indices().create(request, RequestOptions.DEFAULT); if (!res.isAcknowledged()) { throw new RuntimeException("初始化失败"); } } catch (Exception e) { e.printStackTrace(); System.exit(0); } } public boolean indexExist(String index) throws Exception { GetIndexRequest request = new GetIndexRequest(index); request.local(false); request.humanReadable(true); request.includeDefaults(false); return restClient.indices().exists(request, RequestOptions.DEFAULT); } public IndexResponse insertOrUpdateOne(String index, EsEntity entity) { IndexRequest request = new IndexRequest(index); request.id(entity.getId()); request.source(JSON.toJSONString(entity.getData()), XContentType.JSON); try { return restClient.index(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new RuntimeException(e); } } public BulkResponse insertBatch(String index, List<EsEntity> list) { BulkRequest request = new BulkRequest(); for (EsEntity item : list) { String _json = JSON.toJSONString(item.getData()); String _id = item.getId(); IndexRequest indexRequest = new IndexRequest(index).id(_id).source(_json, XContentType.JSON); request.add(indexRequest); } try { return restClient.bulk(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new RuntimeException(e); } } public <T> List<T> search(String index, SearchSourceBuilder searchSourceBuilder, Class<T> resultClass) { SearchRequest request = new SearchRequest(index); request.source(searchSourceBuilder); try { SearchResponse response = restClient.search(request, RequestOptions.DEFAULT); SearchHits hits1 = response.getHits(); SearchHit[] hits2 = hits1.getHits(); List<T> retList = new ArrayList<>(hits2.length); for (SearchHit hit : hits2) { String strJson = hit.getSourceAsString(); retList.add(JSON.parseObject(strJson, resultClass)); } return retList; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } public AcknowledgedResponse deleteIndex(String index) { try { IndicesClient indicesClient = restClient.indices(); DeleteIndexRequest request = new DeleteIndexRequest(index); AcknowledgedResponse response = indicesClient.delete(request, RequestOptions.DEFAULT); return response; } catch (Exception e) { throw new RuntimeException(e); } } public BulkByScrollResponse deleteByQuery(String index, QueryBuilder builder) { DeleteByQueryRequest request = new DeleteByQueryRequest(index); request.setQuery(builder); request.setBatchSize(10000); request.setConflicts("proceed"); try { BulkByScrollResponse response = restClient.deleteByQuery(request, RequestOptions.DEFAULT); return response; } catch (Exception e) { throw new RuntimeException(e); } } public <T> BulkResponse deleteBatch(String index, Collection<T> idList) { BulkRequest request = new BulkRequest(); for (T t : idList) { request.add(new DeleteRequest(index, t.toString())); } try { BulkResponse response = restClient.bulk(request, RequestOptions.DEFAULT); return response; } catch (Exception e) { throw new RuntimeException(e); } } }
ES调用方
package com.ah.es; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.index.query.*; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.ah.es.pojo.Book; import com.ah.es.util.*; import java.util.*; @Component public class EsService { @Autowired private EsUtil esUtil; public List<Book> getAll() { return esUtil.search(EsUtil.INDEX_NAME, new SearchSourceBuilder(), Book.class); } public Book getByBookId(int bookId) { SearchSourceBuilder builder = new SearchSourceBuilder(); builder.query(new TermQueryBuilder("bookId", bookId)); List<Book> res = esUtil.search(EsUtil.INDEX_NAME, builder, Book.class); if (res.size() > 0) { return res.get(0); } else { return null; } } public List<Book> searchByKey(String key) { BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(QueryBuilders.matchQuery("name", key)); SearchSourceBuilder builder = new SearchSourceBuilder(); builder.size(10).query(boolQueryBuilder); return esUtil.search(EsUtil.INDEX_NAME, builder, Book.class); } public IndexResponse putOne(Book book) { EsEntity<Book> entity = new EsEntity<>(book.getBookId() + "", book); return esUtil.insertOrUpdateOne(EsUtil.INDEX_NAME, entity); } public BulkResponse putBatch(List<Book> books) { List<EsEntity> list = new ArrayList<>(); books.forEach(item -> list.add(new EsEntity<>(item.getBookId() + "", item))); return esUtil.insertBatch(EsUtil.INDEX_NAME, list); } public BulkByScrollResponse deleteById(int id) { return esUtil.deleteByQuery(EsUtil.INDEX_NAME, new TermQueryBuilder("bookId", id)); } public BulkResponse deleteBatch(List<Integer> list) { return esUtil.deleteBatch(EsUtil.INDEX_NAME, list); } }
测试类
package com.ah.es; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.ah.es.pojo.Book; import java.util.ArrayList; import java.util.List; @RunWith(SpringRunner.class) @SpringBootTest public class EsTest { @Autowired private EsService bookService; @Test public void testAll() throws InterruptedException { t1AddOne(); t2AddBatch(); Thread.sleep(1000); t3FindAll(); t4search(); t5deleteOne(); t6deleteBatch(); Thread.sleep(1000); t7FindAll(); } @Test public void t1AddOne() { IndexResponse putOne = bookService.putOne(new Book(1, "西游记")); System.out.println("【1】putOne:" + putOne); } @Test public void t2AddBatch() { List<Book> list = new ArrayList<>(); list.add(new Book(2, "水浒传")); list.add(new Book(3, "三国演义")); BulkResponse putBatch = bookService.putBatch(list); System.out.println("【2】putBatch:" + putBatch.status()); } @Test public void t3FindAll() { System.out.println("【3】"); List<Book> res = bookService.getAll(); System.out.println("↓↓↓findAll"); res.forEach(System.out::println); System.out.println("↑↑↑findAll"); } @Test public void t4search() { System.out.println("【4】"); List<Book> searchByKey = bookService.searchByKey("水传"); searchByKey.forEach(System.out::println); Book book = bookService.getByBookId(2); System.out.println("【4】getByBookId:" + book); } @Test public void t5deleteOne() { BulkByScrollResponse deleteById = bookService.deleteById(1); System.out.println("【5】deleteById:" + deleteById.getStatus()); } @Test public void t6deleteBatch() { List<Integer> ids = new ArrayList<>(); ids.add(2); ids.add(3); BulkResponse deleteBatch = bookService.deleteBatch(ids); System.out.println("【6】deleteBatch:" + deleteBatch.status()); } @Test public void t7FindAll() { System.out.println("【7】"); List<Book> res = bookService.getAll(); System.out.println("↓↓↓findAll"); res.forEach(System.out::println); System.out.println("↑↑↑findAll"); } }
运行结果:
【1】putOne:IndexResponse[index=book-index,type=_doc,id=1,version=5,result=created,seqNo=51,primaryTerm=1,shards={"total":3,"successful":1,"failed":0}] 【2】putBatch:OK 【3】 ↓↓↓findAll Book [bookId=2, name=水浒传] Book [bookId=3, name=三国演义] Book [bookId=1, name=西游记] ↑↑↑findAll 【4】 Book [bookId=2, name=水浒传] 【4】getByBookId:Book [bookId=2, name=水浒传] 【5】deleteById:BulkIndexByScrollResponse[sliceId=null,updated=0,created=0,deleted=1,batches=1,versionConflicts=0,noops=0,retries=0,throttledUntil=0s] 【6】deleteBatch:OK 【7】 ↓↓↓findAll ↑↑↑findAll
相关推荐
newbornzhao 2020-09-14
做对一件事很重要 2020-09-07
renjinlong 2020-09-03
明瞳 2020-08-19
李玉志 2020-08-19
mengyue 2020-08-07
molong0 2020-08-06
AFei00 2020-08-03
molong0 2020-08-03
wenwentana 2020-08-03
YYDU 2020-08-03
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。
sifeimeng 2020-08-03
心丨悦 2020-08-03
liangwenrong 2020-07-31
sifeimeng 2020-08-01
mengyue 2020-07-30
tigercn 2020-07-29
IceStreamLab 2020-07-29