Commit a8f09c04 authored by R.W.Majeed

Configuration can be run with XML and flat files

parent d4996ce6
......@@ -23,4 +23,11 @@ public interface ObservationHandler extends Consumer<Observation>{
void accept(Observation observation);
void setErrorHandler(Consumer<ObservationException> handler);
/**
* Set meta information for this observation handler.
* @param key meta key, e.g. etl.strategy
* @param value value to set for the given key
*/
void setMeta(String key, String value);
}
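
For context, a caller typically sets an error handler and any handler-specific meta keys before streaming observations into the handler. A minimal usage sketch, assuming an already constructed handler (the class name HandlerSetupSketch is illustrative; "etl.strategy" is the key consumed by I2b2Inserter in this commit):

// Hypothetical usage sketch, not part of this commit.
import java.util.function.Consumer;
import de.sekmi.histream.ObservationException;
import de.sekmi.histream.ObservationHandler;

class HandlerSetupSketch {
    static void configure(ObservationHandler handler){
        // report problems per observation instead of aborting the whole stream
        Consumer<ObservationException> log = e -> System.err.println(e.getMessage());
        handler.setErrorHandler(log);
        // meta keys are handler specific; I2b2Inserter understands "etl.strategy"
        handler.setMeta("etl.strategy", "replace-visit");
    }
}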
......@@ -7,7 +7,9 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import de.sekmi.histream.Modifier;
......@@ -55,6 +57,8 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
private String nullLocationCd;
private String nullModifierCd;
private String nullValueFlagCd;
private Preprocessor etlPreprocessor;
private int insertCount;
public I2b2Inserter(Map<String,String> config) throws ClassNotFoundException, SQLException{
this.nullUnitCd = "@"; // technically, null is allowed, but the demodata uses both '@' and ''
......@@ -62,9 +66,40 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
this.nullValueFlagCd = "@";// technically, null is allowed, but the demodata uses both '@' and ''
// TODO nullBlob (technically null allowed, but '' is used in demodata)
this.nullModifierCd = "@"; // null not allowed, @ is used in demodata
insertCount = 0;
open(config);
}
private interface Preprocessor{
void preprocess(Observation fact)throws SQLException;
}
private class DistinctVisitPurge implements Preprocessor{
private I2b2Visit prev;
@Override
public void preprocess(Observation fact) throws SQLException{
I2b2Visit current = fact.getExtension(I2b2Visit.class);
if( current != prev ){
purgeVisit(current.getNum());
prev = current;
}
}
}
private class UniqueSourcePurge implements Preprocessor{
private Set<String> purgedSources;
public UniqueSourcePurge(){
purgedSources = new HashSet<>();
}
@Override
public void preprocess(Observation fact) throws SQLException {
if( !purgedSources.contains(fact.getSourceId()) ){
purgedSources.add(fact.getSourceId());
purgeSource(fact.getSourceId());
}
}
}
/**
* Deletes all observations with the given sourceId
* @param sourceId sourcesystem_cd of the observations to delete
......@@ -72,8 +107,9 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
*/
public synchronized void purgeSource(String sourceId)throws SQLException{
deleteSource.setString(1, sourceId);
deleteSource.executeUpdate();
int rows = deleteSource.executeUpdate();
db.commit();
log.info("Deleted "+rows+" rows for sourcesystem_cd="+sourceId);
}
/**
......@@ -85,7 +121,7 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
deleteVisit.setInt(1, encounter_num);
int rows = deleteVisit.executeUpdate();
db.commit();
log.info("Deleted "+rows+" observations for encounter_num="+encounter_num);
log.info("Deleted "+rows+" rows for encounter_num="+encounter_num);
}
private void prepareStatements(Map<String,String> props)throws SQLException{
// no value
......@@ -121,8 +157,19 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
@Override
public void acceptOrException(Observation o) throws ObservationException{
if( etlPreprocessor != null ){
try{
etlPreprocessor.preprocess(o);
}catch( SQLException e ){
reportError(new ObservationException(o, e));
}
}
try {
insertFact(o, null, 1);
insertCount ++;
} catch (SQLException e) {
throw new ObservationException(o, e);
}
......@@ -312,6 +359,28 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
} catch (SQLException e) {
throw new IOException(e);
}
log.info("Inserted "+insertCount+" facts");
}
@Override
public void setMeta(String key, String value) {
if( key.equals("etl.strategy") ){
switch( value ){
case "replace-visit":
etlPreprocessor = new DistinctVisitPurge();
break;
case "replace-source":
etlPreprocessor = new UniqueSourcePurge();
break;
case "insert":
etlPreprocessor = null;
break;
default:
throw new IllegalArgumentException("Unknown etl strategy "+value);
}
}else{
throw new IllegalArgumentException("Unknown meta key "+key);
}
}
}
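
A minimal driver sketch for the strategy selection implemented in setMeta above, assuming an already constructed I2b2Inserter (the class and method names below are illustrative):

// Hypothetical sketch, not part of this commit.
import de.sekmi.histream.Observation;
import de.sekmi.histream.i2b2.I2b2Inserter;

class EtlStrategySketch {
    // strategy is one of "replace-visit", "replace-source" or "insert"
    static void run(I2b2Inserter inserter, Iterable<Observation> facts, String strategy){
        inserter.setMeta("etl.strategy", strategy);
        for( Observation fact : facts ){
            // with "replace-visit"/"replace-source", existing rows for the fact's
            // visit/source are purged once before its first insert
            inserter.accept(fact);
        }
    }
}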
......@@ -7,18 +7,22 @@ import de.sekmi.histream.ObservationException;
import de.sekmi.histream.ObservationHandler;
public abstract class AbstractObservationHandler implements ObservationHandler{
Consumer<ObservationException> errorHandler;
private Consumer<ObservationException> errorHandler;
@Override
public final void accept(Observation observation) {
try {
acceptOrException(observation);
} catch (ObservationException e) {
if( errorHandler != null )errorHandler.accept(e);
else throw new RuntimeException("Exception encountered, no error handler", e);
reportError(e);
} // don't catch runtime exceptions
}
protected void reportError(ObservationException e){
if( errorHandler != null )errorHandler.accept(e);
else throw new RuntimeException("Exception encountered, no error handler", e);
}
/**
* Accept method which allows exceptions. Exceptions are passed to the error handler
* specified via {@link #setErrorHandler(Consumer)}.
......
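
To illustrate the contract of the abstract class, a minimal subclass sketch (the class name and the de.sekmi.histream.impl package for AbstractObservationHandler are assumptions): exceptions thrown from acceptOrException are routed to the handler registered via setErrorHandler, otherwise accept rethrows them wrapped in a RuntimeException.

// Hypothetical sketch, not part of this commit.
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationException;
import de.sekmi.histream.impl.AbstractObservationHandler;

class CountingHandler extends AbstractObservationHandler{
    private int count;

    @Override
    public void acceptOrException(Observation observation) throws ObservationException{
        // a real handler would persist the observation here and may throw ObservationException
        count ++;
    }

    @Override
    public void setMeta(String key, String value){
        throw new IllegalArgumentException("Unknown meta key "+key);
    }
}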
package de.sekmi.histream;
package de.sekmi.histream.impl;
import java.io.Closeable;
import java.io.File;
......@@ -10,10 +10,14 @@ import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Logger;
import de.sekmi.histream.Extension;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationHandler;
import de.sekmi.histream.Plugin;
import de.sekmi.histream.conf.Configuration;
import de.sekmi.histream.conf.PluginConfig;
import de.sekmi.histream.conf.PluginRef;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FileObservationProvider;
import de.sekmi.histream.io.FileObservationProviderFactory;
......@@ -24,6 +28,7 @@ public class RunConfiguration implements Closeable{
private Plugin[] plugins;
private FileObservationProviderFactory[] fileFactories;
private Consumer<Observation> destinationChain;
private ObservationHandler[] destinationHandlers;
public RunConfiguration(Configuration conf) throws Exception{
factory = new ObservationFactoryImpl();
......@@ -64,16 +69,26 @@ public class RunConfiguration implements Closeable{
@SuppressWarnings("unchecked")
private void buildDestinationChain(Configuration conf){
PluginRef[] ds = conf.getDestinations();
ArrayList<ObservationHandler> a = new ArrayList<ObservationHandler>(ds.length);
destinationChain = null;
// chain subsequent destinations in order of configuration
for( int i=0; i<ds.length; i++ ){
Consumer<Observation> dest = (Consumer<Observation>)plugins[getPluginIndex(conf, ds[i].getPlugin())];
if( destinationChain == null )destinationChain = dest;
else destinationChain = destinationChain.andThen(dest);
if( dest instanceof ObservationHandler ){
a.add((ObservationHandler)dest);
}
}
destinationHandlers = a.toArray(new ObservationHandler[a.size()]);
}
public void processFile(FileObservationProvider provider){
for( ObservationHandler h : destinationHandlers ){
h.setMeta("etl.strategy", provider.getMeta("etl.strategy"));
}
AbstractObservationParser.nonNullStream(provider).forEach(destinationChain);
}
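
The chain built in buildDestinationChain relies on java.util.function.Consumer#andThen. A standalone sketch of the same folding pattern (String stands in for Observation so the example is self-contained):

// Illustrative sketch of the andThen chaining, not part of this commit.
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ChainSketch {
    public static void main(String[] args){
        List<Consumer<String>> destinations = Arrays.asList(
                s -> System.out.println("first destination: "+s),
                s -> System.out.println("second destination: "+s));
        // fold the destinations in configuration order, as in buildDestinationChain
        Consumer<String> chain = null;
        for( Consumer<String> dest : destinations ){
            if( chain == null )chain = dest;
            else chain = chain.andThen(dest);
        }
        chain.accept("some observation"); // every destination receives each element, in order
    }
}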
......@@ -100,7 +115,10 @@ public class RunConfiguration implements Closeable{
// TODO set error handlers for destinations
// if listeners specified, run as server (don't exit)
args = new String[]{"src/test/resources/dwh-eav.xml"};
args = new String[]{
"src/test/resources/dwh-eav.xml",
"src/test/resources/dwh-flat.txt"
};
if( args.length > 0 ){
for( int i=0; i<args.length; i++ ){
......
......@@ -283,17 +283,20 @@ public class FlatObservationProvider extends AbstractObservationParser implement
}
fact = factory.createObservation(record.getPatID(), record.getConcept(), ts);
if( ts == sourceTs ){
// try to use visit timestamp
ts = fact.getExtension(Visit.class).getStartTime();
if( ts != null )fact.setStartTime(ts);
}
// set other fields
fact.setEncounterId(record.getVisitID());
fact.setSourceId(sourceId);
fact.setSourceTimestamp(sourceTimestamp);
fact.setValue( parseValue(record) );
if( ts == sourceTs ){
// try to use visit timestamp
ts = fact.getExtension(Visit.class).getStartTime();
if( ts != null )fact.setStartTime(ts);
}
// TODO set remaining fields
record.getEndDate();
record.getLocation();
......
package de.sekmi.histream.io;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.Plugin;
public class FlatProviderFactory implements FileObservationProviderFactory, Plugin{
public FlatProviderFactory(Map<String,String> props) {
// no configuration needed
}
@Override
public void close() throws IOException {
// don't need to close anything
}
@Override
public FileObservationProvider forFile(File file, ObservationFactory factory) throws IOException {
return new FlatObservationProvider(factory, new FileInputStream(file));
}
}
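
A minimal sketch of using this factory directly; the file path is the test resource added in this commit, closing of the provider's underlying stream is omitted, and getMeta is assumed available on FileObservationProvider as used in RunConfiguration.processFile:

// Hypothetical sketch, not part of this commit.
import java.io.File;
import java.util.Collections;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.io.FileObservationProvider;
import de.sekmi.histream.io.FlatProviderFactory;

class FlatReadSketch {
    public static void main(String[] args) throws Exception{
        ObservationFactory factory = new ObservationFactoryImpl();
        FlatProviderFactory providerFactory = new FlatProviderFactory(Collections.emptyMap());
        FileObservationProvider provider =
                providerFactory.forFile(new File("src/test/resources/dwh-flat.txt"), factory);
        // meta directives parsed from the file header, e.g. etl.strategy
        System.out.println("etl.strategy = "+provider.getMeta("etl.strategy"));
        providerFactory.close();
    }
}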
......@@ -14,7 +14,7 @@ public class TestLoadConfig {
public void loadNewConfig(){
Configuration config = Configuration.fromFile(new File("src/test/resources/histream.xml"));
Assert.assertNotNull(config);
Assert.assertEquals(6, config.getPlugins().length);
Assert.assertEquals(5, config.getPlugins().length);
Assert.assertEquals(1, config.getDestinations().length);
Assert.assertEquals(config.getPlugins()[2], config.getDestinations()[0].getPlugin());
......
patid encounter concept type value units start end provider location flag
#@meta(source.id)=test
#@meta(source.timestamp)=2015-04-21T08:58:00
#@meta(etl.strategy)=replacesource
# etl strategy: replacesource|replacevisit|insert
#@meta(etl.strategy)=replace-source
# etl strategy: replace-source|replace-visit|insert
# the encounter id may be empty (not allowed with replace-visit)
# in that case a new encounter is created per patient
# since the real encounter id must not be transferred, the importer can build a hash of the real id
......
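
The "#@meta(key)=value" directives above are what the flat file provider exposes through getMeta and what RunConfiguration forwards to the destination handlers via setMeta. A standalone sketch of the directive syntax (the regular expression is illustrative, not the parser actually used by FlatObservationProvider):

// Illustrative sketch of the "#@meta(key)=value" directive syntax, not the actual parser.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MetaDirectiveSketch {
    private static final Pattern META = Pattern.compile("#@meta\\(([^)]+)\\)=(.*)");
    public static void main(String[] args){
        Matcher m = META.matcher("#@meta(etl.strategy)=replace-source");
        if( m.matches() ){
            System.out.println(m.group(1)+" -> "+m.group(2)); // etl.strategy -> replace-source
        }
    }
}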
......@@ -27,8 +27,10 @@
<property name="project">demo</property>
<property name="nullProvider">LCS-I2B2:PROVIDERS</property>
</plugin>
<plugin class="de.sekmi.histream.io.XMLProviderFactory">
</plugin><!--
<plugin class="de.sekmi.histream.io.XMLProviderFactory"/>
<plugin class="de.sekmi.histream.io.FlatProviderFactory"/>
<!--
<plugin class="de.sekmi.histream.i2b2.services.HiveServer">
<property name="user">i2b2demodata</property>
</plugin>
......@@ -39,9 +41,12 @@
<!-- How can the configuration be used when, for example, files should be read from
the command line and the program should then terminate?
-->
<!--
<source plugin="de.sekmi.histream.hl7.MLLPListener" />
<source plugin="de.sekmi.histream.impl.SAXObservationProvider" />
-->
<!-- TODO rename source to listener (for network interfaces) -->
<destination plugin="de.sekmi.histream.i2b2.I2b2Inserter" />
<!-- can read files from the command line -->
......