Skip to content
Commits on Source (5)
......@@ -26,9 +26,9 @@ package de.sekmi.histream;
* Extensions allow additional information to be stored and retrieved
* for observations.
*
* @author Raphael
* @author R.W.Majeed
*
* @param <T> type class
* @param <T> slot type to be kept in memory with the observation
*/
public interface Extension<T>{
/**
......@@ -58,6 +58,23 @@ public interface Extension<T>{
* TODO change return type to array, to register all compatible classes
* @return instance type
*/
Iterable<Class<? super T>> getInstanceTypes();
Class<?>[] getInstanceTypes();
Class<T> getSlotType();
/**
* Extract subtype information from the slot type.
* E.g. a visit store can provide info about the patient
* @param slotInstance slot instance type
* @param subtype subtype to retrieve
* @return subtype instance
*/
<U> U extractSubtype(T slotInstance, Class<U> subtype);
public static <U,T> U extractSupertype(T slotInstance, Class<U> supertype){
if( supertype.isInstance(slotInstance) ) {
return supertype.cast(slotInstance);
}else {
throw new IllegalArgumentException("Unsupported supertype "+supertype);
}
}
}
package de.sekmi.histream.ext;
import java.util.List;
public interface PatientVisitStore {
Patient findPatient(String patientId);
Visit findVisit(String visitId);
void merge(Patient patient, String additionalId, ExternalSourceType source);
/**
* Get alias ids for the given patient (e.g. resulting from a merge)
* @param patient patient instance
* @return alias ids
*/
String[] getPatientAliasIds(Patient patient);
/**
* Deletes the patient identified by given id. This method does not remove any other associated
* data e.g. like visits, observations.
* @param id patient id
*/
void purgePatient(String patientId);
void purgeVisit(String visitId);
List<? extends Visit> allVisits(Patient patient);
}
......@@ -9,6 +9,7 @@ import java.util.Map;
* @author R.W.Majeed
*
*/
@Deprecated
public class CachedPatientExtension extends SimplePatientExtension {
private Map<String, PatientImpl> cache;
......
package de.sekmi.histream.impl;
import java.util.Arrays;
/*
* #%L
* histream
......@@ -35,11 +32,12 @@ import de.sekmi.histream.ext.Patient;
* @author R.W.Majeed
*
*/
@Deprecated
public class SimplePatientExtension implements Extension<PatientImpl>{
private final static Iterable<Class<? super PatientImpl>> TYPES = Arrays.asList(Patient.class, PatientImpl.class);
private final static Class<?>[] TYPES = new Class[] {Patient.class, PatientImpl.class};
@Override
public Iterable<Class<? super PatientImpl>> getInstanceTypes() {return TYPES;}
public Class<?>[] getInstanceTypes() {return TYPES;}
@Override
public PatientImpl createInstance(Object... args) {
......@@ -63,4 +61,14 @@ public class SimplePatientExtension implements Extension<PatientImpl>{
return patient;
}
@Override
public Class<PatientImpl> getSlotType() {
return PatientImpl.class;
}
@Override
public <U> U extractSubtype(PatientImpl slotInstance, Class<U> subtype) {
return extractSubtype(slotInstance, subtype);
}
}
package de.sekmi.histream.impl;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import de.sekmi.histream.Observation;
import de.sekmi.histream.Extension;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
public class SimplePatientVisitExtension implements Extension<VisitPatientImpl>{
private final static Class<?>[] TYPES = new Class[] {Visit.class,VisitPatientImpl.class,Patient.class,PatientImpl.class};
@Override
public Class<?>[] getInstanceTypes() {return TYPES;}
@Override
public VisitPatientImpl createInstance(Object... args) {
if( args.length != 3
|| !(args[0] instanceof String)
|| !(args[1] instanceof String)
|| !(args[2] instanceof ExternalSourceType) )
{
throw new IllegalArgumentException("Need arguments Patient id, Visit id, ExternalSourceType");
}
ExternalSourceType source = (ExternalSourceType)args[2];
PatientImpl patient = new PatientImpl();
patient.setId((String)args[0]);
patient.setSourceId(source.getSourceId());
patient.setSourceTimestamp(source.getSourceTimestamp());
VisitPatientImpl visit = new VisitPatientImpl((String)args[1], patient, null);
visit.setSourceId(source.getSourceId());
visit.setSourceTimestamp(source.getSourceTimestamp());
return visit;
}
@Override
public VisitPatientImpl createInstance(Observation observation) {
VisitPatientImpl visit = createInstance(observation.getPatientId(), observation.getEncounterId(), observation.getSource());
//visit.setId();
//visit.setPatientId(observation.getPatientId());
//visit.setSourceId(observation.getSourceId());
//visit.setSourceTimestamp(observation.getSourceTimestamp());
return visit;
}
@Override
public Class<VisitPatientImpl> getSlotType() {
return VisitPatientImpl.class;
}
@Override
public <U> U extractSubtype(VisitPatientImpl slotInstance, Class<U> subtype) {
if( subtype.isAssignableFrom(PatientImpl.class) ){
return subtype.cast(slotInstance.getPatient());
}else if( subtype.isInstance(slotInstance) ) {
return subtype.cast(slotInstance);
}else {
throw new IllegalArgumentException("Unsupported subtype "+subtype);
}
}
}
......@@ -23,18 +23,18 @@ package de.sekmi.histream.impl;
import de.sekmi.histream.Observation;
import java.util.Arrays;
import de.sekmi.histream.Extension;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
@Deprecated
public class SimpleVisitExtension implements Extension<VisitImpl>{
private final static Iterable<Class<? super VisitImpl>> TYPES = Arrays.asList(Visit.class, VisitImpl.class);
private final static Class<?>[] TYPES = new Class[] {Visit.class, VisitImpl.class};
@Override
public Iterable<Class<? super VisitImpl>> getInstanceTypes() {return TYPES;}
public Class<?>[] getInstanceTypes() {return TYPES;}
@Override
public VisitImpl createInstance(Object... args) {
......@@ -65,4 +65,13 @@ public class SimpleVisitExtension implements Extension<VisitImpl>{
return visit;
}
@Override
public Class<VisitImpl> getSlotType() {
return VisitImpl.class;
}
@Override
public <U> U extractSubtype(VisitImpl slotInstance, Class<U> subtype) {
return extractSubtype(slotInstance, subtype);
}
}
......@@ -30,6 +30,7 @@ import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.StoredExtensionType;
import de.sekmi.histream.ext.Visit;
@Deprecated
public class VisitImpl extends StoredExtensionType implements Visit {
private DateTimeAccuracy startTime;
private DateTimeAccuracy endTime;
......
package de.sekmi.histream.impl;
import java.util.Objects;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.ext.StoredExtensionType;
import de.sekmi.histream.ext.Visit;
public class VisitPatientImpl extends StoredExtensionType implements Visit {
private PatientImpl patient;
private DateTimeAccuracy startTime;
private DateTimeAccuracy endTime;
private Status status;
private String locationId;
private String providerId;
/**
* Empty constructor protected, only
* available to overriding classes.
*/
protected VisitPatientImpl() {
}
public VisitPatientImpl(String id, PatientImpl patient, DateTimeAccuracy startTime){
setId(id);
setPatient(patient);
this.startTime = startTime;
}
public String getPatientId(){return patient.getId();}
public void setPatient(PatientImpl patient){
Objects.requireNonNull(patient);
// patient id should not be changed normally.
this.patient = patient;
markDirty(true);
}
public PatientImpl getPatient() {
return this.patient;
}
@Override
public DateTimeAccuracy getStartTime() {
return startTime;
}
@Override
public DateTimeAccuracy getEndTime() {
return endTime;
}
@Override
public Status getStatus() {
return this.status;
}
@Override
public void setStatus(Status status) {
checkAndUpdateDirty(this.status, status);
this.status = status;
}
@Override
public String getLocationId() {
return locationId;
}
@Override
public void setLocationId(String locationId){
checkAndUpdateDirty(this.locationId, locationId);
this.locationId = locationId;
}
@Override
public void setEndTime(DateTimeAccuracy endTime) {
checkAndUpdateDirty(this.endTime, endTime);
this.endTime = endTime;
}
@Override
public void setStartTime(DateTimeAccuracy startTime) {
checkAndUpdateDirty(this.startTime, startTime);
this.startTime = startTime;
}
@Override
public String getProviderId() {
return this.providerId;
}
@Override
public void setProviderId(String providerId) {
checkAndUpdateDirty(this.providerId, providerId);
this.providerId = providerId;
}
}
......@@ -32,6 +32,15 @@ public class TestFileObservationSuppliers {
s.close();
p.close();
}
@Test
public void verifyMinimalXML() throws Exception{
GroupedXMLProvider p = new GroupedXMLProvider(null);
ObservationSupplier s = p.createSupplier(getClass().getResourceAsStream("/min.xml"), factory);
Assert.assertTrue( s.stream().count() > 0 );
s.close();
p.close();
}
@Test
public void verifyFlatSupplier() throws Exception{
FlatProviderFactory p = new FlatProviderFactory(null);
......
<eav-data xmlns="http://sekmi.de/histream/ns/eav-data"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<!-- chronologisch impliziert, dass der zeitstempel eines nachfolgenden elementes
größer als alle vorangehenden elemente sein muss. Der Zeitstempel kann vor dem
Encounter-Start liegen -->
<meta>
<!-- Zeitpunkt, an dem der Export erstellt wurde bzw. Datenstand -->
<etl-strategy>replace-visit</etl-strategy>
<source timestamp="2015-04-21T06:58:00Z" id="test"/>
</meta>
<patient id="XX12345">
<given-name>A B</given-name>
<surname>Dampf</surname>
<!-- Gender allows for female, male, indeterminate -->
<!-- Gender element can also be removed or left empty -->
<gender>female</gender>
<encounter id="XXE12345">
<start>2014-01-01T10:30:00</start>
<fact concept="T:date:secs" start="2014-09-07T10:40:03"/>
</encounter>
</patient>
</eav-data>
......@@ -17,11 +17,6 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
......@@ -33,5 +28,17 @@
<artifactId>histream-core</artifactId>
<version>0.16-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.4.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package de.sekmi.histream.i2b2;
import java.time.temporal.ChronoUnit;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.PatientImpl;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import de.sekmi.histream.impl.VisitPatientImpl;
/**
* I2b2 visit. The active encounter_ide is returned by {@link #getId()}.
*
* @author Raphael
*
*/
public class I2b2PatientVisit extends VisitPatientImpl {
/**
* I2b2 internal encounter id (32bit integer)
*/
private int encounter_num;
private int patient_num;
/**
* String id aliases for the encounter
*/
String[] aliasIds;
/**
* Index in aliasIds for the primary alias
*/
int primaryAliasIndex;
int maxInstanceNum;
public I2b2PatientVisit(int encounter_num, int patient_num) {
super();
this.encounter_num = encounter_num;
this.patient_num = patient_num;
maxInstanceNum = 1;
// TODO set startDate, endDate
}
public int getNum(){return encounter_num;}
public int getPatientNum(){return patient_num;}
@Override
public void setPatient(PatientImpl patient) {
super.setPatient(patient);
if( patient instanceof I2b2Patient ) {
// also set the patient_num
int patient_num = ((I2b2Patient)patient).getNum();
this.patient_num = patient_num;
}else {
throw new IllegalArgumentException("Patient expected of instanceOf I2b2Patient");
}
}
@Override
public String toString(){
return "I2b2Visit(encounter_um="+encounter_num+")";
}
/**
* Get the i2b2 vital_status_cd for this visit.
* @return vital status code, see CRC_Design doc
*/
public String getActiveStatusCd(){
Visit visit = this;
char end_char=0, start_char=0;
if( visit.getEndTime() != null ){
switch( visit.getEndTime().getAccuracy() ){
case DAYS:
end_char = 0; // same meaning
end_char = 'Y';
break;
case MONTHS:
end_char = 'M';
break;
case YEARS:
end_char = 'X';
break;
case HOURS:
end_char = 'R';
break;
case MINUTES:
end_char = 'T';
break;
case SECONDS:
end_char = 'S';
break;
default:
}
}else{
// null end date
// U: unknown, O: ongoing
// default to unknown
end_char = 'U';
}
// start date
if( visit.getStartTime() != null ){
switch( visit.getStartTime().getAccuracy() ){
case DAYS:
start_char = 0; // same meaning
start_char = 'D';
break;
case MONTHS:
start_char = 'B';
break;
case YEARS:
start_char = 'F';
break;
case HOURS:
start_char = 'H';
break;
case MINUTES:
start_char = 'I';
break;
case SECONDS:
start_char = 'C';
break;
default:
}
}else{
// null start date
// L: unknown, A: active
// default to unknown
start_char = 'L';
}
if( end_char != 0 && start_char != 0 )
return new String(new char[]{end_char,start_char});
else if( end_char != 0 )
return new String(new char[]{end_char});
else if( start_char != 0 )
return new String(new char[]{start_char});
else return null; // should not happen
}
/**
* For decoding instructions, see the i2b2 documentation CRC_Design.pdf
* The vital cd can be one or two characters.
* This implementation is more failsafe by using the following
* algorithm:
* <ol>
* <li>For {@code null} or {@code ""} use both timestamps accurate to day
* <li>Try to decode first character as end indicator</li>
* <li>If {@code vital_cd.length > 1} use second character as start indicator, otherwise if unable to decode the end indicator, use the first character.</li>
* </ol>
* @param vital_cd code to indicate accuracy of start and end date
*/
public void setActiveStatusCd(String vital_cd){
Visit visit = this;
// load accuracy
char endIndicator = 0;
char startIndicator = 0;
if( vital_cd == null || vital_cd.length() == 0 ){
// start and end date accurate to day
// leave indicators at 0/null
}else{
// load first indicator character
endIndicator = vital_cd.charAt(0);
}
ChronoUnit accuracy = null;
// end date indicator
switch( endIndicator ){
case 'U': // unknown, no date
case 'O': // ongoing, no date
// set to null
visit.setEndTime(null);
break;
case 0:
case 'Y': // known, accurate to day
accuracy = ChronoUnit.DAYS;
break;
case 'M': // known, accurate to month
accuracy = ChronoUnit.MONTHS;
break;
case 'X': // known, accurate to year
accuracy = ChronoUnit.YEARS;
break;
case 'R': // known, accurate to hour
accuracy = ChronoUnit.HOURS;
break;
case 'T': // known, accurate to minute
accuracy = ChronoUnit.MINUTES;
break;
case 'S': // known, accurate to second
accuracy = ChronoUnit.SECONDS;
break;
default:
// no end indicator means accurate to day
accuracy = ChronoUnit.DAYS;
// no match for end date -> check for start status in first character
startIndicator = endIndicator;
}
// set accuracy for end time
if( visit.getEndTime() != null && accuracy != null ){
visit.getEndTime().setAccuracy(accuracy);
}
// load start indicator
if( vital_cd != null && vital_cd.length() > 1 ){
// use second character, if available
startIndicator = vital_cd.charAt(1);
}// otherwise, the first character is used if end indicator wasn't used. See default case above
accuracy = null;
// start date indicator
switch( startIndicator ){
case 'L': // unknown, no date
case 'A': // active, no date
setStartTime(null);
break;
case 0: // same as D
case 'D': // known, accurate to day
accuracy = ChronoUnit.DAYS;
break;
case 'B': // known, accurate to month
accuracy = ChronoUnit.MONTHS;
break;
case 'F': // known, accurate to year
accuracy = ChronoUnit.YEARS;
break;
case 'H': // known, accurate to hour
accuracy = ChronoUnit.HOURS;
break;
case 'I': // known, accurate to minute
accuracy = ChronoUnit.MINUTES;
break;
case 'C': // known, accurate to second
accuracy = ChronoUnit.SECONDS;
break;
default: // default to days if unable to parse
accuracy = ChronoUnit.DAYS;
}
if( visit.getStartTime() != null && accuracy != null ){
visit.getStartTime().setAccuracy(accuracy);
}
}
public String getInOutCd(){
Visit patient = this;
if( patient.getStatus() == null )return null;
else switch( patient.getStatus() ){
case Inpatient:
return "I";
case Outpatient:
case Emergency: // unsupported by i2b2, map to outpatient
return "O";
default:
// XXX should not happen, warning
return null;
}
}
}
......@@ -35,6 +35,7 @@ import de.sekmi.histream.impl.VisitImpl;
* @author Raphael
*
*/
@Deprecated
public class I2b2Visit extends VisitImpl {
/**
......@@ -73,6 +74,8 @@ public class I2b2Visit extends VisitImpl {
// also set the patient_num
int patient_num = ((I2b2Patient)patient).getNum();
this.patient_num = patient_num;
}else {
throw new IllegalArgumentException("Patient expected of instanceOf I2b2Patient");
}
}
......
......@@ -30,7 +30,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
......@@ -41,6 +40,7 @@ import java.util.logging.Logger;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Extension;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
......@@ -74,7 +74,7 @@ import de.sekmi.histream.ext.PatientStore;
*/
public class PostgresPatientStore extends PostgresExtension<I2b2Patient> implements PatientStore, Closeable{
private static final Logger log = Logger.getLogger(PostgresPatientStore.class.getName());
private static final Iterable<Class<? super I2b2Patient>> INSTANCE_TYPES = Arrays.asList(Patient.class, I2b2Patient.class);
private static final Class<?>[] INSTANCE_TYPES = new Class[] {Patient.class, I2b2Patient.class};
private String projectId;
private String idSourceDefault;
private char idSourceSeparator;
......@@ -541,7 +541,7 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
}
@Override
public Iterable<Class<? super I2b2Patient>> getInstanceTypes() {
public Class<?>[] getInstanceTypes() {
return INSTANCE_TYPES;
}
......@@ -649,5 +649,13 @@ public class PostgresPatientStore extends PostgresExtension<I2b2Patient> impleme
db = null;
}
}
@Override
public Class<I2b2Patient> getSlotType() {
return I2b2Patient.class;
}
@Override
public <U> U extractSubtype(I2b2Patient slotInstance, Class<U> subtype) {
return Extension.extractSupertype(slotInstance, subtype);
}
}
package de.sekmi.histream.i2b2;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.StoredExtensionType;
import de.sekmi.histream.ext.Visit.Status;
/**
* Visit cache which synchronizes with an i2b2 visit_dimension table.
* Required non-null columns are encounter_num, patient_num, update_date.
* <p>
* Some optional columns are used: active_status_cd, start_date, end_date, inout_cd, location_cd, sourcesystem_cd
* <p>
* XXX after loading encounters, the String patientId not set anymore and always null. To determine the patientId, the patientStore is required for lookup of the patientNum
* TODO use encounter_mapping table to map actual (source) patient_ide to internal patient_num for facts.
* <p>
* The variable argument list for {@link #createInstance(Object...)} requires the following arguments:
* {@link String}{@code visitId}, {@link I2b2Patient}{@code patient}, {@link ExternalSourceType}{@code source}.
*
* @author marap1
*
*/
public class PostgresPatientVisitCache extends PostgresPatientCache implements Closeable{
private static final Logger log = Logger.getLogger(PostgresPatientVisitCache.class.getName());
private int maxEncounterNum;
private Hashtable<Integer, I2b2PatientVisit> visitCache;
private Hashtable<String, I2b2PatientVisit> idCache;
/** if true, don't allow a change of patient for a given visit. */
private boolean rejectPatientChange;
private PreparedStatement insert;
private PreparedStatement insertMapping;
private PreparedStatement update;
// private PreparedStatement select;
private PreparedStatement selectAll;
private PreparedStatement selectMappingsAll;
private PreparedStatement deleteSource;
private PreparedStatement deleteMapSource;
// /**
// * Create a visit store using configuration settings.
// * The project id must be specified with the key {@code project}.
// * JDBC connection configuration is specified with the key
// * prefixes {@code jdbc.*} and {@code data.jdbc.*}
// * @param configuration key value pairs
// * @throws ClassNotFoundException database driver not found
// * @throws SQLException SQL exceptions
// */
// public PostgresVisitStore(Map<String,String> configuration) throws ClassNotFoundException, SQLException {
// super(configuration);
// this.projectId = config.get("project");
// openDatabase(new String[]{"jdbc.","data.jdbc."});
// initialize();
// }
//
// /**
// * Create a visit store using a {@link DataSource}.
// * The project id must be specified with the key {@code project}.
// * @param ds data source for the connection
// * @param configuration configuration settings
// * @throws SQLException SQL error
// */
// public PostgresVisitStore(DataSource ds, Map<String,String> configuration) throws SQLException{
// super(configuration);
// this.projectId = config.get("project");
// openDatabase(ds);
// initialize();
// }
public PostgresPatientVisitCache(){
this.rejectPatientChange = false;
}
@Override
public void open(Connection connection, String projectId, DataDialect dialect) throws SQLException{
// first load patients
super.open(connection, projectId, dialect);
visitCache = new Hashtable<>();
idCache = new Hashtable<>();
loadMaxEncounterNum();
batchLoad(); /// XXX loading visits does not set the String patientId, for that, the patientStore would be needed
}
@Override
protected void prepareStatements() throws SQLException {
super.prepareStatements();
// TODO: use prefix from configuration to specify tablespace
insert = db.prepareStatement("INSERT INTO visit_dimension(encounter_num, patient_num, import_date, download_date, sourcesystem_cd) VALUES(?,?,current_timestamp,?,?)");
insertMapping = db.prepareStatement("INSERT INTO encounter_mapping(encounter_num, encounter_ide, encounter_ide_source, patient_ide, patient_ide_source, encounter_ide_status, project_id, import_date, download_date, sourcesystem_cd) VALUES(?,?,?,?,?,'A','"+projectId+"',current_timestamp,?,?)");
update = db.prepareStatement("UPDATE visit_dimension SET patient_num=?, active_status_cd=?, start_date=?, end_date=?, inout_cd=?, location_cd=?, update_date=current_timestamp, download_date=?, sourcesystem_cd=? WHERE encounter_num=?");
//select = db.prepareStatement("SELECT encounter_num, patient_num, active_status_cd, start_date, end_date, inout_cd, location_cd, update_date, sourcesystem_cd FROM visit_dimension WHERE patient_num=?");
selectAll = db.prepareStatement("SELECT encounter_num, patient_num, active_status_cd, start_date, end_date, inout_cd, location_cd, download_date, sourcesystem_cd FROM visit_dimension", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectAll.setFetchSize(fetchSize);
selectMappingsAll = db.prepareStatement("SELECT encounter_num, encounter_ide, encounter_ide_source, patient_ide, patient_ide_source, encounter_ide_status, project_id FROM encounter_mapping ORDER BY encounter_num", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectMappingsAll.setFetchSize(fetchSize);
deleteSource = db.prepareStatement("DELETE FROM visit_dimension WHERE sourcesystem_cd=?");
deleteMapSource = db.prepareStatement("DELETE FROM encounter_mapping WHERE sourcesystem_cd=?");
}
public int size(){
return visitCache.size();
}
public void setRejectPatientChange(boolean rejectPatientChange){
this.rejectPatientChange = rejectPatientChange;
}
private void loadMaxEncounterNum() throws SQLException{
try( Statement s = db.createStatement() ){
String sql = "SELECT MAX(encounter_num) FROM visit_dimension";
ResultSet rs = s.executeQuery(sql);
if( rs.next() ){
maxEncounterNum = rs.getInt(1);
}else{
// patient_dimension is empty
// start numbering patients with 1
maxEncounterNum = 1;
}
rs.close();
}
log.info("MAX(encounter_num) = "+maxEncounterNum);
}
public I2b2PatientVisit lookupEncounterNum(Integer encounter_num){
return visitCache.get(encounter_num);
}
public void loadMaxInstanceNums() throws SQLException{
// TODO maybe better to load only encounters+max instance_num for current project -> join with encounter_mapping
log.info("Loading maximum instance_num for each encounter");
Statement stmt = db.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
final String sql = "SELECT patient_num, encounter_num, MAX(instance_num) FROM observation_fact GROUP BY patient_num, encounter_num";
int count = 0;
int noMatch = 0;
try( ResultSet rs = stmt.executeQuery(sql) ){
while( rs.next() ){
I2b2PatientVisit v = visitCache.get(rs.getInt(2));
if( v != null )v.maxInstanceNum = rs.getInt(3);
else noMatch ++;
count ++;
}
}
stmt.close();
log.info("Loaded MAX(instance_num) for "+count+" encounters");
if( noMatch != 0 ){
log.warning("Encountered "+noMatch+" encounter_num in observation_fact without matching visits");
}
}
private String pasteId(String source, String ide){
if( source == null || source.equals(idSourceDefault) )return ide;
else return source+":"+ide;
}
private String[] splitId(String id){
String ide;
String ids;
int p = id.indexOf(idSourceSeparator);
if( p == -1 ){
// id does not contain source
ids = idSourceDefault;
ide = id;
}else{
// id contains source, separate from id
ids = id.substring(0, p);
ide = id.substring(p+1);
}
return new String[]{ids,ide};
}
/**
* Set aliases for a visit, puts aliases into cache.
*
* @param visit visit instance
* @param aliases alias patient IDs (e.g. merged)
* @param primary index of primary alias (in aliases)
*/
private void setAliases(I2b2PatientVisit visit, String[] aliases, int primary){
visit.aliasIds = aliases;
visit.primaryAliasIndex = primary;
visit.setId(aliases[primary]);
// put in cache
for( String id : aliases ){
idCache.put(id, visit);
}
}
private void batchSetAliases(int num, ArrayList<String> aliases, int primary){
I2b2PatientVisit visit = visitCache.get(num);
if( visit == null ){
log.warning("Missing row in visit_dimension for encounter_mapping.encounter_num="+num);
}else{
setAliases(visit, aliases.toArray(new String[aliases.size()]), primary);
visit.markDirty(false);
}
}
private void batchLoad() throws SQLException{
// load visits
try( ResultSet rs = selectAll.executeQuery() ){
while( rs.next() ){
I2b2PatientVisit visit = loadFromResultSet(rs);
visitCache.put(visit.getNum(), visit);
}
}
// load id mappings
try( ResultSet rs = selectMappingsAll.executeQuery() ){
int num = -1; // current patient number
ArrayList<String> ids = new ArrayList<>(16);
int primary=0; // primary alias index
while( rs.next() ){
if( num == -1 )num = rs.getInt(1);
else if( num != rs.getInt(1) ){
// next encounter mapping
batchSetAliases(num, ids, primary);
// XXX
ids.clear();
num = rs.getInt(1);
primary = 0;
}
String id = pasteId(rs.getString(3), rs.getString(2));
if( rs.getString(6).equals("A") && rs.getString(7).equals(projectId) ){
// active id for project
primary = ids.size();
// TODO maybe use any other Active encounter as primary, if the project doesn't match
}
ids.add(id);
}
if( num != -1 ){
// don't forget last encounter
batchSetAliases(num, ids, primary);
}
}
}
/*
private I2b2PatientVisit retrieveFromStorage(String id) throws IOException{
synchronized( select ){
try {
select.setInt(1, Integer.parseInt(id));
} catch (NumberFormatException | SQLException e) {
throw new IOException(e);
}
try( ResultSet rs = select.executeQuery() ){
if( !rs.next() )return null;
return loadFromResultSet(rs);
} catch (SQLException e) {
throw new IOException(e);
}
}
}*/
private void updateStorage(I2b2PatientVisit visit) throws SQLException {
synchronized( update ){
update.setInt(1, visit.getPatientNum());
update.setString(2, visit.getActiveStatusCd());
update.setTimestamp(3, dialect.encodeInstantPartial(visit.getStartTime()));
update.setTimestamp(4, dialect.encodeInstantPartial(visit.getEndTime()));
update.setString(5, visit.getInOutCd());
update.setString(6, dialect.encodeLocationCd(visit.getLocationId()));
update.setTimestamp(7, dialect.encodeInstant(visit.getSourceTimestamp()));
update.setString(8, visit.getSourceId());
// where encounter_num=visit.getNum()
update.setInt(9, visit.getNum());
int rows = update.executeUpdate();
if( rows == 0 ){
log.warning("UPDATE executed for visit_dimension.encounter_num="+visit.getNum()+", but no rows changed.");
}
// clear dirty flag
visit.markDirty(false);
}
}
/**
* Add the visit to storage. Patient information is not written
* @param visit visit to add
* @throws SQLException if the INSERT failed
*/
private void addToStorage(I2b2PatientVisit visit) throws SQLException{
synchronized( insert ){
insert.setInt(1, visit.getNum() );
insert.setInt(2, visit.getPatientNum());
insert.setTimestamp(3, dialect.encodeInstant(visit.getSourceTimestamp()));
insert.setString(4, visit.getSourceId());
insert.executeUpdate();
// other fields are not written, don't clear the dirty flag
}
synchronized( insertMapping ){
insertMapping.setInt(1, visit.getNum());
String[] ids = splitId(visit.getId());
insertMapping.setString(2, ids[1]); // encounter_ide
insertMapping.setString(3, ids[0]); // encounter_ide_source
// XXX warning, this is only safe if PostgresPatientStore and PostgresVisitStore use same idSourceSeparator and idSourceDefault
ids = splitId(visit.getPatientId()); // TODO better solution
insertMapping.setString(4, ids[1]); // patient_ide
insertMapping.setString(5, ids[0]); // patient_ide_source
insertMapping.setTimestamp(6, dialect.encodeInstant(visit.getSourceTimestamp()));
insertMapping.setString(7, visit.getSourceId());
insertMapping.executeUpdate();
}
}
private I2b2PatientVisit loadFromResultSet(ResultSet rs) throws SQLException{
int id = rs.getInt(1);
int patid = rs.getInt(2);
// XXX String patientId is always null after loading from the database.
// load vital status code, which contains information about
// accuracy of birth and death dates.
String active_status_cd = rs.getString(3);
// make sure that non-null vital code contains at least one character
if( active_status_cd != null && active_status_cd.length() == 0 )active_status_cd = null;
DateTimeAccuracy startDate = dialect.decodeInstantPartial(rs.getTimestamp(4));
DateTimeAccuracy endDate = dialect.decodeInstantPartial(rs.getTimestamp(5));
// load sex
String inout_cd = rs.getString(6);
Status status = null;
if( inout_cd != null ){
switch( inout_cd.charAt(0) ){
case 'I':
status = Status.Inpatient;
break;
case 'O':
status = Status.Outpatient;
break;
}
}
// TODO: use patid
I2b2PatientVisit visit = new I2b2PatientVisit(id, patid);
visit.setStartTime(startDate);
visit.setEndTime(endDate);
visit.setStatus(status);
visit.setActiveStatusCd(active_status_cd);
visit.setLocationId(dialect.decodeLocationCd(rs.getString(7)));
visit.setSourceTimestamp(dialect.decodeInstant(rs.getTimestamp(8)));
visit.setSourceId(rs.getString(9));
// additional fields go here
// assign patient
I2b2Patient patient = lookupPatientNum(patid);
if( patient == null ) {
// visit in database with illegal patient reference. patient does not exist
// call warning handler
// TODO decide what to do.. create patient? fail loading visit? or maybe create a temporary memory patient without storage
log.severe("Visit "+id+" references non-existing patient_num "+patid);
}else {
visit.setPatient(patient);
}
// mark clean
visit.markDirty(false);
return visit;
}
/*
private void retrievalException(String id, IOException e) {
log.log(Level.SEVERE, "Unable to retrieve visit "+id, e);
}
*/
private void insertionException(I2b2PatientVisit visit, SQLException e) {
log.log(Level.SEVERE, "Unable to insert visit "+visit.getId(), e);
}
private void updateException(I2b2PatientVisit visit, SQLException e) {
log.log(Level.SEVERE, "Unable to update visit "+visit.getId(), e);
}
public I2b2PatientVisit createVisit(String encounterId, I2b2Patient patient, ExternalSourceType source){
I2b2PatientVisit visit = idCache.get(encounterId);
if( visit == null ){
// visit does not exist, create a new one
maxEncounterNum ++;
int encounter_num = maxEncounterNum;
visit = new I2b2PatientVisit(encounter_num, patient.getNum());
visit.setPatient(patient);
// created from observation, use source metadata
visit.setSourceId(source.getSourceId());
visit.setSourceTimestamp(source.getSourceTimestamp());
// put in cache
visitCache.put(encounter_num, visit);
// only one alias which is also primary
setAliases(visit, new String[]{encounterId}, 0);
// insert to storage
try {
addToStorage(visit);
} catch (SQLException e) {
insertionException(visit, e);
} // put in cache and insert into storage
// commonly, the item is modified after a call to this method,
// but changes are written later via a call to update.
// (otherwise, the instance would need to know whether to perform INSERT or UPDATE)
}else {
// visit already existing
// verify that the patient number from the visit matches with the observation
if( visit.getPatientNum() != patient.getNum() ){
// throw exception to abort processing
if( rejectPatientChange ){
throw new AssertionError("Patient_num mismatch for visit "+encounterId+": history says "+visit.getPatientNum()+" while data says "+patient.getNum(), null);
}else {
log.info("Updating visit #"+visit.getNum()+" for patient change from #"+visit.getPatientNum()+" to #"+patient.getNum());
visit.setPatient(patient);
}
}
}
return visit;
}
/**
* Find a visit. Does not create the visit if it doesn't exist.
* @param id visit id/alias
* @return visit or {@code null} if not found.
*/
public I2b2PatientVisit findVisit(String id){
return idCache.get(id);
}
@Override
public void deleteWhereSourceId(String sourceId) throws SQLException {
// first delete patient
super.deleteWhereSourceId(sourceId);
// then visit
deleteSource.setString(1, sourceId);
int numRows = deleteSource.executeUpdate();
log.info("Deleted "+numRows+" rows with sourcesystem_cd = "+sourceId);
deleteMapSource.setString(1, sourceId);
deleteMapSource.executeUpdate();
// find matching patients in cache
Enumeration<I2b2PatientVisit> all = visitCache.elements();
LinkedList<I2b2PatientVisit>remove = new LinkedList<>();
while( all.hasMoreElements() ){
I2b2PatientVisit p = all.nextElement();
if( p.getSourceId() != null && p.getSourceId().equals(sourceId) ){
remove.add(p); // remove later, otherwise the Enumeration might fail
}
// XXX does not work with sourceId == null
}
// remove patients from cache
for( I2b2PatientVisit p : remove ){
visitCache.remove(p.getNum());
for( String id : p.aliasIds ){
idCache.remove(id);
}
}
// XXX some ids might remain in patient_mapping, because we don't store the patient_mapping sourcesystem_cd
// usually this should work, as we assume sourcesystem_cd to be equal for patients in both tables
// reload MAX(patient_num)
loadMaxEncounterNum();
}
@Override
public void flush() {
Iterator<I2b2PatientVisit> dirty = StoredExtensionType.dirtyIterator(visitCache.elements());
int count = 0;
while( dirty.hasNext() ){
I2b2PatientVisit visit = dirty.next();
try {
updateStorage(visit);
count ++;
} catch (SQLException e) {
updateException(visit, e);
}
}
if( count != 0 )log.info("Updated "+count+" visits in database");
}
@Override
public synchronized void close() throws IOException {
if( db != null ){
flush();
try {
db.close();
} catch (SQLException e) {
throw new IOException(e);
}
db = null;
}
}
}
......@@ -30,7 +30,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
......@@ -41,8 +40,10 @@ import java.util.logging.Logger;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Extension;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.StoredExtensionType;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.ext.Visit.Status;
......@@ -62,9 +63,10 @@ import de.sekmi.histream.ext.Visit.Status;
* @author marap1
*
*/
@Deprecated
public class PostgresVisitStore extends PostgresExtension<I2b2Visit> implements Closeable{
private static final Logger log = Logger.getLogger(PostgresVisitStore.class.getName());
private static final Iterable<Class<? super I2b2Visit>> INSTANCE_TYPES = Arrays.asList(Visit.class,I2b2Visit.class);
private static final Class<?>[] INSTANCE_TYPES = new Class[] {Visit.class,I2b2Visit.class};
private String projectId;
private int maxEncounterNum;
......@@ -476,9 +478,8 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit> implements
return getOrCreateInstance(fact.getEncounterId(), fact.getExtension(I2b2Patient.class), fact.getSource());
}
@Override
public Iterable<Class<? super I2b2Visit>> getInstanceTypes() {
return INSTANCE_TYPES;
public I2b2Visit createInstance(String encounterId, Patient patient, ExternalSourceType source) {
return getOrCreateInstance(encounterId, (I2b2Patient)patient, source);
}
/**
......@@ -565,5 +566,17 @@ public class PostgresVisitStore extends PostgresExtension<I2b2Visit> implements
db = null;
}
}
@Override
public Class<?>[] getInstanceTypes() {
return INSTANCE_TYPES;
}
@Override
public Class<I2b2Visit> getSlotType() {
return I2b2Visit.class;
}
@Override
public <U> U extractSubtype(I2b2Visit slotInstance, Class<U> subtype) {
return Extension.extractSupertype(slotInstance, subtype);
}
}
package de.sekmi.histream.i2b2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.logging.Logger;
import javax.sql.DataSource;
// TODO move to query-i2b2-sql
public class LocalHSQLDataSource implements DataSource{
private PrintWriter pw;
private static final String JDBC_URI = "jdbc:hsqldb:file:target/testdb_crcdata";
public void delete() throws SQLException {
try( Connection c = getConnection() ){
Statement s = c.createStatement();
s = c.createStatement();
s.executeQuery("DROP SCHEMA PUBLIC CASCADE");
s.close();
}
}
private static void executeSQL(Connection dbc, BufferedReader lines) throws SQLException, IOException {
StringBuilder stmt = new StringBuilder();
String line;
while( (line = lines.readLine()) != null ) {
line = line.trim();
if( line.trim().startsWith("--") ) {
// ignore comment lines
continue;
}
if( line.endsWith(";") ) {
// append without ;
stmt.append(line.substring(0, line.length()-1));
// execute
try( Statement s = dbc.createStatement() ){
s.executeUpdate(stmt.toString());
}
// clear
stmt = new StringBuilder();
}else {
stmt.append(line);
}
}
}
/**
* Create the database and initialize it with the specified DDL statements
* @param sql_ddl SQL DDL statements
* @throws SQLException SQL error
* @throws IOException IO error reading the DDL
*/
public void create(BufferedReader ... sql_ddl) throws SQLException, IOException {
try( Connection dbc = DriverManager.getConnection(JDBC_URI+";create=true", "SA", "") ){
for( BufferedReader ddl : sql_ddl ) {
executeSQL(dbc, ddl);
}
}
}
public void createI2b2() throws SQLException, IOException {
try( InputStream in = TestHSQLDataSource.class.getResourceAsStream("/i2b2_hsqldb_ddl_create.sql");
BufferedReader rd = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)) )
{
this.create(rd);
}
}
public Integer executeCountQuery(String sql) throws SQLException {
Integer ret = null;
try( Connection c = getConnection();
Statement s = c.createStatement();
ResultSet rs = s.executeQuery(sql) )
{
if( rs.next() ) {
ret = rs.getInt(1);
}
}
return ret;
}
public LocalHSQLDataSource() {
pw = new PrintWriter(System.out);
}
@Override
public PrintWriter getLogWriter() throws SQLException {
return pw;
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
this.pw = out;
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
// TODO Auto-generated method stub
}
@Override
public int getLoginTimeout() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public Connection getConnection() throws SQLException {
return getConnection("SA","");
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return DriverManager.getConnection(JDBC_URI+";ifexists=true", username, password);
}
}
package de.sekmi.histream.i2b2;
import java.io.IOException;
import java.sql.SQLException;
import org.junit.Assert;
import org.junit.Test;
public class TestHSQLDataSource {
@Test
public void testCreateTables() throws IOException, SQLException {
LocalHSQLDataSource ds = new LocalHSQLDataSource();
ds.createI2b2();
// perform queries
Assert.assertEquals(0, ds.executeCountQuery("SELECT COUNT(*) FROM observation_fact").intValue());
// drop database
ds.delete();
}
}
......@@ -10,6 +10,7 @@ import org.junit.Test;
import de.sekmi.histream.DateTimeAccuracy;
@Deprecated
public class TestI2b2Visit {
private DateTimeAccuracy createAccurateTimestamp(){
......