qtatsuの週報

初心者ですわぁ

Pythonのelasticsearchライブラリで遊んでElasticsearchと仲良くなる

前書き

最近、業務でElasticsearchに初めて触れる機会をいただきました。Kibanaを使ってクエリを投げることにある程度慣れた後、PythonのコードからElasticsearchのデータ取得&更新などを実施することになりました。

そこでまずライブラリに慣れようと思い、色々とクエリを試してみました。

後述の公式ドキュメントにも書いてありますが、elasticsearch(Pythonライブラリ)はElasticsearchにそのままリクエストを送るだけの低レイヤのラッパーなので、先にKibanaなどを使ってElasticsearchの素の操作になれていた方がわかりやすいと思いました(感想)。

「kibanaでHTTPリクエストのbodyに入れてるjsonは、body引数でdictとして渡す

「indexの指定は引数で行う」

「kibanaで_searchのように、アンダースコアで指定したものはメソッド名になる(es.searchなど)」

と覚えておくと簡単に読み替えられると思います。

参考リンク

Python Elasticsearch Client — Elasticsearch 7.9.1 documentation

Elasticsearchで不要なフィールドの削除 - Qiita

環境

バージョン
Elasticsearch 7.1.0
MacOS Catalina 10.15.6
Python3 3.8.6
elasticsearch(Pythonライブラリ) 7.10.0

elasticsearchは、localstack を使っています。docker-composeの該当部分の設定は下のような形です。(今回は使いませんが他のコンテナも使用しているためこのような環境です。)

本題のelasticsearch(Pythonライブラリ)の操作とは直接関係ないのでご容赦を:pray:

version: "2"
services:
  aws.local:
    container_name: awslocal
    mem_limit: 5g
    ports:
      - 4566-4597:4566-4597
      - 8080:8080
    image: localstack/localstack:0.10.9
    environment:
      - SERVICES=s3,es,elasticsearch
      - DEFAULT_REGION=us-east-1
      # dummy configure
      - AWS_DEFAULT_REGION=us-east-1
      - AWS_REGION=us-east-1
      - AWS_DEFAULT_OUTPUT=json
      - AWS_ACCESS_KEY_ID=xxx
      - AWS_SECRET_ACCESS_KEY=xxx

それではみていきましょう。

各種操作

接続とコネクションの確認

>>> from elasticsearch import Elasticsearch
>>> es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 4571}])
>>> es.info()  # 疎通確認

# コネクションを閉じる。
>>> es.close()

バージョンなどの情報がprintされれば成功です。 これ以降、esと出てきたら上記のようにコネクションを作成したものとなります。

接続情報などを扱うクラスには、以下のようにドット繋ぎでアクセスできます。

