package org.das2.client;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.das2.datum.CacheTag;
import org.das2.datum.Datum;
import org.das2.datum.DatumRange;
import org.das2.datum.DatumVector;
import org.das2.qds.DDataSet;
import org.das2.qds.DataSetUtil;
import org.das2.qds.JoinDataSet;
import org.das2.qds.MutablePropertyDataSet;
import org.das2.qds.QDataSet;
import org.das2.qds.SemanticOps;
import org.das2.qds.ops.Ops;
import org.das2.qds.util.DataSetBuilder;
import org.das2.stream.PacketDescriptor;
import org.das2.stream.SkeletonDescriptor;
import org.das2.stream.StreamComment;
import org.das2.stream.StreamDescriptor;
import org.das2.stream.StreamException;
import org.das2.stream.StreamHandler;
import org.das2.stream.StreamScalarDescriptor;
import org.das2.stream.StreamYDescriptor;
import org.das2.stream.StreamYScanDescriptor;
import org.das2.stream.StreamZDescriptor;
import org.das2.util.LoggerManager;
import org.das2.util.monitor.NullProgressMonitor;
import org.das2.util.monitor.ProgressMonitor;

/**
 * Read the Das2Stream into a QDataSet.
 * <p>
 * NOTE(review): this file appears to have been mangled in transit -- all text
 * between matching '&lt;' and '&gt;' characters was stripped, so every generic
 * type parameter (and, in places, whole spans of code between a '&lt;' and a
 * later '&gt;') is missing.  The tokens below are preserved verbatim; recover
 * the true source from version control before attempting functional changes.
 * @author jbf
 * @see DataSetStreamHandler which reads into the old model.
 */
public class QDataSetStreamHandler implements StreamHandler {

    private static final Logger logger= LoggerManager.getLogger("das2.dataTransfer");

    // presumably maps packet id -> DataSetBuilder for the X (independent) values;
    // the generic parameters were stripped (see class note) -- TODO confirm
    private Map xbuilders;
    // presumably maps packet id -> DataSetBuilder[] for the Y planes -- TODO confirm
    private Map builders;

    /**
     * as each id is retired (for example, packet id 01 gets a new definition), put its data into the result.
     */
    private QDataSet jds= null;

    private Map schemes;

    // the packet type currently being read; null until the first packet arrives
    private PacketDescriptor currentPd=null;
    // builders for the packet type currently being read
    private DataSetBuilder[] currentBuilders;
    private DataSetBuilder currentXBuilder;

    // stream-level title, after USER_PROPERTIES macro adaptation
    private String streamTitle;
    // properties of the stream descriptor, consulted as a fallback by findProperty
    private Map streamProperties;

    /**
     * TODO: mission statement, how does this compare to jds?
     */
    private QDataSet ds=null;

    //private Object collectionMode= MODE_SPLIT_BY_PACKET_DESCRIPTOR;
    // how datasets are collected when packet ids are redefined mid-stream;
    // the MODE_* constants are declared elsewhere in this file (not visible here)
    private final Object collectionMode= MODE_SPLIT_BY_NEW_PACKET_DESCRIPTOR;

    private ProgressMonitor monitor= new NullProgressMonitor();

    private static final String SCHEME_XYZSCATTER= "xyzScatter";
    private static final String SCHEME_PEAKS_AND_AVERAGES= "peaksAndAverages";

    public QDataSetStreamHandler() {
    }

    /**
     * set the monitor which will report progress as the stream is read.
     * @param monitor the monitor
     */
    public void setMonitor( ProgressMonitor monitor ) {
        this.monitor= monitor;
    }

    /**
     * handle the start of the stream: reset the per-packet-id builder maps,
     * capture the stream title (with %{prop} macros rewritten to
     * USER_PROPERTIES form) and the stream properties, and start the progress
     * monitor when the stream advertises its size via "taskSize" or
     * "packetCount".
     * @param sd the stream descriptor
     * @throws StreamException
     */
    @Override
    public void streamDescriptor(StreamDescriptor sd) throws StreamException {
        logger.log(Level.FINE, "streamDescriptor: {0}", sd);
        xbuilders= new LinkedHashMap<>();
        builders= new LinkedHashMap<>();
        schemes= new LinkedHashMap<>();
        String t= (String) sd.getProperty("title");
        if ( t!=null ) {
            streamTitle= adaptUserProperty( t );
        } else {
            streamTitle= null;
        }
        streamProperties= sd.getProperties();
        Object o;
        if ( ( o= sd.getProperty("taskSize") )!=null ) {
            monitor.setTaskSize( ((Integer)o) );
            monitor.started();
        } else if ( ( o= sd.getProperty("packetCount" ) )!=null ) {
            monitor.setTaskSize( ((Integer)o) );
            monitor.started();
        }
    }

