javascriptcometxmlhttprequestatmosphere

Atmosphere Multiple streaming XmlHttpRequests (XHR)/Channels block?


I am building an web app which uses Comet. The backend side is build with Atmosphere and Jersey. However I ran into trouble when i wanted to subscribe to multiple channels. The jQuery plugin atmosphere supplies only has support for 1 channel. I started to write my own implementation as for comet that is for the moment.

THE PROBLEM

If i update channel 1 with msg "Hello" i don't get notified. However when i update channel 2 with msg "World" afterwards. I get both "Hello" and "World" at the same time..


var connection1 = new AtmosphereConnectionComet("http://localhost/b/product/status/1");
var connection2 = new AtmosphereConnectionComet("http://localhost/b/product/status/2");

var handleMessage = function(msg)
{
   alert(msg);
};

connection1.NewMessage.add(handleMessage);
connection2.NewMessage.add(handleMessage);

connection1.connect();
connection2.connect();

The AtmosphereConnectionComet implementation:

UPDATED


function AtmosphereConnectionComet(url)
{
    //signals for dispatching
    this.Connected = new signals.Signal();
    this.Disconnected = new signals.Signal();
    this.NewMessage = new signals.Signal();

    //private vars
    var xhr = null;
    var self = this;
    var gotWelcomeMessage = false;
    var readPosition;
    var url = url;

    //private methods
    var onIncomingXhr = function()
    {
        //check if we got some new data
        if (xhr.readyState == 3)
        {
            //if the status is oke
            if (xhr.status==200)  // Received a message
            {
                //get the message
                //this is like streaming.. each time we get readyState 3 and status 200 there will be text appended to xhr.responseText
                var message = xhr.responseText;

                console.log(message);

                //check if we dont have the welcome message yet and if its maybe there... (it doesn't come in one pull)
                if(!gotWelcomeMessage && message.indexOf("<--EOD-->") > -1)
                {
                    //we has it
                    gotWelcomeMessage = true;
                    //dispatch a signal
                    self.Connected.dispatch(sprintf("Connected to %s", url));
                }
                //welcome message set, from now on only messages (yes this will fail for larger date i presume)
                else
                {
                    //dispatch the new message by substr from the last readPosition
                    self.NewMessage.dispatch(message.substr(readPosition));
                }

                //update the readPosition to the size of this message
                readPosition = xhr.responseText.length;
            }
        }
        //ooh the connection got resumed, seems we got disconnected
        else if (xhr.readyState == 4)
        {
            //disconnect
            self.disconnect();
        }
    }

    var getXhr = function()
    {
        if ( window.location.protocol !== "file:" ) {
            try {
                return new window.XMLHttpRequest();
            } catch(xhrError) {}
        }

        try {
            return new window.ActiveXObject("Microsoft.XMLHTTP");
        } catch(activeError) {}
    }

    this.connect = function()
    {
        xhr = getXhr();
        xhr.onreadystatechange = onIncomingXhr;
        xhr.open("GET", url, true);
        xhr.send(null);
    }

    this.disconnect = function()
    {
        xhr.onreadystatechange = null;
        xhr.abort();
    }

    this.send = function(message)
    {

    }
}

UPDATE 9-1 23:00 GMT+1

It seems atmosphere doesn't output the stuff..

ProductEventObserver

This is a ProductEventObserver which observes SEAM events. This component is autocreated and is in the APPLICATION context of SEAM. It catches the events and uses the broadcastToProduct to get the right broadcaster (via the broadcasterfactory) and broadcast the json message (i used gson as json serializer/marshaller) to the supspended connections.


package nl.ambrero.botenveiling.managers.product;

import com.google.gson.Gson;
import nl.ambrero.botenveiling.entity.product.Product;
import nl.ambrero.botenveiling.entity.product.ProductBid;
import nl.ambrero.botenveiling.entity.product.ProductBidRetraction;
import nl.ambrero.botenveiling.entity.product.ProductRetraction;
import nl.ambrero.botenveiling.managers.EventTypes;
import nl.ambrero.botenveiling.rest.vo.*;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.DefaultBroadcaster;
import org.jboss.seam.ScopeType;
import org.jboss.seam.annotations.*;
import org.jboss.seam.log.Log;

@Name("productEventObserver")
@Scope(ScopeType.APPLICATION)
@AutoCreate
public class ProductEventObserver
{
    @Logger
    Log logger;

    Gson gson;

