ELK 스택/Elasticsearch

[Elasticsearch] python에서 엘라스틱 사용하기

new_challenge 2020. 2. 11. 23:13
반응형

이번 포스팅은 파이썬에서 엘라스틱을

연결해서 데이터를 insert, delete, search 등

다양한 요청을 하는 튜토리얼입니다.

 

 

 

 

필요한 라이브러리 설치

$ pip install elasticsearch

 

- 파이썬에서 엘라스틱을 연결하기 위해 pip으로 엘라스틱을 설치.

 

파이썬에서 엘라스틱 연결

from elasticsearch import Elasticsearch

es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')

 

- 엘라스틱이 설치된 서버 주소와 포트를 입력

- 파이썬에서 엘라스틱을 연결한다.

 

데이터 삽입

# mapping 없이 자동 인덱스 생성

def insertData():
    es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
    
    index="product_list"
    
    doc = {
        "category" : "skirt",
        "c_key" : "1234",
        "price" : 11,400,
        "status" : 1,
        "@timestamp" : datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
    }
    
    es.index(index="product_list", doc_type="_doc", body=doc)

 

- 인덱스를 생성해주지 않고 바로 데이터를 넣으면 mapping이 자동으로 생성된다.

- 데이터 타입을 미리 지정해 줄 수 없다.

 

# mapping이 정의된 상태에서 데이터 삽입

def insertData():
    es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
    
    index="product_list"
    
    with open('mapping.json', 'r') as f:
        mapping = json.load(f)
        
    es.indices.create(index=index, body=mapping)    

    doc = {
        "category" : "skirt",
        "c_key" : "1234",
        "price" : 11,400,
        "status" : 1,
        "@timestamp" : datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
    }
    
    es.index(index="product_list", doc_type="_doc", body=doc)

 

- 특정 데이터의 타입이나, 설정을 지정하기 위해서는 위와 같이 미리 mapping.json을 만들어 놓아야 한다.

- 미리 생성한 mapping.json으로 인덱스를 생성하고, 데이터를 넣으면 지정 맵핑에 맞게 데이터가 입력된다.

 

 

# mapping.json(예시)

{
  "mappings" : {
    "properties" : {
      "@timestamp" : {
        "type" : "date"
      },
      "category" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "c_key" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "price" : {
        "type" : "int",
      },
      "status" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      }
    }
  }
}

 

<참고사항>

- 이미 같은 이름의 인덱스가 생성되어 있을 경우, 같은 이름의 인덱스를 생성하면 에러가 발생

- 따라서 인덱스를 생성할 때, 미리 같은 이름의 인덱스가 있는지 확인 후 생성하는 것이 좋다

 

if es.indices.exists(index=index):
	pass
else:
	es.indices.create(index=index, body=mapping)

 

 

데이터 검색

def searchAPI():
    es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
    
    index = [검색할_인덱스]
    body = [검색할_쿼리문]
    
    res = es.search(index=index, body=body)
    #res에 검색 결과가 담겨져 있다                

 

- 데이터 검색은 .search를 사용한다

- 같이 보내는 파라미터로는 검색 대상인 인덱스와, 검색 쿼리문을 보내면 된다

- 해당 인덱스의 모든 결과를 확인하고 싶을 때 body는 아래와 같이 작성하면 된다

 

해당 인덱스의 모든 문서 검색

 

 

BULK API 사용

- 데이터를 엘라스틱에 넣을 때 하나의 다큐먼트가 아닌 벌크로 입력하고 싶을 때

# 0
from elasticsearch import Elasticsearch, helpers

# 1
docs = []

# 2
for num in range(100):
    docs.append({
    	'_index': [인덱스_이름],
        '_source': {
            "category": "test"
            "c_key": "test"
            "status": "test"
            "price": 1111
            "@timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
            }
        })

#3
helpers.bulk(es, docs)

 

# 0 : 필요한 라이브러리 import 

# 1 : 벌크로 보낼 데이터를 담을 리스트 정의

# 2 : 반복문을 통해 특정 데이터를 docs리스트에 담기

- 현재는 테스트로 같은 데이터가 반복해서 들어가게 되지만, 실제 변수 데이터를 담아 벌크 단위로 적재

# 3 : 데이터가 담아진 후에 helpers.bulk 함수를 사용해 엘라스틱에 데이터를 적재한다

 

 

< 참고 사항 >

엘라스틱에 데이터를 적재하는 시간으로 @timestamp를 설정하고 싶을 때는 아래와 같이 @timestamp를 정의

from datetime import datetime

# @timestamp timezone을 utc로 설정하여 kibana로 index 생성시 참조
def utc_time(self):  
    return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

 

- 위와 같이 utcnow()로 utc 현재 시간을 가져와서 @timestamp에 담아준다.

- 카바나에 적재할 때는 utc존으로 하고, 키바나 환경 설정에서 시간을 Asia/Seoul로 설정해준다.

반응형