I'm running an EMR cluster with S3 client-side encryption enabled, using a custom key provider. But now I need to write data to multiple S3 destinations that use different encryption schemes:
Is it possible to configure EMR to use both encryption types by defining some kind of mapping between S3 bucket and encryption type?
Alternatively, since I use Spark Structured Streaming to process and write data to S3, I'm wondering whether it's possible to disable encryption on EMRFS and then enable CSE for each stream separately.
The idea is to support any file system scheme and configure each one individually. For example:
# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider
# no encryption
fs.s3u.cse.enabled = false
# AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id
And then use it in Spark like this:
StreamingQuery writeStream = session
    .readStream()
    .schema(RecordSchema.fromClass(TestRecord.class))
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3x://aws-s3-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .writeStream()
    .outputMode(OutputMode.Append())
    .format("parquet")
    .option("path", "s3k://aws-s3-bucket/output")
    .option("checkpointLocation", "s3u://aws-s3-bucket/checkpointing")
    .start();
To handle this I've implemented a custom Hadoop file system (extending org.apache.hadoop.fs.FileSystem) that delegates calls to the real file system, but with a modified configuration.
// Create delegate FS
this.config.set("fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
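For context, here is a minimal sketch of how this fragment could sit inside initialize(); apart from the Hadoop and EMRFS names, the fields and helpers (config, delegatingFs, SCHEME_S3N, substituteS3Config, s3nURI) are the ones shown in the rest of this post, so treat it as an assumption rather than the exact implementation:

@Override
public void initialize(final URI originalUri, final Configuration conf) throws IOException {
    super.initialize(originalUri, conf);
    this.config = conf;
    // Route the delegate through the s3n scheme so it doesn't clash with fs.s3.impl,
    // and disable the FS cache so each custom scheme gets its own EMRFS instance.
    this.config.set("fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
    this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
    this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
}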
The configuration passed to the delegate file system should keep all of the original settings but copy every property with the custom scheme prefix (fs.s3x., fs.s3u., fs.s3k.) over to the fs.s3. prefix, which is where EMRFS reads its CSE settings from:
private Configuration substituteS3Config(final Configuration conf) {
    if (conf == null) return null;

    final String fsSchemaPrefix = "fs." + getScheme() + ".";
    final String fsS3SchemaPrefix = "fs.s3.";
    final String fsSchemaImpl = "fs." + getScheme() + ".impl";

    Configuration substitutedConfig = new Configuration(conf);
    for (Map.Entry<String, String> configEntry : conf) {
        String propName = configEntry.getKey();
        if (!fsSchemaImpl.equals(propName)
                && propName.startsWith(fsSchemaPrefix)) {
            final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
            LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
            substitutedConfig.set(newPropName, configEntry.getValue());
        }
    }

    return substitutedConfig;
}
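To make the effect concrete: for the s3x scheme from the example configuration above, the substitution takes properties like

fs.s3x.cse.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider

and adds the corresponding fs.s3. properties (the ones EMRFS actually reads) to the copied configuration:

fs.s3.cse.enabled = true
fs.s3.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider

The fs.s3x.impl entry is deliberately skipped so the custom file system is not re-registered on the delegate.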
Besides that, make sure the delegate file system receives URIs and paths with the scheme it supports and returns paths with the custom scheme:
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
    FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
    if (status != null) {
        status.setPath(customS3Path(status.getPath()));
    }
    return status;
}

private Path s3Path(final Path p) {
    if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), SCHEME_S3N));
    }
    return p;
}

private Path customS3Path(final Path p) {
    if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), getScheme()));
    }
    return p;
}

private URI s3nURI(final URI originalUri, final String newScheme) {
    try {
        return new URI(
            newScheme,
            originalUri.getUserInfo(),
            originalUri.getHost(),
            originalUri.getPort(),
            originalUri.getPath(),
            originalUri.getQuery(),
            originalUri.getFragment());
    } catch (URISyntaxException e) {
        LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
    }
    return originalUri;
}
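The same pattern covers the other FileSystem methods. As a sketch, under the same assumptions about delegatingFs and the helpers above, open() and listStatus() could look like this:

@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
    // Translate the custom scheme to the delegate's scheme before the call.
    return this.delegatingFs.open(s3Path(f), bufferSize);
}

@Override
public FileStatus[] listStatus(final Path f) throws IOException {
    FileStatus[] statuses = this.delegatingFs.listStatus(s3Path(f));
    // Translate results back so callers keep seeing the custom scheme.
    for (FileStatus status : statuses) {
        status.setPath(customS3Path(status.getPath()));
    }
    return statuses;
}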
The final step is to register the custom file system with Hadoop (via the spark-defaults classification):
spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem
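If the cluster is configured through the EMR configurations API rather than by editing spark-defaults directly, the equivalent classification would look roughly like this; the per-scheme encryption properties from the beginning of the post go into the same block with the spark.hadoop. prefix (the exact property set is an assumption based on the example above):

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.hadoop.fs.s3x.impl": "my.company.fs.DynamicS3FileSystem",
      "spark.hadoop.fs.s3u.impl": "my.company.fs.DynamicS3FileSystem",
      "spark.hadoop.fs.s3k.impl": "my.company.fs.DynamicS3FileSystem",
      "spark.hadoop.fs.s3x.cse.enabled": "true",
      "spark.hadoop.fs.s3x.cse.encryptionMaterialsProvider": "my.company.fs.encryption.CustomKeyProvider",
      "spark.hadoop.fs.s3u.cse.enabled": "false",
      "spark.hadoop.fs.s3k.cse.enabled": "true",
      "spark.hadoop.fs.s3k.cse.encryptionMaterialsProvider": "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider",
      "spark.hadoop.fs.s3k.cse.kms.keyId": "some-kms-id"
    }
  }
]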