/* File: StreamTool.java * Copyright (C) 2002-2003 The University of Iowa * Created by: Jeremy Faden * Jessica Swanner * Edward E. West * * This file is part of the das2 library. * * das2 is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ package org.das2.stream; import org.das2.datum.Datum; import org.das2.datum.DatumVector; import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; /** * * @author jbf */ import java.text.DecimalFormat; import java.text.NumberFormat; import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.das2.dataset.NoDataInIntervalException; //import org.apache.xml.serialize.Method; //import org.apache.xml.serialize.OutputFormat; //import org.apache.xml.serialize.XMLSerializer; import org.das2.util.ByteBufferInputStream; import org.das2.util.InflaterChannel; import org.das2.util.LoggerManager; import org.w3c.dom.*; import org.w3c.dom.ls.DOMImplementationLS; import org.w3c.dom.ls.LSOutput; import org.w3c.dom.ls.LSSerializer; import org.xml.sax.InputSource; import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; public class StreamTool { private static final Logger logger= LoggerManager.getLogger("das2.stream.d2s"); private StreamTool() { // utility class } public static class DelimeterNotFoundException extends Exception { } public static byte[] advanceTo(InputStream in, byte[] delim) throws IOException, DelimeterNotFoundException { // Read from stream to delimeter, leaving the InputStream immediately after // and returning bytes from stream. byte[] data = new byte[4096]; ArrayList list = new ArrayList(); int bytesMatched = 0; int matchIndex = 0; int streamIndex = 0; // offset in bytes from the beginning of the stream int index = 0; boolean notDone = true; int unreadOffset = -99999; int unreadLength = -99999; int totalBytesRead = 0; int offset = 0; // offset within byte[4096] while (notDone) { int byteRead = in.read(); totalBytesRead++; if (byteRead == -1) { notDone = false; } else { data[offset] = (byte) byteRead; if (delim[bytesMatched] == byteRead) { bytesMatched++; } else { bytesMatched = 0; } if (bytesMatched == delim.length) { notDone = false; index = totalBytesRead - delim.length; } } if (notDone) { offset++; if (offset == 4096) { list.add(data); offset = 0; data = new byte[4096]; } } } if (bytesMatched != delim.length) { throw new StreamTool.DelimeterNotFoundException(); } byte[] result = new byte[index]; for (int i = 0; i < list.size(); i++) { System.arraycopy(list.get(i), 0, result, i * 4096, 4096); } System.arraycopy(data, 0, result, list.size() * 4096, index - (list.size() * 4096)); return result; } /** Read off XML data from the InputStream up to the termination of the XML. * XML data is returned in a byte array. The InputStream is left just * following the XML terminator. Processing is done with as little interpretation * as possible, so invalid XML will cause problems with little feedback about * what's wrong. */ public static byte[] readXML(PushbackInputStream in) throws IOException { ReadableByteChannel channel = Channels.newChannel(in); byte[] back = new byte[4096]; ByteBuffer buffer = ByteBuffer.wrap(back); channel.read(buffer); buffer.flip(); ByteBuffer xml = readXML(buffer); byte[] bytes = new byte[xml.remaining()]; xml.get(bytes); return bytes; } private static void eatWhiteSpace(ByteBuffer buffer) { while (buffer.hasRemaining()) { char c = (char) (0xFF & buffer.get()); if (!Character.isWhitespace(c)) { buffer.position(buffer.position() - 1); return; } } } /** * return a ByteBuffer containing just the next xml portion of the stream. * @param input * @return * @throws IOException */ public static ByteBuffer readXML(ByteBuffer input) throws IOException { int gtCount = 0; int tagCount = 0; int bufidx = 0; char lastChar; boolean inQuotes = false; boolean inTag = false; boolean tagContainsSlash = false; int b; ByteBuffer buffer = input.duplicate(); buffer.mark(); eatWhiteSpace(buffer); b = 0xFF & buffer.get(); if (((char) b) != '<') { throw new IOException("found '" + ((char) b) + "', expected '<' at offset=" + buffer.position() + ".\n"); } else { gtCount++; tagContainsSlash = false; } while (buffer.hasRemaining() && (gtCount > 0 || tagCount > 0)) { lastChar = (char) b; b = 0xFF & buffer.get(); if (inQuotes && ((char) b) == '"' && lastChar != '\\') { inQuotes = false; } else if (((char) b) == '<') { gtCount++; inTag = true; tagContainsSlash = false; /* until proven otherwise */ } else if (b == (int) '>') { gtCount--; inTag = false; if (lastChar != '/') { if (tagContainsSlash) { tagCount--; } else { tagCount++; } } } else if (b == (int) '/') { if (lastChar == (int) '<') { tagContainsSlash = true; } } else if (((char) b) == '"' && inTag) { inQuotes = true; } } //findbugs INT_BAD_COMPARISON_WITH_NONNEGATIVE_VALUE. b must be positive, this was probably leftover from an earlier implementation. //if (b == -1) { // throw new IOException("unexpected end of file before xml termination\n"); //} eatWhiteSpace(buffer); int limit = buffer.limit(); buffer.limit(buffer.position()); buffer.reset(); ByteBuffer result = buffer.slice(); buffer.position(buffer.limit()); buffer.limit(limit); return result; } private static class ReadStreamStructure { private ReadableByteChannel stream; private ByteBuffer bigBuffer = ByteBuffer.allocate(1024*1024); private byte[] four = new byte[4]; private StreamHandler handler; private Map descriptors = new HashMap(); private int byteOffset = 0; // byte offset into file of the end of the buffer. private int descriptorCount = 0; // successfully read descriptors private int packetCount = 0; // successfully read packets private ReadStreamStructure(ReadableByteChannel stream, StreamHandler handler) { this.stream = stream; this.handler = handler; } @Override public String toString() { return "\ndescriptorCount="+descriptorCount+ "\npacketCount="+packetCount+ "\nbyteOffset="+byteOffset+ "\ncarotPos="+(byteOffset-bigBuffer.limit()+bigBuffer.position())+ "\nbuffer="+String.valueOf(bigBuffer); } } public static void readStream(ReadableByteChannel stream, StreamHandler handler) throws StreamException { ReadStreamStructure struct = new ReadStreamStructure(stream, handler); try { // Stream properties are loaded in the next line, but not placed in a dataset // yet. StreamDescriptor sd = getStreamDescriptor(struct); if ("deflate".equals(sd.getCompression())) { stream = getInflaterChannel(stream); } handler.streamDescriptor(sd); struct.descriptorCount++; int bytesRead; //int totalBytesRead = 0; while ((bytesRead = stream.read(struct.bigBuffer)) != -1) { struct.byteOffset += struct.bigBuffer.position(); struct.bigBuffer.flip(); if ( bytesRead==0 && stream instanceof InflaterChannel ) break; // https://bugs-pw.physics.uiowa.edu/mantis/view.php?id=441 //totalBytesRead += bytesRead; //System.err.println("d2s bytesRead="+bytesRead+" total="+totalBytesRead ); //if ( totalBytesRead>318260 ) { // System.err.println("here"); //} while (getChunk(struct)) { // this block is empty } struct.bigBuffer.compact(); } if ( struct.bigBuffer.position()!=0 ) { throw new StreamException("Stream ends with partial packet"); } handler.streamClosed(sd); } catch (StreamException se) { handler.streamException(se); throw se; } catch (InterruptedIOException ex) { throw new StreamException(ex); } catch (IOException ioe) { StreamException se = new StreamException(ioe); handler.streamException(se); throw se; } } private static StreamDescriptor getStreamDescriptor(ReadStreamStructure struct) throws StreamException, IOException { struct.bigBuffer.clear().limit(10); while (struct.bigBuffer.hasRemaining() && struct.stream.read(struct.bigBuffer) != -1) { //do nothing ; } if ( struct.byteOffset==0 && struct.bigBuffer.position()==0 ) { throw new StreamException("Stream is empty"); } if (struct.bigBuffer.hasRemaining()) { throw new StreamException("Reached end of stream before encountering stream descriptor"); } struct.byteOffset += struct.bigBuffer.position(); struct.bigBuffer.flip(); struct.bigBuffer.get(struct.four); if (isStreamDescriptorHeader(struct.four)) { int contentLength = getContentLength(struct.bigBuffer); if (contentLength == 0) { throw new StreamException("streamDescriptor content length is 0."); } struct.bigBuffer.clear().limit(contentLength); while (struct.bigBuffer.hasRemaining() && struct.stream.read(struct.bigBuffer) != -1) { // do nothing ; } if (struct.bigBuffer.hasRemaining()) { throw new StreamException("Reached end of stream before encountering stream descriptor"); } struct.byteOffset += struct.bigBuffer.position(); struct.bigBuffer.flip(); try { Document doc = getXMLDocument(struct.bigBuffer, contentLength); Element root = doc.getDocumentElement(); if (root.getTagName().equals("stream")) { StreamDescriptor sd = new StreamDescriptor(doc.getDocumentElement()); struct.bigBuffer.clear(); return sd; } else if (root.getTagName().equals("exception")) { throw exception(root); } else { throw new StreamException("Unexpected xml header, expecting stream or exception, received: " + root.getTagName()); } } catch (SAXException ex) { String msg = getSAXParseExceptionMessage(ex, struct, contentLength ); throw new StreamException(msg); } } else { String s = readMore(struct); throw new StreamException("Expecting stream descriptor header, found: '" + asciiBytesToString(struct.four, 0, 4) + "' beginning \n'" + s + "'"); } } /** * call this after error, to get another 100 bytes off the stream */ private static String readMore(ReadStreamStructure struct) throws IOException { struct.bigBuffer.position(0); struct.bigBuffer.limit(10); byte[] bytes10 = new byte[10]; struct.bigBuffer.get(bytes10); String s = new String(bytes10); struct.bigBuffer.limit(1000); struct.bigBuffer.position(0); while (struct.bigBuffer.hasRemaining() && struct.stream.read(struct.bigBuffer) != -1) { // do nothing ; } int p = struct.bigBuffer.position(); byte[] bytes = new byte[p]; struct.bigBuffer.flip(); struct.bigBuffer.get(bytes); s = s + new String(bytes); return s; } private static String getSAXParseExceptionMessage(final SAXException ex, final ReadStreamStructure struct,int contentLength) { String loc = null; if (ex instanceof SAXParseException) { SAXParseException spe = (SAXParseException) ex; loc = "Relative to packet start, line number is " + spe.getLineNumber() + ", column is " + spe.getColumnNumber(); } int bufOffset= struct.byteOffset - struct.bigBuffer.limit(); String msg = "xml parser fails with the message: \"" + ex.getMessage() + "\" within the packet ending at byte offset " + ( bufOffset + struct.bigBuffer.position() ) + "."; if (ex.getMessage().contains("trailing")) { msg += "\nNon-whitespace data found after xml closing tag, probably caused by content length error."; int i; // find the end of the closing xml tag. for (i = bufOffset + struct.bigBuffer.position() - 1; i > 0; i--) { int bpos= i - bufOffset ; if (struct.bigBuffer.get( bpos ) == '>' ) { break; } else { System.err.println( (char)struct.bigBuffer.get(bpos) ); } } for ( ; i < bufOffset + struct.bigBuffer.position() ; i++) { int bpos= i - bufOffset; if (struct.bigBuffer.get(bpos) == '[' || struct.bigBuffer.get(bpos)==':' ) { break; } } if (i > 0) { int error= i - ( struct.bigBuffer.position() + bufOffset ); //int error= ( i + bufOffset ) - ( struct.byteOffset - 10 ); NumberFormat nf = new DecimalFormat("000000"); msg += "\nContent length was " + nf.format(contentLength) + ", maybe it should have been " + nf.format(contentLength+error) + "."; } } if (loc != null) { msg += " " + loc; } return msg; } // Copies over contents of current buffer and generates a new one that can hold // a little more than the content length. Old buffer should be discarded. private static ByteBuffer biggerBuffer(ByteBuffer buf, int nContentLen) { ByteBuffer temp = ByteBuffer.allocate(8 + nContentLen + nContentLen / 10); buf.reset(); // set read position to the mark temp.put(buf); // read in data from the position to the limit temp.flip(); // set limit to the current position and the read position back to zero return temp; // this buffer should have all the old buffers data and be bigger } private static final StreamException exception(Element exception) { String type = exception.getAttribute("type"); String message= exception.getAttribute("message"); if ( type.equals( StreamException.NO_DATA_IN_INTERVAL ) ) { NoDataInIntervalException ex = new NoDataInIntervalException(message); StreamException se= new StreamException(ex); return se; } else if ( type.equals(StreamException.EMPTY_RESPONSE_FROM_READER ) ) { StreamException se = new StreamException( "Empty response from reader\n"+message ); return se; } else { return new StreamException(message); } } private static boolean getChunk(ReadStreamStructure struct) throws StreamException, IOException { struct.bigBuffer.mark(); if (struct.bigBuffer.remaining() < 4) { return false; } struct.bigBuffer.get(struct.four); if ( struct.four[0]== 0xa ) { // Jeremy's temperature sensors had the wrong content length for about a year. Short of fixing this data, check for this for now. logger.log(Level.WARNING, "off-by-one error in the content length preceding byte offset {0}", (struct.byteOffset + struct.bigBuffer.position() - struct.bigBuffer.limit() - 10 -4)); struct.four[0]= struct.four[1]; struct.four[1]= struct.four[2]; struct.four[2]= struct.four[3]; struct.four[3]= struct.bigBuffer.get(); } if (isPacketDescriptorHeader(struct.four)) { if (struct.bigBuffer.remaining() < 6) { struct.bigBuffer.reset(); return false; } int contentLength = getContentLength(struct.bigBuffer); if (contentLength == 0) { throw new StreamException("packetDescriptor content length is 0."); } // Have to be able to hold the entire packet header in the buffer if (struct.bigBuffer.capacity() < contentLength) { struct.bigBuffer = biggerBuffer(struct.bigBuffer, contentLength); return false; } if (struct.bigBuffer.remaining() < contentLength) { struct.bigBuffer.reset(); return false; } logger.log(Level.FINE, "packetDescriptor len={0}", contentLength); try { Document doc = getXMLDocument(struct.bigBuffer, contentLength); Element root = doc.getDocumentElement(); if (root.getTagName().equals("packet")) { int id= 10*(struct.four[1]-48)+(struct.four[2]-48); if ( id<0 || id>99 ) logger.warning("unable to parse id for packetDescriptor"); PacketDescriptor pd = new PacketDescriptor(id,doc.getDocumentElement()); struct.handler.packetDescriptor(pd); struct.descriptors.put(asciiBytesToString(struct.four, 1, 2), pd); } else if (root.getTagName().equals("exception")) { throw exception(root); } else if (root.getTagName().equals("comment")) { struct.handler.streamComment(new StreamComment(doc.getDocumentElement())); } else { logger.info( "Got extra stream header, ignoring." ); } struct.descriptorCount++; } catch (SAXException ex) { String msg = getSAXParseExceptionMessage(ex, struct, contentLength ); throw new StreamException(msg); } } else if (isPacketHeader(struct.four)) { String key = asciiBytesToString(struct.four, 1, 2); PacketDescriptor pd = (PacketDescriptor) struct.descriptors.get(key); int contentLength = pd.getSizeBytes(); // Have to be able to hold at least one packet in the buffer if (struct.bigBuffer.capacity() < contentLength) { struct.bigBuffer = biggerBuffer(struct.bigBuffer, contentLength); return false; } if (struct.bigBuffer.remaining() < contentLength) { struct.bigBuffer.reset(); return false; } logger.log(Level.FINE, "packetHeader len={0}", contentLength); int yCount = pd.getYCount(); Datum xTag = pd.getXDescriptor().readDatum(struct.bigBuffer); DatumVector[] vectors = new DatumVector[yCount]; for (int i = 0; i < yCount; i++) { vectors[i] = pd.getYDescriptor(i).read(struct.bigBuffer); } struct.handler.packet(pd, xTag, vectors); struct.packetCount++; } else { String msg = "Expected four byte header, found '"; String s = new String(struct.four); s = s.replaceAll("\n", "\\\\n"); // TODO: what's the right way to say this? msg += s; msg += "' at byte offset " + (struct.byteOffset + struct.bigBuffer.position() - struct.bigBuffer.limit() - 10 -4 ); msg += " after reading " + struct.descriptorCount + " descriptors and " + struct.packetCount + " packets."; if ( s.contains("\\n==") ) { byte[] buf= new byte[struct.bigBuffer.limit()-struct.bigBuffer.position()]; struct.bigBuffer.get(buf); String ss= new String( buf ); if ( ss.contains("SPICELIB") ) { msg+= "\nThis appears to be a message from SPICE:\n"; msg+= ss; } } throw new StreamException(msg); } return true; } private static ByteBuffer sliceBuffer(ByteBuffer buffer, int length) { ByteBuffer dup = buffer.duplicate(); dup.limit(dup.position() + length); return dup.slice(); } private static String asciiBytesToString(byte[] bytes, int offset, int length) { try { return new String(bytes, offset, length, "US-ASCII"); } catch (UnsupportedEncodingException uee) { //All JVM implementations are required to support US-ASCII throw new RuntimeException(uee); } } private static boolean isStreamDescriptorHeader(byte[] four) { return four[0] == (byte) '[' && four[1] == (byte) '0' && four[2] == (byte) '0' && four[3] == (byte) ']'; } private static boolean isPacketDescriptorHeader(byte[] four) { return four[0] == (byte) '[' && four[3] == (byte) ']' && (Character.isDigit((char) four[1]) && Character.isDigit((char) four[2]) || (char) four[1] == 'x' && (char) four[2] == 'x'); } private static boolean isPacketHeader(byte[] four) { return four[0] == (byte) ':' && four[3] == (byte) ':' && Character.isDigit((char) four[1]) && Character.isDigit((char) four[2]); } private static Document getXMLDocument(ByteBuffer buffer, int contentLength) throws StreamException, IOException, SAXException { ByteBuffer xml = buffer.duplicate(); ByteBuffer xml2=null; if ( logger.isLoggable(Level.FINEST) ) xml2= buffer.duplicate(); // for debugging. xml.limit(xml.position() + contentLength); if ( logger.isLoggable(Level.FINEST) ) xml2.limit(xml.position() + contentLength); buffer.position(buffer.position() + contentLength); final boolean DEBUG = false; if (DEBUG) { int pos = xml.position(); byte[] bytes = new byte[xml.limit() - xml.position()]; xml.get(bytes); xml.position(pos); System.err.println(new String(bytes)); } ByteBufferInputStream bbin; if ( logger.isLoggable(Level.FINEST) ) { bbin = new ByteBufferInputStream(xml2); try ( InputStreamReader isr2 = new InputStreamReader(bbin,"UTF-8") ) { System.err.println("the encoding is "+isr2.getEncoding()); int c= isr2.read(); int count= 0; while ( c!=-1 && count<1000 ) { System.err.println( String.format( "%05d %04d %X %c", count, c, c, c ) ); count++; c= isr2.read(); } } } bbin = new ByteBufferInputStream(xml); try ( InputStreamReader isr = new InputStreamReader(bbin,"UTF-8") ) { DocumentBuilder builder; builder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); InputSource source = new InputSource(isr); logger.fine("setting UTF-8 to input source"); source.setEncoding("UTF-8"); Document document = builder.parse(source); return document; } catch (ParserConfigurationException ex) { throw new RuntimeException(ex); } } private static int getContentLength(ByteBuffer buffer) throws StreamException { int contentLength = 0; for (int i = 0; i < 6; i++) { char c = (char) (0xFF & buffer.get()); if (c == ' ') { continue; } if (!Character.isDigit(c)) { throw new StreamException("Invalid character in contentLength: '" + c + "'"); } int digit = Character.digit(c, 10); contentLength = contentLength * 10 + digit; } return contentLength; } public static void formatHeader(Document document, Writer writer) throws StreamException { DOMImplementationLS ls = (DOMImplementationLS) document.getImplementation().getFeature("LS", "3.0"); LSOutput output = ls.createLSOutput(); output.setCharacterStream(writer); LSSerializer serializer = ls.createLSSerializer(); try { if ( serializer.getDomConfig().canSetParameter( "format-pretty-print", Boolean.TRUE ) ) { serializer.getDomConfig().setParameter( "format-pretty-print", Boolean.TRUE ); } } catch ( Error e ) { e.printStackTrace(); } serializer.write(document, output); /* try { OutputFormat format = new OutputFormat(Method.XML, "US-ASCII", true); XMLSerializer serializer = new XMLSerializer(writer, format); serializer.serialize(document); } catch (IOException ioe) { throw new StreamException(ioe); } */ } public static Map processPropertiesElement(Element element) throws StreamException { try { if (!element.getTagName().equals("properties")) { // TODO maybe this should be a RuntimeException throw new StreamException("expecting 'properties' element, encountered '" + element.getTagName() + "'"); } HashMap map = new HashMap(); NamedNodeMap attributes = element.getAttributes(); for (int i = 0; i < attributes.getLength(); i++) { Attr attr = (Attr) attributes.item(i); String name = attr.getName(); String[] split = name.split(":"); if (split.length == 1) { map.put(name, attr.getValue()); } else if (split.length == 2) { PropertyType type = PropertyType.getByName(split[0]); Object value = type.parse(attr.getValue()); map.put(split[1], value); } else { throw new IllegalArgumentException("Invalid typed name: " + name); } } return map; } catch (ParseException pe) { String msg= pe.getMessage(); if (msg==null ) msg=pe.toString(); StreamException se = new StreamException(msg); se.initCause(pe); throw se; } } private final static HashMap typesMap; static { typesMap = new HashMap(); typesMap.put(Datum.class, "Datum"); typesMap.put(Datum.Double.class, "Datum"); typesMap.put(Integer.class, "int"); } public static Element processPropertiesMap(Document document, Map properties) { Element propertiesElement = document.createElement("properties"); for (Iterator i = properties.entrySet().iterator(); i.hasNext();) { Map.Entry entry = (Map.Entry) i.next(); String key = (String) entry.getKey(); Object value = entry.getValue(); if (value == null) { continue; } if (typesMap.containsKey(value.getClass())) { key = (String) typesMap.get(value.getClass()) + ":" + key; } propertiesElement.setAttribute(key, value.toString()); } return propertiesElement; } private static ReadableByteChannel getInflaterChannel(ReadableByteChannel channel) throws IOException { return new InflaterChannel(channel); //return Channels.newChannel(new InflaterInputStream(Channels.newInputStream(channel))); } }