阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

ElasticSearch的基本用法与集群搭建

123次阅读
没有评论

共计 9393 个字符,预计需要花费 24 分钟才能阅读完成。

一、简介

ElasticSearch 和 Solr 都是基于 Lucene 的搜索引擎,不过 ElasticSearch 天生支持分布式,而 Solr 是 4.0 版本后的 SolrCloud 才是分布式版本,Solr 的分布式支持需要 ZooKeeper 的支持。

这里有一个详细的 ElasticSearch 和 Solr 的对比:http://solr-vs-elasticsearch.com/

二、基本用法

Elasticsearch 集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是 NoSQL 的一种吧。

ES 比传统关系型数据库,对一些概念上的理解:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices  -> Types  -> Documents -> Fields

从创建一个 Client 到添加、删除、查询等基本用法:

1、创建 Client

public ElasticSearchService(String ipAddress, int port) {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(ipAddress,
                        port));
    }

这里是一个 TransportClient。

ES 下两种客户端对比:

TransportClient:轻量级的 Client,使用 Netty 线程池,Socket 连接到 ES 集群。本身不加入到集群,只作为请求的处理。

Node Client:客户端节点本身也是 ES 节点,加入到集群,和其他 ElasticSearch 节点一样。频繁的开启和关闭这类 Node Clients 会在集群中产生“噪音”。

