Commit 79d8db9e authored by R.W.Majeed's avatar R.W.Majeed
Browse files

extractor added

parent f4d1af27
Loading
Loading
Loading
Loading
+206 −0
Original line number Diff line number Diff line
package de.sekmi.histream.i2b2;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Value;

/**
 * Retrieves observations from i2b2. See {@link I2b2ExtractorFactory}.
 * @author R.W.Majeed
 *
 */
public class I2b2Extractor implements ObservationSupplier {

	private final I2b2ExtractorFactory factory;
	private final Connection dbc;
	private final ResultSet rs;
	/** Set once the result set is exhausted or became unusable after an error. */
	private boolean finished;
	
	/**
	 * Constructs a new extractor. Connection and ResultSet need to be 
	 * closed by a call to {@link #close()} if the object is not needed
	 * anymore.
	 * <p>
	 * The constructor advances the result set to its first row. If there
	 * is no first row, the extractor is marked as finished right away.
	 * </p>
	 * @param factory factory which provides the data dialect and the observation factory
	 * @param dbc database connection, closed by {@link #close()}
	 * @param rs ResultSet with rows from {@code observation_fact} table. 
	 * Initially positioned before the first row. It is accessed read only
	 * and forward only. Its columns are required to be in the following order:
	 * <ol>
	 *  <li>patient_num/id (int or String)</li>
	 *  <li>encounter_num/id (int or String)</li>
	 *  <li>instance_num</li>
	 *  <li>concept_cd</li>
	 *  <li>modifier_cd</li>
	 *  <li>provider_id</li>
	 *  <li>location_cd</li>
	 *  <li>start_date</li>
	 *  <li>end_date</li>
	 *  <li>RTRIM(valtype_cd) valtype_cd</li>
	 *  <li>tval_char</li>
	 *  <li>nval_num</li>
	 *  <li>RTRIM(valueflag_cd) valueflag_cd</li>
	 *  <li>units_cd</li>
	 *  <li>sourcesystem_cd</li>
	 * </ol>
	 * @throws SQLException error
	 */
	I2b2Extractor(I2b2ExtractorFactory factory, Connection dbc, ResultSet rs) throws SQLException {
		this.factory = factory;
		this.dbc = dbc;
		this.rs = rs;
		if( !rs.next() ){
			// empty result set, no observations to process.
			finished = true;
		}
	}
	
	/**
	 * Receives errors which occur during the {@link #get()} operation.
	 * The default implementation is to wrap the exception
	 * in an {@link UncheckedSQLException} which is then thrown.
	 * <p>
	 * If an overriding implementation returns normally, {@link #get()}
	 * returns whatever was assembled before the error (possibly {@code null}).
	 * </p>
	 * @param exception exception
	 */
	protected void errorHandler(SQLException exception){
		throw new UncheckedSQLException(exception);
	}
	
	/** Subset of the columns of the current result set row. */
	private static class Row{
		/** patient id */
		String pid;
		/** encounter id */
		String eid;
		/** instance number, {@code null} if the column was SQL NULL */
		Integer inst;
		/** concept id */
		String cid;
		/** modifier id */
		String mid;
		/** location id */
		String lid;
		Timestamp start;
		Timestamp end;
	}

	/**
	 * Copies the columns of the current result set row into a {@link Row}.
	 * Modifier and location codes are passed through the factory's dialect.
	 * @return row data
	 * @throws SQLException error while reading from the result set
	 */
	private Row loadRow() throws SQLException{
		Row row = new Row();
		row.pid = rs.getObject(1).toString(); // patient id
		row.eid = rs.getObject(2).toString(); // encounter id
		row.inst = rs.getInt(3);
		if( rs.wasNull() ){
			// getInt reports SQL NULL as 0, restore the null
			row.inst = null;
		}
		row.cid = rs.getString(4); // concept id
		row.mid = factory.dialect.decodeModifierCd(rs.getString(5)); // modifier id
		// provider id 6
		row.lid = factory.dialect.decodeLocationCd(rs.getString(7)); // location id
		row.start = rs.getTimestamp(8);
		row.end = rs.getTimestamp(9);
		return row;
	}

	private Value createValue(Row row){
		// TODO create value
		return null;
	}

	/**
	 * Builds an observation from a fact row. End time and location
	 * are only set if present in the row.
	 * @param row fact row
	 * @return observation
	 */
	private Observation createObservation(Row row){
		Observation o = factory.getObservationFactory().createObservation(row.pid, row.cid, new DateTimeAccuracy(row.start.toLocalDateTime()));
		o.setEncounterId(row.eid);
		if( row.end != null ){
			o.setEndTime(new DateTimeAccuracy(row.end.toLocalDateTime()));
		}
		o.setValue(createValue(row));
		if( row.lid != null ){
			o.setLocationId(row.lid);
		}
		// TODO more properties
		return o;
	}

