Tags: java, lambda, parallel-processing, java-8, spliterator

Implementing a non-parallel Spliterator for unknown size?


I'm a little confused by all my research. I have a custom interface called TabularResultSet (which I've watered down for the sake of example) that traverses any data set that is tabular in nature. It has a next() method like an iterator, and it may be looping through a QueryResultSet, a tab-delimited table from the clipboard, a CSV, etc.

However, I'm trying to create a Spliterator that wraps around my TabularResultSet and easily turns it into a stream. I cannot imagine a safe way to parallelize because the TabularResultSet could be traversing a QueryResultSet, and calling next() concurrently could wreak havoc. The only way I imagine parallelization can be done safely is to have the next() called by a single working thread and it passes the data off to a parallel thread to work on it.

So I think parallelization is not an easy option. How do I just get this thing to stream without parallelizing? Here is my work so far...

public final class SpliteratorTest {

    public static void main(String[] args) {
       TabularResultSet rs = null; /* instantiate an implementation; */

       Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    public static interface TabularResultSet {
        public boolean next();

        public List<Object> getData();
    }

    private static final class TabularSpliterator implements Spliterator<TabularResultSet> {

        private final TabularResultSet rs;

        public TabularSpliterator(TabularResultSet rs) {
            this.rs = rs;
        }
        @Override
        public boolean tryAdvance(Consumer<? super TabularResultSet> action) {
            action.accept(rs);
            return rs.next();
        }

        @Override
        public Spliterator<TabularResultSet> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}

Solution

  • It's probably easiest to extend Spliterators.AbstractSpliterator. If you do this, you need only implement tryAdvance. The result can still be turned into a parallel stream: the parallelism comes from the streams implementation calling tryAdvance multiple times, batching up the elements it receives, and processing the batches in different threads. Your tryAdvance itself is never called concurrently.

    If TabularResultSet is anything like a JDBC ResultSet, I don't think you want a Spliterator<TabularResultSet> or a Stream<TabularResultSet>. A TabularResultSet appears to represent an entire tabular data set, so you probably want each spliterator or stream element to represent one row of that table, that is, the List<Object> returned by getData(). If so, you'd want something like the following.

    class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
        private final TabularResultSet rs;
    
        public TabularSpliterator(TabularResultSet rs) {
            super(...);
            this.rs = rs;
        }
    
        @Override public boolean tryAdvance(Consumer<? super List<Object>> action) {
            if (rs.next()) {
                action.accept(rs.getData());
                return true;
            } else {
                return false;
            }
        }
    }
    

    Then you can turn an instance of this spliterator into a stream by calling StreamSupport.stream(), passing false as the second argument to get a sequential stream.
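
To make the pieces above concrete, here is a minimal end-to-end sketch. The constructor arguments (Long.MAX_VALUE for "size unknown" and Spliterator.ORDERED) are an assumption about what suits a typical result set, not something stated above; the helpers rows() and inMemory() are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class TabularStreamSketch {

    /** Same shape as the interface in the question. */
    public interface TabularResultSet {
        boolean next();
        List<Object> getData();
    }

    /** Emits one stream element per row. */
    static final class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
        private final TabularResultSet rs;

        TabularSpliterator(TabularResultSet rs) {
            // Assumption: size is unknown and rows arrive in encounter order.
            super(Long.MAX_VALUE, Spliterator.ORDERED);
            this.rs = rs;
        }

        @Override
        public boolean tryAdvance(Consumer<? super List<Object>> action) {
            if (!rs.next()) {
                return false;            // no more rows
            }
            action.accept(rs.getData()); // hand the current row to the stream
            return true;
        }
    }

    /** Hypothetical helper: wraps a result set in a sequential stream of rows. */
    static Stream<List<Object>> rows(TabularResultSet rs) {
        return StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    /** Hypothetical in-memory implementation, used only to drive the demo. */
    static TabularResultSet inMemory(List<List<Object>> table) {
        Iterator<List<Object>> it = table.iterator();
        return new TabularResultSet() {
            private List<Object> current;

            @Override public boolean next() {
                if (!it.hasNext()) {
                    return false;
                }
                current = it.next();
                return true;
            }

            @Override public List<Object> getData() {
                return current;
            }
        };
    }

    public static void main(String[] args) {
        List<List<Object>> table = Arrays.asList(
                Arrays.<Object>asList(1, "a"),
                Arrays.<Object>asList(2, "b"));
        List<Object> firstColumn = rows(inMemory(table))
                .map(row -> row.get(0))
                .collect(Collectors.toList());
        System.out.println(firstColumn); // prints [1, 2]
    }
}
```

Note that next() is called before getData() on every step, so the first row is not emitted before the cursor has been positioned on it.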

    Note: in general, a Spliterator instance is only ever used by one thread at a time, so it need not be thread-safe. See the Spliterator class documentation at the paragraph beginning "Despite..." for details.
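
As a rough illustration of that note, the sketch below builds a parallel stream over a deliberately unsynchronized source. The counting() source and the parallelSum() helper are hypothetical names made up for this example, and the constructor arguments are the same assumption as before. One caveat it relies on: because the default trySplit buffers elements into batches, each call to getData() must return an independent row object rather than a reused mutable buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public final class ParallelRowsSketch {

    /** Same shape as the interface in the question. */
    public interface TabularResultSet {
        boolean next();
        List<Object> getData();
    }

    static final class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
        private final TabularResultSet rs;

        TabularSpliterator(TabularResultSet rs) {
            // Assumption: size is unknown and rows arrive in encounter order.
            super(Long.MAX_VALUE, Spliterator.ORDERED);
            this.rs = rs;
        }

        @Override
        public boolean tryAdvance(Consumer<? super List<Object>> action) {
            if (!rs.next()) {
                return false;
            }
            action.accept(rs.getData());
            return true;
        }
    }

    /** Hypothetical source: rowCount single-column rows holding 0..rowCount-1. */
    static TabularResultSet counting(int rowCount) {
        return new TabularResultSet() {
            private int index = 0;        // plain field, no synchronization
            private List<Object> current;

            @Override public boolean next() {
                if (index >= rowCount) {
                    return false;
                }
                List<Object> row = new ArrayList<>(); // fresh list per row
                row.add(index++);
                current = row;
                return true;
            }

            @Override public List<Object> getData() {
                return current;
            }
        };
    }

    /** Sums the first column using a parallel stream over the source. */
    static long parallelSum(int rowCount) {
        return StreamSupport.stream(new TabularSpliterator(counting(rowCount)), true)
                .mapToLong(row -> (Integer) row.get(0))
                .sum();
    }

    public static void main(String[] args) {
        // Rows are read one thread at a time; only the downstream work is spread out.
        System.out.println(parallelSum(1000));
    }
}
```

Only the processing after tryAdvance is spread across threads here. Reading from the source stays effectively serial, so the speedup depends on how expensive the per-row work is compared with fetching a row.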