• Português
  • English
  • Elasticsearch e Python

    Por: em 1 de abril de 2018

    Com a crescente quantidade de dados produzidos, coletados e armazenados, várias soluções começam a ser pensadas e desenvolvidas para atender às necessidades que o tão falado Big Data traz. O Elasticsearch (ES) é um mecanismo distribuído de pesquisa e análise de dados RESTful bastante versátil, que vem sendo aplicado em diversos casos de uso. A pesquisa (search) permite que sejam executados diferentes tipos de pesquisa – estruturadas, não estruturadas, geográficas, métricas – da maneira que se desejar. As agregações disponíveis por meio das análises (analyze) permite que insights sejam obtidos a partir de, por exemplo, bilhões de linhas de log, permitindo que padrões e tendências sejam identificados.

    E tudo isso com:

    • Velocidade: são implementados índices invertidos com transdutores de estados finitos para consultas de texto completo, árvore BKD para armazenamento de dados numéricos e um armazenamento de colunas para análise para que possamos obter respostas mais rapidamente;

    • Escalabilidade: escalável horizontalmente, gerencia automaticamente como índices e consultas são distribuídos pelo cluster, tornando possível ir de um ambinte de teste inicial ao de produção com muito mais facilidade;

    • Resiliência: detecta falhas para manter seu cluster (e seus dados) seguro e disponível;

    • Flexibilidade: trabalha bem com todos os tipos de dados, numérico, textual, georeferenciado, estruturado, não estruturado.

    Amazon Elastic Search Service

    No DataLab, utilizamos principalmente o serviço de Elasticsearch da Amazon. Uma das principais vantagens é a integração com outros serviços da AWS, incluindo Kinesis Data Firehose, AWS IoT e Amazon CloudWatch Logs para consumo de dados transparente; AWS CloudTrail para auditoria; Amazon VPC, AWS KMS e AWS IAM para segurança; e AWS CloudFormation para orquestração de nuvem. Além disso, é fácil de usar pois é totalmente gerenciado, simplificando tarefas demoradas, como a aplicação de patches de software, a recuperação de falhas, os backups e o monitoramento. Com isso, é possível implantar um cluster do Elasticsearch pronto para produção em minutos, sem ter que se preocupar com a infraestrutura de provisionamento ou a instalação e a manutenção do software Elasticsearch.

    “O Amazon Elasticsearch Service é um serviço totalmente gerenciado que disponibiliza APIs e recursos de análise em tempo real fáceis de usar, bem como a disponibilidade, a escalabilidade e a segurança exigidas por cargas de trabalho de produção” [https://aws.amazon.com/pt/elasticsearch-service/]

    Para completar, oferece vários níveis de segurança para domínios, entre outras características muito úteis, interessantes e importantes.

    Python Elastisearch Client

    Outra escolha do laboratório é a comunicação com o AWS Elastic Search utilizando a linguagem Python e, para isso, também o pacote Python Elasticsearch Client.

    Aplicações e Visualizações

    No DataLab, usamos o ES em projetos nos quais há grande quantidade de dados e os mesmo precisam ser recuperados rapidamente. Utilizamos também a ferramenta Kibana, que no ES da AWS é disponibilizada automaticamente com base nos índices do ES criado, para as primeiras visualizações dos dados. A figura abaixo mostra um exemplo de dashboard criado com essa ferramenta.

    Exemplos práticos

    Abaixo, alguns exemplos básicos das nossas utilizações mais comuns:

    • Conexão

    from elasticsearch import Elasticsearch

    from elasticsearch.connection import RequestsHttpConnection

          

    es_client =  Elasticsearch(host=ES_HOST, #link do endpoint

                                          port=443,

                                          use_ssl=True,

                                          verify_certs=True,

                                          proxies=None,

                                          connection_class=RequestsHttpConnection)

     

    • Criação de índice e Mapping

     

    mapping = json.load(open(mapping.json + '.json'))

    es.indices.create(index=index_complete_name, body=mapping)

     

    • Exemplo de mapping.json:

    {

    "mappings": {

    "doc_type_name_1": {

    "properties": {

    "user_id":          { "type": "keyword" },

    "timestamp":        { "type": "date" },

    "count":            { "type":"float" }

    }

    },

    " doc_type_name_2":{

    "properties": {

    "user_id":          { "type": "keyword" },

    "timestamp":        { "type":"date"},

    "product":          { "type": "keyword" },

    "count":            { "type":"float" }

    }

    }

           }

    }

     

    • Listagem de índice

     

    es.indices.get_alias()

     

    • Inclusão de alias para índice

     

    es_client.indices.put_alias(index=new_index_name, name=index_name)

     

    • Deleção de índice

     

    es_client.indices.delete(index=old_index_name)

     

    • Indexação – armazenamento de um documento

    result = es_client.search(index=index_name, doc_type=doc_type, body=body)

     

    • Busca – recuperação de documento(s)

     

    # Busca comum, com filtro em mais de um campo:

     

    body = {}

    if data:

    should_list = []

    for key in data:

    should_list.append({'match': {key: data[key]} })

    body = {'query':{'bool':{'should': should_list}}}

     

    result = es_client.search(index=index_name, doc_type=doc_type, body=body)

     

    # Busca do valor máximo de um campo:

     

        result = es_client.search(index=index_name,

                           doc_type=doc_type,

                           body={'aggs':{'per_tag':{'terms':{'field':field}}}})

     

    # Busca por intervalo (nesse exemplo, de data):

      

        must_clause = [{'match_all':{}}]

        # se além do intervalo, você quiser filtrar pelo valor de outro campo

        if query:

            must_clause = []

            for key in query:

                must_clause.append({"match": {key:query[key]}})

              

                

     

        es_client = get_connection()

        result = es_client.search(index=index_name,

                                    doc_type=doc_type,

                                    body={"query":{

                                            "bool":{

                                                "must": must_clause,

                                                "filter":{

                                                    "range":{

                                                        "processed_date":{

                                                            "from": from_ts,

                                                            "to": to_ts,

                                                            "format": ts_format

                                                        }

                                                    }

                                                }

                                            }

                                          }

                                    })

    # campos do tipo ‘date’ podem ser enviados ao ES com objetos do tipo datetime.datetime

     

    • Atualização de documento(s)

    # para fazer um update no ES, é preciso fornecer o id do documento a ser atualizado:

    es_client.update(index=index_name, doc_type=doc_type, id=record['_id'], body={'doc':doc})

     

    # para atualizar um conjunto de documentos que respeitam a lista de campos e seus respectivos valores de um dict search, pode-se fazer conforme abaixo:

    ids = retrieve(index, doc_type, search) # usando a forma de recuperação descrita acima

        if ids:

            for record in ids['hits']:

                es_client.update(index=index_name, \

       doc_type=doc_type, \

       id=record['_id'], \

       body={'doc':doc_dict})

         else:

         insert(index, doc_type, doc_dict)

     

    • Deleção de documento(s)

     

    es_client.delete_by_query(index=index, doc_type=doc_type, body=dict_busca)

     

    • Reindexação

     

    elasticsearch.helpers.reindex(es, old_index_name, new_index_name)

     

    Saiba mais:

    Elasticsearch: https://www.elastic.co/products/elasticsearch

    Amazon Elasticsearch Service: https://aws.amazon.com/pt/elasticsearch-service

    Python Elasticsearch Client: http://elasticsearch-py.readthedocs.io

    RESTful: https://www.devmedia.com.br/introducao-a-web-services-restful/37387

    Árvore BKD: https://users.cs.duke.edu/~pankaj/publications/papers/bkd-sstd.pdf

    1 de abril de 2018