I'm new to Java. I want to drop partitions in a Hive table using SparkSession.ExternalCatalog().listPartitions
and SparkSession.ExternalCatalog().dropPartitions.
I saw these methods used in Scala in "How to truncate data and drop all partitions from a Hive table using Spark", but I can't figure out how to call them from Java. This is part of an ETL process, and I want to understand how to handle it in Java.
My code fails because I don't understand how to work with the Scala data types from Java: what object types the methods expect, and what data the API returns.
Example of my code:
ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");
And it fails with:
method listPartitions in class org.apache.spark.sql.catalyst.catalog.ExternalCatalog cannot be applied to given types.
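As the updates below confirm, the failure comes from a Scala default argument: listPartitions has a third parameter, partialSpec, that defaults to None in Scala, and a default argument cannot simply be omitted from Java, so the two-argument call does not compile. A minimal sketch of a call that does compile from Java, assuming a Spark 2.x ExternalCatalog and an existing SparkSession (listPartitionsFromJava is just a name for this sketch):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog;
import scala.Option;
import scala.collection.Seq;

static Seq<CatalogTablePartition> listPartitionsFromJava(SparkSession spark, String db, String table) {
    ExternalCatalog ec = spark.sharedState().externalCatalog();
    // Option.empty() stands in for the Scala default partialSpec = None
    return ec.listPartitions(db, table, Option.empty());
}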
Before figuring that out I couldn't get past the error: there is little information about this API (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions), my Java knowledge is limited, and all the examples I could find are written in Scala. Ultimately I need to convert this Scala code, which works, into Java:
def dropPartitions(spark: SparkSession, schema: String, table: String, need_date: String): Unit = {
  val cat = spark.sharedState.externalCatalog
  // Collect the value of "partition_field" from every partition spec
  val partit = cat.listPartitions(schema, table).map(_.spec).flatMap(_.get("partition_field"))
  // Keep only partitions older than the cutoff date and rebuild the specs
  val filteredPartit = partit.filter(_ < need_date).map(x => Map("partition_field" -> x))
  cat.dropPartitions(
    schema
    , table
    , filteredPartit
    , ignoreIfNotExists = true
    , purge = false
    , retainData = false
  )
}
Please, if you know how to deal with this, can you help?
UPDATE Thank you very much for your feedback, @jpg. I'll try. I have a big ETL task whose goal is to write data into a dynamically partitioned table once a week. The business rule for this datamart is (sysdate - 90 days), so I want to drop ranges of daily partitions in the target table, which lives in a public-access schema. I have read that the right way to drop partitions is through the ExternalCatalog. I have to use Java because of this project's historical tradition :) and I'm trying to understand how to do this most efficiently. Some ExternalCatalog methods I can already print to the terminal with System.out.println(): externalCatalog.tableExists(), externalCatalog.listTables(), and the methods of externalCatalog.getTable(). But I don't understand how to deal with externalCatalog.listPartitions.
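For reference, a minimal sketch of those working calls from Java (using the same SparkSessionFactory helper as above; need_schema and need_table are placeholders):

ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
System.out.println(ec.tableExists("need_schema", "need_table")); // boolean
System.out.println(ec.listTables("need_schema"));                // Seq of table names
System.out.println(ec.getTable("need_schema", "need_table"));    // CatalogTable metadata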
ONE MORE UPDATE Hello everyone. I have made one step forward in my task: now I can print the list of partitions to the terminal:
ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
// The third argument is required from Java: passing null or omitting it fails the program
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for (CatalogTablePartition catalogTablePartition : catalogTablePartitions) {
    // Prints the partition values, e.g. "Some([validation_date=2021-07-01])"
    System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values"));
}
But now there is another problem. What I can build from this API is a Java List, while ec.dropPartitions wants its third parameter as a Seq<Map<String, String>> (Scala collections). I also can't filter the partitions this way: ideally I want to filter out the partition values that are less than a date parameter and then drop them. If anyone knows how to write the map step with this API, like in my Scala example, please help me.
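In case it helps anyone stuck at the same point: the missing piece is JavaConverters, which bridges java.util collections to Scala ones. A sketch (toScalaSpecs is just a name for this example; the complete working method is in the next update):

import java.util.List;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;

static Seq<Map<String, String>> toScalaSpecs(List<Map<String, String>> specs) {
    // Wrap the Java iterator, view it as a Scala Iterator, then materialize a Seq
    return JavaConverters.asScalaIteratorConverter(specs.iterator()).asScala().toSeq();
}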
I solved it myself. Maybe it will help someone.
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;

public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
    ExternalCatalog ec = SparkSessionFactory.getSparkSession().sharedState().externalCatalog();
    Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
    List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
    // Each spec is a Scala Map like {partition_name -> 2021-07-01}
    List<Map<String, String>> allPartList = catalogTablePartitions.stream()
            .map(CatalogTablePartition::spec)
            .collect(Collectors.toList());
    // Extract the partition values as plain strings
    List<String> datePartDel = allPartList.stream()
            .map(x -> x.get("partition_name").get())
            .sorted()
            .collect(Collectors.toList());
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    LocalDate date = LocalDate.parse(date_less_delete, formatter);
    // Keep only the dates strictly before the cutoff
    List<String> filteredDates = datePartDel.stream()
            .map(s -> LocalDate.parse(s, formatter))
            .filter(d -> d.isBefore(date))
            .map(LocalDate::toString)
            .collect(Collectors.toList());
    for (String seeDate : filteredDates) {
        // Rebuild the spec(s) for this date and convert them to a Scala Seq
        List<Map<String, String>> elem = allPartList.stream()
                .filter(x -> x.get("partition_name").get().equals(seeDate))
                .collect(Collectors.toList());
        Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala().toSeq();
        ec.dropPartitions(
                db_name
                , table_name
                , seqElem
                , true   // ignoreIfNotExists
                , false  // purge
                , false  // retainData
        );
    }
}
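Usage looks like this (schema, table, and cutoff date are placeholders):

partitionDeleteLessDate("need_schema", "need_table", "2021-07-01");

Since dropPartitions accepts a Seq of partition specs, the per-date loop could also be collapsed into a single call that passes all the filtered specs at once; the version above simply drops them one date at a time.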