Commit a2b66e83 authored by R.W.Majeed's avatar R.W.Majeed

script processing moved to generalized filter processing

parent 36ed9849
......@@ -14,6 +14,7 @@ import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.config.EavTable;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.etl.config.WideTable;
import de.sekmi.histream.etl.filter.FilterPostProcessingQueue;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
......@@ -114,7 +115,7 @@ public class ETLObservationSupplier implements ObservationSupplier{
* @throws ParseException configuration error
*/
public ETLObservationSupplier(DataSource ds, ObservationFactory factory) throws IOException, ParseException {
this(ds,factory,ds.createFactQueue(factory));
this(ds,factory,new FilterPostProcessingQueue(ds, factory));
}
/**
* Construct a new observation supplier directly from a {@link DataSource}.
......
package de.sekmi.histream.etl;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import javax.script.ScriptException;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.etl.config.Script;
import de.sekmi.histream.scripting.EncounterScriptEngine;
public class ScriptProcessingQueue extends VisitPostProcessorQueue {
private EncounterScriptEngine engine;
public ScriptProcessingQueue(Script[] scripts, Meta meta, ObservationFactory factory) throws IOException {
try {
engine = new EncounterScriptEngine();
} catch (ScriptException e) {
throw new IOException("Unable to create script engine", e);
}
engine.setObservationFactory(factory);
// load scripts
for( int i=0; i<scripts.length; i++ ){
try( Reader r = scripts[i].openReader(meta) ){
engine.addScript(r, meta.getSourceId(), scripts[i].getTimestamp(meta));
} catch (ScriptException e) {
throw new IOException("Script error in script "+i, e);
}
}
}
@Override
protected void postProcessVisit() {
if( getVisit() == null ){
return; // don't want null visits
}
try {
engine.processEncounter(getPatient(), getVisit(), getVisitFacts());
} catch (ScriptException e) {
IOException io = new IOException("Error during script execution for patient="+getPatient().getId()+", visit="+getVisit().getId(), e);
throw new UncheckedIOException(io);
}
}
public int getNumScripts(){
return engine.getScriptCount();
}
}
......@@ -16,9 +16,7 @@ import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.FactGroupingQueue;
import de.sekmi.histream.etl.ScriptProcessingQueue;
import de.sekmi.histream.etl.filter.PostProcessingFilter;
/**
* Data source configuration.
......@@ -56,6 +54,8 @@ public class DataSource {
@XmlElement(name="eav-table")
EavTable[] eavTables;
@XmlElement(name="post-processing", required=false)
PostProcessing postProcessing;
/**
* Scripts to execute for each visit. A script
* can add or delete facts for the visit. If scripts
......@@ -111,6 +111,12 @@ public class DataSource {
return ds;
}
public PostProcessingFilter[] getPostProcessingFilters(){
if( postProcessing == null ){
return new PostProcessingFilter[]{};
}
return postProcessing.filter;
}
/**
* If scripts are present, an instance of {@link ScriptProcessingQueue}
* is returned. Otherwise an instance of {@link FactGroupingQueue}.
......@@ -118,7 +124,7 @@ public class DataSource {
* @return fact queue
* @throws IOException error
*/
public FactGroupingQueue createFactQueue(ObservationFactory factory) throws IOException{
// public FactGroupingQueue createFactQueue(ObservationFactory factory) throws IOException{
// if( true ){
// // TODO debug problems with visitpostprocessorqueue
// return new VisitPostProcessorQueue() {
......@@ -127,10 +133,10 @@ public class DataSource {
// }
// };
// }
if( scripts == null || scripts.length == 0 ){
return new FactGroupingQueue();
}else{
return new ScriptProcessingQueue(scripts, meta, factory);
}
}
// if( scripts == null || scripts.length == 0 ){
// return new FactGroupingQueue();
// }else{
// return new ScriptProcessingQueue(scripts, meta, factory);
// }
// }
}
package de.sekmi.histream.etl.filter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.net.URL;
import java.time.Instant;
import javax.script.ScriptException;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
import de.sekmi.histream.etl.config.Meta;
import de.sekmi.histream.scripting.AbstractFacts;
import de.sekmi.histream.scripting.EncounterScriptEngine;
@XmlType(name="javascript")
public class ScriptFilter extends PostProcessingFilter{
/**
* Character encoding for an external script file
*/
@XmlAttribute
String charset;
/**
* Specifies the URL of an external script file. The URL can be relative
* to the configuration.
*/
@XmlAttribute
String src;
/**
* Literal script source
*/
@XmlElement
String script;
@XmlTransient
private int loadedIndex;
@XmlTransient
private EncounterScriptEngine engine;
public Reader openReader(Meta meta) throws IOException{
if( script != null ){
return new StringReader(script);
}else if( src != null ){
URL url = new URL(meta.getLocation(), src);
return new InputStreamReader(url.openStream(), this.charset);
}else{
return null;
}
}
public Instant getTimestamp(Meta meta) throws IOException{
// TODO use last modified of external script files
return Instant.ofEpochMilli(meta.getLastModified());
}
public void loadIntoEngine(EncounterScriptEngine engine, Meta meta) throws IOException{
this.engine = engine;
try( Reader r = openReader(meta) ){
this.loadedIndex = engine.addScript(r, meta.getSourceId(), getTimestamp(meta));
} catch (ScriptException e) {
throw new IOException("Script error in filter[type=javascript]", e);
}
}
@Override
public void processVisit(AbstractFacts facts) throws IOException {
try {
engine.processSingle(facts, loadedIndex);
} catch (ScriptException e) {
throw new IOException("Script execution failed", e);
}
}
}
......@@ -62,11 +62,11 @@ public class TestETLSupplier {
Assert.assertNotNull("Source id metadata required",os.getMeta(ObservationSupplier.META_SOURCE_ID));
//Assert.assertNotNull("Source timestamp metadata required",os.getMeta(ObservationSupplier.META_SOURCE_TIMESTAMP));
// verify all scripts are loaded
ObservationFactory f = new ObservationFactoryImpl();
FactGroupingQueue fq = os.getConfiguration().createFactQueue(f);
Assert.assertTrue(fq instanceof ScriptProcessingQueue);
ScriptProcessingQueue sq = (ScriptProcessingQueue)fq;
Assert.assertEquals(2, sq.getNumScripts());
// ObservationFactory f = new ObservationFactoryImpl();
// FactGroupingQueue fq = os.getConfiguration().createFactQueue(f);
// Assert.assertTrue(fq instanceof ScriptProcessingQueue);
// ScriptProcessingQueue sq = (ScriptProcessingQueue)fq;
// Assert.assertEquals(2, sq.getNumScripts());
}
@Test
public void testXMLConversion() throws Exception{
......@@ -141,5 +141,27 @@ public class TestETLSupplier {
System.out.println("Natrium-start: "+o.getStartTime());
}
}
@Test
public void verifyInlineScriptExecution() throws IOException, java.text.ParseException{
List<Observation> all = new ArrayList<>();
os.stream().filter( o -> o.getConceptId().equals("cnt") ).forEach(all::add);
// for( Observation o : all ){
// System.out.println("cnt: "+o.getStartTime()+", "+o.getValue().getStringValue());
// }
// should have a cnt fact for each visit
Assert.assertEquals(4, all.size());
}
@Test
public void verifyExternalScriptExecution() throws IOException, java.text.ParseException{
List<Observation> all = new ArrayList<>();
os.stream().filter( o -> o.getConceptId().equals("ext-js") ).forEach(all::add);
// for( Observation o : all ){
// System.out.println("cnt: "+o.getStartTime()+", "+o.getValue().getStringValue());
// }
// should have a cnt fact for each visit
Assert.assertEquals(4, all.size());
}
}
......@@ -12,6 +12,8 @@ import org.junit.Assert;
import org.junit.Test;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.filter.DuplicateFactFilter;
import de.sekmi.histream.etl.filter.ScriptFilter;
public class TestMarshall {
......@@ -65,6 +67,13 @@ public class TestMarshall {
Assert.assertNotNull(ds.eavTables[0].virtualColumnMap);
Assert.assertNotNull(ds.eavTables[0].virtualColumnMap.get("f_eav_x"));
// check post processing
Assert.assertNotNull(ds.postProcessing);
Assert.assertEquals(3, ds.postProcessing.filter.length);
Assert.assertEquals(DuplicateFactFilter.class, ds.postProcessing.filter[0].getClass());
DuplicateFactFilter f = (DuplicateFactFilter)ds.postProcessing.filter[0];
ScriptFilter sf = (ScriptFilter)ds.postProcessing.filter[1];
// check script
/* Assert.assertEquals(2, ds.scripts.length);
Assert.assertNull(ds.scripts[0].src);
......
......@@ -35,7 +35,6 @@
<visit-table>
<source xsi:type="csv-file">
<url>test-1-visits.txt</url>
<type>text/csv</type>
<separator>\t</separator>
</source>
<idat>
......@@ -139,13 +138,18 @@
<ignore column="user" xsi:type="string"/>
</eav-table>
<!-- scripts are run for each complete encounter in the order of occurrence -->
<script type="text/javascript"><![CDATA[
if( facts.get("natrium") && facts.get("kalium") ){
facts.add("nakl").value(1);
}
facts.add("cnt").value(facts.size());
]]>
</script>
<!-- TODO check why second script does not work -->
<script type="text/javascript" charset="UTF-8" src="test-1-script.js"/>
<post-processing>
<filter xsi:type="duplicate-fact">
<concept>kalium</concept>
</filter>
<filter xsi:type="javascript">
<script><![CDATA[
if( facts.get("natrium") && facts.get("kalium") ){
facts.add("nakl").value(1);
}
facts.add("cnt").value(facts.size());
]]></script>
</filter>
<filter xsi:type="javascript" charset="UTF-8" src="test-1-script.js"/>
</post-processing>
</datasource>
// add a fact which indicates the number of observations
// in this encounter
facts.add("COUNT").value(7);
facts.add("ext-js").value(24);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment