ELK 스택/Elasticsearch

[Elasticsearch] Scroll API 사용하기 (전체 문서 읽어오기)

new_challenge 2020. 5. 9. 22:18
반응형

이번 포스팅은 엘라스틱 서치에서 전체 문서를 

가져오기 위해, 스크롤 API를 사용하는 방법에 관한

포스팅입니다.

 

 

 

 

Elasticsearch Scoll API란, 

Scroll API

search 요청이 하나의 페이지를 결과로 리턴하는 동안, scrooll API는 하나의 search 요청에서 많은 수의 결과 리턴을 가능하게 해 준다.

이러한 방법은 전통적인 데이터베이스의 커서를 사용하는 것과 같은 방식이라고 생각할 수 있다.

 

Scrolling은 실시간으로 유저의 요청을 처리하기 위해 의도된 것이 아니라 대량의 데이터를 처리하기 위한 것 (예를 들면, 한 인덱스를 다른 구성으로 된 새로운 인덱스로 리인덱스 해주기 위함)

 

Scrolling을 사용하기 위해, 초기 검색 요청 시 scroll 파라미터를 쿼리문에 명시해줘야 한다. 이것은 엘라스틱에게 얼마나 오랫동안 "search context"를 살아있게 할 것인가를 말해준다.

 

 

< 참고 문서 > 

www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

 

Request Body Search | Elasticsearch Reference [7.6] | Elastic

Highlighters don’t reflect the boolean logic of a query when extracting terms to highlight. Thus, for some complex boolean queries (e.g nested boolean queries, queries using minimum_should_match etc.), parts of documents may be highlighted that don’t corre

www.elastic.co

>> 자세한 설명은 공식 문서를 참고. 

 

 

 

공식문서 예시