    /**
     * put the named property into the builder, after validating its type with
     * SemanticOps.  Datum values are first converted to raw doubles (see the
     * kludge warning logged below); values of the wrong type are dropped with
     * a warning rather than stored.
     * @param builder the builder receiving the property
     * @param name the property name
     * @param value the property value
     */
    private void putProperty( DataSetBuilder builder, String name, Object value ) {
        if ( value instanceof Datum ) {
            logger.warning("kludge to fix Datum property values");
            value= ((Datum)value).doubleValue(((Datum)value).getUnits());
        }
        if ( SemanticOps.checkPropertyType( name, value, false ) ) {
            builder.putProperty( name, value );
        } else {
            logger.log(Level.WARNING, "property \"{0}\" should be type \"{1}\"", new Object[]{name, SemanticOps.getPropertyType(name)});
        }
    }

    // matches %{propertyName} macros in titles and labels; compiled once per instance
    private final Pattern ptrn = Pattern.compile("(%\\{)(.+?)(\\})");

    /**
     * rewrite each %{name} macro in the string as %{USER_PROPERTIES.name},
     * leaving macros which already mention USER_PROPERTIES alone.
     * @param s the string possibly containing macros
     * @return the string with macros qualified
     */
    private String adaptUserProperty( String s ) {
        Matcher m = ptrn.matcher(s);
        while (m.find()) {
            // Group indices are not as expected, 0 = entire match, 1 = 1st group, etc.
            if (!m.group(2).contains("USER_PROPERTIES")) {
                // splice "USER_PROPERTIES." in front of the macro name
                s = String.format("%sUSER_PROPERTIES.%s%s", s.substring(0, m.end(1)), s.substring(m.start(2), m.end(2)), s.substring(m.start(3), s.length()));
                // the replacement shifted all offsets, so re-scan from the start
                m = ptrn.matcher(s);
            }
        }
        return s;
    }

    /**
     * look up a das2 property, first on the yscan descriptor itself, then on
     * the stream as "name.d2sName", then on the stream as "d2sName".  String
     * values have their %{prop} macros rewritten to USER_PROPERTIES form.
     * @param sd the yscan descriptor
     * @param d2sName the das2 property name
     * @return the property value, or null if not found anywhere
     */
    private Object findProperty( StreamYScanDescriptor sd, String d2sName ) {
        Object o= sd.getProperty(d2sName);
        if ( o==null ) {
            String n= sd.getName();
            o= streamProperties.get( n + "." + d2sName );
        }
        if ( o==null ) {
            o= streamProperties.get( d2sName );
        }
        // look for macros which refer to the properties. These will become
        // USER_PROPERTIES, and the macro needs to use that name.
        if ( o instanceof String ) {
            o= adaptUserProperty((String)o);
        }
        return o;
    }

    /**
     * look up a das2 property, first on the scalar descriptor itself, then on
     * the stream as "name.d2sName", then on the stream as "d2sName".  Note
     * that unlike the yscan flavor above, string values are NOT macro-adapted
     * here -- TODO confirm whether that asymmetry is intentional.
     * @param sd the scalar descriptor
     * @param d2sName the das2 property name
     * @return the property value, or null if not found anywhere
     */
    private Object findProperty( StreamScalarDescriptor sd, String d2sName ) {
        Object o= sd.getProperty(d2sName);
        if ( o==null ) {
            String n= sd.getName();
            o= streamProperties.get( n + "." + d2sName );
        }
        if ( o==null ) {
            o= streamProperties.get( d2sName );
        }
        return o;
    }

    /**
     * handle a new packet descriptor by creating builders for its planes.
     * @param pd the packet descriptor
     * @throws StreamException
     */
    @Override
    public void packetDescriptor(PacketDescriptor pd) throws StreamException {
        logger.log(Level.FINE, "packetDescriptor: {0}", pd);
        createBuilders(pd);
    }

