Commit e1051f53 authored by R.W.Majeed's avatar R.W.Majeed
Browse files

observation supplier supports patient extension

parent b5f81db2
......@@ -50,8 +50,7 @@ public class SimplePatientExtension implements Extension<PatientImpl>{
@Override
public PatientImpl createInstance(Observation observation) {
PatientImpl patient = createInstance();
patient.setId(observation.getPatientId());
PatientImpl patient = createInstance(observation.getPatientId(), observation);
return patient;
}
......
......@@ -13,6 +13,7 @@ import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.etl.config.PatientTable;
import de.sekmi.histream.etl.config.VisitTable;
import de.sekmi.histream.etl.config.WideTable;
import de.sekmi.histream.ext.Patient;
/**
* Supplier for observations which are loaded from arbitrary
......@@ -64,12 +65,13 @@ public class ETLObservationSupplier implements ObservationSupplier{
pt = ds.getPatientTable();
vt = ds.getVisitTable();
wt = ds.getWideTables();
// TODO long tables
// in case of exception, make sure already opened suppliers are closed
try{
pr = pt.open(factory);
vr = vt.open(factory);
queue = new FactGroupingQueue(pr, vr);
queue = new FactGroupingQueue(pr, vr, factory.getExtensionAccessor(Patient.class), ds.getMeta().getSource());
// open all tables
wr = new ArrayList<>(wt.size());
......
......@@ -3,9 +3,13 @@ package de.sekmi.histream.etl;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import de.sekmi.histream.ExtensionAccessor;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
/**
* Algorithm:
......@@ -20,9 +24,15 @@ import de.sekmi.histream.Observation;
*/
public class FactGroupingQueue{
private RecordSupplier<? extends FactRow> patientTable, visitTable;
private ExtensionAccessor<Patient> patientAccessor;
private ExternalSourceType metaSource;
private List<RecordSupplier<? extends FactRow>> factTables;
private FactRow currentPatient, nextVisit;
private Patient currentPatientInstance;
private String currentVisitId;
private List<FactRow> currentRows;
private Queue<Observation> workQueue;
......@@ -48,12 +58,14 @@ public class FactGroupingQueue{
}
public FactGroupingQueue(RecordSupplier<? extends FactRow> patientTable, RecordSupplier<? extends FactRow>visitTable){
public FactGroupingQueue(RecordSupplier<? extends FactRow> patientTable, RecordSupplier<? extends FactRow>visitTable, ExtensionAccessor<Patient> patientAccessor, ExternalSourceType metaSource){
this.patientTable = patientTable;
this.patientAccessor = patientAccessor;
Objects.requireNonNull(patientAccessor);
this.visitTable = visitTable;
this.factTables = new ArrayList<>();
this.workQueue = new ArrayDeque<>();
this.metaSource = metaSource;
}
public void addFactTable(RecordSupplier<? extends FactRow> supplier){
if( supplier == patientTable || supplier == visitTable )throw new IllegalArgumentException("Cannot add patient or visit table as fact table");
......@@ -61,6 +73,31 @@ public class FactGroupingQueue{
factTables.add(supplier);
}
/**
* Current patient changed: {@link #currentPatient}
*/
private void patientChanged(){
currentPatientInstance = patientAccessor.accessStatic(currentPatient.getPatientId(), metaSource);
// TODO sync patient with extension factory
addFactsToWorkQueue(currentPatient);
}
/**
* Current visit changed. Current visit id is in {@link #currentVisitId}.
* For facts without visit information, {@link #currentVisitId} may be null.
* If {@link #currentVisitId} is not null, nextVisit will contain the current visit's information.
*/
private void visitChanged(){
// TODO for facts contained in visit, add facts to work queue
if( currentVisitId == null ){
// set visit extension to null
}else{
// TODO sync visit with extension factory
addFactsToWorkQueue(nextVisit);
}
}
/**
* Load first row from each table, fill and sort the observation queue
*/
......@@ -74,17 +111,19 @@ public class FactGroupingQueue{
tableIndex = 0;
currentPatient = patientTable.get();
// TODO sync patient with extension factory
addFactsToWorkQueue(currentPatient);
patientChanged();
// for every patient, facts without visitId (=null) are parsed first
currentVisitId = null;
visitChanged();
nextVisit = visitTable.get();
// TODO sync visit with extension factory
}
private void addFactsToWorkQueue(FactRow r){
for( Observation f : r.getFacts() ){
// set patient extension
patientAccessor.set(f, currentPatientInstance);
workQueue.add(f);
}
}
......@@ -126,8 +165,8 @@ public class FactGroupingQueue{
if( nextVisit != null && nextVisit.getPatientId().equals(currentPatient.getPatientId()) ){
// next visit also belongs to current patient, continue
currentVisitId = nextVisit.getVisitId();
// TODO: sync visit with extension factory
addFactsToWorkQueue(nextVisit);
visitChanged();
nextVisit = visitTable.get();
tableIndex = 0;
// goto top
......@@ -141,8 +180,9 @@ public class FactGroupingQueue{
// we are done
break;
}
// TODO: sync patient with extension factory
addFactsToWorkQueue(currentPatient);
patientChanged();
visitChanged();
tableIndex = 0;
// goto top
continue;
......
......@@ -5,32 +5,46 @@ import java.io.InputStream;
import javax.xml.bind.JAXB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
public class TestETLSupplier {
private DataSource ds;
private ObservationFactory of;
private ObservationFactory of ;
private ETLObservationSupplier os;
@Before
public void loadConfiguration() throws IOException{
public void loadConfiguration() throws IOException, ParseException{
try( InputStream in = getClass().getResourceAsStream("/test-1-datasource.xml") ){
ds = JAXB.unmarshal(in, DataSource.class);
}
of = new ObservationFactoryImpl();
of.registerExtension(new SimplePatientExtension());
of.registerExtension(new SimpleVisitExtension());
os = new ETLObservationSupplier(ds,of);
}
@After
public void freeResources() throws IOException{
os.close();
}
@Test
public void testReadFacts() throws IOException, ParseException{
ETLObservationSupplier s = new ETLObservationSupplier(ds,of);
public void testReadFacts() throws IOException{
while( true ){
Observation fact = s.get();
Observation fact = os.get();
if( fact == null )break;
StringBuilder debug_str = new StringBuilder();
debug_str.append(fact.getPatientId()).append('\t');
debug_str.append(fact.getEncounterId()).append('\t');
......@@ -41,6 +55,16 @@ public class TestETLSupplier {
System.out.println(debug_str.toString());
// TODO test patient extension, visit extension
}
s.close();
}
@Test
public void testPatientExtension() throws IOException{
Observation fact = os.get();
Assert.assertNotNull(fact);
Patient p = fact.getExtension(Patient.class);
Assert.assertNotNull(p);
Assert.assertEquals("p1", p.getId());
// TODO verify other patient information
}
}
......@@ -38,7 +38,7 @@ public class TestReadTables {
public void testReadPatients() throws IOException, ParseException{
try( RecordSupplier<PatientRow> s = ds.patientTable.open(of) ){
PatientRow r = s.get();
Assert.assertEquals("1", r.getId());
Assert.assertEquals("p1", r.getId());
Assert.assertEquals(2003, r.getBirthDate().get(ChronoField.YEAR));
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment