Commit 53a3967d authored by R.W.Majeed's avatar R.W.Majeed
Browse files

allow data extraction on visit level. refactoring

parent bf90a70c
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@ package de.sekmi.histream;
import java.io.IOException;
import java.time.Instant;

import de.sekmi.histream.ext.Visit;

/**
 * Extracts observations from complex sources
 * such as databases and data warehouses.
@@ -24,4 +26,5 @@ public interface ObservationExtractor {
	 * @throws IOException error (e.g. database failure)
	 */
	ObservationSupplier extract(Instant start_min, Instant start_max, Iterable<String> notations) throws IOException;
	ObservationSupplier extract(Iterable<Visit> visits, Iterable<String> notations) throws IOException;
}
+57 −24
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package de.sekmi.histream.i2b2;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@@ -24,24 +25,13 @@ import de.sekmi.histream.impl.StringValue;
 * @author R.W.Majeed
 *
 */
public class I2b2Extractor implements ObservationSupplier {
public abstract class I2b2Extractor implements ObservationSupplier {
	private static final Logger log = Logger.getLogger(I2b2Extractor.class.getName());

	private I2b2ExtractorFactory factory;
	private Connection dbc;
	private ResultSet rs;
	private boolean finished;
	protected I2b2ExtractorFactory factory;
	protected Connection dbc;

	/**
	 * Constructs a new extractor. Connection and ResultSet need to be 
	 * closed by a call to {@link #close()} if the object is not needed
	 * anymore.
	 * <p>
	 * 	
	 * </p>
	 * @param factory
	 * @param dbc
	 * @param rs ResultSet with rows from {@code observation_fact} table. 
	/** ResultSet with rows from {@code observation_fact} table. 
	 * Initially positioned before the first row. It is accessed read only
	 * and forward only. Its columns are required to be in the following order:
	 * <ol>
@@ -56,18 +46,52 @@ public class I2b2Extractor implements ObservationSupplier {
	 *  <li>end_date</li>
	 *  <li>RTRIM(valtype_cd) valtype_cd, tval_char, nval_num, RTRIM(valueflag_cd) valueflag_cd, units_cd, sourcesystem_cd</li>
	 * </ol>
	 **/
	private PreparedStatement ps;
	private ResultSet rs;
	private boolean finished;
	