2、创建 / 删除 Index 和 Type 信息

 

    // 创建索引
    public void createIndex() {
        client.admin().indices().create(new CreateIndexRequest(IndexName))
                .actionGet();
    }

    // 清除所有索引
    public void deleteIndex() {
        IndicesExistsResponse indicesExistsResponse = client.admin().indices()
                .exists(new IndicesExistsRequest(new String[] {IndexName}))
                .actionGet();
        if (indicesExistsResponse.isExists()) {
            client.admin().indices().delete(new DeleteIndexRequest(IndexName))
                    .actionGet();
        }
    }
   
    // 删除 Index 下的某个 Type
    public void deleteType(){
        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
    }

    // 定义索引的映射类型
    public void defineIndexTypeMapping() {
        try {
            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
            mapBuilder.startObject()
            .startObject(TypeName)
                .startObject(“properties”)
                    .startObject(IDFieldName).field(“type”, “long”).field(“store”, “yes”).endObject()
                    .startObject(SeqNumFieldName).field(“type”, “long”).field(“store”, “yes”).endObject()
                    .startObject(IMSIFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
                    .startObject(IMEIFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
                    .startObject(DeviceIDFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
                    .startObject(OwnAreaFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
                    .startObject(TeleOperFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
                    .startObject(TimeFieldName).field(“type”, “date”).field(“store”, “yes”).endObject()
                .endObject()
            .endObject()
            .endObject();

            PutMappingRequest putMappingRequest = Requests
                    .putMappingRequest(IndexName).type(TypeName)
                    .source(mapBuilder);
            client.admin().indices().putMapping(putMappingRequest).actionGet();
        } catch (IOException e) {
            log.error(e.toString());
        }
    }

 

这里自定义了某个 Type 的索引映射(Mapping),默认 ES 会自动处理数据类型的映射:针对整型映射为 long,浮点数为 double,字符串映射为 string,时间为 date,true 或 false 为 boolean。

注意:针对字符串,ES 默认会做“analyzed”处理,即先做分词、去掉 stop words 等处理再 index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field(“index”, “not_analyzed”)。

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

3、索引数据

 

    // 批量索引数据
    public void indexHotSpotDataList(List<Hotspotdata> dataList) {
        if (dataList != null) {
            int size = dataList.size();
            if (size > 0) {
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                for (int i = 0; i < size; ++i) {
                    Hotspotdata data = dataList.get(i);
                    String jsonSource = getIndexDataFromHotspotData(data);
                    if (jsonSource != null) {
                        bulkRequest.add(client
                                .prepareIndex(IndexName, TypeName,
                                        data.getId().toString())
                                .setRefresh(true).setSource(jsonSource));
                    }
                }

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                if (bulkResponse.hasFailures()) {
                    Iterator<BulkItemResponse> iter = bulkResponse.iterator();
                    while (iter.hasNext()) {
                        BulkItemResponse itemResponse = iter.next();
                        if (itemResponse.isFailed()) {
                            log.error(itemResponse.getFailureMessage());
                        }
                    }
                }
            }
        }
    }

    // 索引数据
    public boolean indexHotspotData(Hotspotdata data) {
        String jsonSource = getIndexDataFromHotspotData(data);
        if (jsonSource != null) {
            IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
                    TypeName).setRefresh(true);
            requestBuilder.setSource(jsonSource)
                    .execute().actionGet();
            return true;
        }

        return false;
    }

    // 得到索引字符串
    public String getIndexDataFromHotspotData(Hotspotdata data) {
        String jsonString = null;
        if (data != null) {
            try {
                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
                jsonBuilder.startObject().field(IDFieldName, data.getId())
                        .field(SeqNumFieldName, data.getSeqNum())
                        .field(IMSIFieldName, data.getImsi())
                        .field(IMEIFieldName, data.getImei())
                        .field(DeviceIDFieldName, data.getDeviceID())
                        .field(OwnAreaFieldName, data.getOwnArea())
                        .field(TeleOperFieldName, data.getTeleOper())
                        .field(TimeFieldName, data.getCollectTime())
                        .endObject();
                jsonString = jsonBuilder.string();
            } catch (IOException e) {
                log.equals(e);
            }
        }

        return jsonString;
    }

 

ES 支持批量和单个数据索引。

4、查询获取数据

 

    // 获取少量数据 100 个
    private List<Integer> getSearchData(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setTypes(TypeName).setQuery(queryBuilder).setSize(100)
                .execute().actionGet();
        SearchHits searchHits = searchResponse.getHits();
        for (SearchHit searchHit : searchHits) {
            Integer id = (Integer) searchHit.getSource().get(“id”);
            ids.add(id);
        }
        return ids;
    }

    // 获取大量数据
    private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        // 一次获取 100000 数据
        SearchResponse scrollResp = client.prepareSearch(IndexName)
                .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
                .setQuery(queryBuilder).setSize(100000).execute().actionGet();
        while (true) {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                Integer id = (Integer) searchHit.getSource().get(IDFieldName);
                ids.add(id);
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(600000)).execute().actionGet();
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }

        return ids;
    }

 

这里的 QueryBuilder 是一个查询条件,ES 支持分页查询获取数据,也可以一次性获取大量数据,需要使用 Scroll Search。

5、聚合(Aggregation Facet)查询

 

    // 得到某段时间内设备列表上每个设备的数据分布情况 < 设备 ID,数量 >
    public Map<String, String> getDeviceDistributedInfo(String startTime,
            String endTime, List<String> deviceList) {

        Map<String, String> resultsMap = new HashMap<>();

        QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
        QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(deviceQueryBuilder).must(rangeBuilder);

        TermsBuilder termsBuilder = AggregationBuilders.terms(“DeviceIDAgg”).size(Integer.MAX_VALUE)
                .field(DeviceIDFieldName);
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setQuery(queryBuilder).addAggregation(termsBuilder)
                .execute().actionGet();
        Terms terms = searchResponse.getAggregations().get(“DeviceIDAgg”);
        if (terms != null) {
            for (Terms.Bucket entry : terms.getBuckets()) {
                resultsMap.put(entry.getKey(),
                        String.valueOf(entry.getDocCount()));
            }
        }
        return resultsMap;
    }

 

Aggregation 查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/Java-api/current/java-aggs.html

三、集群配置

配置文件 elasticsearch.yml

集群名和节点名:

#cluster.name: elasticsearch

#node.name: “Franz Kafka”

是否参与 master 选举和是否存储数据

#node.master: true

#node.data: true

分片数和副本数

#index.number_of_shards: 5
#index.number_of_replicas: 1

master 选举最少的节点数,这个一定要设置为整个集群节点个数的一半加 1,即 N /2+1

#discovery.zen.minimum_master_nodes: 1

discovery ping 的超时时间,拥塞网络,网络状态不佳的情况下设置高一点

#discovery.zen.ping.timeout: 3s

注意,分布式系统整个集群节点个数 N 要为奇数个!!

四、Elasticsearch 插件

1、elasticsearch-head 是一个 elasticsearch 的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head

2、elasticsearch-sql:使用 SQL 语法查询 elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip –install sql

github 地址:https://github.com/NLPchina/elasticsearch-sql

3、elasticsearch-bigdesk 是 elasticsearch 的一个集群监控工具,可以通过它来查看 ES 集群的各种状态。

安装:./bin/plugin -install lukas-vlcek/bigdesk

访问:http://192.103.101.203:9200/_plugin/bigdesk/,

4、elasticsearch-servicewrapper 插件是 ElasticSearch 的服务化插件,

在 https://github.com/elasticsearch/elasticsearch-servicewrapper 下载该插件后,解压缩,将 service 目录拷贝到 elasticsearch 目录的 bin 目录下。

而后,可以通过执行以下语句安装、启动、停止 ElasticSearch:

sh elasticsearch install

sh elasticsearch start

sh elasticsearch stop

Linux 上安装部署 ElasticSearch 全程记录  http://www.linuxidc.com/Linux/2015-09/123241.htm

Elasticsearch 安装使用教程 http://www.linuxidc.com/Linux/2015-02/113615.htm

ElasticSearch 配置文件译文解析 http://www.linuxidc.com/Linux/2015-02/114244.htm

ElasticSearch 集群搭建实例  http://www.linuxidc.com/Linux/2015-02/114243.htm

分布式搜索 ElasticSearch 单机与服务器环境搭建  http://www.linuxidc.com/Linux/2012-05/60787.htm

ElasticSearch 的工作机制  http://www.linuxidc.com/Linux/2014-11/109922.htm 

ElasticSearch 的详细介绍 :请点这里
ElasticSearch 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2015-10/124046.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计9393字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中