当前位置:  开发笔记 > 编程语言 > 正文

有人在Scrapy中有sqlite管道的示例代码吗?

如何解决《有人在Scrapy中有sqlite管道的示例代码吗?》经验,为你挑选了2个好方法。

我正在寻找Scrapy中SQLite管道的一些示例代码.我知道Scrapy没有内置的支持,但我确信已经有人实现过了.只有实际的代码才能帮到我,因为我掌握的Python和Scrapy知识仅够完成我非常有限的任务,所以需要以代码作为起点.



1> 小智..:

我做了这样的事情:

#
# Author: Jay Vaughan
#
# Pipelines for processing items returned from a scrape.
# Don't forget to add the pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/topics/item-pipeline.html
#
from scrapy import log
from pysqlite2 import dbapi2 as sqlite

# This pipeline takes the Item and stuffs it into scrapedata.db
class scrapeDatasqLitePipeline(object):
    """Item pipeline that stores scraped items in ./scrapedata.db.

    Items are read as ``item['url'][0]`` and ``item['desc'][0]``, i.e. both
    fields are expected to be list-valued and only the first element is
    stored. An item whose URL is already in the table is logged and skipped.
    """

    def __init__(self):
        """Open the database and create the ``myscrapedata`` table if needed."""
        # Possible we should be doing this in spider_open instead, but okay
        self.connection = sqlite.connect('./scrapedata.db')
        self.cursor = self.connection.cursor()
        self.cursor.execute('CREATE TABLE IF NOT EXISTS myscrapedata '
                    '(id INTEGER PRIMARY KEY, url VARCHAR(80), desc VARCHAR(80))')

    # Take the item and put it in database - do not allow duplicates
    def process_item(self, item, spider):
        """Insert ``item`` unless a row with the same URL already exists.

        :param item: scraped item with list-valued ``url`` and ``desc`` fields
        :param spider: the spider that produced the item (unused)
        :returns: the item, unchanged, so later pipelines can process it
        """
        # Look up the same scalar URL that gets stored below. Parameters must
        # be passed as a sequence, hence the one-element tuple.
        url = item['url'][0]
        self.cursor.execute("select * from myscrapedata where url=?", (url,))
        result = self.cursor.fetchone()
        if result:
            log.msg("Item already in database: %s" % item, level=log.DEBUG)
        else:
            self.cursor.execute(
                "insert into myscrapedata (url, desc) values (?, ?)",
                (url, item['desc'][0]))

            self.connection.commit()

            log.msg("Item stored : %s" % item, level=log.DEBUG)
        return item

    def handle_error(self, e):
        """Log the error ``e`` through the Scrapy log."""
        log.err(e)



2> Levon..:

这是一个带sqlalchemy的sqlite管道.使用sqlalchemy,您可以根据需要轻松更改数据库.

在settings.py中添加数据库配置

# settings.py
# ...
# Connection settings for the database backend. SQLite only needs a driver
# name and a database file; the commented-out keys are placeholders for
# server-based backends.
DATABASE = dict(
    drivername='sqlite',
    # host='localhost',
    # port='5432',
    # username='YOUR_USERNAME',
    # password='YOUR_PASSWORD',
    database='books.sqlite',
)

然后在pipelines.py中添加以下内容

# pipelines.py
import logging

from scrapy import signals
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Declarative base class for the ORM models in this module; its metadata is
# what create_tables() uses to create the tables.
DeclarativeBase = declarative_base()

class Book(DeclarativeBase):
    """ORM model mapped to the ``books`` table."""
    __tablename__ = "books"

    id = Column(Integer, primary_key=True)
    title = Column('title', String)
    author = Column('author', String)
    publisher = Column('publisher', String)
    url = Column('url', String)
    scrape_date = Column('scrape_date', DateTime)

    def __repr__(self):
        """Return a short representation that identifies the book by its URL."""
        # The previous template was an empty string, so the result was always
        # '' and every log message that formatted a Book showed nothing.
        return "<Book({})>".format(self.url)


class SqlitePipeline(object):
    """Scrapy item pipeline that stores items as ``Book`` rows via SQLAlchemy.

    One session is opened per spider when it starts and closed when it
    finishes. Items whose ``url`` is already present in the table are skipped.
    """

    def __init__(self, settings):
        """Read the ``DATABASE`` connection settings.

        :param settings: settings object exposing ``get``
        """
        self.database = settings.get('DATABASE')
        self.sessions = {}

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline and hook it to the spider open/close signals."""
        pipeline = cls(crawler.settings)
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def create_engine(self):
        """Create an engine for the configured database.

        :returns: a SQLAlchemy engine using ``NullPool``
        """
        # URL.create() is the public constructor since SQLAlchemy 1.4, where
        # calling URL() directly is deprecated; older versions only have URL().
        make_url = getattr(URL, 'create', URL)
        url = make_url(**self.database)

        # 'charset' is a MySQL driver option. sqlite3.connect() rejects it as
        # an unknown keyword argument, so only pass it to MySQL drivers.
        connect_args = {}
        if url.drivername.startswith('mysql'):
            connect_args['charset'] = 'utf8'

        engine = create_engine(url, poolclass=NullPool, connect_args=connect_args)
        return engine

    def create_tables(self, engine):
        """Create any missing tables for the declarative models."""
        DeclarativeBase.metadata.create_all(engine, checkfirst=True)

    def create_session(self, engine):
        """Return a new session bound to ``engine``."""
        session = sessionmaker(bind=engine)()
        return session

    def spider_opened(self, spider):
        """Prepare the database and open a session for ``spider``."""
        engine = self.create_engine()
        self.create_tables(engine)
        session = self.create_session(engine)
        self.sessions[spider] = session

    def spider_closed(self, spider):
        """Close the session belonging to ``spider``, if one was opened."""
        # spider_opened may have failed before registering a session.
        session = self.sessions.pop(spider, None)
        if session is not None:
            session.close()

    def process_item(self, item, spider):
        """Store ``item`` as a ``Book`` unless its URL is already in the table.

        :param item: scraped item; must contain a ``url`` field
        :param spider: the spider that produced the item
        :returns: the item, unchanged
        :raises Exception: re-raises any error from the insert after rollback
        """
        session = self.sessions[spider]
        book = Book(**item)
        link_exists = session.query(Book).filter_by(url=item['url']).first() is not None

        if link_exists:
            logger.info('Item {} is in db'.format(book))
            return item

        try:
            session.add(book)
            session.commit()
            logger.info('Item {} stored in db'.format(book))
        except Exception:
            # Log at error level, undo the failed transaction, then let the
            # framework see the failure.
            logger.error('Failed to add {} to db'.format(book))
            session.rollback()
            raise

        return item

并且items.py应该看起来像这样

#items.py
import scrapy

class BookItem(scrapy.Item):
    """Scraped book record; field names mirror the ``Book`` model columns."""
    title = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    # The pipeline reads item['url'] for its duplicate check, so the field
    # has to be declared here.
    url = scrapy.Field()
    scrape_date = scrapy.Field()

您也可以考虑将class Book移到items.py中

推荐阅读
吻过彩虹的脸_378
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有