I have written a Scrapy spider that I run using CrawlerProcess, and I want to run it as a Prefect flow. This is my function that runs CrawlerProcess:
from prefect import flow
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from SpyingTools.spiders.bankWebsiteNews import BankNews

@flow
def bank_website_news():
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(BankNews)
    process.start()
More info: this is my BankNews class:
import scrapy
from datetime import date, timedelta

# SpiderHelper and NewsItem are imported from the project's own modules

class BankNews(scrapy.Spider):
    sph = SpiderHelper()
    name = 'BANKWEBSITE'
    latest_date = None
    bank = "ACLEDA"
    index_news = 2
    image_base_url = 'https://www.acledabank.com.kh/kh'
    custom_settings = {
        'ITEM_PIPELINES': {
            'SpyingTools.pipelines.DataPipeline': 400,
        }
    }

    def generateDaterange(self, start_date=None):
        if start_date is None:
            start_date = date(2021, 1, 1)
        end_date = date.today()  # a date (not datetime), so the subtraction below stays date - date
        print(end_date)
        delta = end_date - start_date
        dates = [0] * (delta.days + 1)
        for i in range(delta.days + 1):
            day = start_date + timedelta(days=i)
            dates[i] = str(day).replace("-", "")
        return dates
    def start_requests(self):
        base_url = "https://www.acledabank.com.kh/kh/eng/md_ln"
        self.latest_date = self.sph.get_latest_date_news(self.bank, self.bank).values[0][0]
        self.latest_date = self.latest_date + timedelta(days=1)
        dates = self.generateDaterange(self.latest_date)
        print(dates)
        for date_ in dates:
            yield scrapy.Request(base_url + date_, self.parse)
    def parse(self, response):
        news = NewsItem()
        url_r = str(response.request.url).split("_ln")[-1]
        if "_" in url_r:
            self.index_news += 1
        else:
            self.index_news = 2
        container = response.css("div.main")[0]
        div_khmer = container.css('div.font-khm').get()
        h1_khmer = container.css('h1.font-khm').get()
        title = container.css('h1::text').get()
        date_ = container.css('p.date::text').get()
        img = container.css("div.imgbox img::attr(src)").get()
        img_link = self.image_base_url + img.split('..')[-1]
        if date_:
            content = "\n".join(container.xpath('p//text()')[7:].getall())
        else:
            content = "\n".join(container.xpath('p//text()')[6:].getall())
        yield scrapy.Request(response.request.url + "_" + str(self.index_news), self.parse)
        if div_khmer is None and h1_khmer is None:
            news['time'] = date_
            news['title'] = title
            news['content'] = content
            news['name'] = self.bank.upper()
            news['link'] = response.request.url
            news['image'] = img_link
            news["source"] = self.bank
            yield news
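For reference, generateDaterange produces compact YYYYMMDD strings, so start_requests hits one URL per day; parse then follows numbered pages by appending "_2", "_3", and so on. A quick illustration of the URL pattern (not part of the spider):

base_url = "https://www.acledabank.com.kh/kh/eng/md_ln"
print(base_url + "20210101")  # first page for 1 Jan 2021
# https://www.acledabank.com.kh/kh/eng/md_ln20210101
# follow-ups: .../md_ln20210101_2, .../md_ln20210101_3, ...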
And this is my DataPipeline:
import datetime

import pandas as pd
import sqlalchemy

# PipelineBase and NewsItem come from the project's own modules

class DataPipeline(PipelineBase):
    def __init__(self) -> None:
        super().__init__()

    def process_item(self, item, spider):
        if isinstance(item, NewsItem):
            df = pd.DataFrame(
                [[
                    item['time'], item['title'],
                    item['content'], item["name"], item["link"], item["image"]
                ]],
                columns=["DATE", "TITLE", "CONTENT", "NAME", "URL", "IMAGE"])
            try:
                df["DATE"] = pd.to_datetime(df["DATE"])
            except Exception:  # fall back when the scraped date string is unparseable
                df["DATE"] = datetime.datetime.now()
            if item.get("source"):
                df["WEBSITE"] = item.get("source")
            else:
                df["WEBSITE"] = 'khmertimeskh'
            today = datetime.date.today()
            df["DOWNLOAD_DATE"] = today
            df.to_sql("NEWS", self.engine, index=False, if_exists='append',
                      dtype={
                          "TITLE": sqlalchemy.types.NVARCHAR(),
                          "CONTENT": sqlalchemy.types.NVARCHAR(),
                          'DATE': sqlalchemy.types.DATETIME(),
                          'DOWNLOAD_DATE': sqlalchemy.types.DATE()
                      })
            return df
        else:
            return item
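Note that self.engine is inherited from PipelineBase, which I haven't shown; it essentially just holds a SQLAlchemy engine, something like this minimal sketch (the connection URL here is only a placeholder for the real database):

import sqlalchemy

class PipelineBase:
    def __init__(self) -> None:
        # Placeholder URL; the real class reads the connection
        # string from the project settings.
        self.engine = sqlalchemy.create_engine("sqlite:///news.db")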
BankNews is a spider that I wrote to scrape news.
And this is the error I get when I try to run the bank_website_news() function as a Prefect flow:
File "D:\Development\spyingtool\venv\Lib\site-packages\twisted\internet\base.py", line 1282, in _handleSignals
signal.signal(signal.SIGTERM, reactorBaseSelf.sigTerm)
File "C:\Users\seab.navin\AppData\Local\Programs\Python\Python311\Lib\signal.py", line 56, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: signal only works in main thread of the main interpreter
As far as I can tell, Prefect runs the flow in a worker thread, and Twisted's reactor then tries to install signal handlers there, which Python only allows from the main thread. Does anyone know how to solve this problem and run Scrapy inside a Prefect flow?
I've tried multiple solutions, including the asyncio library and CrawlerRunner, but none of them worked well, so I decided to switch from CrawlerProcess to invoking the Scrapy command line instead, and that works. This is my new code:
import subprocess

from prefect import task, flow, get_run_logger

@task
def run_query():
    logger = get_run_logger()
    query = 'scrapy crawl BANKWEBSITE'
    proc = subprocess.Popen(query, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, stderr = proc.communicate()
    logger.info(stdout.decode())
    if proc.returncode != 0:
        raise Exception(stderr.decode())

@flow
def run_all_task():
    run_query()
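If you prefer to stay in Python instead of shelling out, another workaround for the signal error is to launch CrawlerProcess from a child process with multiprocessing, so that Twisted's reactor runs in that process's main thread. A minimal sketch (untested against this exact project):

import multiprocessing

from prefect import flow
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from SpyingTools.spiders.bankWebsiteNews import BankNews

def _crawl():
    # This runs in the child process's main thread, where
    # installing signal handlers is allowed.
    process = CrawlerProcess(get_project_settings())
    process.crawl(BankNews)
    process.start()

@flow
def bank_website_news():
    p = multiprocessing.Process(target=_crawl)
    p.start()
    p.join()
    if p.exitcode != 0:
        raise RuntimeError(f"Crawl failed with exit code {p.exitcode}")

On Windows, make sure the flow is invoked under an if __name__ == "__main__": guard, since multiprocessing uses spawn there. On Scrapy 2.6+ you can also try process.start(install_signal_handlers=False), which skips the signal-handler setup that raises the ValueError in a non-main thread.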