	/**
	 * Determines whether a row is a modifier of the given fact: same patient,
	 * encounter and instance number, and a non-null modifier id.
	 * @param fact main fact row
	 * @param modifier candidate row
	 * @return {@code true} if the candidate is a modifier of the fact
	 */
	private boolean isModifier(Row fact, Row modifier){
		// instance number may be null (see loadRow), compare null-safe
		boolean sameInstance = (fact.inst == null)
				? modifier.inst == null
				: fact.inst.equals(modifier.inst);
		return fact.pid.equals(modifier.pid)
				&& fact.eid.equals(modifier.eid)
				&& sameInstance
				&& modifier.mid != null;
	}

	/**
	 * Retrieves the next observation. The current row is expected to be
	 * a fact without modifier; all directly following modifier rows are
	 * added to the observation. Afterwards, the result set is positioned
	 * at the first row of the next observation.
	 * <p>
	 * SQL errors are passed to {@link #errorHandler(SQLException)}. After
	 * an error, the extractor is marked as finished, because the
	 * position of the result set is not reliable anymore.
	 * </p>
	 * @return next observation or {@code null} if there are no more observations
	 */
	@Override
	public Observation get() {
		if( finished ){
			return null;
		}
		Observation o = null;
		try{
			// next/first observation is always top concept
			Row r = loadRow();
			// validate row. modifier_cd should be null
			if( r.mid != null ){
				throw new SQLException("Null modifier expected for first fact in group");
			}
			o = createObservation(r);
			// load modifiers
			for(;;){
				if( !rs.next() ){
					// last row
					finished = true;
					// observation complete
					break;
				}
				Row m = loadRow();
				if( isModifier(r, m) ){
					// TODO validate modifier, start should be equal to fact
					o.addModifier(m.mid, createValue(m));
				}else{
					// no modifier for main fact
					// fact is finished
					break;
				}
			}
		}catch( SQLException e ){
			// prevent subsequent calls from failing again on the same row
			finished = true;
			errorHandler(e);
		}
		return o;
	}

	@Override
	public String getMeta(String key) {
		// TODO Auto-generated method stub
		return null;
	}

	/**
	 * Closes result set, statement and connection. Statement and
	 * connection are closed even if closing a previous resource fails.
	 * @throws SQLException error while closing
	 */
	@Override
	public void close() throws SQLException {
		try{
			Statement st = rs.getStatement();
			try{
				rs.close();
			}finally{
				if( st != null ){
					st.close();
				}
			}
		}finally{
			dbc.close();
		}
	}
	
	/**
	 * Prints all remaining rows to {@code System.out} (for debugging).
	 * The extractor is finished afterwards.
	 * @throws SQLException error while reading from the result set
	 */
	public void dump() throws SQLException{
		int count = 0;
		if( finished ){
			return;
		}
		do{
			count ++;
			StringBuilder b = new StringBuilder(200);
			b.append("row(");
			b.append(count);
			b.append("): ");
			// ids may be int or String, read them like loadRow does
			b.append(rs.getObject(1));
			b.append(", ");
			b.append(rs.getObject(2));
			b.append(", ");
			b.append(rs.getInt(3));
			b.append(", ");
			b.append(rs.getString(4));
			b.append(", ");
			b.append(rs.getString(5));
			b.append(", tval=");
			// tval_char is column 11 (column 12 is nval_num)
			b.append(rs.getString(11));
			System.out.println(b.toString());
		}while( rs.next() );
		finished = true;
		System.out.println("Count:"+count);
	}

}
+105 −0
Original line number Diff line number Diff line
package de.sekmi.histream.i2b2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.logging.Logger;

import javax.sql.DataSource;

import de.sekmi.histream.ObservationFactory;

/**
 * Extract observations from i2b2.
 * <p>
 * Allows simple queries against the i2b2 observation_fact table
 * and retrieval of facts.
 * 
 * @author R.W.Majeed
 *
 */
public class I2b2ExtractorFactory implements AutoCloseable {
	private static final Logger log = Logger.getLogger(I2b2ExtractorFactory.class.getName());

	private final DataSource ds;
	/** Optional JDBC fetch size. Currently never assigned within this class. */
	private Integer fetchSize;
	DataDialect dialect;
	private final ObservationFactory observationFactory;
	
