Tags: rust, apache-arrow-datafusion

How do I extract the year, month, and day from a Unix timestamp column in a Rust DataFusion DataFrame?


I have created a DataFusion DataFrame:

| asin       | vote | verified | unixReviewTime | reviewText      |
+------------+------+----------+----------------+-----------------+
| 0486427706 | 3    | true     | 1381017600     | good            |
| 0486427707 |      | false    | 1376006400     | excellent       |
| 0486427707 | 1    | true     | 1459814400     | Did not like it |
| 0486427708 | 4    | false    | 1376006400     |                 |
+------------+------+----------+----------------+-----------------+

I tried to find a way to do this in the DataFusion API documentation, but could not figure it out.

Here is what the JSON data file (newline-delimited JSON) looks like:

{"asin": "0486427706", "vote": 3, "verified": true, "unixReviewTime": 1381017600, "reviewText": "good", "overall": 5.0}
{"asin": "0486427707", "vote": null, "verified": false, "unixReviewTime": 1376006400, "reviewText": "excellent", "overall": 5.0}
{"asin": "0486427707", "vote": 1, "verified": true, "unixReviewTime": 1459814400, "reviewText": "Did not like it", "overall": 2.0}
{"asin": "0486427708", "vote": 4, "verified": false, "unixReviewTime": 1376006400, "reviewText": null, "overall": 4.0}

This is straightforward in PySpark:

# The PySpark package is importable only under its lowercase name `pyspark`;
# `PySpark.sql` raises ModuleNotFoundError.
from pyspark.sql import functions as fn
from pyspark.sql.functions import col

# `unixReviewTime` holds seconds since the Unix epoch. `from_unixtime` renders it
# as a "yyyy-MM-dd HH:mm:ss" string, which the date functions below accept.
main_df = main_df.withColumn("reviewed_at", fn.from_unixtime(col("unixReviewTime")))

# Derive the calendar parts the question asks for: year, month and day of month.
main_df = main_df.withColumn("reviewed_year", fn.year(col("reviewed_at")))
main_df = main_df.withColumn("reviewed_month", fn.month(col("reviewed_at")))
main_df = main_df.withColumn("reviewed_day", fn.dayofmonth(col("reviewed_at")))

Solution

  • use datafusion::prelude::*;
    use datafusion::error::Result;
    use datafusion::arrow::datatypes::{DataType, TimeUnit};
    
    /// Loads the newline-delimited JSON reviews from `/tmp/data.json` and adds
    /// calendar columns derived from the `unixReviewTime` epoch column:
    /// `unixReviewTimestamp`, `reviewed_year`, `reviewed_month` and `reviewed_day`.
    ///
    /// # Errors
    ///
    /// Propagates any DataFusion error from reading the file, building the
    /// projected columns, or executing the plan in `show()`.
    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = SessionContext::new();
        let df = ctx
            .read_json("/tmp/data.json", NdJsonReadOptions::default())
            .await?
            // `unixReviewTime` is in whole *seconds* since the epoch (e.g.
            // 1381017600 = 2013-10-06), so the cast must use `TimeUnit::Second`.
            // Casting to `Millisecond` reinterprets the seconds as milliseconds
            // and places every row in January 1970.
            .with_column(
                "unixReviewTimestamp",
                cast(
                    col("unixReviewTime"),
                    DataType::Timestamp(TimeUnit::Second, None),
                ),
            )?
            .with_column(
                "reviewed_year",
                date_part(lit("year"), col("unixReviewTimestamp")),
            )?
            .with_column(
                "reviewed_month",
                date_part(lit("month"), col("unixReviewTimestamp")),
            )?
            // Day of month, which the question also asks for.
            .with_column(
                "reviewed_day",
                date_part(lit("day"), col("unixReviewTimestamp")),
            )?;
        df.show().await?;
        Ok(())
    }
    
    /// Builds a logical `CAST(expr AS data_type)` expression.
    ///
    /// The input expression is boxed because `Expr::Cast` stores its operand
    /// behind a `Box`.
    fn cast(expr: Expr, data_type: DataType) -> Expr {
        let operand = Box::new(expr);
        Expr::Cast {
            data_type,
            expr: operand,
        }
    }
    

    Produces:

    +------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+
    | asin       | vote | verified | unixReviewTime | reviewText      | overall | unixReviewTimestamp     | reviewed_year | reviewed_month |
    +------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+
    | 0486427706 | 3    | true     | 1381017600     | good            | 5       | 1970-01-16 23:36:57.600 | 1970          | 1              |
    | 0486427707 |      | false    | 1376006400     | excellent       | 5       | 1970-01-16 22:13:26.400 | 1970          | 1              |
    | 0486427707 | 1    | true     | 1459814400     | Did not like it | 2       | 1970-01-17 21:30:14.400 | 1970          | 1              |
    | 0486427708 | 4    | false    | 1376006400     |                 | 4       | 1970-01-16 22:13:26.400 | 1970          | 1              |
    +------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+