How to make elasticsearch.py Fast (Bulk Operate)

用 Python 操作 Elasticsearch 時不免俗的肯定會安裝官方推出的 elasticsearch.py,最近在用 Python 接收 Email Log 並寫入 Elasticsearch 時有蠻多心得可以分享的,大家都知道 AWS lambda 是用執行時間算錢的,所以在撰寫 AWS Lambda 時我可說是分秒必爭,看到執行超過 0.5 秒就各種吐血,所以從最開始的 .index 一筆一筆插入資料到 Elasticsearch,到後來用 streaming_bulk 處理,最後受不了用 parallel_bulk 混合 streaming_bulk 操作,以下就來聊聊這幾種方式的差異以及地雷吧!

單筆新增

最開始,想要插入資料到 Elasticsearch 大家肯定會想到 es.index() 這東西,然後用迴圈去跑,如果你一次只插入一筆資料,這樣做當然沒問題,但如果一次需要插入 N 筆資料,這就是一個很糟的方式! 原因是每次執行 es.index() ,Python 都會傳送資料給 Elasticsearch 一次,如果有十筆資料就傳送十次,如果傳送一筆並等 Elasticsearch 處理完要 100ms,十筆資料就需要 1s,所以多筆資料時十分不推薦 es.index() 。

範例程式碼

from elasticsearch import Elasticsearch

es = Elasticsearch("http://127.0.0.1:9200/")

es.index(index="test-index", doc_type='tweet', id=1, body={
  "name": "Karl",
  "email": "[email protected]"
})

單執行緒批次新增

其實 Elasticsearch 本身有支援批次操作,你可以把一整個 List 的東西傳送給 Elasticsearch 讓他自己去處理,不管是寫入、更新或刪除都可以這麼做,操作時間會比 es.index() 的單筆操作快上許多!

但,如果你仔細觀察會發現,當資料數量增加時,執行速度依舊會變得很慢,這是因為雖然 Elasticsearch Server 本身能夠批次處理,但網路傳輸也是需要時間的,當資料數量增加時,雖然只要傳送一次資料,但這一次資料卻可能是 30MB,速度依舊會慢下來,我依舊不滿足。

如果是批次處理的話,你需要在每筆資料(Dict)裡面都設定 _type 跟 _index,然後整理成一個 List 傳送到 Elasticsearch 他就會幫你處理。

範例程式碼

from elasticsearch import Elasticsearch 
from elasticsearch import helpers as es_helpers

es = Elasticsearch('http://127.0.0.1:9200')

data = [
  {'name': 'Karl 1', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 1},
  {'name': 'Karl 2', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 2},
  {'name': 'Karl 3', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 3}
]

es_helpers.bulk(es, data)

多執行緒批次新增

最後的大決,就是用多個執行緒平行化的傳送資料到 Elasticsearch,這樣做在資料量大的時候確實單執行緒快很多,但當資料量少時,你會發現速度反而變得比單執行緒還要慢,這是因為執行緒的建立本身是很耗費時間的,如果你只有 10 筆資料,放在原本的執行緒裡傳送可能只要 300ms,但放到多執行緒裡,每 5 筆一個執行緒,反而可能要 600ms。

多執行緒處理有個小地雷,當你執行完 parallel_bulk 以後其實不會發生任何事情,資料不會真的新增進去,parallel_bulk 會回傳一個 generator,你必須把這個產生器跑過一次,才會真的新增進去… 這東西一開始沒注意到,害我掉了一小時的資料,詳細可以看下面的範例:

參考閱讀 elasticsearch2.1,paralle_bulk don’t work,why?

範例程式碼

from collections import deque

from elasticsearch import Elasticsearch 
from elasticsearch import helpers as es_helpers

es = Elasticsearch('http://127.0.0.1:9200')

data = [
 {'name': 'Karl 1', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 1},
 {'name': 'Karl 2', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 2},
 {'name': 'Karl 3', 'email': '[email protected]', '_type': 'user', '_index': 'user_table', '_id': 3},
]

# chunk_size 是以筆數為單位
deque(es_helpers.parallel_bulk(es, data, thread_count=4, chunk_size=50), maxlen=0)

需要特別介紹一下 deque 這東西,其實他本意是建立一個雙向的 List,但這邊我們有點取巧,利用他高效能的優勢來作為 Generator 的 Consume Method,他的使用其實就和下面的程式碼目的相同(把 Generator 全部使用過一次):

for response in es_helpers.parallel_bulk(es, data, thread_count=4, chunk_size=50):
  pass

參考閱讀 https://docs.python.org/2/library/itertools.html#recipes

大決!

其實最好的方式,是設定臨界值,如果資料筆數大於臨界值就使用多執行緒,如果小於臨界值,就使用單執行緒批次操作,至於臨界值是多少就需要你觀察實際情況判斷,以我處理 Mandrill Email Webhook 來說我是設定 100 筆,如果超過 100 筆資料就用多執行緒處裡,否則用單執行緒就足夠快了。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

這個網站採用 Akismet 服務減少垃圾留言。進一步了解 Akismet 如何處理網站訪客的留言資料