Commit 693409a0 authored by R.W.Majeed's avatar R.W.Majeed

observation providers from file implement supplier instead of spliterator

parent 1a5b5d2b
package de.sekmi.histream.io;
import java.time.Instant;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
public class AbstractObservationParser {
......@@ -25,4 +29,38 @@ public class AbstractObservationParser {
protected void setEtlStrategy(String strategy){
this.etlStrategy = strategy;
}
public static Spliterator<Observation> nonNullSpliterator(Supplier<Observation> supplier){
return new NonNullSpliterator(supplier);
}
private static class NonNullSpliterator implements Spliterator<Observation>{
private Supplier<Observation> supplier;
public NonNullSpliterator(Supplier<Observation> supplier) {
this.supplier = supplier;
}
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
Observation o = supplier.get();
if( o == null )return false;
action.accept(o);
return true;
}
@Override
public Spliterator<Observation> trySplit() {
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.NONNULL | Spliterator.IMMUTABLE;
}
}
}
package de.sekmi.histream.io;
import java.util.function.Supplier;
import de.sekmi.histream.Observation;
public interface FileObservationProvider extends Supplier<Observation>{
}
package de.sekmi.histream.io;
public interface FileObservationProviderFactory {
}
......@@ -10,8 +10,6 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Hashtable;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -26,7 +24,12 @@ import de.sekmi.histream.impl.AbstractValue;
import de.sekmi.histream.impl.NumericValue;
import de.sekmi.histream.impl.StringValue;
public class FlatObservationSpliterator extends AbstractObservationParser implements Spliterator<Observation>{
/**
*
* @author Raphael
*
*/
public class FlatObservationProvider extends AbstractObservationParser implements FileObservationProvider{
private BufferedReader reader;
private Pattern fieldSeparator;
private Pattern metaAssignment;
......@@ -86,7 +89,7 @@ public class FlatObservationSpliterator extends AbstractObservationParser implem
public String getFlags(){return fields[10];}
}
public FlatObservationSpliterator(ObservationFactory factory, BufferedReader reader){
public FlatObservationProvider(ObservationFactory factory, BufferedReader reader){
super(factory);
this.reader = reader;
this.fieldSeparator = Pattern.compile("\\t");
......@@ -97,7 +100,7 @@ public class FlatObservationSpliterator extends AbstractObservationParser implem
lineNo = 0;
}
public FlatObservationSpliterator(ObservationFactory factory, InputStream input){
public FlatObservationProvider(ObservationFactory factory, InputStream input){
this(factory, new BufferedReader(new InputStreamReader(input)));
}
......@@ -243,7 +246,7 @@ public class FlatObservationSpliterator extends AbstractObservationParser implem
}
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
public Observation get() {
String line;
boolean inGroup = false;
do{
......@@ -255,7 +258,7 @@ public class FlatObservationSpliterator extends AbstractObservationParser implem
}
if( line == null ){
// end of stream
return false;
return null;
}else if( line.length() == 0 ){
// empty line
// continue;
......@@ -303,27 +306,9 @@ public class FlatObservationSpliterator extends AbstractObservationParser implem
}
}
}while( true );
action.accept(fact);
fact = null;
return true;
}
@Override
public Spliterator<Observation> trySplit() {
// TODO Auto-generated method stub
return null;
}
@Override
public long estimateSize() {
// TODO estimate by file size
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.IMMUTABLE | Spliterator.NONNULL;
Observation ret = fact;
fact = null; // clear local copy
return ret;
}
}
......@@ -21,7 +21,7 @@ import de.sekmi.histream.impl.StringValue;
/**
* Parser for EAV XML documents. This class is used by both the {@link SAXObservationProvider}
* and {@link XMLObservationSpliterator}.
* and {@link XMLObservationProvider}.
*
* @author marap1
*
......
......@@ -3,8 +3,7 @@ package de.sekmi.histream.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
......@@ -15,18 +14,19 @@ import javax.xml.stream.XMLStreamReader;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.impl.AbstractValue;
public class XMLObservationSpliterator extends XMLObservationParser implements Spliterator<Observation>{
public class XMLObservationProvider extends XMLObservationParser implements FileObservationProvider{
//private static final String namespaceURI = "http://sekmi.de/histream/dwh-eav";
private XMLStreamReader reader;
private boolean documentStart;
private AttributeAccessor atts;
public XMLObservationSpliterator(ObservationFactory factory, XMLStreamReader reader) {
public XMLObservationProvider(ObservationFactory factory, XMLStreamReader reader) {
super(factory);
this.reader = reader;
atts = new AttributeAccessor() {
......@@ -38,7 +38,7 @@ public class XMLObservationSpliterator extends XMLObservationParser implements S
};
documentStart = true;
}
public XMLObservationSpliterator(ObservationFactory factory, InputStream input) throws XMLStreamException, FactoryConfigurationError {
public XMLObservationProvider(ObservationFactory factory, InputStream input) throws XMLStreamException, FactoryConfigurationError {
this(factory, XMLInputFactory.newInstance().createXMLStreamReader(input));
}
......@@ -168,32 +168,14 @@ public class XMLObservationSpliterator extends XMLObservationParser implements S
}
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
public Observation get() {
Observation o;
try {
o = readObservation();
} catch (XMLStreamException e) {
throw new UncheckedIOException(new IOException(e));
}
if( o != null ){
action.accept(o);
return true;
}else return false;
}
@Override
public Spliterator<Observation> trySplit() {
return null;
}
@Override
public long estimateSize() {
// TODO Auto-generated method stub
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
// TODO Auto-generated method stub
return Spliterator.NONNULL | Spliterator.IMMUTABLE;
return o;
}
}
......@@ -18,9 +18,10 @@ import org.xml.sax.helpers.XMLReaderFactory;
import de.sekmi.histream.Observation;
import de.sekmi.histream.Value;
import de.sekmi.histream.Modifier;
import de.sekmi.histream.io.FlatObservationSpliterator;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FlatObservationProvider;
import de.sekmi.histream.io.SAXObservationProvider;
import de.sekmi.histream.io.XMLObservationSpliterator;
import de.sekmi.histream.io.XMLObservationProvider;
public class FileObservationProviderTest {
......@@ -124,15 +125,15 @@ public class FileObservationProviderTest {
@Test
public void testStAXReader() throws Exception {
XMLObservationSpliterator xos = new XMLObservationSpliterator(factory, new FileInputStream("src/test/resources/dwh-eav.xml"));
StreamSupport.stream(xos, false).forEach(handler);
XMLObservationProvider xos = new XMLObservationProvider(factory, new FileInputStream("src/test/resources/dwh-eav.xml"));
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(xos), false).forEach(handler);
handler.finish();
}
@Test
public void testFlatReader() throws Exception {
FlatObservationSpliterator s = new FlatObservationSpliterator(factory, new FileInputStream("src/test/resources/dwh-flat.txt"));
StreamSupport.stream(s, false).forEach(handler);
FlatObservationProvider s = new FlatObservationProvider(factory, new FileInputStream("src/test/resources/dwh-flat.txt"));
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(s), false).forEach(handler);
handler.finish();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment