I'm doing a POC to push Data from a (Java) server, though LCDS 3.1 's DataService using RTMP.
Configuration is OK. Adobe Air client DataMessage to server (+Assembler saving in DB) : OK
I found lots of examples with AsyncMessage, but as This is an RTMP destination through a DataService service, I must send a DataMessage.
Appearently, there are some bugs (or I am missing things/good API doc!).
So please, could you help me?
Here is the code that does the push. The key method is doPush()
package mypackage.lcds.service.ds.impl;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import mypackage.lcds.service.ds.DataPushService;
import mypackage.model.dto.AbstractDto;
import mypackage.model.exception.DsPushException;
import flex.data.messages.DataMessage;
import flex.messaging.MessageBroker;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.util.UUIDUtils;
/**
* Implementation of {@link DataPushService}.
*/
// see http://forums.adobe.com/thread/580667
// MessageCLient :
// http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help .html?content=lcconnections_2.html
@Service
public final class DataPushServiceImpl implements DataPushService {
private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);
/**
* Destination name for Data-service.<br>
* See data-management-config.XML.
*/
private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";
/**
* See data-management-config.XML.
*/
private static final String PUSH_DTO_SERVICE__NAME = "data-service";
/**
* set "manually" by Spring (contexts workaround; not autowired).
*/
private MessageBroker messageBroker = null;
/**
* Does the push of a single DTO.<br>
* Only subscriberId's that are {@link Long} values will be used. Other Id's do not get a Message sent.
*
* @param dto
* {@link AbstractDto} object.
* @param subscriberIds
* {@link Set} of LCDS Message subscriber IDs {@link Long}. If null, sends to all connected clients.
*
* @throws DsPushException
* if any error
*/
@SuppressWarnings("unchecked")
private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
throws DsPushException {
Set<?> ids = new HashSet<Object>();
// obtain message service by means of message broker
MessageService messageService = this.getMessageService();
DataMessage message = this.createMessage(dto, messageService);
// fill ids
if ((subscriberIds == null) || (subscriberIds.isEmpty())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message all currently connected subscriberIds ");
}
Set idsFromDS = messageService.getSubscriberIds(message, true);
if ((idsFromDS != null) && (!idsFromDS.isEmpty())) {
CollectionUtils.addAll(ids, idsFromDS.iterator());
}
} else {
CollectionUtils.addAll(ids, subscriberIds.iterator());
}
if (ids.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No subscriberId to send the Message to.");
LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message to subscriberIds : " + subscriberIds.toString());
LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
}
// send messages to all subscriberIds 1 by 1
Object responsePayload = null;
boolean isSent = false;
for (Object id : ids) {
if (id instanceof Long) {
try {
message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, id);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending LCDS DataMessage to subscriber [" + id + "] \n" + message.toString(2));
}
responsePayload = messageService.serviceMessage(message, true);
// no exception ==> means OK?
// TODO TEST retuned payload
isSent = true;
} catch (Exception e) {
LOG.error("Error while sending message to subscriberId " + id, e);
isSent = false;
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("Message sent to '" + String.valueOf(id) + "' : " + String.valueOf(isSent));
}
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
}
}
}
}
/**
* {@inheritDoc}
*
* @see DataPushService#pushToAllClients(AbstractDto)
*/
// TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
public void pushToAllClients(final AbstractDto dto) throws DsPushException {
this.doPush(dto, null);
}
public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
Set<Long> subscriberIds = new HashSet<Long>();
subscriberIds.add(subscriberId);
this.doPush(dto, subscriberIds);
}
/**
* {@inheritDoc}<br>
* subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
*
* @see DataPushService#pushToClients(AbstractDto, Set)
*/
public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
this.doPush(dto, subscriberIds);
}
@SuppressWarnings("unchecked")
private DataMessage createMessage(final AbstractDto dto, final MessageService messageService) {
DataMessage msg = new DataMessage();
msg.setClientId(getServerId());
msg.setTimestamp(System.currentTimeMillis());
msg.setMessageId(UUIDUtils.createUUID(true));
msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
msg.setBody(dto);
msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?
Map identity = new HashMap(2);
// see data-management-config.xml
identity.put("id", dto.getId());
msg.setIdentity(identity);
// FIXME set priority. How?
if (LOG.isDebugEnabled()) {
LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
}
return msg;
}
private Object getServerId() {
// FIXME OK?
return "X-BACKEND";
}
/**
* Get the current {@link MessageBroker}'s service layer.
*
* @return {@link MessageService} to use for push data
*/
private MessageService getMessageService() {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting MessageBroker's DataService service ");
}
// Was : return (MessageService) MessageBroker.getMessageBroker(null).getService(PUSH_DTO_SERVICE__NAM E);
return (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
}
/**
* Set the messageBroker. For SPring.
*
* @param messageBroker
* the messageBroker to set
*/
public void setMessageBroker(final MessageBroker messageBroker) {
this.messageBroker = messageBroker;
}
}
NOTE : the messagebroker is set once through Spring. It works for this POC.
I have a Servlet that saves a DTO to the DB and then tries to push it through the service. All seems OK, but I get a NullPointerException (NPE).
Here is the Tomcat 6 LOG (it sends to subscriberID '99' ):
LCDS DataMessage created :
Flex Message (flex.data.messages.DataMessage)
operation = create_and_sequence
id = {id=3203}
clientId = X-BACKEND
correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
destination = poc-ds-xchange
messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
timestamp = 1297412881050
timeToLive = 0
body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99]
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99]
Flex Message (flex.data.messages.DataMessage)
operation = create_and_sequence
id = {id=3203}
clientId = X-BACKEND
correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
destination = poc-ds-xchange
messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
timestamp = 1297412881050
timeToLive = 0
body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test]
hdr(DSDstClientId) = 99
09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99
java.lang.NullPointerException
at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1 741)
at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java: 1630)
at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
at flex.messaging.services.MessageService.serviceMessage(MessageService. java:318)
at flex.messaging.services.MessageService.serviceMessage(MessageService. java:233)
at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushSer viceImpl.java:142)
at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(Data PushServiceImpl.java:178)
at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. doInvokeMethod(HandlerMethodInvoker.java:421)
at org.springframework.web.bind.annotation.support.HandlerMethodInvoker. invokeHandlerMethod(HandlerMethodInvoker.java:136)
at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.invokeHandlerMethod(AnnotationMethodHandlerAdapter.java:326)
at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandle rAdapter.handle(AnnotationMethodHandlerAdapter.java:313)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(Dispatch erServlet.java:875)
at org.springframework.web.servlet.DispatcherServlet.doService(Dispatche rServlet.java:807)
at org.springframework.web.servlet.FrameworkServlet.processRequest(Frame workServlet.java:571)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServl et.java:501)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(Appl icationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationF ilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperV alve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextV alve.java:175)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.j ava:128)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.j ava:102)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineVal ve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.jav a:263)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java :844)
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.proce ss(Http11Protocol.java:584)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:44 7)
at java.lang.Thread.run(Unknown Source)
09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false
==> what am I doing wrong?
I cannot trace the code (I do not have the source), but the exception thrown is just not helping at all.
Am I missing a header to set?
Thank you so much for your help,
For the records, I found it ;o)
The base to know is that the DataServiceTransaction api is A MUST-HAVE, if you're using LC DataService.
For deails see Adobe forums thread
for the records here, here's my working (basic) code :
/**
* ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
* DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
*/
@Service
public final class DataPushServiceImpl implements DataPushService {
private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);
/* *********** V2 : DataServiceTransaction.createItem() ********* */
/**
* Does the push of a single DTO.
*
* @param dto
* {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by clients to
* filter data in the DataService.fill() method (used by the Assembler).
*
* @throws DsPushException
* if any error
*/
private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto.toString());
}
// One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
DataServiceTransaction dtx = null;
boolean isOwnerOfTx = false;
boolean isSent = false;
try {
// if already in an Assembler, we do have a tx ==> no commit nor rollback!
dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
if (dtx == null) {
// new one, no JTA ==> ourselves : commit or rollback
isOwnerOfTx = true;
//MessageBroker instantiated with SpringFlex is auto-named
dtx = DataServiceTransaction.begin("_messageBroker", false);
}
isSent = this.doTransactionSend(dto, dtx);
} catch (Exception e) {
// Log exception, but no impact on the back-end business ==> swallow Exception
LOG.error("Could not send the creation to LCDS", e);
if (isOwnerOfTx) {
dtx.rollback();
}
} finally {
try {
if (isOwnerOfTx && (dtx != null)) {
dtx.commit();
}
} catch (Exception e) {
// swallow
LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
}
}
return isSent;
}
private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {
boolean isSent = false;
if (dto == null) {
LOG.error("The given DTO is null! Nothing happens");
} else {
try {
dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
isSent = true; // no problem
} catch (Exception e) {
// Log exception, but no impact on the business
LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
}
}
}
return isSent;
}
//implementation of DataPushService interface
/**
* {@inheritDoc}
*
* @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
*/
@Transactional(rollbackFor = DsPushException.class)
public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
return this.doPushViaTransaction(dto);
}
}
Enjoy and thank you for your time!
G.