Tags: java, apache-spark, hadoop, apache-spark-sql

org.apache.hadoop.fs.ParentNotDirectoryException - is not a directory


I am trying to append new records to an existing CSV file using Spark SQL.

I am getting Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory), since the input is a file and not a directory.

The Spark (Java) code and the exception stack trace are included below.

How can I resolve this?

Environment Info - macOS

MyiCloud@MyMC someDir % uname -a
Darwin MyMC.local 21.6.0 Darwin Kernel Version 21.6.0: Fri Sep 15 16:17:23 PDT 2023; root:xnu-8020.240.18.703.5~1/RELEASE_X86_64 x86_64

Hadoop Environment Info

MyiCloud@MyMC someDir % hadoop version

Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1be78238728da9266a4f88195058f08fd012bf9c
Compiled by ubuntu on 2023-06-18T08:22Z
Compiled on platform linux-x86_64
Compiled with protoc 3.7.1
From source with checksum 5652179ad55f76cb287d9c633bb53bbd
This command was run using /usr/local/Cellar/hadoop/3.3.6/libexec/share/hadoop/common/hadoop-common-3.3.6.jar

File Permission - HDFS

MyiCloud@MyMC sbin % hdfs dfs -ls /tmp/spark/input/csv/
2023-11-20 01:35:51,752 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rwxr-xr-x   1 MyiCloud supergroup        268 2023-11-20 00:58 /tmp/spark/input/csv/orders.csv

Input file - orders.csv

firstName,lastName,state,quantity,revenue,timestamp
Jean Georges,Perrin,NC,1,300,1551903533
Jean Georges,Perrin,NC,2,120,1551903567
Jean Georges,Perrin,CA,4,75,1551903599
Holden,Karau,CA,6,37,1551904299
Ginni,Rometty,NY,7,91,1551916792
Holden,Karau,CA,4,153,1552876129

Apache Spark (Java) Code

package org.example.ch11;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        InsertApp inApp = new InsertApp();
        inApp.append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]").getOrCreate();
        Dataset<Row> df = spark.read().format("csv")
                .option("inferSchema", true)
                .option("header", true)
                .load("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
        System.out.println("Printing csv content..");
        df.show();

        df.createOrReplaceTempView("orders_view");
        try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(
                            Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                    ),
                    df.schema()
            );
            newDf.createOrReplaceTempView("new_order_view");

            System.out.println("Inserting data into orders_view view from new_order_view view...");
            Dataset<Row> mergedDf = spark.sql("INSERT INTO orders_view SELECT * FROM new_order_view");
            mergedDf.show();
            System.out.println("Completed...");
        }
    }
}

Exception - Stack Trace

Printing csv content..
23/11/20 01:38:10 INFO FileSourceStrategy: Pushed Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Post-Scan Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Output Data Schema: struct<firstName: string, lastName: string, state: string, quantity: int, revenue: int ... 1 more field>
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 22.283375 ms
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 302.5 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 27.3 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.6:49627 (size: 27.3 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 4 from show at InsertApp.java:27
23/11/20 01:38:10 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/20 01:38:10 INFO SparkContext: Starting job: show at InsertApp.java:27
23/11/20 01:38:10 INFO DAGScheduler: Got job 2 (show at InsertApp.java:27) with 1 output partitions
23/11/20 01:38:10 INFO DAGScheduler: Final stage: ResultStage 2 (show at InsertApp.java:27)
23/11/20 01:38:10 INFO DAGScheduler: Parents of final stage: List()
23/11/20 01:38:10 INFO DAGScheduler: Missing parents: List()
23/11/20 01:38:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27), which has no missing parents
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 14.4 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.8 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.0.6:49627 (size: 6.8 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1388
23/11/20 01:38:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27) (first 15 tasks are for partitions Vector(0))
23/11/20 01:38:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
23/11/20 01:38:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2) (192.168.0.6, executor driver, partition 0, NODE_LOCAL, 4877 bytes) taskResourceAssignments Map()
23/11/20 01:38:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
23/11/20 01:38:10 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.0.6:49627 in memory (size: 7.7 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO FileScanRDD: Reading File path: hdfs://localhost:8020/tmp/spark/input/csv/orders.csv, range: 0-268, partition values: [empty row]
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 19.008744 ms
23/11/20 01:38:10 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1789 bytes result sent to driver
23/11/20 01:38:10 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 115 ms on 192.168.0.6 (executor driver) (1/1)
23/11/20 01:38:10 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
23/11/20 01:38:10 INFO DAGScheduler: ResultStage 2 (show at InsertApp.java:27) finished in 0.141 s
23/11/20 01:38:10 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
23/11/20 01:38:10 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
23/11/20 01:38:10 INFO DAGScheduler: Job 2 finished: show at InsertApp.java:27, took 0.150319 s
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 40.368014 ms
+------------+--------+-----+--------+-------+----------+
|   firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges|  Perrin|   NC|       1|    300|1551903533|
|Jean Georges|  Perrin|   NC|       2|    120|1551903567|
|Jean Georges|  Perrin|   CA|       4|     75|1551903599|
|      Holden|   Karau|   CA|       6|     37|1551904299|
|       Ginni| Rometty|   NY|       7|     91|1551916792|
|      Holden|   Karau|   CA|       4|    153|1552876129|
+------------+--------+-----+--------+-------+----------+

Inserting data into orders_view view from new_order_view view...
23/11/20 01:38:10 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/11/20 01:38:10 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/11/20 01:38:10 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23/11/20 01:38:10 INFO SparkUI: Stopped Spark web UI at http://192.168.0.6:4040
23/11/20 01:38:10 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/20 01:38:10 INFO MemoryStore: MemoryStore cleared
23/11/20 01:38:10 INFO BlockManager: BlockManager stopped
23/11/20 01:38:10 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/20 01:38:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/20 01:38:10 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2426)
    at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2400)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1324)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1321)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1338)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1313)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.example.ch11.InsertApp.append(InsertApp.java:40)
    at org.example.ch11.InsertApp.main(InsertApp.java:15)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.ParentNotDirectoryException): /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
    at org.apache.hadoop.ipc.Client.call(Client.java:1457)
    at org.apache.hadoop.ipc.Client.call(Client.java:1367)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:656)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy19.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2424)
    ... 31 more
23/11/20 01:38:11 INFO ShutdownHookManager: Shutdown hook called
23/11/20 01:38:11 INFO ShutdownHookManager: Deleting directory /private/var/folders/6w/v2bcqfkd6kl5ylt9k5_q0lb80000gn/T/spark-cf226489-3921-439a-9653-b33159d3a230

Process finished with exit code 1

Solution

  • The INSERT INTO orders_view statement makes Spark's FileOutputCommitter call mkdirs to create a _temporary folder under /tmp/spark/input/csv/orders.csv (see FileOutputCommitter.setupJob and FileSystem.mkdirs in the stack trace); because that path is a file rather than a directory, the NameNode rejects the call with ParentNotDirectoryException. Instead, read the file, union the new row in memory, and write the merged result back. Update your logic to this:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Collections;
    
    public class InsertApp {
    
        public static void main(String[] args) {
            InsertApp inApp = new InsertApp();
            inApp.append();
        }
    
        private void append() {
            SparkSession spark = SparkSession.builder()
                    .appName("InsertProcessor")
                    .master("local[*]").getOrCreate();
            Dataset<Row> df = spark.read()
                    .option("inferSchema", true)
                    .option("header", true)
                    .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
    
            df.createOrReplaceTempView("order_view");
            try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
                Dataset<Row> newDf = spark.createDataFrame(
                        jsc.parallelize(
                                Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                        ),
                        df.schema()
                );
                newDf.createOrReplaceTempView("new_order_view");
                Dataset<Row> df1 = spark.sql("SELECT * from order_view");
                System.out.println("Data From Order View:");
                df1.show();
                Dataset<Row> df2 = spark.sql("SELECT * from new_order_view");
                System.out.println("Data From New Order View:");
                df2.show();
                Dataset<Row> mergedDf = df1.union(df2).distinct();
                mergedDf.createOrReplaceTempView("order_view");
                Dataset<Row> df3 = spark.sql("SELECT * from order_view");
                System.out.println("Data From Order View After Merging:");
                df3.show();
                // Rewrite the original location with the merged data; Spark's
                // CSV writer produces a directory of part files at this path.
                df3.write().mode(SaveMode.Overwrite)
                    .option("header", true)
                    .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
            }
        }
    }
    

    Output:

    Data From Order View:
    +------------+--------+-----+--------+-------+----------+
    |   firstName|lastName|state|quantity|revenue| timestamp|
    +------------+--------+-----+--------+-------+----------+
    |Jean Georges|  Perrin|   NC|       1|    300|1551903533|
    |Jean Georges|  Perrin|   NC|       2|    120|1551903567|
    |Jean Georges|  Perrin|   CA|       4|     75|1551903599|
    |      Holden|   Karau|   CA|       6|     37|1551904299|
    |       Ginni| Rometty|   NY|       7|     91|1551916792|
    |      Holden|   Karau|   CA|       4|    153|1552876129|
    +------------+--------+-----+--------+-------+----------+
    
    Data From New Order View:
    +---------+--------+-----+--------+-------+----------+
    |firstName|lastName|state|quantity|revenue| timestamp|
    +---------+--------+-----+--------+-------+----------+
    |        V|       S|   TN|      10|     20|1551904299|
    +---------+--------+-----+--------+-------+----------+
    
    Data From Order View After Merging:
    +------------+--------+-----+--------+-------+----------+
    |   firstName|lastName|state|quantity|revenue| timestamp|
    +------------+--------+-----+--------+-------+----------+
    |Jean Georges|  Perrin|   CA|       4|     75|1551903599|
    |       Ginni| Rometty|   NY|       7|     91|1551916792|
    |Jean Georges|  Perrin|   NC|       1|    300|1551903533|
    |      Holden|   Karau|   CA|       4|    153|1552876129|
    |Jean Georges|  Perrin|   NC|       2|    120|1551903567|
    |      Holden|   Karau|   CA|       6|     37|1551904299|
    |           V|       S|   TN|      10|     20|1551904299|
    +------------+--------+-----+--------+-------+----------+
    

    A few points to mention:

    Hadoop UI output (containing the new merged data): screenshots omitted.

    I think this is an easier approach. See if this helps.
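
    As an alternative to the read-union-overwrite cycle above, you can avoid the error entirely by keeping the orders data as a directory of CSV part files rather than a single file (for example, a hypothetical hdfs://localhost:8020/tmp/spark/input/csv/orders/ directory). Spark's CSV writer targets directories, so new rows can then be added with SaveMode.Append as extra part files. A minimal sketch, assuming that directory layout:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    import java.util.Collections;

    public class AppendApp {

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("AppendProcessor")
                    .master("local[*]").getOrCreate();

            // Hypothetical directory path: the dataset lives under .../orders/
            // as part files instead of a single orders.csv file.
            String ordersDir = "hdfs://localhost:8020/tmp/spark/input/csv/orders/";

            // Explicit schema so the new row matches the existing columns.
            StructType schema = new StructType()
                    .add("firstName", DataTypes.StringType)
                    .add("lastName", DataTypes.StringType)
                    .add("state", DataTypes.StringType)
                    .add("quantity", DataTypes.IntegerType)
                    .add("revenue", DataTypes.IntegerType)
                    .add("timestamp", DataTypes.IntegerType);

            Dataset<Row> newDf = spark.createDataFrame(
                    Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299)),
                    schema);

            // SaveMode.Append adds new part files under the directory instead of
            // trying to create a _temporary folder beneath an existing file.
            newDf.write()
                    .mode(SaveMode.Append)
                    .option("header", true)
                    .csv(ordersDir);

            spark.stop();
        }
    }

    Reading the directory back with spark.read().option("header", true).csv(ordersDir) returns the original rows plus the appended ones, since Spark reads every part file under the path.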