Commit 56150288 authored by R.W.Majeed

use source timestamps from import tables/files

parent 7e111ab7
......@@ -15,18 +15,34 @@ public class ExternalSourceImpl implements ExternalSourceType {
private Instant timestamp;
private String id;
public static class Adapter extends XmlAdapter<String, Instant>{
/**
 * Empty constructor for JAXB.
 * Assigns nothing; the {@code id} and {@code timestamp} fields keep their default values.
 */
public ExternalSourceImpl(){
}
/**
 * Creates an external source description from an id and a timestamp.
 *
 * @param id identifier of the source
 * @param timestamp timestamp of the source; stored as given (may be {@code null})
 */
public ExternalSourceImpl(String id, Instant timestamp) {
    this();
    this.timestamp = timestamp;
    this.id = id;
}
/**
 * JAXB adapter mapping the XML string form of a timestamp to {@link Instant} and back.
 * A {@code null} value is passed through unchanged in both directions.
 */
private static class Adapter extends XmlAdapter<String, Instant> {

    /**
     * Parses the string with {@code DatatypeConverter.parseDateTime} and converts the result to an instant.
     */
    @Override
    public Instant unmarshal(String v) throws Exception {
        return (v != null)
                ? javax.xml.bind.DatatypeConverter.parseDateTime(v).toInstant()
                : null;
    }

    /**
     * Writes the instant using {@link Instant#toString()}.
     */
    @Override
    public String marshal(Instant v) throws Exception {
        return (v != null) ? v.toString() : null;
    }
}
@XmlAttribute(name="timestamp")
......
......@@ -37,12 +37,16 @@ public class TestXMLWriter {
/**
 * Marshals two {@code Meta} instances to standard output:
 * one with source id, source timestamp and order set, and one
 * with a {@code null} source timestamp and no order.
 */
public void testWriteMeta(){
    // all parts present
    Meta meta = new Meta();
    meta.etlStrategy = "lala";
    // build the source in one step; a separately constructed instance
    // configured through setters would be overwritten here anyway
    meta.source = new ExternalSourceImpl("sid", Instant.now());
    meta.order = new Meta.Order(true,false);
    JAXB.marshal(meta, System.out);

    // optional parts missing: null timestamp, null order
    meta = new Meta();
    meta.etlStrategy = "lala";
    meta.source = new ExternalSourceImpl("sid", null);
    meta.order = null;
    JAXB.marshal(meta, System.out);
}
@Test
......
......@@ -62,25 +62,29 @@ public class ETLObservationSupplier implements ObservationSupplier{
private FactGroupingQueue queue;
private DataSource ds;
public ETLObservationSupplier(DataSource ds, ObservationFactory factory) throws IOException, ParseException {
this.ds = ds;
pt = ds.getPatientTable();
vt = ds.getVisitTable();
wt = ds.getWideTables();
// TODO long tables
String sourceId = ds.getMeta().getSourceId();
// in case of exception, make sure already opened suppliers are closed
try{
pr = pt.open(factory);
vr = vt.open(factory);
pr = pt.open(factory, sourceId);
vr = vt.open(factory, sourceId);
queue = new FactGroupingQueue(pr, vr,
factory.getExtensionAccessor(Patient.class),
factory.getExtensionAccessor(Visit.class),
ds.getMeta().getSource());
factory.getExtensionAccessor(Visit.class));
// open all tables
wr = new ArrayList<>(wt.size());
for( WideTable t : wt ){
RecordSupplier<WideRow> s = t.open(factory);
RecordSupplier<WideRow> s = t.open(factory, sourceId);
queue.addFactTable(s);
wr.add(s);
}
......@@ -117,22 +121,33 @@ public class ETLObservationSupplier implements ObservationSupplier{
}
vr=null;
}
Iterator<RecordSupplier<WideRow>> i = wr.iterator();
while( i.hasNext() ){
try{ i.next().close(); }
catch( IOException e ){
if( error != null )error.addSuppressed(e);
else error = e;
if( wr != null ){
Iterator<RecordSupplier<WideRow>> i = wr.iterator();
while( i.hasNext() ){
try{ i.next().close(); }
catch( IOException e ){
if( error != null )error.addSuppressed(e);
else error = e;
}
i.remove();
}
i.remove();
}
if( error != null )throw error;
}
@Override
public String getMeta(String arg0) {
// TODO Auto-generated method stub
return null;
public String getMeta(String key) {
switch( key ){
case ObservationSupplier.META_ETL_STRATEGY:
return ds.getMeta().getETLStrategy();
case ObservationSupplier.META_SOURCE_ID:
return ds.getMeta().getSourceId();
case ObservationSupplier.META_ORDER_GROUPED:
return "true";
default:
return null;
}
}
}
......@@ -8,7 +8,6 @@ import java.util.Queue;
import de.sekmi.histream.ExtensionAccessor;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
......@@ -29,7 +28,6 @@ public class FactGroupingQueue{
private ExtensionAccessor<Patient> patientAccessor;
private ExtensionAccessor<Visit> visitAccessor;
private ExternalSourceType metaSource;
private List<RecordSupplier<? extends FactRow>> factTables;
......@@ -64,7 +62,7 @@ public class FactGroupingQueue{
}
public FactGroupingQueue(RecordSupplier<PatientRow> patientTable, RecordSupplier<VisitRow>visitTable, ExtensionAccessor<Patient> patientAccessor, ExtensionAccessor<Visit> visitAccessor, ExternalSourceType metaSource){
public FactGroupingQueue(RecordSupplier<PatientRow> patientTable, RecordSupplier<VisitRow>visitTable, ExtensionAccessor<Patient> patientAccessor, ExtensionAccessor<Visit> visitAccessor){
this.patientTable = patientTable;
Objects.requireNonNull(patientAccessor);
Objects.requireNonNull(visitAccessor);
......@@ -73,7 +71,6 @@ public class FactGroupingQueue{
this.visitTable = visitTable;
this.factTables = new ArrayList<>();
this.workQueue = new ArrayDeque<>();
this.metaSource = metaSource;
}
public void addFactTable(RecordSupplier<? extends FactRow> supplier){
if( supplier == patientTable || supplier == visitTable )throw new IllegalArgumentException("Cannot add patient or visit table as fact table");
......@@ -85,7 +82,7 @@ public class FactGroupingQueue{
* Current patient changed: {@link #currentPatient}
*/
private void patientChanged(){
currentPatientInstance = patientAccessor.accessStatic(currentPatient.getPatientId(), metaSource);
currentPatientInstance = patientAccessor.accessStatic(currentPatient.getPatientId(), patientTable.getSource());
currentPatientInstance.setBirthDate(currentPatient.getBirthDate());
currentPatientInstance.setDeathDate(currentPatient.getDeathDate());
currentPatientInstance.setSex(currentPatient.getSex());
......@@ -106,7 +103,7 @@ public class FactGroupingQueue{
// TODO later support facts without encounter
}else{
// sync visit with extension factory
currentVisitInstance = visitAccessor.accessStatic(currentVisitId, currentPatientInstance, metaSource);
currentVisitInstance = visitAccessor.accessStatic(currentVisitId, currentPatientInstance, visitTable.getSource());
currentVisitInstance.setStartTime(nextVisit.getStartTime());
currentVisitInstance.setEndTime(nextVisit.getEndTime());
currentVisitInstance.setLocationId(nextVisit.getLocationId());
......
package de.sekmi.histream.etl;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.regex.Pattern;
public class FileRowSupplier extends RowSupplier {
private Pattern fieldSeparatorPattern;
private BufferedReader in;
private String[] headers;
private Instant timestamp;
/**
 * Create a row supplier which splits fields at a literal separator string.
 * The separator is quoted with {@link Pattern#quote(String)} before it is
 * compiled, so regular expression metacharacters in it are matched literally.
 *
 * @param location URL to read the rows from
 * @param fieldSeparator literal field separator
 * @throws IOException declared by the delegated pattern-based constructor
 */
public FileRowSupplier(URL location, String fieldSeparator) throws IOException{
this(location, Pattern.compile(Pattern.quote(fieldSeparator)));
}
public FileRowSupplier(URL location, Pattern pattern) throws IOException{
this.fieldSeparatorPattern = pattern;
this.in = new BufferedReader(new InputStreamReader(location.openStream()));
......@@ -24,8 +28,33 @@ public class FileRowSupplier extends RowSupplier {
// load headers
String line = in.readLine();
this.headers = fieldSeparatorPattern.split(line);
determineFileTimestamp(location);
}
/**
 * Determine the timestamp of the source behind the given URL and store it
 * in the {@code timestamp} field.
 * <p>
 * Only {@code file} URLs are supported; for these, the last-modified time of
 * the file is used, at millisecond precision.
 *
 * @param url location of the source
 * @throws IOException if the protocol is not {@code file}, the URL cannot be
 *  converted to a file system path, or the file attributes cannot be read
 */
private void determineFileTimestamp(URL url) throws IOException{
    if( !url.getProtocol().equals("file") ){
        // TODO e.g. use URLConnection to get timestamp
        throw new IOException("Unable to determine timestamp for URL: "+url);
    }
    Path path;
    try {
        // Convert through URI: url.getPath() keeps percent-escapes (e.g. %20),
        // which yields a non-existing file name and a silent timestamp of 0.
        path = Paths.get(url.toURI());
    } catch( URISyntaxException | IllegalArgumentException e ) {
        throw new IOException("Unable to resolve file path for URL: "+url, e);
    }
    // Files.getLastModifiedTime throws if the file cannot be accessed,
    // whereas File.lastModified() would return 0 (1970-01-01) in that case.
    this.timestamp = Instant.ofEpochMilli(Files.getLastModifiedTime(path).toMillis());
}
@Override
public String[] getHeaders() {
return headers;
......@@ -53,4 +82,9 @@ public class FileRowSupplier extends RowSupplier {
in.close();
}
/**
 * Returns the timestamp stored in this supplier's {@code timestamp} field.
 *
 * @return stored timestamp
 */
@Override
public Instant getTimestamp() {
    return this.timestamp;
}
}
......@@ -3,22 +3,29 @@ package de.sekmi.histream.etl;
import java.io.IOException;
import java.util.function.Supplier;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.Table;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.impl.ExternalSourceImpl;
public class RecordSupplier<R extends FactRow> implements Supplier<R>, AutoCloseable{
private RowSupplier rows;
private Table<R> table;
private ColumnMap map;
private ObservationFactory factory;
private ExternalSourceType source;
/**
 * Create a record supplier on top of a row supplier.
 * The column map is obtained from the table using the row headers, and a
 * source descriptor is built from the source id and the timestamp reported
 * by the rows.
 * <p>
 * NOTE(review): two constructor headers follow, one without and one with
 * {@code sourceId}. The body reads {@code sourceId}, which only the second
 * header declares. Presumably the first header is a leftover; confirm and remove.
 *
 * @param rows row supplier providing headers, rows and the source timestamp
 * @param table table configuration used to build the column map
 * @param factory observation factory, stored for later use
 * @param sourceId id used for the source descriptor
 * @throws ParseException declared; presumably raised while building the column map (verify against Table)
 */
public RecordSupplier(RowSupplier rows, Table<R> table, ObservationFactory factory)throws ParseException{
public RecordSupplier(RowSupplier rows, Table<R> table, ObservationFactory factory, String sourceId)throws ParseException{
this.rows = rows;
this.table = table;
this.map = table.getColumnMap(rows.getHeaders());
this.factory = factory;
// one source descriptor per supplier: given id plus the timestamp of the rows
this.source = new ExternalSourceImpl(sourceId, rows.getTimestamp());
}
/**
 * Returns the source descriptor held in this supplier's {@code source} field.
 *
 * @return source descriptor
 */
public final ExternalSourceType getSource() {
    return source;
}
@Override
public void close() throws IOException {
rows.close();
......@@ -38,6 +45,10 @@ public class RecordSupplier<R extends FactRow> implements Supplier<R>, AutoClose
} catch (ParseException e) {
throw new UncheckedParseException(e);
}
// fill source information
for( Observation o : p.getFacts() ){
o.setSource(source);
}
return p;
}
......
package de.sekmi.histream.etl;
import java.io.IOException;
import java.time.Instant;
import java.util.function.Supplier;
public abstract class RowSupplier implements Supplier<Object[]>, AutoCloseable{
......@@ -15,4 +16,11 @@ public abstract class RowSupplier implements Supplier<Object[]>, AutoCloseable{
@Override
public abstract void close() throws IOException;
/**
 * Get the timestamp of the underlying source. Multiple calls to this
 * method should return the same timestamp; the source is not allowed
 * to change while it is being read.
 *
 * @return timestamp of the source
 */
public abstract Instant getTimestamp();
}
......@@ -34,4 +34,5 @@ public class FileSource extends TableSource{
/**
 * Creates a file row supplier for the configured URL. The configured
 * separator is compiled as a regular expression and used to split fields.
 *
 * @return row supplier reading from the URL
 * @throws IOException declared by the {@code FileRowSupplier} constructor
 */
public RowSupplier rows() throws IOException {
    Pattern fieldPattern = Pattern.compile(separator);
    return new FileRowSupplier(url, fieldPattern);
}
}
package de.sekmi.histream.etl.config;
import java.time.Instant;
import java.util.Calendar;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import de.sekmi.histream.ext.ExternalSourceType;
/**
 * Meta information of a data source configuration: the ETL strategy and the
 * identification of the source.
 * <p>
 * NOTE(review): this class contains two overlapping representations of the
 * source identification: a nested {@link Source} object (id and timestamp)
 * and a plain {@code id} string. Two constructor headers and two getter
 * headers appear back to back without the first of each being closed.
 * Presumably the {@code Source} based members are leftovers; confirm and clean up.
 */
public class Meta {
// NOTE(review): as written, the annotation below applies to 'source' and 'id' is unannotated; confirm the intended mapping
@XmlElement
Source source;
String id;
// ETL strategy, mapped to the element 'etl-strategy'
@XmlElement(name="etl-strategy")
String etlStrategy;
/**
 * Source identification with id and timestamp, both mapped as XML attributes.
 */
public static class Source implements ExternalSourceType{
@XmlAttribute
String id;
@XmlAttribute
Calendar timestamp;
@Override
public String getSourceId() {return id;}
// dereferences 'timestamp' without a null check
@Override
public Instant getSourceTimestamp() {
return timestamp.toInstant();
}
@Override
public void setSourceId(String arg0) {this.id = arg0;}
// replaces the calendar with a new instance set to the instant's epoch milliseconds
@Override
public void setSourceTimestamp(Instant instant) {
this.timestamp = Calendar.getInstance();
this.timestamp.setTimeInMillis(instant.toEpochMilli());
}
}
/**
 * Constructor without arguments; assigns nothing.
 */
protected Meta(){
}
// NOTE(review): two constructor headers follow; the body assigns both the Source object and the plain id
public Meta(String etlStrategy, String sourceId, Calendar sourceTimestamp){
public Meta(String etlStrategy, String sourceId){
this.etlStrategy = etlStrategy;
this.source = new Source();
this.source.timestamp = sourceTimestamp;
this.source.id = sourceId;
this.id = sourceId;
}
// NOTE(review): getSource() is not closed before getSourceId() starts
public Source getSource(){
return this.source;
public String getSourceId(){
return this.id;
}
/**
 * @return the ETL strategy
 */
public String getETLStrategy(){
return etlStrategy;
}
}
package de.sekmi.histream.etl.config;
import javax.xml.bind.annotation.XmlElement;
import de.sekmi.histream.etl.RowSupplier;
......@@ -23,5 +24,4 @@ public class SQLSource extends TableSource {
// TODO Auto-generated method stub
return null;
}
}
......@@ -54,8 +54,8 @@ public abstract class Table<T extends FactRow> {
public abstract T fillRecord(ColumnMap map, Object[] row, ObservationFactory factory) throws ParseException;
/**
 * Open this table: obtains the rows from the configured source and wraps
 * them in a {@link RecordSupplier} for this table.
 * <p>
 * NOTE(review): two method headers follow, one without and one with
 * {@code sourceId}; the first is not closed before the second starts.
 * Presumably the first is a leftover; confirm and remove.
 *
 * @param factory observation factory passed on to the record supplier
 * @param sourceId source id passed on to the record supplier
 * @return record supplier for this table
 * @throws IOException declared by {@code source.rows()}
 * @throws ParseException declared by the {@code RecordSupplier} constructor
 */
public RecordSupplier<T> open(ObservationFactory factory) throws IOException, ParseException{
return new RecordSupplier<>(source.rows(), this, factory);
public RecordSupplier<T> open(ObservationFactory factory, String sourceId) throws IOException, ParseException{
return new RecordSupplier<>(source.rows(), this, factory, sourceId);
}
......
......@@ -2,6 +2,7 @@ package de.sekmi.histream.etl.config;
import java.io.IOException;
import java.time.Instant;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
......@@ -15,6 +16,11 @@ import de.sekmi.histream.etl.RowSupplier;
@XmlSeeAlso({FileSource.class, SQLSource.class})
public abstract class TableSource{
/**
 * Provide a row supplier for the rows of this table source.
 *
 * @return row supplier for this source
 * @throws IOException declared for implementations; presumably raised when the source cannot be opened or read (verify against implementations)
 */
public abstract RowSupplier rows() throws IOException;
// TODO allow table sources to specify timestamp
}
\ No newline at end of file
......@@ -17,6 +17,7 @@ import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.etl.config.DataSource;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.Meta;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import de.sekmi.histream.impl.SimplePatientExtension;
import de.sekmi.histream.impl.SimpleVisitExtension;
......@@ -47,7 +48,8 @@ public class TestETLSupplier {
/**
 * Writes the content of the observation supplier {@code os} to standard
 * output through an {@code XMLWriter}.
 */
@Test
public void testXMLConversion() throws Exception{
XMLWriter w = new XMLWriter(System.out);
// transfer meta information (presumably from the supplier to the writer; verify against Meta.transfer)
Meta.transfer(os, w);
// pass every element of the supplier's spliterator to the writer
StreamSupport.stream(AbstractObservationParser.nonNullSpliterator(os), false).forEach(w);
w.close();
}
......
......@@ -4,7 +4,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Calendar;
import javax.xml.bind.JAXB;
......@@ -22,7 +21,7 @@ public class TestMarshall {
DataSource ds = JAXB.unmarshal(in, DataSource.class);
Assert.assertNotNull(ds.meta);
Assert.assertEquals("replace-source",ds.meta.etlStrategy);
Assert.assertEquals("test-1",ds.meta.source.id);
Assert.assertEquals("test-1",ds.meta.getSourceId());
// patient table
Assert.assertNotNull(ds.patientTable);
Assert.assertNotNull(ds.patientTable.source);
......@@ -58,7 +57,7 @@ public class TestMarshall {
@Test
public void testMarshal() throws MalformedURLException{
DataSource s = new DataSource();
s.meta = new Meta("replace-source","SID",Calendar.getInstance());
s.meta = new Meta("replace-source","SID");
s.xmlSources = new XmlSource[1];
s.xmlSources[0] = new XmlSource();
s.xmlSources[0].url = new URL("http://lala");
......
......@@ -17,6 +17,7 @@ import de.sekmi.histream.etl.PatientRow;
import de.sekmi.histream.etl.RecordSupplier;
import de.sekmi.histream.etl.VisitRow;
import de.sekmi.histream.etl.WideRow;
import de.sekmi.histream.ext.ExternalSourceType;
import de.sekmi.histream.impl.ObservationFactoryImpl;
import org.junit.Assert;
......@@ -36,7 +37,7 @@ public class TestReadTables {
@Test
public void testReadPatients() throws IOException, ParseException{
try( RecordSupplier<PatientRow> s = ds.patientTable.open(of) ){
try( RecordSupplier<PatientRow> s = ds.patientTable.open(of,ds.getMeta().getSourceId()) ){
PatientRow r = s.get();
Assert.assertEquals("p1", r.getId());
Assert.assertEquals(2003, r.getBirthDate().get(ChronoField.YEAR));
......@@ -45,7 +46,7 @@ public class TestReadTables {
}
@Test
public void testReadVisits() throws IOException, ParseException{
try( RecordSupplier<VisitRow> s = ds.visitTable.open(of) ){
try( RecordSupplier<VisitRow> s = ds.visitTable.open(of,ds.getMeta().getSourceId()) ){
VisitRow r = s.get();
Assert.assertEquals("v1", r.getId());
Assert.assertEquals(2013, r.getStartTime().get(ChronoField.YEAR));
......@@ -54,7 +55,7 @@ public class TestReadTables {
}
@Test
public void testReadWideTable() throws IOException, ParseException{
try( RecordSupplier<WideRow> s = ds.wideTables[0].open(of) ){
try( RecordSupplier<WideRow> s = ds.wideTables[0].open(of,ds.getMeta().getSourceId()) ){
WideRow r = s.get();
Assert.assertNotNull(r);
Assert.assertTrue(r.getFacts().size() > 0);
......@@ -62,6 +63,10 @@ public class TestReadTables {
Assert.assertEquals("natrium", o.getConceptId());
Assert.assertEquals(Value.Type.Numeric, o.getValue().getType());
Assert.assertEquals(BigDecimal.valueOf(124), o.getValue().getNumericValue());
ExternalSourceType e = o.getSource();
Assert.assertNotNull(e);
Assert.assertEquals("test-1", e.getSourceId());
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<datasource version="1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >
<meta>
<id>test-1</id>
<etl-strategy>replace-source</etl-strategy>
<source timestamp="2015-04-21T08:58:00" id="test-1"/>
</meta>
<!-- erstmal weglassen -->
<transformations>
......@@ -83,7 +83,7 @@
<start format="d.M.u[ H[:m[:s]]]">zeitpunkt</start>
</concept>
</mdat>
..
</wide-table>
</datasource>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment