Commit c420b26f authored by R.W.Majeed's avatar R.W.Majeed

SAXObservationProvider removed

parent 166d68f9
package de.sekmi.histream.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import java.nio.CharBuffer;
import java.util.function.Consumer;
import org.xml.sax.Attributes;
import org.xml.sax.ContentHandler;
import org.xml.sax.InputSource;
import org.xml.sax.Locator;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationParser;
import de.sekmi.histream.ObservationProvider;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Visit;
import de.sekmi.histream.impl.AbstractValue;
/**
* Sax2 content handler which reads an xml eav representation
* and provides a stream of observations as the xml is processed.
* <p>
* Very large files can be processed, as the observations are processed sequentially when
* they are read.
*
* @author Raphael
*
*/
public class SAXObservationProvider extends XMLObservationParser implements ContentHandler, ObservationProvider, ObservationParser{
//private static final Logger log = Logger.getLogger(SAXObservationProvider.class.getName());
static private Class<?>[] supportedExtensions = new Class<?>[]{Patient.class,Visit.class};
// TODO: also support Concept
//private Consumer<Visit> beforeFacts;
private Consumer<Observation> handler;
private enum Section { Root, Meta, Visit, Data };
private Section section;
private CharBuffer buffer;
/*// no need for fast access, reading from file is slow anyways
private ExtensionAccessor<Visit> visit;
private ExtensionAccessor<Patient> patient;
*/
public SAXObservationProvider() {
buffer = CharBuffer.allocate(1024);
}
@Override
public void characters(char[] ch, int start, int length)throws SAXException {
try{
buffer.put(ch, start, length);
}catch( BufferOverflowException e ){
throw new SAXException(e);
}
}
@Override
public void endDocument() throws SAXException {
}
@Override
public void endElement(String uri, String localName, String qName)throws SAXException {
buffer.flip();
switch( section ){
case Data:
if( qName.equals("eav-item") || qName.equals("eav-group") ){
// process observation
if( qName.equals("eav-item") ){
fact.setValue(parseValue(buffer.toString()));
}
provideObservation(fact);
fact = null;
}else if( qName.equals("value") ){
// modifier value parsed
modifier.setValue(parseValue(buffer.toString()));
}
case Meta:
if( qName.equals("meta") ){
section = Section.Root;
}else if( qName.equals("enum") ){
// TODO: create enum concept and store in hashtable
}
break;
case Visit:
// all other fields are stored with the corresponding element name
visitData.put(qName, buffer.toString());
break;
case Root:
// nothing
break;
}
buffer.clear();
}
@Override
public void endPrefixMapping(String prefix) throws SAXException {}
@Override
public void ignorableWhitespace(char[] ch, int start, int length)throws SAXException {}
@Override
public void processingInstruction(String target, String data)throws SAXException {}
@Override
public void setDocumentLocator(Locator locator) {}
@Override
public void skippedEntity(String name) throws SAXException {}
@Override
public void startDocument() throws SAXException {
section = Section.Root;
visitData.clear();
}
@Override
public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
AttributeAccessor atts = new AttributeAccessor(){
@Override
public String getValue(String name) {
return attributes.getValue(name);
}
};
buffer.clear();
switch( section ){
case Data:
if( qName.equals("eav-item") ){
newObservation(atts);
parseValueAttributes(atts);
}else if( qName.equals("eav-group") ){
newObservation(atts);
fact.setValue(AbstractValue.NONE);
}else if( qName.equals("value") ){
modifier = fact.addModifier(atts.getValue("modifier"));
parseValueAttributes(atts);
}
break;
case Meta:
if( qName.equals("source") ){
parseSource(atts);
}
break;
case Visit:
if( qName.equals("encounter") ){
parseEncounter(atts);
}else if( qName.equals("facts") ){
// visit section is complete
// patient and visit objects are created, once the first fact is complete
// data section begins
section = Section.Data;
}
break;
case Root:
if( qName.equals("meta") ){
section = Section.Meta;
}else if( qName.equals("visit") ){
section = Section.Visit;
}
break;
default:
break;
}
}
@Override
public void startPrefixMapping(String prefix, String uri)
throws SAXException {}
@Override
public Class<?>[] getSupportedExtensions() {
return supportedExtensions;
}
@Override
public void parse(InputStream input) throws IOException{
try {
XMLReader reader = XMLReaderFactory.createXMLReader();
reader.setContentHandler(this);
reader.parse(new InputSource(input));
} catch (SAXException e) {
throw new IOException(e);
}
}
@Override
public void setHandler(Consumer<Observation> handler) {
this.handler = handler;
}
private void provideObservation(Observation observation){
handler.accept(observation);
}
@Override
public void setObservationFactory(ObservationFactory factory) {
this.factory = factory;
}
}
package de.sekmi.histream.impl;
import java.util.function.Consumer;
import de.sekmi.histream.Observation;
/**
* Calls a distinct extension consumer for each distinct extension found in successive observations
* @author Raphael
*
* @param <T>
*/
public class DistinctExtensionFilter<T> implements Consumer<Observation>{
private Class<T> distinctClass;
private Object prev;
private Consumer<T> distinctConsumer;
private Consumer<Observation> factConsumer;
public DistinctExtensionFilter(Consumer<Observation> factConsumer, Class<T> visitClass, Consumer<T> distinctConsumer){
this.distinctClass = visitClass;
this.factConsumer = factConsumer;
this.distinctConsumer = distinctConsumer;
}
@Override
public void accept(Observation t) {
T cur = t.getExtension(distinctClass);
if( cur != prev ){
distinctConsumer.accept(cur);
prev = cur;
}
factConsumer.accept(t);
}
}
package de.sekmi.histream.impl;
import java.io.FileInputStream;
import java.io.FileReader;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Enumeration;
......@@ -11,16 +10,12 @@ import java.math.BigDecimal;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.InputSource;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
import de.sekmi.histream.Observation;
import de.sekmi.histream.Value;
import de.sekmi.histream.Modifier;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FlatObservationProvider;
import de.sekmi.histream.io.SAXObservationProvider;
import de.sekmi.histream.io.XMLObservationProvider;
......@@ -112,18 +107,6 @@ public class FileObservationProviderTest {
});
}
@Test
public void testSAXReader() throws Exception{
XMLReader reader = XMLReaderFactory.createXMLReader();
SAXObservationProvider provider = new SAXObservationProvider();
provider.setObservationFactory(factory);
provider.setHandler(handler);
reader.setContentHandler(provider);
reader.parse(new InputSource(new FileReader("src/test/resources/dwh-eav.xml")));
handler.finish();
}
@Test
public void testStAXReader() throws Exception {
XMLObservationProvider xos = new XMLObservationProvider(factory, new FileInputStream("src/test/resources/dwh-eav.xml"));
......
package de.sekmi.histream.impl;
import java.io.FileInputStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import de.sekmi.histream.i2b2.I2b2Inserter;
import de.sekmi.histream.i2b2.I2b2Visit;
import de.sekmi.histream.io.SAXObservationProvider;
public class TestInsertXmlToI2b2 {
TestPostgresVisitStore visitStore;
TestPostgresPatientStore patientStore;
private static final String postgresHost = "localhost";
private static final int postgresPort = 15432;
private void load()throws Exception{
visitStore = new TestPostgresVisitStore();
visitStore.open(postgresHost, postgresPort);
//visitStore.getStore().deleteWhereSourceId("test");
patientStore = new TestPostgresPatientStore();
patientStore.open(postgresHost, postgresPort);
//patientStore.getStore().deleteWhereSourceId("test");
ObservationFactoryImpl factory = new ObservationFactoryImpl();
factory.registerExtension(patientStore.getStore());
factory.registerExtension(visitStore.getStore());
SAXObservationProvider provider = new SAXObservationProvider();
provider.setObservationFactory(factory);
Map<String,String> props = new HashMap<>();
props.put("user", "i2b2demodata");
props.put("host", "localhost");
props.put("database", "i2b2");
props.put("port", "15432");
props.put("password", "");
props.put("nullProvider", "LCS-I2B2:PROVIDERS");
I2b2Inserter inserter = new I2b2Inserter(props);
// delete data
//inserter.purgeSource("test");
// load instance_num presets
visitStore.getStore().loadMaxInstanceNums();
// find distinct visits and delete each before inserting
provider.setHandler(new DistinctExtensionFilter<I2b2Visit>(inserter, I2b2Visit.class, v -> {
try{
inserter.purgeVisit(((I2b2Visit)v).getNum());
}catch( SQLException e ){
System.err.println("Unable to delete facts for visit: "+v);
}
} ));
provider.parse(new FileInputStream("src/test/resources/dwh-eav.xml"));
inserter.close();
visitStore.close();
patientStore.close();
}
public static void main(String args[]) throws Exception{
TestInsertXmlToI2b2 t = new TestInsertXmlToI2b2();
t.load();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment