With Row-Level Security (RLS), you manage access control at the row level inside the database instead of in the application. Row-Level Security lets you define policies that determine which rows of a given table a particular user or role can access.
Postgres Tables
For this demonstration we create a simple setup with a User
table and an Item
table using SQLAlchemy 2.0:
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base, relationship
# Synchronous engine that connects as the `postgres` role to the `postgres`
# database; it is used below for schema setup, seeding and teardown.
admin_engine = create_engine("postgresql://postgres:postgres@0.0.0.0:5432/postgres")
# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
class Item(Base):
    """ORM model for the ``items`` table; every item references one user."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Foreign key to the owning row in ``users``.
    user_id = Column(Integer, ForeignKey("users.id"))

    # Counterpart of ``User.item_entries``.
    user = relationship("User", back_populates="item_entries")
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String)
    password = Column(String)

    # Counterpart of ``Item.user``: the items that reference this user.
    item_entries = relationship("Item", back_populates="user")
# Emit CREATE TABLE for every model registered on Base, using the admin engine.
Base.metadata.create_all(admin_engine)
Using the PostgreSQL superuser for application access is not a great idea due to its extensive privileges and security risks. It’s advisable to create a dedicated user with limited permissions tailored to the application’s requirements for improved security and operational control.
# Create the restricted application role and grant it only what it needs.
# The CONNECT grant must name the database the engines actually connect to
# (`postgres` in both connection URLs of this example).
with admin_engine.connect() as conn:
    create_role_sql = """
CREATE USER app_db_write WITH PASSWORD 'V11Xn3#c3$4r';
GRANT CONNECT ON DATABASE postgres TO app_db_write;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_db_write;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_db_write;
"""
    conn.execute(text(create_role_sql))
    # Connections do not autocommit here, so persist the role and grants explicitly.
    conn.commit()
Enable Row-Level Security and define Policies
The function below creates a policy and enables RLS. The policy restricts the rows that users can see from the items
table based on the value of the user_id
column and the value of the app.current_user_id
parameter set for the current session. Only rows where the user_id
matches the app.current_user_id
will be visible to users when they perform a SELECT
operation on the items
table.
def define_security_policies(engine):
    """Create the SELECT policy on ``items`` and enable row-level security.

    Args:
        engine: SQLAlchemy engine used to open the connection on which the
            two DDL statements are executed and committed.
    """
    # Only rows whose user_id equals the session's app.current_user_id
    # setting satisfy the policy's USING clause.
    create_policy_sql = text(
        """
CREATE POLICY items_select_policy ON items
FOR SELECT
USING (user_id = current_setting('app.current_user_id')::integer);
"""
    )
    # Turn row-level security on for the items table.
    enable_rls_sql = text(
        """
ALTER TABLE items ENABLE ROW LEVEL SECURITY;
"""
    )

    with engine.connect() as connection:
        # Policy first, then the ALTER TABLE, all committed together.
        for statement in (create_policy_sql, enable_rls_sql):
            connection.execute(statement)
        connection.commit()
# Apply the policy and enable RLS through the admin engine.
define_security_policies(admin_engine)
Create some data
We create two users, each with some items:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=admin_engine)

with Session() as session:
    # Commit the users first so their primary keys are available for the items.
    user1 = User(username="user1", password="password1")
    user2 = User(username="user2", password="password2")
    session.add_all([user1, user2])
    session.commit()

    # Items 1-3 belong to user1, items 4-5 belong to user2.
    owners = [user1, user1, user1, user2, user2]
    items = [
        Item(name=f"Item {number}", user_id=owner.id)
        for number, owner in enumerate(owners, start=1)
    ]
    session.add_all(items)
    session.commit()
Create a session maker
This async context manager yields an async session where the user_id
is set:
from contextlib import asynccontextmanager
from typing import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
@asynccontextmanager
async def get_session(engine, user_id) -> AsyncIterator[AsyncSession]:
    """Yield an ``AsyncSession`` with ``app.current_user_id`` set to ``user_id``.

    The setting is applied with a session-level ``SET`` before the session is
    handed to the caller and is cleared again with ``RESET`` on exit, so the
    value does not stay on the connection after the block finishes (which
    matters if the engine reuses connections).

    Args:
        engine: Async SQLAlchemy engine the session is bound to.
        user_id: Identifier to expose to the database; it is converted with
            ``int()`` before being placed into the statement.

    Yields:
        The session, ready for queries evaluated against the given user id.

    Raises:
        ValueError, TypeError: If ``user_id`` cannot be converted to ``int``.
        Any exception raised inside the ``async with`` body is re-raised
        after the session has been rolled back.
    """
    async with AsyncSession(engine) as session:
        try:
            # PostgreSQL's SET takes no bind parameters, so the value is
            # inlined; int() ensures only a numeric literal reaches the SQL.
            await session.execute(
                text(f"SET app.current_user_id = {int(user_id)}")
            )
            yield session
        except BaseException:
            # Explicit equivalent of the former bare ``except:``: roll back on
            # any failure, including task cancellation, then propagate it.
            await session.rollback()
            raise
        finally:
            # Clear the setting that was SET above (previously this ran
            # ``RESET ROLE``, which left app.current_user_id untouched).
            await session.execute(text("RESET app.current_user_id"))
            await session.commit()
# Async engine that connects as the restricted app_db_write role (asyncpg
# driver); the password must match the one used when the role was created.
app_engine = create_async_engine("postgresql+asyncpg://app_db_write:V11Xn3#c3$4r@0.0.0.0:5432/postgres")
Finally, RLS can be demonstrated:
async with get_session(app_engine, 0) as session:
result = await session.execute(text("SELECT count(*) FROM items"))
print('count:', result.scalars().one())
count: 0
async with get_session(app_engine, 1) as session:
result = await session.execute(text("SELECT count(*) FROM items"))
print('count:', result.scalars().one())
count: 3
async with get_session(app_engine, 2) as session:
result = await session.execute(text("SELECT count(*) FROM items"))
print('count:', result.scalars().one())
count: 2
Drop all
# Drop the demo tables first, then remove the application role.
Base.metadata.drop_all(admin_engine)

with admin_engine.connect() as conn:
    # DROP OWNED BY runs before DROP ROLE, in the same order as before.
    conn.execute(
        text(
            """
DROP OWNED BY app_db_write;
DROP ROLE app_db_write;
"""
        )
    )
    conn.commit()