This question is related to this other one: How can I read data from a list and index specific values into Elasticsearch, using python?
I have written a script to read a list ("dummy") and index it into Elasticsearch. I converted the list into a list of dictionaries and used the "Bulk" API to index it into Elasticsearch. The script used to work (check the attached link to the related question). But it is no longer working after adding "timestamp" and the function "initialize_elasticsearch".
So, what is wrong? Should I be using JSON instead of the list of dictionaries?
I have also tried using only 1 dictionary of the list. In that case there is no error but nothing gets indexed.
THIS IS THE ERROR
THIS IS THE LIST (dummy)
[
"labels: imagenet_labels.txt ",
"Model: efficientnet-edgetpu-S_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 23.1",
"Time(ms): 5.7",
"Inference: corkscrew, bottle screw",
"Score: 0.03125 ",
"TPU_temp(°C): 57.05",
"labels: imagenet_labels.txt ",
"Model: efficientnet-edgetpu-M_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 29.3",
"Time(ms): 10.8",
"Inference: dragonfly, darning needle, devil's darning needle, sewing needle, snake feeder, snake doctor, mosquito hawk, skeeter hawk",
"Score: 0.09375 ",
"TPU_temp(°C): 56.8",
"labels: imagenet_labels.txt ",
"Model: efficientnet-edgetpu-L_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 45.6",
"Time(ms): 31.0",
"Inference: pick, plectrum, plectron",
"Score: 0.09766 ",
"TPU_temp(°C): 57.55",
"labels: imagenet_labels.txt ",
"Model: inception_v3_299_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 68.8",
"Time(ms): 51.3",
"Inference: ringlet, ringlet butterfly",
"Score: 0.48047 ",
"TPU_temp(°C): 57.3",
"labels: imagenet_labels.txt ",
"Model: inception_v4_299_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 121.8",
"Time(ms): 101.2",
"Inference: admiral",
"Score: 0.59375 ",
"TPU_temp(°C): 57.05",
"labels: imagenet_labels.txt ",
"Model: inception_v2_224_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 34.3",
"Time(ms): 16.6",
"Inference: lycaenid, lycaenid butterfly",
"Score: 0.41406 ",
"TPU_temp(°C): 57.3",
"labels: imagenet_labels.txt ",
"Model: mobilenet_v2_1.0_224_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 14.4",
"Time(ms): 3.3",
"Inference: leatherback turtle, leatherback, leathery turtle, Dermochelys coriacea",
"Score: 0.36328 ",
"TPU_temp(°C): 57.3",
"labels: imagenet_labels.txt ",
"Model: mobilenet_v1_1.0_224_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 14.5",
"Time(ms): 3.0",
"Inference: bow tie, bow-tie, bowtie",
"Score: 0.33984 ",
"TPU_temp(°C): 57.3",
"labels: imagenet_labels.txt ",
"Model: inception_v1_224_quant_edgetpu.tflite ",
"Image: insect.jpg ",
"Time(ms): 21.2",
"Time(ms): 3.6",
"Inference: pick, plectrum, plectron",
"Score: 0.17578 ",
"TPU_temp(°C): 57.3",
]
THIS IS THE SCRIPT
import elasticsearch6
from elasticsearch6 import Elasticsearch, helpers
import datetime
import re
ES_DEV_HOST = "http://localhost:9200/"
INDEX_NAME = "coral_ia" #name of index
DOC_TYPE = 'coral_edge' #type of data
##This is the list
dummy = ['labels: imagenet_labels.txt \n', '\n', 'Model: efficientnet-edgetpu-S_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 23.1\n', 'Time(ms): 5.7\n', '\n', '\n', 'Inference: corkscrew, bottle screw\n', 'Score: 0.03125 \n', '\n', 'TPU_temp(°C): 57.05\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: efficientnet-edgetpu-M_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 29.3\n', 'Time(ms): 10.8\n', '\n', '\n', "Inference: dragonfly, darning needle, devil's darning needle, sewing needle, snake feeder, snake doctor, mosquito hawk, skeeter hawk\n", 'Score: 0.09375 \n', '\n', 'TPU_temp(°C): 56.8\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: efficientnet-edgetpu-L_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 45.6\n', 'Time(ms): 31.0\n', '\n', '\n', 'Inference: pick, plectrum, plectron\n', 'Score: 0.09766 \n', '\n', 'TPU_temp(°C): 57.55\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: inception_v3_299_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 68.8\n', 'Time(ms): 51.3\n', '\n', '\n', 'Inference: ringlet, ringlet butterfly\n', 'Score: 0.48047 \n', '\n', 'TPU_temp(°C): 57.3\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: inception_v4_299_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 121.8\n', 'Time(ms): 101.2\n', '\n', '\n', 'Inference: admiral\n', 'Score: 0.59375 \n', '\n', 'TPU_temp(°C): 57.05\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: inception_v2_224_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 34.3\n', 'Time(ms): 16.6\n', '\n', '\n', 'Inference: lycaenid, lycaenid butterfly\n', 'Score: 0.41406 \n', '\n', 'TPU_temp(°C): 57.3\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: mobilenet_v2_1.0_224_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 14.4\n', 'Time(ms): 3.3\n', '\n', '\n', 'Inference: leatherback turtle, leatherback, leathery turtle, Dermochelys coriacea\n', 'Score: 0.36328 \n', '\n', 'TPU_temp(°C): 57.3\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: mobilenet_v1_1.0_224_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 14.5\n', 'Time(ms): 3.0\n', '\n', '\n', 'Inference: bow tie, bow-tie, bowtie\n', 'Score: 0.33984 \n', '\n', 'TPU_temp(°C): 57.3\n', '##################################### \n', '\n', 'labels: imagenet_labels.txt \n', '\n', 'Model: inception_v1_224_quant_edgetpu.tflite \n', '\n', 'Image: insect.jpg \n', '\n', '*The first inference on Edge TPU is slow because it includes loading the model into Edge TPU memory*\n', 'Time(ms): 21.2\n', 'Time(ms): 3.6\n', '\n', '\n', 'Inference: pick, plectrum, plectron\n', 'Score: 0.17578 \n', '\n', 'TPU_temp(°C): 57.3\n', '##################################### \n', '\n']
#This is to clean data and filter some values
regex = re.compile(r'(\w+)\((.+)\):\s(.*)|(\w+:)\s(.*)')
match_regex = list(filter(regex.match, dummy))
match = [line.strip('\n') for line in match_regex]
print("match list", match, "\n")
##Converts the list into a list of dictionaries
groups = [{}]
for line in match:
key, value = line.split(": ", 1)
if key == "labels":
if groups[-1]:
groups.append({})
groups[-1][key] = value
"""
Initialize Elasticsearch by server's IP'
"""
def initialize_elasticsearch():
n = 0
while n <= 10:
try:
es = Elasticsearch(ES_DEV_HOST)
print("Initializing Elasticsearch...")
return es
except elasticsearch6.exceptions.ConnectionTimeout as e: ###elasticsearch
print(e)
n += 1
continue
raise Exception
"""
Create an index in Elasticsearch if one isn't already there
"""
def initialize_mapping(es):
mapping_classification = {
'properties': {
'timestamp': {'type': 'date'},
#'type': {'type':'keyword'}, <--- I have removed this
'labels': {'type': 'keyword'},
'Model': {'type': 'keyword'},
'Image': {'type': 'keyword'},
'Time(ms)': {'type': 'short'},
'Inference': {'type': 'text'},
'Score': {'type': 'short'},
'TPU_temp(°C)': {'type': 'short'}
}
}
print("Initializing the mapping ...")
if not es.indices.exists(INDEX_NAME):
es.indices.create(INDEX_NAME)
es.indices.put_mapping(body=mapping_classification, doc_type=DOC_TYPE, index=INDEX_NAME)
def generate_actions():
actions = {
'_index': INDEX_NAME,
'timestamp': str(datetime.datetime.utcnow().strftime("%Y-%m-%d"'T'"%H:%M:%S")),
'_type': DOC_TYPE,
'_source': groups
}
yield actions
print("Generating actions ...")
#print("actions:", actions)
#print(type(actions), "\n")
def main():
es=initialize_elasticsearch()
initialize_mapping(es)
try:
res=helpers.bulk(client=es, index = INDEX_NAME, actions = generate_actions())
print ("\nhelpers.bulk() RESPONSE:", res)
print ("RESPONSE TYPE:", type(res))
except Exception as err:
print("\nhelpers.bulk() ERROR:", err)
if __name__ == "__main__":
main()
THIS IS THE CODE WHEN TESTING WITH ONLY 1 DICTIONARY
regex = re.compile(r'(\w+)\((.+)\):\s(.*)|(\w+:)\s(.*)')
match_regex = list(filter(regex.match, dummy))
match = [line.rstrip('\n') for line in match_regex] #quita los saltos de linea
#print("match list", match, "\n")
features_wanted='ModelImageTime(ms)InferenceScoreTPU_temp(°C)'
match_out = {i.replace(' ','').split(':')[0]:i.replace(' ','').split(':')[1] for i in match if i.replace(' ','').split(':')[0] in features_wanted}
-------------------EDIT-------------------------
No errors, but "Generating actions ..." is not being printed.
THIS IS THE MAPPING
THIS APPEARS WHEN I WANT TO SEE IF THE DATA WAS INDEXED
IT SEEMS THE DATA HAS BEEN INDEXED...
----------------------EDIT-----------------------
I modified the generate_actions
def generate_actions():
return[{
'_index': INDEX_NAME,
'_type': DOC_TYPE,
'_source': {
"any": doc,
"@timestamp": str(datetime.datetime.utcnow().strftime("%Y-%m-%d"'T'"%H:%M:%S")),}
}
for doc in groups]
This somewhat cryptic error msg is telling you that you need to pass single objects (instead of an array of them) to the bulk helpers.
So you need to rewrite your generate_actions
fn like so:
def generate_actions():
return [{
'timestamp': str(datetime.datetime.utcnow().strftime("%Y-%m-%d"'T'"%H:%M:%S")),
'_index': INDEX_NAME,
'_type': DOC_TYPE,
'_source': doc
} for doc in groups] # <----- note the for loop here. `_source` needs
# to be the doc, not the whole groups list
print("Generating actions ...")
Also, I'd recommend to remove the trailing whitespace from your key-value pairs when you construct the groups
:
groups[-1][key] = value.strip()