I extend the RethinkDb API by providing some extra functions.
For example I simplify the expression
site_ids = r.table('periods')\
['regions']\
.concat_map(lambda row: row['sites'])\
['id']
to
site_ids = f['periods']\
.unwind('regions.sites.id')
using a custom `unwind` method that is able to resolve a path of nested document elements. If an item in the given path is a list, its entries are concatenated with `concat_map`. Otherwise the item is accessed with bracket notation:
def unwind(self, path):
    """Resolve a dot-separated ``path`` of nested fields on the wrapped cursor.

    Each path segment is probed by executing a query: when the first
    result for the segment is a Python ``list`` the entries are flattened
    with ``concat_map``; otherwise the segment is accessed with bracket
    notation. The resulting cursor is re-wrapped via ``self.wrap``.
    """
    keys = path.split('.')
    selection = self._cursor
    for key in keys:
        # Run the partial query once to inspect the type of the first result.
        probe = selection[key].run().next()
        if not isinstance(probe, list):
            selection = selection[key]
            continue
        selection = selection.concat_map(lambda doc: doc[key])
    return self.wrap(self._f, selection)
=> How can I improve the type check to find out if an element is a list? The check should not require an extra `.run()`, and it should work in main queries as well as in sub queries.
My current implementation with the expression
is_list = isinstance(cursor[item].run().next(), list)
works fine in "main queries" like
result = f['periods'] \
.unwind('regions.sites.plants.product.process.technologies')\
.populate_with('periods', 'technologies')\
.sum('specific_cost_per_year') \
.run()
It does not work in sub queries, e.g. inside a mapping function:
def period_mapper(period):
    """Map a period document to its start year and its unwound site ids.

    ``period`` is wrapped with ``f.wrap`` so that the custom ``unwind``
    helper can resolve the nested path ``regions.sites.id`` on it.
    """
    year = period['start']
    site_ids = f.wrap(period).unwind('regions.sites.id')
    return {'year': year, 'site_ids': site_ids}
f.table('periods')\
.map(period_mapper)\
.run()
I get the error
rethinkdb.errors.ReqlServerCompileError: Variable name not found in:
var_1['regions']
^^^^^
because I am not able to `.run()` a query on the passed variable argument "period".
I tried to replace the if-then-else condition with `r.branch`, but that did not help.
=> How can I choose an operator based on the type of the current cursor content in a better way?
Code of my selection class that wraps a RethinkDb cursor:
from rethinkdb.ast import RqlQuery
# needs to inherit from RqlQuery for the json serialization to work
class AbstractSelection(RqlQuery):
    """Abstract wrapper around a RethinkDB query ("cursor") with extra helpers.

    Item access, string representations and the ReQL serialization hooks
    (``build``, ``_args``, ``optargs``) are delegated to the wrapped cursor.
    Subclasses must implement :meth:`wrap`, which turns a derived cursor
    back into a selection object.
    """

    def __init__(self, f, cursor):
        """Store the owning object ``f`` and the wrapped ``cursor``.

        NOTE: ``RqlQuery.__init__`` is not invoked here; ``_args`` and
        ``optargs`` are exposed below as read-only properties that forward
        to the wrapped cursor.
        """
        self._f = f
        self._cursor = cursor

    def __getitem__(self, identifier):
        """Apply bracket access to the wrapped cursor and re-wrap the result."""
        cursor = self._cursor[identifier]
        return self.wrap(self._f, cursor)

    def __repr__(self):
        """Return the ``repr`` of the wrapped cursor."""
        return self._cursor.__repr__()

    def __str__(self):
        """Return the ``str`` of the wrapped cursor."""
        return self._cursor.__str__()

    def build(self):
        """Delegate query building to the wrapped cursor."""
        return self._cursor.build()

    @property
    def _args(self):  # required for json serialization
        """Arguments of the wrapped cursor."""
        return self._cursor._args

    @property
    def optargs(self):  # required for json serialization
        """Optional arguments of the wrapped cursor."""
        return self._cursor.optargs

    def wrap(self, r, cursor):
        """Wrap ``cursor`` in a selection object; subclasses must override.

        Raises:
            NotImplementedError: always, in this abstract base class.
        """
        # ``NotImplemented`` is a non-callable sentinel value, not an
        # exception class; calling it raised ``TypeError`` instead of the
        # intended error.
        raise NotImplementedError('Needs to be implemented by inheriting class')

    def unwind(self, path):
        """Resolve a dot-separated ``path`` of nested fields on the cursor.

        For each path segment the partial query is executed once to inspect
        its first result: a Python ``list`` is flattened with ``concat_map``,
        anything else is accessed with bracket notation.

        NOTE: the probe calls ``.run()`` for every segment, so it costs an
        extra query per segment and cannot be used on an unbound query
        variable (for example the argument of a ``map`` function).
        """
        items = path.split('.')
        cursor = self._cursor
        for item in items:
            is_list = isinstance(cursor[item].run().next(), list)
            if is_list:
                cursor = cursor.concat_map(lambda row: row[item])
            else:
                cursor = cursor[item]
        return self.wrap(self._f, cursor)

    def pick(self, path, query):
        """Unwind ``path`` and return the first element matching ``query``."""
        return self.unwind(path).get(query)

    def populate(self, collection_name, path):
        """Replace each identifier with the matching document.

        Every element of this selection is treated as an ``id`` and looked
        up under ``path`` in ``self._f[collection_name]`` via :meth:`pick`.
        """
        return self.map(
            lambda identifier:
            self._f[collection_name].pick(path, {'id': identifier})
        )

    def get(self, query):
        """Filter the cursor by ``query`` and wrap the first match."""
        cursor = self._cursor.filter(query)[0]
        return self.wrap(self._f, cursor)

    def to_array(self):
        """Return the items obtained by iterating the wrapped cursor as a list."""
        return [item for item in self._cursor]
I managed to use `type_of` in combination with `branch`. Accessing the item with bracket notation returns a STREAM and I had to get the first item with `[0]` before using `type_of` to check for the 'ARRAY' type. This also works if the property is not an array:
def unwind(self, path):
    """Resolve a dot-separated ``path`` of nested fields on the wrapped cursor.

    The list check is expressed inside the query itself with ``branch`` and
    ``type_of``, so no intermediate ``.run()`` is needed: when the first
    element of a segment has type ``'ARRAY'`` the entries are flattened
    with ``concat_map``, otherwise the segment is accessed with bracket
    notation. The resulting cursor is re-wrapped via ``self.wrap``.
    """
    keys = path.split('.')
    selection = self._cursor
    reql = self._f._r
    for key in keys:
        # Bracket access yields a sequence here, so inspect its first element.
        first_is_array = selection[key][0].type_of() == 'ARRAY'
        flattened = selection.concat_map(lambda doc: doc[key])
        direct = selection[key]
        selection = reql.branch(first_is_array, flattened, direct)
    return self.wrap(self._f, selection)