Commit 19c89432 authored by R.W.Majeed's avatar R.W.Majeed
Browse files

refactoring

parent e198b503
package de.sekmi.histream;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.IOException;
import java.io.InputStream;
/**
* Observation provider parsing input streams to provide observations.
*
* @author marap1
*
*/
public interface ObservationParser extends ObservationProvider {
/**
* Parse an input stream to produce observations
* @param input input stream
* @throws IOException for io errors occurred during parsing
*/
public void parse(InputStream input) throws IOException;
}
package de.sekmi.histream.io;
package de.sekmi.histream;
/*
* #%L
......@@ -23,10 +23,10 @@ package de.sekmi.histream.io;
import java.util.function.Supplier;
import de.sekmi.histream.Observation;
/**
* Converts a file into a supply of observations.
* Supplier of observations.
*
* Also provides meta informaiton via {@link #getMeta(String)}
* <p>
* When an instance is constructed, meta information should be read from
* the file (e.g. etl strategy and other instructions)
......@@ -36,7 +36,7 @@ import de.sekmi.histream.Observation;
* @author Raphael
*
*/
public interface FileObservationProvider extends Supplier<Observation>{
public interface ObservationSupplier extends Supplier<Observation>{
/**
* Retrieve meta information for this supply of observations.
......
......@@ -39,19 +39,19 @@ import de.sekmi.histream.Extension;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationHandler;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Plugin;
import de.sekmi.histream.conf.Configuration;
import de.sekmi.histream.conf.PluginConfig;
import de.sekmi.histream.conf.PluginRef;
import de.sekmi.histream.io.AbstractObservationParser;
import de.sekmi.histream.io.FileObservationProvider;
import de.sekmi.histream.io.FileObservationProviderFactory;
import de.sekmi.histream.io.FileObservationSupplierFactory;
public class RunConfiguration implements Closeable{
private static final Logger log = Logger.getLogger(RunConfiguration.class.getName());
private ObservationFactory factory;
private Plugin[] plugins;
private FileObservationProviderFactory[] fileFactories;
private FileObservationSupplierFactory[] fileFactories;
private Consumer<Observation> destinationChain;
private ObservationHandler[] destinationHandlers;
......@@ -59,7 +59,7 @@ public class RunConfiguration implements Closeable{
factory = new ObservationFactoryImpl();
plugins = conf.createPluginInstances();
List<FileObservationProviderFactory> ffs = new ArrayList<>();
List<FileObservationSupplierFactory> ffs = new ArrayList<>();
for(int i=0; i<plugins.length; i++ ){
// register plugins
......@@ -70,14 +70,14 @@ public class RunConfiguration implements Closeable{
log.info("Observation extension added: "+plugins[i]);
}
if( plugins[i] instanceof FileObservationProviderFactory ){
if( plugins[i] instanceof FileObservationSupplierFactory ){
// used to process files
ffs.add((FileObservationProviderFactory)plugins[i]);
ffs.add((FileObservationSupplierFactory)plugins[i]);
}
}
if( ffs.size() > 0 ){
fileFactories = ffs.toArray(new FileObservationProviderFactory[ffs.size()]);
fileFactories = ffs.toArray(new FileObservationSupplierFactory[ffs.size()]);
}
// build destination chain
......@@ -110,15 +110,15 @@ public class RunConfiguration implements Closeable{
}
public void processFile(FileObservationProvider provider){
public void processFile(ObservationSupplier provider){
for( ObservationHandler h : destinationHandlers ){
h.setMeta("etl.strategy", provider.getMeta("etl.strategy"));
}
AbstractObservationParser.nonNullStream(provider).forEach(destinationChain);
}
public FileObservationProvider providerForFile(File file){
FileObservationProvider p = null;
public ObservationSupplier providerForFile(File file){
ObservationSupplier p = null;
for( int i=0; i<fileFactories.length; i++ ){
try {
p = fileFactories[i].forFile(file, factory);
......@@ -174,7 +174,7 @@ public class RunConfiguration implements Closeable{
if( files.length > 0 ){
for( int i=0; i<files.length; i++ ){
File file = new File(files[i]);
FileObservationProvider p = rc.providerForFile(file);
ObservationSupplier p = rc.providerForFile(file);
if( p != null ){
System.out.println("ETL("+p.getMeta("etl.strategy")+"): "+file);
rc.processFile(p);
......
......@@ -25,7 +25,8 @@ import java.io.File;
import java.io.IOException;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
public interface FileObservationProviderFactory {
FileObservationProvider forFile(File file, ObservationFactory factory)throws IOException;
public interface FileObservationSupplierFactory {
ObservationSupplier forFile(File file, ObservationFactory factory)throws IOException;
}
......@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import de.sekmi.histream.DateTimeAccuracy;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Value;
import de.sekmi.histream.ext.Patient;
import de.sekmi.histream.ext.Patient.Sex;
......@@ -67,8 +68,8 @@ import de.sekmi.histream.impl.StringValue;
* @author Raphael
*
*/
public class FlatObservationProvider extends AbstractObservationParser implements FileObservationProvider{
private static final Logger log = Logger.getLogger(FlatObservationProvider.class.getName());
public class FlatObservationSupplier extends AbstractObservationParser implements ObservationSupplier{
private static final Logger log = Logger.getLogger(FlatObservationSupplier.class.getName());
/**
* Minimum headers required in first line. Additional columns to the right are ignored (warning)
*/
......@@ -145,7 +146,7 @@ public class FlatObservationProvider extends AbstractObservationParser implement
public String getFlags(){return fields[10];}
}
public FlatObservationProvider(ObservationFactory factory, BufferedReader reader) throws IOException{
public FlatObservationSupplier(ObservationFactory factory, BufferedReader reader) throws IOException{
setObservationFactory(factory);
this.reader = reader;
this.fieldSeparator = "\t";
......@@ -207,7 +208,7 @@ public class FlatObservationProvider extends AbstractObservationParser implement
}
public FlatObservationProvider(ObservationFactory factory, InputStream input) throws IOException{
public FlatObservationSupplier(ObservationFactory factory, InputStream input) throws IOException{
this(factory, new BufferedReader(new InputStreamReader(input)));
}
......
......@@ -27,9 +27,10 @@ import java.io.IOException;
import java.util.Map;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Plugin;
public class FlatProviderFactory implements FileObservationProviderFactory, Plugin{
public class FlatProviderFactory implements FileObservationSupplierFactory, Plugin{
public FlatProviderFactory(Map<String,String> props) {
// no configuration needed
......@@ -40,8 +41,8 @@ public class FlatProviderFactory implements FileObservationProviderFactory, Plug
}
@Override
public FileObservationProvider forFile(File file, ObservationFactory factory) throws IOException {
return new FlatObservationProvider(factory, new FileInputStream(file));
public ObservationSupplier forFile(File file, ObservationFactory factory) throws IOException {
return new FlatObservationSupplier(factory, new FileInputStream(file));
}
}
package de.sekmi.histream.io;
/*
* #%L
* histream
* %%
* Copyright (C) 2013 - 2015 R.W.Majeed
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import de.sekmi.histream.Observation;
public class ObservationParserStream implements Stream<Observation>{
@Override
public Iterator<Observation> iterator() {
// TODO Auto-generated method stub
return null;
}
@Override
public Spliterator<Observation> spliterator() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isParallel() {
return false;
}
@Override
public Stream<Observation> sequential() {
return this;
}
@Override
public Stream<Observation> parallel() {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> unordered() {
return this;
}
@Override
public Stream<Observation> onClose(Runnable closeHandler) {
// TODO Auto-generated method stub
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public Stream<Observation> filter(Predicate<? super Observation> predicate) {
// TODO Auto-generated method stub
return null;
}
@Override
public <R> Stream<R> map(Function<? super Observation, ? extends R> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public IntStream mapToInt(ToIntFunction<? super Observation> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public LongStream mapToLong(ToLongFunction<? super Observation> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super Observation> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public <R> Stream<R> flatMap(
Function<? super Observation, ? extends Stream<? extends R>> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public IntStream flatMapToInt(
Function<? super Observation, ? extends IntStream> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public LongStream flatMapToLong(
Function<? super Observation, ? extends LongStream> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public DoubleStream flatMapToDouble(
Function<? super Observation, ? extends DoubleStream> mapper) {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> distinct() {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> sorted() {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> sorted(Comparator<? super Observation> comparator) {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> peek(Consumer<? super Observation> action) {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> limit(long maxSize) {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<Observation> skip(long n) {
// TODO Auto-generated method stub
return null;
}
@Override
public void forEach(Consumer<? super Observation> action) {
// TODO Auto-generated method stub
}
@Override
public void forEachOrdered(Consumer<? super Observation> action) {
// TODO Auto-generated method stub
}
@Override
public Object[] toArray() {
// TODO Auto-generated method stub
return null;
}
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
// TODO Auto-generated method stub
return null;
}
@Override
public Observation reduce(Observation identity,
BinaryOperator<Observation> accumulator) {
Observation result = identity;
/*for (T element : this stream)
result = accumulator.apply(result, element) */
return result;
}
@Override
public Optional<Observation> reduce(BinaryOperator<Observation> accumulator) {
// TODO Auto-generated method stub
return null;
}
@Override
public <U> U reduce(U identity,
BiFunction<U, ? super Observation, U> accumulator,
BinaryOperator<U> combiner) {
// TODO Auto-generated method stub
return null;
}
@Override
public <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super Observation> accumulator,
BiConsumer<R, R> combiner) {
// TODO Auto-generated method stub
return null;
}
@Override
public <R, A> R collect(Collector<? super Observation, A, R> collector) {
// TODO Auto-generated method stub
return null;
}
@Override
public Optional<Observation> min(Comparator<? super Observation> comparator) {
// TODO Auto-generated method stub
return null;
}
@Override
public Optional<Observation> max(Comparator<? super Observation> comparator) {
// TODO Auto-generated method stub
return null;
}
@Override
public long count() {
return mapToLong(e -> 1L).sum();
}
@Override
public boolean anyMatch(Predicate<? super Observation> predicate) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean allMatch(Predicate<? super Observation> predicate) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean noneMatch(Predicate<? super Observation> predicate) {
// TODO Auto-generated method stub
return false;
}
@Override
public Optional<Observation> findFirst() {
// TODO Auto-generated method stub
return null;
}
@Override
public Optional<Observation> findAny() {
// TODO Auto-generated method stub
return null;
}
}
......@@ -40,7 +40,7 @@ import de.sekmi.histream.impl.NumericValue;
import de.sekmi.histream.impl.StringValue;
/**
* Parser for EAV XML documents. This class is used by {@link XMLObservationProvider}.
* Parser for EAV XML documents. This class is used by {@link XMLObservationSupplier}.
*
* @author marap1
*
......
......@@ -38,17 +38,19 @@ import javax.xml.stream.XMLStreamReader;
import de.sekmi.histream.Observation;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.impl.AbstractValue;
public class XMLObservationProvider extends XMLObservationParser implements FileObservationProvider{
public class XMLObservationSupplier extends XMLObservationParser implements ObservationSupplier{
//private static final String namespaceURI = "http://sekmi.de/histream/dwh-eav";
private XMLStreamReader reader;
private AttributeAccessor atts;
public XMLObservationProvider(ObservationFactory factory, XMLStreamReader reader) throws XMLStreamException {
public XMLObservationSupplier(ObservationFactory factory, XMLStreamReader reader) throws XMLStreamException {
setObservationFactory(factory);
this.reader = reader;
atts = new AttributeAccessor() {
......@@ -63,7 +65,7 @@ public class XMLObservationProvider extends XMLObservationParser implements File
readMeta();
readVisit();
}
public XMLObservationProvider(ObservationFactory factory, InputStream input) throws XMLStreamException, FactoryConfigurationError {
public XMLObservationSupplier(ObservationFactory factory, InputStream input) throws XMLStreamException, FactoryConfigurationError {
this(factory, XMLInputFactory.newInstance().createXMLStreamReader(input));
}
......
......@@ -30,9 +30,10 @@ import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLStreamException;
import de.sekmi.histream.ObservationFactory;
import de.sekmi.histream.ObservationSupplier;
import de.sekmi.histream.Plugin;
public class XMLProviderFactory implements FileObservationProviderFactory, Plugin{
public class XMLProviderFactory implements FileObservationSupplierFactory, Plugin{
public XMLProviderFactory(Map<String,String> props) {
// no configuration needed
......@@ -43,9 +44,9 @@ public class XMLProviderFactory implements FileObservationProviderFactory, Plugi
}
@Override
public FileObservationProvider forFile(File file, ObservationFactory factory) throws IOException {
public ObservationSupplier forFile(File file, ObservationFactory factory) throws IOException {
try {
return new XMLObservationProvider(factory, new FileInputStream(file));
return new XMLObservationSupplier(factory, new FileInputStream(file));
} catch (XMLStreamException | FactoryConfigurationError e) {
throw new IOException(e);
}
......
......@@ -33,11 +33,12 @@ import org.junit.Before;
import org.junit.Test;