I'm trying to write a 'controller' program for a RethinkDB database which continuously dumps to JSON and deletes data which is older than 3 days, using RethinkDB's changefeed feature.
The problem is that the query 'hangs' from the current time, which is evaluated using datetime.utcnow()
(or, alternatively, rethinkdb.now()
) at the time the query is defined, remaining fixed thereafter. So as the changefeed progresses, the query becomes 'outdated'.
How can I make a query which is continuously 'updated' to reflect the current time?
To illustrate the problem, here is the script so far:
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"
table = r.db(database_name).table(table_name)
port_offset = 1 # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)
current_time = datetime.utcnow().replace(tzinfo=pytz.utc) # Current time including timezone (assumed to be UTC)
retention_period = timedelta(days=3) # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period # Age at which data is removed from the main database
if "timestamp" in table.index_list().run(conn): # Assuming the table has "timestamp" as a secondary index, use "between" (for improved speed)
beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (the year 1400)
data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
else: # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
output_file = "archived_sensor_data.json"
with open(output_file, 'a') as f:
for change in data_to_archive.changes().run(conn, time_format="raw"): # The time_format="raw" option is passed to prevent a "RqlTzinfo object is not JSON serializable" error when dumping
if change['new_val'] is not None: # If the change is not a deletion
print change
json.dump(change['new_val'], f) # Since the main database we are reading from is append-only, the 'old_val' of the change is always None and we are interested in the 'new_val' only
f.write("\n") # Separate entries by a new line
ID_to_delete = change['new_val']['id'] # Get the ID of the data to be deleted from the database
r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)
The query is stored in the variable data_to_archive
. However, the time interval in the between
statement is based on the utcnow()
when the current_time
variable is defined, and is not continuously updated in the changefeed. How could I make it so?
I finally worked around the problem by doing the dumps in 'batch' mode rather than continuously using changes()
. (To wit, I'm using the schedule module).
Here is the script:
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
import schedule
import time
import functools
def generate_archiving_query(retention_period=timedelta(days=3), database_name="ipercron", table_name="sensor_data", conn=None):
if conn is None:
conn = r.connect("localhost", 28015)
table = r.db(database_name).table(table_name) # RethinkDB cursor for the table of interest
current_time = r.now()
expiry_time = current_time - retention_period.total_seconds()
if "timestamp" in table.index_list().run(conn): # If the table has "timestamp" as a secondary index, use "between" (for improved speed)
beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (the year 1400)
data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
else: # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
# try:
# beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (the year 1400)
# data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
# except:
# data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
return data_to_archive
def archiving_job(data_to_archive=None, output_file="archived_sensor_data.json", database_name="ipercron", table_name="sensor_data", conn=None):
if data_to_archive is None:
data_to_archive = generate_archiving_query()
if conn is None:
conn = r.connect("localhost", 28015)
table = r.db(database_name).table(table_name)
old_data = data_to_archive.run(conn, time_format="raw") # Without time_format="raw" the output does not dump to JSON
with open(output_file, 'a') as f:
ids_to_delete = []
for item in old_data:
print item
json.dump(item, f)
f.write('\n') # Separate each document by a new line
ids_to_delete.append(item['id'])
# table.get(item['id']).delete().run(conn)
table.get_all(r.args(ids_to_delete)).delete().run(conn)
if __name__ == "__main__":
# The database and table are assumed to have been previously created
database_name = "ipercron"
table_name = "sensor_data"
# table = r.db(database_name).table(table_name)
port_offset = 1 # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)
clean_slate = True
if clean_slate:
r.db(database_name).table(table_name).delete().run(conn) # For testing, start with an empty table and add a fixed amount of data
import rethinkdb_add_data
data_to_archive = generate_archiving_query(conn=conn, database_name=database_name, table_name=table_name) # Because r.now() is evaluated upon run(), the query needs only to be generated once
archiving_job_fixed_query = functools.partial(archiving_job, data_to_archive=data_to_archive, conn=conn)
schedule.every(0.1).minutes.do(archiving_job_fixed_query)
while True:
schedule.run_pending()