Commit 5b3370d2 authored by R.W.Majeed's avatar R.W.Majeed

histream configuration run implemented (incomplete)

parent 5d826c27
package de.sekmi.histream.conf;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.logging.Logger;
import javax.xml.bind.JAXB;
import javax.xml.bind.annotation.XmlAccessType;
......@@ -10,6 +13,8 @@ import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import de.sekmi.histream.Plugin;
/**
* Configuration for HIStream processing. Uses javax.xml.bind for loading XML files
......@@ -26,6 +31,7 @@ import javax.xml.bind.annotation.XmlType;
"destination"
})
public class Configuration {
private static final Logger log = Logger.getLogger(Configuration.class.getName());
@XmlElement(name="plugin")
@XmlElementWrapper(name="plugins", required=true)
protected PluginConfig[] plugins;
......@@ -43,4 +49,38 @@ public class Configuration {
public static final Configuration fromFile(File file){
return JAXB.unmarshal(file, Configuration.class);
}
/**
* Instantiates each plugin listed order (as specified in configuration).
* Instantiation is performed by looking up a constructor which accepts a single
* argument of type {@link Map} and calling that constructor with the properties
* specified for the plugin.
* <p>
* If an exception is thrown by the constructor if any plugin, the previously instantiated
* plugins are closed in reverse order. Any exceptions thrown by the close methods are suppressed
* by the constructor exception (via {@link Exception#addSuppressed(Throwable)).
* @return plugin instances
* @throws Exception exception thrown by any plugin during construction
*/
public Plugin[] createPluginInstances() throws Exception{
Plugin[] insts = new Plugin[plugins.length];
for( int i=0; i<plugins.length; i++ ){
try{
insts[i] = plugins[i].newInstance();
log.info("Plugin instance created: "+insts[i]);
}catch( Exception e ){
// close previously instantiated plugins in reverse order
for( int j=i-1; j>=0; j-- ){
try{
insts[j].close();
}catch( IOException f ){
e.addSuppressed(f);
}
}
throw e;
}
}
return insts;
}
}
......@@ -44,8 +44,10 @@ public class PluginConfig {
throw new Exception("Unable to find constructor",e);
}
HashMap<String, String> props = new HashMap<>();
for( PluginProperty prop : property ){
props.put(prop.name, prop.value);
if( property != null ){
for( PluginProperty prop : property ){
props.put(prop.name, prop.value);
}
}
return c.newInstance(props);
}
......
......@@ -4,6 +4,8 @@ import java.time.Instant;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
......@@ -36,6 +38,10 @@ public class AbstractObservationParser {
public static Spliterator<Observation> nonNullSpliterator(Supplier<Observation> supplier){
return new NonNullSpliterator(supplier);
}
public static Stream<Observation> nonNullStream(Supplier<Observation> supplier){
return StreamSupport.stream(new NonNullSpliterator(supplier), false);
}
private static class NonNullSpliterator implements Spliterator<Observation>{
private Supplier<Observation> supplier;
......
package de.sekmi.histream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Logger;
import de.sekmi.histream.conf.Configuration;
import de.sekmi.histream.conf.PluginConfig;
import de.sekmi.histream.conf.PluginRef;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FileObservationProvider;
import de.sekmi.histream.io.FileObservationProviderFactory;
public class RunConfiguration implements Closeable{
private static final Logger log = Logger.getLogger(RunConfiguration.class.getName());
private ObservationFactory factory;
private Plugin[] plugins;
private FileObservationProviderFactory[] fileFactories;
private Consumer<Observation> destinationChain;
public RunConfiguration(Configuration conf) throws Exception{
factory = new ObservationFactoryImpl();
plugins = conf.createPluginInstances();
List<FileObservationProviderFactory> ffs = new ArrayList<>();
for(int i=0; i<plugins.length; i++ ){
// register plugins
if( plugins[i] instanceof Extension<?> ){
// observation extension
factory.registerExtension((Extension<?>)plugins[i]);
// TODO log info
log.info("Observation extension added: "+plugins[i]);
}
if( plugins[i] instanceof FileObservationProviderFactory ){
// used to process files
ffs.add((FileObservationProviderFactory)plugins[i]);
}
}
if( ffs.size() > 0 ){
fileFactories = ffs.toArray(new FileObservationProviderFactory[ffs.size()]);
}
// build destination chain
buildDestinationChain(conf);
}
private int getPluginIndex(Configuration config, PluginConfig plugin){
for( int i=0; i<config.getPlugins().length; i++ ){
if( config.getPlugins()[i] == plugin )return i;
}
return -1;
}
@SuppressWarnings("unchecked")
private void buildDestinationChain(Configuration conf){
PluginRef[] ds = conf.getDestinations();
destinationChain = (Consumer<Observation>)plugins[getPluginIndex(conf, ds[0].getPlugin())];
// chain subsequent destinations in order of configuration
for( int i=1; i<ds.length; i++ ){
destinationChain.andThen((Consumer<Observation>)plugins[getPluginIndex(conf, ds[0].getPlugin())]);
}
}
public void processFile(FileObservationProvider provider){
AbstractObservationParser.nonNullStream(provider).forEach(destinationChain);
}
public FileObservationProvider providerForFile(File file){
FileObservationProvider p = null;
for( int i=0; i<fileFactories.length; i++ ){
try {
p = fileFactories[i].forFile(file, factory);
break;
} catch (IOException e) {
// unable to process file
}
}
return p;
}
public static void main(String args[])throws Exception{
System.out.println("HIStream starting");
long millis = System.currentTimeMillis();
Configuration conf = Configuration.fromFile(new File("src/test/resources/histream.xml"));
RunConfiguration rc = new RunConfiguration(conf);
// TODO set error handlers for destinations
// if listeners specified, run as server (don't exit)
if( args.length > 0 ){
for( int i=0; i<args.length; i++ ){
File file = new File(args[i]);
FileObservationProvider p = rc.providerForFile(file);
if( p != null ){
rc.processFile(p);
}else{
System.err.println("Unable to find parser for file "+file);
}
}
// files specified, run in batch mode
}
rc.close();
float duration = (System.currentTimeMillis() - millis)/1000f;
System.out.println("HIStream finished ("+duration+"s)");
}
@Override
public void close() throws IOException {
for(int i=0; i<plugins.length; i++ ){
plugins[i].close();
}
}
}
<histream>
<!-- TODO named property groups, which can be shared and referenced by plugins. this will reduce duplicate properties -->
<plugins>
<!-- plugins are loaded in order of occurance -->
<plugin class="de.sekmi.histream.i2b2.PostgresPatientStore">
<property name="user">i2b2demodata</property>
<property name="user">i2b2demodata</property>
<property name="host">localhost</property>
<property name="database">i2b2</property>
<property name="port">15432</property>
<property name="password"></property>
<property name="project">demo</property>
</plugin>
<plugin class="de.sekmi.histream.i2b2.PostgresVisitStore">
<property name="user">i2b2demodata</property>
<property name="user">i2b2demodata</property>
<property name="host">localhost</property>
<property name="database">i2b2</property>
<property name="port">15432</property>
<property name="password"></property>
<property name="project">demo</property>
</plugin>
<plugin class="de.sekmi.histream.i2b2.I2b2Inserter">
<property name="user">i2b2demodata</property>
</plugin>
<plugin class="de.sekmi.histream.impl.SAXObservationProvider">
<property name="user">i2b2demodata</property>
<property name="host">localhost</property>
<property name="database">i2b2</property>
<property name="port">15432</property>
<property name="password"></property>
<property name="project">demo</property>
<property name="nullProvider">LCS-I2B2:PROVIDERS</property>
</plugin>
<plugin class="de.sekmi.histream.io.XMLProviderFactory">
</plugin><!--
<plugin class="de.sekmi.histream.i2b2.services.HiveServer">
<property name="user">i2b2demodata</property>
</plugin>
<plugin class="de.sekmi.histream.hl7.MLLPListener">
<property name="port">123</property>
</plugin>
</plugin> -->
</plugins>
<!-- Wie kann die Konfiguration benutzt werden, wenn z.B. Dateien von der Kommandozeile
gelesen werden sollen und dann das Programm beendet werden soll?
-->
<source plugin="de.sekmi.histream.hl7.MLLPListener" />
<source plugin="de.sekmi.histream.impl.SAXObservationProvider" />
<!-- TODO source umbenennen in listener (fuer netzwerkschnittstellen) -->
<destination plugin="de.sekmi.histream.i2b2.I2b2Inserter" />
<!-- kann dateien von Kommandozeile lesen -->
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment