With Row Level security (RLS) you manage the access control at the row level within a database instead of the application. Row-Level Security allows you to define policies that determine which rows of data a particular user or role can access within a given table.

Postgres Tables

For this demonstration we create a simple setup with a User table and a Item table using SQLAlchemy 2.0:

from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base, relationship

admin_engine = create_engine("postgresql://postgres:postgres@0.0.0.0:5432/postgres")
Base = declarative_base()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    user_id = Column(Integer, ForeignKey("users.id"))
    user = relationship("User", back_populates="item_entries")


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String)
    password = Column(String)
    item_entries = relationship("Item", back_populates="user")


Base.metadata.create_all(admin_engine)

Using the PostgreSQL superuser for application access is not a great idea due to its extensive privileges and security risks. It’s advisable to create a dedicated user with limited permissions tailored to the application’s requirements for improved security and operational control.

with admin_engine.connect() as conn:
    query="""
    CREATE USER app_db_write WITH PASSWORD 'V11Xn3#c3$4r';
    GRANT CONNECT ON DATABASE energysales TO app_db_write;
    GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_db_write;
    GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_db_write;
    """
    conn.execute(text(query))
    conn.commit()

Enable Row-Level Security and define Policies

The below method creates a policy and enables RLS. The policy restricts the rows that users can see from the items table based on the value of the user_id column and the value of the app.current_user_id parameter set for the current session. Only rows where the user_id matches the app.current_user_id will be visible to users when they perform a SELECT operation on the items table.

def define_security_policies(engine):
    # Policy function for filtering rows based on user_id
    policy_function = text(
        """
    CREATE POLICY items_select_policy ON items
    FOR SELECT
    USING (user_id = current_setting('app.current_user_id')::integer);
    """
    )

    # Set the default row-level security policy
    default_policy = text(
        """
    ALTER TABLE items ENABLE ROW LEVEL SECURITY;
    """
    )

    # Obtain a connection from the engine
    with engine.connect() as conn:
        # Execute DDL statements
        conn.execute(policy_function)
        conn.execute(default_policy)
        conn.commit()


define_security_policies(admin_engine)

Create some data

We create two User’s with some Items:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=admin_engine)
with Session() as session:
    user1 = User(username="user1", password="password1")
    user2 = User(username="user2", password="password2")
    session.add_all([user1, user2])
    session.commit()
    item1 = Item(name="Item 1", user_id=user1.id)
    item2 = Item(name="Item 2", user_id=user1.id)
    item3 = Item(name="Item 3", user_id=user1.id)
    item4 = Item(name="Item 4", user_id=user2.id)
    item5 = Item(name="Item 5", user_id=user2.id)
    session.add_all([item1, item2, item3, item4, item5])
    session.commit()

Create a session maker

This method returns an async session where the user_id is set:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_session(engine, user_id) -> AsyncSession:
    async_session = sessionmaker(
        engine, class_=AsyncSession
    )
    async with async_session() as session:
        try:
            await session.execute(
                text(f"SET app.current_user_id = {int(user_id)}")
            )
            yield session
        except:
            await session.rollback()
            raise
        finally:
            await session.execute(text("RESET ROLE;"))
            await session.commit()
            await session.close()
            pass

app_engine = create_async_engine("postgresql+asyncpg://app_db_write:V11Xn3#c3$4r@0.0.0.0:5432/postgres")

Now finally the RLS can be demonstrated:

async with get_session(app_engine, 0) as session:
    result = await session.execute(text("SELECT count(*) FROM items"))
    print('count:', result.scalars().one())

count: 0

async with get_session(app_engine, 1) as session:
    result = await session.execute(text("SELECT count(*) FROM items"))
    print('count:', result.scalars().one())

count: 3

async with get_session(app_engine, 2) as session:
    result = await session.execute(text("SELECT count(*) FROM items"))
    print('count:', result.scalars().one())

count: 2

Drop all

Base.metadata.drop_all(admin_engine)
with admin_engine.connect() as conn:
    query = """
    DROP OWNED BY app_db_write;
    DROP ROLE app_db_write;
    """
    conn.execute(text(query))
    conn.commit()