所以,我有三张桌子:
阶级辩护:
engine = create_engine('sqlite://test.db', echo=False) SQLSession = sessionmaker(bind=engine) Base = declarative_base() class Channel(Base): __tablename__ = 'channel' id = Column(Integer, primary_key = True) title = Column(String) description = Column(String) link = Column(String) pubDate = Column(DateTime) class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key = True) username = Column(String) password = Column(String) sessionId = Column(String) class Subscription(Base): __tablename__ = 'subscription' userId = Column(Integer, ForeignKey('user.id'), primary_key=True) channelId = Column(Integer, ForeignKey('channel.id'), primary_key=True)
注意:我知道user.username应该是唯一的,需要修复它,我不确定为什么SQLalchemy使用双引号创建一些行名称.
我正在尝试找到一种方法来检索所有频道,以及指示一个特定用户(由user.sessionId与user.id一起识别)的订阅频道.
例如,假设我们有四个通道:channel1,channel2,channel3,channel4; 用户:user1; 谁在channel1和channel4上订阅.user1的查询将返回如下内容:
channel.id | channel.title | subscribed --------------------------------------- 1 channel1 True 2 channel2 False 3 channel3 False 4 channel4 True
这是一个最好的结果,但由于我完全不知道如何完成订阅列,我一直试图在用户订阅的行中获取特定用户ID并且缺少订阅,把它留空.
我正在与SQLalchemy atm一起使用的数据库引擎.是sqlite3
我已经在这两天抓住了这个问题,我没有问题通过订阅表连接所有这三个,但是然后所有用户没有订阅的频道都被省略了.
我希望我能够充分描述我的问题,提前感谢.
编辑:管理以涉及子查询的略微笨重的方式解决这个问题:
# What a messy SQL query! stmt = query(Subscription).filter_by(userId = uid()).join((User, Subscription.userId == User.id)).filter_by(sessionId = id()).subquery() subs = aliased(Subscription, stmt) results = query(Channel.id, Channel.title, subs.userId).outerjoin((subs, subs.channelId == Channel.id))
但是,我将继续寻找更优雅的解决方案,所以答案仍然非常受欢迎.
选项1:
Subscription
只是一个多对多的关系对象,我建议你把它建模为这样,而不是作为一个单独的类.请参阅配置多对多关系文档SQLAlchemy/declarative
.
您使用测试代码建模成为:
from sqlalchemy import create_engine, Column, Integer, DateTime, String, ForeignKey, Table from sqlalchemy.orm import relation, scoped_session, sessionmaker, eagerload from sqlalchemy.ext.declarative import declarative_base engine = create_engine('sqlite:///:memory:', echo=True) session = scoped_session(sessionmaker(bind=engine, autoflush=True)) Base = declarative_base() t_subscription = Table('subscription', Base.metadata, Column('userId', Integer, ForeignKey('user.id')), Column('channelId', Integer, ForeignKey('channel.id')), ) class Channel(Base): __tablename__ = 'channel' id = Column(Integer, primary_key = True) title = Column(String) description = Column(String) link = Column(String) pubDate = Column(DateTime) class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key = True) username = Column(String) password = Column(String) sessionId = Column(String) channels = relation("Channel", secondary=t_subscription) # NOTE: no need for this class # class Subscription(Base): # ... Base.metadata.create_all(engine) # ###################### # Add test data c1 = Channel() c1.title = 'channel-1' c2 = Channel() c2.title = 'channel-2' c3 = Channel() c3.title = 'channel-3' c4 = Channel() c4.title = 'channel-4' session.add(c1) session.add(c2) session.add(c3) session.add(c4) u1 = User() u1.username ='user1' session.add(u1) u1.channels.append(c1) u1.channels.append(c3) u2 = User() u2.username ='user2' session.add(u2) u2.channels.append(c2) session.commit() # ###################### # clean the session and test the code session.expunge_all() # retrieve all (I assume those are not that many) channels = session.query(Channel).all() # get subscription info for the user #q = session.query(User) # use eagerload(...) so that all 'subscription' table data is loaded with the user itself, and not as a separate query q = session.query(User).options(eagerload(User.channels)) for u in q.all(): for c in channels: print (c.id, c.title, (c in u.channels))
产生以下输出:
(1, u'channel-1', True) (2, u'channel-2', False) (3, u'channel-3', True) (4, u'channel-4', False) (1, u'channel-1', False) (2, u'channel-2', True) (3, u'channel-3', False) (4, u'channel-4', False)
请注意使用eagerload
,User
当channels
要求时,每个只发出1个SELECT语句而不是1个.
选项2:
但是如果你想让你保持模型,只是创建一个SA查询,可以根据你的要求提供列,那么下面的查询就可以完成这项工作:
from sqlalchemy import and_ from sqlalchemy.sql.expression import case #... q = (session.query(#User.username, Channel.id, Channel.title, case([(Subscription.channelId == None, False)], else_=True) ).outerjoin((Subscription, and_(Subscription.userId==User.id, Subscription.channelId==Channel.id)) ) ) # optionally filter by user q = q.filter(User.id == uid()) # assuming uid() is the function that provides user.id q = q.filter(User.sessionId == id()) # assuming uid() is the function that provides user.sessionId res = q.all() for r in res: print r
输出与上面的选项-1完全相同.