I'm trying to compute sentence-transformers embeddings for rows stored in BigQuery, and then store them as a Feather
dataframe in Google Cloud Storage.
However, I'm having problems saving the actual dataframe: the pipeline finishes without any error, yet nothing is written either locally or to Google Cloud Storage.
Here's a reproducible example I've come up with:
import apache_beam as beam
from apache_beam.ml.inference.base import (
ModelHandler,
PredictionResult,
RunInference,
)
from sentence_transformers import SentenceTransformer
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Sequence, Optional, Any, Dict, Iterable
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.dataframe.convert import to_dataframe
ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"


class EmbeddingModelHandler(
    ModelHandler[str, PredictionResult, SentenceTransformer]
):
    """Beam ModelHandler wrapping a sentence-transformers encoder.

    Loads the named SentenceTransformer model once per worker and encodes
    each batch of input strings into L2-normalized embedding vectors.
    """

    def __init__(self, model_name: str = ENCODING_MODEL_NAME):
        self._model_name = model_name

    def load_model(self) -> SentenceTransformer:
        # Import inside the method so the dependency resolves on the remote
        # Dataflow worker when the handler is deserialized there ("GCP
        # complains" otherwise, per the original author's comment).
        import sentence_transformers

        return sentence_transformers.SentenceTransformer(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: SentenceTransformer,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        # (Removed two unused local sentence_transformers imports that were
        # here: unlike load_model, nothing in this method references them.)
        #
        # NOTE(review): this returns the raw embedding matrix — one vector
        # per input string — not PredictionResult objects, despite the
        # declared return type. Downstream code relies on the raw vectors
        # (`element[1].tolist()`), so confirm before "fixing" the type.
        return model.encode(
            batch, show_progress_bar=True, normalize_embeddings=True
        )
class GetFeatures(beam.DoFn):
    """Extract the (identifier, text) pair to embed from a BigQuery row.

    Emits a keyed tuple so the downstream KeyedModelHandler can carry the
    row identifier alongside the text it embeds.
    """

    def process(self, element):
        # The upstream query selects `text_to_embed` and `identifier`, so
        # those are the keys present in each row dict. The original code
        # read `overview`/`iid`, which would always produce ""/None here
        # (and int(None) would then fail downstream) — the field names
        # appear to have been inconsistently anonymized; made consistent.
        text = element.get("text_to_embed", "")
        identifier = element.get("identifier")
        return [(identifier, text)]
def run(argv=None):
    """Parse CLI args, build the embedding pipeline, and attempt to write
    the resulting DataFrame as a Feather file to --output.

    NOTE(review): indentation reconstructed from the flattened paste; per
    the accepted answer below, the final two lines sit OUTSIDE the `with`
    block in this (buggy) version.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output file to write results to.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        embedding_dataframe = (
            pipeline
            # Read (text, id) rows from BigQuery; results staged via GCS.
            | "Read BigQuery"
            >> beam.io.ReadFromBigQuery(
                query="""SELECT text_to_embed,
                identifier
                FROM [gcp-project:gcp-dataset.gcp-table]
                LIMIT 20
                """,
                project="gcp-project",
                gcs_location="gs://ml-apache-beam/tmp/",
            )
            | "Get features" >> beam.ParDo(GetFeatures())
            | "Run inference"
            >> RunInference(
                KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
            )
            | "To Rows"
            >> beam.Map(
                # NOTE(review): __import__("beam") imports a module literally
                # named "beam", which is not the `apache_beam as beam` alias
                # and does not exist — presumably this should be `beam.Row`.
                lambda element: __import__("beam").Row(
                    biggint=int(element[0]), embedding=element[1].tolist()
                )
            )
        )
    # BUG (the subject of this question): these two lines are OUTSIDE the
    # `with` block, so the deferred-DataFrame write is never attached to the
    # pipeline that actually executes, and no file is ever produced. The
    # accepted fix is to indent them into the block.
    df = to_dataframe(embedding_dataframe)
    df.to_feather(known_args.output)


if __name__ == "__main__":
    run()
And my requirements.txt:
sentence-transformers==2.2.2
I'm running Python 3.8.14.
To run it locally, I use:
python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather
It runs fine, but no embedding_output.feather
file appears in the directory.
And to run it on GCP:
python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1
This also runs fine, but the gs://my-bucket/embedding_output.feather
file is not there either.
Thanks @TheNeuralBit for the help here!
The problem is that this part of the code:
df = to_dataframe(embedding_dataframe)
df.to_feather(known_args.output)
was outside of the pipeline's `with` block:
with beam.Pipeline(options=pipeline_options) as pipeline:
so it ran before the pipeline had actually executed, and the write was never attached to the running pipeline. I indented the to_dataframe
part so it runs inside the block, and it all worked great:
# ...
def run(argv=None):
    """Parse CLI args, build the embedding pipeline, and write a Feather file.

    The conversion of the deferred Beam DataFrame and the to_feather()
    write both happen INSIDE the `with beam.Pipeline(...)` block, so the
    write is part of the pipeline that executes when the block exits.

    --output: destination path (local or gs://) for the Feather file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output file to write results to.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        embedding_dataframe = (
            pipeline
            # Read (text, id) rows from BigQuery; results staged via GCS.
            | "Read BigQuery"
            >> beam.io.ReadFromBigQuery(
                query="""SELECT text_to_embed,
                identifier
                FROM [gcp-project:gcp-dataset.gcp-table]
                LIMIT 20
                """,
                project="gcp-project",
                gcs_location="gs://ml-apache-beam/tmp/",
            )
            | "Get features" >> beam.ParDo(GetFeatures())
            | "Run inference"
            >> RunInference(
                KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
            )
            | "To Rows"
            >> beam.Map(
                # Fixed: use the `apache_beam as beam` alias directly.
                # The original `__import__("beam").Row` tries to import a
                # module literally named "beam", which does not exist and
                # raises ModuleNotFoundError at runtime.
                lambda element: beam.Row(
                    biggint=int(element[0]), embedding=element[1].tolist()
                )
            )
        )
        # Inside the with-block: to_feather() on the deferred DataFrame adds
        # the write step to the pipeline, which runs when the block exits.
        df = to_dataframe(embedding_dataframe)
        df.to_feather(known_args.output)


if __name__ == "__main__":
    run()