    /**
     * handle one packet of data: when the packet type changes, retire the old
     * type's builders (in MODE_SPLIT_BY_PACKET_DESCRIPTOR mode) and switch to
     * the builders for the new id, then append the record.
     * <p>
     * NOTE(review): from the for-loop below onward this region is corrupted --
     * a span between a '&lt;' and a later '&gt;' was stripped, taking the tail
     * of packet(), streamClosed()/streamException(), the head of
     * streamComment(), the body of createBuilders() and the head of
     * collectDataSet() with it.  Tokens are preserved verbatim; do not attempt
     * to hand-repair -- recover from version control.
     */
    @Override
    public void packet(PacketDescriptor pd, Datum xTag, DatumVector[] vectors) throws StreamException {
        if ( pd!=currentPd ) {
            if ( currentPd!=null && collectionMode==MODE_SPLIT_BY_PACKET_DESCRIPTOR ) {
                // retire the old packet type: harvest its data and reset its builders
                collectDataSet();
                createBuilders(currentPd);
            }
            logger.log(Level.FINE, "packet type changed: {0}", pd.getYDescriptor(0).getSizeBytes());
            currentXBuilder= xbuilders.get(pd.getId());
            currentBuilders= builders.get(pd.getId());
            currentPd= pd;
        }
        currentXBuilder.nextRecord(xTag);
        // NOTE(review): corrupted span begins here (see method javadoc)
        for ( int i=0; i Level.FINE.intValue()) { logger.log(Level.FINE, sc.getValue()); } else { logger.log(l, sc.getValue()); } monitor.setProgressMessage(sc.getValue()); } }

    public void createBuilders( PacketDescriptor pd ) {
        DataSetBuilder[] lbuilders= new DataSetBuilder[pd.getYCount()];
        // NOTE(review): another corrupted span -- the rest of createBuilders and
        // the head of collectDataSet are missing; the surviving tokens below
        // belong to the peaks-and-averages handling in collectDataSet.
        for ( int i=0; i-1 ) { prefix= name0.substring(0,ip); } else { prefix= name0; } if ( ds1.rank()==3 && ( ( name0.equals(
                        prefix + ".avg" ) && name1.equals( prefix + ".max" ) )
                        || ( name0.equals( prefix ) && name1.equals( prefix + ".max" ) )
                        || ( prefix.equals("") && name1.equals("peaks") ) ) ) {
                    // peaks-and-averages scheme: the second bundled plane holds the
                    // maxima; fold it into the averages plane as BIN_MAX.
                    QDataSet max= Ops.unbundle(ds1,1);
                    // QDataSet names may not contain ".", so substitute "_"
                    max= Ops.putProperty( max, QDataSet.NAME, name1.replaceAll("\\.","_") );
                    max= Ops.putProperty( max, QDataSet.BUNDLE_1, null );
                    max= Ops.link( xds1, max );
                    ds1= Ops.unbundle(ds1,0);
                    ds1= Ops.putProperty( ds1, QDataSet.BIN_MAX, max );
                    ds1= Ops.putProperty( ds1, QDataSet.BUNDLE_1, null );
                } else if ( ds1.rank()==3 && name1.equals( prefix + ".min" ) ) {
                    // the second bundled plane holds the minima; fold it in as BIN_MIN
                    QDataSet min= Ops.unbundle(ds1,1);
                    min= Ops.putProperty( min, QDataSet.NAME, name1.replaceAll("\\.","_") );
                    min= Ops.putProperty( min, QDataSet.BUNDLE_1, null );
                    min= Ops.link( xds1, min );
                    ds1= Ops.unbundle(ds1,0);
                    ds1= Ops.putProperty( ds1, QDataSet.BIN_MIN, min );
                    ds1= Ops.putProperty( ds1, QDataSet.BUNDLE_1, null );
                }
            }
        }
        // attach the X tags as DEPEND_0, mutating in place when allowed, else via link
        if ( ds1 instanceof MutablePropertyDataSet && !((MutablePropertyDataSet)ds1).isImmutable() ) {
            ((MutablePropertyDataSet)ds1).putProperty( QDataSet.DEPEND_0, xds1 );
        } else {
            ds1= Ops.link( xds1, ds1 );
        }
        // accumulate into the result, joining when there is already data
        if ( ds==null ) {
            ds= ds1;
        } else {
            ds= Ops.join( ds, ds1 );
        }
    }

    /**
     * return the dataset collected by the handler, or null if no records have been received.
     * @return the dataset collected by the handler, or null if no records have been received.
     */
    public QDataSet getDataSet() {
        if ( collectionMode==MODE_SPLIT_BY_PACKET_DESCRIPTOR ) {
            if ( currentXBuilder==null ) return null;
            collectDataSet();
        } else {
            if ( currentXBuilder==null ) return null;
            int nbuilders= builders.size();
            if ( nbuilders==1 ) {
                // single packet type: harvest it and fold into the running join if any
                collectDataSet();
                if ( jds!=null ) {
                    // TODO: the structure of this changed, so that "ds" should probably be renamed with a new mission statement.
                    if ( ds==null ) {
                        ds= jds;
                    } else {
                        jds= (JoinDataSet)Ops.join( jds, ds );
                        ds= jds;
                    }
                }
            } else {
                // several packet types: harvest each (x,y) builder pair into the join.
                // NOTE(review): Entry's generic parameters were stripped from this
                // file, so e.getKey()/e.getValue() appear unchecked here; presumably
                // Entry<Integer,DataSetBuilder[]> -- TODO recover from version control.
                for ( Entry e: builders.entrySet() ) {
                    int id= e.getKey();
                    currentBuilders= e.getValue();
                    currentXBuilder= xbuilders.get(id);
                    QDataSet ds1= collectDataSet( currentXBuilder, currentBuilders );
                    jds= (JoinDataSet)Ops.join( jds, ds1 );
                }
                ds= jds;
            }
        }
        if ( ds==null ) return null;
        if ( ds instanceof JoinDataSet ) {
            if ( ds.length()<2 ) {
                //TODO: consider slice
            } else {
                // when the joined segments have compatible geometry, flatten the
                // join back into a single appended dataset
                if ( appendable( ds.slice(0), ds.slice(1) ) ) {
                    ds= jds.slice(0);
                    // NOTE(review): the file is truncated here, mid-statement.
                    for ( int i=1; i