I am trying to run a Python job using Apache Flink 1.15.2, following the repo https://github.com/aws-samples/pyflink-getting-started.git.
The repo lists four steps, and I am stuck on the first one: 1) Local development using PyFlink.
I have completed all the prerequisites, such as creating a conda virtual environment and installing the required Flink version, Apache Flink 1.15.2.
I am following the steps in https://github.com/aws-samples/pyflink-getting-started/tree/main/getting-started.
I have also downloaded and saved the .jar file (the Kinesis connector) in the lib subdirectory of the current directory.
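For reference, the working directory I launch the job from is laid out roughly as follows (this listing is my local setup, reconstructed from the repo instructions, not copied from the repo itself):

getting-started.py
application_properties.json
lib/flink-sql-connector-kinesis-1.15.2.jar

The first script below is the data generator that puts records onto the input Kinesis stream; the second is the PyFlink job itself.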
import datetime
import json
import random
import boto3
STREAM_NAME = "ExampleInputStream"
STREAM_REGION = "eu-west-2"
def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=STREAM_REGION))
# -*- coding: utf-8 -*-
"""
getting-started.py
~~~~~~~~~~~~~~~~~~~
This module:
1. Creates a table environment
2. Creates a source table from a Kinesis Data Stream
3. Creates a sink table writing to a Kinesis Data Stream
4. Inserts the source table data into the sink table
"""
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json
# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()
# print(os.environ.get("IS_LOCAL"))
APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" # on kda
is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
        # "file:///" + "C:/lib/flink-sql-connector-kinesis-1.15.2.jar",
    )
def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def create_sink_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )


def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )
def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_print_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, input_table_name))

    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
I run into the error: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
See the full stack trace:
File ".\getting-started.py", line 181, in <module>
main()
File ".\getting-started.py", line 170, in main
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\table\table_environment.py", line 828, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
return f(*a, **kw)
File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.
Table options are:
'aws.region'='eu-west-2'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'stream'='ExampleInputStream '
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kinesis'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:728)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:702)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:155)
... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:538)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:724)
... 36 more
Here is the application_properties.json file:
[
    {
        "PropertyGroupId": "kinesis.analytics.flink.run.options",
        "PropertyMap": {
            "python": "GettingStarted/getting-started.py",
            "jarfile": "GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar"
        }
    },
    {
        "PropertyGroupId": "consumer.config.0",
        "PropertyMap": {
            "input.stream.name": "ExampleInputStream ",
            "flink.stream.initpos": "LATEST",
            "aws.region": "eu-west-2"
        }
    },
    {
        "PropertyGroupId": "producer.config.0",
        "PropertyMap": {
            "output.stream.name": "ExampleOutputStream",
            "shard.count": "1",
            "aws.region": "eu-west-2"
        }
    }
]
I was expecting the output stream to show some results, as in the attached output stream screenshot.
I rechecked the path of the jar file; it exists at the path specified. I even tested with a small Python script to confirm a) that the directory exists and b) that the .jar file is accessible in that directory (see the attached directory structure screenshot).
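Roughly, that check looked like the snippet below (the path construction mirrors what getting-started.py does, so treat it as an illustration rather than the exact script I ran):

import os

# mirror the jar path construction used in getting-started.py
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
jar_path = os.path.join(CURRENT_DIR, "lib", "flink-sql-connector-kinesis-1.15.2.jar")

print(os.path.isdir(os.path.join(CURRENT_DIR, "lib")))  # a) True -> the lib directory exists
print(os.path.isfile(jar_path))                         # b) True -> the connector jar is present at that path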
But I am still running into the connector issue. I rechecked the PyFlink documentation for this error and for how .jar dependencies need to be specified, and it matches exactly how they are specified in the program (the relevant form is sketched after the links below):
https://pyflink.readthedocs.io/en/main/faq.html#connector-issues https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/dependency_management/#jar-dependencies
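For completeness, the form from the dependency management docs that I am matching is the pipeline.jars setting, with multiple jars separated by a semicolon; a minimal sketch (the paths here are placeholders, not my actual ones):

# set one or more connector jars on the table environment, semicolon-delimited
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kinesis-1.15.2.jar;file:///path/to/other-connector.jar",
)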
I am not using a fat jar, since there is just one jar file, and it should work as per the steps in the repo.
Any help would be much appreciated. The reason I am using version 1.15.2 is that we eventually want to write Beam applications (currently not supported beyond 1.15.2).
Issue resolved. There was no problem in the code. It is just that PyCharm, for some reason, does not resolve the is_local variable correctly (the IS_LOCAL environment variable is not picked up), so the job cannot look in the correct place for the .jar file. I set IS_LOCAL=true in the Windows cmd prompt, ran the code from there (after activating conda), and am getting the desired output there.
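For anyone hitting the same thing, the workaround amounts to roughly the following in a Windows cmd prompt (the conda environment name is from my setup; adjust it to yours):

conda activate my-aws-flink-env
set IS_LOCAL=true
python getting-started.py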