Tags: arrays, google-drive-api, rx-java, rx-java2, google-drive-realtime-api

Is there any way to pass Observable<String> into AbstractInputStreamContent?


I'm working on uploading a text file to Google Drive:

ByteArrayContent content = new ByteArrayContent("text/csv", fileContent.getBytes(Charset.forName("UTF-8")));
Drive.Files.Insert request = drive.files().insert(file, content);

where fileContent is a String.

I'd like to refactor this and change the type of fileContent to Observable<String>. Is there a clean way to pass it to the insert() method (which takes an AbstractInputStreamContent as its second argument)?

Thanks


Solution

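  • Here is a general Flowable -> InputStream bridge you can delegate to. Convert your Observable<String> with toFlowable(BackpressureStrategy.BUFFER), then wrap the resulting stream in an InputStreamContent (a subclass of AbstractInputStreamContent) and pass that to insert():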

    import java.io.*;
    import java.nio.charset.Charset;
    import java.util.concurrent.atomic.AtomicReference;
    
    import org.reactivestreams.*;
    
    import io.reactivex.FlowableSubscriber;
    import io.reactivex.internal.subscriptions.SubscriptionHelper;
    
    /**
     * Utility that exposes a {@code Publisher<String>} as a blocking {@link InputStream}.
     *
     * <p>Each emitted string is encoded with the supplied charset and served to the reader;
     * the next string is requested from upstream only after the current one has been fully
     * consumed.
     */
    public final class FlowableStringInputStream {

        /** Utility class; never instantiated. */
        private FlowableStringInputStream() {
            throw new IllegalStateException("No instances!");
        }

        /**
         * Subscribes to {@code source} and returns a stream over the encoded bytes of its items.
         *
         * <p>Closing the returned stream cancels the subscription.
         *
         * @param source the publisher whose strings are streamed
         * @param charset the charset used to encode every emitted string
         * @return an input stream that blocks in {@code read} until data or a terminal
         *         signal arrives
         * @throws NullPointerException if {@code source} or {@code charset} is null
         */
        public static InputStream createInputStream(
                Publisher<String> source, Charset charset) {
            // Fail fast here: a null charset would otherwise only surface later as an
            // exception thrown from inside onNext.
            if (source == null) {
                throw new NullPointerException("source");
            }
            if (charset == null) {
                throw new NullPointerException("charset");
            }
            StringInputStream parent = new StringInputStream(charset);
            source.subscribe(parent);
            return parent;
        }

        /**
         * The subscriber/stream pair: upstream signals are recorded in volatile fields and
         * the reader blocks on this object's monitor until one of them changes.
         */
        static final class StringInputStream extends InputStream
        implements FlowableSubscriber<String> {

            /** Holds the upstream subscription; compared against the CANCELLED sentinel after close(). */
            final AtomicReference<Subscription> upstream;

            final Charset charset;

            /** Encoded bytes of the current item, or null while waiting for the next one. */
            volatile byte[] bytes;

            /** Read position inside {@link #bytes}; only touched by the reading side. */
            int index;

            /** Set by onError/onComplete; {@link #error} is written before this flag. */
            volatile boolean done;
            Throwable error;

            StringInputStream(Charset charset) {
                this.charset = charset;
                upstream = new AtomicReference<>();
            }

            @Override
            public void onSubscribe(Subscription s) {
                // Request the first item only if this subscription was actually accepted.
                if (SubscriptionHelper.setOnce(upstream, s)) {
                    s.request(1);
                }
            }

            @Override
            public void onNext(String t) {
                bytes = t.getBytes(charset);
                wakeReader();
            }

            @Override
            public void onError(Throwable t) {
                // error must be assigned before the volatile write to done so the
                // reader that observes done == true also observes the error.
                error = t;
                done = true;
                wakeReader();
            }

            @Override
            public void onComplete() {
                done = true;
                wakeReader();
            }

            @Override
            public int read() throws IOException {
                for (;;) {
                    byte[] a = awaitBufferIfNecessary();
                    if (a == null) {
                        return endOfStream();
                    }
                    int idx = index;
                    if (idx == a.length) {
                        // Current item exhausted (or empty): ask for the next and wait again.
                        requestNextChunk();
                    } else {
                        index = idx + 1;
                        return a[idx] & 0xFF;
                    }
                }
            }

            /**
             * Returns the current buffer, blocking until one is available.
             *
             * @return the current item's bytes, or null if the stream terminated, was
             *         cancelled, or the wait was interrupted after cancellation
             * @throws InterruptedIOException if the waiting thread is interrupted while the
             *         subscription is still active
             */
            byte[] awaitBufferIfNecessary() throws IOException {
                byte[] a = bytes;
                if (a == null) {
                    synchronized (this) {
                        for (;;) {
                            // Read done before bytes: an item delivered just before the
                            // terminal signal must still be seen and returned.
                            boolean d = done;
                            a = bytes;
                            if (a != null) {
                                break;
                            }
                            if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
                                break;
                            }
                            try {
                                wait();
                            } catch (InterruptedException ex) {
                                // Restore the interrupt status so callers further up the
                                // stack can still observe the interruption.
                                Thread.currentThread().interrupt();
                                if (upstream.get() != SubscriptionHelper.CANCELLED) {
                                    InterruptedIOException exc = new InterruptedIOException();
                                    exc.initCause(ex);
                                    throw exc;
                                }
                                break;
                            }
                        }
                    }
                }
                return a;
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                // Written as "len > b.length - off" so that off + len cannot overflow and
                // so that off == b.length with len == 0 is accepted, as InputStream allows.
                if (off < 0 || len < 0 || len > b.length - off) {
                    throw new IndexOutOfBoundsException(
                        "b.length=" + b.length + ", off=" + off + ", len=" + len);
                }
                if (len == 0) {
                    // InputStream contract: a zero-length read returns 0 without blocking.
                    return 0;
                }
                for (;;) {
                    byte[] a = awaitBufferIfNecessary();
                    if (a == null) {
                        return endOfStream();
                    }
                    int idx = index;
                    if (idx == a.length) {
                        requestNextChunk();
                    } else {
                        // Serve at most the remainder of the current item; never blocks
                        // for a second item once some bytes are available.
                        int n = Math.min(a.length - idx, len);
                        System.arraycopy(a, idx, b, off, n);
                        index = idx + n;
                        return n;
                    }
                }
            }

            @Override
            public int available() throws IOException {
                byte[] a = bytes;
                int idx = index;
                return a != null ? Math.max(0, a.length - idx) : 0;
            }

            @Override
            public void close() throws IOException {
                SubscriptionHelper.cancel(upstream);
                // Release a reader that may be blocked in awaitBufferIfNecessary().
                wakeReader();
            }

            /** Wakes any thread blocked in {@link #awaitBufferIfNecessary()}. */
            private void wakeReader() {
                synchronized (this) {
                    notifyAll();
                }
            }

            /** Drops the exhausted buffer and requests one more item from upstream. */
            private void requestNextChunk() {
                index = 0;
                bytes = null;
                upstream.get().request(1);
            }

            /**
             * Produces the result for a read that found no buffer: rethrows the upstream
             * error as an IOException if there is one, otherwise signals end of stream.
             */
            private int endOfStream() throws IOException {
                Throwable ex = error;
                if (ex != null) {
                    if (ex instanceof IOException) {
                        throw (IOException) ex;
                    }
                    throw new IOException(ex);
                }
                return -1;
            }
        }
    }
    

    Usage:

    @Test(timeout = 10000)
    public void async() throws Exception {
        // Counts how often the source sees a cancellation.
        AtomicInteger cancellations = new AtomicInteger();

        // Emits "100".."109" from the computation scheduler, each item delayed by 10 ms.
        Flowable<String> numbers = Flowable.range(100, 10)
                .map(Object::toString)
                .doOnCancel(cancellations::incrementAndGet)
                .subscribeOn(Schedulers.computation())
                .delay(10, TimeUnit.MILLISECONDS);

        try (InputStream in = FlowableStringInputStream.createInputStream(numbers, utf8)) {
            // The first item is consumed one byte at a time.
            assertEquals('1', in.read());
            assertEquals('0', in.read());
            assertEquals('0', in.read());

            // The second item is consumed with a single bulk read.
            byte[] chunk = new byte[3];
            int count = in.read(chunk);
            assertEquals(3, count);

            byte[] expected = "101".getBytes(utf8);
            assertArrayEquals(expected, chunk);
        }

        // Leaving the try block closed the stream; exactly one cancellation is expected.
        assertEquals(1, cancellations.get());
    }