python scala apache-spark latitude-longitude data-engineering

Convert Easting Northing Coordinates to Latitude and Longitude in Scala/Spark with EPSG Coordinate Transformation


I’m working on a project in Scala and Apache Spark where I need to convert coordinates from Easting/Northing (EPSG:27700) to Latitude/Longitude (EPSG:4326). I have a Python script that uses the pyproj library (Transformer) to handle this, but I haven’t found an equivalent way to do it in Scala/Spark.

Here’s the Python code I’m currently using:

    from pyproj import Transformer
    import pandas as pd

    data = {
        'node_id': ['94489', '94555', '94806', '99118', '76202'],
        'easting': [276164, 428790, 357501, 439545, 357353],
        'northing': [84185, 92790, 173246, 336877, 170708]
    }

    df = pd.DataFrame(data)
    # Without always_xy=True, pyproj follows the authority axis order,
    # so EPSG:4326 results come back as (lat, lon).
    transformer = Transformer.from_crs("epsg:27700", "epsg:4326")
    lat, lon = transformer.transform(df['easting'].values, df['northing'].values)
    df['longitude'] = lon
    df['latitude'] = lat
    print(df)

The output DataFrame should look like this:

    node_id  easting  northing  longitude   latitude
      94489   276164     84185  -3.752811  50.644154
      94555   428790     92790  -1.593413  50.734016
      94806   357501    173246  -2.613059  51.456587
      99118   439545    336877  -1.413188  52.927852
      76202   357353    170708  -2.614883  51.433757

Solution

  • @MartinHH Thanks for providing the reference. It looks like it is possible to write the same code in Scala/Spark:

    package com.test.job.function_testing
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import geotrellis.proj4.{CRS, Transform}
    
    object TestCode {
    
      def main(args: Array[String]): Unit = {
    
        val runLocally = true
        val jobName = "Test Spark Logging Case"
    
        implicit val spark: SparkSession = Some(SparkSession.builder.appName(jobName))
          .map(sparkSessionBuilder =>
            if (runLocally) sparkSessionBuilder.master("local[2]") else sparkSessionBuilder
          )
          .map(_.getOrCreate())
          .get
    
        import spark.implicits._
        val columns = Seq("node_id", "easting", "northing")
        val data = Seq(
          (94489, 276164, 84185),
          (94555, 428790, 92790),
          (94806, 357501, 173246),
          (99118, 439545, 336877),
          (76202, 357353, 170708))
    
        val df = data.toDF(columns: _*)
    
        // Transform from British National Grid (EPSG:27700) to WGS 84
        // (EPSG:4326). GeoTrellis/Proj4 works in (x, y) order, so the
        // result tuple is (longitude, latitude).
        val eastingNorthing = CRS.fromEpsgCode(27700)
        val latLong = CRS.fromEpsgCode(4326)
        val transform = Transform(eastingNorthing, latLong)
    
        val transformLatLong = udf((easting: Int, northing: Int) => {
          val (long, lat) = transform(easting, northing)
          (long, lat)
        })
    
        val newDf = df.withColumn("latlong", transformLatLong(df("easting"), df("northing")))
        newDf.select(
          col("node_id"),
          col("easting"),
          col("northing"),
          col("latlong._1").as("longitude"),
          col("latlong._2").as("latitude")
        ).show()
    
      }
    
    }
    

And here is the output DataFrame:

    +-------+-------+--------+-------------------+------------------+
    |node_id|easting|northing|          longitude|          latitude|
    +-------+-------+--------+-------------------+------------------+
    |  94489| 276164|   84185| -3.752810925839862|50.644154475886154|
    |  94555| 428790|   92790|-1.5934125598396651| 50.73401609723385|
    |  94806| 357501|  173246|-2.6130593045676984| 51.45658738605824|
    |  99118| 439545|  336877| -1.413187622652739| 52.92785156624134|
    |  76202| 357353|  170708| -2.614882589162872| 51.43375699275326|
    +-------+-------+--------+-------------------+------------------+
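
One caveat: this runs in local mode, where the `transform` captured in the UDF closure never leaves the driver JVM. On a real cluster the closure has to be serialized to the executors; if that causes trouble, a common workaround is to build the transform once per JVM inside a helper object. This is a minimal sketch under that assumption (the `BngToWgs84` object is hypothetical, not part of the original answer):

    import org.apache.spark.sql.functions.udf
    import geotrellis.proj4.{CRS, Transform}

    object BngToWgs84 {
      // Initialized lazily, once per JVM (driver and each executor),
      // so no driver-side transform instance needs to be serialized.
      lazy val transform = Transform(CRS.fromEpsgCode(27700), CRS.fromEpsgCode(4326))
    }

    val transformLatLong = udf { (easting: Int, northing: Int) =>
      BngToWgs84.transform(easting, northing) // (longitude, latitude)
    }

The resulting struct column can be split into longitude/latitude columns exactly as in the `select` above.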
    

And I added the below library to build.sbt:

    libraryDependencies += "org.locationtech.geotrellis" %% "geotrellis-raster" % "3.5.2"
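
geotrellis-raster pulls in the projection support transitively; if you only need CRS and Transform, the smaller geotrellis-proj4 module should also be enough (an assumption, not something verified in the original answer):

    libraryDependencies += "org.locationtech.geotrellis" %% "geotrellis-proj4" % "3.5.2"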