Pythonのelasticsearchライブラリで遊んでElasticsearchと仲良くなる
前書き
最近、業務でElasticsearchに初めて触れる機会をいただきました。Kibanaを使ってクエリを投げることにある程度慣れた後、PythonのコードからElasticsearchのデータ取得&更新などを実施することになりました。
そこでまずライブラリに慣れようと思い、色々とクエリを試してみました。
後述の公式ドキュメントにも書いてありますが、elasticsearch(Pythonライブラリ)はElasticsearchにそのままリクエストを送るだけの低レイヤのラッパーなので、先にKibanaなどを使ってElasticsearchの素の操作になれていた方がわかりやすいと思いました(感想)。
「kibanaでHTTPリクエストのbodyに入れてるjsonは、body引数でdictとして渡す」
「indexの指定は引数で行う」
「kibanaで_search
のように、アンダースコアで指定したものはメソッド名になる(es.search
など)」
と覚えておくと簡単に読み替えられると思います。
参考リンク
Python Elasticsearch Client — Elasticsearch 7.9.1 documentation
Elasticsearchで不要なフィールドの削除 - Qiita
環境
バージョン | |
---|---|
Elasticsearch | 7.1.0 |
MacOS Catalina | 10.15.6 |
Python3 | 3.8.6 |
elasticsearch(Pythonライブラリ) | 7.10.0 |
elasticsearchは、localstack を使っています。docker-composeの該当部分の設定は下のような形です。(今回は使いませんが他のコンテナも使用しているためこのような環境です。)
本題のelasticsearch(Pythonライブラリ)の操作とは直接関係ないのでご容赦を:pray:
version: "2" services: aws.local: container_name: awslocal mem_limit: 5g ports: - 4566-4597:4566-4597 - 8080:8080 image: localstack/localstack:0.10.9 environment: - SERVICES=s3,es,elasticsearch - DEFAULT_REGION=us-east-1 # dummy configure - AWS_DEFAULT_REGION=us-east-1 - AWS_REGION=us-east-1 - AWS_DEFAULT_OUTPUT=json - AWS_ACCESS_KEY_ID=xxx - AWS_SECRET_ACCESS_KEY=xxx
それではみていきましょう。
各種操作
接続とコネクションの確認
>>> from elasticsearch import Elasticsearch >>> es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 4571}]) >>> es.info() # 疎通確認 # コネクションを閉じる。 >>> es.close()
バージョンなどの情報がprintされれば成功です。 これ以降、esと出てきたら上記のようにコネクションを作成したものとなります。
接続情報などを扱うクラスには、以下のようにドット繋ぎでアクセスできます。
>>> es.transport.connection_pool <DummyConnectionPool: (<Urllib3HttpConnection: http://localhost:4571>,)> >>> es.transport <elasticsearch.transport.Transport at 0x7f89aae75340>
ドキュメントの↓部分を参照すると、これらのクラスから取得できる情報などが確認できます。 esオブジェクトを作成する時に、これらを指定することができるので必要に応じて参照&確認してください。
Connection Layer API — Elasticsearch 7.9.1 documentation
Connection Layer API — Elasticsearch 7.9.1 documentation
データの投入 と 取得
indexを作成しつつ、データを投入します。
# 表示用にpprintをインポートしてます。 import pprint >>> doc = { 'first_name': '霊夢', 'last_name': '博麗', 'user_name': 'reimu', 'spell': { 'prefix': '霊符', 'title': '夢想封印' } } >>> response = es.index(index='gensou-index', id=1, body=doc) >>> pprint.pprint(response, indent=2, width=80) { '_id': '1', '_index': 'gensou-index', '_primary_term': 1, '_seq_no': 0, '_shards': {'failed': 0, 'successful': 1, 'total': 2}, '_type': '_doc', '_version': 1, 'result': 'created'}
データの取得(GETで、ID指定)は以下のように行います。 indexメソッドの時の挙動は Create or Update です。
>>> res = es.get(index="gensou-index", id=1) >>> pprint.pprint(res, indent=2, width=80) { '_id': '1', '_index': 'gensou-index', '_primary_term': 1, '_seq_no': 0, '_source': { 'first_name': '霊夢', 'last_name': '博麗', 'spell': {'prefix': '霊符', 'title': '夢想封印'}, 'user_name': 'reimu'}, '_type': '_doc', '_version': 1, 'found': True}
通常のElasticsearchを使った時と同じように、_source
という項目にdocument(一つのデータ)の情報が得られます。
次に、IDを指定しない場合のデータ投入をやってみます。
>>> doc = { 'first_name': 'レミリア', 'last_name': 'スカーレット', 'user_name': 'remilia', 'spell': { 'prefix': '紅符', 'title': 'スカーレットマイスタ' } } >>> res = es.index(index='gensou-index', body=doc) # idを指定しない場合、デフォルト値"None"となる。 >>> pprint.pprint(res, indent=2, width=80) { '_id': 'vKouLXYBabhEaLLCx13L', '_index': 'gensou-index', '_primary_term': 1, '_seq_no': 1, '_shards': {'failed': 0, 'successful': 1, 'total': 2}, '_type': '_doc', '_version': 1, 'result': 'created'}
返り値をprintするとわかりますが、_id
はvKouLXYBabhEaLLCx13L
となります。
もちろん、そのIDでgetもできます。
>>> es.get(index='gensou-index', id='vKouLXYBabhEaLLCx13L') # 結果は省略した。
index操作
indexの作成や情報の閲覧をやってみます。
es.indices.get
でindex全体の情報が取れます。フィールドのtype、shardやreplicasの数がわかります。
>>> es.indices.get('gensou-index') {'gensou-index': {'aliases': {}, 'mappings': {'properties': {'first_name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'last_name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'spell': {'properties': {'prefix': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'title': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}, 'user_name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}, 'settings': {'index': {'creation_date': '1607075069521', 'number_of_shards': '1', 'number_of_replicas': '1', 'uuid': 'QUz4C2jxQvWEhtlfBLKhwQ', 'version': {'created': '7010099'}, 'provided_name': 'gensou-index'}}}}
mapping情報だけ欲しければ、get_mapping
を使います。
>>> es.indices.get_mapping(index='gensou-index')
es.indices.create
で新しいindexを作成できます。
Mappingなどをこの際指定することもできます。
API Documentation — Elasticsearch 7.9.1 documentation
>>> es.indices.create(index='new-index') {'acknowledged': True, 'shards_acknowledged': True, 'index': 'new-index'}
テーブル一覧の取得にはes.indices.stats()
を使う..と思います。(GET /_cat/indicest
に相当する処理がわからず...)
ただ、フィールドがとても多くてみづらいのでjmespath
などを用いてjsonを整形する必要があるかもしれません。
以下のようにすると、一応index一覧を取得できます。
>>> es.indices.stats()['indices'].keys() dict_keys(['new-index', '.kibana_1', 'gensou-index'])
サーチ(複数件取得)
検索条件に合ったデータの取得をやってみます。(結果は大量なので適宜省略)
まず、引数を何も渡さない状態で検索すると、全てのindexを跨いで全データを取得します。
>>> es.search()
indexを指定すると、そのindexの全データを取得します。
>>> es.search(index='gensou-index') # match_allのクエリでも、同じ結果です。 res = es.search(index="test-index", body={"query": {"match_all": {}}})
最初にも書きましたが、kibanaでの_search
に相当する操作がes.search
メソッドでは実行でき、HTTPリクエストのbodyに指定していたjsonは、Pythonではbody引数にdictとして渡すことでクエリを投げることができます。
first_name
が霊夢
となるデータは以下のように取得します。
>>> es.search(index='gensou-index', body={'query': {'match': {'first_name': '霊夢'}}})
もう一つよく使うパターンです。
「特定のカラムが存在するデータのみ出力」するには、以下のように指定します。
以下は、age
という項目を持ったdocumentのみを抽出します。
>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age"}}}) #...............(省略)............. '_source': {'first_name': 'フランドール', 'last_name': 'スカーレット', 'user_name': 'flan', 'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'}, 'age': 495}}]}}
カウント(条件にヒットするデータ件数の表示)
elasticsearchで_search
とほぼ同じようにクエリを指定できる_count
は、検索条件にマッチするデータの件数を表示するのに便利です。
先に述べた、exists
のクエリと組み合わせてよく使います。
>>> es.count(index='gensou-index') {'count': 2, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
結果の項目の絞り込み:
document(一つのデータ)の項目が多い場合、クエリの時点で欲しいフィールド以外を取らないようにすることができます。
また、別の方法としてjmespath
を使って取得後に必要なfiledのみを抽出する方法があります。おすすめなのですが今回は紹介しません
>>> es.search(index='gensou-index', filter_path=['hits.hits._source.user_name', 'hits.hits._id']) {'hits': {'hits': [{'_id': '1', '_source': {'user_name': 'reimu'}}, {'_id': 'vKouLXYBabhEaLLCx13L', '_source': {'user_name': 'remilia'}}, {'_id': 'flan-chan', '_source': {'user_name': 'flan'}}]}}
自分が入れたデータはJSONのキーをドット繋ぎでアクセスしたものになります。つまりhits.hits._source
配下になります。その他のデータも自在にfilter可能です。アスタリスクをワイルドカードとして使うこともできます。
めっちゃべんりです。
>>> es.search(index='gensou-index', filter_path=['hits.hits._source.sp*', 'hits.hits._id']) {'hits': {'hits': [{'_id': '1', '_source': {'spell': {'prefix': '霊符', 'title': '夢想封印'}}}, {'_id': 'vKouLXYBabhEaLLCx13L', '_source': {'spell': {'prefix': '紅符', 'title': 'スカーレットマイスタ'}}}, {'_id': 'flan-chan', '_source': {'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'}}}]}}
データの更新(一部アップデート)
パーシャルアップデートをやってみます。データの一部だけ更新したり付け加えたりします。
まず現在のid=1
のデータを確認スルノデス。
>>> es.get(index='gensou-index', id=1) {'_index': 'gensou-index', '_type': '_doc', '_id': '1', '_version': 1, '_seq_no': 0, '_primary_term': 1, 'found': True, '_source': {'first_name': '霊夢', 'last_name': '博麗', 'user_name': 'reimu', 'spell': {'prefix': '霊符', 'title': '夢想封印'}}}
age: 13
という情報を加えましょう。
メソッド名からもわかるように、kibanaでいうところの_update
操作になります。なので、bodyの指定方法に注意です。(doc
というキー配下にデータをおく!)
>>> es.update(index='gensou-index', id=1, body={'doc': {'age': 13}})
データを取得するとageが入っています。
>>> es.get(index='gensou-index', id=1) # ..........(省略)................. 'age': 13}}
もう一つ、スクリプトを使った更新もやってみます。
elasticsearchはpainless
という、名前に反して割と面倒な(ドキュメントが少ない)スクリプト言語を使って、クエリ内部で簡単な処理を行うことができます。
今回は年齢(age)を+1してみます。
ポイントは、ctx._source
からデータ本体にアクセスできるということです。
>>> es.update(index='gensou-index', id=1, body={'script': 'ctx._source.age += 1;'})
結果を見ると、13-> 14になっているので成功です!。 painlessは複数行書くこともできるので、割と便利です。 (また、プラグインを入れるとpythonでも記述できるそうですが自分はやってません...)
>>> es.get(index='gensou-index', id=1) # .................(省略)............... 'age': 14}}
もう一つ、scriptを使ったよくやる構文を紹介します。 「あるフィールドを削除する」パターンをやってみます。
今回は、ageの項目を持つデータからageのデータをpainlessを使って削除します。
まず、ageの項目を持つデータ一覧を取得します。
>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age" }}}, filter_path=['hits.hits._source', 'hits.hits._id']) {'hits': {'hits': [{'_id': 'flan-chan', '_source': {'first_name': 'フランドール', 'last_name': 'スカーレット', 'user_name': 'flan', 'spell': {'prefix': '禁忌', 'title': 'かごめかごめ'}, 'age': 495}}, {'_id': '1', '_source': {'first_name': '霊夢', 'last_name': '博麗', 'user_name': 'reimu', 'spell': {'prefix': '霊符', 'title': '夢想封印'}, 'age': 14}}]}}
二件ヒットしました。どちらもageフィールドが確認できます。
では、ageフィールドの削除していきます。
>>> es.update(index='gensou-index', id=1, body={'script': "ctx._source.remove('age');"}) >>> es.update(index='gensou-index', id='flan-chan', body={'script': "ctx._source.remove('age');"})
成功したか確認します。まずサーチをしてみます。
>>> es.search(index='gensou-index', body={"query": {"exists": {"field": "age" }}}, filter_path=['hits.hits._source', 'hits.hits._id']) {}
ageを持つデータはないようです。
それぞれのデータをゲットしてみると、ageフィールドが消えたことを確認できます。(省略)
>>> es.get(index='gensou-index', id=1') >>> es.get(index='gensou-index', id='flan-chan') # 結果は省略
バルク操作
複数のデータまとめて操作します。_op_type
(多分オペレーションタイプ)を指定することで、update, create, deleteなどの操作を指定できます。
データは配列として渡しますが、大きなデータの場合にはpythonのジェネレータ関数を使って記述する必要があります。通常、bulkメソッドを使うようなシーンではデータサイズが巨大になると考えられるので、listは避けた方がいいかもしれません。公式もジェネレータを使っています。
(下の例では実際にはリストを渡しているのでジェネレータの利点はなくなっていますが、みやすさのためにこうしました。)
Helpers — Elasticsearch 7.9.1 documentation
>>> from elasticsearch import helpers # イテレータの作成 >>> data_iter = ( {'_op_type': 'create', # operation type '_index': 'gensou-index', '_id': data['id'], # これは任意 '_source': data['data']} for data in [{'data': {'user_name': 'パチュリーノーレッジ'}, 'id': 'p'}, {'data': {'user_name': '十六夜咲夜'}, 'id': 's'}]) >>> helpers.bulk(es, data_iter) (2, [])
結果を確認します。(省略)
>>> es.get(index='gensou-index', id='p') >>> es.get(index='gensou-index', id='s')
バルクデリートもやってみましょう。
>>> data_iter = ( {'_op_type': 'delete', # operation type '_index': 'gensou-index', '_id': data} for data in ['p', 's']) >>> helpers.bulk(es, data_iter) (2, [])
先程入れたデータがきえているはずです。