Tags: regex, apache-spark, pyspark, apache-spark-sql

How can I get the last specific character from a UDF with a regex in PySpark?


I am trying to use a regular expression (regex) to extract the last occurrence of a specific character from a string column in a PySpark DataFrame, in order to do some data cleanup and parse the value into columns.

Currently, I am using a UDF (user-defined function), where one of the fields is user input. I'm specifically looking for a way to capture the last closing square bracket ]. I've tried a few different approaches but keep getting stuck, so I would appreciate any assistance you can provide. Thank you in advance!

sample text:

* the cat in the hat is fat\r\n
* synnopsis -- hey the cat who is wearing the hat is getting to big for its outfit as evidenced by this photo [img123123.png] \r\n
* please get a new cat, or a new hat I would suggest something like this [newhatforcat.jpg]
signed
bob <bob@somecompany.com>

These body texts get put into a Python list, and I am using a PySpark function to split on the last ]. This is a problem if someone uses the ] character, or Outlook decides to put the [imgblala.jpg] inside the raw text; then all my columns get overwritten.

I tried inserting a regex that works on regex101 to capture the last ] (which should be the ] closing the Python list), but I am getting an undefined result; and if I just split on ], we split on the [img123123.png] instead.

When putting a regex into a split, is there some specific character I need to use to escape the delimiter? Here is an example of what I am trying to do.

from pyspark.sql.functions import split

one_value = "['to@abc.com', '[* the cat in the hat is fat\r\n* synnopsis -- hey the cat who is wearing the hat is getting to big for its outfit as evidenced by this photo [img123123.png] \r\n* please get a new cat, or a new hat I would suggest something like this [newhatforcat.jpg]\r\nsigned\r\nbob <bob@somecompany.com>]', 'from@abc.com']"


df = spark.createDataFrame(data=[(one_value,),], schema='emailobj: string')

df = df.withColumn("to", split(df["emailobj"], "]").getItem(0)) \
       .withColumn("from", split(df["emailobj"], "]").getItem(2)) \
       .withColumn("body", split(df["emailobj"], r"[^]]*]$").getItem(3)) \
       .withColumn("messageid", split(df["emailobj"], "]").getItem(4)) \
       .withColumn("subject", split(df["emailobj"], "]").getItem(6))

I either get an undefined result, or the split doesn't split on the last ]. The regex seems to work on regex101 as a Java regex; here is a link: (https://regex101.com/r/1ZbVG3/2) Do I need to do something for Spark to see it correctly?
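For context, matching only the *last* ] is possible with a lookahead: `\](?=[^\]]*$)` matches a ] only when no other ] follows it. Both Java regex (which Spark's `split` uses) and Python's `re` support lookaheads, so the idea can be sketched in plain Python with a made-up string:

```python
import re

# A plain split("]") would also split at the embedded [img123123.png];
# the lookahead (?=[^\]]*$) only matches the final closing bracket,
# i.e. the ] with no later ] after it.
text = "body with [img123123.png] attached]trailer"
parts = re.split(r"\](?=[^\]]*$)", text)
print(parts)  # ['body with [img123123.png] attached', 'trailer']
```

In Spark the equivalent would be something like `split(df["emailobj"], r"\](?=[^\]]*$)")`, since `split` takes a Java regex pattern.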


Solution

  • Assuming values in that column are always well-formatted Python list literals, you can just use eval() to convert each one to an array. Note the little trick with encode('unicode_escape'), which addresses the fact that the body contains unescaped \r\n characters.

    >>> from pyspark.sql import functions as F
    >>> from pyspark.sql.types import ArrayType, StringType, StructField, StructType
    >>>
    >>> return_type = StructType([
    ...     StructField("to", StringType(), False),
    ...     StructField("body", StringType(), False),
    ...     StructField("from", StringType(), False),
    ... ])
    >>>
    >>> email_split = F.udf(lambda s: eval(s.encode('unicode_escape')), return_type)
    >>> one_value = "['to@abc.com', '[* the cat in the hat is fat\r\n* synnopsis -- hey the cat who is wearing the hat is getting to big for its outfit as evidenced by this photo [img123123.png] \r\n* please get a new cat, or a new hat I would suggest something like this [newhatforcat.jpg]\r\nsigned\r\nbob <bob@somecompany.com>]', 'from@abc.com']"
    >>>
    >>> df = spark.createDataFrame(data=[(one_value,),], schema='emailobj: string')
    >>> df = df.withColumn('emailobj_split', email_split(df.emailobj))
    >>> df.printSchema()
    root
     |-- emailobj: string (nullable = true)
     |-- emailobj_split: struct (nullable = true)
     |    |-- to: string (nullable = false)
     |    |-- body: string (nullable = false)
     |    |-- from: string (nullable = false)
    
    >>> df2 = df.select('emailobj_split.*')
    >>> df2.show()
    +----------+--------------------+------------+
    |        to|                body|        from|
    +----------+--------------------+------------+
    |to@abc.com|[* the cat in the...|from@abc.com|
    +----------+--------------------+------------+
    
    >>> df2.show(truncate=False)
    +----------+------------------------------------------------------------+------------+
    |to        |body                                                        |from        |
    +----------+------------------------------------------------------------+------------+
    |to@abc.com|[* the cat in the hat is fat\r\n* synnopsis -- hey the cat who is wearing the hat is getting to big for its outfit as evidenced by this photo [img123123.png] \r\n* please get a new cat, or a new hat I would suggest something like this [newhatforcat.jpg]\r\nsigned\r\nbob <bob@somecompany.com>]|from@abc.com|
    +----------+------------------------------------------------------------+------------+
    >>>
    
    email_split = F.udf(lambda s: eval(s.encode('unicode_escape')), return_type)
    
    # The line above is a shorter version of the more elaborate one below.
    # Use the version below if you want more than a one-liner lambda
    # as your implementation.
    
    @F.udf(returnType=return_type)
    def email_split(s):
        arr = eval(s.encode('unicode_escape'))
        return arr
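One caveat worth noting: eval() executes arbitrary code, so if the column values can ever be untrusted, ast.literal_eval is a safer drop-in for parsing list/string literals, and the same unicode_escape trick still applies. A minimal plain-Python sketch (the sample string below is made up, not from the original data):

```python
import ast

# A stringified Python list whose body contains real, unescaped \r\n.
raw = "['to@abc.com', 'line one\r\nwith [img.png] inside', 'from@abc.com']"

# Raw CR/LF inside the quotes would be a SyntaxError for the parser;
# unicode_escape rewrites them as the two-character sequences \r and \n.
escaped = raw.encode('unicode_escape').decode('ascii')

# literal_eval only accepts Python literals -- no code execution.
parsed = ast.literal_eval(escaped)
print(parsed[0])  # to@abc.com
```

The UDF body then becomes `ast.literal_eval(s.encode('unicode_escape').decode('ascii'))` instead of the eval() call, with the same return_type struct.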