I have a working scrapy project that downloads tsv files and saves them to s3.
I use a custom pipeline to save the original file names with dates.
I am wondering whether it is possible to convert the tsv files to parquet before uploading them to S3. If so, how would I do this in Scrapy?
I should note that I am able to convert the files locally (last code block) but would like to do it inline before they are uploaded to s3.
This is what I currently have working:
##items
class DownfilesItem(scrapy.Item):
    """Item describing a single file download."""

    # List of URLs to download; the spider assigns a one-element list.
    file_urls = scrapy.Field()
    # Presumably filled in by the files pipeline with download results
    # (standard Scrapy convention) -- TODO confirm.
    files = scrapy.Field()
    # Text after the last "=" in the download URL, as set by the spider.
    original_file_name = scrapy.Field()
    # Not assigned anywhere in the visible code.
    date = scrapy.Field()
##pipeline to save original file names with dates
class OriginalNameFilesPipeline(FilesPipeline):
    """Files pipeline that stores downloads under their original name plus a date."""

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the file fetched by ``request``.

        The original file name is the text after the last ``=`` in the
        request URL. Today's date (``YYYYMMDD``) is inserted before the
        extension, e.g. ``report.tsv`` -> ``report_20240131.tsv``.

        ``item`` is accepted (and ignored) because current Scrapy versions
        pass it as a keyword argument to ``file_path``.
        """
        original_name = request.url.split("=")[-1]
        stamp = datetime.today().strftime("%Y%m%d")
        # Split on the last dot so the date lands before the extension for
        # any file type, not only ``.tsv``.
        stem, dot, extension = original_name.rpartition(".")
        if not dot:
            # No extension at all: just append the date.
            return f"{original_name}_{stamp}"
        return f"{stem}_{stamp}.{extension}"
##in my scraper
def parse_all_items(self, response):
    """Yield one ``DownfilesItem`` per download URL."""
    # Placeholder kept from the original snippet: stands in for the real
    # list of URLs to download.
    all_urls = [bunch or urls]
    for link in all_urls:
        entry = DownfilesItem()
        entry['file_urls'] = [link]
        # The original file name is whatever follows the last "=" in the URL.
        entry['original_file_name'] = link.split("=")[-1]
        yield entry
##converting tsv to parquet locally
# Convert every .tsv file in src_dir into a .parquet file next to it.
# NOTE(review): ``csv`` and ``pq`` are presumably ``pyarrow.csv`` and
# ``pyarrow.parquet`` -- confirm against the file's imports.
parse_options = csv.ParseOptions(delimiter="\t")
for name in os.listdir(src_dir):
    localpath = os.path.join(src_dir, name)
    print(localpath)
    # Check the extension only; a substring test would also match paths
    # that merely contain ".tsv" somewhere (e.g. in a directory name).
    if not name.endswith(".tsv"):
        continue
    table = csv.read_csv(localpath, parse_options=parse_options)
    # Swap just the trailing extension; str.replace would rewrite every
    # "tsv" in the path, including directory names.
    pq.write_table(table, os.path.splitext(localpath)[0] + ".parquet")
Using the same example site as in the updated question, here is the spider:
import scrapy
import pandas as pd
from io import StringIO
class QuotesSpider(scrapy.Spider):
    """Follow every CSV link on the start page and yield its contents as a DataFrame."""

    name = "parquet"
    start_urls = ["https://onlinetestcase.com/csv-file/"]

    def parse(self, response):
        """Request each link on the page whose href ends in ``.csv``."""
        hrefs = response.css('a::attr(href)').extract()
        # Resolve relative hrefs against the page URL, keeping CSV links only.
        csv_links = (
            response.urljoin(href) for href in hrefs if href.endswith('.csv')
        )
        for link in csv_links:
            yield scrapy.http.Request(
                link,
                callback=self.parse_csv,
                dont_filter=True,
            )

    def parse_csv(self, response):
        """Yield the source URL together with the CSV body parsed by pandas."""
        source_url = response.url
        # Parse the CSV text of the response into a DataFrame.
        frame = pd.read_csv(StringIO(response.text))
        yield {"url": source_url, "data": frame}
It identifies the CSV links, then follows each link and pulls the data down, converting it into a Data Frame. The Data Frame is yielded along with the source URL.
The following pipeline then processes all of the resulting objects, converting the CSV filename to a Parquet filename and dumping the Data Frame.
import re
from collections import defaultdict
class ParquetPipeline:
    """Buffer the data frames yielded by the spider and write each to Parquet.

    Frames are held in memory, keyed by output filename, and written out
    when the spider closes.
    """

    def open_spider(self, spider):
        """Start with an empty ``{parquet_filename: data_frame}`` buffer."""
        self.items = {}

    def close_spider(self, spider):
        """Write every buffered frame to its Parquet file."""
        for name, df in self.items.items():
            df.to_parquet(name, index=False)

    def process_item(self, item, spider):
        """Buffer ``item["data"]`` under a Parquet name derived from ``item["url"]``.

        Returns the item unchanged so that later pipelines and feed
        exporters still receive it.
        """
        # The CSV filename is the last path segment of the URL.
        csv_name = re.search("[^/]+$", item["url"]).group(0)
        # Replace only the trailing extension; an unanchored pattern would
        # also rewrite ".csv" occurring earlier in the name.
        parquet_name = re.sub(r"\.csv$", ".parquet", csv_name)
        self.items[parquet_name] = item["data"]
        return item
⚠️ Note: For this example to work, you will need to set ROBOTSTXT_OBEY = False in settings.py.