pythoninfluxdbinfluxdb-python

odd result when ingesting dataframe into influxdb 2.x


I'm using python to ingest data into influxdb,

but many examples just don't work (empty table, but no error reported), and one example with odd result.

With client set, I create dataframe by:

from datetime import datetime
from datetime import timedelta

_now = datetime.utcnow()
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                           index=[_now, _now + timedelta(hours=1)],
                           columns=["location", "water_level"])

then I get the data as:

_data_frame
Out[24]: 
                                location  water_level
2023-03-07 02:04:11.642867  coyote_creek          1.0
2023-03-07 03:04:11.642867  coyote_creek          2.0

Thus, I should have two data after ingesting, however, when I ingest data by:

write_api.write("testing_for_dataframe", "org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                            data_frame_tag_columns=['location'])

I only queried one row of data by:

query_api.query_data_frame('from(bucket:"testing_for_dataframe")|> range(start: -10m)')

I got:

Out[31]: 
    result  table  ... _measurement      location
0  _result      0  ...     h2o_feet  coyote_creek
[1 rows x 9 columns]

It can totally work when try example of:

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
                                        '|> range(start: -10m) '
                                        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                                        '|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
client.close()

I'm not familiar with influxdb querying grammar, but it is not the key point since I check on dashboard which also shows that it only has one row of data.

enter image description here

MY PROBLEM IS: "Sometimes, it cannot ingest any of the data, only create the named table with empty result; Meanwhile, it will ingest part of the data and lost some without error; Besides, there are correctly worked example, so it's not due to other set issue i guess."

I want to ingest a dataframe, and want to see it in dashboard (means it can be queried in python)


Now I tested official example:


import pandas as pd

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS, PointSettings

"""
Load DataFrame form CSV File
"""
df = pd.read_csv("vix-daily.csv")
print(df.head())

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    """
    Ingest DataFrame with default tags
    """
    point_settings = PointSettings(**{"type": "vix-daily"})
    point_settings.add_default_tag("example-name", "ingest-data-frame")

    write_api = client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
    write_api.write(bucket="my-bucket", record=df, data_frame_measurement_name="financial-analysis-df")

    """
    Querying ingested data
    """
    query = 'from(bucket:"my-bucket")' \
            ' |> range(start: 0, stop: now())' \
            ' |> filter(fn: (r) => r._measurement == "financial-analysis-df")' \
            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
            ' |> limit(n:10, offset: 0)'
    result = client.query_api().query(query=query)

    """
    Processing results
    """
    print()
    print("=== results ===")
    print()
    for table in result:
        for record in table.records:
            print('{4}: Open {0}, Close {1}, High {2}, Low {3}'.format(record["VIX Open"], record["VIX Close"],
                                                                       record["VIX High"], record["VIX Low"],
                                                                       record["type"]))

And get:

         Date  VIX Open  VIX High  VIX Low  VIX Close
0  2004-01-02     17.96     18.68    17.54      18.22
1  2004-01-05     18.45     18.49    17.44      17.49
2  2004-01-06     17.66     17.67    16.19      16.73
3  2004-01-07     16.72     16.75    15.50      15.50
4  2004-01-08     15.42     15.68    15.32      15.61
=== results ===
vix-daily: Open 17.96, Close 18.22, High 18.68, Low 17.54
vix-daily: Open 18.45, Close 17.49, High 18.49, Low 17.44
vix-daily: Open 17.66, Close 16.73, High 17.67, Low 16.19
vix-daily: Open 16.72, Close 15.5, High 16.75, Low 15.5
vix-daily: Open 15.42, Close 15.61, High 15.68, Low 15.32
vix-daily: Open 16.15, Close 16.75, High 16.88, Low 15.57
vix-daily: Open 17.32, Close 16.82, High 17.46, Low 16.79
vix-daily: Open 16.6, Close 18.04, High 18.33, Low 16.53
vix-daily: Open 17.29, Close 16.75, High 17.3, Low 16.4
vix-daily: Open 17.07, Close 15.56, High 17.31, Low 15.49

BUT I STILL CANNOT SEE IT IN DASHBOARD, why?

It can be queried but not visible in dashboard.


Solution

  • I think it is all related on how you are filtering data through timestamps.

    In the first example, you are creating two data points, one with timestamp "now" (i.e. 02:04:11) and one with timestamp one hour in the future (03:04:11). In your query through the Python client, you are asking for the last ten minutes of data since "now" (range(start: -10m)), so you are querying data between 01:54:11 and 02:04:11. Only the first datapoint is returned, because the second datapoint is not in this time window. On the dashboard, on the other hand, you are asking for the last hour of data (see the Time Range filter in the dropdown menu on the left of the "script editor" button, here). You ran the query a while after your script launch, and only data between 01:16:15 and 02:16:15 is queried, thus excluding again the second datapoint.

    I'm not sure about the second example, I have too little information about it, but you will probably need to adjust the Time Range filter to include 2004 data.