    @Create
    public void init()
    {
        gson = new Gson();
    }

    private void broadCastToProduct(int id, ApplicationEvent message)
    {
        Broadcaster broadcaster = BroadcasterFactory.getDefault().lookup(DefaultBroadcaster.class, String.format("%s", id));

        logger.info(String.format("There are %s broadcasters active", BroadcasterFactory.getDefault().lookupAll().size()));

        if(broadcaster == null)
        {
            logger.info("No broadcaster found..");

            return;
        }

        logger.info(String.format("Broadcasting message of type '%s' to '%s' with scope '%s'", message.getEventType(), broadcaster.getID(), broadcaster.getScope().toString()));

        broadcaster.broadcast(gson.toJson(message));
    }

    @Observer(value = { EventTypes.PRODUCT_AUCTION_EXPIRED, EventTypes.PRODUCT_AUCTION_SOLD })
    public void handleProductAcutionEnded(Product product)
    {
        this.broadCastToProduct(
            product.getId(),
            new ProductEvent(ApplicationEventType.PRODUCT_AUCTION_ENDED, product)
        );
    }

    @Observer(value = EventTypes.PRODUCT_RETRACTED)
    public void handleProductRetracted(ProductRetraction productRetraction)
    {
        this.broadCastToProduct(
            productRetraction.getProduct().getId(),
            new ProductRetractionEvent(ApplicationEventType.PRODUCT_RETRACTED, productRetraction)
        );
    }

    @Observer(value = EventTypes.PRODUCT_AUCTION_STARTED)
    public void handleProductAuctionStarted(Product product)
    {
        this.broadCastToProduct(
            product.getId(),
            new ProductEvent(ApplicationEventType.PRODUCT_AUCTION_STARTED, product)
        );
    }

    @Observer(value = EventTypes.PRODUCT_BID_ADDED)
    public void handleProductNewBid(ProductBid bid)
    {
        this.broadCastToProduct(
            bid.getProduct().getId(),
            new ProductBidEvent(ApplicationEventType.PRODUCT_BID_ADDED, bid)
        );
    }

    @Observer(value = EventTypes.PRODUCT_BID_RETRACTED)
    public void handleProductRetractedBid(ProductBidRetraction bidRetraction)
    {
        this.broadCastToProduct(
            bidRetraction.getProductBid().getProduct().getId(),
            new ProductBidRetractionEvent(ApplicationEventType.PRODUCT_BID_RETRACTED, bidRetraction)
        );
    }
}

Web.xml

<servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
        <init-param>
            <param-name>com.sun.jersey.config.property.packages</param-name>
            <param-value>nl.ambrero.botenveiling.rest</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>
        <load-on-startup>0</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>AtmosphereServlet</servlet-name>
        <url-pattern>/b/*</url-pattern>
    </servlet-mapping>

atmosphere.xml

<atmosphere-handlers>
    <atmosphere-handler context-root="/b" class-name="org.atmosphere.handler.ReflectorServletProcessor">
        <property name="servletClass" value="com.sun.jersey.spi.container.servlet.ServletContainer"/>
    </atmosphere-handler>
</atmosphere-handlers>

Broadcaster:


@Path("/product/status/{product}")
@Produces(MediaType.APPLICATION_JSON)
public class ProductEventBroadcaster
{
    @PathParam("product")
    private Broadcaster product;

    @GET
    public SuspendResponse subscribe()
    {
        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(product)
                .build();
    }
}

UPDATE 10-1 4:18 GMT+1

Console output:


16:15:16,623 INFO  [STDOUT] 16:15:16,623 INFO  [ProductEventObserver] There are 3 broadcasters active
16:15:16,624 INFO  [STDOUT] 16:15:16,624 INFO  [ProductEventObserver] Broadcasting message of type 'productBidAdded' to '2' with scope 'APPLICATION'
16:15:47,580 INFO  [STDOUT] 16:15:47,580 INFO  [ProductEventObserver] There are 3 broadcasters active
16:15:47,581 INFO  [STDOUT] 16:15:47,581 INFO  [ProductEventObserver] Broadcasting message of type 'productBidAdded' to '1' with scope 'APPLICATION'

Solution

  • The value of product:

    @PathParam("product")
    private Broadcaster product;
    

    Does it match the id of the broadCastToProduct(int id, ApplicationEvent message)?

    Send me a test case I can look at (post it users@atmosphere.java.net).