I have created a DataFusion DataFrame:
+------------+------+----------+----------------+-----------------+
| asin       | vote | verified | unixReviewTime | reviewText      |
+------------+------+----------+----------------+-----------------+
| 0486427706 | 3    | true     | 1381017600     | good            |
| 0486427707 |      | false    | 1376006400     | excellent       |
| 0486427707 | 1    | true     | 1459814400     | Did not like it |
| 0486427708 | 4    | false    | 1376006400     |                 |
+------------+------+----------+----------------+-----------------+
I was trying to work out how to do the following from the API documentation, but could not figure it out: convert the unixReviewTime column into a Rust native timestamp.
Here is how the JSON data file looks:
{"asin": "0486427706", "vote": 3, "verified": true, "unixReviewTime": 1381017600, "reviewText": "good", "overall": 5.0}
{"asin": "0486427707", "vote": null, "verified": false, "unixReviewTime": 1376006400, "reviewText": "excellent", "overall": 5.0}
{"asin": "0486427707", "vote": 1, "verified": true, "unixReviewTime": 1459814400, "reviewText": "Did not like it", "overall": 2.0}
{"asin": "0486427708", "vote": 4, "verified": false, "unixReviewTime": 1376006400, "reviewText": null, "overall": 4.0}
It is very easy to do in PySpark as follows:
from pyspark.sql import functions as fn
from pyspark.sql.functions import col

# Convert the epoch-seconds column into a timestamp string column.
main_df = (
    main_df
    .withColumn(
        'reviewed_at',
        fn.from_unixtime(col('unixReviewTime'))
    )
)
# Extract the year and the month from the converted column.
main_df = main_df.withColumn("reviewed_year", fn.year(col("reviewed_at")))
main_df = main_df.withColumn("reviewed_month", fn.month(col("reviewed_at")))
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = SessionContext::new();
    let df = ctx
        // Read the newline-delimited JSON file into a DataFrame.
        .read_json("/tmp/data.json", NdJsonReadOptions::default())
        .await?
        // Cast the integer column to a timestamp. The cast reads the
        // integer as a count of the chosen TimeUnit (milliseconds here).
        .with_column(
            "unixReviewTimestamp",
            cast(
                col("unixReviewTime"),
                DataType::Timestamp(TimeUnit::Millisecond, None),
            ),
        )?
        // Extract the year and the month from the timestamp column.
        .with_column(
            "reviewed_year",
            date_part(lit("year"), col("unixReviewTimestamp")),
        )?
        .with_column(
            "reviewed_month",
            date_part(lit("month"), col("unixReviewTimestamp")),
        )?;
    df.show().await?;
    Ok(())
}

// Small helper that wraps an expression in a cast to the given type.
fn cast(expr: Expr, data_type: DataType) -> Expr {
    Expr::Cast {
        expr: Box::new(expr),
        data_type,
    }
}
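Produces (the dates fall in January 1970 because unixReviewTime holds seconds since the epoch while the cast reads it as milliseconds; TimeUnit::Second would read the values as seconds):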
+------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+
| asin       | vote | verified | unixReviewTime | reviewText      | overall | unixReviewTimestamp     | reviewed_year | reviewed_month |
+------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+
| 0486427706 | 3    | true     | 1381017600     | good            | 5       | 1970-01-16 23:36:57.600 | 1970          | 1              |
| 0486427707 |      | false    | 1376006400     | excellent       | 5       | 1970-01-16 22:13:26.400 | 1970          | 1              |
| 0486427707 | 1    | true     | 1459814400     | Did not like it | 2       | 1970-01-17 21:30:14.400 | 1970          | 1              |
| 0486427708 | 4    | false    | 1376006400     |                 | 4       | 1970-01-16 22:13:26.400 | 1970          | 1              |
+------------+------+----------+----------------+-----------------+---------+-------------------------+---------------+----------------+
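
To sanity-check which time unit the unixReviewTime values use, here is a minimal std-only sketch that interprets the same integer once as seconds and once as milliseconds since the Unix epoch and derives the UTC calendar date by hand. It is not part of DataFusion; the function names (civil_from_days, date_from_epoch_seconds, date_from_epoch_millis) are made up for this illustration, and the date arithmetic is the commonly used days-to-civil-date formula for the proleptic Gregorian calendar.

```rust
/// Convert a count of days since 1970-01-01 into (year, month, day), UTC.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    // Shift the origin to 0000-03-01 so that leap days fall at year end.
    let z = days + 719_468;
    let era = z.div_euclid(146_097); // number of 400-year cycles
    let doe = z.rem_euclid(146_097); // day within the cycle, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month, 0..=11
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January and February belong to the next calendar year.
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Interpret `value` as seconds since the Unix epoch.
fn date_from_epoch_seconds(value: i64) -> (i64, u32, u32) {
    civil_from_days(value.div_euclid(86_400))
}

/// Interpret `value` as milliseconds since the Unix epoch.
fn date_from_epoch_millis(value: i64) -> (i64, u32, u32) {
    date_from_epoch_seconds(value.div_euclid(1_000))
}

fn main() {
    // The unixReviewTime values from the sample data.
    for value in [1_381_017_600_i64, 1_376_006_400, 1_459_814_400] {
        let (ys, ms, ds) = date_from_epoch_seconds(value);
        let (ym, mm, dm) = date_from_epoch_millis(value);
        println!(
            "{value}: as seconds {ys:04}-{ms:02}-{ds:02}, as milliseconds {ym:04}-{mm:02}-{dm:02}"
        );
    }
}
```

Read as seconds, 1381017600 is 2013-10-06; read as milliseconds, it is 1970-01-16, which matches the unixReviewTimestamp column in the output above. That suggests the data is in epoch seconds, so the year and month columns would only be meaningful once the cast uses a seconds-based timestamp.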