Commit 6ef3c96e authored by R.W.Majeed's avatar R.W.Majeed
Browse files

Observation transformation core functionality

parent 95d0d7ab
package de.sekmi.histream.io;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Consumer;
import de.sekmi.histream.Observation;
public class AbstractTransformer {
final protected Queue<Observation> fifo;
final protected Consumer<Observation> fifoPush;
final protected Transformation transformation;
protected AbstractTransformer(Transformation transformation){
this.transformation = transformation;
this.fifo = new LinkedList<Observation>();
this.fifoPush = fifo::add;
}
}
......@@ -13,15 +13,34 @@ import de.sekmi.histream.Observation;
* @author Raphael
*
*/
public class PullTransformer implements Supplier<Observation>{
private Supplier<Observation> source;
public PullTransformer(Supplier<Observation> source){
public class PullTransformer extends AbstractTransformer implements Supplier<Observation>{
final private Supplier<Observation> source;
public PullTransformer(Supplier<Observation> source, Transformation transformation){
super(transformation);
this.source = source;
}
@Override
public Observation get() {
// TODO filter, buffer
return source.get();
Observation ret;
do{
if( !fifo.isEmpty() ){ // try to empty queue
ret = fifo.remove();
break;
}
// next transformation
Observation o = source.get();
if( o == null ){
// source depleted
ret = null;
break;
}
ret = transformation.transform(o, fifoPush);
}while( ret == null );
return ret;
}
}
......@@ -4,15 +4,23 @@ import java.util.function.Consumer;
import de.sekmi.histream.Observation;
public class PushTransformer implements Consumer<Observation>{
public class PushTransformer extends AbstractTransformer implements Consumer<Observation>{
private Consumer<Observation> target;
public PushTransformer(Consumer<Observation> target){
public PushTransformer(Consumer<Observation> target, Transformation transformation){
super(transformation);
this.target = target;
}
@Override
public void accept(Observation t) {
// TODO transform, buffer
target.accept(t);
Observation ret = transformation.transform(t, fifoPush);
if( ret != null ){
target.accept(ret);
}
while( !fifo.isEmpty() ){
target.accept( fifo.remove() );
}
}
}
package de.sekmi.histream.impl;
package de.sekmi.histream.io;
/*
* #%L
......@@ -25,17 +25,24 @@ import java.io.FileInputStream;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import java.math.BigDecimal;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Value;
import de.sekmi.histream.Modifier;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
import de.sekmi.histream.impl.TestObservationHandler;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FlatObservationSupplier;
import de.sekmi.histream.io.XMLObservationSupplier;
......@@ -54,6 +61,10 @@ public class FileObservationProviderTest {
//factory.registerExtension(new ConceptExtension());
}
public ObservationFactory getFactory(){
return factory;
}
@Before
public void initializeHandler(){
handler = new TestObservationHandler(new TestObservationHandler.Tester[]{
......@@ -153,18 +164,25 @@ public class FileObservationProviderTest {
});
}
public void validateExample(Supplier<Observation> supplier){
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(supplier), false).forEach(handler);
}
@After
public void closeHandler(){
handler.finish();
}
@Test
public void testStAXReader() throws Exception {
ObservationSupplier xos = new XMLObservationSupplier(factory, new FileInputStream("examples/dwh-eav.xml"));
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(xos), false).forEach(handler);
handler.finish();
validateExample(xos);
}
@Test
public void testFlatReader() throws Exception {
ObservationSupplier s = new FlatObservationSupplier(factory, new FileInputStream("examples/dwh-flat.txt"));
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(s), false).forEach(handler);
handler.finish();
validateExample(s);
}
}
package de.sekmi.histream.io;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.junit.Test;
public class TransformerTest {
@Test
public void testPullTransformerIdentity() throws FileNotFoundException, IOException{
FileObservationProviderTest f = new FileObservationProviderTest();
f.initializeObservationFactory();
Transformation t = Transformation.Identity;
FlatObservationSupplier sup = new FlatObservationSupplier(f.getFactory(), new FileInputStream("examples/dwh-flat.txt"));
PullTransformer p = new PullTransformer(sup, t);
// validate content after identity transformation
f.initializeHandler();
f.validateExample(p);
f.closeHandler();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment