We took the approach below, where everything ran on the driver and could not utilize the Spark cluster's parallel processing across nodes (as expected, performance was very poor).
1) Break the main DataFrame into multiple DataFrames, placed in an array:
val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityId => allocStage1DF.where($"ALLOCONEGROUP" <=> securityId))
2) Loop through the array and pass each DataFrame to a method that processes it row by row:
df.coalesce(1).sort($"PRIORITY".asc).collect().foreach { row =>
  AllocOneOutput.allocOneOutput(row)
}
What we are looking for is a combination of parallel and sequential processing:
Parallel processing at the group level, because the groups are all independent of one another and can be processed in parallel.
Within each group, rows have to be processed one after the other in a fixed sequence, which is very important for our use case.
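One way to get exactly that combination, sketched below and not tested against our real schema, is `Dataset.groupByKey` plus `flatMapGroups`: Spark distributes the groups across the cluster in parallel, while inside `flatMapGroups` each group's rows arrive as a plain Scala iterator on a single executor, so they can be sorted and walked strictly in order. The `Alloc` case class and its fields are hypothetical stand-ins for the real columns; only `ALLOCONEGROUP` and `PRIORITY` come from the code above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Hypothetical row type standing in for the real schema.
case class Alloc(allocOneGroup: String, priority: Int, account: String, position: Long)

val allocDS = allocStage1DF.as[Alloc]   // assumes the columns map onto Alloc

val result = allocDS
  .groupByKey(_.allocOneGroup)          // groups are processed in parallel across executors
  .flatMapGroups { (key, rows) =>
    // Inside one group we are on a single executor: sort by priority
    // and walk the rows one after the other, in order.
    rows.toSeq.sortBy(_.priority).iterator.map { row =>
      // order-dependent, per-row logic goes here
      row
    }
  }
```

Note that `toSeq` materializes one group in executor memory, which is fine for groups of hundreds or thousands of rows but worth checking for very large groups.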
Apply grouping on SECURITY_ID, CC, BU, MPU, which gives us two groups from the above (SECID_1,CC_A,BU_A,MPU_A and SECID_2,CC_A,BU_A,MPU_A).
With the help of a priority matrix (simply a reference table for assigning a rank to each row), we transpose each group into the form shown below.
Each row in a group has a priority, and the rows are sorted in that order. Now, I want to process each row one after the other by passing them to a function, and get an output like the one below:
Detailed explanation of the use case:
The points above are described using the dataset below.
Base DataFrame
+------+-----------+-------+--------+
|ROW_NO|SECURITY_ID|ACCOUNT|POSITION|
+------+-----------+-------+--------+
|     1|      secId|   Acc1|    +100|
|     2|      secId|   Acc2|    -150|
|     3|      secId|   Acc3|     -25|
|     4|     secId2|   Acc3|     -25|
|     5|     secId2|   Acc3|     -25|
+------+-----------+-------+--------+
The base DataFrame is divided by grouping on SECURITY_ID. Let us use the secId group, as below:
+------+-----------+-------+--------+
|ROW_NO|SECURITY_ID|ACCOUNT|POSITION|
+------+-----------+-------+--------+
|     1|      secId|   Acc1|    +100|
|     2|      secId|   Acc2|    -150|
|     3|      secId|   Acc3|     -25|
+------+-----------+-------+--------+
In the above case, the positive position of 100 can be paired with either -150 or -25. To break the tie, the following reference table, called the priority matrix, defines the order:
+------------------+------------------+----+
|+vePositionAccount|-vePositionAccount|RANK|
+------------------+------------------+----+
|              Acc1|              Acc3|   1|
|              Acc1|              Acc2|   2|
+------------------+------------------+----+
So, from the above matrix we know that rows 1 and 3 will be paired first, and then rows 1 and 2. This is the order (sequential processing) that we are talking about. Let's pair them now, as below:
+---------+--------------+----------+-----------+---------+--------------+----------+-----------+
|+veROW_NO|+veSECURITY_ID|+veACCOUNT|+vePOSITION|-veROW_NO|-veSECURITY_ID|-veACCOUNT|-vePOSITION|
+---------+--------------+----------+-----------+---------+--------------+----------+-----------+
|        1|         secId|      Acc1|       +100|        3|         secId|      Acc3|        -25|
|        1|         secId|      Acc1|       +100|        2|         secId|      Acc2|       -150|
+---------+--------------+----------+-----------+---------+--------------+----------+-----------+
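A paired table of this shape can be produced with ordinary joins through the priority matrix. This is only a sketch: `groupDF`, `posDF`, `negDF`, and `priorityMatrixDF` are assumed names, the split is by the sign of POSITION, and the matrix column names are the ones shown above.

```scala
// Split one group by the sign of the position (names are assumptions).
val posDF = groupDF.where($"POSITION" > 0)
val negDF = groupDF.where($"POSITION" < 0)

// Route each positive row to its negative counterpart via the priority
// matrix, then sort by RANK to fix the processing order.
val pairedDF = posDF
  .join(priorityMatrixDF, posDF("ACCOUNT") === priorityMatrixDF("+vePositionAccount"))
  .join(negDF, priorityMatrixDF("-vePositionAccount") === negDF("ACCOUNT"))
  .orderBy($"RANK")
```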
What happens when row 1 is processed before row 2? (This is what we need.)
1. After processing row 1: the position in Acc1 will be (100 - 25) = 75 and the position in Acc3 will be 0. The updated position in Acc1, which is 75, is then used when processing the second row.
2. After processing row 2: the position in Acc1 will be 0, and the position in Acc2 will be (75 - 150) = -75.
Result dataframe:
+------+-----------+-------+--------+
|ROW_NO|SECURITY_ID|ACCOUNT|POSITION|
+------+-----------+-------+--------+
|     1|      secId|   Acc1|       0|
|     2|      secId|   Acc2|     -75|
|     3|      secId|   Acc3|       0|
+------+-----------+-------+--------+
What happens when row 2 is processed before row 1? (We don't want this.)
Result dataframe:
+------+-----------+-------+--------+
|ROW_NO|SECURITY_ID|ACCOUNT|POSITION|
+------+-----------+-------+--------+
|     1|      secId|   Acc1|       0|
|     2|      secId|   Acc2|     -50|
|     3|      secId|   Acc3|     -25|
+------+-----------+-------+--------+
As you can see above, the order of processing within a group determines our output.
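The order dependence comes from the running balance: each pairing nets the matched quantity, and any residual carries into the next pair. The netting can be sketched in plain Scala as a fold over the pairs in priority order (account names and quantities are taken from the example above):

```scala
// Positions per account, from the example group.
val start = Map("Acc1" -> 100L, "Acc2" -> -150L, "Acc3" -> -25L)

// Pairs in priority-matrix order: (+ve account, -ve account).
val pairOrder = Seq(("Acc1", "Acc3"), ("Acc1", "Acc2"))

// Net each pair in order; the residual carries into the next pairing.
val result = pairOrder.foldLeft(start) { case (pos, (p, n)) =>
  val matched = math.min(math.max(pos(p), 0L), -math.min(pos(n), 0L))
  pos.updated(p, pos(p) - matched).updated(n, pos(n) + matched)
}
// result: Map(Acc1 -> 0, Acc2 -> -75, Acc3 -> 0)
```

Running the fold with `pairOrder` reversed yields Acc2 = -50 and Acc3 = -25 instead, which is exactly the unwanted result shown above.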
I also wanted to ask: why doesn't Spark support sequential processing within a section of a DataFrame? We do want the parallel processing capability of the cluster; that is why we divide the DataFrame into groups and ask the cluster to apply the logic to these groups in parallel. All we are saying is that if a group has, say, 100 rows, then let those 100 rows be processed one after the other, in order. Is this not supported by Spark?
If it is not, then what other big data technology can help achieve that?
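For context, the closest we could get with built-in operators is to repartition by the group key and sort within partitions, so that `mapPartitions` sees each group's rows contiguously and in priority order. A sketch, using the column names from above; the per-row logic is a placeholder:

```scala
val orderedDF = allocStage1DF
  .repartition($"ALLOCONEGROUP")                       // all rows of a group land in one partition
  .sortWithinPartitions($"ALLOCONEGROUP", $"PRIORITY") // groups contiguous, rows in order

val processed = orderedDF.rdd.mapPartitions { rows =>
  // A partition may hold several groups, but each group's rows arrive
  // together and in PRIORITY order, so they can be walked sequentially.
  rows.map { row =>
    // order-dependent, per-row logic goes here
    row
  }
}
```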
Alternate Implementation:
I doubt that thousands of partitions would do any good, but I would still like to know whether the approach sounds reasonable.
The concept works well enough until this rule:
- The second reason is that, when a given long quantity and short quantity are not equal, the residual quantity is eligible for pairing; i.e., if long quantity is left over, it can be paired with the next short quantity available in the group as per the priority, or vice versa.
This is because you want iteration (looping with dependencies), logic that is difficult to code with Spark, which is more flow-oriented.
I also worked on a project where everything was stated up front: do it in Big Data, with Spark and Scala or PySpark. As both architect and coder, I looked at an algorithm for something similar to your area, but not quite: for commodities, all the periods for a set of data points needed to be classified as bull, bear, or neither. Like your algorithm, but still different, I did not know up front how much looping I would need to do. In fact, I needed to do something, then potentially decide to repeat it to the left and to the right of a period I had marked as bull, bear, or neither. Termination conditions were required. See picture below. It was sort of like a 'flat' binary tree traversal until all paths were exhausted. Not very Spark-ish.
I actually solved my specific situation in Spark, for academic purposes, but it was just that: an academic exercise. The point is that this type of processing, in both my example and yours, is a poor fit for Spark. We did these calculations in Oracle and simply Sqooped the results to the Hadoop datastore.
My advice is therefore that you not try this in Spark, as it does not fit the use case well enough. Trust me, it gets messy. To be honest, it quickly became apparent to me that this type of processing was an issue, but when starting out it is a common thing to ask about.