#import pip
#pip.main(['install', 'elasticsearch'])
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd
import numpy as np
from time import time
import os
import sys
from dataiku import pandasutils as pdu
from elasticsearch import Elasticsearch, helpers
import json
es = Elasticsearch('http://user:pwd@server_ip:port/')
es.info()
#Exclui índice, se ele existir
indice= "datasus-sih"
doc_type="sih-type"
try :
es.indices.delete(index=indice)
except :
pass
#Definição de tipo para documentos
sih_type = {
"mappings":{
'sih-type': {
'properties': {
'ANO_CMPT': {
'type': 'long'
},
'AUD_JUST': {
'type': 'keyword'
},
'CAR_INT': {
'type': 'long'
},
'CBOR': {
'type': 'keyword'
},
'CEP': {
'type': 'long'
},
'CGC_HOSP': {
'type': 'keyword'
},
'CID_ASSO': {
'type': 'keyword'
},
'CID_MORTE': {
'type': 'keyword'
},
'CID_NOTIF': {
'type': 'keyword'
},
'CNAER': {
'type': 'long'
},
'def_cnae': {
'type': 'keyword'
},
'CNES': {
'type': 'long'
},
'CNPJ_MANT': {
'type': 'float'
},
'INFEHOSP': {
'type': 'keyword'
},
'def_infehosp': {
'type': 'keyword'
},
'COBRANCA': {
'type': 'long'
},
'COD_IDADE': {
'type': 'long'
},
'COMPLEX': {
'type': 'long'
},
'CONTRACEP1': {
'type': 'long'
},
'CONTRACEP2': {
'type': 'long'
},
'DIAGSEC1': {
'type': 'keyword'
},
'DIAGSEC2': {
'type': 'keyword'
},
'DIAGSEC3': {
'type': 'keyword'
},
'DIAGSEC4': {
'type': 'keyword'
},
'DIAGSEC5': {
'type': 'keyword'
},
'DIAGSEC6': {
'type': 'keyword'
},
'DIAGSEC7': {
'type': 'keyword'
},
'DIAGSEC8': {
'type': 'keyword'
},
'DIAGSEC9': {
'type': 'keyword'
},
'DIAG_PRINC': {
'type': 'keyword'
},
'DIAG_SECUN': {
'type': 'keyword'
},
'DIAR_ACOM': {
'type': 'long'
},
'DIAS_PERM': {
'type': 'long'
},
'DT_INTER': {
'type': 'long'
},
'DT_SAIDA': {
'type': 'long'
},
'ESPEC': {
'type': 'long'
},
'ETNIA': {
'type': 'long'
},
'FAEC_TP': {
'type': 'float'
},
'FINANC': {
'type': 'long'
},
'GESTAO': {
'type': 'long'
},
'GESTOR_COD': {
'type': 'long'
},
'GESTOR_CPF': {
'type': 'long'
},
'GESTOR_DT': {
'type': 'keyword'
},
'GESTOR_TP': {
'type': 'long'
},
'GESTRISCO': {
'type': 'long'
},
'HOMONIMO': {
'type': 'long'
},
'IDADE': {
'type': 'long'
},
'IDENT': {
'type': 'long'
},
'IND_VDRL': {
'type': 'long'
},
'INSC_PN': {
'type': 'keyword'
},
'INSTRU': {
'type': 'long'
},
'MARCA_UCI': {
'type': 'long'
},
'MARCA_UTI': {
'type': 'long'
},
'MES_CMPT': {
'type': 'long'
},
'MORTE': {
'type': 'long'
},
'MUNIC_MOV': {
'type': 'long'
},
'MUNIC_RES': {
'type': 'long'
},
'NACIONAL': {
'type': 'long'
},
'NUM_PROC': {
'type': 'keyword'
},
'NASC': {
'type': 'long'
},
'NATUREZA': {
'type': 'long'
},
'def_regime': {
'type': 'keyword'
},
'NAT_JUR': {
'type': 'long'
},
'NUM_FILHOS': {
'type': 'long'
},
'N_AIH': {
'type': 'long'
},
'PROC_REA': {
'type': 'long'
},
'PROC_SOLIC': {
'type': 'long'
},
'QT_DIARIAS': {
'type': 'long'
},
'RACA_COR': {
'type': 'long'
},
'REGCT': {
'type': 'long'
},
'REMESSA': {
'type': 'keyword'
},
'RUBRICA': {
'type': 'long'
},
'SEQUENCIA': {
'type': 'long'
},
'SEQ_AIH5': {
'type': 'long'
},
'SEXO': {
'type': 'long'
},
'SIS_JUST': {
'type': 'keyword'
},
'TOT_PT_SP': {
'type': 'long'
},
'CPF_AUT': {
'type': 'keyword'
},
'TPDISEC1': {
'type': 'long'
},
'TPDISEC2': {
'type': 'long'
},
'TPDISEC3': {
'type': 'long'
},
'TPDISEC4': {
'type': 'long'
},
'TPDISEC5': {
'type': 'long'
},
'TPDISEC6': {
'type': 'long'
},
'TPDISEC7': {
'type': 'long'
},
'TPDISEC8': {
'type': 'long'
},
'TPDISEC9': {
'type': 'long'
},
'UF_ZI': {
'type': 'long'
},
'US_TOT': {
'type': 'float'
},
'UTI_INT_AL': {
'type': 'long'
},
'UTI_INT_AN': {
'type': 'long'
},
'UTI_INT_IN': {
'type': 'long'
},
'UTI_INT_TO': {
'type': 'long'
},
'UTI_MES_AL': {
'type': 'long'
},
'UTI_MES_AN': {
'type': 'long'
},
'UTI_MES_IN': {
'type': 'long'
},
'UTI_MES_TO': {
'type': 'long'
},
'VAL_ACOMP': {
'type': 'float'
},
'VAL_OBSANG': {
'type': 'float'
},
'VAL_ORTP': {
'type': 'float'
},
'VAL_PED1AC': {
'type': 'float'
},
'VAL_RN': {
'type': 'float'
},
'VAL_SADT': {
'type': 'float'
},
'VAL_SADTSR': {
'type': 'float'
},
'VAL_SANGUE': {
'type': 'float'
},
'VAL_SH': {
'type': 'float'
},
'VAL_SH_FED': {
'type': 'float'
},
'VAL_SH_GES': {
'type': 'float'
},
'VAL_SP': {
'type': 'float'
},
'VAL_SP_FED': {
'type': 'float'
},
'VAL_SP_GES': {
'type': 'float'
},
'VAL_TOT': {
'type': 'float'
},
'VAL_TRANSP': {
'type': 'float'
},
'VAL_UCI': {
'type': 'float'
},
'VAL_UTI': {
'type': 'float'
},
'VINCPREV': {
'type': 'long'
},
'ano_internacao': {
'type': 'long'
},
'ano_saida': {
'type': 'long'
},
'cod_uf_mov': {
'type': 'long'
},
'cod_uf_res': {
'type': 'long'
},
'def_aglr_int': {
'type': 'keyword'
},
'def_aglr_res': {
'type': 'keyword'
},
'def_car_int': {
'type': 'long'
},
'def_cbo': {
'type': 'keyword'
},
'def_cir_int': {
'type': 'keyword'
},
'def_cir_res': {
'type': 'keyword'
},
'def_cobranca': {
'type': 'keyword'
},
'def_cod_idade': {
'type': 'keyword'
},
'def_complex': {
'type': 'keyword'
},
'def_contracep1': {
'type': 'keyword'
},
'def_contracep2': {
'type': 'keyword'
},
'def_csaud_int': {
'type': 'keyword'
},
'def_csaud_res': {
'type': 'keyword'
},
'def_diag_princ_cap': {
'type': 'keyword'
},
'def_diag_princ_cat': {
'type': 'keyword'
},
'def_diag_princ_grupo': {
'type': 'keyword'
},
'def_diag_princ_subcat': {
'type': 'keyword'
},
'def_diag_secun1_cat': {
'type': 'keyword'
},
'def_diag_secun1_grupo': {
'type': 'keyword'
},
'def_diag_secun1_subcat': {
'type': 'keyword'
},
'def_diarias_uti': {
'type': 'long'
},
'def_dias_perm': {
'type': 'long'
},
'def_esferajur': {
'type': 'keyword'
},
'def_etnia': {
'type': 'keyword'
},
'def_faec_tp': {
'type': 'keyword'
},
'def_financ': {
'type': 'keyword'
},
'def_gestao': {
'type': 'keyword'
},
'def_gestrisco': {
'type': 'keyword'
},
'def_homonimo': {
'type': 'keyword'
},
'def_idade_18': {
'type': 'keyword'
},
'def_idade_anos': {
'type': 'long'
},
'def_idade_bas': {
'type': 'keyword'
},
'def_idade_pub': {
'type': 'keyword'
},
'def_ident': {
'type': 'keyword'
},
'def_identific': {
'type': 'keyword'
},
'def_ind_vdrl': {
'type': 'keyword'
},
'def_instru': {
'type': 'keyword'
},
'def_leitos': {
'type': 'keyword'
},
'def_marca_uci': {
'type': 'keyword'
},
'def_marca_uti': {
'type': 'keyword'
},
'def_meso_int': {
'type': 'keyword'
},
'def_meso_res': {
'type': 'keyword'
},
'def_micro_res': {
'type': 'keyword'
},
'def_morte': {
'type': 'keyword'
},
'def_municipio_int': {
'type': 'keyword'
},
'def_municipio_res': {
'type': 'keyword'
},
'def_n_aih': {
'type': 'long'
},
'def_nacionalidade': {
'type': 'keyword'
},
'def_nat_jur': {
'type': 'keyword'
},
'def_num_filhos': {
'type': 'keyword'
},
'def_procedimento_realizado': {
'type': 'keyword'
},
'def_procedimento_solicitado': {
'type': 'keyword'
},
'def_raca_cor': {
'type': 'keyword'
},
'def_reg_metr_int': {
'type': 'keyword'
},
'def_reg_metr_res': {
'type': 'keyword'
},
'def_regct': {
'type': 'keyword'
},
'def_regiao_int': {
'type': 'keyword'
},
'def_regiao_res': {
'type': 'keyword'
},
'def_rsaud_int': {
'type': 'keyword'
},
'def_rsaud_res': {
'type': 'keyword'
},
'def_secun1_princ_cap': {
'type': 'keyword'
},
'def_seq_aih5': {
'type': 'keyword'
},
'def_sexo': {
'type': 'keyword'
},
'def_tpdisec1': {
'type': 'keyword'
},
'def_tpdisec2': {
'type': 'keyword'
},
'def_tpdisec3': {
'type': 'keyword'
},
'def_tpdisec4': {
'type': 'keyword'
},
'def_tpdisec5': {
'type': 'keyword'
},
'def_tpdisec6': {
'type': 'keyword'
},
'def_tpdisec7': {
'type': 'keyword'
},
'def_tpdisec8': {
'type': 'keyword'
},
'def_tpdisec9': {
'type': 'keyword'
},
'def_uf_int': {
'type': 'keyword'
},
'def_uf_res': {
'type': 'keyword'
},
'def_uf_sigla_int': {
'type': 'keyword'
},
'def_uf_sigla_res': {
'type': 'keyword'
},
'def_uti_mes_to': {
'type': 'long'
},
'dia_semana_internacao': {
'type': 'keyword'
},
'dia_semana_saida': {
'type': 'keyword'
},
'dt_inter': {
'type': 'date'
},
'dt_saida': {
'type': 'date'
},
'idade_nascimento_anos': {
'type': 'long'
},
'ocor_ALTITUDE': {
'type': 'long'
},
'ocor_AMAZONIA': {
'type': 'keyword'
},
'ocor_AREA': {
'type': 'float'
},
'ocor_CAPITAL': {
'type': 'keyword'
},
'ocor_CSAUDCOD': {
'type': 'long'
},
'ocor_FRONTEIRA': {
'type': 'keyword'
},
'ocor_LATITUDE': {
'type': 'float'
},
'ocor_LONGITUDE': {
'type': 'float'
},
'ocor_MSAUDCOD': {
'type': 'long'
},
'ocor_MUNCOD': {
'type': 'long'
},
'ocor_MUNNOME': {
'type': 'keyword'
},
'ocor_MUNNOMEX': {
'type': 'keyword'
},
'ocor_RSAUDCOD': {
'type': 'long'
},
'ocor_UFCOD': {
'type': 'long'
},
'ocor_codigo_adotado': {
'type': 'long'
},
'ocor_coordenadas': {
'type': 'geo_point'
},
'ocor_uf_SIGLA_UF': {
'type': 'keyword'
},
'ocorreu_uf_NOME': {
'type': 'keyword'
},
'res_ALTITUDE': {
'type': 'long'
},
'res_AMAZONIA': {
'type': 'keyword'
},
'res_AREA': {
'type': 'float'
},
'res_CAPITAL': {
'type': 'keyword'
},
'res_CSAUDCOD': {
'type': 'long'
},
'res_FRONTEIRA': {
'type': 'keyword'
},
'res_LATITUDE': {
'type': 'float'
},
'res_LONGITUDE': {
'type': 'float'
},
'res_MSAUDCOD': {
'type': 'long'
},
'res_MUNCOD': {
'type': 'long'
},
'res_MUNNOME': {
'type': 'keyword'
},
'res_MUNNOMEX': {
'type': 'keyword'
},
'res_RSAUDCOD': {
'type': 'long'
},
'res_UFCOD': {
'type': 'long'
},
'res_codigo_adotado': {
'type': 'long'
},
'res_coordenadas': {
'type': 'geo_point'
},
'res_uf_CODIGO_UF': {
'type': 'long'
},
'res_uf_NOME_UF': {
'type': 'keyword'
},
'res_uf_SIGLA_UF': {
'type': 'keyword'
}
}
}
}
}
#cria índice no Elasticsearch
es.indices.create(index=indice,body=cnes_type)
data_prepared = dataiku.Dataset("DATA_prep")
def get_metric(project_name,dataset_name,metric_ids):
client = dataiku.api_client()
current_project = client.get_project(project_name)
dataset = current_project.get_dataset(dataset_name)
metrics = dataset.compute_metrics(partition='ALL', metric_ids=metric_ids)
metrics = [{'metric':m["metricId"],'value':int(m["value"])} for m in metrics["result"]["computed"] if m["metricId"] in metric_ids][0]
return metrics
def record_count(project_name,dataset_name):
return get_metric(project_name,dataset_name,['records:COUNT_RECORDS'])['value']
nrows = record_count('ETLSIH','DATA_prep')
def geraJson(df):
return json.loads(df.T.to_json())
#tamanho do chunk
chunksize = 10000
#número total de chunks a serem indexados
nchunks = nrows/chunksize
#imprime o número total de documentos a serem indexados
print("Documentos: %i\n"%nrows)
res_bulk=[]
for chunk,df in enumerate(data_prepared.iter_dataframes(chunksize=chunksize)):
#gera o json do chunk de dados atual (formato pronto para indexação)
data_json = geraJson(df)
#imprime o número do chunk atual e o total de chunks a serem indexados
print("Chunk: %i/%i"%(chunk,nchunks))
#cria lista de ações para indexação de cada documento do chunk atual
lista=[]
for i, item in enumerate(data_json.values()):
data_dict = {
'_op_type': 'index',
'_index': indice,
'_type': doc_type,
'_source': item
}
lista.append(data_dict)
#indexa todos os documentos do chunk atual (bulk indexa em chunks)
res = helpers.bulk(client=es, actions=lista, chunk_size=1000, raise_on_error=False, raise_on_exception = False)
res_bulk.append(res)
print(res)
res_df = pd.DataFrame(res_bulk)
res_df.columns = ['indexed_chunksize', 'errors']
# Write recipe outputs
res_Elasticsearch = dataiku.Dataset("bulk_elasticsearch")
res_Elasticsearch.write_with_schema(res_df)