>>> es.transport.connection_pool
<DummyConnectionPool: (<Urllib3HttpConnection: http://localhost:4571>,)>

>>> es.transport
<elasticsearch.transport.Transport at 0x7f89aae75340>

ドキュメントの↓部分を参照すると、これらのクラスから取得できる情報などが確認できます。 esオブジェクトを作成する時に、これらを指定することができるので必要に応じて参照&確認してください。

Connection Layer API — Elasticsearch 7.9.1 documentation

Connection Layer API — Elasticsearch 7.9.1 documentation

データの投入 と 取得

indexを作成しつつ、データを投入します。

# 表示用にpprintをインポートしてます。
import pprint

>>> doc = {
        'first_name': '霊夢',
        'last_name': '博麗',
        'user_name': 'reimu',
        'spell': {
            'prefix': '霊符',
            'title': '夢想封印'
         }
     }
>>> response = es.index(index='gensou-index', id=1, body=doc)
>>> pprint.pprint(response, indent=2, width=80)
{ '_id': '1',
  '_index': 'gensou-index',
  '_primary_term': 1,
  '_seq_no': 0,
  '_shards': {'failed': 0, 'successful': 1, 'total': 2},
  '_type': '_doc',
  '_version': 1,
  'result': 'created'}

データの取得(GETで、ID指定)は以下のように行います。 indexメソッドの時の挙動は Create or Update です。

>>> res = es.get(index="gensou-index", id=1)
>>> pprint.pprint(res, indent=2, width=80)
{ '_id': '1',
  '_index': 'gensou-index',
  '_primary_term': 1,
  '_seq_no': 0,
  '_source': { 'first_name': '霊夢',
               'last_name': '博麗',
               'spell': {'prefix': '霊符', 'title': '夢想封印'},
               'user_name': 'reimu'},
  '_type': '_doc',
  '_version': 1,
  'found': True}

通常のElasticsearchを使った時と同じように、_sourceという項目にdocument(一つのデータ)の情報が得られます。


次に、IDを指定しない場合のデータ投入をやってみます。

>>> doc = {
        'first_name': 'レミリア',
        'last_name': 'スカーレット',
        'user_name': 'remilia',
        'spell': {
            'prefix': '紅符',
            'title': 'スカーレットマイスタ'
         }
     }
>>> res = es.index(index='gensou-index', body=doc)  # idを指定しない場合、デフォルト値"None"となる。
>>> pprint.pprint(res, indent=2, width=80)
{ '_id': 'vKouLXYBabhEaLLCx13L',
  '_index': 'gensou-index',
  '_primary_term': 1,
  '_seq_no': 1,
  '_shards': {'failed': 0, 'successful': 1, 'total': 2},
  '_type': '_doc',
  '_version': 1,
  'result': 'created'}

返り値をprintするとわかりますが、_idvKouLXYBabhEaLLCx13Lとなります。

もちろん、そのIDでgetもできます。

>>> es.get(index='gensou-index', id='vKouLXYBabhEaLLCx13L')  # 結果は省略した。

index操作

indexの作成や情報の閲覧をやってみます。

es.indices.getでindex全体の情報が取れます。フィールドのtype、shardやreplicasの数がわかります。

>>> es.indices.get('gensou-index')

{'gensou-index': {'aliases': {},
  'mappings': {'properties': {'first_name': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'last_name': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'spell': {'properties': {'prefix': {'type': 'text',
       'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
      'title': {'type': 'text',
       'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}},
    'user_name': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}},
  'settings': {'index': {'creation_date': '1607075069521',
    'number_of_shards': '1',
    'number_of_replicas': '1',
    'uuid': 'QUz4C2jxQvWEhtlfBLKhwQ',
    'version': {'created': '7010099'},
    'provided_name': 'gensou-index'}}}}

mapping情報だけ欲しければ、get_mappingを使います。

>>> es.indices.get_mapping(index='gensou-index')

es.indices.createで新しいindexを作成できます。 Mappingなどをこの際指定することもできます。 API Documentation — Elasticsearch 7.9.1 documentation

>>> es.indices.create(index='new-index')
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'new-index'}

テーブル一覧の取得にはes.indices.stats()を使う..と思います。(GET /_cat/indicestに相当する処理がわからず...) ただ、フィールドがとても多くてみづらいのでjmespathなどを用いてjsonを整形する必要があるかもしれません。

以下のようにすると、一応index一覧を取得できます。

>>> es.indices.stats()['indices'].keys()

dict_keys(['new-index', '.kibana_1', 'gensou-index'])

サーチ(複数件取得)

検索条件に合ったデータの取得をやってみます。(結果は大量なので適宜省略)

まず、引数を何も渡さない状態で検索すると、全てのindexを跨いで全データを取得します。

>>> es.search()

indexを指定すると、そのindexの全データを取得します。

>>> es.search(index='gensou-index')


# match_allのクエリでも、同じ結果です。
res = es.search(index="test-index", body={"query": {"match_all": {}}})

最初にも書きましたが、kibanaでの_searchに相当する操作がes.searchメソッドでは実行でき、HTTPリクエストのbodyに指定していたjsonは、Pythonではbody引数にdictとして渡すことでクエリを投げることができます。

first_name霊夢となるデータは以下のように取得します。

>>> es.search(index='gensou-index', body={'query': {'match': {'first_name': '霊夢'}}})

もう一つよく使うパターンです。 「特定のカラムが存在するデータのみ出力」するには、以下のように指定します。 以下は、ageという項目を持ったdocumentのみを抽出します。

>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age"}}})
#...............(省略).............
    '_source': {'first_name': 'フランドール',
     'last_name': 'スカーレット',
     'user_name': 'flan',
     'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'},
     'age': 495}}]}}

カウント(条件にヒットするデータ件数の表示)

elasticsearchで_searchとほぼ同じようにクエリを指定できる_countは、検索条件にマッチするデータの件数を表示するのに便利です。

先に述べた、existsのクエリと組み合わせてよく使います。

>>> es.count(index='gensou-index')
{'count': 2,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

結果の項目の絞り込み:

document(一つのデータ)の項目が多い場合、クエリの時点で欲しいフィールド以外を取らないようにすることができます。

また、別の方法としてjmespathを使って取得後に必要なfiledのみを抽出する方法があります。おすすめなのですが今回は紹介しません

>>> es.search(index='gensou-index', filter_path=['hits.hits._source.user_name', 'hits.hits._id'])

   {'hits': {'hits': [{'_id': '1', '_source': {'user_name': 'reimu'}},
   {'_id': 'vKouLXYBabhEaLLCx13L', '_source': {'user_name': 'remilia'}},
   {'_id': 'flan-chan', '_source': {'user_name': 'flan'}}]}}

自分が入れたデータはJSONのキーをドット繋ぎでアクセスしたものになります。つまりhits.hits._source配下になります。その他のデータも自在にfilter可能です。アスタリスクワイルドカードとして使うこともできます。

めっちゃべんりです。

>>> es.search(index='gensou-index', filter_path=['hits.hits._source.sp*', 'hits.hits._id'])

   {'hits': {'hits': [{'_id': '1',
    '_source': {'spell': {'prefix': '霊符', 'title': '夢想封印'}}},
   {'_id': 'vKouLXYBabhEaLLCx13L',
    '_source': {'spell': {'prefix': '紅符', 'title': 'スカーレットマイスタ'}}},
   {'_id': 'flan-chan',
    '_source': {'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'}}}]}}

