Skip to content
Commits on Source (6)
......@@ -4,12 +4,12 @@
<groupId>de.sekmi.histream</groupId>
<artifactId>histream-import</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
<parent>
<groupId>de.sekmi.histream</groupId>
<artifactId>histream</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
</parent>
<properties>
......@@ -50,13 +50,13 @@
<dependency>
<groupId>de.sekmi.histream</groupId>
<artifactId>histream-core</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
</dependency>
<!-- script support -->
<dependency>
<groupId>de.sekmi.histream</groupId>
<artifactId>histream-js</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
</dependency>
<!-- add later for sorting data tables
<dependency>
......
......@@ -14,6 +14,7 @@ import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.config.EavTable;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.etl.config.WideTable;
import de.sekmi.histream.etl.filter.FilterPostProcessingQueue;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
......@@ -114,7 +115,7 @@ public class ETLObservationSupplier implements ObservationSupplier{
* @throws ParseException configuration error
*/
public ETLObservationSupplier(DataSource ds, ObservationFactory factory) throws IOException, ParseException {
this(ds,factory,ds.createFactQueue(factory));
this(ds,factory,new FilterPostProcessingQueue(ds, factory));
}
/**
* Construct a new observation supplier directly from a {@link DataSource}.
......
......@@ -16,9 +16,7 @@ import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.FactGroupingQueue;
import de.sekmi.histream.etl.ScriptProcessingQueue;
import de.sekmi.histream.etl.filter.PostProcessingFilter;
/**
* Data source configuration.
......@@ -56,6 +54,8 @@ public class DataSource {
@XmlElement(name="eav-table")
EavTable[] eavTables;
@XmlElement(name="post-processing", required=false)
PostProcessing postProcessing;
/**
* Scripts to execute for each visit. A script
* can add or delete facts for the visit. If scripts
......@@ -111,6 +111,12 @@ public class DataSource {
return ds;
}
public PostProcessingFilter[] getPostProcessingFilters(){
if( postProcessing == null ){
return new PostProcessingFilter[]{};
}
return postProcessing.filter;
}
/**
* If scripts are present, an instance of {@link ScriptProcessingQueue}
* is returned. Otherwise an instance of {@link FactGroupingQueue}.
......@@ -118,7 +124,7 @@ public class DataSource {
* @return fact queue
* @throws IOException error
*/
public FactGroupingQueue createFactQueue(ObservationFactory factory) throws IOException{
// public FactGroupingQueue createFactQueue(ObservationFactory factory) throws IOException{
// if( true ){
// // TODO debug problems with visitpostprocessorqueue
// return new VisitPostProcessorQueue() {
......@@ -127,10 +133,10 @@ public class DataSource {
// }
// };
// }
if( scripts == null || scripts.length == 0 ){
return new FactGroupingQueue();
}else{
return new ScriptProcessingQueue(scripts, meta, factory);
}
}
// if( scripts == null || scripts.length == 0 ){
// return new FactGroupingQueue();
// }else{
// return new ScriptProcessingQueue(scripts, meta, factory);
// }
// }
}
package de.sekmi.histream.etl.config;
import javax.xml.bind.annotation.XmlElement;
import de.sekmi.histream.etl.filter.PostProcessingFilter;
public class PostProcessing {
@XmlElement
PostProcessingFilter[] filter;
}
package de.sekmi.histream.etl.filter;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlType;
import de.sekmi.histream.scripting.AbstractFacts;
@XmlType(name="duplicate-fact")
public class DuplicateFactFilter extends PostProcessingFilter{
@XmlElement
public String[] concept;
@Override
public void processVisit(AbstractFacts facts) {
// TODO Auto-generated method stub
}
}
package de.sekmi.histream.etl;
package de.sekmi.histream.etl.filter;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import javax.script.ScriptException;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.etl.config.Script;
import de.sekmi.histream.etl.VisitPostProcessorQueue;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.scripting.AbstractFacts;
import de.sekmi.histream.scripting.EncounterScriptEngine;
public class ScriptProcessingQueue extends VisitPostProcessorQueue {
public class FilterPostProcessingQueue extends VisitPostProcessorQueue {
private EncounterScriptEngine engine;
private PostProcessingFilter[] filters;
public ScriptProcessingQueue(Script[] scripts, Meta meta, ObservationFactory factory) throws IOException {
public FilterPostProcessingQueue(DataSource ds, ObservationFactory factory) throws IOException {
try {
engine = new EncounterScriptEngine();
} catch (ScriptException e) {
......@@ -22,25 +23,37 @@ public class ScriptProcessingQueue extends VisitPostProcessorQueue {
}
engine.setObservationFactory(factory);
// load scripts
for( int i=0; i<scripts.length; i++ ){
try( Reader r = scripts[i].openReader(meta) ){
engine.addScript(r, meta.getSourceId(), scripts[i].getTimestamp(meta));
} catch (ScriptException e) {
throw new IOException("Script error in script "+i, e);
filters = ds.getPostProcessingFilters();
// load script files into engine
for( int i=0; i<filters.length; i++ ){
if( filters[i] instanceof ScriptFilter ){
((ScriptFilter)filters[i]).loadIntoEngine(engine, ds.getMeta());
}
}
// load scripts
// for( int i=0; i<ds.length; i++ ){
// try( Reader r = scripts[i].openReader(meta) ){
// engine.addScript(r, meta.getSourceId(), scripts[i].getTimestamp(meta));
// } catch (ScriptException e) {
// throw new IOException("Script error in script "+i, e);
// }
// }
}
@Override
protected void postProcessVisit() {
if( getVisit() == null ){
return; // don't want null visits
}
try {
engine.processEncounter(getPatient(), getVisit(), getVisitFacts());
} catch (ScriptException e) {
IOException io = new IOException("Error during script execution for patient="+getPatient().getId()+", visit="+getVisit().getId(), e);
throw new UncheckedIOException(io);
AbstractFacts facts = engine.wrapEncounterFacts(getPatient(), getVisit(), getVisitFacts());
for( int i=0; i<filters.length; i++ ){
try {
filters[i].processVisit(facts);
} catch (IOException e) {
// TODO UncheckedIOException might be unwrapped and message might be lost, verify this
throw new UncheckedIOException("Filter execution failed for patient="+getPatient().getId()+", visit="+getVisit().getId(), e);
}
}
}
......
package de.sekmi.histream.etl.filter;
import java.io.IOException;
import javax.xml.bind.annotation.XmlSeeAlso;
import de.sekmi.histream.scripting.AbstractFacts;
@XmlSeeAlso({DuplicateFactFilter.class, ScriptFilter.class})
public abstract class PostProcessingFilter {
public abstract void processVisit(AbstractFacts facts)throws IOException;
}
package de.sekmi.histream.etl.filter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.net.URL;
import java.time.Instant;
import javax.script.ScriptException;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.scripting.AbstractFacts;
import de.sekmi.histream.scripting.EncounterScriptEngine;
@XmlType(name="javascript")
public class ScriptFilter extends PostProcessingFilter{
/**
* Character encoding for an external script file
*/
@XmlAttribute
String charset;
/**
* Specifies the URL of an external script file. The URL can be relative
* to the configuration.
*/
@XmlAttribute
String src;
/**
* Literal script source
*/
@XmlElement
String script;
@XmlTransient
private int loadedIndex;
@XmlTransient
private EncounterScriptEngine engine;
public Reader openReader(Meta meta) throws IOException{
if( script != null ){
return new StringReader(script);
}else if( src != null ){
URL url = new URL(meta.getLocation(), src);
return new InputStreamReader(url.openStream(), this.charset);
}else{
return null;
}
}
public Instant getTimestamp(Meta meta) throws IOException{
// TODO use last modified of external script files
return Instant.ofEpochMilli(meta.getLastModified());
}
public void loadIntoEngine(EncounterScriptEngine engine, Meta meta) throws IOException{
this.engine = engine;
try( Reader r = openReader(meta) ){
this.loadedIndex = engine.addScript(r, meta.getSourceId(), getTimestamp(meta));
} catch (ScriptException e) {
throw new IOException("Script error in filter[type=javascript]", e);
}
}
@Override
public void processVisit(AbstractFacts facts) throws IOException {
try {
engine.processSingle(facts, loadedIndex);
} catch (ScriptException e) {
throw new IOException("Script execution failed", e);
}
}
}
......@@ -75,6 +75,10 @@ public class Validator extends AbstractObservationHandler implements Transformat
// clear concepts
concepts.clear();
// add concept
if( duplicateConceptCheck ){
concepts.add(new StartPlusConcept(t));
}
prevPatient = patid; // remember patient to suppress errors for the same patient
......
......@@ -62,11 +62,11 @@ public class TestETLSupplier {
Assert.assertNotNull("Source id metadata required",os.getMeta(ObservationSupplier.META_SOURCE_ID));
//Assert.assertNotNull("Source timestamp metadata required",os.getMeta(ObservationSupplier.META_SOURCE_TIMESTAMP));
// verify all scripts are loaded
ObservationFactory f = new ObservationFactoryImpl();
FactGroupingQueue fq = os.getConfiguration().createFactQueue(f);
Assert.assertTrue(fq instanceof ScriptProcessingQueue);
ScriptProcessingQueue sq = (ScriptProcessingQueue)fq;
Assert.assertEquals(2, sq.getNumScripts());
// ObservationFactory f = new ObservationFactoryImpl();
// FactGroupingQueue fq = os.getConfiguration().createFactQueue(f);
// Assert.assertTrue(fq instanceof ScriptProcessingQueue);
// ScriptProcessingQueue sq = (ScriptProcessingQueue)fq;
// Assert.assertEquals(2, sq.getNumScripts());
}
@Test
public void testXMLConversion() throws Exception{
......@@ -141,5 +141,27 @@ public class TestETLSupplier {
System.out.println("Natrium-start: "+o.getStartTime());
}
}
@Test
public void verifyInlineScriptExecution() throws IOException, java.text.ParseException{
List<Observation> all = new ArrayList<>();
os.stream().filter( o -> o.getConceptId().equals("cnt") ).forEach(all::add);
// for( Observation o : all ){
// System.out.println("cnt: "+o.getStartTime()+", "+o.getValue().getStringValue());
// }
// should have a cnt fact for each visit
Assert.assertEquals(4, all.size());
}
@Test
public void verifyExternalScriptExecution() throws IOException, java.text.ParseException{
List<Observation> all = new ArrayList<>();
os.stream().filter( o -> o.getConceptId().equals("ext-js") ).forEach(all::add);
// for( Observation o : all ){
// System.out.println("cnt: "+o.getStartTime()+", "+o.getValue().getStringValue());
// }
// should have a cnt fact for each visit
Assert.assertEquals(4, all.size());
}
}
......@@ -12,6 +12,8 @@ import org.junit.Assert;
import org.junit.Test;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.filter.DuplicateFactFilter;
import de.sekmi.histream.etl.filter.ScriptFilter;
public class TestMarshall {
......@@ -65,6 +67,13 @@ public class TestMarshall {
Assert.assertNotNull(ds.eavTables[0].virtualColumnMap);
Assert.assertNotNull(ds.eavTables[0].virtualColumnMap.get("f_eav_x"));
// check post processing
Assert.assertNotNull(ds.postProcessing);
Assert.assertEquals(3, ds.postProcessing.filter.length);
Assert.assertEquals(DuplicateFactFilter.class, ds.postProcessing.filter[0].getClass());
DuplicateFactFilter f = (DuplicateFactFilter)ds.postProcessing.filter[0];
ScriptFilter sf = (ScriptFilter)ds.postProcessing.filter[1];
// check script
/* Assert.assertEquals(2, ds.scripts.length);
Assert.assertNull(ds.scripts[0].src);
......
......@@ -51,4 +51,22 @@ public class TestValidator {
Streams.transfer(os, v);
}
}
@Test
public void validateData4() throws Exception{
// duplicate concepts
try( ObservationSupplier os = ETLObservationSupplier.load(getClass().getResource("/data/test-4-datasource.xml")) ){
Validator v = new Validator(true,true);
v.setErrorHandler(e -> {throw new RuntimeException(e);});
Streams.transfer(os, v);
}catch( RuntimeException e ){
if( e.getCause() instanceof DuplicateConceptException ){
// expected behaviour
return;
}else{
// unexpected exceptoin
throw e;
}
}
Assert.fail("Exception expected");
}
}
......@@ -35,7 +35,6 @@
<visit-table>
<source xsi:type="csv-file">
<url>test-1-visits.txt</url>
<type>text/csv</type>
<separator>\t</separator>
</source>
<idat>
......@@ -139,13 +138,18 @@
<ignore column="user" xsi:type="string"/>
</eav-table>
<!-- scripts are run for each complete encounter in the order of occurrence -->
<script type="text/javascript"><![CDATA[
if( facts.get("natrium") && facts.get("kalium") ){
facts.add("nakl").value(1);
}
facts.add("cnt").value(facts.size());
]]>
</script>
<!-- TODO check why second script does not work -->
<script type="text/javascript" charset="UTF-8" src="test-1-script.js"/>
<post-processing>
<filter xsi:type="duplicate-fact">
<concept>kalium</concept>
</filter>
<filter xsi:type="javascript">
<script><![CDATA[
if( facts.get("natrium") && facts.get("kalium") ){
facts.add("nakl").value(1);
}
facts.add("cnt").value(facts.size());
]]></script>
</filter>
<filter xsi:type="javascript" charset="UTF-8" src="test-1-script.js"/>
</post-processing>
</datasource>
// add a fact which indicates the number of observations
// in this encounter
facts.add("COUNT").value(7);
facts.add("ext-js").value(24);
......@@ -35,7 +35,6 @@
<visit-table>
<source xsi:type="csv-file">
<url>test-2-visits-duplicate.txt</url>
<type>text/csv</type>
<separator>\t</separator>
</source>
<idat>
......
......@@ -35,7 +35,6 @@
<visit-table>
<source xsi:type="csv-file">
<url>test-3-visits-empty.txt</url>
<type>text/csv</type>
<separator>\t</separator>
</source>
<idat>
......
<?xml version="1.0" encoding="UTF-8"?>
<datasource version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<!-- Duplicate concept for patient -->
<meta>
<id>LTx_PH_ILD_COPD</id>
<etl-strategy>replace-source</etl-strategy>
</meta>
<patient-table>
<source xsi:type="csv-file">
<url>test-4-table.csv</url>
<separator>;</separator>
</source>
<idat>
<!-- Vorname, Nachname, Geschlecht unbekannt -->
<patient-id column="Pheno-ID"/>
<given-name column="Pheno-ID"/>
<surname column="Pheno-ID"/>
</idat>
<ignore xsi:type="string" column="*"/>
</patient-table>
<visit-table>
<source xsi:type="csv-file">
<url>test-4-table.csv</url>
<separator>;</separator>
</source>
<idat>
<patient-id column="Pheno-ID"/>
<visit-id column="Zeitpunkt"/>
<start column="Zeitpunkt" format="d.M.u[ H[:m[:s]]]"/>
</idat>
<ignore xsi:type="string" column="*"/>
</visit-table>
<eav-table>
<source xsi:type="csv-file">
<url>test-4-table.csv</url>
<separator>;</separator>
</source>
<idat>
<patient-id column="Pheno-ID"/>
<visit-id column="Zeitpunkt"/>
</idat>
<mdat>
<concept column="Export-Param"/>
<start column="Zeitpunkt" format="d.M.u[ H[:m[:s]]]"/>
<end column="Zeitpunkt" format="d.M.u[ H[:m[:s]]]"/>
<type constant-value="string"/>
<value column="Wert" na=""/>
<unit column="Einheiten" na=""/>
</mdat>
<virtual>
<value column="Diagnose" xsi:type="string" na="">
<map>
<otherwise log-warning="Unexpected value" action="drop-fact" />
</map>
</value>
<value column="Zusatzdiagnose" xsi:type="string" na="">
<map>
<case value="IPF" set-concept="B:DP-ID-IPF" set-value=""/>
<case value="UIP" set-concept="B:DP-ID-IPF" set-value=""/>
<otherwise log-warning="Unexpected value" action="drop-fact" />
</map>
</value>
<value column="Probenart" xsi:type="string" na="">
<map>
<otherwise action="drop-fact"/>
</map>
</value>
<value column="Diesease Area" xsi:type="string" na="">
<map>
<otherwise action="drop-fact"/>
</map>
</value>
</virtual>
<ignore xsi:type="string" column="*"/>
</eav-table>
</datasource>
Pheno-ID;Bereich;Zeitpunkt;Export-Param;Wert;Einheiten;
Mmqp212;Biobank-IDs;21.04.2016;Zusatzdiagnose;UIP;;
Mmqp212;Biobank-IDs;21.04.2016;Zusatzdiagnose;IPF;;
\ No newline at end of file
......@@ -4,12 +4,12 @@
<groupId>de.sekmi.histream</groupId>
<artifactId>histream-js</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
<parent>
<groupId>de.sekmi.histream</groupId>
<artifactId>histream</artifactId>
<version>0.14-SNAPSHOT</version>
<version>0.16-SNAPSHOT</version>
</parent>
<build>
......
......@@ -54,21 +54,25 @@ public class EncounterScriptEngine {
scripts = new LinkedList<>();
}
public void addScript(String script, String sourceId, Instant timestamp) throws ScriptException{
public int addScript(String script, String sourceId, Instant timestamp) throws ScriptException{
int index = scripts.size();
scripts.add(new Script(((Compilable)engine).compile(script), sourceId, timestamp));
return index;
}
public void addScript(URL location, String charset, String sourceId) throws ScriptException, IOException{
public int addScript(URL location, String charset, String sourceId) throws ScriptException, IOException{
URLConnection conn = location.openConnection();
Instant timestamp = Instant.ofEpochMilli(conn.getLastModified());
try(
InputStream in = conn.getInputStream();
Reader reader = new InputStreamReader(in, charset)
){
addScript(reader, sourceId, timestamp);
return addScript(reader, sourceId, timestamp);
}
}
public void addScript(Reader reader, String sourceId, Instant timestamp) throws ScriptException{
public int addScript(Reader reader, String sourceId, Instant timestamp) throws ScriptException{
int index = scripts.size();
scripts.add(new Script(((Compilable)engine).compile(reader), sourceId, timestamp));
return index;
}
public int getScriptCount(){
......@@ -78,7 +82,7 @@ public class EncounterScriptEngine {
public void setObservationFactory(ObservationFactory factory){
this.factory = factory;
}
private void process(AbstractFacts facts) throws ScriptException{
public void processAll(AbstractFacts facts) throws ScriptException{
Bindings b = engine.createBindings();
b.put("facts", facts);
for( Script script : scripts ){
......@@ -87,14 +91,28 @@ public class EncounterScriptEngine {
}
// TODO is there a way to add information which script threw an exception?
}
public void processEncounter(String patientId, String encounterId, DateTimeAccuracy defaultStartTime, List<Observation> facts) throws ScriptException{
public void processSingle(AbstractFacts facts, int scriptIndex) throws ScriptException{
Bindings b = engine.createBindings();
b.put("facts", facts);
Script script = scripts.get(scriptIndex);
facts.setSource(script.source);
script.script.eval(b);
}
public AbstractFacts wrapEncounterFacts(String patientId, String encounterId, DateTimeAccuracy defaultStartTime, List<Observation> facts){
SimpleFacts f = new SimpleFacts(factory, patientId, encounterId, defaultStartTime);
f.setObservations(facts);
process(f);
return f;
}
public void processEncounter(Patient patient, Visit visit, List<Observation> facts) throws ScriptException{
public AbstractFacts wrapEncounterFacts(Patient patient, Visit visit, List<Observation> facts){
VisitExtensionFacts f = new VisitExtensionFacts(factory, patient, visit);
f.setObservations(facts);
process(f);
return f;
}
// TODO add method to execute a single script
public void processEncounter(String patientId, String encounterId, DateTimeAccuracy defaultStartTime, List<Observation> facts) throws ScriptException {
processAll( wrapEncounterFacts(patientId, encounterId, defaultStartTime, facts) );
}
}