POST /twitter/_search?scroll=1m
{
    "size": 100,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

 

> 위 요청을 보내면 결과는 _scroll_id값을 포함한 결과를 리턴한다.

> 현재 size가 100으로 설정되어, 100건씩 fetch 하도록 설정했기 때문에 이번 쿼리 결과는 100건 만 확인

> 첫 요청에서 받은 scroll_id 값만 elasticsearch에 전달해주면 아직 받지 못한 데이터를 100건 씩 이어서 받을 수 있다.

> 제대로 작동하는지는 데이터의 id 넘버 등으로 확인이 가능함

 

 

POST /_search/scroll 
{
    "scroll" : "1m", 
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" 
}

 

> 다음 배치의 결과를 받기 위해 scroll API에 이 값을 넘겨준다.

 

# GET이나 POST가 사용되며, 요청 URL에는 인덱스 이름을 포함하지 않아도 된다.

- scroll_id 값이 원래의 search요청을 나타내 주기 때문이다.

 

# "scroll" : "1m"의 뜻은 엘라스틱서치에게 search context를 1m를 추가로 유지하라고 말하는 것

 

# size 파라미터는 각 배치의 결과의 maximum 수를 지정하도록 하는 것.

- 각각의 scroll API는 더 이상 리턴되는 데이터가 없을 때까지 다음 결과의 배치를 리턴한다.

 

# Important : 초기 검색 요청과 이후의 각 스크롤 요청은 각각 _scroll_id를 반환한다. _scroll_id는 요청 간에 변경될 수 있지만, 항상 변경되는 것은 아니다. 어쨌든 가장 최근에 받은 _scroll_id를 사용하여 다음 요청을 보내야 한다.

 

# Note : 만약 집계를 요청한다면, 오직 처음 요청 결과만 집계 결과를 포함할 것이다.

 

# Note : 스크롤 요청에는 정렬 순서가 _doc 일 때 더 빨리 최적화를 하는 최적화 기능이 있다. 

- 순서에 관계없이 모든 문서를 반복하려면 가장 효율적인 옵션!

 

GET /_search?scroll=1m
{
  "sort": [
    "_doc"
  ]
}

 

 

Scroll API 닫기

Scroll API 요청을 하면 "search context"는 문서를 다 읽은 후에도 지정된 시간만큼 살아있다. 이는 엘라스틱서치의 메모리를 잡고 있기 때문에 사용이 완료되면 닫아주는 것이 좋다.

 

> 현재 얼마나 많은 search context가 오픈되어 있는지 확인하기 위해서는 아래와 같이 한다.

GET /_nodes/stats/indices/search

 

> Scroll API 사용이 완료된 건에 대해서는 clear-scroll API를 통해 사용 중인 search context를 닫아준다.

# 하나의 scroll_id를 닫을 경우
DELETE /_search/scroll
{
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ=="
}


# 여러개의 scroll_id를 닫을 경우
DELETE /_search/scroll
{
    "scroll_id" : [
      "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==",
      "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAAABFmtSWWRRWUJrU2o2ZExpSGJCVmQxYUEAAAAAAAAAAxZrUllkUVlCa1NqNmRMaUhiQlZkMWFBAAAAAAAAAAIWa1JZZFFZQmtTajZkTGlIYkJWZDFhQQAAAAAAAAAFFmtSWWRRWUJrU2o2ZExpSGJCVmQxYUEAAAAAAAAABBZrUllkUVlCa1NqNmRMaUhiQlZkMWFB"
    ]
}


# 살아있는 모든 context를 닫을 경우
DELETE /_search/scroll/_all

 

 

Python에서 Elasticsearch Scroll API 사용

필요한 라이브러리 import 

from elasticsearch import Elasticsearch, exceptions
import json, time

 

시간 체크를 위한 설정 및 엘라스틱 호스트 연결

# time check
start_time = time.time()

# 접속할 elastic 정보
host = "http://47.56.200.94:9200"
es = Elasticsearch(host)

 

인덱스 및 search 요청 쿼리 작성

# doc_count 초기화
doc_count = 0

# 검색 요청할 인덱스 정의
index = "[인덱스_이름]"

# 요청 쿼리문 작성
resp = es.search(
    index = index,
    body = {
            "size": 100,
            "query": {
                "match_all": {}
                },
            "sort": [
                {
                  "id": {
                    "order": "asc"
                  }
                }
              ]
            },
    scroll = '1s' 
)

old_scroll_id = resp['_scroll_id']

> 여기서는 sort를 id를 기준으로 오름차순 정렬

> 더 최적화를 하기 위해서는 sort기준을 _doc으로 설정하면 된다

 

처음 요청에서 출력된 결과 저장

res = {}
result = []

# 처음 출력된 결과 저장
for doc in resp['hits']['hits']:
    res['resnet_vector'] = doc['_source']['resnet_vector']
    res['cat_key'] = doc['_source']['cat_key']
    res['id'] = doc['_source']['id']
    result.append(res)

 

Scroll API를 통해 나온 결과 저장

# SCROLL API를 통해 나온 결과 저장
while len(resp['hits']['hits']):
    resp = es.scroll(
        scroll_id = old_scroll_id,
        scroll = '1s' # length of time to keep search context
    )

    old_scroll_id = resp['_scroll_id']

    for doc in resp['hits']['hits']:
        res['resnet_vector'] = doc['_source']['resnet_vector']
        res['cat_key'] = doc['_source']['cat_key']
        res['id'] = doc['_source']['id']
        result.append(res)
        
    print(len(result))      

>> scroll api를 사용해 그 전 결과에서 나온 _scroll_id를 사용하여 추가 검색

>> 결과는 resp ['hits']['hits']에 담겨 있고, 그 안에서 필요한 데이터만 추출해서 따로 저장한다.

 

결과 확인

총 9902의 문서를 전부 가져온다. 실제 엘라스틱에 있는 데이터 개수와 맞는지 비교한다.

 

 

 

 

전체 코드 elastic_scroll.py
from elasticsearch import Elasticsearch, exceptions
import json, time

# time check
start_time = time.time()

# 접속할 elastic 정보
host = "http://[IP주소]:9200"
es = Elasticsearch(host)

doc_count = 0
index = "[인덱스이름]"
resp = es.search(
    index = index,
    body = {
            "size": 100,
            "query": {
                "match_all": {}
                },
            "sort": [
                {
                  "id": {
                    "order": "asc"
                  }
                }
              ]
            },
    scroll = '1s' 
)

old_scroll_id = resp['_scroll_id']


res = {}
result = []


# 처음 출력된 결과 저장
for doc in resp['hits']['hits']:
    res['resnet_vector'] = doc['_source']['resnet_vector']
    res['cat_key'] = doc['_source']['cat_key']
    res['id'] = doc['_source']['id']
    result.append(res)

# SCROLL API를 통해 나온 결과 저장
while len(resp['hits']['hits']):
    resp = es.scroll(
        scroll_id = old_scroll_id,
        scroll = '1s' # length of time to keep search context
    )

    for doc in resp['hits']['hits']:
        res['resnet_vector'] = doc['_source']['resnet_vector']
        res['cat_key'] = doc['_source']['cat_key']
        res['id'] = doc['_source']['id']
        result.append(res)
        
    print(len(result))      

print ("TOTAL TIME:", time.time() - start_time, "seconds.")

 

반응형