I have a raw dataset that consists of multiple PDF files. I want to produce a Foundry dataset that has the text of each PDF in one column and the name of the document in another.
Let's assume xyz.pdf has the text asdf and yyz.pdf has the text zxcv. I want a dataset like this:
doc_name | text |
-----------------
xyz.pdf | asdf |
yyz.pdf | zxcv |
My approach is to open each file and send it to a library that does document parsing (tika, pdfplumber, docquery). When I run something like this:
with source.filesystem().open('xyz.pdf', 'rb') as f:
    pdf = pdfplumber.open(f)
    print(pdf)
I get the error:
File "/tmp/conda-xyz/real/environment/lib/python3.8/site-packages/pdfminer/psparser.py", line 220, in seek
self.fp.seek(pos)
io.UnsupportedOperation: File or stream is not seekable.
If I instead try to open the file directly:
pdf = pdfplumber.open('xyz.pdf')
I get the error:
File "/tmp/conda-xyz/real/environment/lib/python3.8/site-packages/pdfplumber/pdf.py", line 71, in open
stream = open(path_or_fp, "rb")
FileNotFoundError: [Errno 2] No such file or directory: 'xyz.pdf'
The answer is that filesystem().open for the Foundry file system does not return the same type of object as Python's built-in open. To get a seekable I/O buffer, we need to copy the contents to a local file and then use Python's open (or a library that uses Python's open).
A code snippet with just the opening of the file is:
with source.filesystem().open(file_path, 'rb') as f:
    with tempfile.NamedTemporaryFile() as tmp:
        shutil.copyfileobj(f, tmp)
        tmp.flush()
        # you can now use tmp like a regular local file, e.g.
        pdf = pdfplumber.open(tmp)
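An alternative that avoids the temporary file (not part of the original answer, and it assumes each PDF fits comfortably in memory) is to read the stream into an in-memory io.BytesIO buffer, which is seekable:

import io
import pdfplumber

with source.filesystem().open(file_path, 'rb') as f:
    # read the whole stream into a seekable in-memory buffer
    buffer = io.BytesIO(f.read())

pdf = pdfplumber.open(buffer)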
Here is a full solution:
from transforms.api import transform, Input, Output
import pandas as pd
import pdfplumber
import tempfile
import shutil


@transform(
    output=Output("/path/to/pdf_text_parsed"),
    source=Input("/path/to/pdf_raw_files")
)
def compute(ctx, output, source):
    list_file = list(source.filesystem().ls())
    # since we are using a loop, keep track of values in empty lists
    doc_texts = []
    doc_names = []
    for file_ in list_file:
        file_path = file_.path
        with source.filesystem().open(file_path, 'rb') as f:
            doc_text = []
            doc_names.append(file_path)
            with tempfile.NamedTemporaryFile() as tmp:
                shutil.copyfileobj(f, tmp)
                tmp.flush()
                pdf = pdfplumber.open(tmp)
                # process page by page
                for page in pdf.pages:
                    text = page.extract_text()
                    doc_text.append(text)
            # join the per-page text so each document ends up as a single string
            doc_texts.append("\n".join(t for t in doc_text if t))
    d = {'doc_name': doc_names, 'doc_text': doc_texts}
    result_pd = pd.DataFrame(d)
    result = ctx.spark_session.createDataFrame(result_pd)
    output.write_dataframe(result)
The end result is a dataset in Foundry called pdf_text_parsed:
doc_name | doc_text |
---------------------
xyz.pdf | asdf |
yyz.pdf | zxcv |
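Outside of a Foundry transform, the same pdfplumber extraction logic can be sanity-checked locally; the file name below is just a placeholder for any PDF on disk:

import pdfplumber

# 'sample.pdf' is a hypothetical local file used only to test the extraction step
with pdfplumber.open('sample.pdf') as pdf:
    pages = [page.extract_text() or '' for page in pdf.pages]
    print('\n'.join(pages))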