#import pip
#pip.main(['install', 'elasticsearch'])
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd
import numpy as np
from time import time
import os
import sys
from dataiku import pandasutils as pdu
from elasticsearch import Elasticsearch, helpers
import json
# Client for the target Elasticsearch cluster.
# NOTE(review): credentials are embedded in the URL; consider reading them
# from project variables or the environment instead of the source file.
es = Elasticsearch('http://user:pwd@server_ip:port/')
# Query cluster info; the result is discarded (presumably a connectivity
# check so the recipe fails early — confirm).
es.info()

# Name of the index and of the document type used for mapping and indexing.
indice = "sim_dss"
doc_type = "sim-type"

# Delete the index if it already exists so it can be recreated below.
# Only a 404 ("index not found") response is tolerated; any other failure
# now propagates instead of being silently swallowed by a bare `except`.
es.indices.delete(index=indice, ignore=[404])
# Field-by-field mapping of the indexed documents: every field is declared
# explicitly with its Elasticsearch type.
sim_properties = {
    "NUMERODO": {"type": "keyword"},
    "TIPOOBITO": {"type": "keyword"},
    "def_tipo_obito": {"type": "keyword"},
    "DTOBITO": {"type": "keyword"},
    "data_obito": {"type": "keyword"},
    "ano_obito": {"type": "keyword"},
    "dia_semana_obito": {"type": "keyword"},
    "NATURAL": {"type": "keyword"},
    "DTNASC": {"type": "keyword"},
    "data_nasc": {"type": "date"},
    "idade_obito_calculado": {"type": "float"},
    "ano_nasc": {"type": "integer"},
    "dia_semana_nasc": {"type": "keyword"},
    "IDADE": {"type": "keyword"},
    "idade_obito": {"type": "float"},
    "SEXO": {"type": "keyword"},
    "def_sexo": {"type": "keyword"},
    "RACACOR": {"type": "keyword"},
    "def_raca_cor": {"type": "keyword"},
    "ESTCIV": {"type": "keyword"},
    "def_est_civil": {"type": "keyword"},
    "ESC": {"type": "keyword"},
    "def_escol": {"type": "keyword"},
    "OCUP": {"type": "keyword"},
    "CODBAIRES": {"type": "keyword"},
    "CODMUNRES": {"type": "keyword"},
    "LOCOCOR": {"type": "keyword"},
    "def_loc_ocor": {"type": "keyword"},
    "CODMUNOCOR": {"type": "keyword"},
    "IDADEMAE": {"type": "keyword"},
    "ESCMAE": {"type": "keyword"},
    "def_escol_mae": {"type": "keyword"},
    "OCUPMAE": {"type": "keyword"},
    "QTDFILVIVO": {"type": "keyword"},
    "QTDFILMORT": {"type": "keyword"},
    "GRAVIDEZ": {"type": "keyword"},
    "def_gravidez": {"type": "keyword"},
    "GESTACAO": {"type": "keyword"},
    "def_gestacao": {"type": "keyword"},
    "PARTO": {"type": "keyword"},
    "def_parto": {"type": "keyword"},
    "OBITOPARTO": {"type": "keyword"},
    "def_obito_parto": {"type": "keyword"},
    "PESO": {"type": "keyword"},
    "OBITOGRAV": {"type": "keyword"},
    "def_obito_grav": {"type": "keyword"},
    "OBITOPUERP": {"type": "keyword"},
    "def_obito_puerp": {"type": "keyword"},
    "ASSISTMED": {"type": "keyword"},
    "def_assist_med": {"type": "keyword"},
    "EXAME": {"type": "keyword"},
    "def_exame": {"type": "keyword"},
    "CIRURGIA": {"type": "keyword"},
    "def_cirurgia": {"type": "keyword"},
    "NECROPSIA": {"type": "keyword"},
    "def_necropsia": {"type": "keyword"},
    "CAUSABAS": {"type": "keyword"},
    "LINHAA": {"type": "keyword"},
    "LINHAB": {"type": "keyword"},
    "LINHAC": {"type": "keyword"},
    "LINHAD": {"type": "keyword"},
    "LINHAII": {"type": "keyword"},
    "def_circ_obito": {"type": "keyword"},
    "def_acid_trab": {"type": "keyword"},
    "def_fonte": {"type": "keyword"},
    "CODESTAB": {"type": "keyword"},
    "ATESTANTE": {"type": "keyword"},
    "UFINFORM": {"type": "keyword"},
    "HORAOBITO": {"type": "keyword"},
    "CODBAIOCOR": {"type": "keyword"},
    "NUMERODN": {"type": "keyword"},
    "TPASSINA": {"type": "keyword"},
    "DTATESTADO": {"type": "keyword"},
    "TPPOS": {"type": "keyword"},
    "DTINVESTIG": {"type": "keyword"},
    "CAUSABAS_O": {"type": "keyword"},
    "DTCADASTRO": {"type": "keyword"},
    "FONTEINV": {"type": "keyword"},
    "DTRECEBIM": {"type": "keyword"},
    "CODINST": {"type": "keyword"},
    "CB_PRE": {"type": "keyword"},
    "MORTEPARTO": {"type": "keyword"},
    "TPOBITOCOR": {"type": "keyword"},
    "ORIGEM": {"type": "keyword"},
    "DTCADINF": {"type": "keyword"},
    "DTCADINV": {"type": "keyword"},
    "NUMERODV": {"type": "keyword"},
    "NUMSUS": {"type": "keyword"},
    "COMUNSVOIM": {"type": "keyword"},
    "DTRECORIG": {"type": "keyword"},
    "DTRECORIGA": {"type": "keyword"},
    "CAUSAMAT": {"type": "keyword"},
    "ESC2010": {"type": "keyword"},
    "ESCMAE2010": {"type": "keyword"},
    "STDOEPIDEM": {"type": "keyword"},
    "STDONOVA": {"type": "keyword"},
    "SEMAGESTAC": {"type": "keyword"},
    "TPMORTEOCO": {"type": "keyword"},
    "DIFDATA": {"type": "keyword"},
    "DTCONCASO": {"type": "keyword"},
    "NUDIASOBIN": {"type": "keyword"},
    "SERIESCFAL": {"type": "keyword"},
    "SERIESCMAE": {"type": "keyword"},
    "CODMUNCART": {"type": "keyword"},
    "CODCART": {"type": "keyword"},
    "NUMREGCART": {"type": "keyword"},
    "DTREGCART": {"type": "keyword"},
    "DTCONINV": {"type": "keyword"},
    "CODMUNNATU": {"type": "keyword"},
    "ESTABDESCR": {"type": "keyword"},
    "CRM": {"type": "keyword"},
    "NUMEROLOTE": {"type": "keyword"},
    "STCODIFICA": {"type": "keyword"},
    "CODIFICADO": {"type": "keyword"},
    "VERSAOSIST": {"type": "keyword"},
    "VERSAOSCB": {"type": "keyword"},
    "ATESTADO": {"type": "keyword"},
    "ESCMAEAGR1": {"type": "keyword"},
    "ESCFALAGR1": {"type": "keyword"},
    "NUDIASOBCO": {"type": "keyword"},
    "FONTES": {"type": "keyword"},
    "TPRESGINFO": {"type": "keyword"},
    "TPNIVELINV": {"type": "keyword"},
    "NUDIASINF": {"type": "keyword"},
    "FONTESINF": {"type": "keyword"},
    "ALTCAUSA": {"type": "keyword"},
    # res_* fields
    "res_MUNNOME": {"type": "keyword"},
    "res_MUNNOMEX": {"type": "keyword"},
    "res_CAPITAL": {"type": "keyword"},
    "res_FRONTEIRA": {"type": "keyword"},
    "res_AMAZONIA": {"type": "keyword"},
    "res_LATITUDE": {"type": "float"},
    "res_LONGITUDE": {"type": "float"},
    "res_ALTITUDE": {"type": "float"},
    "res_AREA": {"type": "float"},
    "res_codigo_adotado": {"type": "keyword"},
    "res_SIGLA_UF": {"type": "keyword"},
    "res_CODIGO_UF": {"type": "keyword"},
    "res_NOME_UF": {"type": "keyword"},
    "res_MSAUDCOD": {"type": "keyword"},
    "res_RSAUDCOD": {"type": "keyword"},
    "res_CSAUDCOD": {"type": "keyword"},
    # ocor_* fields
    "ocor_MUNNOME": {"type": "keyword"},
    "ocor_MUNNOMEX": {"type": "keyword"},
    "ocor_CAPITAL": {"type": "keyword"},
    "ocor_FRONTEIRA": {"type": "keyword"},
    "ocor_AMAZONIA": {"type": "keyword"},
    "ocor_LATITUDE": {"type": "float"},
    "ocor_LONGITUDE": {"type": "float"},
    "ocor_ALTITUDE": {"type": "float"},
    "ocor_AREA": {"type": "float"},
    "ocor_codigo_adotado": {"type": "keyword"},
    "ocor_SIGLA_UF": {"type": "keyword"},
    "ocor_CODIGO_UF": {"type": "keyword"},
    "ocor_NOME_UF": {"type": "keyword"},
    "ocor_MSAUDCOD": {"type": "keyword"},
    "ocor_RSAUDCOD": {"type": "keyword"},
    "ocor_CSAUDCOD": {"type": "keyword"},
    # Geo points
    "ocor_coordenadas": {"type": "geo_point"},
    "res_coordenadas": {"type": "geo_point"},
    # causabas_* fields
    "causabas_capitulo": {"type": "keyword"},
    "causabas_grupo": {"type": "keyword"},
    "causabas_categoria": {"type": "keyword"},
    "causabas_subcategoria": {"type": "keyword"},
}

