Commit 16c093a2 authored by R.W.Majeed's avatar R.W.Majeed
Browse files

prototype of observation supplier

parent 592211b6
package de.sekmi.histream.etl; package de.sekmi.histream.etl;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import de.sekmi.histream.Observation; import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier; import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.config.PatientTable;
import de.sekmi.histream.etl.config.VisitTable;
import de.sekmi.histream.etl.config.WideTable;
/**
* Supplier for observations which are loaded from arbitrary
* table data.
*
* <p>Algorithm</p>
* <ol>
* <li>read first patient and first visit. -> currentPatient,
* currentVisit</li>
*
* <li>For each concept table (including patient and visit tables):
* read first row, add all concepts from first row to concept queue,
* sort concept queue by patid, visitid, start</li>
*
* <li>process/remove all concepts with currentPatient and
* currentVisit</li>
*
* <li>if all concepts from one concept table are removed,
* fetch next row from that table, add concepts to queue and sort.
* Go to 3.</li>
*
* <li>if no more concepts for and currentVisit
* are in queue, fetch next visit. Go to 3.</li>
*
* <li>if no more concepts for currentPatient are in queue,
* fetch next patient. Go to 3.</li>
*
* <li>if queue empty (no more patient and visit) then
* done.</li>
* </ol>
*
*
* @author marap1
*
*/
public class ETLObservationSupplier implements ObservationSupplier{ public class ETLObservationSupplier implements ObservationSupplier{
private PatientTable pt;
private VisitTable vt;
private List<WideTable> wt;
private RecordSupplier<PatientRow> pr;
private RecordSupplier<VisitRow> vr;
private List<RecordSupplier<WideRow>> wr;
private FactGroupingQueue queue;
public ETLObservationSupplier(DataSource ds, ObservationFactory factory) throws IOException, ParseException {
pt = ds.getPatientTable();
vt = ds.getVisitTable();
wt = ds.getWideTables();
// in case of exception, make sure already opened suppliers are closed
try{
pr = pt.open(factory);
vr = vt.open(factory);
queue = new FactGroupingQueue(pr, vr);
// open all tables
wr = new ArrayList<>(wt.size());
for( WideTable t : wt ){
RecordSupplier<WideRow> s = t.open(factory);
queue.addFactTable(s);
wr.add(s);
}
queue.prepare();
}catch( IOException | UncheckedIOException | ParseException | UncheckedParseException e ){
try{
this.close();
}catch( IOException f ){
e.addSuppressed(f);
}
throw e;
}
}
@Override @Override
public Observation get() { public Observation get() {
// TODO Auto-generated method stub return queue.next();
return null;
} }
@Override @Override
public void close() throws Exception { public void close() throws IOException {
// TODO Auto-generated method stub IOException error = null;
if( pr != null ){
try{ pr.close(); }
catch( IOException e ){ error = e; }
pr=null;
}
if( vr != null ){
try{ vr.close(); }
catch( IOException e ){
if( error != null )error.addSuppressed(e);
else error = e;
}
vr=null;
}
Iterator<RecordSupplier<WideRow>> i = wr.iterator();
while( i.hasNext() ){
try{ i.next().close(); }
catch( IOException e ){
if( error != null )error.addSuppressed(e);
else error = e;
}
i.remove();
}
if( error != null )throw error;
} }
@Override @Override
......
package de.sekmi.histream.etl;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import de.sekmi.histream.Observation;
/**
* Algorithm:
* Get first row from patient and visit.
* Put first rows from each fact table in wait list
* For each table, process all rows belonging to the current patient and visit
* Try with next visit, repeat
* Try with next patient, repeat
*
* @author Raphael
*
*/
public class FactGroupingQueue{
private RecordSupplier<? extends FactRow> patientTable, visitTable;
private List<RecordSupplier<? extends FactRow>> factTables;
private FactRow currentPatient, nextVisit;
private String currentVisitId;
private List<FactRow> currentRows;
private Queue<Observation> workQueue;
/**
* Table index in factTables which was last loaded to workQueue
*/
private int tableIndex;
/**
* String comparison with nulls coming first
* @param v1 fist string
* @param v2 second string
* @return comparison result
*/
private static int compareWithNulls(String v1, String v2){
int c;
if( v1 == null && v2 == null )c = 0;
else if( v1 == null )c = -1;
else if( v2 == null )c = 1;
else c = v1.compareTo(v2);
return c;
}
public FactGroupingQueue(RecordSupplier<? extends FactRow> patientTable, RecordSupplier<? extends FactRow>visitTable){
this.patientTable = patientTable;
this.visitTable = visitTable;
this.factTables = new ArrayList<>();
this.workQueue = new ArrayDeque<>();
}
public void addFactTable(RecordSupplier<? extends FactRow> supplier){
if( supplier == patientTable || supplier == visitTable )throw new IllegalArgumentException("Cannot add patient or visit table as fact table");
if( factTables.contains(supplier) )throw new IllegalArgumentException("Supplier already added");
factTables.add(supplier);
}
/**
* Load first row from each table, fill and sort the observation queue
*/
public void prepare(){
currentRows = new ArrayList<>(factTables.size());
// load first rows
for( RecordSupplier<? extends FactRow> s : factTables ){
FactRow r = s.get();
currentRows.add(r);
}
tableIndex = 0;
currentPatient = patientTable.get();
addFactsToWorkQueue(currentPatient);
// for every patient, facts without visitId (=null) are parsed first
currentVisitId = null;
nextVisit = visitTable.get();
}
private void addFactsToWorkQueue(FactRow r){
for( Observation f : r.getFacts() ){
workQueue.add(f);
}
}
public Observation next(){
do{
if( !workQueue.isEmpty() ){
return workQueue.remove();
}
// queue is empty, try to find table with matching facts
while( tableIndex < factTables.size() ){
FactRow row = currentRows.get(tableIndex);
if( row == null ){
// table empty, remove tables from list
currentRows.remove(tableIndex);
factTables.remove(tableIndex);
continue; // index will now point to next table
}else if( row.getPatientId().equals(currentPatient.getPatientId()) && compareWithNulls(row.getVisitId(), currentVisitId) == 0 ){
// row fits into current group
addFactsToWorkQueue(row);
// prefetch next row
currentRows.set(tableIndex, factTables.get(tableIndex).get());
if( workQueue.isEmpty() ){
// no facts found
// can only happen if a fact table row contains no facts
// which can occur if some filters prevent the facts from being generated
// try to fetch next row
continue;
}
return workQueue.remove();
}else{
// no fitting facts in table index, try next index
tableIndex ++;
}
}
// no more fitting facts in current prefetched rows
// try to get next visit for current patient
if( nextVisit != null && nextVisit.getPatientId().equals(currentPatient.getPatientId()) ){
// next visit also belongs to currrent patient, continue
currentVisitId = nextVisit.getVisitId();
addFactsToWorkQueue(nextVisit);
nextVisit = visitTable.get();
tableIndex = 0;
// goto top
continue;
}else{
// next visit belongs to other patient (or no more visits), try to load next patient
currentVisitId = null;
currentPatient = patientTable.get();
if( currentPatient == null ){
// no more patients available and work queue empty
// we are done
break;
}
addFactsToWorkQueue(currentPatient);
tableIndex = 0;
// goto top
continue;
}
}while( factTables.size() > 0 || !workQueue.isEmpty() );
return null; // done
}
}
package de.sekmi.histream.etl;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.bind.JAXB;
import org.junit.Before;
import org.junit.Test;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.impl.ObservationFactoryImpl;
public class TestETLSupplier {
private DataSource ds;
private ObservationFactory of;
@Before
public void loadConfiguration() throws IOException{
try( InputStream in = getClass().getResourceAsStream("/test-1-datasource.xml") ){
ds = JAXB.unmarshal(in, DataSource.class);
}
of = new ObservationFactoryImpl();
}
@Test
public void testReadFacts() throws IOException, ParseException{
ETLObservationSupplier s = new ETLObservationSupplier(ds,of);
while( true ){
Observation fact = s.get();
if( fact == null )break;
StringBuilder debug_str = new StringBuilder();
debug_str.append(fact.getPatientId()).append('\t');
debug_str.append(fact.getEncounterId()).append('\t');
debug_str.append(fact.getStartTime()).append('\t');
debug_str.append(fact.getConceptId()).append('\t');
debug_str.append(fact.getValue());
System.out.println(debug_str.toString());
}
s.close();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment