import os
import sys
sys.path.append( os.environ["ARTDAQ_DAQINTERFACE_DIR"] )

import string
import re

from rc.control.utilities import table_range
from rc.control.utilities import commit_check_throws_if_failure


def bookkeeping_for_fhicl_documents_artdaq_v1_base(self):
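    # Bookkeeping for artdaq v1.x-era FHiCL documents: fill in the
    # process counts/ranks and the xmlrpc_client_list which the artdaq
    # processes expect, via the substitutions below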

    if os.path.exists(self.daq_dir + "/srcs/artdaq"):
        commit_check_throws_if_failure(self.daq_dir + "/srcs/artdaq", \
                                           "c3d1ce5ce07a83793f91efc0744b19aa8d5caf5c", "Jan 12, 2017", False)

    # JCF, 11/11/14

    # Now, set some variables which we'll use to replace
    # pre-existing variables in the FHiCL documents before we send
    # them to the artdaq processes

    # First passthrough of procinfos: assemble the
    # xmlrpc_client_list string, and figure out how many of each
    # type of process there are

    xmlrpc_client_list = "\""
    numeral = ""

    for procinfo in self.procinfos:
        if "BoardReader" in procinfo.name:
            numeral = "3"
        elif "EventBuilder" in procinfo.name:
            numeral = "4"
        elif "Aggregator" in procinfo.name:
            numeral = "5"

        xmlrpc_client_list += ";http://" + procinfo.host + ":" + \
            procinfo.port + "/RPC2," + numeral

    xmlrpc_client_list += "\""
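
    # At this point xmlrpc_client_list looks something like the
    # following (illustrative hosts and ports, assuming one BoardReader
    # and one EventBuilder; the value includes its own double quotes,
    # ready to be dropped into FHiCL):
    #
    #   ";http://daq01:5300/RPC2,3;http://daq02:5400/RPC2,4"
    #
    # where the trailing numeral encodes the process type (3 =
    # BoardReader, 4 = EventBuilder, 5 = Aggregator)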

    # Second passthrough: use this newfound info to modify the
    # FHiCL code we'll send during the config transition

    # Note: we index into self.procinfos rather than using a direct
    # "for procinfo in self.procinfos" loop to make it unambiguous
    # that the fhicl_used attributes of the stored procinfos
    # themselves are being updated

    for i_proc in range(len(self.procinfos)):

        self.procinfos[i_proc].fhicl_used = re.sub(
            "first_event_builder_rank.*\n",
            "first_event_builder_rank: " +
            str(self.num_boardreaders()) + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "event_builder_count.*\n",
            "event_builder_count: " +
            str(self.num_eventbuilders()) + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "xmlrpc_client_list.*\n",
            "xmlrpc_client_list: " +
            xmlrpc_client_list + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "first_data_receiver_rank.*\n",
            "first_data_receiver_rank: " +
            str(self.num_boardreaders() +
                self.num_eventbuilders()) + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "expected_fragments_per_event.*\n",
            "expected_fragments_per_event: " +
            str(self.num_boardreaders()) + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "fragment_receiver_count.*\n",
            "fragment_receiver_count: " +
            str(self.num_boardreaders()) + "\n",
            self.procinfos[i_proc].fhicl_used)

        self.procinfos[i_proc].fhicl_used = re.sub(
            "data_receiver_count.*\n",
            "data_receiver_count: " +
            str(self.num_aggregators() - 1) + "\n",
            self.procinfos[i_proc].fhicl_used)


def bookkeeping_for_fhicl_documents_artdaq_v2_base(self):
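    # Bookkeeping for artdaq v2-era FHiCL documents: regenerate the
    # sources and destinations tables and the transfer_to_dispatcher
    # parameters from the known set of processes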

    if os.path.exists(self.daq_dir + "/srcs/artdaq"):
        commit_check_throws_if_failure(self.daq_dir + "/srcs/artdaq", \
                                           "c3d1ce5ce07a83793f91efc0744b19aa8d5caf5c", "Jan 12, 2017", True)

        commit_check_throws_if_failure(self.daq_dir + "/srcs/artdaq", \
                                           "9a63dfd8660bfbba43acadcfa1ed4d362610be2f", "May 9, 2017", False)

    if self.num_aggregators() > 1:
        num_data_loggers = self.num_aggregators() - 1  # "-1" is for the dispatcher
    else:
        num_data_loggers = self.num_aggregators()

    assert num_data_loggers == 1, "Currently only have the logic to handle one data logger"

    max_fragment_size_words = self.max_fragment_size_bytes / 8

    proc_hosts = []

    for proctype in ["BoardReader", "EventBuilder", "Aggregator", "DataLogger", "Dispatcher" ]:
        for procinfo in self.procinfos:
            if proctype in procinfo.name:
                num_existing = len(proc_hosts)

                if procinfo.host == "localhost":
                    host_to_display = os.environ["HOSTNAME"]
                else:
                    host_to_display = procinfo.host

                proc_hosts.append(
                    "{rank: %d host: \"%s\" portOffset: %d}" % \
                        (num_existing, host_to_display, self.tcp_base_port + 100*num_existing))

    proc_hosts_string = ", ".join( proc_hosts )
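
    # proc_hosts_string is a FHiCL-ready host map; e.g., with two
    # processes, a tcp_base_port of 6300 and illustrative hostnames it
    # would read:
    #
    #   {rank: 0 host: "daq01" portOffset: 6300}, {rank: 1 host: "daq02" portOffset: 6400}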

    def create_sources_or_destinations_string(nodetype, first, last):

        if nodetype == "sources":
            prefix = "s"
        elif nodetype == "destinations":
            prefix = "d"
        else:
            assert False, "nodetype must be \"sources\" or \"destinations\""

        nodes = []

        for i in range(first, last):
            nodes.append(
                "%s%d: { transferPluginType: Autodetect %s_rank: %d max_fragment_size_words: %d host_map: [%s]}" % \
                    (prefix, i, nodetype[:-1], i, max_fragment_size_words, \
                    proc_hosts_string))

        return "\n".join( nodes )
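
    # For example, create_sources_or_destinations_string("destinations", 2, 4)
    # would yield two lines of the form (illustrative ranks, with
    # <words> standing in for max_fragment_size_words):
    #
    #   d2: { transferPluginType: Autodetect destination_rank: 2 max_fragment_size_words: <words> host_map: [<proc_hosts_string>]}
    #   d3: { transferPluginType: Autodetect destination_rank: 3 max_fragment_size_words: <words> host_map: [<proc_hosts_string>]}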

    source_node_first = -1
    source_node_last = -1
    destination_node_first = -1
    destination_node_last = -1

    agg_count = 0
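
    # Rank layout assumed throughout the loop below: BoardReaders
    # occupy ranks [0, num_boardreaders()), EventBuilders the next
    # num_eventbuilders() ranks, and the Aggregators (data logger
    # first, then dispatcher) the ranks after that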

    for i_proc in range(len(self.procinfos)):
        if "BoardReader" in self.procinfos[i_proc].name:
            destination_node_first = self.num_boardreaders()
            destination_node_last = self.num_boardreaders() + \
                self.num_eventbuilders()

        elif "EventBuilder" in self.procinfos[i_proc].name:
            source_node_first = 0
            source_node_last = self.num_boardreaders()
            destination_node_first = self.num_boardreaders() + \
                self.num_eventbuilders()
            destination_node_last = self.num_boardreaders() + \
                self.num_eventbuilders() + \
                num_data_loggers

        elif "Aggregator" in self.procinfos[i_proc].name:
            source_node_first = self.num_boardreaders()
            source_node_last = self.num_boardreaders() + \
                self.num_eventbuilders()
        else:
            assert False, "Process type not recognized"

        for tablename in [ "sources", "destinations" ]:

            if tablename == "sources":
                node_first = source_node_first
                node_last = source_node_last
            else:
                node_first = destination_node_first
                node_last = destination_node_last

            (table_start, table_end) = \
                table_range(self.procinfos[i_proc].fhicl_used, \
                                tablename)

            if table_start != -1 and table_end != -1:
                self.procinfos[i_proc].fhicl_used = \
                    self.procinfos[i_proc].fhicl_used[:table_start] + \
                    "\n" + tablename + ": { \n" + \
                    create_sources_or_destinations_string(tablename, node_first, node_last) + \
                    "\n } \n" + \
                    self.procinfos[i_proc].fhicl_used[table_end:]

        if "Aggregator" in self.procinfos[i_proc].name:
            (table_start, table_end) = \
                table_range(self.procinfos[i_proc].fhicl_used, \
                                "transfer_to_dispatcher")
            if table_start == -1 or table_end == -1:
                raise Exception("Unable to find expected transfer_to_dispatcher transfer plugin definition in the Aggregator FHiCL")

            transfer_source_rank = self.num_boardreaders() + \
                self.num_eventbuilders() + \
                agg_count

            agg_count += 1

            transfer_destination_rank = self.num_boardreaders() + \
                self.num_eventbuilders() + num_data_loggers

            # JCF, Jan-24-2017

            # This is a kluge to account for the fact that the code can't
            # yet handle more than one data logger

            if transfer_source_rank == transfer_destination_rank:
                transfer_source_rank -= 1
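
            # For example, with 2 BoardReaders, 2 EventBuilders and 2
            # Aggregators (one data logger plus one dispatcher),
            # transfer_destination_rank is 5 for both Aggregators; the
            # second Aggregator (the dispatcher, agg_count == 1) would
            # naively also get transfer_source_rank 5, so the kluge
            # above points it back at the data logger's rank of 4
            # (illustrative counts)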

            assert num_data_loggers < 2, "Code doesn't yet support multiple data loggers"

            transfer_code = self.procinfos[i_proc].fhicl_used[table_start:table_end]
            transfer_code = re.sub(r"source_rank\s*:\s*[0-9]+",
                                   "source_rank: %d" % (transfer_source_rank),
                                   transfer_code)
            transfer_code = re.sub(r"destination_rank\s*:\s*[0-9]+",
                                   "destination_rank: %d" % (transfer_destination_rank),
                                   transfer_code)

            self.procinfos[i_proc].fhicl_used = \
                self.procinfos[i_proc].fhicl_used[:table_start] + \
                transfer_code + \
                self.procinfos[i_proc].fhicl_used[table_end:]

    for i_proc in range(len(self.procinfos)):
        self.procinfos[i_proc].fhicl_used = re.sub(r"expected_fragments_per_event\s*:\s*[0-9]+",
                                                   "expected_fragments_per_event: %d" % (self.num_boardreaders()),
                                                   self.procinfos[i_proc].fhicl_used)


# A lot of the code in this function repeats the code in
# bookkeeping_for_fhicl_documents_artdaq_v2_base. This is intentional:
# by not modularizing the repeated code, changes made here for newer
# artdaq versions can't break backwards compatibility should we want
# to use artdaq v2_01_00 through artdaq v2_02_02 in the future

def bookkeeping_for_fhicl_documents_artdaq_v3_base(self):
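    # Bookkeeping for artdaq v3-era FHiCL documents: regenerate the
    # sources and destinations tables (optionally with 1-over-N
    # NthEvent routing to the dispatchers) and fix up
    # expected_fragments_per_event, request_address/request_port,
    # partition_number and the RootOutput file directory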

    send_1_over_N = True

    try:
        if self.all_events_to_all_dispatchers:
            send_1_over_N = False
    except AttributeError:
        pass # We don't care if the attribute above is undefined
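
    # send_1_over_N == True means each data logger round-robins its
    # events across the dispatchers via NthEvent transfers, so each of
    # the N dispatchers sees 1/N of the events; setting
    # self.all_events_to_all_dispatchers disables this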

    max_fragment_size_words = self.max_fragment_size_bytes / 8

    if os.path.exists(self.daq_dir + "/srcs/artdaq"):
        commit_check_throws_if_failure(self.daq_dir + "/srcs/artdaq", \
                                           "68cb53e576dd6afea7950ca6286a08f5f329b966", "May 9, 2017", True)

    num_data_loggers = 0
    num_dispatchers = 0

    for procinfo in self.procinfos:
        if "DataLogger" in procinfo.name:
            num_data_loggers += 1
        elif "Dispatcher" in procinfo.name:
            num_dispatchers += 1

    proc_hosts = []

    for procinfo in self.procinfos:
        num_existing = len(proc_hosts)

        if procinfo.host == "localhost":
            host_to_display = os.environ["HOSTNAME"]
        else:
            host_to_display = procinfo.host

        proc_hosts.append(
            "{rank: %d host: \"%s\" portOffset: %d}" % \
                (num_existing, host_to_display, self.tcp_base_port + (1+len(self.procinfos))*num_existing))

    proc_hosts_string = ", ".join( proc_hosts )
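
    # Note the portOffset spacing here is (1 + number of processes) per
    # rank rather than the flat 100 used in the v2 bookkeeping,
    # presumably so each rank gets its own non-overlapping block of
    # ports regardless of how many processes there are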

    def create_sources_or_destinations_string(nodetype, first, last, nth = -1, this_node_index = -1):

        if nodetype == "sources":
            prefix = "s"
        elif nodetype == "destinations":
            prefix = "d"
        else:
            assert False, "nodetype must be \"sources\" or \"destinations\""

        nodes = []

        for i in range(first, last):
            if nth == -1:
                nodes.append(
                    "%s%d: { transferPluginType: %s %s_rank: %d max_fragment_size_words: %d host_map: [%s]}" % \
                    (prefix, i, self.transfer, nodetype[:-1], i, max_fragment_size_words, \
                     proc_hosts_string))
            else:

                if nodetype == "destinations":
                    assert (last - first) == nth, "Problem with the NthEvent logic in the program: first node is %d, last is %d, but nth is %d" % (first, last, nth)

                    offset = (i - first)
                elif nodetype == "sources":
                    offset = this_node_index

                nodes.append(
                    "%s%d: { transferPluginType: NthEvent nth: %d offset: %d physical_transfer_plugin: { transferPluginType: %s %s_rank: %d max_fragment_size_words: %d } host_map: [%s]}" % \
                    (prefix, i, nth, offset, self.transfer, nodetype[:-1], i, max_fragment_size_words, \
                     proc_hosts_string))

        return "\n".join( nodes )
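
    # With send_1_over_N, a data logger's destinations table ends up
    # with one NthEvent-wrapped transfer per dispatcher; e.g., with two
    # dispatchers at ranks 5 and 6 (illustrative ranks, writing
    # <transfer> for self.transfer and <words> for
    # max_fragment_size_words):
    #
    #   d5: { transferPluginType: NthEvent nth: 2 offset: 0 physical_transfer_plugin: { transferPluginType: <transfer> destination_rank: 5 max_fragment_size_words: <words> } host_map: [...]}
    #   d6: { transferPluginType: NthEvent nth: 2 offset: 1 physical_transfer_plugin: { transferPluginType: <transfer> destination_rank: 6 max_fragment_size_words: <words> } host_map: [...]}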

    if send_1_over_N:
        current_dispatcher_index = 0

    for i_proc in range(len(self.procinfos)):

        source_node_first = -1
        source_node_last = -1
        destination_node_first = -1
        destination_node_last = -1

        is_data_logger = False
        is_dispatcher = False

        if "BoardReader" in self.procinfos[i_proc].name:
            destination_node_first = self.num_boardreaders()
            destination_node_last = destination_node_first + \
                                    self.num_eventbuilders()

        elif "EventBuilder" in self.procinfos[i_proc].name:
            source_node_first = 0
            source_node_last = source_node_first + self.num_boardreaders()
            destination_node_first = self.num_boardreaders() + \
                self.num_eventbuilders()
            destination_node_last = destination_node_first + num_data_loggers

        elif "DataLogger" in self.procinfos[i_proc].name:
            is_data_logger = True

            source_node_first = self.num_boardreaders()
            source_node_last = self.num_boardreaders() + \
                               self.num_eventbuilders()

            destination_node_first = self.num_boardreaders() + \
                                     self.num_eventbuilders() + \
                                     num_data_loggers
            destination_node_last = self.num_boardreaders() + \
                                    self.num_eventbuilders() + \
                                    num_data_loggers + \
                                    num_dispatchers
        elif "Dispatcher" in self.procinfos[i_proc].name:
            is_dispatcher = True

            source_node_first = self.num_boardreaders() + \
                                self.num_eventbuilders()
            source_node_last = source_node_first + num_data_loggers
        elif "RoutingMaster" in self.procinfos[i_proc].name:
            pass
        else:
            assert False, "Process type not recognized"

        for tablename in [ "sources", "destinations" ]:

            if tablename == "sources":
                node_first = source_node_first
                node_last = source_node_last
            else:
                node_first = destination_node_first
                node_last = destination_node_last

            (table_start, table_end) = \
                table_range(self.procinfos[i_proc].fhicl_used, \
                                tablename)

            if table_start != -1 and table_end != -1:

                node_index = -1
                nth = -1

                if send_1_over_N:
                    if is_data_logger and tablename == "destinations":
                        nth = num_dispatchers
                    elif is_dispatcher and tablename == "sources":
                        nth = num_dispatchers
                        node_index = current_dispatcher_index
                        current_dispatcher_index += 1

                self.procinfos[i_proc].fhicl_used = \
                    self.procinfos[i_proc].fhicl_used[:table_start] + \
                    "\n" + tablename + ": { \n" + \
                    create_sources_or_destinations_string(tablename, node_first, node_last, nth, node_index) + \
                    "\n } \n" + \
                    self.procinfos[i_proc].fhicl_used[table_end:]

    expected_fragments_per_event = 0

    for procinfo in self.procinfos:

        if "BoardReader" in procinfo.name:

            # A BoardReader contributes one fragment to each event
            # unless its FHiCL document sets sends_no_fragments to true

            res = re.search(r"[^#]\s*sends_no_fragments:\s*[Tt]rue", procinfo.fhicl_used)

            if not res:
                expected_fragments_per_event += 1
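
    # Downstream of the EventBuilders, the data loggers and dispatchers
    # receive fully-built events rather than individual fragments,
    # hence the expected_fragments_per_event of 1 assigned below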

    for i_proc in range(len(self.procinfos)):
        if "DataLogger" in self.procinfos[i_proc].name or "Dispatcher" in self.procinfos[i_proc].name:
            self.procinfos[i_proc].fhicl_used = re.sub(r"expected_fragments_per_event\s*:\s*[0-9]+",
                                                       "expected_fragments_per_event: 1",
                                                       self.procinfos[i_proc].fhicl_used)
        else:
            self.procinfos[i_proc].fhicl_used = re.sub(r"expected_fragments_per_event\s*:\s*[0-9]+",
                                                       "expected_fragments_per_event: %d" % (expected_fragments_per_event),
                                                       self.procinfos[i_proc].fhicl_used)
        if self.request_address is not None:
            self.procinfos[i_proc].fhicl_used = re.sub(r"request_address\s*:\s*[\"0-9\.]+",
                                                       "request_address: \"%s\"" % (self.request_address),
                                                       self.procinfos[i_proc].fhicl_used)

        if self.request_port is not None:
            self.procinfos[i_proc].fhicl_used = re.sub(r"request_port\s*:\s*[0-9]+",
                                                       "request_port: %d" % (self.request_port),
                                                       self.procinfos[i_proc].fhicl_used)

        if self.partition_number is not None:
            self.procinfos[i_proc].fhicl_used = re.sub(r"partition_number\s*:\s*[0-9]+",
                                                       "partition_number: %d" % (self.partition_number),
                                                       self.procinfos[i_proc].fhicl_used)

    if self.data_directory_override is not None:
        for i_proc in range(len(self.procinfos)):
            if "EventBuilder" in self.procinfos[i_proc].name or "DataLogger" in self.procinfos[i_proc].name:

                res = re.search(r"^[^#]*RootOutput", self.procinfos[i_proc].fhicl_used)

                if res:
                    start, end = table_range(self.procinfos[i_proc].fhicl_used, "RootOutput")
                    assert start != -1 and end != -1

                    rootoutput_table = self.procinfos[i_proc].fhicl_used[start:end]
                    res = re.search(r"(.*fileName\s*:[\s\"]*)/[^\s]+/",
                                    rootoutput_table)
                    assert res

                    rootoutput_table = re.sub(r".*fileName\s*:[\s\"]*/[^\s]+/",
                                              "%s%s" % (res.group(1), self.data_directory_override),
                                              rootoutput_table)

                    self.procinfos[i_proc].fhicl_used = self.procinfos[i_proc].fhicl_used[:start] + \
                                                        rootoutput_table + \
                                                        self.procinfos[i_proc].fhicl_used[end:]
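
                    # E.g., a fileName of "/tmp/artdaq_r%06r.root"
                    # becomes "<data_directory_override>artdaq_r%06r.root"
                    # (illustrative path); note this assumes
                    # data_directory_override ends in a "/"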


def bookkeeping_for_fhicl_documents_artdaq_v4_base(self):
    pass


def main():

    # Simple smoke test of table_range(): print the characters which
    # bracket the table boundaries it returns

    test_table_range = True

    if test_table_range:

        filename = "%s/simple_test_config/multiple_dispatchers/Aggregator2.fcl" % os.getcwd()

        with open( filename ) as inf:
            inf_contents = inf.read()

        print "From file " + filename

        for tablename in ["sources", "destinations"]:
            (table_start, table_end) = table_range( inf_contents, tablename )

            print "Seven characters centered on table_start: \"" + inf_contents[(table_start - 3):(table_start+4)] + "\""
            print "Seven characters centered on table_end: \"" + inf_contents[(table_end - 3):(table_end+4)] + "\""
            print "The character at table_start: \"" + inf_contents[table_start:(table_start+1)] + "\""
            print "The character at table_end: \"" + inf_contents[table_end:(table_end+1)] + "\""

if __name__ == "__main__":
    main()