PostgreSQL rowcount with SQLAlchemy/psycopg2 Streaming Cursor
SQLAlchemy ResultProxy.rowcount does work with SELECT statements when using psycopg2 with PostgreSQL, despite the warnings, as long as you aren’t streaming the results. This is because psycopg2 uses libpq PQexec along with PQcmdTuples to retrieve the result count (PQexec always collects the command’s entire result, buffering it in a single PGresult).
Using SQLAlchemy stream_results causes psycopg2 to use a named server-side cursor via PostgreSQL DECLARE. This will stream result records on demand, but ResultProxy.rowcount will not reflect the total result count.
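As a rough illustration of why the total is unavailable, the SQL behind a named cursor looks approximately like the statements built below. This is only a sketch, not psycopg2's actual implementation: the helper name named_cursor_statements and its parameters are hypothetical, and the real driver adds further options.

```python
def named_cursor_statements(name, query, batch_size, scrollable=False):
    """Return SQL roughly equivalent to what a named cursor sends.

    Hypothetical helper for illustration only; it builds strings and
    never talks to a database.
    """
    scroll = "SCROLL " if scrollable else ""
    return [
        # Registers the query on the server; no rows are transferred yet.
        'DECLARE "{}" {}CURSOR FOR {}'.format(name, scroll, query),
        # Each fetch pulls only one batch of rows to the client.
        'FETCH FORWARD {} FROM "{}"'.format(batch_size, name),
        # Releases the server-side cursor when the client is done.
        'CLOSE "{}"'.format(name),
    ]
```

DECLARE reports no row count at all, and each FETCH reports only the rows in its own batch, so nothing on the client ever sees the total.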
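To work around this, you can configure the psycopg2 server-side cursor to be scrollable (this allows moving backwards in the result set). Then, after executing the streaming query, execute MOVE FORWARD ALL to move to the end of the results without fetching any. PQcmdTuples then reports the number of rows moved as the psycopg2 rowcount for that statement, and you can scroll absolute back to the beginning of the results and process them streaming. Rows that the streaming result had already fetched before the MOVE are not included in that number, which is why the example adds results.rowcount to it.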
>>> import sqlalchemy as sa
>>>
>>> table = sa.Table("testable", sa.MetaData(),
...     sa.Column("_id", sa.Integer, primary_key=True),
...     sa.Column("value", sa.String))
>>>
>>> engine = sa.create_engine("postgresql://scott:tiger@localhost:5432/mydatabase")
>>> table.drop(engine, checkfirst=True)
>>> table.create(engine, checkfirst=True)
>>> values = ["frog", "horse", "frog", "fish", "frog", "dog", "cow", "cat", "frog"]
>>> engine.execute(table.insert(), [{"_id": i, "value": v} for i, v in enumerate(values)])
<sqlalchemy.engine.result.ResultProxy object at 0x7f2e04d6cc50>
>>>
>>> query = table.select().where(table.c.value == "frog")
>>>
>>> print "non-streaming rowcount", engine.execute(query).rowcount
non-streaming rowcount 4
>>>
>>> with engine.connect().execution_options(stream_results=True) as conn:
...     results = conn.execute(query)
...     print "incorrect streaming rowcount", results.rowcount
...     for r in results:
...         print r
...
incorrect streaming rowcount 1
(0, u'frog')
(2, u'frog')
(4, u'frog')
(8, u'frog')
>>>
>>> with engine.connect() as conn:
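...     def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
...         # Make the named cursor scrollable so it can be rewound later.
...         cursor.scrollable = True
...     streamconn = conn.execution_options(stream_results=True)
...     sa.event.listen(streamconn, "before_cursor_execute", before_cursor_execute)
...     results = streamconn.execute(query)
...     # Skip to the end of the server-side cursor without fetching any rows.
...     r = conn.execute("MOVE FORWARD ALL FROM {}".format(results.cursor.name))
...     # Rows moved past, plus the rows the streaming result had already fetched.
...     print "correct streaming rowcount", r.rowcount + results.rowcount
...     # Reposition just after the already-fetched rows, then stream the rest.
...     results.cursor.scroll(results.rowcount, mode="absolute")
...     for r in results:
...         print r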
...
correct streaming rowcount 4
(0, u'frog')
(2, u'frog')
(4, u'frog')
(8, u'frog')
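
The same steps can also be sketched in plain psycopg2, without SQLAlchemy. This is a minimal sketch under stated assumptions rather than a tested recipe: count_then_rewind and the default cursor name are hypothetical, conn is assumed to be an open psycopg2 connection with autocommit off (named cursors need a transaction), and nothing has been fetched from the cursor before the MOVE, so the MOVE row count is the full total.

```python
def count_then_rewind(conn, query, params=None, name="rowcount_cursor"):
    """Open a scrollable server-side cursor and return (cursor, total_rows).

    Hypothetical helper: `conn` is assumed to be an open psycopg2
    connection and `name` is an arbitrary cursor name.
    """
    # A named cursor makes psycopg2 DECLARE a server-side cursor on execute();
    # scrollable=True allows moving backwards afterwards.
    stream = conn.cursor(name=name, scrollable=True)
    stream.execute(query, params)

    # MOVE skips rows without transferring them; the number of rows skipped
    # is reported as the rowcount of the cursor that ran the MOVE.
    with conn.cursor() as mover:
        mover.execute('MOVE FORWARD ALL FROM "{}"'.format(name))
        total = mover.rowcount

    # Reposition before the first row so iteration starts at the beginning.
    stream.scroll(0, mode="absolute")
    return stream, total


# Usage sketch (connection string is a placeholder):
#   conn = psycopg2.connect("dbname=mydatabase")
#   cursor, total = count_then_rewind(
#       conn, "SELECT * FROM testable WHERE value = %s", ("frog",))
#   for row in cursor:
#       ...
```

Keeping the MOVE on a separate client-side cursor leaves the named cursor's own state untouched, so it can be iterated normally once it has been rewound.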