1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
55 //#include <sys/types.h>
56 //#include <sys/socket.h>
57 //#include <netinet/in.h>
58 //#include <netinet/tcp.h>
60 #include <rpc/types.h>
63 #include <netinet/in.h>
64 #include <arpa/inet.h>
// Command strings sent over a TCP data source's socket. In this file MOD_BIN
// is written once right after connecting (AddDataSource) and GET_ONE is
// written to request an event (TriggerTCPSource); MOD_ASC and GET_ALL are
// not referenced by the code visible here.
70 #define MOD_BIN "MOD BIN\n"
71 #define MOD_ASC "MOD ASC\n"
72 #define GET_ONE "GET ONE\n"
73 #define GET_ALL "GET ALL\n"
75 // MAXHOSTNAMELEN not defined on macosx
76 // 686-apple-darwin9-gcc-4.0.1
// Fallback value; MAXHOSTNAMELEN sizes the hostname buffers allocated by the
// shared-memory and buffer AddDataSource overloads.
77 #ifndef MAXHOSTNAMELEN
78 #define MAXHOSTNAMELEN 64
// ROOT class implementation macros for the two reader classes.
82 ClassImp(AliHLTMonitoringReader);
83 ClassImp(AliHLTHOMERReader);
// Default constructor.
// Event type, event ID and the error-connection index start at their
// all-ones "invalid" value; the source counters, the connection status and
// the event request advance time start at 0. No data source is added by the
// statements visible here.
91 AliHLTHOMERReader::AliHLTHOMERReader()
93 fCurrentEventType(~(homer_uint64)0),
94 fCurrentEventID(~(homer_uint64)0),
100 fShmDataSourceCnt(0),
101 fDataSourceMaxCnt(0),
103 fConnectionStatus(0),
104 fErrorConnection(~(unsigned int)0),
105 fEventRequestAdvanceTime(0)
107 // Reader implementation of the HOMER interface.
108 // The HLT Monitoring Environment including ROOT is
109 // a native interface to ship out data from the HLT chain.
110 // See the pdf document shipped with the package
111 // for class documentation and tutorial.
// Constructor for reading from a single TCP port.
// Requests one data-source slot through AllocDataSources(1) and passes
// slot 0 to AddDataSource( hostname, port, ... ). The outcome is left in
// fConnectionStatus: ENOMEM on the allocation-failure path, otherwise the
// value returned by AddDataSource(). On both failure paths fErrorConnection
// is set to 0, the index of the only source.
117 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
119 AliHLTMonitoringReader(),
120 fCurrentEventType(~(homer_uint64)0),
121 fCurrentEventID(~(homer_uint64)0),
126 fTCPDataSourceCnt(0),
127 fShmDataSourceCnt(0),
128 fDataSourceMaxCnt(0),
130 fConnectionStatus(0),
131 fErrorConnection(~(unsigned int)0),
132 fEventRequestAdvanceTime(0)
134 // see header file for class documentation
135 // For reading from a TCP port
137 if ( !AllocDataSources(1) )
139 fErrorConnection = 0;
140 fConnectionStatus = ENOMEM;
143 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
144 if ( fConnectionStatus )
145 fErrorConnection = 0;
// Store the slot's own index in the source; ParseSourceData() copies fNdx
// into every block it creates for this source.
150 fDataSources[0].fNdx = 0;
// Constructor for reading from several TCP ports.
// hostnames[n] / ports[n] describe source n, for n in [0, tcpCnt).
// Allocates tcpCnt slots, then connects them one after the other. The loop
// increment counts each processed source in fDataSourceCnt and
// fTCPDataSourceCnt. When AddDataSource() reports an error, its code is left
// in fConnectionStatus and the index of the failing source in
// fErrorConnection. On allocation failure fConnectionStatus is ENOMEM and
// fErrorConnection is 0.
154 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
156 AliHLTMonitoringReader(),
157 fCurrentEventType(~(homer_uint64)0),
158 fCurrentEventID(~(homer_uint64)0),
163 fTCPDataSourceCnt(0),
164 fShmDataSourceCnt(0),
165 fDataSourceMaxCnt(0),
167 fConnectionStatus(0),
168 fErrorConnection(~(unsigned int)0),
169 fEventRequestAdvanceTime(0)
171 // see header file for class documentation
172 // For reading from multiple TCP ports
174 if ( !AllocDataSources(tcpCnt) )
176 fErrorConnection = 0;
177 fConnectionStatus = ENOMEM;
180 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
182 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
183 if ( fConnectionStatus )
185 fErrorConnection = n;
// Store the slot's own index in the source (used by ParseSourceData()).
188 fDataSources[n].fNdx = n;
// Constructor for reading from a single System V shared memory segment,
// identified by shmKey and shmSize.
// Requests one data-source slot and passes slot 0 to the shared-memory
// AddDataSource() overload. fConnectionStatus is ENOMEM on the
// allocation-failure path, otherwise AddDataSource()'s return value;
// fErrorConnection is set to 0 on both failure paths.
192 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
194 AliHLTMonitoringReader(),
195 fCurrentEventType(~(homer_uint64)0),
196 fCurrentEventID(~(homer_uint64)0),
201 fTCPDataSourceCnt(0),
202 fShmDataSourceCnt(0),
203 fDataSourceMaxCnt(0),
205 fConnectionStatus(0),
206 fErrorConnection(~(unsigned int)0),
207 fEventRequestAdvanceTime(0)
209 // see header file for class documentation
210 // For reading from a System V shared memory segment
212 if ( !AllocDataSources(1) )
214 fErrorConnection = 0;
215 fConnectionStatus = ENOMEM;
218 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
219 if ( fConnectionStatus )
220 fErrorConnection = 0;
// Store the slot's own index in the source (used by ParseSourceData()).
225 fDataSources[0].fNdx = 0;
// Constructor for reading from several System V shared memory segments.
// shmKeys[n] / shmSizes[n] describe source n, for n in [0, shmCnt).
// Allocates shmCnt slots, then attaches them one after the other. The loop
// increment counts each processed source in fDataSourceCnt and
// fShmDataSourceCnt. When AddDataSource() reports an error, its code is left
// in fConnectionStatus and the index of the failing source in
// fErrorConnection. On allocation failure fConnectionStatus is ENOMEM and
// fErrorConnection is 0.
229 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
231 AliHLTMonitoringReader(),
232 fCurrentEventType(~(homer_uint64)0),
233 fCurrentEventID(~(homer_uint64)0),
238 fTCPDataSourceCnt(0),
239 fShmDataSourceCnt(0),
240 fDataSourceMaxCnt(0),
242 fConnectionStatus(0),
243 fErrorConnection(~(unsigned int)0),
244 fEventRequestAdvanceTime(0)
246 // see header file for class documentation
247 // For reading from multiple System V shared memory segments
249 if ( !AllocDataSources(shmCnt) )
251 fErrorConnection = 0;
252 fConnectionStatus = ENOMEM;
255 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
257 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
258 if ( fConnectionStatus )
260 fErrorConnection = n;
// Store the slot's own index in the source (used by ParseSourceData()).
263 fDataSources[n].fNdx = n;
267 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
268 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
270 AliHLTMonitoringReader(),
271 fCurrentEventType(~(homer_uint64)0),
272 fCurrentEventID(~(homer_uint64)0),
277 fTCPDataSourceCnt(0),
278 fShmDataSourceCnt(0),
279 fDataSourceMaxCnt(0),
281 fConnectionStatus(0),
282 fErrorConnection(~(unsigned int)0),
283 fEventRequestAdvanceTime(0)
285 // see header file for class documentation
286 // For reading from multiple TCP ports and multiple System V shared memory segments
288 if ( !AllocDataSources(tcpCnt+shmCnt) )
290 fErrorConnection = 0;
291 fConnectionStatus = ENOMEM;
294 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
296 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
297 if ( fConnectionStatus )
299 fErrorConnection = n;
302 fDataSources[n].fNdx = n;
304 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
306 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
307 if ( fConnectionStatus )
309 fErrorConnection = tcpCnt+n;
312 fDataSources[n].fNdx = n;
// Constructor for reading from a caller-supplied memory buffer of `size`
// bytes.
// Requests one data-source slot and passes slot 0 to the buffer
// AddDataSource() overload; the const qualifier of pBuffer is cast away for
// that call. fConnectionStatus is ENOMEM on the allocation-failure path,
// otherwise AddDataSource()'s return value; fErrorConnection is set to 0 on
// both failure paths.
316 AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
318 AliHLTMonitoringReader(),
319 fCurrentEventType(~(homer_uint64)0),
320 fCurrentEventID(~(homer_uint64)0),
325 fTCPDataSourceCnt(0),
326 fShmDataSourceCnt(0),
327 fDataSourceMaxCnt(0),
329 fConnectionStatus(0),
330 fErrorConnection(~(unsigned int)0),
331 fEventRequestAdvanceTime(0)
333 // see header file for class documentation
334 // For reading from a memory buffer provided by the caller
336 if ( !AllocDataSources(1) )
338 fErrorConnection = 0;
339 fConnectionStatus = ENOMEM;
342 fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
343 if ( fConnectionStatus )
344 fErrorConnection = 0;
// Store the slot's own index in the source (used by ParseSourceData()).
349 fDataSources[0].fNdx = 0;
// Destructor. Releases the data and block information of the event that is
// currently held (see ReleaseCurrentEvent()).
353 AliHLTHOMERReader::~AliHLTHOMERReader()
355 // see header file for class documentation
356 ReleaseCurrentEvent();
360 int AliHLTHOMERReader::ReadNextEvent()
362 // see header file for class documentation
363 // Read in the next available event
364 return ReadNextEvent( false, 0 );
367 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
369 // see header file for class documentation
370 // Read in the next available event
371 return ReadNextEvent( true, timeout );
374 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
376 // see header file for class documentation
377 // Return the size (in bytes) of the current event's data
378 // block with the given block index (starting at 0).
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here -
// confirm which value is returned for an invalid index.
379 if ( ndx >= fBlockCnt )
381 return fBlocks[ndx].fLength;
384 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
386 // see header file for class documentation
387 // Return a pointer to the start of the current event's data
388 // block with the given block index (starting at 0).
// The pointer is the one stored by ParseSourceData(); it points into the
// data of the source the block came from.
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here -
// confirm which value is returned for an invalid index.
389 if ( ndx >= fBlockCnt )
391 return fBlocks[ndx].fData;
394 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
396 // see header file for class documentation
397 // Return IP address or hostname of node which sent the
398 // current event's data block with the given block index (starting at 0).
400 // For HOMER this is the ID of the node on which the subscriber
401 // that provided this data runs/ran.
// The value returned is the fHostname string of the data source the block
// was read from.
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here.
402 if ( ndx >= fBlockCnt )
// Consistency check: the block must refer to one of the configured sources.
// A violation is reported on stderr.
// NOTE(review): both values are printed with %lu - confirm that fSource and
// fDataSourceCnt are unsigned long.
405 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
407 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
408 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
412 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
413 //return fBlocks[ndx].fOriginatingNodeID;
416 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
418 // see header file for class documentation
419 // Return byte order of the data stored in the
420 // current event's data block with the given block index (starting at 0).
421 // 0 is unknown byte order,
422 // 1 is little endian,
423 // 2 is big endian.
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here.
424 if ( ndx >= fBlockCnt )
426 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
// The byte order is a single byte of the block descriptor, located
// kByteOrderAttribute_8b_Offset bytes from its start.
427 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
430 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
432 // see header file for class documentation
433 // Return the alignment (in bytes) of the given datatype
434 // in the data stored in the current event's data block
435 // with the given block index (starting at 0).
436 // Possible values for the data type are
// NOTE(review): the list of data type values is not visible here; see the
// header file.
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statements guarded by the two checks below are not
// visible here.
443 if ( ndx >= fBlockCnt )
// dataType is used as a byte offset into the alignment attributes of the
// descriptor; offsets behind the float alignment entry are rejected.
445 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
447 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
448 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
// Return the status flags word of the current event's data block with the
// given block index (starting at 0). The word is read from the block
// descriptor at 64-bit index kStatusFlags_64b_Offset, without byte swapping.
451 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
453 // see header file for class documentation
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here.
454 if ( ndx >= fBlockCnt )
456 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
460 /* Return the type of the data in the current event's data
461 block with the given block index (starting at 0). */
462 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
464 // see header file for class documentation
465 if ( ndx >= fBlockCnt )
466 return ~(homer_uint64)0;
467 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
468 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
471 /* Return the origin of the data in the current event's data
472 block with the given block index (starting at 0). */
473 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
475 // see header file for class documentation
476 if ( ndx >= fBlockCnt )
477 return ~(homer_uint32)0;
478 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
479 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
482 /* Return a specification of the data in the current event's data
483 block with the given block index (starting at 0). */
484 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
486 // see header file for class documentation
487 if ( ndx >= fBlockCnt )
488 return ~(homer_uint32)0;
489 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
490 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
/* Find the next data block in the current event with the given
   data type, origin, and specification. Returns the block's
   index, or ~(unsigned long)0 if no block matches. */
