python, rethinkdb

How to query RethinkDB based on the current time


I'm trying to write a 'controller' program for a RethinkDB database which continuously dumps data older than 3 days to JSON and then deletes it, using RethinkDB's changefeed feature.

The problem is that the query is 'anchored' to the current time, which is evaluated using datetime.utcnow() (or, alternatively, rethinkdb.now()) at the time the query is defined and remains fixed thereafter. So as the changefeed progresses, the query becomes 'outdated'.

How can I make a query which is continuously 'updated' to reflect the current time?
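
In other words, the cutoff is an ordinary Python value that is frozen at definition time (a short sketch of the issue, using the same names as the script below):

current_time = datetime.utcnow().replace(tzinfo=pytz.utc)          # Evaluated once, right here
expiry_time = current_time - timedelta(days=3)
data_to_archive = table.filter(r.row['timestamp'] < expiry_time)   # The cutoff is baked into the query and never moves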

To illustrate the problem, here is the script so far:

import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta

# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"
table = r.db(database_name).table(table_name)

port_offset = 1         # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)

current_time = datetime.utcnow().replace(tzinfo=pytz.utc)   # Current time including timezone (assumed to be UTC)
retention_period = timedelta(days=3)                        # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period               # Age at which data is removed from the main database

if "timestamp" in table.index_list().run(conn):         # Assuming the table has "timestamp" as a secondary index, use "between" (for improved speed)
    beginning_of_time = r.time(1400, 1, 1, 'Z')    # The minimum time of a ReQL time object (the year 1400)
    data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
else:     # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
    data_to_archive = table.filter(r.row['timestamp'] < expiry_time)

output_file = "archived_sensor_data.json"
with open(output_file, 'a') as f:
    for change in data_to_archive.changes().run(conn, time_format="raw"):        # The time_format="raw" option is passed to prevent a "RqlTzinfo object is not JSON serializable" error when dumping
        if change['new_val'] is not None:               # If the change is not a deletion
            print(change)
            json.dump(change['new_val'], f)             # Since the main database we are reading from is append-only, the 'old_val' of the change is always None and we are interested in the 'new_val' only
            f.write("\n")                               # Separate entries by a new line
            ID_to_delete = change['new_val']['id']                # Get the ID of the data to be deleted from the database
            r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)

The query is stored in the variable data_to_archive. However, the time interval in the between statement is based on the value of utcnow() at the moment the current_time variable is defined, and it is not continuously updated as the changefeed runs. How could I make it so?
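
For what it's worth, simply swapping in r.now() does not fix this on its own: r.now() is evaluated once, when the server receives the query, so the bound of a long-running changefeed stays fixed for the life of the feed. A minimal sketch, reusing the names from the script above and assuming the "timestamp" secondary index exists:

expiry_time = r.now() - retention_period.total_seconds()                                # Evaluated once, when the feed starts
feed = table.between(r.time(1400, 1, 1, 'Z'), expiry_time, index="timestamp").changes()
for change in feed.run(conn, time_format="raw"):
    pass                                                                                 # The cutoff never advances while the feed runs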


Solution

  • I finally worked around the problem by doing the dumps in 'batch' mode rather than continuously with changes(). (Specifically, I'm using the schedule module; see the note after the script for why the query only needs to be generated once.)

    Here is the script:

    import json
    import rethinkdb as r
    import pytz
    from datetime import datetime, timedelta
    import schedule
    import time
    import functools
    
    def generate_archiving_query(retention_period=timedelta(days=3), database_name="ipercron", table_name="sensor_data", conn=None):
        if conn is None:
            conn = r.connect("localhost", 28015)
    
        table = r.db(database_name).table(table_name)               # RethinkDB cursor for the table of interest
        current_time = r.now()
        expiry_time = current_time - retention_period.total_seconds()
    
        if "timestamp" in table.index_list().run(conn):         # If the table has "timestamp" as a secondary index, use "between" (for improved speed)
            beginning_of_time = r.time(1400, 1, 1, 'Z')         # The minimum time of a ReQL time object (the year 1400)
            data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
        else:                                                   # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
            data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
    
        # try:
        #     beginning_of_time = r.time(1400, 1, 1, 'Z')         # The minimum time of a ReQL time object (the year 1400)
        #     data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
        # except:
        #     data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
    
        return data_to_archive
    
    def archiving_job(data_to_archive=None, output_file="archived_sensor_data.json", database_name="ipercron", table_name="sensor_data", conn=None):
        if data_to_archive is None:
            data_to_archive = generate_archiving_query()
        if conn is None:
            conn = r.connect("localhost", 28015)
    
        table = r.db(database_name).table(table_name)
        old_data = data_to_archive.run(conn, time_format="raw")         # Without time_format="raw" the output does not dump to JSON
        with open(output_file, 'a') as f:
            ids_to_delete = []
            for item in old_data:
                print(item)
                json.dump(item, f)
                f.write('\n')                                           # Separate each document by a new line
                ids_to_delete.append(item['id'])
                # table.get(item['id']).delete().run(conn)
    
        if ids_to_delete:                                                # Only issue the bulk delete if something was actually archived
            table.get_all(r.args(ids_to_delete)).delete().run(conn)
    
    if __name__ == "__main__":
        # The database and table are assumed to have been previously created
        database_name = "ipercron"
        table_name = "sensor_data"
        # table = r.db(database_name).table(table_name)
    
        port_offset = 1         # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
        conn = r.connect("localhost", 28015 + port_offset)
    
        clean_slate = True
        if clean_slate:
            r.db(database_name).table(table_name).delete().run(conn)            # For testing, start with an empty table and add a fixed amount of data
            import rethinkdb_add_data
    
        data_to_archive = generate_archiving_query(conn=conn, database_name=database_name, table_name=table_name)        # Because r.now() is evaluated upon run(), the query needs only to be generated once
        archiving_job_fixed_query = functools.partial(archiving_job, data_to_archive=data_to_archive, conn=conn)
    
        schedule.every(0.1).minutes.do(archiving_job_fixed_query)
    
        while True:
            schedule.run_pending()
            time.sleep(1)                                                # Sleep briefly between checks so the loop does not spin at full CPU
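
    The reason the query only needs to be generated once is that r.now() is a ReQL term, evaluated on the server each time the query is sent with run(), rather than a Python datetime computed when the query object is built. A minimal sketch of that behavior, reusing the functions and connection defined above:

    query = generate_archiving_query(conn=conn)       # Built once; r.now() is not evaluated yet
    print(query.count().run(conn))                    # Counts documents older than (now - 3 days) at this moment
    time.sleep(60)
    print(query.count().run(conn))                    # Re-run later: the cutoff has advanced accordingly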