	private static final String SELECT_PARAMETERS = "patient_num, encounter_num, instance_num, concept_cd, modifier_cd, provider_id, location_cd, start_date, end_date, RTRIM(valtype_cd) valtype_cd, tval_char, nval_num, RTRIM(valueflag_cd) valueflag_cd, units_cd, sourcesystem_cd";
	private static final String SELECT_TABLE = "observation_fact";
	//private static String SELECT_ORDER_CHRONO = "ORDER BY start_date, patient_num, encounter_num, instance_num, modifier_cd NULLS FIRST";
	/** Groups facts with their modifiers; rows without modifier come first in each group. */
	private static final String SELECT_ORDER_GROUP = "ORDER BY patient_num, encounter_num, start_date, instance_num, concept_cd, modifier_cd NULLS FIRST";

	/**
	 * Creates a factory which reads from the given data source.
	 * @param crc_ds data source providing connections to the database containing {@code observation_fact}
	 * @param factory factory used to create the extracted observations
	 * @throws SQLException declared for future use, not thrown by the current implementation
	 */
	public I2b2ExtractorFactory(DataSource crc_ds, ObservationFactory factory) throws SQLException{
		// TODO implement
		this.observationFactory = factory;
		ds = crc_ds;
		dialect = new DataDialect();
	}

	/**
	 * Observation factory used for the extracted observations.
	 * @return observation factory
	 */
	public ObservationFactory getObservationFactory(){
		return observationFactory;
	}
	
	/**
	 * Prepares a forward only, read only statement. If a fetch size is
	 * configured, it is applied to the statement.
	 * @param dbc connection
	 * @param sql SQL query
	 * @return prepared statement
	 * @throws SQLException error
	 */
	public PreparedStatement prepareStatement(Connection dbc, String sql) throws SQLException{
		PreparedStatement s = dbc.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
		if( fetchSize != null ){
			s.setFetchSize(fetchSize);
			s.setFetchDirection(ResultSet.FETCH_FORWARD);
		}
		return s;
	}

	/**
	 * Not implemented yet, calls have no effect.
	 * @param property property name
	 * @param value property value
	 */
	public void setProperty(String property, Object value){
		// de.sekmi.histream.i2b2.extractor.project
	}

	/**
	 * Extract observations with {@code start_date} between
	 * {@code start_min} and {@code start_max}. Both boundaries are
	 * inclusive (SQL {@code BETWEEN}).
	 * <p>
	 * The returned extractor owns the connection, statement and result set.
	 * It must be closed by the caller. If this method fails, all
	 * resources opened so far are closed before the exception is rethrown.
	 * </p>
	 * 
	 * @param start_min start date of returned observations must be greater than or equal to start_min
	 * @param start_max start date of returned observations must be less than or equal to start_max
	 * @param concepts concept ids to extract. Currently ignored, the query is not restricted by concept.
	 * @return extractor
	 * @throws SQLException error
	 */
	public I2b2Extractor extract(Timestamp start_min, Timestamp start_max, String[] concepts) throws SQLException{
		// TODO move connection and prepared statement to I2b2Extractor
		Connection dbc = ds.getConnection();
		PreparedStatement ps = null;
		ResultSet rs = null;
		try{
			dbc.setAutoCommit(false);
			StringBuilder b = new StringBuilder(600);
			b.append("SELECT ");
			b.append(SELECT_PARAMETERS+" FROM "+SELECT_TABLE+" ");
			b.append("WHERE start_date BETWEEN ? AND ? ");
			b.append(SELECT_ORDER_GROUP);
			String sql = b.toString();
			log.info("SQL: "+sql);
	
			ps = prepareStatement(dbc, sql);
			ps.setTimestamp(1, start_min);
			ps.setTimestamp(2, start_max);
			rs = ps.executeQuery();
			return new I2b2Extractor(this, dbc, rs);
		}catch( SQLException | RuntimeException e ){
			// clean up. Errors during cleanup must not hide the original
			// failure or prevent the remaining resources from being closed.
			closeSuppressed(rs, e);
			closeSuppressed(ps, e);
			closeSuppressed(dbc, e);
			throw e;
		}
	}

	/**
	 * Closes a resource. A failure to close is attached to the
	 * primary exception as suppressed exception.
	 * @param resource resource to close, may be {@code null}
	 * @param primary exception which caused the cleanup
	 */
	private static void closeSuppressed(AutoCloseable resource, Throwable primary){
		if( resource == null ){
			return;
		}
		try{
			resource.close();
		}catch( Exception closeFailure ){
			primary.addSuppressed(closeFailure);
		}
	}
	
