Commit 1a5b5d2b authored by R.W.Majeed's avatar R.W.Majeed

Changed i2b2 extensions/inserter to use Map in constructor

parent c51b9485
......@@ -3,18 +3,18 @@ package de.sekmi.histream.i2b2;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Enumeration;
import java.util.Properties;
import java.util.Map;
import java.util.logging.Logger;
import de.sekmi.histream.Modifier;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationException;
import de.sekmi.histream.ObservationHandler;
import de.sekmi.histream.Plugin;
import de.sekmi.histream.Value;
import de.sekmi.histream.impl.AbstractObservationHandler;
......@@ -44,7 +44,7 @@ import de.sekmi.histream.impl.AbstractObservationHandler;
* @author marap1
*
*/
public class I2b2Inserter extends AbstractObservationHandler implements ObservationHandler, Closeable{
public class I2b2Inserter extends AbstractObservationHandler implements ObservationHandler, Closeable, Plugin{
private static final Logger log = Logger.getLogger(I2b2Inserter.class.getName());
private Connection db;
private PreparedStatement insertFact;
......@@ -56,12 +56,13 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
private String nullModifierCd;
private String nullValueFlagCd;
public I2b2Inserter(){
public I2b2Inserter(Map<String,String> config) throws ClassNotFoundException, SQLException{
this.nullUnitCd = "@"; // technically, null is allowed, but the demodata uses both '@' and ''
this.nullLocationCd = "@"; // technically, null is allowed, but the demodata only uses '@'
this.nullValueFlagCd = "@";// technically, null is allowed, but the demodata uses both '@' and ''
// TODO nullBlob (technically null allowed, but '' is used in demodata)
this.nullModifierCd = "@"; // null not allowed, @ is used in demodata
open(config);
}
/**
......@@ -86,7 +87,7 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
db.commit();
log.info("Deleted "+rows+" observations for encounter_num="+encounter_num);
}
private void prepareStatements()throws SQLException{
private void prepareStatements(Map<String,String> props)throws SQLException{
// no value
insertFact = db.prepareStatement(""
+ "INSERT INTO observation_fact ("
......@@ -108,25 +109,13 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
/**
* Opens a database connection and prepares statements
* @throws SQLException
* @throws ClassNotFoundException
*/
public void open()throws SQLException{
Properties props = new Properties();
props.put("user", "i2b2demodata");
props.put("host", "localhost");
props.put("database", "i2b2");
props.put("port", "15432");
props.put("password", "");
props.put("nullProvider", "LCS-I2B2:PROVIDERS");
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
throw new SQLException(e);
}
db = DriverManager.getConnection("jdbc:postgresql://"+props.getProperty("host")+":"+props.getProperty("port")+"/"+props.getProperty("database"), props);
private void open(Map<String,String> props)throws SQLException, ClassNotFoundException{
db = PostgresExtension.getConnection(props);
db.setAutoCommit(false);
this.nullProviderId = props.getProperty("nullProvider");
prepareStatements();
this.nullProviderId = props.get("nullProvider");
prepareStatements(props);
}
......
......@@ -5,6 +5,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Map;
import java.util.Properties;
import de.sekmi.histream.DateTimeAccuracy;
......@@ -21,16 +22,25 @@ import de.sekmi.histream.Plugin;
* @param <T>
*/
public abstract class PostgresExtension<T> implements Extension<T>, Plugin {
static final String driver = "org.postgresql.Driver";
protected Properties config;
private static final int defaultFetchSize = 10000;
private static final String driver = "org.postgresql.Driver";
protected Map<String,String> config;
protected Connection db;
public PostgresExtension(Properties configuration){
public PostgresExtension(Map<String,String> configuration){
this.config = configuration;
}
public void open() throws ClassNotFoundException, SQLException{
public static Connection getConnection(Map<String,String> props) throws SQLException, ClassNotFoundException{
Class.forName(driver);
db = DriverManager.getConnection("jdbc:postgresql://"+config.getProperty("host")+":"+config.getProperty("port")+"/"+config.getProperty("database"), config);
Properties jdbcProps = new Properties();
// TODO put only properties relevant to jdbc
jdbcProps.putAll(props);
return DriverManager.getConnection("jdbc:postgresql://"+props.get("host")+":"+props.get("port")+"/"+props.get("database"), jdbcProps);
}
public void open() throws ClassNotFoundException, SQLException{
db = getConnection(config);
prepareStatements();
}
......@@ -51,6 +61,19 @@ public abstract class PostgresExtension<T> implements Extension<T>, Plugin {
if( dateTime == null )return null;
else return Timestamp.valueOf(dateTime.getLocal());
}
/**
* Get the configuration setting for fetchSize if configured. Otherwise
* the default 10000 is returned.
* @return configured fetch size, or 10000 otherwise.
*/
public int getFetchSize(){
if( config.containsKey("fetchSize") ){
return Integer.parseInt(config.get("fetchSize"));
}else{
return defaultFetchSize;
}
}
/**
* Write updates to disk. The method is automatically called by {@link #close()}.
......
......@@ -11,7 +11,7 @@ import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -19,6 +19,7 @@ import java.util.logging.Logger;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.ExternalSourceType;
......@@ -92,9 +93,9 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
* fetchSize (int, 10000)
* @param configuration
*/
public PostgresPatientStore(Properties configuration) {
public PostgresPatientStore(Map<String,String> configuration) {
super(configuration);
this.projectId = config.getProperty("project");
this.projectId = config.get("project");
if( projectId == null ){
log.warning("property project is null, some things might fail");
}
......@@ -131,6 +132,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
@Override
protected void prepareStatements()throws SQLException{
db.setAutoCommit(true);
// TODO: use prefix from configuration to specify tablespace
insert = db.prepareStatement("INSERT INTO patient_dimension(patient_num, import_date, sourcesystem_cd) VALUES(?,current_timestamp,?)");
......@@ -144,10 +146,10 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
//selectAll = db.prepareStatement("SELECT p.patient_num, p.vital_status_cd, p.birth_date, p.death_date, p.sex_cd, p.download_date, p.sourcesystem_cd, m.patient_ide, m.patient_ide_source, m.patient_ide_status FROM patient_mapping m, patient_dimension p WHERE m.patient_num=p.patient_num AND m.project_id='"+projectId+"'");
// TODO select only patients relevant to the current project: eg. join patient_dimension with patient_mapping to get only relevant rows.
selectAll = db.prepareStatement("SELECT patient_num, vital_status_cd, birth_date, death_date, sex_cd, download_date, sourcesystem_cd FROM patient_dimension", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAll.setFetchSize(Integer.parseInt(config.getProperty("fetchSize", "10000")));
selectAll.setFetchSize(getFetchSize());
selectAllIde = db.prepareStatement("SELECT patient_num, patient_ide, patient_ide_source, patient_ide_status, project_id FROM patient_mapping WHERE project_id='"+projectId+"' ORDER BY patient_num", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAllIde.setFetchSize(Integer.parseInt(config.getProperty("fetchSize", "10000")));
selectAllIde.setFetchSize(getFetchSize());
deletePatientSource = db.prepareStatement("DELETE FROM patient_dimension WHERE sourcesystem_cd=?");
deleteMapSource = db.prepareStatement("DELETE FROM patient_mapping WHERE sourcesystem_cd=?");
......
......@@ -11,7 +11,7 @@ import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -55,11 +55,11 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit>{
private PreparedStatement deleteSource;
private PreparedStatement deleteMapSource;
public PostgresVisitStore(Properties configuration) {
public PostgresVisitStore(Map<String,String> configuration) {
super(configuration);
visitCache = new Hashtable<>();
idCache = new Hashtable<>();
projectId = config.getProperty("project");
projectId = config.get("project");
idSourceDefault = "HIVE";
idSourceSeparator = ':';
}
......@@ -75,9 +75,9 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit>{
update = db.prepareStatement("UPDATE visit_dimension SET active_status_cd=?, start_date=?, end_date=?, inout_cd=?, location_cd=?, update_date=current_timestamp, download_date=?, sourcesystem_cd=? WHERE encounter_num=?");
//select = db.prepareStatement("SELECT encounter_num, patient_num, active_status_cd, start_date, end_date, inout_cd, location_cd, update_date, sourcesystem_cd FROM visit_dimension WHERE patient_num=?");
selectAll = db.prepareStatement("SELECT encounter_num, patient_num, active_status_cd, start_date, end_date, inout_cd, location_cd, download_date, sourcesystem_cd FROM visit_dimension", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAll.setFetchSize(Integer.parseInt(config.getProperty("fetchSize", "10000")));
selectAll.setFetchSize(getFetchSize());
selectMappingsAll = db.prepareStatement("SELECT encounter_num, encounter_ide, encounter_ide_source, patient_ide, patient_ide_source, encounter_ide_status, project_id FROM encounter_mapping ORDER BY encounter_num", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectMappingsAll.setFetchSize(Integer.parseInt(config.getProperty("fetchSize", "10000")));
selectMappingsAll.setFetchSize(getFetchSize());
deleteSource = db.prepareStatement("DELETE FROM visit_dimension WHERE sourcesystem_cd=?");
deleteMapSource = db.prepareStatement("DELETE FROM encounter_mapping WHERE sourcesystem_cd=?");
......
package de.sekmi.histream.impl;
import java.util.function.Consumer;
import de.sekmi.histream.Observation;
/**
* Calls a distinct extension consumer for each distinct extension found in successive observations
* @author Raphael
*
* @param <T>
*/
public class DistinctExtensionFilter<T> implements Consumer<Observation>{
private Class<T> distinctClass;
private Object prev;
private Consumer<T> distinctConsumer;
private Consumer<Observation> factConsumer;
public DistinctExtensionFilter(Consumer<Observation> factConsumer, Class<T> visitClass, Consumer<T> distinctConsumer){
this.distinctClass = visitClass;
this.factConsumer = factConsumer;
this.distinctConsumer = distinctConsumer;
}
@Override
public void accept(Observation t) {
T cur = t.getExtension(distinctClass);
if( cur != prev ){
distinctConsumer.accept(cur);
prev = cur;
}
factConsumer.accept(t);
}
}
......@@ -4,7 +4,12 @@ import java.io.FileInputStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import de.sekmi.histream.i2b2.I2b2Inserter;
import de.sekmi.histream.i2b2.I2b2Visit;
import de.sekmi.histream.io.SAXObservationProvider;
public class TestInsertXmlToI2b2 {
......@@ -27,24 +32,31 @@ public class TestInsertXmlToI2b2 {
factory.registerExtension(patientStore.getStore());
factory.registerExtension(visitStore.getStore());
SAXObservationProvider provider = new SAXObservationProvider(factory);
I2b2Inserter inserter = new I2b2Inserter();
inserter.open();
Map<String,String> props = new HashMap<>();
props.put("user", "i2b2demodata");
props.put("host", "localhost");
props.put("database", "i2b2");
props.put("port", "15432");
props.put("password", "");
props.put("nullProvider", "LCS-I2B2:PROVIDERS");
I2b2Inserter inserter = new I2b2Inserter(props);
// delete data
//inserter.purgeSource("test");
/*
provider.beforeFacts(v -> {
try{
inserter.purgeVisit(((I2b2Visit)v).getNum());
}catch( SQLException e ){
System.err.println("Unable to delete facts for visit: "+v);
}
});
*/
// load instance_num presets
visitStore.getStore().loadMaxInstanceNums();
provider.setHandler(inserter);
// find distinct visits and delete each before inserting
provider.setHandler(new DistinctExtensionFilter<I2b2Visit>(inserter, I2b2Visit.class, v -> {
try{
inserter.purgeVisit(((I2b2Visit)v).getNum());
}catch( SQLException e ){
System.err.println("Unable to delete facts for visit: "+v);
}
} ));
provider.parse(new FileInputStream("src/test/resources/dwh-eav.xml"));
inserter.close();
visitStore.close();
......
......@@ -3,14 +3,14 @@ package de.sekmi.histream.impl;
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.HashMap;
import de.sekmi.histream.i2b2.PostgresPatientStore;
public class TestPostgresPatientStore implements Closeable {
PostgresPatientStore store;
public void open(String host, int port) throws ClassNotFoundException, SQLException{
Properties props = new Properties();
HashMap<String, String> props = new HashMap<>();
props.put("project", "demo");
props.put("user", "i2b2demodata");
props.put("host", host);
......
......@@ -3,7 +3,7 @@ package de.sekmi.histream.impl;
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.HashMap;
import de.sekmi.histream.i2b2.PostgresVisitStore;
......@@ -11,7 +11,7 @@ public class TestPostgresVisitStore implements Closeable {
private PostgresVisitStore store;
public void open(String host, int port) throws ClassNotFoundException, SQLException{
Properties props = new Properties();
HashMap<String, String> props = new HashMap<>();
props.put("project", "demo");
props.put("user", "i2b2demodata");
props.put("host", host);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment