Commit 0fd0cf75 authored by R.W.Majeed's avatar R.W.Majeed

Readahead verifies header for flat imports

parent d835be1f
......@@ -10,6 +10,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Hashtable;
import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -30,17 +31,29 @@ import de.sekmi.histream.impl.StringValue;
*
*/
public class FlatObservationProvider extends AbstractObservationParser implements FileObservationProvider{
private static final Logger log = Logger.getLogger(FlatObservationProvider.class.getName());
/**
* Minimum headers required in first line. Additional columns to the right are ignored (warning)
*/
public static final String[] minHeaders = new String[]{"patid","encounter","concept","type","value","units","start","end","provider","location","flag"};
/**
* Maximum number of fields (used to split fields)
*/
private static final int maxFields = minHeaders.length+1;
private BufferedReader reader;
private Pattern fieldSeparator;
private String fieldSeparator;
private String commentPrefix;
private String commandPrefix;
private String commandGroupStart;
private String commandGroupEnd;
private Pattern fieldSeparatorPattern;
private Pattern metaAssignment;
private Pattern specialConceptAssignment;
private long lineNo;
private static final int maxFields = 11;
//static private Class<?>[] supportedExtensions = new Class<?>[]{Patient.class,Visit.class};
private Map<String, SpecialConcept> specialConcepts;
private Map<String, String> metaInfo;
private Observation fact;
......@@ -98,29 +111,60 @@ public class FlatObservationProvider extends AbstractObservationParser implement
public FlatObservationProvider(ObservationFactory factory, BufferedReader reader) throws IOException{
setObservationFactory(factory);
this.reader = reader;
this.fieldSeparator = Pattern.compile("\\t");
this.fieldSeparator = "\t";
this.fieldSeparatorPattern = Pattern.compile(Pattern.quote(fieldSeparator));
this.metaAssignment = Pattern.compile("^#@meta\\(([a-z\\.]+)\\)=(.*)$");
this.specialConceptAssignment = Pattern.compile("^#@concept\\(([a-z\\.]+)\\)=(.*)$");
specialConcepts = new Hashtable<>();
metaInfo = new Hashtable<>();
fact = null;
lineNo = 0;
// TODO load from configuration
commentPrefix = "#";
commandPrefix = "#@";
commandGroupStart = "#@group(start)";
commandGroupEnd = "#@group(end)";
verifyPrefixes();
// read meta info
readMeta();
}
private void verifyPrefixes()throws IOException{
if( !commandPrefix.startsWith(commentPrefix) )throw new IOException("commandPrefix must start with commentPrefix");
if( !commandGroupStart.startsWith(commandPrefix) || !commandGroupEnd.startsWith(commandPrefix))throw new IOException("groupStart and groupEnd must start with commandPrefix");
}
private void readMeta() throws IOException{
// verify headers in first line
String headers = reader.readLine();
String expected = String.join(fieldSeparator, minHeaders);
if( !headers.startsWith(expected) ){
throw new IOException("Header in first line must start with: "+expected);
}else if( !headers.equals(expected) ){
log.warning("Additional columns are ignored: "+headers.substring(expected.length()+1));
}
// parse meta commands/comments
do{
prefetchLine = reader.readLine();
Matcher m = metaAssignment.matcher(prefetchLine);
if( m.matches() ){
// meta
setMeta(m.group(1), m.group(2));
if( prefetchLine == null ){
// end of file
prefetchLine = null;
break;
}else if( prefetchLine.startsWith(commandPrefix) ){
if( prefetchLine.startsWith(commandGroupStart) || prefetchLine.startsWith(commandGroupEnd) ){
// group start/end
break;
}
parseCommand(prefetchLine);
// continue
}else if( prefetchLine.length() == 0 || prefetchLine.startsWith(commentPrefix) ){
// ignore comment or empty lines
// continue
}else{
break; // no more meta information
// content. stop prefetching row
break;
}
}while( true );
......@@ -286,27 +330,23 @@ public class FlatObservationProvider extends AbstractObservationParser implement
}else if( line.length() == 0 ){
// empty line
// continue;
}else if( line.charAt(0) == '#' ){
// comment or command
if( line.length() > 1 && line.charAt(1) == '@' ){
// command
if( line.equals("#@group(start)") ){
inGroup = true;
}else if( line.equals("#@group(end)") ){
inGroup = false;
// resulting observation in 'fact'
break;
}else{
parseCommand(line);
}
// continue;
}else{
// comment, ignore line
// continue;
}else if( line.startsWith(commandPrefix) ){
// command
if( line.equals("#@group(start)") ){
inGroup = true;
}else if( line.equals("#@group(end)") ){
inGroup = false;
// resulting observation in 'fact'
break;
}else{
parseCommand(line);
}
}else if( line.startsWith(commentPrefix) ){
// comment, ignore line
// continue;
}else{
// parse observation
Record fields = new Record(fieldSeparator.split(line, maxFields));
Record fields = new Record(fieldSeparatorPattern.split(line, maxFields));
// fields: 0 patid, 1 encounter, 2 concept, 3: type, 4: value, 5: starttime,
// handle special concepts (defined by previous commands)
......@@ -334,10 +374,4 @@ public class FlatObservationProvider extends AbstractObservationParser implement
fact = null; // clear local copy
return ret;
}
@Override
public String getMeta(String key) {
return metaInfo.get(key);
}
}
......@@ -98,12 +98,14 @@ public class RunConfiguration implements Closeable{
// TODO set error handlers for destinations
// if listeners specified, run as server (don't exit)
args = new String[]{"src/test/resources/dwh-eav.xml"};
if( args.length > 0 ){
for( int i=0; i<args.length; i++ ){
File file = new File(args[i]);
FileObservationProvider p = rc.providerForFile(file);
if( p != null ){
System.out.println("ETL Strategy: "+p.getMeta("etl.strategy"));
rc.processFile(p);
}else{
System.err.println("Unable to find parser for file "+file);
......
patid encounter concept type value units start end provider location flag
#@meta(source.id)=test
#@meta(source.timestamp)=2015-04-21T08:58:00
#@meta(etl.strategy)=replacesource
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment