Commit eccf3776 authored by rwm's avatar rwm

ObservationParsers utilize Spliterator and java.util.stream, flat file parsing works

parent d96e585a
......@@ -26,6 +26,7 @@ public interface Observation extends ConceptValuePair, ExternalSourceType{
DateTimeAccuracy getStartTime();
DateTimeAccuracy getEndTime();
void setEndTime(DateTimeAccuracy date);
void setStartTime(DateTimeAccuracy date);
ObservationFactory getFactory();
<T> T getExtension(Class<T> extensionType) throws IllegalArgumentException;
......
......@@ -13,6 +13,10 @@ public class NumericValue extends AbstractValue {
/**
 * Creates a numeric value using the {@code Operator.Equal} operator.
 *
 * @param value numeric value
 */
public NumericValue(BigDecimal value){
this(value, Operator.Equal);
}
/**
 * Creates a numeric value with a unit, using the {@code Operator.Equal} operator.
 *
 * @param value numeric value
 * @param units unit assigned to the value
 */
public NumericValue(BigDecimal value, String units){
    // the single argument constructor already selects Operator.Equal
    this(value);
    this.units = units;
}
public NumericValue(BigDecimal value, Operator operator){
this.value = value;
......
......@@ -72,7 +72,8 @@ public class ObservationImpl implements Observation{
@Override
public DateTimeAccuracy getStartTime() {return startTime;}
void setStartTime(DateTimeAccuracy startTime){this.startTime = startTime;}
@Override
public void setStartTime(DateTimeAccuracy startTime){this.startTime = startTime;}
@Override
public DateTimeAccuracy getEndTime() {return endTime;}
......
package de.sekmi.histream.io;
import java.time.Instant;
import de.sekmi.histream.ObservationFactory;
/**
 * Common state shared by the observation parsers: the factory used to
 * create observations and the meta information about the data source
 * (timestamp, id and ETL strategy).
 */
public class AbstractObservationParser {
    /** Factory handed to the constructor */
    protected ObservationFactory factory;

    // meta
    /** Timestamp of the source, set by {@link #parseSourceTimestamp(String)} */
    protected Instant sourceTimestamp;
    /** Id of the source, set by {@link #setSourceId(String)} */
    protected String sourceId;
    /** ETL strategy, set by {@link #setEtlStrategy(String)} */
    protected String etlStrategy;

    /**
     * @param factory factory stored in {@link #factory}
     */
    public AbstractObservationParser(ObservationFactory factory){
        this.factory = factory;
    }

    /**
     * Parses a lexical xsd:dateTime representation and stores the
     * resulting instant in {@link #sourceTimestamp}.
     *
     * @param sourceTimestamp timestamp string as accepted by
     *  {@code DatatypeConverter.parseDateTime}
     */
    protected void parseSourceTimestamp(String sourceTimestamp){
        final java.util.Calendar parsed = javax.xml.bind.DatatypeConverter.parseDateTime(sourceTimestamp);
        this.sourceTimestamp = parsed.toInstant();
    }

    /**
     * @param sourceId id stored in {@link #sourceId}
     */
    protected void setSourceId(String sourceId){
        this.sourceId = sourceId;
    }

    /**
     * @param strategy strategy stored in {@link #etlStrategy}
     */
    protected void setEtlStrategy(String strategy){
        this.etlStrategy = strategy;
    }
}
package de.sekmi.histream.io;
import java.util.function.Consumer;
import java.util.regex.Pattern;
/**
 * Converts a stream of lines to flat observation events.
 * Empty lines, lines starting with {@code #} and lines starting
 * with {@code @} are skipped, every other line is split at tab
 * characters and forwarded to the writer.
 * @author Raphael
 *
 */
public class FlatObservationLineReader implements Consumer<String>{
    // NOTE(review): FlatObservationSpliterator splits into 11 fields,
    // this reader only into 8 - confirm which limit is intended
    private static final int maxFields = 8;
    private FlatObservationWriter writer;
    private Pattern fieldSeparator;

    /**
     * @param writer receiver of the observation lines
     */
    public FlatObservationLineReader(FlatObservationWriter writer){
        this.fieldSeparator = Pattern.compile("\\t");
        this.writer = writer;
    }

    /**
     * Processes a single line.
     * @param line line without line terminator
     */
    @Override
    public void accept(String line) {
        if( line.isEmpty() ){
            // nothing to do for empty lines
            return;
        }
        switch( line.charAt(0) ){
        case '#':
            // comment
            break;
        case '@':
            // command
            // TODO @meta, @concept, @group
            break;
        default:
            // data line: at most maxFields tab separated fields
            writer.writeObservation(fieldSeparator.split(line, maxFields));
        }
    }
}
package de.sekmi.histream.io;
import java.util.Hashtable;
import java.util.Map;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.AbstractObservationProvider;
/**
 * Reads a flat file with content type text/tab-separated-values.
 * Meta entries and concept mappings received through the
 * {@link FlatObservationWriter} methods are collected in maps;
 * grouping and observation handling are not implemented yet.
 * TODO rewrite to pull parser (e.g. implementing Spliterator)
 * @author Raphael
 *
 */
public class FlatObservationProvider extends AbstractObservationProvider implements FlatObservationWriter {
    private static final Class<?>[] supportedExtensions = new Class<?>[]{Patient.class,Visit.class};
    /** Meta entries received via {@link #writeMeta(String, String)} */
    private final Map<String, String> meta;
    /** Mappings received via {@link #writeConceptMap(String, String)} */
    private final Map<String, String> conceptMap;

    public FlatObservationProvider() {
        meta = new Hashtable<>();
        // the map was never created before, which made
        // writeConceptMap fail with a NullPointerException
        conceptMap = new Hashtable<>();
    }

    @Override
    public Class<?>[] getSupportedExtensions() {
        return supportedExtensions;
    }

    /**
     * Stores a meta entry. Both arguments must be non-null,
     * because the entries are kept in a {@link Hashtable}.
     */
    @Override
    public void writeMeta(String key, String value) {
        meta.put(key, value);
    }

    /**
     * Stores a concept mapping. Both arguments must be non-null,
     * because the entries are kept in a {@link Hashtable}.
     */
    @Override
    public void writeConceptMap(String concept, String map) {
        conceptMap.put(concept, map);
    }

    @Override
    public void beginGroup() {
        // TODO build group of following observations
    }

    @Override
    public void endGroup() {
        // TODO group finished, provide observation
    }

    @Override
    public void writeObservation(String[] fields) {
        // TODO Auto-generated method stub
    }
}
package de.sekmi.histream.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Hashtable;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.Value;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Patient.Sex;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.AbstractValue;
import de.sekmi.histream.impl.NumericValue;
import de.sekmi.histream.impl.StringValue;
public class FlatObservationSpliterator extends AbstractObservationParser implements Spliterator<Observation>{
private BufferedReader reader;
private Pattern fieldSeparator;
private Pattern metaAssignment;
private Pattern specialConceptAssignment;
private long lineNo;
private static final int maxFields = 11;
//static private Class<?>[] supportedExtensions = new Class<?>[]{Patient.class,Visit.class};
private Map<String, SpecialConcept> specialConcepts;
private Observation fact;
private DateTimeAccuracy sourceDateTime;
/**
 * Special concepts which can be assigned to concept codes
 * with a {@code #@concept(id)=code} command.
 */
private static enum SpecialConcept{
    PatientNames("patient.names"),
    PatientSurname("patient.surname"),
    PatientSex("patient.sex"),
    PatientBirthDate("patient.birthdate"),
    PatientDeathDate("patient.deathdate"),
    Visit("visit");

    /** Identifier as used in the command */
    private final String id;

    SpecialConcept(String id){
        this.id = id;
    }

    /**
     * Finds the constant for an identifier.
     * @param id identifier, must not be null
     * @return matching constant or {@code null} if there is none
     */
    private static SpecialConcept byId(String id){
        final SpecialConcept[] all = SpecialConcept.values();
        for( int i=0; i<all.length; i++ ){
            if( id.equals(all[i].id) ){
                return all[i];
            }
        }
        return null;
    }
}
/**
 * Columns of a single data line. Missing trailing columns, empty
 * columns and columns containing only {@code @} are reported as
 * {@code null}.
 */
private final static class Record{
    /** Number of columns of a data line */
    private static final int FIELD_COUNT = 11;
    private final String[] fields;

    /**
     * @param fields column values, at most {@value #FIELD_COUNT} entries.
     *  The array is copied and not modified.
     */
    public Record(String fields[]){
        this.fields = new String[FIELD_COUNT];
        for( int i=0; i<fields.length; i++ ){
            String field = fields[i];
            // normalise in the copy only, the caller's array stays untouched
            if( field != null && (field.length() == 0 || field.equals("@")) ){
                field = null;
            }
            this.fields[i] = field;
        }
    }
    public String getPatID(){return fields[0];}
    public String getVisitID(){return fields[1];}
    public String getConcept(){return fields[2];}
    public String getType(){return fields[3];}
    public String getValue(){return fields[4];}
    public String getUnits(){return fields[5];}
    public String getStartDate(){return fields[6];}
    public String getEndDate(){return fields[7];}
    public String getProvider(){return fields[8];}
    public String getLocation(){return fields[9];}
    public String getFlags(){return fields[10];}
}
/**
 * Creates a spliterator reading lines from the given reader.
 *
 * @param factory factory used to create the observations
 * @param reader source of the flat file lines
 */
public FlatObservationSpliterator(ObservationFactory factory, BufferedReader reader){
    super(factory);
    // parser state
    this.lineNo = 0;
    this.fact = null;
    this.specialConcepts = new Hashtable<String, SpecialConcept>();
    // input and line syntax
    this.reader = reader;
    this.metaAssignment = Pattern.compile("^#@meta\\(([a-z\\.]+)\\)=(.*)$");
    this.specialConceptAssignment = Pattern.compile("^#@concept\\(([a-z\\.]+)\\)=(.*)$");
    this.fieldSeparator = Pattern.compile("\\t");
}
/**
 * Creates a spliterator reading from an input stream. The stream is
 * wrapped in an {@link InputStreamReader} and a {@link BufferedReader}.
 * NOTE(review): no charset is specified, so the platform default
 * charset is used for decoding - confirm that this is intended.
 *
 * @param factory factory used to create the observations
 * @param input stream providing the flat file content
 */
public FlatObservationSpliterator(ObservationFactory factory, InputStream input){
this(factory, new BufferedReader(new InputStreamReader(input)));
}
/**
 * Processes a command line, which is either a meta assignment
 * ({@code #@meta(key)=value}) or a special concept assignment
 * ({@code #@concept(id)=code}).
 *
 * @param line complete command line
 * @throws IllegalArgumentException for unknown meta keys, unknown
 *  special concept ids and lines matching neither syntax
 */
private void parseCommand(String line){
    final Matcher meta = metaAssignment.matcher(line);
    if( meta.matches() ){
        final String key = meta.group(1);
        final String value = meta.group(2);
        if( key.equals("source.id") ){
            setSourceId(value);
        }else if( key.equals("source.timestamp") ){
            parseSourceTimestamp(value);
            // keep the timestamp also as default start time for observations
            this.sourceDateTime = new DateTimeAccuracy(LocalDateTime.ofInstant(sourceTimestamp, ZoneId.systemDefault()));
        }else if( key.equals("etl.strategy") ){
            setEtlStrategy(value);
        }else{
            throw new IllegalArgumentException("Unknown meta command in line "+lineNo+": "+line);
        }
        return;
    }

    final Matcher concept = specialConceptAssignment.matcher(line);
    if( !concept.matches() ){
        throw new IllegalArgumentException("Invalid command in line "+lineNo+": "+line);
    }
    final SpecialConcept special = SpecialConcept.byId(concept.group(1));
    if( special == null ){
        throw new IllegalArgumentException("Illegal special concept in line "+lineNo+": " +concept.group(1));
    }
    // the concept code of the file is the lookup key
    specialConcepts.put(concept.group(2), special);
}
/**
 * Applies a record whose concept was declared as special concept to
 * the patient or visit extension. A temporary observation is created
 * for the record to obtain the extensions; it is not emitted.
 *
 * @param special special concept assigned to the record's concept
 * @param record parsed data line
 */
private void specialFields(SpecialConcept special, Record record){
    // create temporary observation
    // which is only used to fill the special concepts
    DateTimeAccuracy ts;
    if( record.getStartDate() == null ){
        ts = sourceDateTime;
    }else{
        ts = DateTimeAccuracy.parsePartialIso8601(record.getStartDate());
    }
    Observation tmp = factory.createObservation(record.getPatID(), record.getConcept(), ts);
    tmp.setEncounterId(record.getVisitID());
    tmp.setSourceId(sourceId);
    tmp.setSourceTimestamp(sourceTimestamp);
    switch( special ){
    case PatientBirthDate:
        tmp.getExtension(Patient.class).setBirthDate(DateTimeAccuracy.parsePartialIso8601(record.getValue()));
        break;
    case PatientDeathDate:
        tmp.getExtension(Patient.class).setDeathDate(DateTimeAccuracy.parsePartialIso8601(record.getValue()));
        break;
    case PatientSex:
        tmp.getExtension(Patient.class).setSex(parseSex(record.getValue()));
        break;
    case PatientNames:
        // TODO
        break;
    case PatientSurname:
        // TODO
        break;
    case Visit:
        Visit visit = tmp.getExtension(Visit.class);
        visit.setStartTime(tmp.getStartTime());
        if( record.getEndDate() != null ){
            // the end date column was ignored before, because the
            // temporary observation never received an end time
            visit.setEndTime(DateTimeAccuracy.parsePartialIso8601(record.getEndDate()));
        }else{
            visit.setEndTime(tmp.getEndTime());
        }
        break;
    default:
        break;
    }
}

/**
 * Maps the first character of a value to the patient sex.
 *
 * @param value text value, may be {@code null}
 * @return female for F or W, male for M (case insensitive),
 *  {@code null} for a missing or unrecognized value
 */
private static Sex parseSex(String value){
    if( value == null || value.isEmpty() ){
        // a missing value is handled like an unrecognized one
        return null;
    }
    switch( Character.toUpperCase(value.charAt(0)) ){
    case 'F':
    case 'W':
        return Sex.Female;
    case 'M':
        return Sex.Male;
    default:
        return null;
    }
}
/**
 * Creates the value object for a record, depending on the type column.
 *
 * @param record parsed data line
 * @return string value for the types {@code dat} and {@code str},
 *  numeric value (including units) for {@code int} and {@code dec},
 *  {@code AbstractValue.NONE} for a missing type, {@code nil}
 *  and any other type
 */
private Value parseValue(Record record){
    final String type = record.getType();
    if( type == null ){
        return AbstractValue.NONE;
    }
    switch( type ){
    case "int":
    case "dec":
        return new NumericValue(new BigDecimal(record.getValue()), record.getUnits());
    case "str": // string
    case "dat": // date
        return new StringValue(record.getValue());
    case "nil":
    default:
        // unknown types are not reported as error (yet)
        return AbstractValue.NONE;
    }
}
/**
 * Creates a new observation from a record and stores it in {@code fact}.
 * If the record has no start date, the source timestamp is used first
 * and replaced by the start time of the visit if that is available.
 *
 * @param record parsed data line
 */
private void newObservation(Record record){
    final boolean hasStartDate = record.getStartDate() != null;
    DateTimeAccuracy ts;
    if( hasStartDate ){
        ts = DateTimeAccuracy.parsePartialIso8601(record.getStartDate());
    }else{
        // first use source timestamp, later update to visit timestamp
        ts = sourceDateTime;
    }
    fact = factory.createObservation(record.getPatID(), record.getConcept(), ts);
    // Set the encounter before asking for the visit extension, in the same
    // order as specialFields does.
    // NOTE(review): presumably the visit is resolved via the encounter id - confirm
    fact.setEncounterId(record.getVisitID());
    fact.setSourceId(sourceId);
    fact.setSourceTimestamp(sourceTimestamp);
    if( !hasStartDate ){
        // try to use visit timestamp
        DateTimeAccuracy visitStart = fact.getExtension(Visit.class).getStartTime();
        if( visitStart != null ){
            fact.setStartTime(visitStart);
        }
    }
    if( record.getEndDate() != null ){
        fact.setEndTime(DateTimeAccuracy.parsePartialIso8601(record.getEndDate()));
    }
    fact.setValue( parseValue(record) );
    // TODO set remaining fields: location, provider, flags
}
/**
 * Adds the record as modifier to the current observation in {@code fact}.
 *
 * @param record parsed data line, its concept is used as modifier
 */
private void appendModifier(Record record){
    // TODO compare patid, encounter, timestamp
    // the value is parsed first, a parse failure leaves the fact untouched
    final Value modifierValue = parseValue(record);
    final String modifierConcept = record.getConcept();
    fact.addModifier(modifierConcept).setValue(modifierValue);
}
/**
 * Reads lines until one observation is complete and passes it to the
 * action. Outside of groups every data line is one observation. Between
 * {@code #@group(start)} and {@code #@group(end)} the first data line
 * is the observation and the following data lines are its modifiers.
 * Empty lines and comments are skipped, other commands are processed
 * by {@code parseCommand}, lines with special concepts by
 * {@code specialFields}.
 *
 * @param action receiver of the observation
 * @return {@code false} if the input ended without a further observation
 * @throws UncheckedIOException if reading from the input fails
 */
@Override
public boolean tryAdvance(Consumer<? super Observation> action) {
    boolean inGroup = false;
    while( true ){
        String line;
        try {
            line = reader.readLine();
            lineNo ++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if( line == null ){
            // end of stream
            if( fact == null ){
                return false;
            }
            // the input ended within a group: provide the observation
            // collected so far instead of silently dropping it
            break;
        }
        if( line.length() == 0 ){
            // empty line
            continue;
        }
        if( line.charAt(0) == '#' ){
            // comment or command, comments are ignored
            if( line.length() > 1 && line.charAt(1) == '@' ){
                if( line.equals("#@group(start)") ){
                    inGroup = true;
                }else if( line.equals("#@group(end)") ){
                    inGroup = false;
                    if( fact != null ){
                        // resulting observation in 'fact'
                        break;
                    }
                    // group without observation: there is nothing to
                    // provide, the action must not receive null
                }else{
                    parseCommand(line);
                }
            }
            continue;
        }
        // parse observation
        Record fields = new Record(fieldSeparator.split(line, maxFields));
        // handle special concepts (defined by previous commands)
        SpecialConcept special = specialConcepts.get(fields.getConcept());
        if( special != null ){
            specialFields(special, fields);
        }else if( inGroup ){
            // first item is fact, following items are modifiers
            // group ends with #@group(end)
            if( fact == null ){
                newObservation(fields);
            }else{
                appendModifier(fields);
            }
        }else{
            newObservation(fields);
            break;
        }
    }
    action.accept(fact);
    fact = null;
    return true;
}
/**
 * Splitting is not implemented.
 *
 * @return always {@code null}
 */
@Override
public Spliterator<Observation> trySplit() {
// TODO Auto-generated method stub
return null;
}
/**
 * The number of remaining observations is not calculated.
 *
 * @return always {@link Long#MAX_VALUE}
 */
@Override
public long estimateSize() {
// TODO estimate by file size
return Long.MAX_VALUE;
}
/**
 * @return the characteristics {@link Spliterator#IMMUTABLE} and
 *  {@link Spliterator#NONNULL}
 */
@Override
public int characteristics() {
    final int reported = Spliterator.NONNULL | Spliterator.IMMUTABLE;
    return reported;
}
}
package de.sekmi.histream.io;
/**
 * Receiver for the content of a flat observation file.
 * {@link FlatObservationLineReader} calls {@link #writeObservation(String[])}
 * for every data line.
 */
public interface FlatObservationWriter {
/**
 * Receives a meta entry.
 * @param meta name of the meta entry
 * @param value value of the meta entry
 */
void writeMeta(String meta, String value);
/**
 * Receives a concept mapping.
 * NOTE(review): the meaning of the {@code map} argument is not visible here - confirm with the file format.
 * @param concept concept
 * @param map mapping for the concept
 */
void writeConceptMap(String concept, String map);
/**
 * Signals the start of a group of observations.
 */
void beginGroup();
/**
 * Signals the end of a group of observations.
 */
void endGroup();
/**
 * Receives the fields of one data line.
 * @param fields field values in the order of the columns
 */
void writeObservation(String[] fields);
}
package de.sekmi.histream.io;
import java.io.InputStream;
import java.util.stream.Stream;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationParser;
/**
 * Helper class for streams of observations. Not functional yet.
 */
public class Streams {
/**
 * Not implemented yet, currently always returns {@code null}.
 * NOTE(review): presumably intended to provide the observations which
 * the parser reads from the input - confirm before implementing.
 *
 * @param parser parser (unused)
 * @param input input (unused)
 * @return always {@code null}
 */
Stream<Observation> forParser(ObservationParser parser, InputStream input){
return null;
}
}
package de.sekmi.histream.io;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import de.sekmi.histream.AbnormalFlag;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Modifier;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.Value;
import de.sekmi.histream.Value.Type;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.ext.Patient.Sex;
import de.sekmi.histream.impl.AbstractValue;
import de.sekmi.histream.impl.NumericValue;
import de.sekmi.histream.impl.StringValue;
/**
* Parser for EAV XML documents. This class is used by both the {@link SAXObservationProvider}
* and {@link XMLObservationSpliterator}.
*
* @author marap1
*
*/
class XMLObservationParser extends AbstractObservationParser{
private static final Logger log = Logger.getLogger(XMLObservationParser.class.getName());
/**
 * Access to the attributes of the element which is currently parsed.
 */
public static interface AttributeAccessor{
/**
 * Retrieves an attribute value.
 * @param name attribute name
 * @return attribute value. Callers check the result for {@code null},
 *  presumably {@code null} is returned for missing attributes.
 */
String getValue(String name);
}
// provider
protected String providerId;
protected String providerName;
// visit
protected DateTimeAccuracy encounterStart;
protected DateTimeAccuracy encounterEnd;
protected String patientId;
protected Map<String,String> visitData;
/**
* Last added modifier to observation
*/
protected Modifier modifier;
protected Observation fact;
// value attributes
protected Value.Type factType;
protected String valueUnit;
protected AbnormalFlag valueFlag;
protected Value.Operator valueOp;
protected Visit visit;
protected Patient patient;
/**
 * Creates the parser with an empty visit data map.
 *
 * @param factory factory passed to the super class
 */
protected XMLObservationParser(ObservationFactory factory){
    super(factory);
    this.visitData = new HashMap<String, String>();
    //factory.getExtensionAccessor(Patient.class);
    // TODO: assert that the supportedExtensions are available from the factory
}
/**
 * Determines {@code factType} from the type attribute of a value.
 * A missing type results in {@code Type.None}, {@code xsi:string} in
 * {@code Type.Text}, {@code xsi:decimal} and {@code xsi:integer} in
 * {@code Type.Numeric}. Unsupported types are logged and also result
 * in {@code Type.None}.
 *
 * @param type type attribute, may be {@code null}
 */
protected void parseValueType(String type){
    if( type == null ){
        factType = Type.None;
        return;
    }
    switch( type ){
    case "xsi:string":
        factType = Type.Text;
        break;
    case "xsi:decimal":
    case "xsi:integer":
        factType = Type.Numeric;
        break;
    default:
        // do not keep the type of the previously parsed value
        log.warning("Unsupported value type '"+type+"', using type None");
        factType = Type.None;
    }
}
protected void newObservation(AttributeAccessor atts){
// determine start time
DateTimeAccuracy start;
String ts = atts.getValue("start");
if( ts != null ) // use specified time
start = DateTimeAccuracy.parsePartialIso8601(ts);
else // use time from encounter
start = encounterStart;
fact = factory.createObservation(patientId, atts.getValue("concept"), start);
// set encounter id to visitData.get("encounter")
fact.setEncounterId(visitData.get("encounter"));
// set source information
fact.setSourceId(sourceId);
fact.setSourceTimestamp(sourceTimestamp);
if( patient == null ){
// this is the first observation
// create/sync patient object
patient = fact.getExtension(Patient.class);
// TODO: set patient data
DateTimeAccuracy birthDate = null, deathDate = null;
if( visitData.containsKey("birthdate") ){
birthDate = DateTimeAccuracy.parsePartialIso8601(visitData.get("birthdate"));
}
if( visitData.containsKey("deathdate") ){
deathDate = DateTimeAccuracy.parsePartialIso8601(visitData.get("deathdate"));
}
Sex sex = null;
if( visitData.containsKey("sex") ){
switch( visitData.get("sex").charAt(0) ){
case 'F':
sex = Sex.Female;
break;
case 'M':
sex = Sex.Male;
break;
default:
// warning
log.warning("Unable to recognize patient sex '"+visitData.get("sex")+"': F or M expected");
}
}
// sync data
boolean overwrite = false;
if( patient.getSourceTimestamp() == null ){
// patient did not exist in cache,
// set source/timestamp to this file
patient.setSourceTimestamp(sourceTimestamp);
// TODO: set source id
overwrite = true;
}else if( sourceTimestamp.isAfter(patient.getSourceTimestamp()) ){
// patient already existing, but our information is more recent
// overwrite existing data
overwrite = true;
}
if( overwrite ){
patient.setBirthDate(birthDate);
patient.setDeathDate(deathDate);
patient.setSex(sex);
// TODO set name, surname, etc.
}
}else{
fact.setExtension(Patient.class, patient);
}
if( visit == null ){
// this is the first observation for this visit
// create/sync visit object
visit = fact.getExtension(Visit.class);
visit.setStartTime(encounterStart);
visit.setEndTime(encounterEnd);
// visit.setStatus(status.);
// TODO: set visit data
// notify handlers
/*
if( beforeFacts != null ){
beforeFacts.accept(visit);
}*/
}else{
fact.setExtension(Visit.class, visit);
}
// use end time, if specified
ts = atts.getValue("end");
if( ts != null ){