	/**
	 * Currently does nothing. Extractors own their connections and
	 * need to be closed separately.
	 */
	@Override
	public void close() throws SQLException {

	}
}
+16 −0
Original line number Diff line number Diff line
package de.sekmi.histream.i2b2;

import java.sql.SQLException;

/**
 * Unchecked wrapper for a {@link SQLException}. Allows SQL errors to be
 * thrown from methods which do not declare checked exceptions.
 */
public class UncheckedSQLException extends RuntimeException{

	private static final long serialVersionUID = 1L;

	/**
	 * Wraps the given SQL exception.
	 * @param cause wrapped exception
	 */
	public UncheckedSQLException(SQLException cause){
		super(cause);
	}

	/**
	 * Returns the wrapped exception with its specific type.
	 * @return the wrapped SQL exception
	 */
	@Override
	public SQLException getCause(){
		final Throwable wrapped = super.getCause();
		// the only constructor accepts nothing but SQLException
		return (SQLException)wrapped;
	}
}
+100 −0
Original line number Diff line number Diff line
package de.sekmi.histream.i2b2;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Timestamp;
import java.util.logging.Logger;

import javax.sql.DataSource;

import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.impl.ObservationFactoryImpl;

/**
 * Manual test which runs the extractor against a local PostgreSQL
 * database. The class itself serves as minimal {@link DataSource}.
 */
public class TestExtractor implements DataSource{
	private static final String DRIVER = "org.postgresql.Driver";
	private static final String URI = "jdbc:postgresql://localhost:15432/i2b2";
	private static final String DEFAULT_USER = "i2b2crcdata";
	private static final String DEFAULT_PASSWORD = "demodata";

	/**
	 * Extracts the same time range twice: first dumping the raw rows,
	 * then printing the assembled observations.
	 * @param args ignored
	 * @throws SQLException database error
	 */
	public static void main(String[] args) throws SQLException{
		TestExtractor t = new TestExtractor();
		ObservationFactory of = new ObservationFactoryImpl();
		try( I2b2ExtractorFactory ef = new I2b2ExtractorFactory(t, of) ){
			
			try( I2b2Extractor e = ef.extract(Timestamp.valueOf("2015-01-16 00:00:00"), Timestamp.valueOf("2015-01-17 00:00:00"), null) ){
				
				e.dump();
			}

			try( I2b2Extractor e = ef.extract(Timestamp.valueOf("2015-01-16 00:00:00"), Timestamp.valueOf("2015-01-17 00:00:00"), null) ){
		
				e.stream().forEach(System.out::println);
			}
			
			
		}
		
		
	}

	@Override
	public PrintWriter getLogWriter() throws SQLException {
		return new PrintWriter(System.out);
	}

	@Override
	public void setLogWriter(PrintWriter out) throws SQLException {
		// not supported by this test data source, ignored
		
	}

	@Override
	public void setLoginTimeout(int seconds) throws SQLException {
		// not supported by this test data source, ignored
		
	}

	@Override
	public int getLoginTimeout() throws SQLException {
		// 0 selects the default timeout
		return 0;
	}

	/**
	 * This data source does not use {@code java.util.logging}.
	 * @throws SQLFeatureNotSupportedException always
	 */
	@Override
	public Logger getParentLogger() throws SQLFeatureNotSupportedException {
		throw new SQLFeatureNotSupportedException("java.util.logging is not used by this data source");
	}

	@Override
	public <T> T unwrap(Class<T> iface) throws SQLException {
		if( iface != null && iface.isInstance(this) ){
			return iface.cast(this);
		}
		throw new SQLException("Not a wrapper for "+iface);
	}

	@Override
	public boolean isWrapperFor(Class<?> iface) throws SQLException {
		return iface != null && iface.isInstance(this);
	}

	/**
	 * Opens a connection with the default test credentials.
	 */
	@Override
	public Connection getConnection() throws SQLException {
		return getConnection(DEFAULT_USER, DEFAULT_PASSWORD);
	}

	/**
	 * Opens a connection to the test database with the given credentials.
	 * @param username database user
	 * @param password database password
	 * @return new connection
	 * @throws SQLException if the driver is not available or the connection fails
	 */
	@Override
	public Connection getConnection(String username, String password) throws SQLException {
		try {
			Class.forName(DRIVER);
		} catch (ClassNotFoundException e) {
			throw new SQLException(e);
		}
		return DriverManager.getConnection(URI, username, password);
	}
}