データの更新(一部アップデート)

パーシャルアップデートをやってみます。データの一部だけ更新したり付け加えたりします。

まず現在のid=1のデータを確認スルノデス。

>>> es.get(index='gensou-index', id=1)
{'_index': 'gensou-index',
 '_type': '_doc',
 '_id': '1',
 '_version': 1,
 '_seq_no': 0,
 '_primary_term': 1,
 'found': True,
 '_source': {'first_name': '霊夢',
  'last_name': '博麗',
  'user_name': 'reimu',
  'spell': {'prefix': '霊符', 'title': '夢想封印'}}}

age: 13という情報を加えましょう。 メソッド名からもわかるように、kibanaでいうところの_update操作になります。なので、bodyの指定方法に注意です。(docというキー配下にデータをおく!)

>>> es.update(index='gensou-index', id=1, body={'doc': {'age': 13}})

データを取得するとageが入っています。

>>> es.get(index='gensou-index', id=1)
# ..........(省略).................

  'age': 13}}

もう一つ、スクリプトを使った更新もやってみます。 elasticsearchはpainlessという、名前に反して割と面倒な(ドキュメントが少ない)スクリプト言語を使って、クエリ内部で簡単な処理を行うことができます。

今回は年齢(age)を+1してみます。 ポイントは、ctx._sourceからデータ本体にアクセスできるということです。

>>> es.update(index='gensou-index', id=1, body={'script': 'ctx._source.age += 1;'})

結果を見ると、13-> 14になっているので成功です!。 painlessは複数行書くこともできるので、割と便利です。 (また、プラグインを入れるとpythonでも記述できるそうですが自分はやってません...)

>>> es.get(index='gensou-index', id=1)
# .................(省略)...............
  'age': 14}}

もう一つ、scriptを使ったよくやる構文を紹介します。 「あるフィールドを削除する」パターンをやってみます。

今回は、ageの項目を持つデータからageのデータをpainlessを使って削除します。

まず、ageの項目を持つデータ一覧を取得します。

>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age" }}}, filter_path=['hits.hits._source', 'hits.hits._id'])
{'hits': {'hits': [{'_id': 'flan-chan',
    '_source': {'first_name': 'フランドール',
     'last_name': 'スカーレット',
     'user_name': 'flan',
     'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'},
     'age': 495}},
   {'_id': '1',
    '_source': {'first_name': '霊夢',
     'last_name': '博麗',
     'user_name': 'reimu',
     'spell': {'prefix': '霊符', 'title': '夢想封印'},
     'age': 14}}]}}

二件ヒットしました。どちらもageフィールドが確認できます。

では、ageフィールドの削除していきます。

>>> es.update(index='gensou-index', id=1, body={'script': "ctx._source.remove('age');"})
>>> es.update(index='gensou-index', id='flan-chan', body={'script': "ctx._source.remove('age');"})

成功したか確認します。まずサーチをしてみます。

>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age" }}}, filter_path=['hits.hits._source', 'hits.hits._id'])
{}

ageを持つデータはないようです。

それぞれのデータをゲットしてみると、ageフィールドが消えたことを確認できます。(省略)

>>> es.get(index='gensou-index', id=1')
>>> es.get(index='gensou-index', id='flan-chan')

# 結果は省略

バルク操作

複数のデータまとめて操作します。_op_type(多分オペレーションタイプ)を指定することで、update, create, deleteなどの操作を指定できます。

データは配列として渡しますが、大きなデータの場合にはpythonのジェネレータ関数を使って記述する必要があります。通常、bulkメソッドを使うようなシーンではデータサイズが巨大になると考えられるので、listは避けた方がいいかもしれません。公式もジェネレータを使っています。

(下の例では実際にはリストを渡しているのでジェネレータの利点はなくなっていますが、みやすさのためにこうしました。)

Helpers — Elasticsearch 7.9.1 documentation

>>> from elasticsearch import helpers

# イテレータの作成
>>> data_iter = (
        {'_op_type': 'create',       # operation type
         '_index': 'gensou-index',
         '_id': data['id'],               # これは任意
         '_source': data['data']}
         for data in
         [{'data': {'user_name': 'パチュリーノーレッジ'}, 'id': 'p'}, {'data': {'user_name': '十六夜咲夜'}, 'id': 's'}])

>>> helpers.bulk(es, data_iter)
(2, [])

結果を確認します。(省略)

>>> es.get(index='gensou-index', id='p')
>>> es.get(index='gensou-index', id='s')

バルクデリートもやってみましょう。

>>> data_iter = (
            {'_op_type': 'delete',       # operation type
             '_index': 'gensou-index',
             '_id': data}
             for data in
             ['p', 's'])
>>> helpers.bulk(es, data_iter)
(2, [])

先程入れたデータがきえているはずです。