apache-sparkencryptionspark-streamingemramazon-emr

EMR with multiple encryption key providers


I'm running EMR cluster with enabled s3 client-side encryption using custom key provider. But now I need to write data to multiple s3 destinations using different encryption schemas:

  1. CSE custom key provider
  2. CSE-KMS

Is it possible to configure EMR to use both encryption types by defining some kind of mapping between s3 bucket and encryption type?

Alternatively since I use spark structured streaming to process and write data to s3 I'm wondering if it's possible to disable encryption on EMRFS but then enable CSE for each stream separately?


Solution

  • The idea is to support any file systems scheme and configure it individually. For example:

    # custom encryption key provider
    fs.s3x.cse.enabled = true
    fs.s3x.cse.materialsDescription.enabled = true
    fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider
    
    #no encryption
    fs.s3u.cse.enabled = false
    
    #AWS KMS
    fs.s3k.cse.enabled = true
    fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
    fs.s3k.cse.kms.keyId = some-kms-id
    

    And then to use it in spark like this:

    StreamingQuery writeStream = session
            .readStream()
            .schema(RecordSchema.fromClass(TestRecord.class))
            .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
            .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
            .csv(“s3x://aws-s3-bucket/input”)
            .as(Encoders.bean(TestRecord.class))
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("path", “s3k://aws-s3-bucket/output”)
            .option("checkpointLocation", “s3u://aws-s3-bucket/checkpointing”)
            .start();
    

    Ta handle this I’ve implemented a custom Hadoop file system (extends org.apache.hadoop.fs.FileSystem) that delegates calls to real file system but with modified configurations.

    // Create delegate FS
    this.config.set("fs.s3n.impl", “com.amazon.ws.emr.hadoop.fs.EmrFileSystem”);
    this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
    this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
    

    Configuration that passes to delegating file system should take all original settings and replace any occurrences of fs.s3*. with fs.s3n..

    private Configuration substituteS3Config(final Configuration conf) {
        if (conf == null) return null;
    
        final String fsSchemaPrefix = "fs." + getScheme() + ".";
        final String fsS3SchemaPrefix = "fs.s3.";
        final String fsSchemaImpl = "fs." + getScheme() + ".impl";
        Configuration substitutedConfig = new Configuration(conf);
        for (Map.Entry<String, String> configEntry : conf) {
            String propName = configEntry.getKey();
            if (!fsSchemaImpl.equals(propName)
                && propName.startsWith(fsSchemaPrefix)) {
                final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
                LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
                substitutedConfig.set(newPropName, configEntry.getValue());
            }
        }
    
        return substitutedConfig;
    }
    

    Besides that make sure that delegating fs receives uris and paths with supporting scheme and returns paths with custom scheme

    @Override
    public FileStatus getFileStatus(final Path f) throws IOException {
        FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
        if (status != null) {
            status.setPath(customS3Path(status.getPath()));
        }
        return status;
    }
    
    private Path s3Path(final Path p) {
        if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
            return new Path(s3nURI(p.toUri(), SCHEME_S3N));
        }
        return p;
    }
    
    private Path customS3Path(final Path p) {
        if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
            return new Path(s3nURI(p.toUri(), getScheme()));
        }
        return p;
    }
    
    private URI s3nURI(final URI originalUri, final String newScheme) {
         try {
             return new URI(
                 newScheme,
                 originalUri.getUserInfo(),
                 originalUri.getHost(),
                 originalUri.getPort(),
                 originalUri.getPath(),
                 originalUri.getQuery(),
                 originalUri.getFragment());
         } catch (URISyntaxException e) {
             LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
         }
    
         return originalUri;
    }
    

    The final step is to register custom file system with Hadoop (spark-defaults classification)

    spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
    spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
    spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem