python, python-3.x, elasticsearch, restful-url

How to recover streaming data even if the connection is lost, with the Python Elasticsearch client?


I stream RESTful API data from https://www.n2yo.com/api/, which tracks satellite positions. I use the Python client with Elasticsearch: I save the streamed data to ES every 10 seconds and visualize it with Kibana. My ES version is 6.4.3.

My code is:

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"



es = Elasticsearch('http://ip:port',timeout=600)

settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }
try:
 es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
 print('Index already exists!!')
 sys.exit(1)

def collect_data():
  data = requests.get(url = URL).json() 
  del data['positions'][1]
  new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              }

  es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

My question is: yesterday I lost the connection. The error is below:

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): Max retries exceeded with url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= (Caused by NewConnectionError(': Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))

When I re-run my code, I can't, because the index already exists. If I delete the index, I will lose the data that is already in ES. What can I do? I need to keep my saved data, and I need the job to run from now on. Any solutions, please?


Solution

  • Create the index only once you have actually received data from n2yo.com; use es.indices.exists to check whether it already exists. Then make your collect_data function call itself recursively in case of failure (see also the note after the code). Try:

     URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
    
    
    
    es = Elasticsearch('http://ip:port',timeout=600)
    
    def create_index()
    
        if not es.indices.exists(index = "spacestation"):
    
            settings = { "settings": {
                     "number_of_shards":1,
                      'number_of_replicas':0
                     },
          "mappings" : { 
               "document" : {
                    "properties":{
                        "geo": {
                           "type": "geo_point"
                                }
                              }
                            } 
                         } 
                      }
              es.indices.create(index = "spacestation", body=settings)
        else:
            print('Index already exists!!')
            
    
    def collect_data():
      try:
          data = requests.get(url = URL).json()
          create_index() 
          del data['positions'][1]
          new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
                   'lon':data['positions'][0]['satlongitude']}, 
                    'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                      'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
                  }
    
          es.index(index='spacestation', doc_type='document', body=new_data)
        except ConnectionError:
            collect_data()
    
    schedule.every(10).seconds.do(collect_data)
    
    while True:
      schedule.run_pending()
      time.sleep(1)
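
One caveat with the recursive retry: if the outage lasts a long time, collect_data keeps calling itself and can eventually hit Python's recursion limit. A minimal alternative sketch, assuming the same URL, es, and create_index from the code above (max_retries and backoff_seconds are hypothetical parameters, not part of the original answer): replace the recursion with a bounded retry loop.

    import time
    from datetime import datetime

    import requests

    def collect_data(max_retries=5, backoff_seconds=10):
        # Bounded retry loop instead of recursion, so a long outage
        # cannot exhaust the call stack. NOTE: the sleeps below block
        # the schedule loop for up to max_retries * backoff_seconds.
        data = None
        for _ in range(max_retries):
            try:
                data = requests.get(url=URL).json()
                break
            except requests.exceptions.ConnectionError:
                time.sleep(backoff_seconds)
        if data is None:
            return  # still offline; schedule will call us again later

        create_index()
        del data['positions'][1]
        new_data = {
            'geo': {'lat': data['positions'][0]['satlatitude'],
                    'lon': data['positions'][0]['satlongitude']},
            'satname': data['info']['satname'],
            'satid': data['info']['satid'],
            'timestamp': datetime.fromtimestamp(
                data['positions'][0]['timestamp']).isoformat()
        }
        es.index(index='spacestation', doc_type='document', body=new_data)

Either way, no data already indexed in ES is touched: the fix is only about skipping index creation when the index exists and retrying the HTTP request when the connection drops.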