java, apache-spark, spark-structured-streaming

Spark Java structured streaming dataset filter


My code is:

Dataset<Row> mainData=df.select( "data.*").filter("data.eventdesc='logout'");
Dataset<Row> groupByData = mainData.groupBy("ipaddress1").count().filter("count > 1");
mainData.filter(mainData.col("ipaddress1").contains(groupByData.col("ipaddress1")));

The mainData output is:

+-------+-----------+-------------+----------+---------------------+-----------+
|id     |resource id|resource name|event-desc|event-date           |ipaddress1 |
+-------+-----------+-------------+----------+---------------------+-----------+
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119        |Netopia      |logout    |+56975-05-07 23:01:37|25:34:21:46|
+-------+-----------+-------------+----------+---------------------+-----------+

The groupByData output is:

+-----------+-----+
|ipaddress1 |count|
+-----------+-----+
|25:34:21:45|2    |
|25:34:21:44|3    |
+-----------+-----+

I need to keep only the rows of the main data whose ipaddress1 is present in the grouped data. The code above is my attempt, but it is not working as expected. Can anybody suggest a possible solution?


Solution

  • You are trying to filter mainData based on the results of groupByData. However, Spark cannot filter one Dataset by looking up values in another Dataset's column: Column.contains is a substring match evaluated row by row, not a membership test against groupByData.

    To achieve your goal, join mainData with groupByData on the ipaddress1 column:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public class FilterMainData {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("Filter Main Data")
                    .getOrCreate();
    
            // Example of loading the data
            Dataset<Row> df = ...;  // Load your DataFrame
    
            // Extract and filter the main data
            Dataset<Row> mainData = df.selectExpr("data.*")
                                      .filter("eventdesc = 'logout'");
    
            // Group by ipaddress1 and count, then filter for count > 1
            Dataset<Row> groupByData = mainData.groupBy("ipaddress1")
                                               .count()
                                               .filter("count > 1");
    
            // Join mainData with groupByData on ipaddress1
            Dataset<Row> filteredData = mainData.join(
                    groupByData,
                    mainData.col("ipaddress1").equalTo(groupByData.col("ipaddress1")),
                    "inner"
            ).select(mainData.col("*")); // Select only columns from mainData
    
            // Show the filtered data
            filteredData.show();
        }
    }
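
As a sanity check on what the join should return, here is a minimal plain-Java sketch (no Spark involved) of the same logic applied to the six ipaddress1 values from the question. The class and method names (RepeatedIpFilterSketch, keepRepeated) are made up for illustration and are not part of any Spark API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RepeatedIpFilterSketch {

    // Keeps every entry whose value occurs more than once in the input.
    // This mirrors groupBy("ipaddress1").count().filter("count > 1")
    // followed by an inner join back onto the original rows.
    static List<String> keepRepeated(List<String> ipAddresses) {
        // Step 1: count occurrences per address (the groupByData step).
        Map<String, Long> counts = ipAddresses.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // Step 2: keep only rows whose address has count > 1 (the join step).
        return ipAddresses.stream()
                .filter(ip -> counts.get(ip) > 1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // ipaddress1 values of the six sample rows shown in the question.
        List<String> sample = List.of(
                "25:34:21:44", "25:34:21:44", "25:34:21:45",
                "25:34:21:45", "25:34:21:44", "25:34:21:46");

        // Prints [25:34:21:44, 25:34:21:44, 25:34:21:45, 25:34:21:45, 25:34:21:44]
        System.out.println(keepRepeated(sample));
    }
}
```

On the sample data, five of the six rows survive and the single 25:34:21:46 row is dropped, which is what filteredData.show() should display as well. Unlike this sketch, Spark does not guarantee the order of rows after a join.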