// Search the blocks of the current event, starting at index startNdx, for a
// block whose descriptor matches type, origin and spec. An argument with
// all bits set (0xFFFFFFFFFFFFFFFF for type, 0xFFFFFFFF for origin and spec)
// matches any value of that field. If the loop ends without a match,
// ~(unsigned long)0 is returned.
496 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
497 homer_uint32 spec, unsigned long startNdx ) const
499 // see header file for class documentation
500 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
// Origin and spec are compared against the lower 32 bits of their 64-bit
// descriptor words.
// NOTE(review): the statement executed on a match is not visible here.
502 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
503 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
504 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
507 return ~(unsigned long)0;
/* Find the next data block in the current event with the given
   data type, origin, and specification. Returns the block's
   index, or ~(unsigned long)0 if no block matches. */
// Character-array variant of FindBlockNdx: type is given as 8 characters and
// origin as 4 characters, which are compared byte by byte with the bytes of
// the corresponding descriptor words. The search starts at startNdx; if the
// loop ends without a match, ~(unsigned long)0 is returned.
// NOTE(review): several statements of this function are not visible here,
// among them the bodies of the 0xFF tests and of the byte comparisons.
// Presumably an array consisting only of 0xFF bytes acts as a wildcard -
// confirm against the header file.
513 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
514 homer_uint32 spec, unsigned long startNdx ) const
516 // see header file for class documentation
517 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
// found1 tracks the type comparison, found2 the origin comparison
// (presumably - the assignments are not visible here).
519 bool found1=true, found2=true;
// Inspect the 8 type characters for bytes different from 0xFF.
520 for ( unsigned i = 0; i < 8; i++ )
522 if ( type[i] != (char)0xFF )
// Compare the 8 type characters with the bytes of the descriptor's type word.
531 for ( unsigned i = 0; i < 8; i++ )
533 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
534 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
// Inspect the 4 origin characters for bytes different from 0xFF.
541 for ( unsigned i = 0; i < 4; i++ )
543 if ( origin[i] != (char)0xFF )
// Compare the 4 origin characters with the first bytes of the descriptor's
// origin word.
552 for ( unsigned i = 0; i < 4; i++ )
554 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
555 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
562 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
// A block matches when type and origin matched and spec is either the
// wildcard 0xFFFFFFFF or equal to the lower 32 bits of the spec word.
563 if ( found1 && found2 &&
564 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
567 return ~(unsigned long)0;
570 /* Return the ID of the node that actually produced this data block.
571 This may be different from the node which sent the data to this
572 monitoring object as returned by GetBlockSendNodeID. */
// The string returned is the fOriginatingNodeID entry that ParseSourceData()
// creates from the producer node field of the block descriptor. It is owned
// by the reader and deleted in ReleaseCurrentEvent().
573 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
575 // see header file for class documentation
// Guard against an index beyond the blocks of the current event.
// NOTE(review): the statement guarded by this check is not visible here.
576 if ( ndx >= fBlockCnt )
578 return fBlocks[ndx].fOriginatingNodeID;
// Reset the reader's bookkeeping members to the same start values the
// constructors use: event type/ID and error connection to all ones, the
// block and data source counters, connection status and event request
// advance time to 0. Nothing is freed by the statements visible here.
582 void AliHLTHOMERReader::Init()
584 // see header file for class documentation
585 fCurrentEventType = ~(homer_uint64)0;
586 fCurrentEventID = ~(homer_uint64)0;
587 fMaxBlockCnt = fBlockCnt = 0;
590 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
594 fConnectionStatus = 0;
595 fErrorConnection = ~(unsigned int)0;
597 fEventRequestAdvanceTime = 0;
// Allocate an array of sourceCnt DataSource entries, clear it to all-zero
// bytes and record its capacity in fDataSourceMaxCnt. The constructors treat
// a false return value as an out-of-memory condition.
// NOTE(review): a plain new-expression reports failure by throwing
// std::bad_alloc, not by returning NULL, so a NULL test on its result cannot
// detect an allocation failure - consider new (std::nothrow).
// NOTE(review): memset() over the array assumes DataSource is a plain data
// struct without non-trivial members - confirm in the header file.
600 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
602 // see header file for class documentation
603 fDataSources = new DataSource[ sourceCnt ];
606 memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
608 fDataSourceMaxCnt = sourceCnt;
// Set up `source` as a TCP data source connected to hostname:port.
// Steps visible here: resolve the host name, create an IPv4 stream socket,
// connect it, write the MOD_BIN command, store a heap copy of the host name
// and the port in `source`, and reset the source's data counters. The
// socket is closed again on the error paths after its creation.
// Returns an errno-style code; EADDRNOTAVAIL is the value used for the
// host-lookup error path.
// NOTE(review): gethostbyname() is obsolete and not thread-safe;
// getaddrinfo() is the current replacement.
612 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
614 // see header file for class documentation
616 he = gethostbyname( hostname );
619 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
620 return EADDRNOTAVAIL;
623 struct sockaddr_in remoteAddr;
624 remoteAddr.sin_family = AF_INET; // host byte order
625 remoteAddr.sin_port = htons(port); // short, network byte order
626 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
627 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
629 // Create socket and connect to target program on remote node
630 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
631 if ( source.fTCPConnection == -1 )
638 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
642 close( source.fTCPConnection );
// Send the MOD_BIN command; a short write is treated as an error.
646 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
647 if ( ret != (int)strlen(MOD_BIN) )
650 close( source.fTCPConnection );
// Keep a private copy of the host name; it is returned by
// GetBlockSendNodeID() and deleted in FreeTCPDataSource().
654 char* tmpchar = new char[ strlen( hostname )+1 ];
657 close( source.fTCPConnection );
660 strcpy( tmpchar, hostname );
661 source.fHostname = tmpchar;
664 source.fTCPPort = port;
666 source.fDataSize = 0;
667 source.fDataRead = 0;
671 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
673 // see header file for class documentation
675 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
680 gethostname( tmpchar, MAXHOSTNAMELEN );
681 tmpchar[MAXHOSTNAMELEN]=(char)0;
682 source.fHostname = tmpchar;
684 source.fShmID = shmget( shmKey, shmSize, 0660 );
685 if ( source.fShmID == -1 )
688 delete [] source.fHostname;
692 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
694 if ( !source.fShmPtr )
697 shmctl( source.fShmID, IPC_RMID, NULL );
698 delete [] source.fHostname;
703 source.fShmKey = shmKey;
704 source.fShmSize = shmSize;
705 source.fDataSize = 0;
706 source.fDataRead = 0;
// Set up `source` as a buffer data source reading from the caller-supplied
// memory pBuffer of `size` bytes. The buffer is not copied: source.fData
// points directly at pBuffer. The local host name is stored as the source's
// node ID.
// Returns EINVAL for a NULL buffer or a size <= 0.
710 int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
712 // see header file for class documentation
713 // a buffer data source is like a shm source apart from the shm attach and detach
714 // procedure. Furthermore, the size indicator at the beginning of the buffer is not
715 // cleared right before sources are read but after the reading.
717 if ( !pBuffer || size<=0) return EINVAL;
719 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
// gethostname() gets MAXHOSTNAMELEN of the MAXHOSTNAMELEN+1 bytes; the last
// byte is set explicitly so the string is always terminated.
724 gethostname( tmpchar, MAXHOSTNAMELEN );
725 tmpchar[MAXHOSTNAMELEN]=(char)0;
726 source.fHostname = tmpchar;
729 // the data buffer does not contain a size indicator in the first 4 bytes
730 // like the shm source buffer. Still we want to use the mechanism to invalidate/
731 // trigger by clearing the size indicator. Take the source.fShmSize variable.
732 source.fShmPtr = &source.fShmSize;
// fShmSize doubles as the size indicator fShmPtr points to: it is read by
// ReadDataFromShmSources() and cleared by TriggerShmSource().
735 source.fShmSize = size;
736 source.fData = pBuffer;
737 source.fDataSize = 0;
738 source.fDataRead = 0;
// Release the per-source resources of all configured data sources by
// dispatching on the source type.
// NOTE(review): only kTCP and kShm sources are handled by the statements
// visible here. A kBuf source also owns a host name string allocated in
// AddDataSource() - confirm that it is freed elsewhere.
742 void AliHLTHOMERReader::FreeDataSources()
744 // see header file for class documentation
745 for ( unsigned n=0; n < fDataSourceCnt; n++ )
747 if ( fDataSources[n].fType == kTCP )
748 FreeTCPDataSource( fDataSources[n] );
749 else if ( fDataSources[n].fType == kShm )
750 FreeShmDataSource( fDataSources[n] );
// Release a shared memory data source: detach the segment if one is attached
// and delete the host name string. The segment itself is not removed (the
// IPC_RMID call is commented out).
// NOTE(review): the pointers are not reset by the statements visible here,
// so calling this twice for the same source would detach and delete twice.
754 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
756 // see header file for class documentation
757 if ( source.fShmPtr )
758 shmdt( source.fShmPtr );
759 // if ( source.fShmID != -1 )
760 // shmctl( source.fShmID, IPC_RMID, NULL );
761 if ( source.fHostname )
762 delete [] source.fHostname;
// Release a TCP data source: close the socket and delete the host name
// string.
// NOTE(review): a descriptor value of 0 is treated as "no connection" here,
// which matches the zero-filled state produced by AllocDataSources(), while
// AddDataSource() tests socket() failure against -1 - confirm that a failed
// source cannot reach this function with fTCPConnection == -1.
766 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
768 // see header file for class documentation
769 if ( source.fTCPConnection )
770 close( source.fTCPConnection );
771 if ( source.fHostname )
772 delete [] source.fHostname;
// Read the next event from all configured data sources.
// Sequence visible here:
//   1. release the event currently held,
//   2. trigger every TCP and shared memory source,
//   3. read the data of the TCP sources, then of the remaining sources,
//   4. check that every source delivered data for the same event ID and
//      event type,
//   5. parse the data of every source into the block list,
//   6. store the event ID and type as the current event.
// On an error, fErrorConnection receives the index of the source concerned
// and fConnectionStatus the error code, which is also the return value on
// the paths visible here.
776 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
778 // see header file for class documentation
// NOTE(review): the statement guarded by this check is not visible here.
779 if ( fDataSourceCnt<=0 )
781 // Clean up currently active event.
782 ReleaseCurrentEvent();
784 // Trigger all configured data sources
// Sources of any other type (kBuf) are not triggered here.
785 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
787 if ( fDataSources[n].fType == kTCP )
788 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
789 else if ( fDataSources[n].fType == kShm )
790 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
793 fErrorConnection = n;
794 fConnectionStatus=ret;
795 return fConnectionStatus;
798 // Now read in data from the configured data source
// The two calls rely on the TCP sources occupying the first
// fTCPDataSourceCnt slots of fDataSources, followed by the other sources.
799 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
804 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
809 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
811 // if ( fDataSources[n].fType == kTCP )
812 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
814 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
817 // fErrorConnection = n;
818 // fConnectionStatus=ret;
819 // return fConnectionStatus;
822 //Check to see that all sources contributed data for the same event
// Source 0 provides the reference event ID and type. The error codes below
// are numeric literals; the errno names intended by the author are given in
// the trailing comments.
823 homer_uint64 eventID;
824 homer_uint64 eventType;
825 if (!fDataSources[0].fData)
827 fErrorConnection = 0;
828 fConnectionStatus=56;//ENOBUF;
829 return fConnectionStatus;
831 eventID = GetSourceEventID( fDataSources[0] );
832 eventType = GetSourceEventType( fDataSources[0] );
833 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
835 if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
837 fErrorConnection = n;
838 fConnectionStatus=56;//EBADRQC;
839 return fConnectionStatus;
842 // Find all the different data blocks contained in the data from all the sources
844 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
846 ret = ParseSourceData( fDataSources[n] );
849 fErrorConnection = n;
850 fConnectionStatus=57;//EBADSLT;
854 fCurrentEventID = eventID;
855 fCurrentEventType = eventType;
// Drop the event currently held by the reader: the current event ID and type
// are set back to all ones, the per-source data is released and the
// originating node ID strings of the blocks are deleted.
859 void AliHLTHOMERReader::ReleaseCurrentEvent()
861 // see header file for class documentation
862 // sources.fDataRead = 0;
864 fCurrentEventID = ~(homer_uint64)0;
865 fCurrentEventType = ~(homer_uint64)0;
866 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
868 if ( fDataSources[n].fData )
// Only TCP sources own their data: it was allocated with new homer_uint8[]
// in ReadDataFromTCPSources(). Shared memory and buffer sources point into
// memory owned elsewhere.
870 if ( fDataSources[n].fType == kTCP )
871 delete [] (homer_uint8*)fDataSources[n].fData;
872 // do not reset the data pointer for kBuf sources since this
873 // can not be set again.
874 if ( fDataSources[n].fType != kBuf )
875 fDataSources[n].fData = NULL;
877 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
// The node ID strings were allocated in ParseSourceData().
// NOTE(review): the statements that follow the delete (pointer and counter
// resets) are not visible here.
881 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
883 if ( fBlocks[n].fOriginatingNodeID )
884 delete [] fBlocks[n].fOriginatingNodeID;
// Ask a TCP data source for the next event by writing a request command to
// its socket. Without an event request advance time the GET_ONE command is
// sent, otherwise a "FIRST ORBIT EVENT" command carrying
// fEventRequestAdvanceTime. A send timeout of timeoutUsec microseconds is
// installed on the socket with SO_SNDTIMEO; the previous value is read
// beforehand and written back on the error paths visible here.
// NOTE(review): many statements of this function (conditions, braces, return
// statements) are not visible here.
893 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
895 // see header file for class documentation
897 struct timeval oldSndTO, newSndTO;
900 socklen_t optlen=sizeof(oldSndTO);
901 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
906 if ( optlen!=sizeof(oldSndTO) )
// Split the timeout into whole seconds and remaining microseconds.
910 newSndTO.tv_sec = timeoutUsec / 1000000;
911 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
912 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
918 // Send one event request
919 if ( !fEventRequestAdvanceTime )
921 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
// A short write is treated as an error.
923 if ( ret != (int)strlen(GET_ONE) )
926 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// NOTE(review): the format prints a "0x" prefix followed by a DECIMAL number
// ("%Lu"); in addition the L length modifier is only defined for long double
// conversions, the portable modifier for unsigned long long is "ll". Confirm
// the number base expected by the server before changing it.
934 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
// NOTE(review): snprintf() returns the length the complete output would
// have, so a return value of exactly 128 already means truncation; the test
// should read len>=128.
935 if ( len>128 || len<0 )
938 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
942 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
944 if ( ret != (int)strlen(tmpCmd) )
947 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// Request new data from a shared memory source. The two unnamed parameters
// (timeout flag and timeout value) are not used.
955 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
957 // see header file for class documentation
958 // clear the size indicator in the first 4 bytes of the buffer to request data
959 // from the HOMER writer.
// For a buffer source fShmPtr points at the source's own fShmSize member
// (see the buffer AddDataSource overload), so the same store marks that
// buffer as consumed.
960 if ( source.fShmPtr )
962 *(homer_uint32*)( source.fShmPtr ) = 0;
// Read the event data of the first sourceCnt entries of `sources`, all of
// which are expected to be TCP sources.
// Each source first delivers a 32-bit size in network byte order, followed
// by that many bytes of event data. Sockets of sources that still have data
// outstanding are collected in an fd_set and switched to non-blocking mode;
// select() then waits for readable sockets, optionally limited by `timeout`
// (microseconds), and the readable sources are served.
// On an error, fErrorConnection receives the index of the source concerned
// and fConnectionStatus the error code, which is also the return value.
// NOTE(review): several statements of this function (loop headers, braces,
// conditions) are not visible here.
969 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
971 // see header file for class documentation
// Index of the first source that is still waited for; it is the source
// reported when select() fails or times out.
979 unsigned firstConnection=~(unsigned)0;
980 for ( unsigned long n = 0; n < sourceCnt; n++ )
982 if ( sources[n].fDataSize == 0 // size specifier not yet read
983 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
986 FD_SET( sources[n].fTCPConnection, &conns );
987 if ( sources[n].fTCPConnection > highestConn )
988 highestConn = sources[n].fTCPConnection;
989 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
990 if ( firstConnection == ~(unsigned)0 )
// Back to blocking mode (file status flags cleared).
995 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1000 struct timeval tv, *ptv;
// Split the timeout into whole seconds and remaining microseconds.
1003 tv.tv_sec = timeout / 1000000;
1004 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1009 // wait until something is ready to be read
1010 // either for timeout usecs or until eternity
1012 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1015 fErrorConnection = firstConnection;
1017 fConnectionStatus = errno;
1019 fConnectionStatus = ETIMEDOUT;
1020 return fConnectionStatus;
1022 for ( unsigned n = 0; n < sourceCnt; n++ )
1024 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
// Size not known yet: read the 32-bit size specifier first.
1026 if ( sources[n].fDataSize == 0 )
1028 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1029 if ( ret != sizeof(homer_uint32) )
1031 fErrorConnection = n;
1033 fConnectionStatus = errno;
1035 fConnectionStatus = ENOMSG;
1036 return fConnectionStatus;
// The size arrives in network byte order. The buffer allocated here is owned
// by the source and deleted in ReleaseCurrentEvent().
1038 sources[n].fDataSize = ntohl( sources[n].fDataSize );
1039 sources[n].fDataRead = 0;
1040 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1041 if ( !sources[n].fData )
1043 fErrorConnection = n;
1044 fConnectionStatus = ENOMEM;
1045 return fConnectionStatus;
// Size known: append whatever is available to the part already read.
1048 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1050 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1052 sources[n].fDataRead += ret;
// read() returning 0 means the peer closed the connection.
1053 else if ( ret == 0 )
1055 fErrorConnection = n;
1056 fConnectionStatus = ECONNRESET;
1057 return fConnectionStatus;
1061 fErrorConnection = n;
1062 fConnectionStatus = errno;
1063 return fConnectionStatus;
1068 fErrorConnection = n;
1069 fConnectionStatus = ENXIO;
1070 return fConnectionStatus;
// Single-source variant: send a GET_ONE request to one TCP source, read the
// 32-bit size specifier (network byte order) and then the event data.
// NOTE(review): this code carries a #warning TODO, calls select() with four
// arguments and does arithmetic on source.fData, so it is presumably
// excluded from compilation by preprocessor lines that are not visible here
// - confirm before relying on it. Many of its statements are not visible.
1081 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1083 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1084 // Send one event request
1085 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1086 if ( ret != strlen(GET_ONE) )
1090 // wait for and read back size specifier
1092 // The value transmitted is binary, in network byte order
1093 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1094 if ( ret != sizeof(sizeNBO) )
1098 // Convert back to host byte order
1099 source.fDataSize = ntohl( sizeNBO );
1100 source.fData = new homer_uint8[ source.fDataSize ];
1101 unsigned long dataRead=0, toRead;
1102 if ( !source.fData )
1105 // Read in data into buffer in order not to block connection
1106 while ( dataRead < source.fDataSize )
1108 if ( source.fDataSize-dataRead > 1024 )
1111 toRead = source.fDataSize-dataRead;
1112 ret = read( source.fTCPConnection, buffer, toRead );
// Read the event data directly into the buffer allocated above.
1120 while ( dataRead < source.fDataSize )
1122 toRead = source.fDataSize-dataRead;
1123 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1126 else if ( ret == 0 && useTimeout )
// Split the timeout into whole seconds and remaining microseconds.
1129 tv.tv_sec = timeout / 1000000;
1130 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1133 FD_SET( source.fTCPConnection, &conns );
1134 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1138 else if ( ret == 0 )
// Single-source variant of the shared memory read.
// NOTE(review): only the signature of this function is visible here.
1155 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
// Poll the first sourceCnt entries of `sources` (shared memory and buffer
// sources) for event data.
// A source has data when the 32-bit size indicator fShmPtr points to is
// non-zero; the value becomes the source's fDataSize. For a shared memory
// source the data starts right behind the indicator. For a buffer source
// fData is already set and the indicator is cleared after the first read.
// With useTimeout set, the time since the last progress is measured with
// gettimeofday() and compared with `timeout` (microseconds).
// NOTE(review): the enclosing loop, the found/all bookkeeping and the return
// statements are not visible here.
1161 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1163 // see header file for class documentation
1164 struct timeval tv1, tv2;
1168 gettimeofday( &tv1, NULL );
1173 for ( unsigned n = 0; n < sourceCnt; n++ )
// Only sources that have not delivered their data yet are inspected.
1175 if ( !sources[n].fDataSize )
1177 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1180 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1181 if (sources[n].fType==kBuf)
1183 // the data buffer is already set to fData, just need to set fDataSize member
1184 // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1185 TriggerShmSource( sources[n], 0, 0 );
// Shared memory source: the data follows the 4-byte size indicator.
1188 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
// Progress was made: restart the timeout measurement.
1192 if ( found && useTimeout )
1193 gettimeofday( &tv1, NULL );
1194 if ( !all && useTimeout )
// Elapsed time since tv1; the microsecond difference is added to the
// difference of the seconds.
1196 gettimeofday( &tv2, NULL );
1197 unsigned long long tdiff;
1198 tdiff = tv2.tv_sec-tv1.tv_sec;
1200 tdiff += tv2.tv_usec-tv1.tv_usec;
1201 if ( tdiff > timeout )
// Parse the event data of one source and append one fBlocks entry for every
// block descriptor found in it.
// The data starts with a header descriptor: its byte order attribute tells
// how to convert the 64-bit fields, the kSubType2 word holds the number of
// blocks and the kOffset word the byte offset of the first block descriptor.
// Each block descriptor provides its own length, a type ID that must equal
// HOMER_BLOCK_DESCRIPTOR_TYPEID, the offset and size of the block data and
// the address of the producer node.
// Returns EBADMSG for an unknown byte order or descriptors that do not fit
// into the data, and 126 (annotated ENOKEY) for a wrong descriptor type ID.
// NOTE(review): observations on the statements visible here -
//   * descrLen is declared unsigned although Swap() returns homer_uint64,
//     so large values are truncated;
//   * the first bounds check adds kLength_64b_Offset, an index in 64-bit
//     words, to a byte offset;
//   * the block data offset and size taken from the descriptor are not
//     checked against fDataSize;
//   * the producer node word is read without Swap().
1211 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1213 // see header file for class documentation
1216 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1217 if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
1218 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1219 // block count is not related to size of the data in the way the
1220 // following condition implies. But we can at least limit the block
1221 // count for the case the data is corrupted
1222 if (blockCnt>source.fDataSize) return EBADMSG;
// Make room for the blocks of this source behind the blocks already known.
1223 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1226 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1227 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1229 if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1230 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1231 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1232 if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1233 if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
// fSource is the index of the source in fDataSources; it is what
// GetBlockSendNodeID() uses to look up the sending node.
1234 fBlocks[fBlockCnt].fSource = source.fNdx;
1235 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1236 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1237 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
// Convert the producer node address (lower 32 bits of the word) to dotted
// notation. inet_ntoa() returns a static buffer, so the text is copied to a
// heap string that is deleted in ReleaseCurrentEvent().
1238 struct in_addr tmpA;
1239 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1240 char* addr = inet_ntoa( tmpA );
1241 char* tmpchar = new char[ strlen(addr)+1 ];
1244 strcpy( tmpchar, addr );
1245 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
// Continue with the descriptor that follows this one.
1246 descrOffset += descrLen;
// Resize the block array to newCnt entries. The existing entries (at most
// newCnt of them) are copied bytewise into the new array, additional entries
// are zero-filled, and fBlocks / fMaxBlockCnt are switched to the new array.
// NOTE(review): the allocation check, the release of the old array and the
// return statement are not visible here.
// NOTE(review): on the first call fBlocks is still NULL while cpCnt is 0;
// memcpy() with a NULL source is formally undefined even for a length of 0.
1253 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1255 // see header file for class documentation
1256 DataBlock* newBlocks;
1257 newBlocks = new DataBlock[ newCnt ];
// Number of existing entries that fit into the new array.
1260 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1261 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1262 if ( newCnt > fMaxBlockCnt )
1263 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1266 fBlocks = newBlocks;
1267 fMaxBlockCnt = newCnt;
1271 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1273 // see header file for class documentation
1274 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1275 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1278 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1280 // see header file for class documentation
1281 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1282 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1285 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1287 // see header file for class documentation
1288 if ( destFormat == sourceFormat )
1291 return ((source & 0xFFULL) << 56) |
1292 ((source & 0xFF00ULL) << 40) |
1293 ((source & 0xFF0000ULL) << 24) |
1294 ((source & 0xFF000000ULL) << 8) |
1295 ((source & 0xFF00000000ULL) >> 8) |
1296 ((source & 0xFF0000000000ULL) >> 24) |
1297 ((source & 0xFF000000000000ULL) >> 40) |
1298 ((source & 0xFF00000000000000ULL) >> 56);
1301 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1303 // see header file for class documentation
1304 if ( destFormat == sourceFormat )
1307 return ((source & 0xFFUL) << 24) |
1308 ((source & 0xFF00UL) << 8) |
1309 ((source & 0xFF0000UL) >> 8) |
1310 ((source & 0xFF000000UL) >> 24);
1313 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1315 // see header file for function documentation
1316 return new AliHLTHOMERReader(hostname, port);
1319 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1321 // see header file for function documentation
1322 return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1325 AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1327 // see header file for function documentation
1328 return new AliHLTHOMERReader(pBuffer, size);
1331 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1333 // see header file for function documentation
1334 if (pInstance) delete pInstance;
1338 ***************************************************************************
1340 ** $Author$ - Initial Version by Timm Morten Steinbeck
1344 ***************************************************************************