I need to process multiple JSON documents stored in a file and convert them to XML. Each JSON document converts to about 3000 lines of XML on average (to give an idea of the size). I am aiming to finish the whole job in under half an hour.
That time includes reading from the file, traversing and converting each document, and writing the results to other files as XML. Later I need to store them in a DB as well. I have not done that yet, but I was planning to use a Kafka connector to insert directly into the DB for performance.
The issues I am facing are:-
- I am able to read only one line from my input text file, not multiple lines.
- How do I use my util in my SpliteratorBenchmark.processLine() method, which accepts a String to convert?
- How do I create a new XML file for every JSON document? I have a single file that contains multiple JSON documents. I am supposed to read a line and convert it into one XML file, then create another XML file for the next JSON document.
Kindly help.
Is there any other way to achieve this? I cannot use any kind of JMS like MQ or Kafka for this task.
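One alternative that needs no messaging middleware is a plain fixed-size thread pool from `java.util.concurrent`. The sketch below is only an illustration of that pattern: `PoolRunner`, `runAll` and the `Function<String, String>` task are hypothetical names, and the upper-casing lambda merely stands in for the JSON-to-XML conversion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class PoolRunner {

    /** Runs task on every input using a fixed pool and returns the results in input order. */
    static List<String> runAll(List<String> inputs, Function<String, String> task, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                // Each input becomes one Callable submitted to the pool.
                futures.add(pool.submit(() -> task.apply(input)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // blocks until that task has finished
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        // Placeholder task: upper-casing stands in for the real conversion.
        List<String> out = runAll(Arrays.asList("{\"a\":1}", "{\"b\":2}"), String::toUpperCase, cores);
        System.out.println(out);
    }
}
```

Sizing the pool to the number of cores suits CPU-bound conversion; if writing the files dominates, a somewhat larger pool may be worth measuring.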
This is my core FixedBatchSpliteratorBase:-
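import static java.util.Spliterators.spliterator;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {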
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
this.characteristics = characteristics | SUBSIZED;
this.batchSize = batchSize;
this.est = est;
}
public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
this(characteristics, batchSize, Long.MAX_VALUE);
}
public FixedBatchSpliteratorBase(int characteristics) {
this(characteristics, 128, Long.MAX_VALUE);
}
public FixedBatchSpliteratorBase() {
this(IMMUTABLE | ORDERED | NONNULL);
}
@Override
public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<T>();
if (!tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics() | SIZED);
}
@Override
public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override
public long estimateSize() {
return est;
}
@Override
public int characteristics() {
return characteristics;
}
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override
public void accept(T value) {
this.value = value;
}
}
}
This is my class FixedBatchSpliterator:-
import static java.util.stream.StreamSupport.stream;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
private final Spliterator<T> spliterator;
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
super(toWrap.characteristics(), batchSize, toWrap.estimateSize());
this.spliterator = toWrap;
}
public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
return new FixedBatchSpliterator<>(toWrap, batchSize);
}
public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
return stream(batchedSpliterator(in.spliterator(), batchSize), true);
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
}
This is my JsonSpliterator:-
import java.util.function.Consumer;
import com.dj.gls.core.FixedBatchSpliteratorBase;
public class JsonSpliterator extends FixedBatchSpliteratorBase<String[]> {
private final JSONReader jr;
JsonSpliterator(JSONReader jr, int batchSize) {
super(IMMUTABLE | ORDERED | NONNULL, batchSize);
if (jr == null) throw new NullPointerException("JSONReader is null");
this.jr = jr;
}
public JsonSpliterator(JSONReader jr) { this(jr, 128); }
@Override
public boolean tryAdvance(Consumer<? super String[]> action) {
if (action == null) throw new NullPointerException();
try {
final String[] row = jr.readNext();
if (row == null) return false;
action.accept(row);
return true;
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void forEachRemaining(Consumer<? super String[]> action) {
if (action == null) throw new NullPointerException();
try {
for (String[] row; (row = jr.readNext()) != null;) action.accept(row);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
This is my main class which reads input file and starts the conversion process:-
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;
import com.dj.gls.core.FixedBatchSpliterator;
public class SpliteratorBenchmark {
static double sink;
public static void main(String[] args) throws IOException {
final Path inputPath = createInput();
for (int i = 0; i < 3; i++) {
System.out.println("Start processing JDK stream");
measureProcessing(Files.lines(inputPath));
System.out.println("Start processing fixed-batch stream");
measureProcessing(FixedBatchSpliterator.withBatchSize(Files.lines(inputPath), 10));
}
}
private static void measureProcessing(Stream<String> input) throws IOException {
final long start = System.nanoTime();
try (Stream<String> lines = input) {
final long totalTime = lines.parallel().mapToLong(SpliteratorBenchmark::processLine).sum();
final double cpuTime = totalTime, realTime = System.nanoTime() - start;
final int cores = Runtime.getRuntime().availableProcessors();
System.out.println(" Cores: " + cores);
System.out.format(" CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
System.out.format(" Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
System.out.format("CPU utilization: %.2f%%\n\n", 100.0 * cpuTime / realTime / cores);
}
}
private static long processLine(String line) {
final long localStart = System.nanoTime();
//conversion of json to xml and storing an xml file
return System.nanoTime() - localStart;
}
private static Path createInput() throws IOException {
final Path inputPath = Paths.get("/src/main/resources/input/input.txt");
return inputPath;
}
}
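For the three issues listed at the top, one possible shape for the per-line work is sketched here. It assumes one complete JSON document per input line. `LineConverter`, `LinePerFileWriter`, `convertAll` and the `doc-<n>.xml` naming scheme are hypothetical names introduced for illustration; in the real project the converter passed in would presumably be something like `(r, w) -> new JsonStreamXMLWriter(r, w).convert()`. The stand-in converter in `main` only wraps the text so the sketch runs without the javax.json dependency.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/** Hypothetical stand-in for the Reader/Writer based JSON-to-XML utility. */
interface LineConverter {
    void convert(Reader in, Writer out) throws Exception;
}

final class LinePerFileWriter {

    /** Reads every non-blank line of input and writes each one to its own file in outputDir. */
    static long convertAll(Path input, Path outputDir, LineConverter converter) throws IOException {
        Files.createDirectories(outputDir);
        AtomicLong counter = new AtomicLong();
        // Files.lines is lazy, so the stream must be closed to release the file handle.
        try (Stream<String> lines = Files.lines(input, StandardCharsets.UTF_8)) {
            lines.filter(line -> !line.trim().isEmpty())
                 .forEach(line -> processLine(line, outputDir, counter.incrementAndGet(), converter));
        }
        return counter.get();
    }

    /** Wraps the String in a StringReader so a Reader-based converter can consume it. */
    static void processLine(String line, Path outputDir, long index, LineConverter converter) {
        Path target = outputDir.resolve("doc-" + index + ".xml"); // hypothetical naming scheme
        try (Reader in = new StringReader(line);
             BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            converter.convert(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {
            throw new RuntimeException("Conversion failed for line " + index, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path input = Files.createTempFile("input", ".txt");
        Files.write(input, Arrays.asList("{\"a\":1}", "{\"b\":2}"));
        Path outputDir = Files.createTempDirectory("xml-out");
        // Stand-in converter: copies the text into a <raw> element instead of real JSON-to-XML.
        LineConverter standIn = (in, out) -> {
            out.write("<raw>");
            char[] buf = new char[1024];
            for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
            out.write("</raw>");
        };
        long written = convertAll(input, outputDir, standIn);
        System.out.println(written + " files written to " + outputDir);
    }
}
```

Because the counter is an `AtomicLong`, file names stay unique even if the stream is switched to `.parallel()`, although the numbers would then no longer follow the line order.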
This is how I am planning to convert my json to xml :-
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import javax.json.Json;
import javax.json.stream.JsonParser;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;
/**
* Converts JSON stream to XML stream.
* Uses XML representation of JSON defined in XSLT 3.0.
*
* @see <a href="https://www.w3.org/TR/xslt-30/#json">22 Processing JSON Data</a>
*
*/
public class JsonStreamXMLWriter
{
public static final String XPATH_FUNCTIONS_NS = "http://www.w3.org/2005/xpath-functions";
private static final XMLOutputFactory XOF = XMLOutputFactory.newInstance();
static
{
XOF.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
}
private final JsonParser parser;
private final XMLStreamWriter writer;
public JsonStreamXMLWriter(Reader reader, Writer stream) throws XMLStreamException
{
this(Json.createParser(reader), getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(Reader reader, OutputStream stream) throws XMLStreamException
{
this(Json.createParser(reader), getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(Reader reader, OutputStream stream, String encoding) throws XMLStreamException
{
this(Json.createParser(reader), getXMLOutputFactory().createXMLStreamWriter(stream, encoding));
}
public JsonStreamXMLWriter(Reader reader, Result result) throws XMLStreamException
{
this(Json.createParser(reader), getXMLOutputFactory().createXMLStreamWriter(result));
}
public JsonStreamXMLWriter(Reader reader, XMLStreamWriter writer)
{
this(Json.createParser(reader), writer);
}
public JsonStreamXMLWriter(InputStream is, Writer stream) throws XMLStreamException
{
this(Json.createParser(is), getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(InputStream is, OutputStream stream) throws XMLStreamException
{
this(Json.createParser(is), getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(InputStream is, OutputStream stream, String encoding) throws XMLStreamException
{
this(Json.createParser(is), getXMLOutputFactory().createXMLStreamWriter(stream, encoding));
}
public JsonStreamXMLWriter(InputStream is, Result result) throws XMLStreamException
{
this(Json.createParser(is), getXMLOutputFactory().createXMLStreamWriter(result));
}
public JsonStreamXMLWriter(InputStream is, XMLStreamWriter writer)
{
this(Json.createParser(is), writer);
}
public JsonStreamXMLWriter(JsonParser parser, Writer stream) throws XMLStreamException
{
this(parser, getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(JsonParser parser, OutputStream stream) throws XMLStreamException
{
this(parser, getXMLOutputFactory().createXMLStreamWriter(stream));
}
public JsonStreamXMLWriter(JsonParser parser, OutputStream stream, String encoding) throws XMLStreamException
{
this(parser, getXMLOutputFactory().createXMLStreamWriter(stream, encoding));
}
public JsonStreamXMLWriter(JsonParser parser, Result result) throws XMLStreamException
{
this(parser, getXMLOutputFactory().createXMLStreamWriter(result));
}
public JsonStreamXMLWriter(JsonParser parser, XMLStreamWriter writer)
{
this.parser = parser;
this.writer = writer;
}
public void convert() throws XMLStreamException
{
convert(getWriter());
}
public void convert(XMLStreamWriter writer) throws XMLStreamException
{
convert(getParser(), writer);
}
public static void convert(JsonParser parser, XMLStreamWriter writer) throws XMLStreamException
{
writer.writeStartDocument();
writer.setDefaultNamespace(XPATH_FUNCTIONS_NS);
write(parser, writer);
writer.writeEndDocument();
writer.flush();
parser.close();
}
public static void write(JsonParser parser, XMLStreamWriter writer) throws XMLStreamException
{
String keyName = null;
while (parser.hasNext())
{
JsonParser.Event event = parser.next();
switch(event)
{
case START_ARRAY:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "array");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
break;
case END_ARRAY:
writer.writeEndElement();
break;
case START_OBJECT:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "map");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
break;
case END_OBJECT:
writer.writeEndElement();
break;
case VALUE_FALSE:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "boolean");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
writer.writeCharacters("false");
writer.writeEndElement();
break;
case VALUE_TRUE:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "boolean");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
writer.writeCharacters("true");
writer.writeEndElement();
break;
case KEY_NAME:
keyName = parser.getString();
break;
case VALUE_STRING:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "string");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
writer.writeCharacters(parser.getString());
writer.writeEndElement();
break;
case VALUE_NUMBER:
writer.writeStartElement(XPATH_FUNCTIONS_NS, "number");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
writer.writeCharacters(parser.getString());
writer.writeEndElement();
break;
case VALUE_NULL:
writer.writeEmptyElement(XPATH_FUNCTIONS_NS, "null");
if (keyName != null)
{
writer.writeAttribute("key", keyName);
keyName = null;
}
break;
}
writer.flush();
}
}
protected JsonParser getParser()
{
return parser;
}
protected XMLStreamWriter getWriter()
{
return writer;
}
protected static XMLOutputFactory getXMLOutputFactory()
{
return XOF;
}
}
This is my class to call this Util class to convert :-
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.io.IOUtils;
public class JSON2XML
{
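public static void main(String[] args) throws IOException, XMLStreamException
{
// Assumption: args[0] is the JSON input file and args[1] the XML output file;
// the original referenced file and outputStream without defining them.
File file = new File(args[0]);
try (InputStream json = new FileInputStream(file);
OutputStream outputStream = new FileOutputStream(args[1]))
{
if (json.available() == 0)
{
System.err.println("JSON input is empty: " + file);
System.exit(1);
}
// Both reader and writer use UTF-8 and are closed (and flushed) automatically.
try (Reader reader = new BufferedReader(new InputStreamReader(json, StandardCharsets.UTF_8));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)))
{
new JsonStreamXMLWriter(reader, writer).convert();
}
}
}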
}
I was able to read multiple JSON files one by one and create a new XML file for each. Below is the code:-
private static long processLine(Path line) {
final long localStart = System.nanoTime();
System.out.println("Path " + line);
if (Files.isRegularFile(line)) {
String str = line.getFileName().toString();
// Drop the last four characters of the file name (e.g. a ".txt" suffix).
String baseName = str.substring(0, str.length() - 4);
System.out.println("Filename - " + baseName);
// try-with-resources closes the input and output files even if the conversion fails.
try (Reader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(line), StandardCharsets.UTF_8));
OutputStream outputStream = new FileOutputStream("/Users/d0s04ub/Documents/STS_Workspace/OrderProcessor/src/main/resources/output/" + baseName);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
new JsonStreamXMLWriter(reader, writer).convert();
} catch (IOException | XMLStreamException e) {
e.printStackTrace();
}
}
return System.nanoTime() - localStart;
}
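To drive a `processLine(Path)` like the one above over a whole directory, `Files.list` can supply the paths. This is a minimal sketch, assuming every regular file in the directory is an input; `DirectoryRunner`, `processAll` and the `ToLongFunction<Path>` parameter are hypothetical, and the lambda in `main` is a placeholder for the real conversion.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

final class DirectoryRunner {

    /** Applies worker to every regular file in dir in parallel and sums the returned values. */
    static long processAll(Path dir, ToLongFunction<Path> worker) throws IOException {
        // Files.list holds an open directory handle, so close it with try-with-resources.
        try (Stream<Path> paths = Files.list(dir)) {
            return paths.filter(Files::isRegularFile)
                        .parallel()
                        .mapToLong(worker)
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("json-in");
        Files.write(dir.resolve("a.txt"), "{}".getBytes());
        Files.write(dir.resolve("b.txt"), "{}".getBytes());
        // Placeholder worker: returns 1 per file; the real one would convert the file
        // and return the elapsed nanoseconds, as processLine does.
        long processed = processAll(dir, path -> 1L);
        System.out.println("Files processed: " + processed);
    }
}
```

With the method from the question, the call would look like `processAll(dir, SpliteratorBenchmark::processLine)`. The stream from `Files.list` may split poorly for small directories, which is the kind of problem `FixedBatchSpliterator.withBatchSize` is meant to address, so wrapping it the same way as `Files.lines` could be worth measuring.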