Primary ionisation electrons come now correctly ordered from FLUKA.
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
CommitLineData
2be16a33 1/************************************************************************
2**
3**
4** This file is property of and copyright by the Technical Computer
5** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6** University, Heidelberg, Germany, 2001
7** This file has been written by Timm Morten Steinbeck,
8** timm@kip.uni-heidelberg.de
9**
10**
11** See the file license.txt for details regarding usage, modification,
12** distribution and warranty.
13** Important: This file is provided without any warranty, including
14** fitness for any particular purpose.
15**
16**
17** Newer versions of this file's package will be made available from
18** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19** or the corresponding page of the Heidelberg Alice Level 3 group.
20**
21*************************************************************************/
22
23/*
24***************************************************************************
25**
26** $Author$ - Initial Version by Timm Morten Steinbeck
27**
28** $Id$
29**
30***************************************************************************
31*/
32
9e91e956 33/** @file AliHLTHOMERReader.cxx
2be16a33 34 @author Timm Steinbeck
35 @date Sep 14 2007
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38
04a939f7 39// see header file for class documentation
2be16a33 40// or
41// refer to README to build package
42// or
43// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45#include "AliHLTHOMERReader.h"
46#include <stdio.h>
47#include <string.h>
48#include <errno.h>
49#include <netdb.h>
50extern int h_errno;
51#include <sys/types.h>
52#include <sys/socket.h>
53#include <netinet/in.h>
54#include <netinet/tcp.h>
55#include <unistd.h>
56#include <rpc/types.h>
57#include <fcntl.h>
58#include <sys/stat.h>
59#include <netinet/in.h>
60#include <arpa/inet.h>
61#ifdef USE_ROOT
62#include <Rtypes.h>
63#endif
64
65
66#define MOD_BIN "MOD BIN\n"
67#define MOD_ASC "MOD ASC\n"
68#define GET_ONE "GET ONE\n"
69#define GET_ALL "GET ALL\n"
70
71#ifdef USE_ROOT
72ClassImp(MonitoringReader);
73ClassImp(HOMERReader);
74#endif
75
76
77
78
79
80#ifdef USE_ROOT
81HOMERReader::HOMERReader()
746d2a94 82 :
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
85 fBlockCnt(0),
86 fMaxBlockCnt(0),
87 fBlocks(NULL),
88 fDataSourceCnt(0),
89 fTCPDataSourceCnt(0),
90 fShmDataSourceCnt(0),
91 fDataSourceMaxCnt(0),
04a939f7 92 fDataSources(NULL),
93 fConnectionStatus(0),
94 fErrorConnection(~(unsigned int)0),
95 fEventRequestAdvanceTime(0)
2be16a33 96 {
04a939f7 97// see header file for class documentation
98// or
99// refer to README to build package
100// or
101// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
2be16a33 102 Init();
103 }
104#endif
105
106
2be16a33 107HOMERReader::HOMERReader( const char* hostname, unsigned short port )
746d2a94 108 :
109 MonitoringReader(),
110 fCurrentEventType(~(homer_uint64)0),
111 fCurrentEventID(~(homer_uint64)0),
112 fBlockCnt(0),
113 fMaxBlockCnt(0),
114 fBlocks(NULL),
115 fDataSourceCnt(0),
116 fTCPDataSourceCnt(0),
117 fShmDataSourceCnt(0),
118 fDataSourceMaxCnt(0),
04a939f7 119 fDataSources(NULL),
120 fConnectionStatus(0),
121 fErrorConnection(~(unsigned int)0),
122 fEventRequestAdvanceTime(0)
2be16a33 123 {
04a939f7 124// see header file for class documentation
125// For reading from a TCP port
2be16a33 126 Init();
127 if ( !AllocDataSources(1) )
128 {
129 fErrorConnection = 0;
130 fConnectionStatus = ENOMEM;
131 return;
132 }
133 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134 if ( fConnectionStatus )
135 fErrorConnection = 0;
136 else
137 {
138 fDataSourceCnt++;
139 fTCPDataSourceCnt++;
140 fDataSources[0].fNdx = 0;
141 }
142 }
143
2be16a33 144HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
746d2a94 145 :
146 MonitoringReader(),
147 fCurrentEventType(~(homer_uint64)0),
148 fCurrentEventID(~(homer_uint64)0),
149 fBlockCnt(0),
150 fMaxBlockCnt(0),
151 fBlocks(NULL),
152 fDataSourceCnt(0),
153 fTCPDataSourceCnt(0),
154 fShmDataSourceCnt(0),
155 fDataSourceMaxCnt(0),
04a939f7 156 fDataSources(NULL),
157 fConnectionStatus(0),
158 fErrorConnection(~(unsigned int)0),
159 fEventRequestAdvanceTime(0)
2be16a33 160 {
04a939f7 161// see header file for class documentation
162// For reading from multiple TCP ports
2be16a33 163 Init();
164 if ( !AllocDataSources(tcpCnt) )
165 {
166 fErrorConnection = 0;
167 fConnectionStatus = ENOMEM;
168 return;
169 }
170 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
171 {
172 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173 if ( fConnectionStatus )
174 {
175 fErrorConnection = n;
176 return;
177 }
178 fDataSources[n].fNdx = n;
179 }
180 }
181
2be16a33 182HOMERReader::HOMERReader( key_t shmKey, int shmSize )
746d2a94 183 :
184 MonitoringReader(),
185 fCurrentEventType(~(homer_uint64)0),
186 fCurrentEventID(~(homer_uint64)0),
187 fBlockCnt(0),
188 fMaxBlockCnt(0),
189 fBlocks(NULL),
190 fDataSourceCnt(0),
191 fTCPDataSourceCnt(0),
192 fShmDataSourceCnt(0),
193 fDataSourceMaxCnt(0),
04a939f7 194 fDataSources(NULL),
195 fConnectionStatus(0),
196 fErrorConnection(~(unsigned int)0),
197 fEventRequestAdvanceTime(0)
2be16a33 198 {
04a939f7 199// see header file for class documentation
200// For reading from a System V shared memory segment
2be16a33 201 Init();
202 if ( !AllocDataSources(1) )
203 {
204 fErrorConnection = 0;
205 fConnectionStatus = ENOMEM;
206 return;
207 }
208 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209 if ( fConnectionStatus )
210 fErrorConnection = 0;
211 else
212 {
213 fDataSourceCnt++;
214 fShmDataSourceCnt++;
215 fDataSources[0].fNdx = 0;
216 }
217 }
218
2be16a33 219HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 220 :
221 MonitoringReader(),
222 fCurrentEventType(~(homer_uint64)0),
223 fCurrentEventID(~(homer_uint64)0),
224 fBlockCnt(0),
225 fMaxBlockCnt(0),
226 fBlocks(NULL),
227 fDataSourceCnt(0),
228 fTCPDataSourceCnt(0),
229 fShmDataSourceCnt(0),
230 fDataSourceMaxCnt(0),
04a939f7 231 fDataSources(NULL),
232 fConnectionStatus(0),
233 fErrorConnection(~(unsigned int)0),
234 fEventRequestAdvanceTime(0)
2be16a33 235 {
04a939f7 236// see header file for class documentation
237// For reading from multiple System V shared memory segments
2be16a33 238 Init();
239 if ( !AllocDataSources(shmCnt) )
240 {
241 fErrorConnection = 0;
242 fConnectionStatus = ENOMEM;
243 return;
244 }
245 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
246 {
247 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248 if ( fConnectionStatus )
249 {
250 fErrorConnection = n;
251 return;
252 }
253 fDataSources[n].fNdx = n;
254 }
255 }
256
2be16a33 257HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
258 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 259 :
260 MonitoringReader(),
261 fCurrentEventType(~(homer_uint64)0),
262 fCurrentEventID(~(homer_uint64)0),
263 fBlockCnt(0),
264 fMaxBlockCnt(0),
265 fBlocks(NULL),
266 fDataSourceCnt(0),
267 fTCPDataSourceCnt(0),
268 fShmDataSourceCnt(0),
269 fDataSourceMaxCnt(0),
04a939f7 270 fDataSources(NULL),
271 fConnectionStatus(0),
272 fErrorConnection(~(unsigned int)0),
273 fEventRequestAdvanceTime(0)
2be16a33 274 {
04a939f7 275// see header file for class documentation
276// For reading from multiple TCP ports and multiple System V shared memory segments
2be16a33 277 Init();
278 if ( !AllocDataSources(tcpCnt+shmCnt) )
279 {
280 fErrorConnection = 0;
281 fConnectionStatus = ENOMEM;
282 return;
283 }
284 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
285 {
286 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287 if ( fConnectionStatus )
288 {
289 fErrorConnection = n;
290 return;
291 }
292 fDataSources[n].fNdx = n;
293 }
294 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
295 {
296 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297 if ( fConnectionStatus )
298 {
299 fErrorConnection = tcpCnt+n;
300 return;
301 }
302 fDataSources[n].fNdx = n;
303 }
304 }
305HOMERReader::~HOMERReader()
306 {
04a939f7 307// see header file for class documentation
2be16a33 308 ReleaseCurrentEvent();
309 FreeDataSources();
310 }
311
2be16a33 312int HOMERReader::ReadNextEvent()
313 {
04a939f7 314// see header file for class documentation
315// Read in the next available event
2be16a33 316 return ReadNextEvent( false, 0 );
317 }
318
2be16a33 319int HOMERReader::ReadNextEvent( unsigned long timeout )
320 {
04a939f7 321// see header file for class documentation
322// Read in the next available event
2be16a33 323 return ReadNextEvent( true, timeout );
324 }
325
2be16a33 326unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
327 {
04a939f7 328// see header file for class documentation
329// Return the size (in bytes) of the current event's data
330// block with the given block index (starting at 0).
2be16a33 331 if ( ndx >= fBlockCnt )
332 return 0;
333 return fBlocks[ndx].fLength;
334 }
335
2be16a33 336const void* HOMERReader::GetBlockData( unsigned long ndx ) const
337 {
04a939f7 338// see header file for class documentation
339// Return a pointer to the start of the current event's data
340// block with the given block index (starting at 0).
2be16a33 341 if ( ndx >= fBlockCnt )
342 return NULL;
343 return fBlocks[ndx].fData;
344 }
345
2be16a33 346const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
347 {
04a939f7 348// see header file for class documentation
349// Return IP address or hostname of node which sent the
350// current event's data block with the given block index
351// (starting at 0).
352// For HOMER this is the ID of the node on which the subscriber
353// that provided this data runs/ran.
2be16a33 354 if ( ndx >= fBlockCnt )
355 return NULL;
356#ifdef DEBUG
357 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
358 {
359 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
361 return NULL;
362 }
363#endif
364 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365 //return fBlocks[ndx].fOriginatingNodeID;
366 }
367
2be16a33 368homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
369 {
04a939f7 370// see header file for class documentation
371// Return byte order of the data stored in the
372// current event's data block with the given block index (starting at 0).
373// 0 is unknown alignment,
374// 1 ist little endian,
375// 2 is big endian. */
2be16a33 376 if ( ndx >= fBlockCnt )
377 return 0;
378 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
379 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
380 }
04a939f7 381
2be16a33 382homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
383 {
04a939f7 384// see header file for class documentation
385// Return the alignment (in bytes) of the given datatype
386// in the data stored in the current event's data block
387// with the given block index (starting at 0).
388// Possible values for the data type are
389// 0: homer_uint64
390// 1: homer_uint32
391// 2: uin16
392// 3: homer_uint8
393// 4: double
394// 5: float
2be16a33 395 if ( ndx >= fBlockCnt )
396 return 0;
397 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
398 return 0;
399 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
400 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
401 }
402
403homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
404 {
04a939f7 405// see header file for class documentation
2be16a33 406 if ( ndx >= fBlockCnt )
407 return 0;
408 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
409 }
410
411/* HOMER specific */
412/* Return the type of the data in the current event's data
413 block with the given block index (starting at 0). */
414homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
415 {
04a939f7 416// see header file for class documentation
2be16a33 417 if ( ndx >= fBlockCnt )
418 return ~(homer_uint64)0;
419 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
421 }
422
423/* Return the origin of the data in the current event's data
424 block with the given block index (starting at 0). */
425homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
426 {
04a939f7 427// see header file for class documentation
2be16a33 428 if ( ndx >= fBlockCnt )
429 return ~(homer_uint32)0;
430 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
432 }
433
434/* Return a specification of the data in the current event's data
435 block with the given block index (starting at 0). */
436homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
437 {
04a939f7 438// see header file for class documentation
2be16a33 439 if ( ndx >= fBlockCnt )
440 return ~(homer_uint32)0;
441 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
443 }
444
445/* Find the next data block in the current event with the given
446 data type, origin, and specification. Returns the block's
447 index. */
448unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
449 homer_uint32 spec, unsigned long startNdx ) const
450 {
04a939f7 451// see header file for class documentation
2be16a33 452 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
453 {
454 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
457 return n;
458 }
459 return ~(unsigned long)0;
460 }
461
462/* Find the next data block in the current event with the given
463 data type, origin, and specification. Returns the block's
464 index. */
465unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4],
466 homer_uint32 spec, unsigned long startNdx ) const
467 {
04a939f7 468// see header file for class documentation
2be16a33 469 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
470 {
471 bool found1=true, found2=true;
472 for ( unsigned i = 0; i < 8; i++ )
473 {
474 if ( type[i] != (char)0xFF )
475 {
476 found1=false;
477 break;
478 }
479 }
480 if ( !found1 )
481 {
482 found1 = true;
483 for ( unsigned i = 0; i < 8; i++ )
484 {
485 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
487 {
488 found1=false;
489 break;
490 }
491 }
492 }
493 for ( unsigned i = 0; i < 4; i++ )
494 {
495 if ( origin[i] != (char)0xFF )
496 {
497 found2 = false;
498 break;
499 }
500 }
501 if ( !found2 )
502 {
503 found2 = true;
504 for ( unsigned i = 0; i < 4; i++ )
505 {
506 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
508 {
509 found2=false;
510 break;
511 }
512 }
513 }
514 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
515 if ( found1 && found2 &&
516 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
517 return n;
518 }
519 return ~(unsigned long)0;
520 }
521
522/* Return the ID of the node that actually produced this data block.
523 This may be different from the node which sent the data to this
524 monitoring object as returned by GetBlockSendNodeID. */
525const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
526 {
04a939f7 527// see header file for class documentation
2be16a33 528 if ( ndx >= fBlockCnt )
529 return NULL;
530 return fBlocks[ndx].fOriginatingNodeID;
531 }
532
533
534void HOMERReader::Init()
535 {
04a939f7 536// see header file for class documentation
2be16a33 537 fCurrentEventType = ~(homer_uint64)0;
538 fCurrentEventID = ~(homer_uint64)0;
539 fMaxBlockCnt = fBlockCnt = 0;
540 fBlocks = NULL;
541
542 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
543 fDataSources = NULL;
544
545
546 fConnectionStatus = 0;
547 fErrorConnection = ~(unsigned int)0;
548
04a939f7 549 fEventRequestAdvanceTime = 0;
2be16a33 550 }
551
552bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
553 {
04a939f7 554// see header file for class documentation
2be16a33 555 fDataSources = new DataSource[ sourceCnt ];
556 if ( !fDataSources )
557 return false;
558 fDataSourceCnt = 0;
559 fDataSourceMaxCnt = sourceCnt;
560 return true;
561 }
562
563int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
564 {
04a939f7 565// see header file for class documentation
2be16a33 566 struct hostent* he;
567 he = gethostbyname( hostname );
568 if ( he == NULL )
569 {
570 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571 return EADDRNOTAVAIL;
572 }
573
574 struct sockaddr_in remoteAddr;
575 remoteAddr.sin_family = AF_INET; // host byte order
576 remoteAddr.sin_port = htons(port); // short, network byte order
577 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
579
580 // Create socket and connect to target program on remote node
581 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582 if ( source.fTCPConnection == -1 )
583 {
584 return errno;
585 }
586
587 int ret;
588
589 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
590 if ( ret == -1 )
591 {
592 ret=errno;
593 close( source.fTCPConnection );
594 return ret;
595 }
596
597 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598 if ( ret != (int)strlen(MOD_BIN) )
599 {
600 ret=errno;
601 close( source.fTCPConnection );
602 return ret;
603 }
604
605 char* tmpchar = new char[ strlen( hostname )+1 ];
606 if ( !tmpchar )
607 {
608 close( source.fTCPConnection );
609 return ENOMEM;
610 }
611 strcpy( tmpchar, hostname );
612 source.fHostname = tmpchar;
613
614 source.fType = kTCP;
615 source.fTCPPort = port;
616 source.fData = NULL;
617 source.fDataSize = 0;
618 source.fDataRead = 0;
619 return 0;
620 }
621
622int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
623 {
04a939f7 624// see header file for class documentation
2be16a33 625 int ret;
626 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
627 if ( !tmpchar )
628 {
629 return ENOMEM;
630 }
631 gethostname( tmpchar, MAXHOSTNAMELEN );
632 tmpchar[MAXHOSTNAMELEN]=(char)0;
633 source.fHostname = tmpchar;
634
635 source.fShmID = shmget( shmKey, shmSize, 0660 );
636 if ( source.fShmID == -1 )
637 {
638 ret = errno;
639 delete [] source.fHostname;
640 return ret;
641 }
642
643 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
644
645 if ( !source.fShmPtr )
646 {
647 ret = errno;
648 shmctl( source.fShmID, IPC_RMID, NULL );
649 delete [] source.fHostname;
650 return ret;
651 }
652
653 source.fType = kShm;
654 source.fShmKey = shmKey;
655 source.fShmSize = shmSize;
656 source.fDataSize = 0;
657 source.fDataRead = 0;
658 return 0;
659 }
660
661void HOMERReader::FreeDataSources()
662 {
04a939f7 663// see header file for class documentation
2be16a33 664 for ( unsigned n=0; n < fDataSourceCnt; n++ )
665 {
666 if ( fDataSources[n].fType == kTCP )
667 FreeTCPDataSource( fDataSources[n] );
668 else
669 FreeShmDataSource( fDataSources[n] );
670 }
671 }
672
673int HOMERReader::FreeShmDataSource( DataSource& source )
674 {
04a939f7 675// see header file for class documentation
2be16a33 676 if ( source.fShmPtr )
677 shmdt( source.fShmPtr );
678// if ( source.fShmID != -1 )
679// shmctl( source.fShmID, IPC_RMID, NULL );
680 if ( source.fHostname )
681 delete [] source.fHostname;
682 return 0;
683 }
684
685int HOMERReader::FreeTCPDataSource( DataSource& source )
686 {
04a939f7 687// see header file for class documentation
2be16a33 688 if ( source.fTCPConnection )
689 close( source.fTCPConnection );
690 if ( source.fHostname )
691 delete [] source.fHostname;
692 return 0;
693 }
694
695int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
696 {
04a939f7 697// see header file for class documentation
2be16a33 698 if ( fDataSourceCnt<=0 )
699 return ENXIO;
700 // Clean up currently active event.
701 ReleaseCurrentEvent();
702 int ret;
703 // Trigger all configured data sources
704 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
705 {
706 if ( fDataSources[n].fType == kTCP )
707 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
708 else
709 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
710 if ( ret )
711 {
712 fErrorConnection = n;
713 fConnectionStatus=ret;
714 return fConnectionStatus;
715 }
716 }
717 // Now read in data from the configured data source
718 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
719 if ( ret )
720 {
721 return ret;
722 }
723 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
724 if ( ret )
725 {
726 return ret;
727 }
728// for ( unsigned n = 0; n<fDataSourceCnt; n++ )
729// {
730// if ( fDataSources[n].fType == kTCP )
731// ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
732// else
733// ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
734// if ( ret )
735// {
736// fErrorConnection = n;
737// fConnectionStatus=ret;
738// return fConnectionStatus;
739// }
740// }
741 //Check to see that all sources contributed data for the same event
742 homer_uint64 eventID;
743 homer_uint64 eventType;
744 eventID = GetSourceEventID( fDataSources[0] );
745 eventType = GetSourceEventType( fDataSources[0] );
746 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
747 {
748 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
749 {
750 fErrorConnection = n;
746d2a94 751 fConnectionStatus=56;//EBADRQC;
2be16a33 752 return fConnectionStatus;
753 }
754 }
755 // Find all the different data blocks contained in the data from all
756 // the sources.
757 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
758 {
759 ret = ParseSourceData( fDataSources[n] );
760 if ( ret )
761 {
762 fErrorConnection = n;
746d2a94 763 fConnectionStatus=57;//EBADSLT;
2be16a33 764 return fConnectionStatus;
765 }
766 }
767 fCurrentEventID = eventID;
768 fCurrentEventType = eventType;
769 return 0;
770 }
771
772void HOMERReader::ReleaseCurrentEvent()
773 {
04a939f7 774// see header file for class documentation
2be16a33 775 // sources.fDataRead = 0;
776 // fMaxBlockCnt
777 fCurrentEventID = ~(homer_uint64)0;
778 fCurrentEventType = ~(homer_uint64)0;
779 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
780 {
781 if ( fDataSources[n].fData )
782 {
783 if ( fDataSources[n].fType == kTCP )
784 delete [] (homer_uint8*)fDataSources[n].fData;
785 fDataSources[n].fData = NULL;
786 }
787 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
788 }
789 if ( fBlocks )
790 {
791 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
792 {
793 if ( fBlocks[n].fOriginatingNodeID )
794 delete [] fBlocks[n].fOriginatingNodeID;
795 }
796 delete [] fBlocks;
797 fBlocks=0;
798 fMaxBlockCnt = 0;
799 fBlockCnt=0;
800 }
801 }
802
803int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeout_us )
804 {
04a939f7 805// see header file for class documentation
2be16a33 806 int ret;
807 struct timeval oldSndTO, newSndTO;
808 if ( useTimeout )
809 {
810 socklen_t optlen=sizeof(oldSndTO);
811 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
812 if ( ret )
813 {
814 return errno;
815 }
816 if ( optlen!=sizeof(oldSndTO) )
817 {
818 return ENXIO;
819 }
820 newSndTO.tv_sec = timeout_us / 1000000;
821 newSndTO.tv_usec = timeout_us - (newSndTO.tv_sec*1000000);
822 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
823 if ( ret )
824 {
825 return errno;
826 }
827 }
828 // Send one event request
04a939f7 829 if ( !fEventRequestAdvanceTime )
2be16a33 830 {
831 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
832
833 if ( ret != (int)strlen(GET_ONE) )
834 {
835 ret=errno;
836 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
837 return ret;
838 }
839 }
840 else
841 {
842 char tmpCmd[ 128 ];
843
04a939f7 844 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
2be16a33 845 if ( len>128 || len<0 )
846 {
847 ret=EMSGSIZE;
848 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
849 return ret;
850 }
851
852 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
853
854 if ( ret != (int)strlen(tmpCmd) )
855 {
856 ret=errno;
857 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
858 return ret;
859 }
860
861 }
862 return 0;
863 }
864
865int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
866 {
04a939f7 867// see header file for class documentation
2be16a33 868 if ( source.fShmPtr )
869 {
870 *(homer_uint32*)( source.fShmPtr ) = 0;
871 return 0;
872 }
873 else
874 return EFAULT;
875 }
876
877int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
878 {
879 bool toRead = false;
880 do
881 {
882 fd_set conns;
883 FD_ZERO( &conns );
884 int highestConn=0;
885 toRead = false;
886 unsigned firstConnection=~(unsigned)0;
887 for ( unsigned long n = 0; n < sourceCnt; n++ )
888 {
889 if ( sources[n].fDataSize == 0 // size specifier not yet read
890 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
891 {
892 toRead = true;
893 FD_SET( sources[n].fTCPConnection, &conns );
894 if ( sources[n].fTCPConnection > highestConn )
895 highestConn = sources[n].fTCPConnection;
896 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
897 if ( firstConnection == ~(unsigned)0 )
898 firstConnection = n;
899 }
900 else
901 {
902 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
903 }
904 }
905 if ( toRead )
906 {
907 struct timeval tv, *ptv;
908 if ( useTimeout )
909 {
910 tv.tv_sec = timeout / 1000000;
911 tv.tv_usec = timeout - (tv.tv_sec*1000000);
912 ptv = &tv;
913 }
914 else
915 ptv = NULL;
916 // wait until something is ready to be read
917 // either for timeout usecs or until eternity
918 int ret;
919 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
920 if ( ret <=0 )
921 {
922 fErrorConnection = firstConnection;
923 if ( errno )
924 fConnectionStatus = errno;
925 else
926 fConnectionStatus = ETIMEDOUT;
927 return fConnectionStatus;
928 }
929 for ( unsigned n = 0; n < sourceCnt; n++ )
930 {
931 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
932 {
933 if ( sources[n].fDataSize == 0 )
934 {
935 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
936 if ( ret != sizeof(homer_uint32) )
937 {
938 fErrorConnection = n;
939 if ( errno )
940 fConnectionStatus = errno;
941 else
942 fConnectionStatus = ENOMSG;
943 return fConnectionStatus;
944 }
945 sources[n].fDataSize = ntohl( sources[n].fDataSize );
946 sources[n].fDataRead = 0;
947 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
948 if ( !sources[n].fData )
949 {
950 fErrorConnection = n;
951 fConnectionStatus = ENOMEM;
952 return fConnectionStatus;
953 }
954 }
955 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
956 {
957 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
958 if ( ret>0 )
959 sources[n].fDataRead += ret;
960 else if ( ret == 0 )
961 {
962 fErrorConnection = n;
963 fConnectionStatus = ECONNRESET;
964 return fConnectionStatus;
965 }
966 else
967 {
968 fErrorConnection = n;
969 fConnectionStatus = errno;
970 return fConnectionStatus;
971 }
972 }
973 else
974 {
975 fErrorConnection = n;
976 fConnectionStatus = ENXIO;
977 return fConnectionStatus;
978 }
979 }
980 }
981 }
982 }
983 while ( toRead );
984 return 0;
985 }
986
987/*
988int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
989 {
990#warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
991 // Send one event request
992 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
993 if ( ret != strlen(GET_ONE) )
994 {
995 return errno;
996 }
997 // wait for and read back size specifier
998 unsigned sizeNBO;
999 // The value transmitted is binary, in network byte order
1000 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1001 if ( ret != sizeof(sizeNBO) )
1002 {
1003 return errno;
1004 }
1005 // Convert back to host byte order
1006 source.fDataSize = ntohl( sizeNBO );
1007 source.fData = new homer_uint8[ source.fDataSize ];
1008 unsigned long dataRead=0, toRead;
1009 if ( !source.fData )
1010 {
1011 char buffer[1024];
1012 // Read in data into buffer in order not to block connection
1013 while ( dataRead < source.fDataSize )
1014 {
1015 if ( source.fDataSize-dataRead > 1024 )
1016 toRead = 1024;
1017 else
1018 toRead = source.fDataSize-dataRead;
1019 ret = read( source.fTCPConnection, buffer, toRead );
1020 if ( ret > 0 )
1021 dataRead += ret;
1022 else
1023 return errno;
1024 }
1025 return ENOMEM;
1026 }
1027 while ( dataRead < source.fDataSize )
1028 {
1029 toRead = source.fDataSize-dataRead;
1030 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1031 if ( ret > 0 )
1032 dataRead += ret;
1033 else if ( ret == 0 && useTimeout )
1034 {
1035 struct timeval tv;
1036 tv.tv_sec = timeout / 1000000;
1037 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1038 fd_set conns;
1039 FD_ZERO( &conns );
1040 FD_SET( source.fTCPConnection, &conns );
1041 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1042 if ( ret <=0 )
1043 return errno;
1044 }
1045 else if ( ret == 0 )
1046 {
1047 if ( errno == EOK )
1048 return ECONNRESET;
1049 else
1050 return errno;
1051 }
1052 else
1053 {
1054 return errno;
1055 }
1056 }
1057 return 0;
1058 }
1059*/
1060
1061/*
1062int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1063 {
1064
1065 }
1066*/
1067
1068int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1069 {
04a939f7 1070// see header file for class documentation
2be16a33 1071 struct timeval tv1, tv2;
1072 bool found=false;
1073 bool all=true;
1074 if ( useTimeout )
1075 gettimeofday( &tv1, NULL );
1076 do
1077 {
1078 found = false;
1079 all = true;
1080 for ( unsigned n = 0; n < sourceCnt; n++ )
1081 {
1082 if ( !sources[n].fDataSize )
1083 all = false;
1084 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1085 {
1086 found = true;
1087 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1088 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1089 }
1090 }
1091 if ( found && useTimeout )
1092 gettimeofday( &tv1, NULL );
1093 if ( !all && useTimeout )
1094 {
1095 gettimeofday( &tv2, NULL );
1096 unsigned long long tdiff;
1097 tdiff = tv2.tv_sec-tv1.tv_sec;
1098 tdiff *= 1000000;
1099 tdiff += tv2.tv_usec-tv1.tv_usec;
1100 if ( tdiff > timeout )
1101 return ETIMEDOUT;
1102 }
1103 if ( !all )
1104 usleep( 0 );
1105 }
1106 while ( !all );
1107 return 0;
1108 }
1109
1110int HOMERReader::ParseSourceData( DataSource& source )
1111 {
04a939f7 1112// see header file for class documentation
2be16a33 1113 if ( source.fData )
1114 {
1115 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1116 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1117 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1118 if ( ret )
1119 return ret;
1120 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1121 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1122 {
1123 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1124 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1125 fBlocks[fBlockCnt].fSource = source.fNdx;
1126 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1127 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1128 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1129 struct in_addr tmpA;
1130 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1131 char* addr = inet_ntoa( tmpA );
1132 char* tmpchar = new char[ strlen(addr)+1 ];
1133 if ( !tmpchar )
1134 return ENOMEM;
1135 strcpy( tmpchar, addr );
1136 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1137 descrOffset += descrLen;
1138 }
1139 return 0;
1140 }
1141 return EFAULT;
1142 }
1143
1144int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1145 {
04a939f7 1146// see header file for class documentation
2be16a33 1147 DataBlock* newBlocks;
1148 newBlocks = new DataBlock[ newCnt ];
1149 if ( !newBlocks )
1150 return ENOMEM;
1151 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1152 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1153 if ( newCnt > fMaxBlockCnt )
1154 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1155 if ( fBlocks )
1156 delete [] fBlocks;
1157 fBlocks = newBlocks;
1158 fMaxBlockCnt = newCnt;
1159 return 0;
1160 }
1161
1162homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1163 {
04a939f7 1164// see header file for class documentation
2be16a33 1165 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1166 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1167 }
1168
1169homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1170 {
04a939f7 1171// see header file for class documentation
2be16a33 1172 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1173 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1174 }
1175
1176
1177
1178/*
1179***************************************************************************
1180**
1181** $Author$ - Initial Version by Timm Morten Steinbeck
1182**
1183** $Id$
1184**
1185***************************************************************************
1186*/