Commit 9241b742 authored by R.W.Majeed's avatar R.W.Majeed

Stream methods moved to separate class

parent 753982f8
......@@ -29,6 +29,7 @@ public abstract class GroupedObservationHandler implements ObservationHandler, A
/**
* Called when the first observation is encountered
* @throws ObservationException to report errors
*/
protected abstract void beginStream()throws ObservationException;
protected abstract void beginPatient(Patient patient)throws ObservationException;
......
......@@ -48,8 +48,8 @@ import de.sekmi.histream.conf.Configuration;
import de.sekmi.histream.conf.PluginConfig;
import de.sekmi.histream.conf.PluginRef;
import de.sekmi.histream.impl.AbstractObservationHandler;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FileObservationSupplierFactory;
import de.sekmi.histream.io.Streams;
public class RunConfiguration implements Closeable{
private static final Logger log = Logger.getLogger(RunConfiguration.class.getName());
......@@ -128,7 +128,7 @@ public class RunConfiguration implements Closeable{
for( ObservationHandler h : destinationHandlers ){
h.setMeta("etl.strategy", provider.getMeta("etl.strategy"));
}
AbstractObservationParser.nonNullStream(provider).forEach(destinationChain);
Streams.nonNullStream(provider).forEach(destinationChain);
}
public ObservationSupplier providerForFile(File file){
......
......@@ -79,49 +79,6 @@ public class AbstractObservationParser implements ExternalSourceType{
public void setObservationFactory(ObservationFactory factory){
this.factory = factory;
}
/**
* TODO move method to ObservationSupplier
* @param supplier observation supplier
* @return spliterator
*/
public static Spliterator<Observation> nonNullSpliterator(Supplier<Observation> supplier){
return new NonNullSpliterator(supplier);
}
public static Stream<Observation> nonNullStream(Supplier<Observation> supplier){
return StreamSupport.stream(new NonNullSpliterator(supplier), false);
}
private static class NonNullSpliterator implements Spliterator<Observation>{
private Supplier<Observation> supplier;
public NonNullSpliterator(Supplier<Observation> supplier) {
this.supplier = supplier;
}
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
Observation o = supplier.get();
if( o == null )return false;
action.accept(o);
return true;
}
@Override
public Spliterator<Observation> trySplit() {
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.NONNULL | Spliterator.IMMUTABLE;
}
}
@Override
......
package de.sekmi.histream.io;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationHandler;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.impl.Meta;
public class Streams {
/**
* TODO move method to ObservationSupplier
* @param supplier observation supplier
* @return spliterator
*/
public static Spliterator<Observation> nonNullSpliterator(Supplier<Observation> supplier){
return new NonNullSpliterator(supplier);
}
/**
* Create a non-null stream of observations
* The stream will end when the first non null observation is
* received via {@link Supplier#get()}, which usually means end of stream.
*
* @param supplier observation supplier
* @return stream
*/
public static Stream<Observation> nonNullStream(Supplier<Observation> supplier){
return StreamSupport.stream(new NonNullSpliterator(supplier), false);
}
/**
* Transfers meta information and all observations from source to target.
*
* @param source observation source
* @param target observation handler
*/
public static void transfer(ObservationSupplier source, ObservationHandler target){
Meta.transfer(source, target);
Streams.nonNullStream(source).forEach(target);
}
private static class NonNullSpliterator implements Spliterator<Observation>{
private Supplier<Observation> supplier;
public NonNullSpliterator(Supplier<Observation> supplier) {
this.supplier = supplier;
}
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
Observation o = supplier.get();
if( o == null )return false;
action.accept(o);
return true;
}
@Override
public Spliterator<Observation> trySplit() {
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.NONNULL | Spliterator.IMMUTABLE;
}
}
}
......@@ -28,7 +28,6 @@ import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import java.math.BigDecimal;
import javax.xml.bind.JAXBException;
......@@ -54,7 +53,6 @@ import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
import de.sekmi.histream.impl.TestObservationHandler;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FlatObservationSupplier;
import de.sekmi.histream.io.XMLObservationSupplier;
......@@ -208,7 +206,7 @@ public class FileObservationProviderTest {
}
public void validateExample(Supplier<Observation> supplier){
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(supplier), false).forEach(handler);
Streams.nonNullStream(supplier).forEach(handler);
}
@After
......
......@@ -5,7 +5,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Instant;
import java.util.stream.StreamSupport;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXB;
......@@ -145,8 +144,7 @@ public class TestXMLWriter {
ObservationSupplier s = t.getExampleSupplier();
Document doc = createDocument();
GroupedXMLWriter w = new GroupedXMLWriter(new DOMResult(doc));
Meta.transfer(s, w);
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(s), false).forEach(w);
Streams.transfer(s, w);
w.close();
s.close();
......@@ -159,8 +157,7 @@ public class TestXMLWriter {
t.initializeObservationFactory();
ObservationSupplier s = t.getExampleSupplier();
GroupedXMLWriter w = new GroupedXMLWriter(debugLog);
Meta.transfer(s, w);
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(s), false).forEach(w);
Streams.transfer(s, w);
w.close();
s.close();
}
......@@ -215,8 +212,7 @@ public class TestXMLWriter {
t.initializeObservationFactory();
ObservationSupplier s = t.getExampleSupplier();
GroupedXMLWriter w = new GroupedXMLWriter(out);
Meta.transfer(s, w);
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(s), false).forEach(w);
Streams.transfer(s, w);
s.close();
w.close();
out.close();
......
package de.sekmi.histream.etl;
import java.io.IOException;
import java.util.stream.StreamSupport;
import org.junit.After;
import org.junit.Assert;
......@@ -12,9 +11,8 @@ import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.Meta;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.GroupedXMLWriter;
import de.sekmi.histream.io.Streams;
public class TestETLSupplier {
private ETLObservationSupplier os;
......@@ -33,8 +31,7 @@ public class TestETLSupplier {
public void testXMLConversion() throws Exception{
GroupedXMLWriter w = new GroupedXMLWriter(System.out);
// transfer meta information
Meta.transfer(os, w);
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(os), false).forEach(w);
Streams.transfer(os, w);
w.close();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment