I am following the documentation to try out how to enrich an unbounded stream by directly looking up from an IMap. I have two maps:

Map<String, Product> (productId as key)
Map<String, Seller> (sellerId as key)

Both Product and Seller are very simple classes:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...
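For context on the DataSerializable classes above: the contract boils down to writing the fields in a fixed order and reading them back in exactly the same order. Here is a dependency-free sketch of that idea using plain java.io streams (no Hazelcast on the classpath; the field values are made up):

```java
import java.io.*;

public class SerializationOrderDemo {
    public static void main(String[] args) throws IOException {
        // Write the fields in a fixed order, as writeData(ObjectDataOutput) would.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF("p-1");      // productId
        out.writeUTF("s-42");     // sellerId
        out.writeInt(1999);       // price
        out.flush();

        // Read them back in the exact same order, as readData(ObjectDataInput) would.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        String productId = in.readUTF();
        String sellerId = in.readUTF();
        int price = in.readInt();

        System.out.println(productId + " " + sellerId + " " + price);
    }
}
```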
I have two data generators that keep pushing data to the two maps. The event journal is enabled for both maps, and I have verified that it works fine.
I want to enrich the stream of events from the Product map with the Seller map. Here is a snippet of my code:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(EnrichedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}
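To spell out what I expect the mapUsingIMap step to do: for each Product event it should look up the Seller by the grouping key and merge the two. A plain-Java sketch of that per-event join, with a HashMap standing in for sellerIMap and the classes reduced to records for brevity (illustrative only, not Jet API):

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichmentSketch {
    record Product(String productId, String sellerId, int price) {}
    record Seller(String sellerId, int revenue) {}
    record EnrichedProduct(Product product, Seller seller) {}

    public static void main(String[] args) {
        // Stand-in for the Seller IMap, keyed by sellerId.
        Map<String, Seller> sellers = new HashMap<>();
        sellers.put("s-42", new Seller("s-42", 100_000));

        // One incoming Product event from the journal.
        Product event = new Product("p-1", "s-42", 1999);

        // The enrichment step: look up by the grouping key, then merge.
        Seller seller = sellers.get(event.sellerId());
        EnrichedProduct enriched = new EnrichedProduct(event, seller);

        System.out.println(enriched.product().productId()
                + " sold by seller with revenue " + enriched.seller().revenue());
    }
}
```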
When the job was submitted, I got the following error:
com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] Failed asynchronous execution of execution callback: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70, replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01:11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0, connection=null}
I tried running the cluster with one instance and with two instances and got the same error message either way. I couldn't figure out the root cause.
It seems that your problem is a ClassNotFoundException, even though you added the appropriate classes to the job. The objects you store in the IMap exist independently of your Jet job, and when the event journal source asks for them, Jet's IMap code tries to deserialize them and fails because Jet doesn't have your domain model classes on its classpath.
To move on, add a JAR with the classes you use in the IMap to Jet's classpath. We are looking for a solution that will remove this requirement.
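This failure mode is easy to reproduce in isolation: resolving or deserializing a class that is not on the JVM's classpath throws ClassNotFoundException. A minimal illustration (the class name here is made up, standing in for your domain classes):

```java
public class MissingClassDemo {
    public static void main(String[] args) {
        try {
            // Jet's IMap code does the equivalent of this when it deserializes
            // a stored value whose class is not on the member's classpath.
            Class.forName("com.example.store.Product");
            System.out.println("class found");
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```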
The reason you don't get the exception stack trace in the log output is the default java.util.logging setup you end up with when you don't explicitly add a more flexible logging module, such as Log4j. The next version of Jet's packaging will improve this aspect. Until then you can follow these steps:
Go to the lib directory of Jet's distribution package and download Log4j into it:
$ cd lib
$ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
Edit bin/common.sh to add the module to the classpath. Towards the end of the file there is a line
CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
You can duplicate this line and replace hazelcast-jet-3.1 with log4j-1.2.17.
At the end of common.sh there is a multi-line command that constructs the JAVA_OPTS variable. Add "-Dhazelcast.logging.type=log4j" and "-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties" to the list.
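Taken together, the edited part of bin/common.sh could look roughly like this (the exact surrounding lines vary between Jet versions, so treat it as a sketch rather than a drop-in replacement):

```shell
# bin/common.sh (excerpt) -- add log4j next to the Jet jar on the classpath
CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
CLASSPATH="$JET_HOME/lib/log4j-1.2.17.jar:$CLASSPATH"

# ...and point Hazelcast at log4j inside the JAVA_OPTS block
JAVA_OPTS="$JAVA_OPTS \
    -Dhazelcast.logging.type=log4j \
    -Dlog4j.configuration=file:$JET_HOME/config/log4j.properties"
```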
Create a file log4j.properties in the config directory:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n
# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info
log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout