温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

使用java怎么对elasticsearch进行操作

发布时间:2021-03-29 17:18:31 来源:亿速云 阅读:259 作者:Leah 栏目:编程语言

这期内容当中小编将会给大家带来有关使用java怎么对elasticsearch进行操作,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

Java操作es集群步骤1:配置集群对象信息;2:创建客户端;3:查看集群信息

1:集群名称

默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。

2:嗅探功能

通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。

3:查询类型SearchType.QUERY_THEN_FETCH

es 查询共有4种查询类型

QUERY_AND_FETCH:

主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。

这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。

QUERY_THEN_FETCH:

主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。

这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式

DEF_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:

将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种噢乖你问题都解决但是效率是最差的。

1, 获取client, 两种方式获取

@Before  public void before() throws Exception {   Map<String, String> map = new HashMap<String, String>();    map.put("cluster.name", "elasticsearch_wenbronk");    Settings.Builder settings = Settings.builder().put(map);    client = TransportClient.builder().settings(settings).build()        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));   }
@Before  public void before11() throws Exception {   // 创建客户端, 使用的默认集群名, "elasticSearch" //  client = TransportClient.builder().build() //    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));   // 通过setting对象指定集群配置信息, 配置的集群名   Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名 //    .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知 //    .put("network.host", "192.168.50.37")     .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 //    .put("client.transport.nodes_sampler_interval", 5) //报错, //    .put("client.transport.ping_timeout", 5) // 报错, ping等待时间,     .build();    client = TransportClient.builder().settings(settings).build()      .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));    // 默认5s    // 多久打开连接, 默认5s    System.out.println("success connect");  }

PS: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...

其他参数的意义:

使用java怎么对elasticsearch进行操作

代码:

package com.wenbronk.javaes; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor.Listener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import org.junit.Before; import org.junit.Test; import com.alibaba.fastjson.JSONObject; /**  * 使用java API操作elasticSearch  *   * @author 231  *  */ public class JavaESTest {  private TransportClient client;  private IndexRequest source;    /**   * 获取连接, 第一种方式   * @throws Exception   */ // @Before  public void before() throws Exception {   Map<String, String> map = new HashMap<String, String>();    map.put("cluster.name", "elasticsearch_wenbronk");    Settings.Builder settings = Settings.builder().put(map);    client = TransportClient.builder().settings(settings).build()        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));   } /**   * 查看集群信息   */  @Test  public void testInfo() {   List<DiscoveryNode> nodes = client.connectedNodes();   for (DiscoveryNode node : nodes) {    System.out.println(node.getHostAddress());   }  }    /**   * 组织json串, 方式1,直接拼接   */  public String createJson1() {   String json = "{" +     "\"user\":\"kimchy\"," +     "\"postDate\":\"2013-01-30\"," +     "\"message\":\"trying out Elasticsearch\"" +    "}";   return json;  }    /**   * 使用map创建json   */  public Map<String, Object> createJson2() {   Map<String,Object> json = new HashMap<String, Object>();   json.put("user", "kimchy");   json.put("postDate", new Date());   json.put("message", "trying out elasticsearch");   return json;  }    /**   * 使用fastjson创建   */  public JSONObject createJson3() {   JSONObject json = new JSONObject();   json.put("user", "kimchy");   json.put("postDate", new Date());   json.put("message", "trying out elasticsearch");   return json;  }    /**   * 使用es的帮助类   */  public XContentBuilder createJson4() throws Exception {   // 创建json对象, 其中一个创建json的方式   XContentBuilder source = XContentFactory.jsonBuilder()    .startObject()     .field("user", "kimchy")     .field("postDate", new Date())     .field("message", "trying to out ElasticSearch")    .endObject();   return source;  }    /**   * 存入索引中   * @throws Exception   */  @Test  public void test1() throws Exception {   XContentBuilder source = createJson4();   // 存json入索引中   IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get(); //  // 结果获取   String index = response.getIndex();   String type = response.getType();   String id = response.getId();   long version = response.getVersion();   boolean created = response.isCreated();   System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);  }  /**   * get API 获取指定文档信息   */  @Test  public void testGet() { //  GetResponse response = client.prepareGet("twitter", "tweet", "1") //        .get();   GetResponse response = client.prepareGet("twitter", "tweet", "1")     .setOperationThreaded(false) // 线程安全     .get();   System.out.println(response.getSourceAsString());  }    /**   * 测试 delete api   */  @Test  public void testDelete() {   DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")     .get();   String index = response.getIndex();   String type = response.getType();   String id = response.getId();   long version = response.getVersion();   System.out.println(index + " : " + type + ": " + id + ": " + version);  }    /**   * 测试更新 update API   * 使用 updateRequest 对象   * @throws Exception    */  @Test  public void testUpdate() throws Exception {   UpdateRequest updateRequest = new UpdateRequest();   updateRequest.index("twitter");   updateRequest.type("tweet");   updateRequest.id("1");   updateRequest.doc(XContentFactory.jsonBuilder()     .startObject()     // 对没有的字段添加, 对已有的字段替换      .field("gender", "male")      .field("message", "hello")     .endObject());   UpdateResponse response = client.update(updateRequest).get();      // 打印   String index = response.getIndex();   String type = response.getType();   String id = response.getId();   long version = response.getVersion();   System.out.println(index + " : " + type + ": " + id + ": " + version);  }    /**   * 测试update api, 使用client   * @throws Exception    */  @Test  public void testUpdate2() throws Exception {   // 使用Script对象进行更新 //  UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") //    .setScript(new Script("hits._source.gender = \"male\"")) //    .get();      // 使用XContFactory.jsonBuilder() 进行更新 //  UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") //    .setDoc(XContentFactory.jsonBuilder() //      .startObject() //       .field("gender", "malelelele") //      .endObject()).get();      // 使用updateRequest对象及script //  UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") //    .script(new Script("ctx._source.gender=\"male\"")); //  UpdateResponse response = client.update(updateRequest).get();      // 使用updateRequest对象及documents进行更新   UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")     .doc(XContentFactory.jsonBuilder()       .startObject()        .field("gender", "male")       .endObject()      )).get();   System.out.println(response.getIndex());  }    /**   * 测试update   * 使用updateRequest   * @throws Exception    * @throws InterruptedException    */  @Test  public void testUpdate3() throws InterruptedException, Exception {   UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")    .script(new Script("ctx._source.gender=\"male\""));   UpdateResponse response = client.update(updateRequest).get();  }    /**   * 测试upsert方法   * @throws Exception    *    */  @Test  public void testUpsert() throws Exception {   // 设置查询条件, 查找不到则添加生效   IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")    .source(XContentFactory.jsonBuilder()     .startObject()      .field("name", "214")      .field("gender", "gfrerq")     .endObject());   // 设置更新, 查找到更新下面的设置   UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")    .doc(XContentFactory.jsonBuilder()      .startObject()       .field("user", "wenbronk")      .endObject())    .upsert(indexRequest);      client.update(upsert).get();  }    /**   * 测试multi get api   * 从不同的index, type, 和id中获取   */  @Test  public void testMultiGet() {   MultiGetResponse multiGetResponse = client.prepareMultiGet()   .add("twitter", "tweet", "1")   .add("twitter", "tweet", "2", "3", "4")   .add("anothoer", "type", "foo")   .get();      for (MultiGetItemResponse itemResponse : multiGetResponse) {    GetResponse response = itemResponse.getResponse();    if (response.isExists()) {     String sourceAsString = response.getSourceAsString();     System.out.println(sourceAsString);    }   }  }    /**   * bulk 批量执行   * 一次查询可以update 或 delete多个document   */  @Test  public void testBulk() throws Exception {   BulkRequestBuilder bulkRequest = client.prepareBulk();   bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")     .setSource(XContentFactory.jsonBuilder()       .startObject()        .field("user", "kimchy")        .field("postDate", new Date())        .field("message", "trying out Elasticsearch")       .endObject()));   bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")     .setSource(XContentFactory.jsonBuilder()       .startObject()        .field("user", "kimchy")        .field("postDate", new Date())        .field("message", "another post")       .endObject()));   BulkResponse response = bulkRequest.get();   System.out.println(response.getHeaders());  }    /**   * 使用bulk processor   * @throws Exception    */  @Test  public void testBulkProcessor() throws Exception {   // 创建BulkPorcessor对象   BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {    public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {     // TODO Auto-generated method stub    }        // 执行出错时执行    public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {     // TODO Auto-generated method stub    }        public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {     // TODO Auto-generated method stub    }   })   // 1w次请求执行一次bulk   .setBulkActions(10000)   // 1gb的数据刷新一次bulk   .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))   // 固定5s必须刷新一次   .setFlushInterval(TimeValue.timeValueSeconds(5))   // 并发请求数量, 0不并发, 1并发允许执行   .setConcurrentRequests(1)   // 设置退避, 100ms后执行, 最大请求3次   .setBackoffPolicy(     BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))   .build();      // 添加单次请求   bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));   bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));      // 关闭   bulkProcessor.awaitClose(10, TimeUnit.MINUTES);   // 或者   bulkProcessor.close();  } }

tes2代码:

package com.wenbronk.javaes; import java.net.InetSocketAddress; import org.apache.lucene.queryparser.xml.FilterBuilderFactory; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortParseElement; import org.junit.Before; import org.junit.Test; /**  * 使用java API操作elasticSearch  * search API  * @author 231  *  */ public class JavaESTest2 {  private TransportClient client;  /**   * 获取client对象   */  @Before  public void testBefore() {   Builder builder = Settings.settingsBuilder();   builder.put("cluster.name", "wenbronk_escluster"); //    .put("client.transport.ignore_cluster_name", true);   Settings settings = builder.build();      org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();   TransportClient client1 = transportBuild.settings(settings).build();   client = client1.addTransportAddress((new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))));   System.out.println("success connect to escluster");     }    /**   * 测试查询   */  @Test  public void testSearch() { //  SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1"); //  SearchResponse response = searchRequestBuilder.setTypes("type1", "type2") //       .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) //       .setQuery(QueryBuilders.termQuery("user", "test")) //       .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1)) //       .setFrom(0).setSize(2).setExplain(true) //       .execute().actionGet();   SearchResponse response = client.prepareSearch()     .execute().actionGet(); //  SearchHits hits = response.getHits(); //  for (SearchHit searchHit : hits) { //   for(Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) { //    SearchHitField next = iterator.next(); //    System.out.println(next.getValues()); //   } //  }   System.out.println(response);  }    /**   * 测试scroll api   * 对大量数据的处理更有效   */  @Test  public void testScrolls() {   QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");      SearchResponse response = client.prepareSearch("twitter")   .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)   .setScroll(new TimeValue(60000))   .setQuery(queryBuilder)   .setSize(100).execute().actionGet();      while(true) {    for (SearchHit hit : response.getHits().getHits()) {     System.out.println("i am coming");    }    SearchResponse response2 = client.prepareSearchScroll(response.getScrollId())     .setScroll(new TimeValue(60000)).execute().actionGet();    if (response2.getHits().getHits().length == 0) {     System.out.println("oh no=====");     break;    }   }     }    /**   * 测试multiSearch   */  @Test  public void testMultiSearch() {   QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");   SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);      QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");   SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);      MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)     .execute().actionGet();   long nbHits = 0;   for (MultiSearchResponse.Item item : multiResponse.getResponses()) {    SearchResponse response = item.getResponse();    nbHits = response.getHits().getTotalHits();    SearchHit[] hits = response.getHits().getHits();    System.out.println(nbHits);   }     }    /**   * 测试聚合查询   */  @Test  public void testAggregation() {   SearchResponse response = client.prepareSearch()     .setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分     .addAggregation(AggregationBuilders.terms("term").field("user"))     .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")      .interval(DateHistogramInterval.YEAR))     .execute().actionGet();   Aggregation aggregation2 = response.getAggregations().get("term");   Aggregation aggregation = response.getAggregations().get("agg2"); //  SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();  }    /**   * 测试terminate   */  @Test  public void testTerminateAfter() {   SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();   if (response.isTerminatedEarly()) {    System.out.println("ternimate");   }  }    /**   * 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte   */  @Test  public void testFilter() {   SearchResponse response = client.prepareSearch("twitter")      .setTypes("")      .setQuery(QueryBuilders.matchAllQuery()) //查询所有      .setSearchType(SearchType.QUERY_THEN_FETCH)  //    .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)  //      .includeLower(true).includeUpper(true))  //    .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22))      .setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面      .get();   }    /**   * 分组查询   */  @Test  public void testGroupBy() {   client.prepareSearch("twitter").setTypes("tweet")   .setQuery(QueryBuilders.matchAllQuery())   .setSearchType(SearchType.QUERY_THEN_FETCH)   .addAggregation(AggregationBuilders.terms("user")     .field("user").size(0)  // 根据user进行分组            // size(0) 也是10   ).get();  }  }

上述就是小编为大家分享的使用java怎么对elasticsearch进行操作了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI