So I noticed that calling `repartitionByCassandraReplica().joinWithCassandraTable()` gives me a different Input size in the Stages tab of the Spark UI compared to the one I get when the Direct Join is always on. I know that these two follow different strategies for determining the Spark partitions: when calling `repartitionByCassandraReplica()`, the number of Spark partitions is determined by `partitionsPerHost`; otherwise, the connector uses the estimated table size. Nevertheless, as per the documentation, both use the Direct Join and do not perform a full scan of the Cassandra table.
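For context, here is a minimal sketch of the two approaches I am comparing (the keyspace `my_ks`, table `my_table`, key column `id`, connection settings, and the generated keys are placeholders, and I assume connector 2.5+/3.x with the Cassandra Spark extensions enabled):

```scala
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

// Placeholder partition key for "my_ks"."my_table".
case class Key(id: Int)

val spark = SparkSession.builder()
  .appName("direct-join-vs-repartition")
  // Needed so Catalyst can rewrite DataFrame joins into a Direct Join (connector 2.5+/3.x).
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  // Force the Direct Join on for the DataFrame variant (config key as I recall it from the docs).
  .config("spark.cassandra.sql.directJoinSetting", "on")
  .getOrCreate()
import spark.implicits._

val keysRdd = spark.sparkContext.parallelize((1 to 1000000).map(Key))

// Variant A: RDD API -- keys are shuffled onto executors co-located with their
// Cassandra replicas, then each Spark partition queries the data its node owns.
val joinedRdd = keysRdd
  .repartitionByCassandraReplica("my_ks", "my_table", partitionsPerHost = 10)
  .joinWithCassandraTable("my_ks", "my_table")
println(joinedRdd.count())

// Variant B: DataFrame join with the Direct Join forced on -- the connector issues
// targeted reads for the joined keys instead of scanning the whole table.
val cassDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "my_table"))
  .load()
val joinedDf = keysRdd.toDF().join(cassDf, Seq("id"))
println(joinedDf.count())
```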
In my case:
With the Direct Join always on I get 36.9 GB in the Input column and it takes 4.5 minutes for a join and count. However, with `repartitionByCassandraReplica().joinWithCassandraTable()` on the same data I get 68.9 GB (almost double) in 3.4 minutes.
Question 1
How is the Input column of the Stages tab calculated for each of these two join strategies? Does the Direct-Join-always-on variant use the estimated table size for the Input column, while `repartitionByCassandraReplica().joinWithCassandraTable()` uses the actual/precise size of the table?
Question 2
Why does `repartitionByCassandraReplica().joinWithCassandraTable()` take less time even though it has a bigger Input size? Is it just because of data locality?
Question 3
Finally, is `repartitionByCassandraReplica().joinWithCassandraTable()` eventually affected by the size of the Cassandra table? Is the Direct Join in these two strategies any different (other than how the Spark partitions are calculated)?
The input size is a derivative of the previous stage.
To answer your first question, the Direct Join setting has no bearing on how the Spark partitions are computed. What matters is whether you call `repartitionByCassandraReplica()` or not.
I've explained in your previous question (What happens with Spark partitions when using Spark-Cassandra-Connector) that the Spark partitions are computed differently by the Spark Cassandra connector depending on the APIs you're using. To summarise:
- When `repartitionByCassandraReplica()` gets called, the number of Spark partitions is determined by both `partitionsPerHost` and the number of Cassandra nodes in the local DC.
- Otherwise, the connector uses `input.split.size_in_mb` to determine the number of Spark partitions based on the estimated table size.

Given that the number of Spark partitions differs widely between these two schemes, the resulting output size (data read) will differ widely too, because the Cassandra token range(s) mapped to each Spark partition are also going to be different -- it's not an apples-for-apples comparison.
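As a rough, back-of-the-envelope illustration of how differently the two schemes arrive at a Spark partition count (the node count, table-size estimate, and split size below are made-up numbers, not taken from your cluster):

```scala
// Scheme 1: repartitionByCassandraReplica() -- the partition count is fixed up front:
// partitionsPerHost * (number of Cassandra nodes in the local DC).
val partitionsPerHost   = 10     // argument passed to repartitionByCassandraReplica()
val nodesInLocalDc      = 6      // hypothetical cluster size
val schemeOnePartitions = partitionsPerHost * nodesInLocalDc            // 60 partitions

// Scheme 2: the connector divides the estimated table size (from system.size_estimates)
// by input.split.size_in_mb to derive the number of Spark partitions.
val estimatedTableSizeMb = 70656.0  // hypothetical estimate, roughly 69 GB
val splitSizeMb          = 512.0    // input.split.size_in_mb (the default in recent versions, if memory serves)
val schemeTwoPartitions  = math.ceil(estimatedTableSizeMb / splitSizeMb).toInt  // ~138 partitions

// Different partition counts mean different groupings of token ranges per Spark
// partition, and therefore different amounts of data read per partition and per stage.
```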
As a side note, I'd like to make a friendly request that you limit yourself to one question per post, particularly since your second and third questions are different from the original question. Cheers!