# Index body: the properties above registered under the "sim-type" document
# type (the same literal the script assigns to `doc_type`).
sim_type = {
    "mappings": {
        "sim-type": {
            "properties": sim_properties,
        },
    },
}
# Create the index in Elasticsearch, using sim_type as the request body.
es.indices.create(body=sim_type, index=indice)

# Handle to the input dataset whose rows will be indexed.
data_prepared = dataiku.Dataset("DORES_preparados")
def get_metric(project_name, dataset_name, metric_ids):
    """Compute metrics on a Dataiku dataset and return the first requested one.

    Args:
        project_name: Project key passed to ``client.get_project``.
        dataset_name: Dataset name passed to ``project.get_dataset``.
        metric_ids: Collection of metric ids to compute and look up.

    Returns:
        A dict ``{'metric': <metric id>, 'value': <int>}`` built from the
        first computed entry whose ``metricId`` is in ``metric_ids``.

    Raises:
        IndexError: If none of the requested metrics appears in the
            computation result.
    """
    client = dataiku.api_client()
    dataset = client.get_project(project_name).get_dataset(dataset_name)
    computed = dataset.compute_metrics(partition='ALL', metric_ids=metric_ids)

    # Stop at the first match rather than converting every entry and then
    # indexing a throwaway list.
    match = next(
        (m for m in computed["result"]["computed"] if m["metricId"] in metric_ids),
        None,
    )
    if match is None:
        # Same exception type the former `[...][0]` lookup produced, but
        # with a message that says what is missing.
        raise IndexError(
            "none of the metrics %r were computed for %s.%s"
            % (metric_ids, project_name, dataset_name)
        )
    return {'metric': match["metricId"], 'value': int(match["value"])}
