When writing time series data from a pandas dataframe to an InfluxDB bucket, I want to check whether a specific row of data already exists in the bucket (and thus prevent the same data from being written again).
A sample of the time series data in the pandas dataframe:
epoch,open,high,low,close,volume
1332374520.0,2.341,2.341,2.341,2.341,1.0
1332374700.0,2.343,2.343,2.343,2.343,1.0
1332374940.0,2.344,2.344,2.344,2.344,1.0
1332375420.0,2.344,2.344,2.344,2.344,2.0
1332375660.0,2.344,2.344,2.344,2.344,2.0
1332376080.0,2.344,2.344,2.344,2.344,1.0
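For reference, a minimal sketch of how a sample like this could be loaded into the dataframe used below, assuming it is stored in a CSV file (the file name ohlcv_sample.csv is only a placeholder):

import pandas as pd

# Hypothetical: read the CSV sample above into the dataframe used by the programs below
concat_of_all_dfs = pd.read_csv("ohlcv_sample.csv")
# Columns: epoch (Unix seconds, float), open, high, low, close, volume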
The current Python program, shown below, isn't detecting that the same data has already been written to the database bucket. If the program is run repeatedly, the print statement should produce output notifying that duplicate data has been detected, but it never does.
import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
# Example OHLCV data
data = {
"epoch": [1330902000, 1330902060, 1330902120],
"open": [2.55, 2.532, 2.537],
"high": [2.55, 2.538, 2.549],
"low": [2.521, 2.531, 2.537],
"close": [2.534, 2.538, 2.548],
"volume": [150, 69, 38]
}
concat_of_all_dfs = pd.DataFrame(data)
def data_point_exists(epoch, bucket, org):
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0)
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> filter(fn: (r) => r["epoch"] == {epoch})
    '''
    result = query_api.query(org=org, query=query)
    return len(result) > 0
if __name__ == "__main__":
    # Database credentials
    token = os.getenv('INFLUXDB_TOKEN')
    bucket = "bucket_test"
    org = "organisation_test"
    url = "http://localhost:8086"

    # Initialize InfluxDB Client
    client = InfluxDBClient(url=url, token=token, org=org)
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # Write data points one by one
    for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
        epoch = row['epoch']
        if not data_point_exists(epoch, bucket, org):
            point = Point("ohlcv") \
                .field("epoch", row['epoch']) \
                .field("open", row['open']) \
                .field("high", row['high']) \
                .field("low", row['low']) \
                .field("close", row['close']) \
                .field("volume", row['volume'])
            write_api.write(bucket=bucket, org=org, record=point)
        else:
            print(f"Data point for epoch {epoch} already exists. Skipping...")
    client.close()
Are there any mistakes in the above code that would prevent repeat data from being detected, possibly in the querying function's Flux script or anywhere else?
I solved the issue by modifying the query to pivot the field rows into columns and then searching the resulting table for the epoch on each iteration. The original filter referenced r["epoch"], but because epoch was written as a field it only appears in the _field/_value columns until the data is pivoted, so that filter never matched anything. It would be good to optimise this program to write to the database in batches while retaining the duplicate check, as it's quite slow, but for now this works (see the sketch after the code below). Please note some functions relating to the pandas dataframe have been omitted.
import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import datetime
def data_point_exists(epoch, bucket, org):
    # Define the Flux query to fetch all fields
    flux_query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0) // adjust the range as needed
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
      |> keep(columns: ["_time", "epoch", "open", "high", "low", "close", "volume"])
    '''
    # Execute the query
    result = query_api.query(org=org, query=flux_query)
    # Search to see if epoch exists
    for table in result:
        for record in table.records:
            if record["epoch"] == epoch:
                return True
    return False
if __name__ == "__main__":
    # Database credentials
    token = os.getenv('INFLUXDB_TOKEN')
    bucket = "bucket"
    org = "org"
    url = "http://localhost:8086"

    # Initialize InfluxDB Client
    client = InfluxDBClient(url=url, token=token, org=org)
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # Write data points one by one
    for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
        epoch = row['epoch']
        flag = data_point_exists(epoch, bucket, org)
        if flag:
            print("Repeat detected")
        else:
            point = Point("ohlcv") \
                .field("epoch", row['epoch']) \
                .field("open", row['open']) \
                .field("high", row['high']) \
                .field("low", row['low']) \
                .field("close", row['close']) \
                .field("volume", row['volume'])
            write_api.write(bucket=bucket, org=org, record=point)
    client.close()
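For the batch optimisation mentioned above, one option would be to query the epochs already stored in the bucket once up front, drop the matching rows from the dataframe, and write everything that remains in a single call. This is only a rough sketch under that assumption; the helper names and the distinct()-based query are mine, not part of the program above, and it reuses the query_api and write_api objects created in the main block:

def existing_epochs(bucket, org):
    # Fetch the distinct values of the "epoch" field already stored in the measurement
    flux_query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0)
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> filter(fn: (r) => r["_field"] == "epoch")
      |> keep(columns: ["_value"])
      |> distinct(column: "_value")
    '''
    result = query_api.query(org=org, query=flux_query)
    return {record["_value"] for table in result for record in table.records}

def write_new_rows(df, bucket, org):
    # Drop rows whose epoch is already in the bucket, then write the rest as one batch
    seen = existing_epochs(bucket, org)
    new_rows = df[~df["epoch"].isin(seen)]
    points = [
        Point("ohlcv")
        .field("epoch", row["epoch"])
        .field("open", row["open"])
        .field("high", row["high"])
        .field("low", row["low"])
        .field("close", row["close"])
        .field("volume", row["volume"])
        for _, row in new_rows.iterrows()
    ]
    if points:
        # The write API accepts a list of points, so this is a single batched write
        write_api.write(bucket=bucket, org=org, record=points)
    print(f"Wrote {len(points)} new rows, skipped {len(df) - len(points)} duplicates.")

This trades the per-row round trips in the loop above for one larger query at the start; whether that is faster will depend on how much data is already in the bucket.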