Commit 5102cc80 authored by R.W.Majeed's avatar R.W.Majeed
Browse files

Import code from previous HL7 project

parent 19c89432
package histream.hl7;
import java.nio.ByteBuffer;
/**
* A Decoder decodes a HL7v2 byte buffer into a {@link Message}.
* Per default behavior, only a single thread may use the Decoder at a given time.
* If multiple threads access the {@link #decode(ByteBuffer)} method, external synchronization
* must be ensured.
*
* TODO: write a DecoderFactory to instantiate decoders for parallelization.
*
* @author marap1
*
*/
public interface Decoder {
Message decode(ByteBuffer in);
}
package histream.hl7;
import java.nio.ByteBuffer;
public interface Encoder {
int encode(Message message, ByteBuffer out);
}
package histream.hl7;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import histream.io.MLLFileMessageHandler;
import histream.io.MLLPacketHandler;
import histream.io.ProcessedMessage;
public class HL7v2Decoder implements MLLPacketHandler, Closeable{
private Decoder decoder;
private MessageHandler handler;
private MLLFileMessageHandler dumpfile;
private Object dumplock;
public HL7v2Decoder(Decoder decoder, MessageHandler handler){
this.decoder = decoder;
this.handler = handler;
this.dumpfile = null;
this.dumplock = new Object();
}
/**
* Specify a file where all messages are dumped to.
* @param file Dump file
* @param gzip whether to apply gzip compression.
*/
public void setDumpfile(File file, boolean gzip)throws IOException{
synchronized( dumplock ){
if( dumpfile != null ){
dumpfile.close();
}
if( file == null ){
dumpfile = null;
}else{
dumpfile = gzip?MLLFileMessageHandler.GZippedHandler(file):new MLLFileMessageHandler(file);
//dumpfile.open();
}
}
}
@Override
public ProcessedMessage processMessage(ByteBuffer message){
if( dumpfile != null ){
synchronized( dumplock ){
try{
dumpfile.writeMessage(message);
}catch( IOException e ){
// TODO: write to log
// stop writing after IOException
dumpfile = null;
}
}
}
Message m;
synchronized( decoder ){
// TODO: allow parallelization: e.g. use resource manager w/ multiple decoders
m = decoder.decode(message);
}
// TODO: how to handle dropped messages? maybe set a flag 'dropped', instead of deleting all segments
return handler.processMessage(m);
}
public void open(){
}
@Override
public void close() {
if( dumpfile != null ){
dumpfile.close();
}
}
}
package histream.hl7;
import histream.io.AbstractProcessedMessage;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.logging.Logger;
public class Message extends AbstractProcessedMessage{
static final Logger log = Logger.getLogger(Message.class.getName());
protected List<String[]> segments;
protected char[] encodingChars;
public static final char segmentTerminatorChar = '\r';
public Message(List<String[]> segments){
this.segments = segments;
loadEncodingChars();
}
protected void loadEncodingChars(){
if( segments == null || segments.size() == 0 )return;
this.encodingChars = segments.get(0)[2].toCharArray();
}
/**
* Splits a string into substrings. The provided
* separation character is used.
*
* @param str String to split
* @param sep Separation character
* @return Array containing the split strings.
*/
public static String[] split(String str, char sep){
ArrayList<String> comps = new ArrayList<String>();
int p=0,i;
while( (i = str.indexOf(sep,p)) != -1 ){
comps.add(str.substring(p, i));
p=i+1;
}
if( p < str.length() )comps.add(str.substring(p));
return comps.toArray(new String[comps.size()]);
}
public static String join(String[] strs, char glue){
if( strs.length == 0 )return "";
else if( strs.length == 1 )return strs[0];
StringBuilder b = new StringBuilder();
b.append(strs[0]);
for( int i=1; i<strs.length; i++ ){
b.append(glue);
b.append(strs[i]);
}
return b.toString();
}
public String toHL7v2String(){
if( segments.size() == 0 )return "";
StringBuilder b = new StringBuilder();
String[] msh = segments.get(0);
char fs = msh[1].charAt(0);
assert msh[0].equals("MSH") : "first segment must be MSH";
// write MSH
b.append(msh[0]);
for( int i=2; i<msh.length; i++ ){
b.append(fs);
b.append(msh[i]);
}
b.append(segmentTerminatorChar);
// write remaining segments */
for( int i=1; i<segments.size(); i++ ){
String[] fields = segments.get(i);
b.append(fields[0]);
for( int j=1; j<fields.length; j++ ){
b.append(fs);
b.append(fields[j]);
}
b.append(segmentTerminatorChar);
}
return b.toString();
}
@Override
public String toString(){
if( segments.size() == 0 )return "[]";
else return toHL7v2String();
}
public List<String[]> getSegments(){
return segments;
}
public String[] getSegment(int index){
return segments.get(index);
}
public int numSegments(){
return segments.size();
}
public String getSegmentId(int index){
return getSegment(index)[0];
}
/**
* Splits a field into its components. The correct
* component separator character from the MSH segment
* is used for separation (e.g ^).
*
* @param field
* @return Array containing all components
*/
public String[] splitFieldComponents(String fields){
return split(fields, encodingChars[0]);
}
/**
* Splits a field into its subcomponents. The correct
* subcomponent separator character from the MSH segment
* is used for separation (e.g. &).
*
* @param field
* @return Array containing all subcomponents
*/
public String[] splitSubComponents(String field){
return split(field,encodingChars[3]);
}
public String joinFieldComponents(String[] fields){
return join(fields, encodingChars[0]);
}
public String joinSubComponents(String[] subcomps){
return join(subcomps, encodingChars[3]);
}
/**
* Write the message to the given appendable (eg. a {@link CharBuffer} or System.out).
* TODO: write unit test to verify that IOException is caused by {@link BufferOverflowException} for CharBuffers
* @param out
* @throws IOException if there is an output error or not enough space in the output buffer in case of CharBuffer
*/
public void write(Appendable out)throws IOException{
// write message
if( segments.size() == 0 ){
// message cleared/dropped by filter
// return empty buffer
}else{
// write processed segments
// reload field separator, in case of change
String[] msh = segments.get(0);
char fs = msh[1].charAt(0);
// write msh separately (b/c special handling of separator characters)
out.append(msh[0]);
out.append(msh[1]);
out.append(msh[2]);
for( int i=3; i<msh.length; i++ ){
out.append(fs);
out.append(msh[i]);
}
out.append(segmentTerminatorChar);
// TODO: find way to use specified encoding and write directly to byte buffer
// write remaining segments
for( int i=1; i<segments.size(); i++ ){
String[] s = segments.get(i);
out.append(s[0]);
for( int j=1; j<s.length; j++ ){
out.append(fs);
out.append(s[j]);
}
out.append(segmentTerminatorChar);
}
}
}
@Override
public String getMessageID() {
if( segments.size() == 0 || segments.get(0).length <= 10 )return null;
else return segments.get(0)[10];
}
public static Date dateFromHL7(String hl7date){
Calendar cal = Calendar.getInstance();
cal.clear();
if( hl7date.length() >= 4 ){
cal.set(Calendar.YEAR, Integer.parseInt(hl7date.substring(0, 4)));
}else{
log.severe("HL7 date without year segment");
}
if( hl7date.length() >= 6 ){
// important: Calendar.MONTH starts with 0 (=Januar)
// therefore, the parsed value is decremented by 1
cal.set(Calendar.MONTH, Integer.parseInt(hl7date.substring(4,6)) - 1);
}
if( hl7date.length() >= 8 ){
cal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(hl7date.substring(6,8)));
}
if( hl7date.length() >= 10 ){
cal.set(Calendar.HOUR, Integer.parseInt(hl7date.substring(8,10)));
}
if( hl7date.length() >= 12 ){
cal.set(Calendar.MINUTE, Integer.parseInt(hl7date.substring(10,12)));
}
if( hl7date.length() >= 14 ){
cal.set(Calendar.SECOND, Integer.parseInt(hl7date.substring(12,14)));
}
return cal.getTime();
}
public Date getMessageTime(){
if( segments.size() == 0 || segments.get(0).length <= 7 )return null;
else return dateFromHL7(segments.get(0)[7]);
}
/**
* Compact the message by removing trailing empty fields for every segment.
* TODO: test/verify whether the last field is kept.
*/
public void compact(){
for( int i=0; i<segments.size(); i++ ){
String[] seg = segments.get(i);
if( seg[seg.length-1].length() == 0 ){
int lastUsedField = seg.length-1;
while( lastUsedField > 0 && seg[lastUsedField-1].length() == 0 )
lastUsedField --;
segments.set(i, Arrays.copyOf(seg, lastUsedField));
}
}
throw new UnsupportedOperationException();
}
public void dump(){
try {
write(System.out);
} catch (IOException e) {
System.err.println("Error while dumping message: "+e.getMessage());
e.printStackTrace();
}
}
}
package histream.hl7;
import histream.io.ProcessedMessage;
/**
* Process parsed HL7 messages. The method {@link #processMessage(Message)} can be
* called by multiple threads concurrently. Thus, synchronization should be implemented
* if necessary.
*
* @author RWM
*
*/
public interface MessageHandler {
ProcessedMessage processMessage(Message message);
}
package histream.hl7.filter;
import histream.hl7.Decoder;
import histream.hl7.Message;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* HL7v2 message decoder which enables filters to modify the message during the decoding process.
*
* @author RWM
*
*/
public class FilteringDecoder implements Decoder{
private static Logger log = Logger.getLogger(FilteringDecoder.class.getName());
private CharBuffer charBuffer;
private ByteBuffer byteBuffer;
private String defaultCharset;
private boolean ignoreMessageCharset;
private CharsetDecoder asciiDecoder;
private Hashtable<String,CharsetDecoder> decoderCache;
private List<MessageFilter> acceptedFilters;
protected static final byte segmentTerminatorByte = 0x0D;
protected static final char segmentTerminatorChar = '\r';
ArrayList<String[]> message;
protected ArrayList<MessageFilter> filters;
public FilteringDecoder(String defaultCharset, boolean ignoreMessageCharset){
this.defaultCharset = defaultCharset;
this.ignoreMessageCharset = ignoreMessageCharset;
acceptedFilters = new LinkedList<MessageFilter>();
charBuffer = CharBuffer.allocate(1024*1024);
byteBuffer = ByteBuffer.allocate(1024*1024);
filters = new ArrayList<MessageFilter>();
decoderCache = new Hashtable<String, CharsetDecoder>();
decoderCache.put(defaultCharset, Charset.forName(defaultCharset).newDecoder());
asciiDecoder = Charset.forName("ASCII").newDecoder();
asciiDecoder.onMalformedInput(CodingErrorAction.REPLACE);
asciiDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
}
/**
* Get the default charset. The default charset is used to decode messages which do not
* specify any charset.
* @return charset name
*/
public String getDefaultCharset(){
return defaultCharset;
}
/**
* Determine whether the message charset is ignored. If it is ignored, the
* default charset is used to decode all messages, regardles of the charset
* specified in the message.
* @return whether the message charset is ignored
*/
public boolean isMessageCharsetIgnored(){
return ignoreMessageCharset;
}
public FilteringDecoder(){
this( "ISO-8859-1", false);
}
public void addFilter(MessageFilter filter){
synchronized( filters ){
filters.add(filter);
}
}
public void removeFilter(MessageFilter filter){
synchronized( filters ){
filters.remove(filter);
}
}
protected void filterMessage(Message message){
/* filter message */
for( MessageFilter f : acceptedFilters ){
f.filter(message);
// stop if message dropped
if( message.numSegments() == 0 )break;
}
}
protected void parseMessage(List<String[]> message, ByteBuffer src) {
CharBuffer cb = charBuffer;
ByteBuffer bb = byteBuffer;
bb.clear();
cb.clear();
String[] fields = new String[100];
/* parse first segment (should be msh) */
char[] seg = new char[3];
char fs; // field separator
// decode bytes until first 0x0D
while( src.hasRemaining() ){
byte b = src.get();
if( b == segmentTerminatorByte )break;
bb.put(b);
}
bb.flip();
asciiDecoder.reset();
asciiDecoder.decode(bb, cb, true);
asciiDecoder.flush(cb);
cb.flip();
// read segment name and field separator
cb.get(seg);
fs = cb.get();
// TODO: exception/error
assert seg[0]=='M' && seg[1]=='S' && seg[2]=='H' : "MSH segment expected, but found "+new String(seg);
fields[0] = "MSH";
fields[1] = new String(new char[]{fs});
/* read to end of MSH segment */
int f = 2;
StringBuilder sb = new StringBuilder(1024);
while( cb.hasRemaining() ){
char c = cb.get();
if( c == fs ){
/* next field */
fields[f] = sb.toString();
f ++;
sb.delete(0, sb.length());
}else sb.append(c);
}
if( sb.length() != 0 ){
// last segment after fs
fields[f] = sb.toString();
f ++;
}
/* copy string array */
String[] msh = Arrays.copyOf(fields, f);
/* initialize filters */
this.acceptedFilters.clear();
synchronized( filters ){
for( int i=0; i<filters.size(); i++ ){
MessageFilter filter = filters.get(i);
/* filter providers might choose not to filter
* a message by returning null.
*/
int policy = filter.previewMSH(msh);
if( policy == 0 )continue;
else if( policy == 1 ){
// filter message
acceptedFilters.add(filter);
}else if( policy == -1 ){
// drop whole message
message.clear();
return;
}else{
log.warning("Ignoring unsupported return value "+policy+" by filter "+filter+" (.previewMSH)");
}
}
}
// add MSH segment
message.add(msh);
// load charset provider
String charset = null;
if( ignoreMessageCharset == false && msh.length > 18 && msh[18].length() > 0 )charset = msh[18];
else charset = defaultCharset;
CharsetDecoder decoder = decoderCache.get(charset);
if( decoder == null ){
try{