def record_count(project_name, dataset_name):
    """Return the 'records:COUNT_RECORDS' metric value of a dataset.

    Args:
        project_name: Project key forwarded to ``get_metric``.
        dataset_name: Dataset name forwarded to ``get_metric``.

    Returns:
        The ``'value'`` entry of the metric dict returned by ``get_metric``.
    """
    count_metric = get_metric(project_name, dataset_name, ['records:COUNT_RECORDS'])
    return count_metric['value']
# Record count of the prepared dataset, as reported by its metrics.
nrows = record_count("MERGESIM", "DORES_preparados")
def geraJson(df):
    """Convert a DataFrame into a dict of row documents.

    Args:
        df: pandas DataFrame; each row becomes one document.

    Returns:
        A dict mapping each row's index label (as a string) to a
        ``{column: value}`` dict, with values as decoded from pandas'
        JSON output.
    """
    # orient="index" produces the same row-keyed layout as transposing the
    # frame first, but without copying it or coercing the columns to a
    # common dtype (a transpose of int + float columns turns ints into floats).
    return json.loads(df.to_json(orient="index"))
# Number of rows read from the dataset per chunk.
chunksize = 10000
# Total number of chunks to index, rounded up so that a partial last chunk
# is counted (plain division under-reports it).
nchunks = (nrows + chunksize - 1) // chunksize
# Print the total number of documents to be indexed.
print("Documentos: %i\n" % nrows)

# One helpers.bulk() result per chunk.
res_bulk = []
# Chunks are numbered from 1 so the progress line ends at "nchunks/nchunks".
for chunk, df in enumerate(data_prepared.iter_dataframes(chunksize=chunksize), start=1):
    # Row documents of the current chunk, in a format ready for indexing.
    data_json = geraJson(df)
    # Print the current chunk number and the total number of chunks.
    print("Chunk: %i/%i" % (chunk, nchunks))
    # One "index" action per document of the current chunk.
    lista = [
        {
            '_op_type': 'index',
            '_index': indice,
            '_type': doc_type,
            '_source': item,
        }
        for item in data_json.values()
    ]
    # Index every document of the current chunk; bulk() itself sends them in
    # batches of 1000. Both raise_* flags are off, so failures are expected
    # to show up in the returned value rather than as exceptions.
    res = helpers.bulk(
        client=es,
        actions=lista,
        chunk_size=1000,
        raise_on_error=False,
        raise_on_exception=False,
    )
    res_bulk.append(res)
    print(res)

# One row per chunk. Naming the columns at construction keeps this working
# when no chunk was read (assigning .columns on a column-less frame raises).
res_df = pd.DataFrame(res_bulk, columns=['indexed_chunksize', 'errors'])

# Write recipe outputs
res_Elasticsearch = dataiku.Dataset("bulk_elasticsearch")
res_Elasticsearch.write_with_schema(res_df)