Commit 8077e6b2 authored by Majeed's avatar Majeed

implementation of in-memory sorting/filtering of CSV files

parent 1c344772
......@@ -12,7 +12,7 @@ import java.time.Instant;
import com.opencsv.CSVParser;
import com.opencsv.CSVReader;
public class FileRowSupplier extends RowSupplier {
public class FileRowSupplier implements RowSupplier {
private CSVReader in;
private String[] headers;
private URL url;
......@@ -96,9 +96,33 @@ public class FileRowSupplier extends RowSupplier {
return timestamp;
}
/**
* Returns the line number of the record previously
* returned by {@link #get()}.
* @return previous record's line number
*/
public int getLineNo() {
if( lineNo == 0 ) {
throw new IllegalStateException("Line no requires call to get() first");
}
// returned line numbers start with 1, the variable starts with 0
// no need to subtract 1
return lineNo;
}
/**
* Retrieves the URL for the source file.
* @return source file's URL
*/
public URL getSourceURL() {
return url;
}
@Override
public String getLocation() {
return url.toString()+":"+lineNo;
return formatLocation(url, lineNo);
}
public static String formatLocation(URL url, int lineNo) {
return url.toString()+":"+lineNo;
}
}
package de.sekmi.histream.etl;
import java.io.IOException;
import java.net.URL;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
public class MemoryTable implements RowSupplier{
private Row[] rows;
private String[] headers;
private Instant timestamp;
private URL url;
/** index into rows to retrieve next by {@link #get()} */
private int pointer;
public MemoryTable(FileRowSupplier source) {
headers = source.getHeaders();
url = source.getSourceURL();
timestamp = source.getTimestamp();
//
// create array to contain all rows
ArrayList<Row> list = new ArrayList<>();
for( Object[] row=source.get(); row!=null; row=source.get()) {
list.add(new Row(row, source.getLineNo()));
}
rows = list.toArray(new Row[list.size()]);
// point to index 0, get() will retrieve first row
pointer = 0;
}
private class Row{
Object[] row;
int lineNo;
public Row(Object[] row, int lineNo) {
this.row = row;
this.lineNo = lineNo;
}
}
@Override
public String[] getHeaders() {
return headers;
}
private int[] columnPositions(String[] columns) {
final int[] pos = new int[columns.length];
// find positions of sort headers
for( int i=0; i<pos.length; i++ ) {
int j;
for( j=0; j<headers.length; j++ ) {
if( columns[i].equals(headers[j]) ) {
break;
}
}
if( j == headers.length ) {
throw new IllegalArgumentException("Sort header '"+columns[j]+"' not found in "+url);
}
pos[i] = j;
}
return pos;
}
/**
* Keep only one row per unique occurrence of the specified columns.
* Data must be sorted beforehand by the same columns.
* @param columns columns which should be unique per row
* @throws IllegalArgumentException column header not found
*/
public void unique(String[] columns) throws IllegalArgumentException, IllegalStateException{
final int[] pos = columnPositions(columns);
unique(pos);
}
/**
* Keep only one row per unique occurrence of the specified columns.
* Data must be sorted beforehand by the same columns.
* @param columns columns which should be unique per row
* @throws IllegalArgumentException column header not found
*/
public void unique(final int[] columns) throws IllegalArgumentException, IllegalStateException{
if( pointer != 0 ) {
throw new IllegalStateException("Method may not be used after retrieving rows");
}
// make sure we have data
if( rows.length < 1 ) {
return;
}
boolean[] keep = new boolean[rows.length];
keep[0] = true; // always keep first row
int keepCount = 1;
// determine which rows to keep
for( int i=1; i<rows.length; i++ ) {
int j;
for( j=0; j<columns.length; j++ ) {
// determine whether to keep row[i]
// compare to previous row
Object o1 = rows[i-1].row[columns[j]];
Object o2 = rows[i].row[columns[j]];
boolean valueEqual;
if( o1 == null ) {
if( o2 == null ) {
// both null -> same
valueEqual = true;
}else {
// different
valueEqual = false;
}
}else if( o2 == null ) {
// o1 not null (otherwise if case before) -> different
valueEqual = false;
}else {
valueEqual = o1.equals(o2);
}
if( valueEqual == false ) {
// stop comparing more columns, if one column is found different
break;
}
}
if( j == columns.length ) {
// all rows were equal, drop row
keep[i] = false;
}else {
// at least one column differs relative to previous row
keep[i] = true;
keepCount ++;
}
}
// update array, keep only marked
Row[] keepRows = new Row[keepCount];
int r = 0;
for( int i=0; i<rows.length; i++ ) {
if( keep[i] ) {
keepRows[r] = rows[i];
r ++;
}
}
this.rows = keepRows;
}
/**
* Sort the data table by the specified columns
* @param columns columns for sort order
* @throws IllegalArgumentException column header not found
* @throws IllegalStateException rows retrieved before sorting
*/
public void sort(String[] columns) throws IllegalArgumentException, IllegalStateException{
final int[] pos = columnPositions(columns);
sort(pos);
}
/**
* Sort the data table by the specified columns
* @param columns columns for sort order
* @throws IllegalStateException rows retrieved before sorting
*/
public void sort(int[] columns) throws IllegalStateException {
if( pointer != 0 ) {
throw new IllegalStateException("Method may not be used after retrieving rows");
}
Arrays.sort(rows, new Comparator<Row>() {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public int compare(Row r1, Row r2) {
int order=0;
for( int i=0; i< columns.length; i++ ) {
Object o1 = r1.row[columns[i]];
Object o2 = r2.row[columns[i]];
// sort nulls first
if( o1 == null ) {
order = (o2 == null)?0:-1;
}else if( o2 == null ) {
// o1 not null because that would be handled in the first if case
order = 1;
}else {
order = ((Comparable)o1).compareTo(o2);
}
// continue with next colum only, if fist column was equal
if( order != 0 ) {
break;
}
}
return order;
}
});
}
@Override
public Object[] get() {
if( pointer >= rows.length ) {
return null;
}
Object[] row = rows[pointer].row;
pointer ++;
return row;
}
@Override
public void close() throws IOException {
// nothing to do, data lives in memory
}
protected int getLineNumber() {
if( pointer == 0 ) {
throw new IllegalStateException("Line no requires call to get() first");
}
return rows[pointer-1].lineNo;
}
@Override
public String getLocation() {
return FileRowSupplier.formatLocation(url, getLineNumber());
}
@Override
public Instant getTimestamp() {
return timestamp;
}
public int getRowCount() {
return rows.length;
}
}
......@@ -4,11 +4,8 @@ import java.io.IOException;
import java.time.Instant;
import java.util.function.Supplier;
public abstract class RowSupplier implements Supplier<Object[]>, AutoCloseable{
public interface RowSupplier extends Supplier<Object[]>, AutoCloseable{
public RowSupplier(){
}
public abstract String[] getHeaders();
@Override
......
......@@ -51,15 +51,14 @@ public class CsvFile extends TableSource{
// @XmlElement
// char escape;
private CsvFile(){
protected CsvFile(){
}
public CsvFile(String urlSpec, String separator) throws MalformedURLException{
this();
this.url = urlSpec;
this.separator = separator;
}
@Override
public RowSupplier rows(Meta meta) throws IOException {
protected FileRowSupplier openRowSupplier(Meta meta) throws IOException {
// resolve url relative to base url from metadata
URL base = meta.getLocation();
URL source = (base == null)?new URL(url):new URL(base, url);
......@@ -71,7 +70,11 @@ public class CsvFile extends TableSource{
// if not defined, use system charset
charset = Charset.defaultCharset();
}
return new FileRowSupplier(source, separator, charset);
return new FileRowSupplier(source, separator, charset);
}
@Override
public RowSupplier rows(Meta meta) throws IOException {
return openRowSupplier(meta);
}
}
package de.sekmi.histream.etl.config;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlType;
import de.sekmi.histream.etl.FileRowSupplier;
import de.sekmi.histream.etl.MemoryTable;
import de.sekmi.histream.etl.RowSupplier;
/**
* Table source reading plain text tables.
* TODO implement escape sequences and quoting OR use opencsv dependency
*
* @author R.W.Majeed
*
*/
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name="csv-filtered")
public class CsvFiltered extends CsvFile{
protected CsvFiltered(){
super();
}
public CsvFiltered(String urlSpec, String separator) throws MalformedURLException{
super(urlSpec,separator);
}
@Override
public RowSupplier rows(Meta meta) throws IOException {
FileRowSupplier rows = super.openRowSupplier(meta);
MemoryTable data = new MemoryTable(rows);
// TODO sort, filter unique, etc.
return data;
}
}
package de.sekmi.histream.etl;
import java.io.IOException;
import java.nio.charset.Charset;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestMemoryTable {
@Test
public void verifyOriginalOrderWithoutModification() throws IOException {
FileRowSupplier visits = new FileRowSupplier(getClass().getResource("/data/p21khg/ICD.csv"), ";", Charset.forName("ASCII"));
MemoryTable mt = new MemoryTable(visits);
assertEquals(15, mt.getRowCount());
// verify that the table is previously unsorted
Object[] r = mt.get();
// first record on line 2 (first line were headers)
assertEquals(2, mt.getLineNumber());
assertEquals("KH-internes-Kennzeichen", mt.getHeaders()[3]);
// visit 1
assertEquals("1", r[3]);
// visit 2
assertEquals("2", mt.get()[3]);
mt.close();
}
@Test
public void verifySortSingleColumn() throws IOException {
FileRowSupplier visits = new FileRowSupplier(getClass().getResource("/data/p21khg/ICD.csv"), ";", Charset.forName("ASCII"));
MemoryTable mt = new MemoryTable(visits);
// sorting by single column
mt.sort(new int[] {3});
// now we should have two consecutive rows with visit 1
assertEquals("1", mt.get()[3]);
assertEquals("1", mt.get()[3]);
assertEquals("11", mt.get()[3]);
assertEquals("2", mt.get()[3]);
mt.close();
}
@Test
public void verifyUniqueSingleColumn() throws IOException {
FileRowSupplier visits = new FileRowSupplier(getClass().getResource("/data/p21khg/ICD.csv"), ";", Charset.forName("ASCII"));
MemoryTable mt = new MemoryTable(visits);
// sorting by single column
mt.sort(new int[] {3});
mt.unique(new int[] {3});
// now we should have only one row with visit 1
assertEquals("1", mt.get()[3]);
assertEquals("11", mt.get()[3]);
assertEquals("2", mt.get()[3]);
assertEquals("3", mt.get()[3]);
assertEquals("4", mt.get()[3]);
assertEquals("5", mt.get()[3]);
assertEquals("6", mt.get()[3]);
assertEquals("7", mt.get()[3]);
// in total without duplicates, there should be 11 rows
assertEquals(8, mt.getRowCount());
mt.close();
}
}
......@@ -6,9 +6,16 @@
</meta>
<patient-table>
<source xsi:type="csv-file">
<source xsi:type="csv-filtered">
<url>p21khg/_PAT.csv</url>
<separator>;</separator>
<convert
<sort-cols>
<col>Patid</col>
</sort-cols>
<unique-filter>
<col>Patid</col>
</unique-filter>
</source>
<idat>
<patient-id column="Patientennummer"/>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment