I use a model class for each table, with a method for every SQL operation, using SQLAlchemy.
I installed the pgcrypto extension for PostgreSQL, which provides pgp_sym_encrypt and pgp_sym_decrypt:
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;
Then, following the SQLAlchemy documentation, I wrote my User class so that its data is encrypted with pgp_sym_encrypt, a function from PostgreSQL's pgcrypto module (I use version 13.4).
For security, I encrypt the user's data, and I would like to use PostgreSQL's native pgp_sym_encrypt and pgp_sym_decrypt functions. Previously I did this with plain-text queries through psycopg2; now I use the SQLAlchemy ORM through the Flask-SQLAlchemy extension.
I found this approach in the documentation: https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing
But none of the examples there shows how to use the type within a query.
Currently my code is as follows:
import uuid

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA

from instance.config import ENCRYPTION_KEY

db = SQLAlchemy()


class PGPString(TypeDecorator):
    impl = BYTEA
    cache_ok = True

    def __init__(self, passphrase):
        super(PGPString, self).__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        # convert the bind's type from PGPString to
        # String, so that it's passed to psycopg2 as is without
        # a dbapi.Binary wrapper
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)


class User(db.Model):
    __tablename__ = "user"
    __table_args__ = {'schema': 'private'}

    id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sexe = Column("sexe", String)
    username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
    firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
    lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
    email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
    password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
    registration_date = Column("registration_date", DateTime)
    validation_date = Column("validation_date", DateTime)
    role = Column("role", String)

    @classmethod
    def create_user(cls, **kw):
        """
        Create a user
        """
        obj = cls(**kw)
        db.session.add(obj)
        db.session.commit()
Everything works fine: when I use my create_user method, the user's data is stored encrypted.
But if I add a method to my class that checks whether a user exists, by username, email, or other data, like this:
class User(db.Model):
    ....

    @classmethod
    def check_user_exist(cls, **kwargs):
        """
        Check if the user exists
        :param kwargs: column names and the plain-text values to match
        :return: True if a matching user exists, False otherwise
        """
        exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
        return exists
This produces the query below:
SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date AS private_user_validation_date,
       private."user".role AS private_user_role
FROM private."user"
WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
  AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
  AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);
This query returns None (no matching row), because pgp_sym_encrypt produces a different ciphertext on every call, even with the same ENCRYPTION_KEY, so the freshly encrypted value never equals the stored one.
https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8 https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg
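To make this concrete, here is a toy illustration in plain Python. It is not pgcrypto's actual algorithm: `toy_encrypt` and `toy_decrypt` are hypothetical helpers that only mimic one property, namely that a fresh random salt is mixed in on every call. Under that assumption, comparing two ciphertexts of the same value fails, while comparing the decrypted values works.

```python
import hashlib
import secrets


def _keystream(passphrase: str, salt: bytes, length: int) -> bytes:
    # Derive `length` pseudo-random bytes from the passphrase and the salt.
    return hashlib.pbkdf2_hmac(
        "sha256", passphrase.encode("utf-8"), salt, 1000, dklen=max(1, length)
    )


def toy_encrypt(plaintext: str, passphrase: str) -> bytes:
    # Hypothetical stand-in for a salted symmetric cipher (NOT pgcrypto):
    # a new random salt is drawn on every call and stored with the result.
    salt = secrets.token_bytes(8)
    data = plaintext.encode("utf-8")
    stream = _keystream(passphrase, salt, len(data))
    return salt + bytes(a ^ b for a, b in zip(data, stream))


def toy_decrypt(ciphertext: bytes, passphrase: str) -> str:
    # Read the salt back from the first 8 bytes, then undo the XOR.
    salt, body = ciphertext[:8], ciphertext[8:]
    stream = _keystream(passphrase, salt, len(body))
    return bytes(a ^ b for a, b in zip(body, stream)).decode("utf-8")


key = "example-passphrase"            # placeholder, not a real key
stored = toy_encrypt("alice", key)    # what the INSERT wrote
probe = toy_encrypt("alice", key)     # what the WHERE clause re-encrypts

same_ciphertext = stored == probe                      # compare encrypted values
same_plaintext = toy_decrypt(stored, key) == "alice"   # compare decrypted values
```

This is why the comparison has to happen on the decrypted column, not on a re-encrypted parameter.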
So how can I modify the PGPString(TypeDecorator) class from the SQLAlchemy documentation so that the WHERE clause applies pgp_sym_decrypt to the columns, instead of encrypting the compared values, as in the query below?
SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY) AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date AS private_user_validation_date,
       private."user".role AS private_user_role
FROM private."user"
WHERE pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
  AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY) = 'email'
  AND pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';
I usually manage well with Python and SQLAlchemy, but this is really beyond my skills, and I don't know how to modify this class since it touches functionality that is completely unknown to me.
For the moment I use the text() construct, but it is ugly.
For example, here is another method that gets the id from the username (encrypted data):
@classmethod
def get_id(cls, username):
    # Requires: from sqlalchemy import text
    query = """
        SELECT private."user".id
        FROM private."user"
        WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
    """
    q = db.session.query(cls).from_statement(text(query)).params(
        ENCRYPTION_KEY=ENCRYPTION_KEY, username=username
    ).first()
    return q.id
Thanks a lot for your help! Best regards
Solved by adding:
class PGPString(TypeDecorator):
    (...)

    def pgp_sym_decrypt(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase, type_=String)
to PGPString, and using it like this:
class User(db.Model):
    (...)

    @classmethod
    def check_user_exist(cls, **kw):
        # Compare the decrypted column values with the plain-text arguments
        exists = db.session.query(cls).with_entities(cls.username)\
            .filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
            .filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
            .filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
            .first() is not None
        return exists
Thanks to the SQLAlchemy team: https://github.com/sqlalchemy/sqlalchemy/discussions/7268
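One possible way to keep the flexibility of the original filter_by(**kwargs) call is a small helper that builds the decrypting conditions from keyword arguments. The following is only a sketch, assuming SQLAlchemy 1.4 or later: the names `decrypted_equals` and `decrypted_filters`, the placeholder passphrase, and the stand-in Core table (with plain LargeBinary columns instead of PGPString) are all hypothetical and exist only so the example can be compiled without a database connection.

```python
from sqlalchemy import Column, LargeBinary, MetaData, String, Table, and_, func
from sqlalchemy.dialects import postgresql

ENCRYPTION_KEY = "example-passphrase"  # placeholder, not a real key

# Hypothetical stand-in for the private."user" table from the question.
metadata = MetaData()
user_table = Table(
    "user",
    metadata,
    Column("username", LargeBinary),
    Column("email", LargeBinary),
    schema="private",
)


def decrypted_equals(column, value, passphrase=ENCRYPTION_KEY):
    # type_=String makes the comparison value bind as a plain string,
    # so it is not wrapped in pgp_sym_encrypt.
    return func.pgp_sym_decrypt(column, passphrase, type_=String) == value


def decrypted_filters(table, **criteria):
    # One "pgp_sym_decrypt(column, key) = value" condition per keyword.
    return and_(
        *(decrypted_equals(table.c[name], value) for name, value in criteria.items())
    )


condition = decrypted_filters(
    user_table, username="alice", email="alice@example.com"
)
# Render the WHERE fragment for PostgreSQL without connecting to a server.
where_sql = str(condition.compile(dialect=postgresql.dialect()))
print(where_sql)
```

In the model, such a helper could be called as db.session.query(cls.id).filter(decrypted_filters(cls.__table__, **kw)).first() is not None. Keep in mind that decrypting the column in the WHERE clause means PostgreSQL has to decrypt rows to evaluate the condition, so an ordinary index on the encrypted column will not help this lookup.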