Commit 744fdcc5 authored by rwm's avatar rwm

File parsers read ahead any meta information.

parent 5b3370d2
package de.sekmi.histream.io;
import java.time.Instant;
import java.util.Hashtable;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
......@@ -16,23 +18,45 @@ public class AbstractObservationParser {
protected Instant sourceTimestamp;
protected String sourceId;
protected String etlStrategy;
private Map<String,String> meta;
public AbstractObservationParser(){
meta = new Hashtable<>();
}
public void setObservationFactory(ObservationFactory factory){
this.factory = factory;
/**
 * Stores or clears a meta information entry for this parser.
 * <p>
 * A {@code null} value removes the key from the meta map. The keys
 * {@code source.timestamp}, {@code source.id} and {@code etl.strategy}
 * are additionally mirrored into the corresponding fields of this parser.
 *
 * @param key meta key
 * @param value new value for the key, or {@code null} to clear it
 */
protected void setMeta(String key, String value){
	// update the generic key/value store first
	if( value != null ){
		meta.put(key, value);
	}else{
		// clear value, remove key
		meta.remove(key);
	}
	// then mirror the well-known keys into their dedicated fields
	if( "source.timestamp".equals(key) ){
		this.sourceTimestamp = (value == null) ? null
				: javax.xml.bind.DatatypeConverter.parseDateTime(value).toInstant();
	}else if( "source.id".equals(key) ){
		this.sourceId = value;
	}else if( "etl.strategy".equals(key) ){
		this.etlStrategy = value;
	}
}
protected void parseSourceTimestamp(String sourceTimestamp){
this.sourceTimestamp = javax.xml.bind.DatatypeConverter.parseDateTime(sourceTimestamp).toInstant();
}
protected void setSourceId(String sourceId){
this.sourceId = sourceId;
/**
 * Looks up a meta information value stored in this parser's meta map.
 *
 * @param key meta key
 * @return the stored value, or {@code null} if the map has no entry for the key
 */
public String getMeta(String key){
	final String value = meta.get(key);
	return value;
}
protected void setEtlStrategy(String strategy){
this.etlStrategy = strategy;
/**
 * Sets the factory stored in this parser's {@code factory} field.
 *
 * @param factory observation factory to use
 */
public void setObservationFactory(ObservationFactory factory){
this.factory = factory;
}
public static Spliterator<Observation> nonNullSpliterator(Supplier<Observation> supplier){
......
......@@ -4,6 +4,25 @@ import java.util.function.Supplier;
import de.sekmi.histream.Observation;
/**
 * Converts a file into a supply of observations.
 * <p>
 * When an instance is constructed, meta information should be read from
 * the file (e.g. etl strategy and other instructions), so that it can be
 * queried via {@link #getMeta(String)}.
 * <p>
 * TODO shouldn't this interface extend Closeable?
 * TODO maybe add error handler
 * @author Raphael
 *
 */
public interface FileObservationProvider extends Supplier<Observation>{
/**
 * Retrieve meta information for this supply of observations.
 * <p>
 * Possible keys are {@code source.id}, {@code source.timestamp}
 * and {@code etl.strategy}.
 * @param key meta key
 * @return value for the meta key
 */
String getMeta(String key);
}
......@@ -40,9 +40,15 @@ public class FlatObservationProvider extends AbstractObservationParser implement
//static private Class<?>[] supportedExtensions = new Class<?>[]{Patient.class,Visit.class};
private Map<String, SpecialConcept> specialConcepts;
private Map<String, String> metaInfo;
private Observation fact;
private DateTimeAccuracy sourceDateTime;
/**
* Unprocessed line if non null (used to look ahead)
*/
private String prefetchLine;
//private DateTimeAccuracy sourceDateTime;
private static enum SpecialConcept{
......@@ -89,18 +95,38 @@ public class FlatObservationProvider extends AbstractObservationParser implement
public String getFlags(){return fields[10];}
}
public FlatObservationProvider(ObservationFactory factory, BufferedReader reader){
public FlatObservationProvider(ObservationFactory factory, BufferedReader reader) throws IOException{
setObservationFactory(factory);
this.reader = reader;
this.fieldSeparator = Pattern.compile("\\t");
this.metaAssignment = Pattern.compile("^#@meta\\(([a-z\\.]+)\\)=(.*)$");
this.specialConceptAssignment = Pattern.compile("^#@concept\\(([a-z\\.]+)\\)=(.*)$");
specialConcepts = new Hashtable<>();
metaInfo = new Hashtable<>();
fact = null;
lineNo = 0;
// read meta info
readMeta();
}
public FlatObservationProvider(ObservationFactory factory, InputStream input){
/**
 * Reads the leading meta assignment lines from the reader and applies
 * them via {@code setMeta}.
 * <p>
 * Reading stops at the first line which is not a meta assignment. That
 * line is kept in {@code prefetchLine} so the regular parsing loop can
 * process it. If the input ends before any such line is found,
 * {@code prefetchLine} is left {@code null}.
 * <p>
 * Every line consumed here is counted in {@code lineNo}, because the
 * parsing loop does not count a prefetched line again.
 *
 * @throws IOException if reading from the underlying reader fails
 */
private void readMeta() throws IOException{
	while( true ){
		String line = reader.readLine();
		if( line == null ){
			// end of input (empty file or only meta lines): nothing to look ahead
			prefetchLine = null;
			return;
		}
		lineNo ++;
		Matcher m = metaAssignment.matcher(line);
		if( !m.matches() ){
			// no more meta information, keep the line for the parser
			prefetchLine = line;
			return;
		}
		// meta
		setMeta(m.group(1), m.group(2));
	}
}
public FlatObservationProvider(ObservationFactory factory, InputStream input) throws IOException{
this(factory, new BufferedReader(new InputStreamReader(input)));
}
......@@ -108,20 +134,9 @@ public class FlatObservationProvider extends AbstractObservationParser implement
Matcher m = metaAssignment.matcher(line);
if( m.matches() ){
// meta
switch( m.group(1) ){
case "source.id":
setSourceId(m.group(2));
break;
case "source.timestamp":
parseSourceTimestamp(m.group(2));
this.sourceDateTime = new DateTimeAccuracy(LocalDateTime.ofInstant(sourceTimestamp, ZoneId.systemDefault()));
break;
case "etl.strategy":
setEtlStrategy(m.group(2));
break;
default:
throw new IllegalArgumentException("Unknown meta command in line "+lineNo+": "+line);
}
setMeta(m.group(1), m.group(2));
//this.sourceDateTime = new DateTimeAccuracy(LocalDateTime.ofInstant(sourceTimestamp, ZoneId.systemDefault()));
return;
}
m = specialConceptAssignment.matcher(line);
......@@ -135,12 +150,15 @@ public class FlatObservationProvider extends AbstractObservationParser implement
throw new IllegalArgumentException("Invalid command in line "+lineNo+": "+line);
}
/**
 * Converts the source timestamp to a {@code DateTimeAccuracy} in the
 * system default time zone.
 *
 * @return the converted source timestamp, or {@code null} if no source
 *  timestamp is set (e.g. the input has no {@code source.timestamp} meta)
 */
private DateTimeAccuracy getSourceDateTime(){
	if( sourceTimestamp == null ){
		// LocalDateTime.ofInstant rejects null; report "no source timestamp" instead
		return null;
	}
	return new DateTimeAccuracy(LocalDateTime.ofInstant(sourceTimestamp, ZoneId.systemDefault()));
}
private void specialFields(SpecialConcept special, Record record){
// create temporary observation
// which is only used to fill the special concepts
DateTimeAccuracy ts;
if( record.getStartDate() == null ){
ts = sourceDateTime;
ts = getSourceDateTime();
}else{
ts = DateTimeAccuracy.parsePartialIso8601(record.getStartDate());
}
......@@ -211,16 +229,17 @@ public class FlatObservationProvider extends AbstractObservationParser implement
private void newObservation(Record record){
DateTimeAccuracy ts;
DateTimeAccuracy sourceTs = getSourceDateTime();
if( record.getStartDate() == null ){
// first use source timestamp
ts = sourceDateTime;
ts = sourceTs;
// later update to visit timestamp
}else{
ts = DateTimeAccuracy.parsePartialIso8601(record.getStartDate());
}
fact = factory.createObservation(record.getPatID(), record.getConcept(), ts);
if( ts == sourceDateTime ){
if( ts == sourceTs ){
// try to use visit timestamp
ts = fact.getExtension(Visit.class).getStartTime();
if( ts != null )fact.setStartTime(ts);
......@@ -251,8 +270,13 @@ public class FlatObservationProvider extends AbstractObservationParser implement
boolean inGroup = false;
do{
try {
line = reader.readLine();
lineNo ++;
if( prefetchLine != null ){
line = prefetchLine;
prefetchLine = null;
}else{
line = reader.readLine();
lineNo ++;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
......@@ -311,4 +335,9 @@ public class FlatObservationProvider extends AbstractObservationParser implement
return ret;
}
/**
 * Retrieves meta information read from the file.
 * <p>
 * Meta lines are stored through {@code setMeta} of the parent parser,
 * so the lookup is delegated to the parent's meta map.
 *
 * @param key meta key
 * @return value for the meta key, or {@code null} if it was not set
 */
@Override
public String getMeta(String key) {
	return super.getMeta(key);
}
}
......@@ -34,8 +34,8 @@ class XMLObservationParser extends AbstractObservationParser{
// provider
protected String providerId;
protected String providerName;
//protected String providerId;
//protected String providerName;
// visit
protected DateTimeAccuracy encounterStart;
......@@ -177,8 +177,8 @@ class XMLObservationParser extends AbstractObservationParser{
}
protected void parseSource(AttributeAccessor atts){
parseSourceTimestamp(atts.getValue("timestamp"));
sourceId = atts.getValue("source");
setMeta("source.timestamp", atts.getValue("timestamp"));
setMeta("source.id", atts.getValue("source"));
}
protected void parseEncounter(AttributeAccessor atts){
encounterStart = DateTimeAccuracy.parsePartialIso8601(atts.getValue("start"));
......
......@@ -14,6 +14,9 @@ import javax.xml.stream.XMLStreamReader;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.impl.AbstractValue;
......@@ -21,11 +24,10 @@ import de.sekmi.histream.impl.AbstractValue;
public class XMLObservationProvider extends XMLObservationParser implements FileObservationProvider{
//private static final String namespaceURI = "http://sekmi.de/histream/dwh-eav";
private XMLStreamReader reader;
private boolean documentStart;
private AttributeAccessor atts;
public XMLObservationProvider(ObservationFactory factory, XMLStreamReader reader) {
public XMLObservationProvider(ObservationFactory factory, XMLStreamReader reader) throws XMLStreamException {
setObservationFactory(factory);
this.reader = reader;
atts = new AttributeAccessor() {
......@@ -35,7 +37,10 @@ public class XMLObservationProvider extends XMLObservationParser implements File
return reader.getAttributeValue(null, name);
}
};
documentStart = true;
// read start of document until start of visit
readToRoot();
readMeta();
readVisit();
}
public XMLObservationProvider(ObservationFactory factory, InputStream input) throws XMLStreamException, FactoryConfigurationError {
this(factory, XMLInputFactory.newInstance().createXMLStreamReader(input));
......@@ -59,6 +64,14 @@ public class XMLObservationProvider extends XMLObservationParser implements File
// read meta
reader.nextTag();
if( reader.getLocalName().equals("etl") ){
String etlStrategy = reader.getAttributeValue(null, "strategy");
// TODO use constants for etl.strategy, etc.
if( etlStrategy != null )setMeta("etl.strategy", etlStrategy);
reader.nextTag();
// should be end element
reader.nextTag();
}
if( reader.getLocalName().equals("source") ){
parseSource(atts);
reader.nextTag();
......@@ -104,12 +117,7 @@ public class XMLObservationProvider extends XMLObservationParser implements File
}
private Observation readObservation()throws XMLStreamException{
if( documentStart ){
readToRoot();
readMeta();
readVisit();
documentStart = false;
}
// </facts> might occur after previous call to readObservation()
while( reader.isEndElement() ){
switch( reader.getLocalName() ){
case "facts":
......
<dwh-eav xmlns="http://sekmi.de/histream/dwh-eav" etl-strategy="replace-visit">
<dwh-eav xmlns="http://sekmi.de/histream/dwh-eav">
<!-- chronologisch impliziert, dass der zeitstempel eines nachfolgenden elementes größer als alle vorangehenden elemente sein muss. Der Zeitstempel kann vor dem Encounter-Start liegen -->
<meta>
<!-- Zeitpunkt, an dem der Export erstellt wurde bzw. Datenstand -->
<etl strategy="replace-visit" />
<source timestamp="2015-04-21T08:58:00" system="test"/>
<enum concept="L:46098-0">
<value id="0">Male</value>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment