python-3.x cassandra kafka-consumer-api cassandra-driver

Syntax error in CQL query when trying to write to Cassandra from Python


I am building a Python application that takes data from Twitter and saves it to Cassandra. My current problem lies in a script that reads data from Kafka and tries to write it to Cassandra, as follows:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer




class Consumer(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])

        while not self.stop_event.is_set():
            for message in consumer:
                # session.execute(
                #     """
                #     INSERT INTO mensaje_73 (tweet)
                #     VALUES (message)
                #     """
                # )
                print(message)
                cluster = Cluster()
                session = cluster.connect('twitter')
                session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

                # if self.stop_event.is_set():
                #     break

        consumer.close()


def main():
    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

Inserting a test message directly into the table twitter.mensaje_73 works perfectly, as shown here:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

Any help would be deeply appreciated :)


Solution

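  • The problem is that message sits inside the query string, so CQL never sees the value of your Python variable. It sees the bare word message where a value is expected, and a text literal in CQL is only valid when wrapped in single quotes. Hence the syntax error.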
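
To make this concrete, here is a minimal sketch of what Python actually hands to the driver. The sample payload is made up for illustration; the statement is the one from the question. A triple-quoted string is not interpolated, so the identifier travels to Cassandra unchanged.

```python
# Hypothetical payload, standing in for a record read from Kafka.
message = "hello from kafka"

# Same statement as in the question: nothing substitutes the variable.
query = """
INSERT INTO mensaje_73 (tweet)
VALUES (message)
"""

# The CQL text holds the bare identifier, not the variable's value.
contains_identifier = "VALUES (message)" in query
contains_value = message in query
print(contains_identifier, contains_value)  # True False
```

The working test insert differs only in that 'helooo' is a quoted literal, which CQL can parse as a text value.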

    To fix this, I would go the route of using a prepared statement, and then bind message to it:

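    session = cluster.connect('twitter')
    # Prepare once, outside the consumer loop, and reuse the statement.
    preparedTweetInsert = session.prepare(
            """
            INSERT INTO mensaje_73 (tweet)
            VALUES (?)
            """
        )
    # message is a kafka-python ConsumerRecord; bind its payload rather than
    # the record object itself (assuming the payload is UTF-8 encoded text).
    session.execute(preparedTweetInsert, [message.value.decode('utf-8')])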
    

    Give that a try, and see if it helps.
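
For context, here is a rough sketch of how the prepared statement might fit into the consumer loop. The helper name store_messages and its encoding parameter are hypothetical, not part of either library. It assumes session behaves like a cassandra-driver Session (prepare and execute) and that each record exposes its payload as bytes in .value, which is what kafka-python yields when no value_deserializer is configured.

```python
INSERT_TWEET_CQL = "INSERT INTO mensaje_73 (tweet) VALUES (?)"


def store_messages(session, records, encoding="utf-8"):
    """Insert the payload of each record into mensaje_73.

    Hypothetical helper: `session` is expected to offer prepare() and
    execute() like a cassandra-driver Session, and `records` is any
    iterable of objects with a bytes `.value` attribute.
    """
    # Prepare once so the statement is parsed a single time.
    prepared = session.prepare(INSERT_TWEET_CQL)
    count = 0
    for record in records:
        # Bind the decoded text, never the record object itself.
        tweet = record.value.decode(encoding)
        session.execute(prepared, [tweet])
        count += 1
    return count
```

Inside run(), this could be called roughly as store_messages(Cluster().connect('twitter'), consumer). That also moves the Cluster() and connect() calls out of the per-message loop, where the question's code opens a new connection for every message.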

    Also, this seems like a simple data model, but ask yourself how you are going to query this data. The insert above only works if tweet is the table's sole PRIMARY KEY column, which also means the only way to look up an individual tweet is by the exact text of the message. Something to think about: partitioning by day might be a better option, as it should distribute well and provide a much better query model.
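
One possible shape for a day-partitioned model is sketched below. Everything here is an assumption for illustration: the table name tweets_by_day, its columns, and the tweet_row helper do not appear in the question, and the right clustering columns depend on the queries you actually need.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical schema: one partition per calendar day, newest tweets first.
CREATE_TWEETS_BY_DAY_CQL = """
CREATE TABLE IF NOT EXISTS tweets_by_day (
    day date,
    created_at timestamp,
    tweet_id uuid,
    tweet text,
    PRIMARY KEY ((day), created_at, tweet_id)
) WITH CLUSTERING ORDER BY (created_at DESC, tweet_id ASC)
"""

INSERT_TWEET_BY_DAY_CQL = """
INSERT INTO tweets_by_day (day, created_at, tweet_id, tweet)
VALUES (?, ?, ?, ?)
"""


def tweet_row(tweet, created_at=None):
    """Build the bind values for INSERT_TWEET_BY_DAY_CQL."""
    if created_at is None:
        created_at = datetime.now(timezone.utc)
    # The day bucket is the partition key; the UUID keeps rows unique
    # when two tweets share the same timestamp.
    return [created_at.date(), created_at, uuid.uuid4(), tweet]


row = tweet_row("helooo", datetime(2019, 5, 1, 12, 30, tzinfo=timezone.utc))
print(row[0])  # 2019-05-01
```

With a layout like this, a day's tweets could be read with a query along the lines of SELECT tweet FROM tweets_by_day WHERE day = ?, instead of needing the exact tweet text as the lookup key.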