Confused by SQLAlchemy’s DBSession in Pyramid?

I was. I wanted to know:

  1. What is the DBSession anyway?
  2. How does that work in my unit tests?
  3. If I want to use my webapp code outside of the webapp (in a background process), how do I do that?

Let’s see if we can answer those questions.


helloengine.py

# sqlalchemy imports
from sqlalchemy import engine_from_config

def MakeEngine():
   # vars for the connection url
   username = 'psqluser'
   password = 'psqlpass'
   host = 'localhost'
   port = '5432'
   database = 'example'

   # specify the url (and other settings if we had them)
   settings = {
         'url': 'postgres://%s:%s@%s:%s/%s' % (username, password, host, port, database)
         }

   # actually create the engine with the sqlalchemy factory method
   return engine_from_config(settings, prefix='')

# http://docs.sqlalchemy.org/ru/latest/core/connections.html
# http://docs.sqlalchemy.org/ru/latest/core/connections.html#connection-engine-api
# the engine encapsulates connections to the db, it has a connect method which
# will give you a proxy to an underlying dbapi connection which the engine is
# looking after in a connection pool for you.
engine = MakeEngine()

# what can we do with an engine?
# we can get a connection and execute sql
conn = engine.connect()
selectTableSql = "select * from pg_tables where schemaname='public';"
for r in conn.execute(selectTableSql):
   print r

createTableSql = "create table woo(id int primary key, name varchar(200), value varchar(200));"
conn.execute(createTableSql)

for r in conn.execute(selectTableSql):
   print r

dropTableSql = "drop table woo;"
conn.execute(dropTableSql)

conn.close()
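
One thing the URL formatting in MakeEngine() glosses over: a password containing characters such as '@' or '/' has to be percent-escaped before it goes into the URL. Here is a minimal sketch of that, written for Python 3 with only the standard library. build_url is a hypothetical helper, not something SQLAlchemy provides, and the password is made up.

```python
from urllib.parse import quote_plus


def build_url(username, password, host, port, database, scheme="postgresql"):
    """Assemble a database URL, percent-escaping the credentials.

    build_url is a hypothetical helper, not part of SQLAlchemy. The default
    scheme is 'postgresql'; the shorter 'postgres' alias used in this post is
    only accepted by older SQLAlchemy releases.
    """
    return "%s://%s:%s@%s:%s/%s" % (
        scheme,
        quote_plus(username),
        quote_plus(password),
        host,
        port,
        database,
    )


# a made-up password with characters that would otherwise confuse URL parsing
url = build_url("psqluser", "p@ss/word", "localhost", "5432", "example")
print(url)
```

The resulting string can be used as the 'url' value in the settings dict above.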

So far so good. But we want to use SQLAlchemy’s nice ORM.

hellomodels.py

# sqlalchemy imports
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Boolean
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# local import
from helloengine import MakeEngine

# To use the ORM we will find we'll need:
# * subclasses of declarative base for our models
# * a session to query for those models and persist them

# sqlalchemy magic to create a base class for our orm models, giving them
# the power to persist themselves in the db.
# http://docs.sqlalchemy.org/ru/latest/orm/tutorial.html#declare-a-mapping
Base = declarative_base()

# a couple of Base subclasses with a relation
class ParkingGarage(Base):
   __tablename__ = 'parking_garage'
   id = Column(Integer, primary_key=True)
   numSpots = Column(Integer)

class ParkingSpace(Base):
   __tablename__ = 'parking_space'
   id = Column(Integer, primary_key=True)
   is_occupied = Column(Boolean, default=False)
   garage_id = Column(Integer, ForeignKey('parking_garage.id'))

   garage = relationship(ParkingGarage, backref='spaces')

# make an engine as before
engine = MakeEngine()

# if this is not the first time we've run this script, drop our
# parking_garage and parking_space tables.
Base.metadata.drop_all(engine)

# so if you look in the database now:
#example=> select schemaname, tablename from pg_tables where schemaname='public';
# schemaname | tablename
#------------+-----------
#(0 rows)

# a handy function to create the tables for all our classes in the db.
Base.metadata.create_all(engine)

# if you look in the database now:
#example=> select schemaname, tablename from pg_tables where schemaname='public';
# schemaname |   tablename
#------------+----------------
# public     | parking_garage
# public     | parking_space
#(2 rows)

# create some instances of our orm classes.
pg = ParkingGarage(
        id=1,
        numSpots=2034,
        )

ps1 = ParkingSpace(
        id=1,
        garage=pg,
        )

ps2 = ParkingSpace(
        id=2,
        garage=pg,
        )

# ok, now we'd like them to persist themselves. sqlalchemy wants us to have a
# session to do this with. What's a session? a session is a staging ground for
# our objects: pending changes are flushed to the db inside an open
# transaction, so we can add an object and query for it through the same
# session, but nobody else sees it until we call session.commit().

# first we'll create an instance of the sessionmaker factory
createSession = sessionmaker(bind=engine)
# then make a session
session = createSession()

# now we can add things to the session
session.add_all([pg, ps1, ps2])

# and we can query for them: the query autoflushes our pending objects into
# this session's open (uncommitted) transaction
print 'num parking spaces in session:', session.query(ParkingSpace).count()
# num parking spaces in session: 2

# but no other session can see them yet
s2 = createSession()
print 'num parking spaces in s2:', s2.query(ParkingSpace).count()
# num parking spaces in s2: 0

# after we commit the session they are
session.commit()
print 'num parking spaces in s2:', s2.query(ParkingSpace).count()
# num parking spaces in s2: 2

# yay!
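
The same visibility rule can be tried without PostgreSQL or SQLAlchemy. What follows is only a rough analogy, written for Python 3 with the standard library's sqlite3 module: two plain connections stand in for session and s2, and the table name simply mirrors the example above. The locking details differ from PostgreSQL, but who sees the rows before and after the commit is the same.

```python
import os
import sqlite3
import tempfile

# a throwaway database file, so that two separate connections can share it
path = os.path.join(tempfile.mkdtemp(), "example.db")

writer = sqlite3.connect(path)   # plays the role of `session`
reader = sqlite3.connect(path)   # plays the role of `s2`


def count(conn):
    # fetchall() exhausts the cursor so no read lock is left behind
    return conn.execute("SELECT COUNT(*) FROM parking_space").fetchall()[0][0]


writer.execute("CREATE TABLE parking_space (id INTEGER PRIMARY KEY)")
writer.commit()

# the INSERT implicitly opens a transaction on the writer connection
writer.executemany("INSERT INTO parking_space (id) VALUES (?)", [(1,), (2,)])

seen_by_writer_before = count(writer)   # the writer sees its own pending rows
seen_by_reader_before = count(reader)   # the other connection does not

writer.commit()
seen_by_reader_after = count(reader)    # after the commit it does

print(seen_by_writer_before, seen_by_reader_before, seen_by_reader_after)

writer.close()
reader.close()
```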

So far so SQLAlchemy. Let’s plug it into Pyramid. If we start a new Pyramid app like:

pcreate -s alchemy parkingmanager3000

the result will have an app package, in this case in the directory parkingmanager3000/parkingmanager3000/. That package’s __init__.py has a main(global_config, **settings) function that the WSGI server calls once on app startup, and the engine is created there. The package also has a submodule called models that creates a session factory on import. main() imports this and binds the engine it creates to the session factory.
That is:

  1. models creates a session factory when it’s imported by main
  2. main() creates the engine on app startup.
  3. main() configures the session factory to be bound to the engine it has created

Stripping it down to just what we need for this example, parkingmanager3000/ has the files __init__.py, models.py, and views.py.

__init__.py

from pyramid.config import Configurator
from sqlalchemy import engine_from_config

from .models import DBSession

def main(global_config, **settings):
    # here we create the engine and bind it to the (not really a) session factory called
    # DBSession.
    engine = engine_from_config(settings, 'sqlalchemy.')
    DBSession.configure(bind=engine)

    # here we add some routes to let us interact with our db a bit.
    config = Configurator(settings=settings)
    config.add_route('num_spaces', '/')
    config.add_route('add_space', '/add')
    config.scan()

    # fulfil our contract and make the app for the wsgi server
    return config.make_wsgi_app()
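
A note on the 'sqlalchemy.' argument: engine_from_config() only looks at the settings whose keys start with that prefix, and strips the prefix before using them. That is why helloengine.py could pass prefix='' with a bare 'url' key. Below is a rough model of the filtering step in plain Python 3. strip_prefix is a hypothetical helper written for illustration (it is not SQLAlchemy's code), and the settings values are made up.

```python
def strip_prefix(settings, prefix):
    """Keep only the keys that start with prefix, with the prefix removed.

    strip_prefix is a hypothetical helper modelled on the filtering that
    engine_from_config() does before it creates the engine.
    """
    return dict(
        (key[len(prefix):], value)
        for key, value in settings.items()
        if key.startswith(prefix)
    )


# settings roughly as they might be passed to main(); the values are made up
settings = {
    "pyramid.reload_templates": "true",
    "sqlalchemy.url": "postgres://psqluser:psqlpass@localhost:5432/example",
    "sqlalchemy.echo": "false",
}

# only the sqlalchemy.* keys survive, without their prefix
engine_options = strip_prefix(settings, "sqlalchemy.")
print(sorted(engine_options))
```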

views.py

from pyramid.view import view_config

from .models import DBSession
from .models import ParkingSpace
from .models import ParkingGarage

@view_config(route_name='num_spaces', renderer='json')
def numSpaces(request):
    return {
        'value':DBSession.query(ParkingSpace).count()
        }

@view_config(route_name='add_space', renderer='json')
def addSpace(request):

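    # get the next id for our model (we wouldn't have to do this if we left id
    # unset: a lone Integer primary key autoincrements by default)
    def getHighestId():
        # order descending and take the first row; return 0 when the table
        # is empty so the first space gets id 1 instead of raising IndexError
        highest = DBSession.query(
                ParkingSpace
            ).order_by(
                ParkingSpace.id.desc()
            ).first()
        return highest.id if highest is not None else 0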

    # if there's not a parking garage yet make one
    if DBSession.query(ParkingGarage).count() == 0:
        DBSession.add(
             ParkingGarage(
                 id=1
                 )
             )

    # make a parking space and add it to the session
    pg = DBSession.query(ParkingGarage).first()
    ps = ParkingSpace(
            id = getHighestId() + 1,
            garage = pg,
            )
    DBSession.add(ps)
    # call flush so the INSERT is sent to the db now, inside the still-open
    # transaction: that's when is_occupied gets its default and garage_id is
    # populated from garage. the commit itself happens later, when the
    # transaction machinery finishes the request.
    DBSession.flush()

    return {
        'parking_space':{
            'id':ps.id,
            'garage_id':ps.garage_id,
            'is_occupied':ps.is_occupied,
            }
        }

models.py

# sqlalchemy imports
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Boolean
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.ext.declarative import declarative_base

from zope.sqlalchemy import ZopeTransactionExtension

# well this looks a bit different doesn't it? we had sessionmaker() before,
# what's going on? ZopeTransactionExtension and scoped_session are going to
# handle our session for us: when a request flows through our app, DBSession
# hands that thread its own session. and if there's an exception and we return
# a 500 or something, the transaction is handled for us too. in fact we can
# pretty much just import DBSession and use it knowing it's all ours and we
# don't have to think about it. Nice.
# http://docs.sqlalchemy.org/en/latest/orm/session.html#using-thread-local-scope-with-web-applications
# so the object that scoped_session() makes for us isn't really a session, or a
# session factory, but we're going to treat it like a session factory when we
# call configure on it in main() and we're going to treat it like a session
# when we use it within the webapp
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))

# all the following is the same as hellomodels.py.
Base = declarative_base()

class ParkingGarage(Base):
   __tablename__ = 'parking_garage'
   id = Column(Integer, primary_key=True)
   numSpots = Column(Integer)

class ParkingSpace(Base):
   __tablename__ = 'parking_space'
   id = Column(Integer, primary_key=True)
   is_occupied = Column(Boolean, default=False)
   garage_id = Column(Integer, ForeignKey('parking_garage.id'))

   garage = relationship(ParkingGarage, backref='spaces')

The important difference between what we’ve seen so far and what we get in a default Pyramid app is that, instead of createSession = sessionmaker(bind=engine), the app has DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension())), which is a little bit fancier but is going to serve much the same purpose. Is it a session? No. Is it a session factory? No. It’s a thread-local session registry! It’ll pretend it’s a session and a factory for us, and we won’t have to open and close our own sessions.
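
To make "registry that pretends to be a session and a factory" a bit more concrete, here is a toy version in plain Python 3. It is a simplified model and not SQLAlchemy's implementation: ToySession, ToyScopedSession and ToyDBSession are all made-up names, and the "session" only remembers what was added to it.

```python
import threading


class ToySession(object):
    """Stand-in for a real session; it just remembers what was added."""

    def __init__(self):
        self.added = []

    def add(self, obj):
        self.added.append(obj)


class ToyScopedSession(object):
    """Toy thread-local registry: one lazily created session per thread."""

    def __init__(self, session_factory):
        self._factory = session_factory
        self._local = threading.local()

    def __call__(self):
        # acting as a factory: return this thread's session, creating it on
        # first use
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session

    def __getattr__(self, name):
        # acting as a session: forward anything else to this thread's session
        return getattr(self(), name)


ToyDBSession = ToyScopedSession(ToySession)

# used like a session in the main thread
ToyDBSession.add("main thread object")
main_session = ToyDBSession()

seen_in_worker = {}


def worker():
    # a different thread gets its own, empty session
    seen_in_worker["session"] = ToyDBSession()
    seen_in_worker["added"] = list(ToyDBSession.added)


t = threading.Thread(target=worker)
t.start()
t.join()

# same thread, same session; other thread, other session
print(ToyDBSession() is main_session)
print(seen_in_worker["session"] is main_session)
```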

So now (finally, I know) we can ask: how does this work in my unit tests?
Consider this:

from parkingmanager3000.models import DBSession
from parkingmanager3000 import main
print 'bound to:', DBSession.bind
# bound to: None

settings = {
    'sqlalchemy.url': 'postgres://psqluser:psqlpass@localhost:5432/example'
    }
main(None, **settings)
print 'bound to:', DBSession.bind

If you run this you’ll find that something interesting happens:

parkingmanager3000/__init__.py:10: SAWarning: At least one scoped session is
already present. configure() can not affect sessions that have already been
created.
DBSession.configure(bind=engine)
bound to: None

Whereas if we ran it without the first print of DBSession.bind, we’d get bound to: Engine(postgres://psqluser:psqlpass@localhost:5432/example)
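
The order dependence can be modelled with a few lines of plain Python 3. This is a toy, not SQLAlchemy's code: ToyRegistry and ToySession are made-up names, threads are ignored, and a string stands in for the engine. It only shows why a session created before configure() keeps its old bind.

```python
import warnings


class ToySession(object):
    def __init__(self, bind=None):
        self.bind = bind


class ToyRegistry(object):
    """Toy model of configure() on a lazily created session."""

    def __init__(self):
        self._kwargs = {}
        self._session = None

    def configure(self, **kwargs):
        if self._session is not None:
            warnings.warn("a session already exists; configure() won't affect it")
        # only sessions created from now on pick up the new settings
        self._kwargs.update(kwargs)

    @property
    def bind(self):
        # touching a session attribute creates the session if needed
        if self._session is None:
            self._session = ToySession(**self._kwargs)
        return self._session.bind


# case 1: touch first, configure second (what the script above does)
touched_first = ToyRegistry()
touched_first.bind
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    touched_first.configure(bind="engine")

# case 2: configure first, touch second
configured_first = ToyRegistry()
configured_first.configure(bind="engine")

print(touched_first.bind, configured_first.bind, len(caught))
```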

In other words, the first time we refer to DBSession the scoped_session does its magic and makes us a session, and now we’re stuck with that one: configure() won’t rebind it to a different engine. We can handle that in our unit tests by making sure that the first touch of DBSession is configuring it with the engine we want bound. Something like the following, although I’d really recommend reading http://sontek.net/blog/detail/writing-tests-for-pyramid-and-sqlalchemy

tests.py

import transaction
import unittest

from sqlalchemy import create_engine
from webtest import TestApp

from parkingmanager3000 import main
from parkingmanager3000.models import DBSession
from parkingmanager3000.models import Base
from parkingmanager3000.models import ParkingGarage

def setUpModule():
    # once for all the tests in this module:

    # create an engine bound to the test db
    engine = create_engine('postgres://psqluser:psqlpass@localhost:5432/example_unittests')

    # first use of DBSession, bind it to our engine
    DBSession.configure(bind=engine)

    # bind our engine to the metadata so we can call drop_all later without
    # having the engine around
    Base.metadata.bind = engine

    # create_all to create tables
    Base.metadata.create_all()

def tearDownModule():
    Base.metadata.drop_all()

class TestTest(unittest.TestCase):
    def setUp(self):
        # wait a minute this is the first time we've seen transaction isn't it?
        # yes. a transaction sits between the db and the session. if we didn't
        # have this, everything we added to the session would be visible to
        # all tests, and these tests would fail (and actually hang without
        # completing, probably because drop_all() in tearDownModule ends up
        # waiting on locks held by the session's still-open transaction). with
        # a transaction begin and abort we can use the session and let the
        # transaction throw everything away for us.
        transaction.begin()

    def tearDown(self):
        transaction.abort()

    def testParkingSpaces(self):
        for i in range(5):
            pg = ParkingGarage(
                id = i,
                )
            DBSession.add(pg)
        self.assertEqual(5, DBSession.query(ParkingGarage).count())

    def testParkingSpacesAgain(self):
        for i in range(5, 10):
            pg = ParkingGarage(
                id = i,
                )
            DBSession.add(pg)
        self.assertEqual(5, DBSession.query(ParkingGarage).count())

    def testWithAnApp(self):
        settings = {
            'sqlalchemy.url': 'postgres://no:no@no:1111/no',
            }
        # it doesn't matter what we specify as the db url, because we've
        # already configured the session with the correct binding and it won't
        # let it be bound to a different engine (although it will complain
        # that we're trying)
        app = main(None, **settings)
        testapp = TestApp(app)
        resp = testapp.get('/')
        self.assertEqual(0, resp.json['value'])

if __name__ == '__main__':
    unittest.main()
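
The begin/abort pattern in setUp and tearDown can also be tried without a database server. Here is a minimal sketch in Python 3 using only the standard library: sqlite3 stands in for PostgreSQL, and explicit BEGIN and ROLLBACK statements stand in for transaction.begin() and transaction.abort(). The class and function names are made up for the example.

```python
import sqlite3
import unittest

# isolation_level=None puts the connection in autocommit mode, so the BEGIN
# and ROLLBACK below are the only transaction control in play
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE parking_garage (id INTEGER PRIMARY KEY)")


def count_garages():
    return conn.execute("SELECT COUNT(*) FROM parking_garage").fetchall()[0][0]


class RollbackPerTest(unittest.TestCase):
    def setUp(self):
        conn.execute("BEGIN")        # plays the role of transaction.begin()

    def tearDown(self):
        conn.execute("ROLLBACK")     # plays the role of transaction.abort()

    def test_first_five(self):
        for i in range(5):
            conn.execute("INSERT INTO parking_garage (id) VALUES (?)", (i,))
        self.assertEqual(5, count_garages())

    def test_next_five(self):
        # would count 10 rows if the other test's inserts had survived
        for i in range(5, 10):
            conn.execute("INSERT INTO parking_garage (id) VALUES (?)", (i,))
        self.assertEqual(5, count_garages())


# run the two tests and keep the result object around for inspection
suite = unittest.defaultTestLoader.loadTestsFromTestCase(RollbackPerTest)
result = unittest.TextTestRunner().run(suite)
```

Each test sees an empty table because the previous test's inserts were rolled back rather than committed.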

And what about if I want to use the webapp code from a background process?
Similar to the unit tests: the first time you reference DBSession it gets instantiated, so the first thing to do is configure it with the engine you want to use, and only then use the webapp code. Something like:

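from parkingmanager3000.models import DBSession
# makeMyEngine() and runJob() stand in for your own code
engine = makeMyEngine()
# configure before anything else touches DBSession
DBSession.configure(bind=engine)
runJob()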

Next time perhaps we’ll look at whether using a global session registry causes problems with asynchronous code, such as a web app using gevent (pretty sure it does), and how using a per-request session like the one described at http://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/database/sqlalchemy.html can help.
