I'm trying to write some data to Cassandra. A connection is established, and this is my database schema:
```sql
CREATE TYPE IF NOT EXISTS UDTVehicle (
    num_vehicles INT,
    speed LIST<FLOAT>
);

CREATE TABLE IF NOT EXISTS road_traffic (
    road_id INT,
    timestamp TIMESTAMP,
    radar_id INT,
    vehicles MAP<INT, FROZEN<UDTVehicle>>,
    PRIMARY KEY (road_id, timestamp) // partition key and clustering key
);
```
And this is my Python code:
```python
def write_to_cassandra(self, session, record):
    insert_query = """
        INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles)
        VALUES (?, ?, ?, ?);
    """
    prepared_insert = session.prepare(insert_query)
    batch = BatchStatement()
    batch_size = 0
    batches = 0
    print(record)
    if not all(record.get(field) is not None for field in list(record.keys())):
        logger.warning("Record is missing required fields")
        return
    else:
        vehicles_map = {record.get('road_id'): record.get('Vehicles')}
        print(record.get('road_id'))
        batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
        batch_size += 1
```
As far as I understand, `vehicles_map` must map a key (which is the partition key) to a value (my `UDTVehicle` object). This is what I get when I print the record:

```
{'timestamp': datetime.datetime(2022, 1, 1, 8, 0, 30), 'num_Vehicles': 7, 'road_id': 9, 'radar_id': 11, 'Vehicles': {'num_Vehicles': 7, 'speed': [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]}}
```

The error I get is as follows:
```
Traceback (most recent call last):
  File "cassandra_kafka/ingest_cassandra.py", line 135, in <module>
    Data_ingest.consume_store()
  File "cassandra_kafka/ingest_cassandra.py", line 113, in consume_store
    self.write_to_cassandra(session, value)
  File "cassandra_kafka/ingest_cassandra.py", line 66, in write_to_cassandra
    batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
  File "cassandra/query.py", line 827, in cassandra.query.BatchStatement.add
  File "cassandra/query.py", line 506, in cassandra.query.PreparedStatement.bind
  File "cassandra/query.py", line 636, in cassandra.query.BoundStatement.bind
  File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
  File "cassandra/cqltypes.py", line 909, in cassandra.cqltypes.MapType.serialize_safe
  File "cassandra/cqltypes.py", line 324, in cassandra.cqltypes._CassandraType.to_binary
  File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
  File "cassandra/cqltypes.py", line 1030, in cassandra.cqltypes.UserType.serialize_safe
KeyError: 0
```
What am I doing wrong here? Any help would be appreciated ^^.
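The proper way to insert rows containing UDTs with the Python driver, using prepared statements, is to define a simple class with the same structure as the UDT and use instances of it when creating the values for the insert statement. Your code passes the raw `Vehicles` dict instead, which the driver cannot serialize as a UDT value, hence the `KeyError: 0`.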
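To see where `KeyError: 0` comes from, here is a simplified, pure-Python model of how the driver looks up UDT fields while serializing. It is an approximation for illustration, not the driver's actual code: the value is first indexed positionally like a tuple (`val[0]`, `val[1]`, ...), and only if that raises `TypeError` are attributes named after the UDT fields read instead. A dict raises `KeyError` on `val[0]`, so the attribute fallback never happens. `lookup_udt_fields` and `FIELD_NAMES` are hypothetical names.

```python
# Simplified model of the UDT field lookup (illustration only, not driver code).
FIELD_NAMES = ("num_vehicles", "speed")  # declaration order of the udtvehicle type


def lookup_udt_fields(val):
    """Return the UDT field values in declaration order, mimicking the lookup strategy."""
    items = []
    for i, fieldname in enumerate(FIELD_NAMES):
        try:
            # First attempt: treat the value as a tuple and index it positionally.
            item = val[i]
        except TypeError:
            # Fallback: read the attribute named after the UDT field.
            item = getattr(val, fieldname, None)
        items.append(item)
    return items


class UdtVehicle:
    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed


print(lookup_udt_fields((7, [72.78, 62.67])))     # tuple: positional indexing works
print(lookup_udt_fields(UdtVehicle(7, [72.78])))  # object: val[0] raises TypeError, attributes are used
try:
    lookup_udt_fields({"num_Vehicles": 7, "speed": [72.78]})
except KeyError as exc:
    print("dict fails with KeyError:", exc)       # dict: val[0] raises KeyError, no fallback
```

Note that the attribute names of the class have to match the UDT field names (unquoted CQL identifiers are lowercased, so `num_vehicles`, not `num_Vehicles`).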
I have prepared some simple code demonstrating what I mean: check the `UdtVehicle` class and how it is instantiated when creating the arguments to the insert statement in the batch.

The sample code goes on to demonstrate a few other things, depending on the command-line argument passed:

- `read` shows what you get when "just" reading the rows as they are;
- `read_udt` shows how you can register a UDT with your `Cluster` and have the returned rows nicely cast into your Python class;
- `insert` is a sanity check for a single-row (non-batch) prepared insert statement, with the UDT properly handled as explained above;
- `insertb` exemplifies the previous case within a batch.
Note that unprepared statements need a slightly different approach (though you probably want to use prepared statements in a production application anyway). For more information on handling UDTs, please check this page: https://docs.datastax.com/en/developer/python-driver/latest/user_defined_types
Looking at your code above, you might want to convert the vehicles into their UDT class right within the function you posted (depending on what exactly you are receiving through the Kafka stream). Also be aware that a batch is not executed until you explicitly invoke `session.execute(batch)`, which is not shown in your code.
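A minimal sketch of that conversion, assuming every Kafka record looks exactly like the one printed in the question (in particular a `Vehicles` sub-dict whose count key is spelled `num_Vehicles`). `record_to_params` and `self.prepared_insert` are hypothetical names; the helper only builds the tuple you would pass to `batch.add(...)`, and executing the batch is still up to you.

```python
import datetime


class UdtVehicle:
    """Plain class mirroring the CQL type udtvehicle (num_vehicles INT, speed LIST<FLOAT>)."""

    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed


def record_to_params(record):
    """Hypothetical helper: build the bind values for
    INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?)."""
    raw = record["Vehicles"]
    # Attribute names must match the UDT field names; the record uses 'num_Vehicles',
    # so it is mapped explicitly onto num_vehicles here.
    vehicle = UdtVehicle(num_vehicles=raw["num_Vehicles"], speed=[float(s) for s in raw["speed"]])
    # The map key is an INT; road_id is used here only to mirror the question's code.
    vehicles_map = {record["road_id"]: vehicle}
    return (record["road_id"], record["timestamp"], record["radar_id"], vehicles_map)


record = {
    "timestamp": datetime.datetime(2022, 1, 1, 8, 0, 30),
    "num_Vehicles": 7, "road_id": 9, "radar_id": 11,
    "Vehicles": {"num_Vehicles": 7, "speed": [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]},
}
params = record_to_params(record)
# Inside write_to_cassandra this would become something like:
#   batch.add(self.prepared_insert, record_to_params(record))
#   session.execute(batch)
```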
A couple of other remarks for your awareness:

- You are preparing the statement on every call of `write_to_cassandra`. Prepare it once, store it somewhere (`self.prepared_statement` is a natural candidate) and then just reuse it, for greater performance.
- Nothing in the data model enforces `num_vehicles == len(speed)`. If that should be enforced, a different model would probably be better (but then again, this depends on your use case).
- If your batches end up spanning several partitions (i.e. different `road_id` values), then a single batch is probably to be avoided. Read more here: https://batey.info/cassandra-anti-pattern-cassandra-logged.html, https://www.batey.info/cassandra-anti-pattern-misuse-of.html

And now the sample code you can start from (tested on Cassandra 4.1):
```python
import sys
import datetime

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement, BatchStatement


class UdtVehicle():
    def __init__(self, num_vehicles, speed):
        self.num_vehicles = num_vehicles
        self.speed = speed

    def __repr__(self):
        return f"UdtVehicle[{self.num_vehicles} vehicles, speeds={', '.join('%.2f' % sp for sp in self.speed)}]"


if __name__ == '__main__':
    cluster = Cluster(
        ["CONTACT_POINT"],
        auth_provider=PlainTextAuthProvider(
            "USERNAME",
            "PASSWORD",
        ),
    )
    session = cluster.connect("KEYSPACE_NAME")

    cmd = sys.argv[1] if len(sys.argv) > 1 else "read"

    if cmd == "read":
        for r in session.execute("select * from road_traffic;"):
            print(str(r))
            print('-'*20)
            _one_udt = list(r.vehicles.values())[0]
            print(type(_one_udt))
            print(str(_one_udt))
            print('='*20)
    elif cmd == "read_udt":
        cluster.register_user_type("KEYSPACE_NAME", "udtvehicle", UdtVehicle)
        for r in session.execute("select * from road_traffic;"):
            print(str(r))
            print('-'*20)
            _one_udt = list(r.vehicles.values())[0]
            print(type(_one_udt))
            print(str(_one_udt))
            print('='*20)
    elif cmd == "insert":
        insertion_prepared = session.prepare("INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?);")
        road_id = 123
        timestamp = datetime.datetime.now()
        radar_id = 456
        vehicles = {
            10: UdtVehicle(
                num_vehicles=1,
                speed=[100.1, 100.2],
            ),
            999: UdtVehicle(
                num_vehicles=100,
                speed=[],
            ),
            11: UdtVehicle(
                num_vehicles=3,
                speed=[0.1, 0.2, 0.3],
            ),
        }
        result = session.execute(insertion_prepared, (road_id, timestamp, radar_id, vehicles))
    elif cmd == "insertb":
        insertion_prepared = session.prepare("INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles) VALUES (?, ?, ?, ?);")
        batch = BatchStatement()
        # as per best practices, this batch will be a single-partition batch!
        road_id = 100
        t0 = datetime.datetime.now()
        for row_id in range(3):
            timestamp = t0 + datetime.timedelta(hours=row_id)
            radar_id = 1000 + row_id
            vehicles = {
                (1000+row_id+3): UdtVehicle(
                    num_vehicles=row_id+30,
                    speed=[10.01] * (1+row_id),
                )
            }
            batch.add(insertion_prepared, (road_id, timestamp, radar_id, vehicles))
        # run the batch
        session.execute(batch)
    else:
        print("Unknown command '%s'" % cmd)
```
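Following up on the remark about batches spanning several partitions: if your Kafka consumer accumulates many records before writing, one option is to group the bind values by partition key (`road_id`) and build one single-partition `BatchStatement` per group. This sketch covers only the grouping step; `group_by_partition` is a hypothetical helper, and it assumes each parameter tuple is `(road_id, timestamp, radar_id, vehicles)` as in the insert statements above.

```python
from collections import defaultdict


def group_by_partition(param_tuples):
    """Group (road_id, timestamp, radar_id, vehicles) tuples by road_id, the partition key."""
    groups = defaultdict(list)
    for params in param_tuples:
        groups[params[0]].append(params)
    return dict(groups)


# Each group could then go into its own single-partition batch, e.g.:
#   for road_id, rows in group_by_partition(all_params).items():
#       batch = BatchStatement()
#       for params in rows:
#           batch.add(insertion_prepared, params)
#       session.execute(batch)
```

Whether batching at all is worthwhile depends on your write pattern; for independent rows, individual (possibly asynchronous) prepared inserts are often a reasonable alternative.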