Commit 87a10fe3 authored by R.W.Majeed's avatar R.W.Majeed

separate initialisation from constructor to allow patient/visit/inserter bean implementations

parent 94f68503
...@@ -29,12 +29,10 @@ import java.sql.SQLException; ...@@ -29,12 +29,10 @@ import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.sql.DataSource;
import de.sekmi.histream.Modifier; import de.sekmi.histream.Modifier;
import de.sekmi.histream.Observation; import de.sekmi.histream.Observation;
...@@ -80,17 +78,19 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat ...@@ -80,17 +78,19 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
private DataDialect dialect; private DataDialect dialect;
private int insertCount; private int insertCount;
public I2b2Inserter(Map<String,String> config) throws ClassNotFoundException, SQLException{ // public I2b2Inserter(Map<String,String> config) throws ClassNotFoundException, SQLException{
db = PostgresExtension.getConnection(config, new String[]{"jdbc.","data.jdbc."}); // db = PostgresExtension.getConnection(config, new String[]{"jdbc.","data.jdbc."});
initialize(config); // initialize(config);
} // }
public I2b2Inserter(DataSource ds, Map<String,String> config) throws SQLException{ // public I2b2Inserter(DataSource ds, Map<String,String> config) throws SQLException{
db = ds.getConnection(); // db = ds.getConnection();
initialize(config); // initialize(config);
// }
public I2b2Inserter(){
} }
private interface Preprocessor{ private interface Preprocessor{
void preprocess(Observation fact)throws SQLException; void preprocess(Observation fact)throws SQLException;
} }
...@@ -148,7 +148,7 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat ...@@ -148,7 +148,7 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
log.info("Deleted "+rows+" rows for encounter_num="+encounter_num); log.info("Deleted "+rows+" rows for encounter_num="+encounter_num);
return 0 != rows; return 0 != rows;
} }
private void prepareStatements(Map<String,String> props)throws SQLException{ private void prepareStatements()throws SQLException{
// no value // no value
insertFact = db.prepareStatement("" insertFact = db.prepareStatement(""
+ "INSERT INTO observation_fact (" + "INSERT INTO observation_fact ("
...@@ -171,18 +171,18 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat ...@@ -171,18 +171,18 @@ public class I2b2Inserter extends AbstractObservationHandler implements Observat
* Initialize the database connection * Initialize the database connection
* @throws SQLException if preparation/initialisation failed * @throws SQLException if preparation/initialisation failed
*/ */
private void initialize(Map<String,String> props)throws SQLException{ public void open(Connection connection, DataDialect dialect)throws SQLException{
dialect = new DataDialect(); this.dialect = dialect;
String nullProvider = props.get("nullProvider"); this.db = connection;
if( nullProvider == null ){ // String nullProvider = props.get("nullProvider");
log.warning("property 'nullProvider' missing, using '@' (may violate foreign keys)"); // if( nullProvider == null ){
nullProvider = "@"; // log.warning("property 'nullProvider' missing, using '@' (may violate foreign keys)");
} // nullProvider = "@";
dialect.setDefaultProviderId(nullProvider); // }
insertCount = 0; insertCount = 0;
db.setAutoCommit(false); db.setAutoCommit(false);
prepareStatements(props); prepareStatements();
} }
......
package de.sekmi.histream.i2b2;
import de.sekmi.histream.Extension;
import de.sekmi.histream.Observation;
public class PatientNumExtension implements Extension<Integer>{
@Override
public Integer createInstance(Observation observation) {
// TODO Auto-generated method stub
return null;
}
@Override
public Integer createInstance(Object... args) throws UnsupportedOperationException, IllegalArgumentException {
if( args.length != 1 || args[0] == null || !(args[0] instanceof Integer) ){
throw new IllegalArgumentException("Expecting single Integer argument");
}
return (Integer)args[0];
}
@Override
public Class<?>[] getInstanceTypes() {
return new Class<?>[]{Integer.class};
}
}
...@@ -21,20 +21,11 @@ package de.sekmi.histream.i2b2; ...@@ -21,20 +21,11 @@ package de.sekmi.histream.i2b2;
*/ */
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import de.sekmi.histream.DateTimeAccuracy; import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Extension; import de.sekmi.histream.Extension;
import de.sekmi.histream.Plugin;
/** /**
* Extension with database connectivity. * Extension with database connectivity.
* Uses configuration properties host, port, database. * Uses configuration properties host, port, database.
...@@ -44,112 +35,68 @@ import de.sekmi.histream.Plugin; ...@@ -44,112 +35,68 @@ import de.sekmi.histream.Plugin;
* *
* @param <T> extension instance type * @param <T> extension instance type
*/ */
public abstract class PostgresExtension<T> implements Extension<T>, Plugin { public abstract class PostgresExtension<T> implements Extension<T> {
private static final int defaultFetchSize = 10000; // private static final int defaultFetchSize = 10000;
private static final String driver = "org.postgresql.Driver"; // protected Map<String,String> config;
protected Map<String,String> config; // protected Connection db;
protected Connection db; //
// protected PostgresExtension(Map<String,String> configuration){
protected PostgresExtension(Map<String,String> configuration){ // this.config = configuration;
this.config = configuration; // }
} //
//
public static Connection getConnection(Map<String,String> props, String[] prefixes) throws SQLException, ClassNotFoundException{ // /**
Properties jdbc = new Properties(); // * Open a database connection using configuration properties
for( String prefix : prefixes ){ // * with the given prefixes.
PostgresExtension.copyProperties(props, prefix, jdbc); // * @param propertyPrefixes prefix to the configuration properties
} // * @throws ClassNotFoundException if the database driver could not be loaded
return getConnection(jdbc); // * @throws SQLException any SQL exceptions
} // */
// protected void openDatabase(String[] propertyPrefixes) throws ClassNotFoundException, SQLException{
private static Connection getConnection(Properties props) throws SQLException, ClassNotFoundException{ // db = getConnection(config, propertyPrefixes);
Class.forName(driver); // prepareStatements();
StringBuilder sb = new StringBuilder("jdbc:postgresql://"); // }
//
if( props.get("host") == null ){ // /**
throw new IllegalArgumentException("host property missing for JDBC connection"); // * Open a database connection using a data source
}else{ // * @param ds data source
sb.append(props.get("host")); // * @throws SQLException SQL exceptions
} // */
if( props.get("port") != null ){ // protected void openDatabase(DataSource ds) throws SQLException{
sb.append(':').append(props.get("port")); // db = ds.getConnection();
} // prepareStatements();
if( !props.containsKey("database") ){ // }
throw new IllegalArgumentException("database property missing for JDBC connection"); //
} // @Override
sb.append('/').append(props.getProperty("database")); // public void close()throws IOException{
//
return DriverManager.getConnection(sb.toString(), props); // flush();
} //
// try {
/** // // close database
* Each key in src that starts with keyPrefix is copied (without the prefix) and its value to dest // if( db != null && !db.isClosed() )db.close();
* @param src map containing key,value pairs // }catch( SQLException e ){
* @param keyPrefix prefix to match src keys // throw new IOException(e);
* @param dest destination properties // }
*/ // }
public static void copyProperties(Map<String,String> src, String keyPrefix, Properties dest){ // /**
src.forEach( // * Get the configuration setting for fetchSize if configured. Otherwise
(key,value) -> { // * the default 10000 is returned.
if( key.startsWith(keyPrefix) ){ // * @return configured fetch size, or 10000 otherwise.
dest.put(key.substring(keyPrefix.length()), value); // */
} // public int getFetchSize(){
} // if( config.containsKey("fetchSize") ){
); // return Integer.parseInt(config.get("fetchSize"));
} // }else{
// return defaultFetchSize;
/** // }
* Open a database connection using configuration properties // }
* with the given prefixes.
* @param propertyPrefixes prefix to the configuration properties
* @throws ClassNotFoundException if the database driver could not be loaded
* @throws SQLException any SQL exceptions
*/
protected void openDatabase(String[] propertyPrefixes) throws ClassNotFoundException, SQLException{
db = getConnection(config, propertyPrefixes);
prepareStatements();
}
/**
* Open a database connection using a data source
* @param ds data source
* @throws SQLException SQL exceptions
*/
protected void openDatabase(DataSource ds) throws SQLException{
db = ds.getConnection();
prepareStatements();
}
@Override
public void close()throws IOException{
flush();
try {
// close database
if( db != null && !db.isClosed() )db.close();
}catch( SQLException e ){
throw new IOException(e);
}
}
public static Timestamp inaccurateSqlTimestamp(DateTimeAccuracy dateTime){ public static Timestamp inaccurateSqlTimestamp(DateTimeAccuracy dateTime){
if( dateTime == null )return null; if( dateTime == null )return null;
else return Timestamp.valueOf(dateTime.getLocal()); else return Timestamp.valueOf(dateTime.getLocal());
} }
/**
* Get the configuration setting for fetchSize if configured. Otherwise
* the default 10000 is returned.
* @return configured fetch size, or 10000 otherwise.
*/
public int getFetchSize(){
if( config.containsKey("fetchSize") ){
return Integer.parseInt(config.get("fetchSize"));
}else{
return defaultFetchSize;
}
}
/** /**
* Write updates to disk. The method is automatically called by {@link #close()}. * Write updates to disk. The method is automatically called by {@link #close()}.
......
package de.sekmi.histream.i2b2; package de.sekmi.histream.i2b2;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
/* /*
* #%L * #%L
* histream * histream
...@@ -28,15 +32,15 @@ import java.sql.Statement; ...@@ -28,15 +32,15 @@ import java.sql.Statement;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Objects;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.sql.DataSource;
import de.sekmi.histream.DateTimeAccuracy; import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation; import de.sekmi.histream.Observation;
...@@ -70,13 +74,14 @@ import de.sekmi.histream.ext.PatientStore; ...@@ -70,13 +74,14 @@ import de.sekmi.histream.ext.PatientStore;
* @author marap1 * @author marap1
* *
*/ */
public class PostgresPatientStore extends PostgresExtension<I2b2Patient> implements PatientStore{ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> implements PatientStore, Closeable{
private static final Logger log = Logger.getLogger(PostgresPatientStore.class.getName()); private static final Logger log = Logger.getLogger(PostgresPatientStore.class.getName());
private static final Class<?>[] INSTANCE_TYPES = new Class<?>[]{Patient.class, I2b2Patient.class}; private static final Iterable<Class<? super I2b2Patient>> INSTANCE_TYPES = Arrays.asList(Patient.class, I2b2Patient.class);
private String projectId; private String projectId;
private String idSourceDefault; private String idSourceDefault;
private char idSourceSeparator; private char idSourceSeparator;
private Connection db;
private int fetchSize;
// private String autoInsertSourceId; // private String autoInsertSourceId;
// maximum patient number, used to generate new patient_num for new patients // maximum patient number, used to generate new patient_num for new patients
...@@ -100,48 +105,56 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme ...@@ -100,48 +105,56 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
private PreparedStatement deletePatientSource; private PreparedStatement deletePatientSource;
private PreparedStatement deleteMapSource; private PreparedStatement deleteMapSource;
/** // /**
* Construct new postgres patient store. In addition to properties // * Construct new postgres patient store. In addition to properties
* needed by {@link PostgresExtension#PostgresExtension(Map)}, // * needed by {@link PostgresExtension#PostgresExtension(Map)},
* the following properties are needed: // * the following properties are needed:
* <p>jdbc.{host|port|database} or data.jdbc.{host|port|database} to // * <p>jdbc.{host|port|database} or data.jdbc.{host|port|database} to
* construct the database URI. // * construct the database URI.
* Any other parameters under jdbc. or data.jdbc. are passed to the // * Any other parameters under jdbc. or data.jdbc. are passed to the
* JDBC connect method. // * JDBC connect method.
* // *
* <p>project, // * <p>project,
* <p>Optional properties: // * <p>Optional properties:
* <p> // * <p>
* idSourceDefault ('HIVE'), idSourceSeparator (single char, ':') // * idSourceDefault ('HIVE'), idSourceSeparator (single char, ':')
* fetchSize (int, 10000) // * fetchSize (int, 10000)
* @param configuration configuration // * @param configuration configuration
* @throws SQLException if the preparation or initialisation fails // * @throws SQLException if the preparation or initialisation fails
* @throws ClassNotFoundException if postgresql driver not found // * @throws ClassNotFoundException if postgresql driver not found
*/ // */
public PostgresPatientStore(Map<String,String> configuration) throws ClassNotFoundException, SQLException { // public PostgresPatientStore(Map<String,String> configuration) throws ClassNotFoundException, SQLException {
super(configuration); // super(configuration);
this.projectId = config.get("project"); // this.projectId = config.get("project");
openDatabase(new String[]{"jdbc.","data.jdbc."}); // openDatabase(new String[]{"jdbc.","data.jdbc."});
initialize(); // initialize();
} // }
//
//
/** // /**
* Create a patient store using a {@link DataSource}. // * Create a patient store using a {@link DataSource}.
* The project id must be specified with the key {@code project}. // * The project id must be specified with the key {@code project}.
* @param ds data source for the connection // * @param ds data source for the connection
* @param configuration configuration settings // * @param configuration configuration settings
* @throws SQLException SQL error // * @throws SQLException SQL error
*/ // */
public PostgresPatientStore(DataSource ds, Map<String,String> configuration) throws SQLException{ // public PostgresPatientStore(DataSource ds, Map<String,String> configuration) throws SQLException{
super(configuration); // super(configuration);
this.projectId = config.get("project"); // this.projectId = config.get("project");
openDatabase(ds); // openDatabase(ds);
initialize(); // initialize();
} // }
private void initialize() throws SQLException{
public PostgresPatientStore(){
this.idSourceDefault = "HIVE"; this.idSourceDefault = "HIVE";
this.idSourceSeparator = ':'; this.idSourceSeparator = ':';
this.fetchSize = 1000;
// TODO add methods to change the configuration
}
public void open(Connection connection, String projectId) throws SQLException{
this.db = connection;
Objects.requireNonNull(this.projectId, "non-null projectId required");
// this.autoInsertSourceId = "HS.auto"; // this.autoInsertSourceId = "HS.auto";
patientCache = new Hashtable<>(1000); patientCache = new Hashtable<>(1000);
idCache = new Hashtable<>(1000); idCache = new Hashtable<>(1000);
...@@ -198,10 +211,10 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme ...@@ -198,10 +211,10 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
//selectAll = db.prepareStatement("SELECT p.patient_num, p.vital_status_cd, p.birth_date, p.death_date, p.sex_cd, p.download_date, p.sourcesystem_cd, m.patient_ide, m.patient_ide_source, m.patient_ide_status FROM patient_mapping m, patient_dimension p WHERE m.patient_num=p.patient_num AND m.project_id='"+projectId+"'"); //selectAll = db.prepareStatement("SELECT p.patient_num, p.vital_status_cd, p.birth_date, p.death_date, p.sex_cd, p.download_date, p.sourcesystem_cd, m.patient_ide, m.patient_ide_source, m.patient_ide_status FROM patient_mapping m, patient_dimension p WHERE m.patient_num=p.patient_num AND m.project_id='"+projectId+"'");
// TODO select only patients relevant to the current project: eg. join patient_dimension with patient_mapping to get only relevant rows. // TODO select only patients relevant to the current project: eg. join patient_dimension with patient_mapping to get only relevant rows.
selectAll = db.prepareStatement("SELECT patient_num, vital_status_cd, birth_date, death_date, sex_cd, download_date, sourcesystem_cd FROM patient_dimension", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); selectAll = db.prepareStatement("SELECT patient_num, vital_status_cd, birth_date, death_date, sex_cd, download_date, sourcesystem_cd FROM patient_dimension", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAll.setFetchSize(getFetchSize()); selectAll.setFetchSize(this.fetchSize);
selectAllIde = db.prepareStatement("SELECT patient_num, patient_ide, patient_ide_source, patient_ide_status, project_id FROM patient_mapping WHERE project_id='"+projectId+"' ORDER BY patient_num", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); selectAllIde = db.prepareStatement("SELECT patient_num, patient_ide, patient_ide_source, patient_ide_status, project_id FROM patient_mapping WHERE project_id='"+projectId+"' ORDER BY patient_num", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAllIde.setFetchSize(getFetchSize()); selectAllIde.setFetchSize(this.fetchSize);
deletePatientSource = db.prepareStatement("DELETE FROM patient_dimension WHERE sourcesystem_cd=?"); deletePatientSource = db.prepareStatement("DELETE FROM patient_dimension WHERE sourcesystem_cd=?");
deleteMapSource = db.prepareStatement("DELETE FROM patient_mapping WHERE sourcesystem_cd=?"); deleteMapSource = db.prepareStatement("DELETE FROM patient_mapping WHERE sourcesystem_cd=?");
...@@ -453,7 +466,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme ...@@ -453,7 +466,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
else return null; else return null;
} }
private void setVitalStatusCd(Patient patient, String vital_cd){ private static void setVitalStatusCd(Patient patient, String vital_cd){
// load accuracy // load accuracy
if( vital_cd == null )return; // nothing to do if( vital_cd == null )return; // nothing to do
...@@ -657,7 +670,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme ...@@ -657,7 +670,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
} }
@Override @Override
public Class<?>[] getInstanceTypes() { public Iterable<Class<? super I2b2Patient>> getInstanceTypes() {
return INSTANCE_TYPES; return INSTANCE_TYPES;
} }
...@@ -753,5 +766,17 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme ...@@ -753,5 +766,17 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
// TODO Auto-generated method stub // TODO Auto-generated method stub
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public synchronized void close() throws IOException {
if( db != null ){
flush();
try {
db.close();
} catch (SQLException e) {
throw new IOException(e);
}
db = null;
}
}
} }
package de.sekmi.histream.i2b2; package de.sekmi.histream.i2b2;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
/* /*
* #%L * #%L
* histream * histream
...@@ -28,15 +32,14 @@ import java.sql.Statement; ...@@ -28,15 +32,14 @@ import java.sql.Statement;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.sql.DataSource;
import de.sekmi.histream.DateTimeAccuracy; import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation; import de.sekmi.histream.Observation;
...@@ -59,14 +62,16 @@ import de.sekmi.histream.ext.Visit.Status; ...@@ -59,14 +62,16 @@ import de.sekmi.histream.ext.Visit.Status;
* @author marap1 * @author marap1
* *
*/ */
public class PostgresVisitStore extends PostgresExtension<I2b2Visit>{ public class PostgresVisitStore extends PostgresExtension<I2b2Visit> implements Closeable{
private static final Logger log = Logger.getLogger(PostgresVisitStore.class.getName()); private static final Logger log = Logger.getLogger(PostgresVisitStore.class.getName());
private static final Class<?>[] INSTANCE_TYPES = new Class<?>[]{Visit.class,I2b2Visit.class}; private static final Iterable<Class<? super I2b2Visit>> INSTANCE_TYPES = Arrays.asList(Visit.class,I2b2Visit.class);
private String projectId; private String projectId;
private int maxEncounterNum; private int maxEncounterNum;
private char idSourceSeparator; private char idSourceSeparator;
private String idSourceDefault; private String idSourceDefault;
private int fetchSize;
private Connection db;
//private static ChronoUnit[] map_date_units = {ChronoUnit.DAYS, ChronoUnit.MONTHS, ChronoUnit.YEARS, ChronoUnit.HOURS, ChronoUnit.MINUTES, ChronoUnit.SECONDS}; //private static ChronoUnit[] map_date_units = {ChronoUnit.DAYS, ChronoUnit.MONTHS, ChronoUnit.YEARS, ChronoUnit.HOURS, ChronoUnit.MINUTES, ChronoUnit.SECONDS};
//private static char[] map_death_chars = {}; //private static char[] map_death_chars = {};
...@@ -82,43 +87,48 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit>{ ...@@ -82,43 +87,48 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit>{
private PreparedStatement deleteSource; private PreparedStatement deleteSource;
private PreparedStatement deleteMapSource; private PreparedStatement deleteMapSource;
/** // /**
* Create a visit store using configuration settings. // * Create a visit store using configuration settings.
* The project id must be specified with the key {@code project}. // * The project id must be specified with the key {@code project}.
* JDBC connection configuration is specified with the key // * JDBC connection configuration is specified with the key
* prefixes {@code jdbc.*} and {@code data.jdbc.*} // * prefixes {@code jdbc.*} and {@code data.jdbc.*}
* @param configuration key value pairs // * @param configuration key value pairs
* @throws ClassNotFoundException database driver not found // * @throws ClassNotFoundException database driver not found
* @throws SQLException SQL exceptions // * @throws SQLException SQL exceptions
*/ // */
public PostgresVisitStore(Map<String,String> configuration) throws ClassNotFoundException,