이번 포스팅은 파이썬에서 엘라스틱을
연결해서 데이터를 insert, delete, search 등
다양한 요청을 하는 튜토리얼입니다.
필요한 라이브러리 설치
$ pip install elasticsearch
- 파이썬에서 엘라스틱을 연결하기 위해 pip으로 엘라스틱을 설치.
파이썬에서 엘라스틱 연결
from elasticsearch import Elasticsearch
es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
- 엘라스틱이 설치된 서버 주소와 포트를 입력
- 파이썬에서 엘라스틱을 연결한다.
데이터 삽입
# mapping 없이 자동 인덱스 생성
def insertData():
es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
index="product_list"
doc = {
"category" : "skirt",
"c_key" : "1234",
"price" : 11,400,
"status" : 1,
"@timestamp" : datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
}
es.index(index="product_list", doc_type="_doc", body=doc)
- 인덱스를 생성해주지 않고 바로 데이터를 넣으면 mapping이 자동으로 생성된다.
- 데이터 타입을 미리 지정해 줄 수 없다.
# mapping이 정의된 상태에서 데이터 삽입
def insertData():
es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
index="product_list"
with open('mapping.json', 'r') as f:
mapping = json.load(f)
es.indices.create(index=index, body=mapping)
doc = {
"category" : "skirt",
"c_key" : "1234",
"price" : 11,400,
"status" : 1,
"@timestamp" : datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
}
es.index(index="product_list", doc_type="_doc", body=doc)
- 특정 데이터의 타입이나, 설정을 지정하기 위해서는 위와 같이 미리 mapping.json을 만들어 놓아야 한다.
- 미리 생성한 mapping.json으로 인덱스를 생성하고, 데이터를 넣으면 지정 맵핑에 맞게 데이터가 입력된다.
# mapping.json(예시)
{
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"c_key" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"price" : {
"type" : "int",
},
"status" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
<참고사항>
- 이미 같은 이름의 인덱스가 생성되어 있을 경우, 같은 이름의 인덱스를 생성하면 에러가 발생
- 따라서 인덱스를 생성할 때, 미리 같은 이름의 인덱스가 있는지 확인 후 생성하는 것이 좋다
if es.indices.exists(index=index):
pass
else:
es.indices.create(index=index, body=mapping)
데이터 검색
def searchAPI():
es = Elasticsearch('[엘라스틱_서버_IP_주소]:9200')
index = [검색할_인덱스]
body = [검색할_쿼리문]
res = es.search(index=index, body=body)
#res에 검색 결과가 담겨져 있다
- 데이터 검색은 .search를 사용한다
- 같이 보내는 파라미터로는 검색 대상인 인덱스와, 검색 쿼리문을 보내면 된다
- 해당 인덱스의 모든 결과를 확인하고 싶을 때 body는 아래와 같이 작성하면 된다
BULK API 사용
- 데이터를 엘라스틱에 넣을 때 하나의 다큐먼트가 아닌 벌크로 입력하고 싶을 때
# 0
from elasticsearch import Elasticsearch, helpers
# 1
docs = []
# 2
for num in range(100):
docs.append({
'_index': [인덱스_이름],
'_source': {
"category": "test"
"c_key": "test"
"status": "test"
"price": 1111
"@timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
}
})
#3
helpers.bulk(es, docs)
# 0 : 필요한 라이브러리 import
# 1 : 벌크로 보낼 데이터를 담을 리스트 정의
# 2 : 반복문을 통해 특정 데이터를 docs리스트에 담기
- 현재는 테스트로 같은 데이터가 반복해서 들어가게 되지만, 실제 변수 데이터를 담아 벌크 단위로 적재
# 3 : 데이터가 담아진 후에 helpers.bulk 함수를 사용해 엘라스틱에 데이터를 적재한다
< 참고 사항 >
엘라스틱에 데이터를 적재하는 시간으로 @timestamp를 설정하고 싶을 때는 아래와 같이 @timestamp를 정의
from datetime import datetime
# @timestamp timezone을 utc로 설정하여 kibana로 index 생성시 참조
def utc_time(self):
return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
- 위와 같이 utcnow()로 utc 현재 시간을 가져와서 @timestamp에 담아준다.
- 카바나에 적재할 때는 utc존으로 하고, 키바나 환경 설정에서 시간을 Asia/Seoul로 설정해준다.
'ELK 스택 > Elasticsearch' 카테고리의 다른 글
[Elasticsearch] Scroll API 사용하기 (전체 문서 읽어오기) (0) | 2020.05.09 |
---|---|
[Elastic] Crontab을 활용해 주기적으로 Document 삭제 (0) | 2020.04.18 |
[Elasticsearch] 실전 쿼리 사용법 (0) | 2020.03.18 |
[Elasticsearch] Plugin 설치 및 적용 (0) | 2020.03.13 |
[Elasticsearch] 기존 맵핑 필드 타입 변경하기 + reindex (2) | 2020.02.10 |