scanning of options migrated from AliHLTReconstructor to AliHLTSystem; chain configur...
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
CommitLineData
2be16a33 1/************************************************************************
2**
3**
4** This file is property of and copyright by the Technical Computer
5** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6** University, Heidelberg, Germany, 2001
7** This file has been written by Timm Morten Steinbeck,
8** timm@kip.uni-heidelberg.de
9**
10**
11** See the file license.txt for details regarding usage, modification,
12** distribution and warranty.
13** Important: This file is provided without any warranty, including
14** fitness for any particular purpose.
15**
16**
17** Newer versions of this file's package will be made available from
18** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19** or the corresponding page of the Heidelberg Alice Level 3 group.
20**
21*************************************************************************/
22
23/*
24***************************************************************************
25**
26** $Author$ - Initial Version by Timm Morten Steinbeck
27**
28** $Id$
29**
30***************************************************************************
31*/
32
9e91e956 33/** @file AliHLTHOMERReader.cxx
2be16a33 34 @author Timm Steinbeck
35 @date Sep 14 2007
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38
39// see below for class documentation
40// or
41// refer to README to build package
42// or
43// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45#include "AliHLTHOMERReader.h"
46#include <stdio.h>
47#include <string.h>
48#include <errno.h>
49#include <netdb.h>
50extern int h_errno;
51#include <sys/types.h>
52#include <sys/socket.h>
53#include <netinet/in.h>
54#include <netinet/tcp.h>
55#include <unistd.h>
56#include <rpc/types.h>
57#include <fcntl.h>
58#include <sys/stat.h>
59#include <netinet/in.h>
60#include <arpa/inet.h>
61#ifdef USE_ROOT
62#include <Rtypes.h>
63#endif
64
65
66#define MOD_BIN "MOD BIN\n"
67#define MOD_ASC "MOD ASC\n"
68#define GET_ONE "GET ONE\n"
69#define GET_ALL "GET ALL\n"
70
71#ifdef USE_ROOT
72ClassImp(MonitoringReader);
73ClassImp(HOMERReader);
74#endif
75
76
77
78
79
80#ifdef USE_ROOT
81HOMERReader::HOMERReader()
746d2a94 82 :
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
85 fBlockCnt(0),
86 fMaxBlockCnt(0),
87 fBlocks(NULL),
88 fDataSourceCnt(0),
89 fTCPDataSourceCnt(0),
90 fShmDataSourceCnt(0),
91 fDataSourceMaxCnt(0),
92 fDataSources(NULL)
2be16a33 93 {
94 Init();
95 }
96#endif
97
98
99/* For reading from a TCP port */
100HOMERReader::HOMERReader( const char* hostname, unsigned short port )
746d2a94 101 :
102 MonitoringReader(),
103 fCurrentEventType(~(homer_uint64)0),
104 fCurrentEventID(~(homer_uint64)0),
105 fBlockCnt(0),
106 fMaxBlockCnt(0),
107 fBlocks(NULL),
108 fDataSourceCnt(0),
109 fTCPDataSourceCnt(0),
110 fShmDataSourceCnt(0),
111 fDataSourceMaxCnt(0),
112 fDataSources(NULL)
2be16a33 113 {
114 Init();
115 if ( !AllocDataSources(1) )
116 {
117 fErrorConnection = 0;
118 fConnectionStatus = ENOMEM;
119 return;
120 }
121 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
122 if ( fConnectionStatus )
123 fErrorConnection = 0;
124 else
125 {
126 fDataSourceCnt++;
127 fTCPDataSourceCnt++;
128 fDataSources[0].fNdx = 0;
129 }
130 }
131
132/* For reading from multiple TCP ports */
133HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
746d2a94 134 :
135 MonitoringReader(),
136 fCurrentEventType(~(homer_uint64)0),
137 fCurrentEventID(~(homer_uint64)0),
138 fBlockCnt(0),
139 fMaxBlockCnt(0),
140 fBlocks(NULL),
141 fDataSourceCnt(0),
142 fTCPDataSourceCnt(0),
143 fShmDataSourceCnt(0),
144 fDataSourceMaxCnt(0),
145 fDataSources(NULL)
2be16a33 146 {
147 Init();
148 if ( !AllocDataSources(tcpCnt) )
149 {
150 fErrorConnection = 0;
151 fConnectionStatus = ENOMEM;
152 return;
153 }
154 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
155 {
156 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
157 if ( fConnectionStatus )
158 {
159 fErrorConnection = n;
160 return;
161 }
162 fDataSources[n].fNdx = n;
163 }
164 }
165
166/* For reading from a System V shared memory segment */
167HOMERReader::HOMERReader( key_t shmKey, int shmSize )
746d2a94 168 :
169 MonitoringReader(),
170 fCurrentEventType(~(homer_uint64)0),
171 fCurrentEventID(~(homer_uint64)0),
172 fBlockCnt(0),
173 fMaxBlockCnt(0),
174 fBlocks(NULL),
175 fDataSourceCnt(0),
176 fTCPDataSourceCnt(0),
177 fShmDataSourceCnt(0),
178 fDataSourceMaxCnt(0),
179 fDataSources(NULL)
2be16a33 180 {
181 Init();
182 if ( !AllocDataSources(1) )
183 {
184 fErrorConnection = 0;
185 fConnectionStatus = ENOMEM;
186 return;
187 }
188 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
189 if ( fConnectionStatus )
190 fErrorConnection = 0;
191 else
192 {
193 fDataSourceCnt++;
194 fShmDataSourceCnt++;
195 fDataSources[0].fNdx = 0;
196 }
197 }
198
199/* For reading from multiple System V shared memory segments */
200HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 201 :
202 MonitoringReader(),
203 fCurrentEventType(~(homer_uint64)0),
204 fCurrentEventID(~(homer_uint64)0),
205 fBlockCnt(0),
206 fMaxBlockCnt(0),
207 fBlocks(NULL),
208 fDataSourceCnt(0),
209 fTCPDataSourceCnt(0),
210 fShmDataSourceCnt(0),
211 fDataSourceMaxCnt(0),
212 fDataSources(NULL)
2be16a33 213 {
214 Init();
215 if ( !AllocDataSources(shmCnt) )
216 {
217 fErrorConnection = 0;
218 fConnectionStatus = ENOMEM;
219 return;
220 }
221 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
222 {
223 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
224 if ( fConnectionStatus )
225 {
226 fErrorConnection = n;
227 return;
228 }
229 fDataSources[n].fNdx = n;
230 }
231 }
232
233/* For reading from multiple TCP ports and multiple System V shared memory segments */
234HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
235 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 236 :
237 MonitoringReader(),
238 fCurrentEventType(~(homer_uint64)0),
239 fCurrentEventID(~(homer_uint64)0),
240 fBlockCnt(0),
241 fMaxBlockCnt(0),
242 fBlocks(NULL),
243 fDataSourceCnt(0),
244 fTCPDataSourceCnt(0),
245 fShmDataSourceCnt(0),
246 fDataSourceMaxCnt(0),
247 fDataSources(NULL)
2be16a33 248 {
249 Init();
250 if ( !AllocDataSources(tcpCnt+shmCnt) )
251 {
252 fErrorConnection = 0;
253 fConnectionStatus = ENOMEM;
254 return;
255 }
256 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
257 {
258 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
259 if ( fConnectionStatus )
260 {
261 fErrorConnection = n;
262 return;
263 }
264 fDataSources[n].fNdx = n;
265 }
266 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
267 {
268 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
269 if ( fConnectionStatus )
270 {
271 fErrorConnection = tcpCnt+n;
272 return;
273 }
274 fDataSources[n].fNdx = n;
275 }
276 }
277HOMERReader::~HOMERReader()
278 {
279 ReleaseCurrentEvent();
280 FreeDataSources();
281 }
282
283/* Read in the next available event */
284int HOMERReader::ReadNextEvent()
285 {
286 return ReadNextEvent( false, 0 );
287 }
288
289/* Read in the next available event */
290int HOMERReader::ReadNextEvent( unsigned long timeout )
291 {
292 return ReadNextEvent( true, timeout );
293 }
294
295/* Return the size (in bytes) of the current event's data
296 block with the given block index (starting at 0). */
297unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
298 {
299 if ( ndx >= fBlockCnt )
300 return 0;
301 return fBlocks[ndx].fLength;
302 }
303
304 /* Return a pointer to the start of the current event's data
305 block with the given block index (starting at 0). */
306const void* HOMERReader::GetBlockData( unsigned long ndx ) const
307 {
308 if ( ndx >= fBlockCnt )
309 return NULL;
310 return fBlocks[ndx].fData;
311 }
312
313/* Return IP address or hostname of node which sent the
314 current event's data block with the given block index
315 (starting at 0).
316 For HOMER this is the ID of the node on which the subscriber
317 that provided this data runs/ran. */
318const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
319 {
320 if ( ndx >= fBlockCnt )
321 return NULL;
322#ifdef DEBUG
323 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
324 {
325 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
326 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
327 return NULL;
328 }
329#endif
330 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
331 //return fBlocks[ndx].fOriginatingNodeID;
332 }
333
334 /* Return byte order of the data stored in the
335 current event's data block with the given block
336 index (starting at 0).
337 0 is unknown alignment,
338 1 ist little endian,
339 2 is big endian. */
340homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
341 {
342 if ( ndx >= fBlockCnt )
343 return 0;
344 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
345 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
346 }
347 /* Return the alignment (in bytes) of the given datatype
348 in the data stored in the current event's data block
349 with the given block index (starting at 0).
350 Possible values for the data type are
351 0: homer_uint64
352 1: homer_uint32
353 2: uin16
354 3: homer_uint8
355 4: double
356 5: float
357 */
358homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
359 {
360 if ( ndx >= fBlockCnt )
361 return 0;
362 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
363 return 0;
364 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
365 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
366 }
367
368homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
369 {
370 if ( ndx >= fBlockCnt )
371 return 0;
372 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
373 }
374
375/* HOMER specific */
376/* Return the type of the data in the current event's data
377 block with the given block index (starting at 0). */
378homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
379 {
380 if ( ndx >= fBlockCnt )
381 return ~(homer_uint64)0;
382 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
383 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
384 }
385
386/* Return the origin of the data in the current event's data
387 block with the given block index (starting at 0). */
388homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
389 {
390 if ( ndx >= fBlockCnt )
391 return ~(homer_uint32)0;
392 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
393 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
394 }
395
396/* Return a specification of the data in the current event's data
397 block with the given block index (starting at 0). */
398homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
399 {
400 if ( ndx >= fBlockCnt )
401 return ~(homer_uint32)0;
402 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
403 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
404 }
405
406/* Find the next data block in the current event with the given
407 data type, origin, and specification. Returns the block's
408 index. */
409unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
410 homer_uint32 spec, unsigned long startNdx ) const
411 {
412 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
413 {
414 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
415 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
416 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
417 return n;
418 }
419 return ~(unsigned long)0;
420 }
421
422/* Find the next data block in the current event with the given
423 data type, origin, and specification. Returns the block's
424 index. */
425unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4],
426 homer_uint32 spec, unsigned long startNdx ) const
427 {
428 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
429 {
430 bool found1=true, found2=true;
431 for ( unsigned i = 0; i < 8; i++ )
432 {
433 if ( type[i] != (char)0xFF )
434 {
435 found1=false;
436 break;
437 }
438 }
439 if ( !found1 )
440 {
441 found1 = true;
442 for ( unsigned i = 0; i < 8; i++ )
443 {
444 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
445 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
446 {
447 found1=false;
448 break;
449 }
450 }
451 }
452 for ( unsigned i = 0; i < 4; i++ )
453 {
454 if ( origin[i] != (char)0xFF )
455 {
456 found2 = false;
457 break;
458 }
459 }
460 if ( !found2 )
461 {
462 found2 = true;
463 for ( unsigned i = 0; i < 4; i++ )
464 {
465 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
466 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
467 {
468 found2=false;
469 break;
470 }
471 }
472 }
473 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
474 if ( found1 && found2 &&
475 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
476 return n;
477 }
478 return ~(unsigned long)0;
479 }
480
481/* Return the ID of the node that actually produced this data block.
482 This may be different from the node which sent the data to this
483 monitoring object as returned by GetBlockSendNodeID. */
484const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
485 {
486 if ( ndx >= fBlockCnt )
487 return NULL;
488 return fBlocks[ndx].fOriginatingNodeID;
489 }
490
491
492void HOMERReader::Init()
493 {
494 fCurrentEventType = ~(homer_uint64)0;
495 fCurrentEventID = ~(homer_uint64)0;
496 fMaxBlockCnt = fBlockCnt = 0;
497 fBlocks = NULL;
498
499 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
500 fDataSources = NULL;
501
502
503 fConnectionStatus = 0;
504 fErrorConnection = ~(unsigned int)0;
505
506 fEventRequestAdvanceTime_us = 0;
507 }
508
509bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
510 {
511 fDataSources = new DataSource[ sourceCnt ];
512 if ( !fDataSources )
513 return false;
514 fDataSourceCnt = 0;
515 fDataSourceMaxCnt = sourceCnt;
516 return true;
517 }
518
519int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
520 {
521 struct hostent* he;
522 he = gethostbyname( hostname );
523 if ( he == NULL )
524 {
525 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
526 return EADDRNOTAVAIL;
527 }
528
529 struct sockaddr_in remoteAddr;
530 remoteAddr.sin_family = AF_INET; // host byte order
531 remoteAddr.sin_port = htons(port); // short, network byte order
532 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
533 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
534
535 // Create socket and connect to target program on remote node
536 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
537 if ( source.fTCPConnection == -1 )
538 {
539 return errno;
540 }
541
542 int ret;
543
544 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
545 if ( ret == -1 )
546 {
547 ret=errno;
548 close( source.fTCPConnection );
549 return ret;
550 }
551
552 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
553 if ( ret != (int)strlen(MOD_BIN) )
554 {
555 ret=errno;
556 close( source.fTCPConnection );
557 return ret;
558 }
559
560 char* tmpchar = new char[ strlen( hostname )+1 ];
561 if ( !tmpchar )
562 {
563 close( source.fTCPConnection );
564 return ENOMEM;
565 }
566 strcpy( tmpchar, hostname );
567 source.fHostname = tmpchar;
568
569 source.fType = kTCP;
570 source.fTCPPort = port;
571 source.fData = NULL;
572 source.fDataSize = 0;
573 source.fDataRead = 0;
574 return 0;
575 }
576
577int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
578 {
579 int ret;
580 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
581 if ( !tmpchar )
582 {
583 return ENOMEM;
584 }
585 gethostname( tmpchar, MAXHOSTNAMELEN );
586 tmpchar[MAXHOSTNAMELEN]=(char)0;
587 source.fHostname = tmpchar;
588
589 source.fShmID = shmget( shmKey, shmSize, 0660 );
590 if ( source.fShmID == -1 )
591 {
592 ret = errno;
593 delete [] source.fHostname;
594 return ret;
595 }
596
597 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
598
599 if ( !source.fShmPtr )
600 {
601 ret = errno;
602 shmctl( source.fShmID, IPC_RMID, NULL );
603 delete [] source.fHostname;
604 return ret;
605 }
606
607 source.fType = kShm;
608 source.fShmKey = shmKey;
609 source.fShmSize = shmSize;
610 source.fDataSize = 0;
611 source.fDataRead = 0;
612 return 0;
613 }
614
615void HOMERReader::FreeDataSources()
616 {
617 for ( unsigned n=0; n < fDataSourceCnt; n++ )
618 {
619 if ( fDataSources[n].fType == kTCP )
620 FreeTCPDataSource( fDataSources[n] );
621 else
622 FreeShmDataSource( fDataSources[n] );
623 }
624 }
625
626int HOMERReader::FreeShmDataSource( DataSource& source )
627 {
628 if ( source.fShmPtr )
629 shmdt( source.fShmPtr );
630// if ( source.fShmID != -1 )
631// shmctl( source.fShmID, IPC_RMID, NULL );
632 if ( source.fHostname )
633 delete [] source.fHostname;
634 return 0;
635 }
636
637int HOMERReader::FreeTCPDataSource( DataSource& source )
638 {
639 if ( source.fTCPConnection )
640 close( source.fTCPConnection );
641 if ( source.fHostname )
642 delete [] source.fHostname;
643 return 0;
644 }
645
646int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
647 {
648 if ( fDataSourceCnt<=0 )
649 return ENXIO;
650 // Clean up currently active event.
651 ReleaseCurrentEvent();
652 int ret;
653 // Trigger all configured data sources
654 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
655 {
656 if ( fDataSources[n].fType == kTCP )
657 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
658 else
659 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
660 if ( ret )
661 {
662 fErrorConnection = n;
663 fConnectionStatus=ret;
664 return fConnectionStatus;
665 }
666 }
667 // Now read in data from the configured data source
668 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
669 if ( ret )
670 {
671 return ret;
672 }
673 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
674 if ( ret )
675 {
676 return ret;
677 }
678// for ( unsigned n = 0; n<fDataSourceCnt; n++ )
679// {
680// if ( fDataSources[n].fType == kTCP )
681// ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
682// else
683// ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
684// if ( ret )
685// {
686// fErrorConnection = n;
687// fConnectionStatus=ret;
688// return fConnectionStatus;
689// }
690// }
691 //Check to see that all sources contributed data for the same event
692 homer_uint64 eventID;
693 homer_uint64 eventType;
694 eventID = GetSourceEventID( fDataSources[0] );
695 eventType = GetSourceEventType( fDataSources[0] );
696 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
697 {
698 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
699 {
700 fErrorConnection = n;
746d2a94 701 fConnectionStatus=56;//EBADRQC;
2be16a33 702 return fConnectionStatus;
703 }
704 }
705 // Find all the different data blocks contained in the data from all
706 // the sources.
707 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
708 {
709 ret = ParseSourceData( fDataSources[n] );
710 if ( ret )
711 {
712 fErrorConnection = n;
746d2a94 713 fConnectionStatus=57;//EBADSLT;
2be16a33 714 return fConnectionStatus;
715 }
716 }
717 fCurrentEventID = eventID;
718 fCurrentEventType = eventType;
719 return 0;
720 }
721
722void HOMERReader::ReleaseCurrentEvent()
723 {
724 // sources.fDataRead = 0;
725 // fMaxBlockCnt
726 fCurrentEventID = ~(homer_uint64)0;
727 fCurrentEventType = ~(homer_uint64)0;
728 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
729 {
730 if ( fDataSources[n].fData )
731 {
732 if ( fDataSources[n].fType == kTCP )
733 delete [] (homer_uint8*)fDataSources[n].fData;
734 fDataSources[n].fData = NULL;
735 }
736 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
737 }
738 if ( fBlocks )
739 {
740 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
741 {
742 if ( fBlocks[n].fOriginatingNodeID )
743 delete [] fBlocks[n].fOriginatingNodeID;
744 }
745 delete [] fBlocks;
746 fBlocks=0;
747 fMaxBlockCnt = 0;
748 fBlockCnt=0;
749 }
750 }
751
752int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeout_us )
753 {
754 int ret;
755 struct timeval oldSndTO, newSndTO;
756 if ( useTimeout )
757 {
758 socklen_t optlen=sizeof(oldSndTO);
759 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
760 if ( ret )
761 {
762 return errno;
763 }
764 if ( optlen!=sizeof(oldSndTO) )
765 {
766 return ENXIO;
767 }
768 newSndTO.tv_sec = timeout_us / 1000000;
769 newSndTO.tv_usec = timeout_us - (newSndTO.tv_sec*1000000);
770 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
771 if ( ret )
772 {
773 return errno;
774 }
775 }
776 // Send one event request
777 if ( !fEventRequestAdvanceTime_us )
778 {
779 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
780
781 if ( ret != (int)strlen(GET_ONE) )
782 {
783 ret=errno;
784 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
785 return ret;
786 }
787 }
788 else
789 {
790 char tmpCmd[ 128 ];
791
792 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime_us );
793 if ( len>128 || len<0 )
794 {
795 ret=EMSGSIZE;
796 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
797 return ret;
798 }
799
800 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
801
802 if ( ret != (int)strlen(tmpCmd) )
803 {
804 ret=errno;
805 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
806 return ret;
807 }
808
809 }
810 return 0;
811 }
812
813int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
814 {
815 if ( source.fShmPtr )
816 {
817 *(homer_uint32*)( source.fShmPtr ) = 0;
818 return 0;
819 }
820 else
821 return EFAULT;
822 }
823
824int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
825 {
826 bool toRead = false;
827 do
828 {
829 fd_set conns;
830 FD_ZERO( &conns );
831 int highestConn=0;
832 toRead = false;
833 unsigned firstConnection=~(unsigned)0;
834 for ( unsigned long n = 0; n < sourceCnt; n++ )
835 {
836 if ( sources[n].fDataSize == 0 // size specifier not yet read
837 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
838 {
839 toRead = true;
840 FD_SET( sources[n].fTCPConnection, &conns );
841 if ( sources[n].fTCPConnection > highestConn )
842 highestConn = sources[n].fTCPConnection;
843 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
844 if ( firstConnection == ~(unsigned)0 )
845 firstConnection = n;
846 }
847 else
848 {
849 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
850 }
851 }
852 if ( toRead )
853 {
854 struct timeval tv, *ptv;
855 if ( useTimeout )
856 {
857 tv.tv_sec = timeout / 1000000;
858 tv.tv_usec = timeout - (tv.tv_sec*1000000);
859 ptv = &tv;
860 }
861 else
862 ptv = NULL;
863 // wait until something is ready to be read
864 // either for timeout usecs or until eternity
865 int ret;
866 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
867 if ( ret <=0 )
868 {
869 fErrorConnection = firstConnection;
870 if ( errno )
871 fConnectionStatus = errno;
872 else
873 fConnectionStatus = ETIMEDOUT;
874 return fConnectionStatus;
875 }
876 for ( unsigned n = 0; n < sourceCnt; n++ )
877 {
878 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
879 {
880 if ( sources[n].fDataSize == 0 )
881 {
882 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
883 if ( ret != sizeof(homer_uint32) )
884 {
885 fErrorConnection = n;
886 if ( errno )
887 fConnectionStatus = errno;
888 else
889 fConnectionStatus = ENOMSG;
890 return fConnectionStatus;
891 }
892 sources[n].fDataSize = ntohl( sources[n].fDataSize );
893 sources[n].fDataRead = 0;
894 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
895 if ( !sources[n].fData )
896 {
897 fErrorConnection = n;
898 fConnectionStatus = ENOMEM;
899 return fConnectionStatus;
900 }
901 }
902 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
903 {
904 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
905 if ( ret>0 )
906 sources[n].fDataRead += ret;
907 else if ( ret == 0 )
908 {
909 fErrorConnection = n;
910 fConnectionStatus = ECONNRESET;
911 return fConnectionStatus;
912 }
913 else
914 {
915 fErrorConnection = n;
916 fConnectionStatus = errno;
917 return fConnectionStatus;
918 }
919 }
920 else
921 {
922 fErrorConnection = n;
923 fConnectionStatus = ENXIO;
924 return fConnectionStatus;
925 }
926 }
927 }
928 }
929 }
930 while ( toRead );
931 return 0;
932 }
933
934/*
935int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
936 {
937#warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
938 // Send one event request
939 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
940 if ( ret != strlen(GET_ONE) )
941 {
942 return errno;
943 }
944 // wait for and read back size specifier
945 unsigned sizeNBO;
946 // The value transmitted is binary, in network byte order
947 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
948 if ( ret != sizeof(sizeNBO) )
949 {
950 return errno;
951 }
952 // Convert back to host byte order
953 source.fDataSize = ntohl( sizeNBO );
954 source.fData = new homer_uint8[ source.fDataSize ];
955 unsigned long dataRead=0, toRead;
956 if ( !source.fData )
957 {
958 char buffer[1024];
959 // Read in data into buffer in order not to block connection
960 while ( dataRead < source.fDataSize )
961 {
962 if ( source.fDataSize-dataRead > 1024 )
963 toRead = 1024;
964 else
965 toRead = source.fDataSize-dataRead;
966 ret = read( source.fTCPConnection, buffer, toRead );
967 if ( ret > 0 )
968 dataRead += ret;
969 else
970 return errno;
971 }
972 return ENOMEM;
973 }
974 while ( dataRead < source.fDataSize )
975 {
976 toRead = source.fDataSize-dataRead;
977 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
978 if ( ret > 0 )
979 dataRead += ret;
980 else if ( ret == 0 && useTimeout )
981 {
982 struct timeval tv;
983 tv.tv_sec = timeout / 1000000;
984 tv.tv_usec = timeout - (tv.tv_sec*1000000);
985 fd_set conns;
986 FD_ZERO( &conns );
987 FD_SET( source.fTCPConnection, &conns );
988 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
989 if ( ret <=0 )
990 return errno;
991 }
992 else if ( ret == 0 )
993 {
994 if ( errno == EOK )
995 return ECONNRESET;
996 else
997 return errno;
998 }
999 else
1000 {
1001 return errno;
1002 }
1003 }
1004 return 0;
1005 }
1006*/
1007
1008/*
1009int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1010 {
1011
1012 }
1013*/
1014
1015int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1016 {
1017 struct timeval tv1, tv2;
1018 bool found=false;
1019 bool all=true;
1020 if ( useTimeout )
1021 gettimeofday( &tv1, NULL );
1022 do
1023 {
1024 found = false;
1025 all = true;
1026 for ( unsigned n = 0; n < sourceCnt; n++ )
1027 {
1028 if ( !sources[n].fDataSize )
1029 all = false;
1030 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1031 {
1032 found = true;
1033 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1034 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1035 }
1036 }
1037 if ( found && useTimeout )
1038 gettimeofday( &tv1, NULL );
1039 if ( !all && useTimeout )
1040 {
1041 gettimeofday( &tv2, NULL );
1042 unsigned long long tdiff;
1043 tdiff = tv2.tv_sec-tv1.tv_sec;
1044 tdiff *= 1000000;
1045 tdiff += tv2.tv_usec-tv1.tv_usec;
1046 if ( tdiff > timeout )
1047 return ETIMEDOUT;
1048 }
1049 if ( !all )
1050 usleep( 0 );
1051 }
1052 while ( !all );
1053 return 0;
1054 }
1055
1056int HOMERReader::ParseSourceData( DataSource& source )
1057 {
1058 if ( source.fData )
1059 {
1060 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1061 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1062 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1063 if ( ret )
1064 return ret;
1065 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1066 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1067 {
1068 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1069 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1070 fBlocks[fBlockCnt].fSource = source.fNdx;
1071 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1072 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1073 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1074 struct in_addr tmpA;
1075 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1076 char* addr = inet_ntoa( tmpA );
1077 char* tmpchar = new char[ strlen(addr)+1 ];
1078 if ( !tmpchar )
1079 return ENOMEM;
1080 strcpy( tmpchar, addr );
1081 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1082 descrOffset += descrLen;
1083 }
1084 return 0;
1085 }
1086 return EFAULT;
1087 }
1088
1089int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1090 {
1091 DataBlock* newBlocks;
1092 newBlocks = new DataBlock[ newCnt ];
1093 if ( !newBlocks )
1094 return ENOMEM;
1095 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1096 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1097 if ( newCnt > fMaxBlockCnt )
1098 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1099 if ( fBlocks )
1100 delete [] fBlocks;
1101 fBlocks = newBlocks;
1102 fMaxBlockCnt = newCnt;
1103 return 0;
1104 }
1105
1106homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1107 {
1108 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1109 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1110 }
1111
1112homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1113 {
1114 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1115 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1116 }
1117
1118
1119
1120/*
1121***************************************************************************
1122**
1123** $Author$ - Initial Version by Timm Morten Steinbeck
1124**
1125** $Id$
1126**
1127***************************************************************************
1128*/