	/**
	 * Constructs a new extractor. Connection and ResultSet need to be 
	 * closed by a call to {@link #close()} if the object is not needed
	 * anymore.
	 * <p>
	 * 	
	 * </p>
	 * @param factory
	 * @param dbc
	 * @throws SQLException error
	 */
	I2b2Extractor(I2b2ExtractorFactory factory, Connection dbc, ResultSet rs) throws SQLException {
	I2b2Extractor(I2b2ExtractorFactory factory, Connection dbc) throws SQLException {
		this.factory = factory;
		this.dbc = dbc;
		this.rs = rs;
	}

	protected abstract PreparedStatement prepareQuery() throws SQLException;

	/**
	 * Prepares and executes the query, producing the result set
	 * which can then be used to fetch observations.
	 * <p>
	 * This method can be called manually before the first call to {@link #get()}.
	 * If not called manually, it will be called automatically during the first
	 * call to {@link #get()}. During that implicit invocations, any errors are
	 * forwarded to the {@link #errorHandler(SQLException)}.
	 * </p>
	 * @throws SQLException SQL error
	 */
	public void prepareResultSet() throws SQLException{
		if( rs != null ){
			// this method assumes that there is no previous result set
			throw new IllegalStateException();
		}
		this.ps = prepareQuery();
		this.rs = ps.executeQuery();
		if( rs.next() == false ){
			// empty result set, no observations to process.
			finished = true;
		}		
	}
	
	/**
	 * Retrieves errors during the get() operation.
	 * The default implementation is to wrap the exception
@@ -204,6 +228,16 @@ public class I2b2Extractor implements ObservationSupplier {
		if( finished == true ){
			return null;
		}
		if( rs == null ){
			try {
				prepareResultSet();
			} catch (SQLException e) {
				finished = true; // prevent repeating the query execution failure
				// if we fail here, we are unable retrieve any fact
				errorHandler(e);
				return null;
			}
		}
		Observation o = null;
		try{
			// next/first observation is always top concept
@@ -246,13 +280,12 @@ public class I2b2Extractor implements ObservationSupplier {
	@Override
	public void close() {
		log.info("Closing extractor "+this.toString());
//		Statement st = rs.getStatement();
		try{
			rs.close();
			if( rs != null )rs.close();
			if( ps != null )ps.close();
		}catch( SQLException e){
			log.log(Level.WARNING,"Failed to close recortset",e);
			log.log(Level.WARNING,"Failed to close RecordSet",e);
		}
//		st.close();
		try{
			dbc.close();
		}catch( SQLException e){
+74 −84
Original line number Diff line number Diff line
@@ -5,16 +5,17 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

import javax.sql.DataSource;


import de.sekmi.histream.ObservationExtractor;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ext.Patient;
@@ -36,9 +37,11 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor

	private DataSource ds;
	private Integer fetchSize;
	DataDialect dialect;
	private ObservationFactory observationFactory;
	private boolean allowWildcardConceptCodes;

	DataDialect dialect;
	boolean allowWildcardConceptCodes;
	boolean useEncounterTiming;
	
	Function<Integer,? extends Patient> lookupPatientNum;
	Function<Integer,? extends Visit> lookupVisitNum;
@@ -49,19 +52,17 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor
	 * concepts overlap. (Such as query fails, duplicate facts, etc.)
	 * </p>
	 */
	public static String ALLOW_WILDCARD_CONCEPT_CODES = "de.sekmi.histream.i2b2.wildcard_concepts";
	public static final String ALLOW_WILDCARD_CONCEPT_CODES = "de.sekmi.histream.i2b2.wildcard_concepts";
	public static final String USE_ENCOUNTER_TIMESTAMPS = "de.sekmi.histream.i2b2.encounter_timing";
	
	
	private static String SELECT_PARAMETERS = "f.patient_num, f.encounter_num, f.instance_num, f.concept_cd, f.modifier_cd, f.provider_id, f.location_cd, f.start_date, f.end_date, RTRIM(f.valtype_cd) valtype_cd, f.tval_char, f.nval_num, RTRIM(f.valueflag_cd) valueflag_cd, f.units_cd, f.download_date, f.sourcesystem_cd";
	private static String SELECT_TABLE = "observation_fact f";
	//private static String SELECT_ORDER_CHRONO = "ORDER BY start_date, patient_num, encounter_num, instance_num, modifier_cd NULLS FIRST";
	private static String SELECT_ORDER_GROUP = "ORDER BY f.patient_num, f.encounter_num, f.start_date, f.instance_num, f.concept_cd, f.modifier_cd NULLS FIRST";

	public I2b2ExtractorFactory(DataSource crc_ds, ObservationFactory factory) throws SQLException{
		// TODO implement
		this.observationFactory = factory;
		ds = crc_ds;
		dialect = new DataDialect();
		fetchSize = 500;
	}

	public ObservationFactory getObservationFactory(){
@@ -75,15 +76,27 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor
		this.lookupVisitNum = lookup;
	}
	public void setFeature(String feature, Object value){
		if( feature.equals(ALLOW_WILDCARD_CONCEPT_CODES) ){
		switch( feature ){
		case ALLOW_WILDCARD_CONCEPT_CODES:
			if( value instanceof Boolean ){
				this.allowWildcardConceptCodes = (Boolean)value;
			}else{
				throw new IllegalArgumentException("Boolean value expected for feature "+feature);
			}
			break;
		case USE_ENCOUNTER_TIMESTAMPS:
			if( value instanceof Boolean ){
				this.useEncounterTiming = (Boolean)value;
			}else{
				throw new IllegalArgumentException("Boolean value expected for feature "+feature);
			}
			break;
		default:
			throw new IllegalArgumentException("Feature not supported:"+feature+"="+value);
		}
	}
	private PreparedStatement prepareStatement(Connection dbc, String sql) throws SQLException{

	PreparedStatement prepareStatementForLargeResultSet(Connection dbc, String sql) throws SQLException{
		PreparedStatement s = dbc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
		if( fetchSize != null ){
			s.setFetchSize(fetchSize);
@@ -92,35 +105,6 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor
		return s;
	}

	public void setProperty(String property, Object value){
		// de.sekmi.histream.i2b2.extractor.project
	}
	
	private void createTemporaryConceptTable(Connection dbc, Iterable<String> concepts) throws SQLException{
		// delete table if previously existing
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("DROP TABLE IF EXISTS temp_concepts");
		}
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("CREATE TEMPORARY TABLE temp_concepts(concept VARCHAR(255) PRIMARY KEY)");			
		}
		try( PreparedStatement ps 
				= dbc.prepareStatement("INSERT INTO temp_concepts(concept) VALUES(?)") ){
			// TODO do we need to make sure that there are no duplicate concepts???
			for( String concept : concepts ){
				ps.clearParameters();
				ps.clearWarnings();
				ps.setString(1, concept);
				ps.executeUpdate();
			}
		}
		
	}
	
	private String escapeLikeString(String likeString){
		// TODO escape _ and % with backslash
		return likeString;
	}
	/**
	 * Extract observations for given concept codes with 
	 * {@code observation.start} between start_min and start_end.
@@ -141,57 +125,17 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor
	 */
	//@SuppressWarnings("resource")
	I2b2Extractor extract(Timestamp start_min, Timestamp start_max, Iterable<String> notations) throws SQLException{
		// TODO move connection and prepared statement to I2b2Extractor
		PreparedStatement ps = null;
		ResultSet rs = null;
		Connection dbc = null;
		try{ // no try with resource, because we need to pass the connection to the extractor
			dbc = ds.getConnection();
			dbc.setAutoCommit(true);
			StringBuilder b = new StringBuilder(600);
			b.append("SELECT ");
			b.append(SELECT_PARAMETERS+" FROM "+SELECT_TABLE+" ");
			if( notations != null ){
				log.info("Creating temporary table for concept ids");
				Iterable<String> ids = notations;
				int wildcardCount = 0;
				if( allowWildcardConceptCodes ){
					List<String>escaped = new ArrayList<>();
					for( String id : ids ){
						String es = escapeLikeString(id).replace('*', '%');
						// check if wildcards actually used
						if( false == es.equals(id) ){
							wildcardCount ++;
						}
						escaped.add(es);
					}
					ids = escaped;
					// TODO add check for overlapping wildcard concepts (e.g. A* and AB*)
				}
				createTemporaryConceptTable(dbc, ids);
				if( wildcardCount > 0 ){
					b.append(" JOIN temp_concepts tc ON f.concept_cd LIKE tc.concept ");					
				}else{
					b.append(" JOIN temp_concepts tc ON f.concept_cd=tc.concept ");
				}
			}
			b.append("WHERE f.start_date BETWEEN ? AND ? ");
			b.append(SELECT_ORDER_GROUP);
			log.info("SQL: "+b.toString());

			ps = prepareStatement(dbc, b.toString());
			ps.setTimestamp(1, start_min);
			ps.setTimestamp(2, start_max);
			rs = ps.executeQuery();
			return new I2b2Extractor(this, dbc, rs);
			I2b2ExtractorImpl ei = new I2b2ExtractorImpl(this, dbc);
			ei.setInterval(start_min, start_max);
			ei.setNotations(notations);
			ei.prepareResultSet();
			return ei;
		}catch( SQLException e ){
			// clean up
			if( rs != null ){
				rs.close();
			}
			if( ps != null ){
				ps.close();
			}
			if( dbc != null ){
				dbc.close();
			}
@@ -211,4 +155,50 @@ public class I2b2ExtractorFactory implements AutoCloseable, ObservationExtractor
			throw new IOException(e);
		}
	}

	private int[] fetchEncounterNums(Iterable<Visit> visits){
		List<Visit> vl;
		if( visits instanceof List ){
			vl = (List<Visit>)visits;
		}else{
			vl = new ArrayList<>();
			visits.forEach(vl::add);
		}
		int[] nums = new int[vl.size()];
		Iterator<Visit> vi = vl.iterator();
		for( int i=0; i<nums.length; i++ ){
			Visit v = vi.next();
			int num;
			if( v instanceof I2b2Visit ){
				num = ((I2b2Visit) v).getNum();
			}else{
				throw new IllegalStateException("encounter_num not available for visit type "+v.getClass());
			}
			nums[i] = num;
		}
		return nums;
	}
	@Override
	public I2b2Extractor extract(Iterable<Visit> visits, Iterable<String> notations) throws IOException {
		Connection dbc = null;
		try{ // no try with resource, because we need to pass the connection to the extractor
			dbc = ds.getConnection();
			dbc.setAutoCommit(true);
			I2b2ExtractorImpl ei = new I2b2ExtractorImpl(this, dbc);
			ei.setVisits(fetchEncounterNums(visits));
			ei.setNotations(notations);
			ei.prepareResultSet();
			return ei;
		}catch( SQLException e ){
			// clean up
			if( dbc != null ){
				try {
					dbc.close();
				} catch (SQLException e1) {
					e.addSuppressed(e1);
				}
			}
			throw new IOException(e);
		}
	}
}
+264 −0
Original line number Diff line number Diff line
package de.sekmi.histream.i2b2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.logging.Logger;

import de.sekmi.histream.ext.Visit;

public class I2b2ExtractorImpl extends I2b2Extractor {
	private static final Logger log = Logger.getLogger(I2b2ExtractorImpl.class.getName());
	private static String SELECT_PARAMETERS = "f.patient_num, f.encounter_num, f.instance_num, f.concept_cd, f.modifier_cd, f.provider_id, f.location_cd, f.start_date, f.end_date, RTRIM(f.valtype_cd) valtype_cd, f.tval_char, f.nval_num, RTRIM(f.valueflag_cd) valueflag_cd, f.units_cd, f.download_date, f.sourcesystem_cd";
	private static String SELECT_TABLE = "observation_fact f";
	//private static String SELECT_ORDER_CHRONO = "ORDER BY start_date, patient_num, encounter_num, instance_num, modifier_cd NULLS FIRST";
	private static String SELECT_ORDER_GROUP = "ORDER BY f.patient_num, f.encounter_num, f.start_date, f.instance_num, f.concept_cd, f.modifier_cd NULLS FIRST";

	private Iterable<String> notations;
	private int[] encounter_nums;
	
	private Timestamp interval_start;
	private Timestamp interval_end;
	private String temporaryTableSuffix;
	private List<String> temporaryTables;
	private List<Object> joinArguments;
	private List<Object> whereArguments;
	private int maxInlineArgs;

	I2b2ExtractorImpl(I2b2ExtractorFactory factory, Connection dbc) throws SQLException {
		super(factory, dbc);
		temporaryTables = new ArrayList<>();
		joinArguments = new ArrayList<>();
		whereArguments = new ArrayList<>();
	}

	private String tempTableName(String base){
		return base+temporaryTableSuffix;
	}
	private void limitNotations(List<String> joinParts, List<String> whereParts) throws SQLException{
		if( notations == null ){
			// no limit, return everything
			return;
		}
		// check if there is a limited number of notations
		log.info("Creating temporary table for concept ids");
		Iterable<String> ids = notations;
		int wildcardCount = 0;
		if( factory.allowWildcardConceptCodes ){
			List<String>escaped = new ArrayList<>();
			for( String id : ids ){
				String es = escapeLikeString(id).replace('*', '%');
				// check if wildcards actually used
				if( false == es.equals(id) ){
					wildcardCount ++;
				}
				escaped.add(es);
			}
			ids = escaped;
			// TODO add check for overlapping wildcard concepts (e.g. A* and AB*)
		}
		createTemporaryConceptTable(dbc, ids);
		if( wildcardCount > 0 ){
			joinParts.add("INNER JOIN "+tempTableName("temp_concepts")+" tc ON f.concept_cd LIKE tc.concept");				
		}else{
			joinParts.add("INNER JOIN "+tempTableName("temp_concepts")+" tc ON f.concept_cd=tc.concept");
		}

	}

	private void limitTimeframe(List<String> joinParts, List<String> whereParts){
		if( interval_start == null && interval_end == null ){
			// nothing to do
			return;
		}
		Objects.requireNonNull(interval_start);
		Objects.requireNonNull(interval_end);		
		if( factory.useEncounterTiming ){
			// interval refers to encounter start.
			// build and join an encounter table
			joinParts.add("INNER JOIN encounter_dimension ed ON ed.encouner_num=f.encounter_num AND ed.start_time BETWEEN ? AND ?");
			joinArguments.add(interval_start);
			joinArguments.add(interval_end);
		}else{
			// interval refers to fact start
			whereParts.add("f.start_time BETWEEN ? AND ?");
			whereArguments.add(interval_start);
			whereArguments.add(interval_end);
		}
	}

	/**
	 * Build a String like {@code (?,?,?,?)} to use with an SQL IN statement.
	 * @param length number of question marks in the list
	 * @return string containing the list
	 */
	private static String generateInlineIn(int length){
		StringBuilder b = new StringBuilder(length*2+2);
		b.append('(');
		for( int i=0; i<length; i++ ){
			if( i != 0 ){
				b.append(',');
			}
			b.append('?');
		}
		b.append(')');
		return b.toString();
	}
	private void limitVisits(List<String> joinParts, List<String> whereParts){
		if( encounter_nums == null ){
			// no restriction, nothing to do
			return;
		}
		if( encounter_nums.length == 1 ){
			// single visit, use inline sql
			whereParts.add("f.encounter_num = ?");
			whereArguments.add(Integer.valueOf(encounter_nums[0]));
		}else if( encounter_nums.length < maxInlineArgs ){
			whereParts.add("f.encounter_num IN "+generateInlineIn(encounter_nums.length));
			for( int i=0; i<encounter_nums.length; i++ ){
				whereArguments.add(Integer.valueOf(encounter_nums[i]));
			}
		}else{
			// TODO implement temporary visit table
			throw new UnsupportedOperationException("Temporary visit table not implemented yet for specified visits");
		}
	}
	@Override
	protected PreparedStatement prepareQuery() throws SQLException {
		StringBuilder b = new StringBuilder(600);
		b.append("SELECT ");
		b.append(SELECT_PARAMETERS+" FROM "+SELECT_TABLE+" ");

		List<String> joinParts = new ArrayList<>();
		List<String> whereParts = new ArrayList<>();
		
		limitNotations(joinParts, whereParts);

		limitTimeframe(joinParts, whereParts);

		limitVisits(joinParts, whereParts);

		// JOIN ...
		for( String part : joinParts ){
			b.append(part).append(' ');
		}
		// WHERE ...
		if( !whereParts.isEmpty() ){
			b.append("WHERE ");
			boolean first = true;
			for( String part : whereParts ){
				if( first == true ){
					first = false;
				}else{
					b.append(" AND ");
				}
				b.append(part);
			}
		}
		// GROUP ...
		b.append(SELECT_ORDER_GROUP);

		PreparedStatement ps = factory.prepareStatementForLargeResultSet(dbc, b.toString());
		int arg = 1;
		for( Object o : joinArguments ){
			ps.setObject(arg, o);
			arg ++;
		}
		for( Object o : whereArguments ){
			ps.setObject(arg, o);
			arg ++;			
		}
		// TODO debug log query and arguments
		return ps;
		
//		StringBuilder b = createSelect(dbc, notations);
//		b.append("WHERE f.start_date BETWEEN ? AND ? ");
//		log.info("SQL: "+b.toString());
//
//		ps.setTimestamp(1, start_min);
//		ps.setTimestamp(2, start_max);
//		rs = ps.executeQuery();
//		return new I2b2Extractor(this, dbc, rs);

	}


	/**
	 * Limit by specifying encounter_num's to use
	 * @param visits 
	 */
	public void setVisits(int[] encounter_num){
		this.encounter_nums = encounter_num;
	}
	public void setNotations(Iterable<String> notations){
		// TODO any need for preprocessing notations?
		this.notations = notations;
	}
	public void setInterval(Timestamp start, Timestamp end){
		interval_start = start;
		interval_end = end;
	}

	/**
	 * Create a temporary visit table with the specified visits
	 * @param dbc connection
	 * @param visits visits
	 * @throws SQLException SQL errors
	 */
	private void createTemporaryVisitTable(Connection dbc, Iterable<Visit> visits) throws SQLException{
		// delete table if previously existing
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("DROP TABLE IF EXISTS temp_visits");
		}
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("CREATE TEMPORARY TABLE temp_visits(encounter_num INTEGER PRIMARY KEY)");			
		}
		try( PreparedStatement ps 
				= dbc.prepareStatement("INSERT INTO temp_visits(encounter_num) VALUES(?)") ){
			for( Visit visit : visits ){
				ps.clearParameters();
				ps.clearWarnings();
				int vn;
				if( visit instanceof I2b2Visit ){
					vn = ((I2b2Visit)visit).getNum();
				}else{
					throw new SQLException("Unsupported visit type "+visit.getClass());
				}
				ps.setInt(1, vn);
				ps.executeUpdate();
			}
		}
	}
	private void createTemporaryConceptTable(Connection dbc, Iterable<String> concepts) throws SQLException{
		// delete table if previously existing
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("DROP TABLE IF EXISTS "+tempTableName("temp_concepts"));
		}
		try( Statement s = dbc.createStatement() ){
			s.executeUpdate("CREATE TEMPORARY TABLE "+tempTableName("temp_concepts")+"(concept VARCHAR(255) PRIMARY KEY)");
			temporaryTables.add("temp_concepts");
		}
		try( PreparedStatement ps 
				= dbc.prepareStatement("INSERT INTO "+tempTableName("temp_concepts")+"(concept) VALUES(?)") ){
			// TODO do we need to make sure that there are no duplicate concepts???
			for( String concept : concepts ){
				ps.clearParameters();
				ps.clearWarnings();
				ps.setString(1, concept);
				ps.executeUpdate();
			}
		}
		
	}
	
	private String escapeLikeString(String likeString){
		// TODO escape _ and % with backslash
		return likeString;
	}

}