python amazon-s3 web-scraping scrapy

Scrapy TSV file download: how to convert files to Parquet before uploading to S3


I have a working Scrapy project that downloads TSV files and saves them to S3.

I use a custom pipeline to keep the original file names, with the download date appended.

Is it possible to convert the TSV files to Parquet before uploading them to S3? If so, how would I do this in Scrapy?

I should note that I can already convert the files locally (last code block), but I would like to do it inline, before they are uploaded to S3.

This is what I currently have working:

## items
import scrapy


class DownfilesItem(scrapy.Item):
    file_urls = scrapy.Field()
    files = scrapy.Field()
    original_file_name = scrapy.Field()
    date = scrapy.Field()
## pipeline to save original file names with dates
from datetime import datetime

from scrapy.pipelines.files import FilesPipeline


class OriginalNameFilesPipeline(FilesPipeline):
    def file_path(self, request, response=None, info=None):
        # The file name is whatever follows the last "=" in the URL.
        file_name_xml = request.url.split("=")[-1]
        stem = file_name_xml.removesuffix(".tsv")
        extension = file_name_xml.split(".")[-1]
        # e.g. "name.tsv" -> "name_YYYYMMDD.tsv"
        return stem + "_" + datetime.today().strftime("%Y%m%d") + "." + extension
## in my scraper
def parse_all_items(self, response):
    all_urls = [...]  # a bunch of URLs (elided)

    for url in all_urls:
        item = DownfilesItem()
        item["file_urls"] = [url]
        item["original_file_name"] = url.split("=")[-1]
        yield item
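## converting tsv to parquet locally
import os

import pyarrow.parquet as pq
from pyarrow import csv

parse_options = csv.ParseOptions(delimiter="\t")

# src_dir is the directory holding the downloaded .tsv files.
for name in os.listdir(src_dir):
    localpath = os.path.join(src_dir, name)
    print(localpath)
    if localpath.endswith(".tsv"):
        table = csv.read_csv(localpath, parse_options=parse_options)
        # Swap only the extension, not every "tsv" in the path.
        pq.write_table(table, localpath.removesuffix(".tsv") + ".parquet")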

Solution

  • Using the same example site as in the updated question, here is the spider:

    from io import StringIO
    
    import pandas as pd
    import scrapy
    
    
    class QuotesSpider(scrapy.Spider):
        name = "parquet"
        start_urls = ["https://onlinetestcase.com/csv-file/"]
    
        def parse(self, response):
            # Follow every link on the page that points at a CSV file.
            for url in response.css("a::attr(href)").getall():
                if url.endswith(".csv"):
                    yield scrapy.Request(
                        response.urljoin(url),
                        callback=self.parse_csv,
                        dont_filter=True,
                    )
    
        def parse_csv(self, response):
            yield {
                "url": response.url,
                # Convert the CSV text into a DataFrame.
                "data": pd.read_csv(StringIO(response.text)),
            }
    

    It identifies the CSV links, then follows each link and pulls the data down, converting it into a DataFrame. The DataFrame is yielded along with the source URL. For the TSV files in the question, pass sep="\t" to pd.read_csv.

    The following pipeline collects the yielded items, maps each CSV filename to a Parquet filename, and writes each DataFrame to a Parquet file when the spider closes.

    import re
    
    
    class ParquetPipeline:
        def open_spider(self, spider):
            # Maps Parquet filename -> DataFrame.
            self.items = {}
    
        def close_spider(self, spider):
            # Write each DataFrame to Parquet (needs pyarrow or fastparquet).
            for name, df in self.items.items():
                df.to_parquet(name, index=False)
    
        def process_item(self, item, spider):
            # Get the CSV filename (last path segment of the URL).
            csv = re.search(r"[^/]+$", item["url"]).group(0)
            # Create the Parquet filename.
            parquet = re.sub(r"\.csv$", ".parquet", csv)
    
            self.items[parquet] = item["data"]
            # Return the item so any later pipelines still receive it.
            return item
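
The question's pipeline also stamps each file name with the download date. As a minimal sketch, the two naming steps could be combined as below, assuming (as in the question) that the file name is whatever follows the last "=" in the URL. The helper name parquet_name_from_url and the example URL are hypothetical, not part of the original code.

```python
from datetime import date
from pathlib import PurePosixPath
from typing import Optional


def parquet_name_from_url(url: str, today: Optional[date] = None) -> str:
    """Build '<stem>_<YYYYMMDD>.parquet' from a URL ending in '=<name>.tsv'."""
    today = today or date.today()
    # Assumption from the question: the name follows the last "=" in the URL.
    original = url.split("=")[-1]
    # .stem drops only the final extension ("prices.tsv" -> "prices").
    stem = PurePosixPath(original).stem
    return f"{stem}_{today:%Y%m%d}.parquet"


# Hypothetical URL, fixed date so the result is reproducible.
print(parquet_name_from_url("https://example.com/download?file=prices.tsv",
                            date(2024, 1, 31)))
# -> prices_20240131.parquet
```

Passing the date explicitly keeps the function easy to test; in a pipeline the default of today's date would normally be used.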
    

    ⚠️ Note: For this example to work you will need to set ROBOTSTXT_OBEY = False in settings.py and register the pipeline under ITEM_PIPELINES.