Python DPM API¶
NOTE: This is still an experimental module. Feedback from our users is greatly appreciated.
Due to the significance of recent feature additions to the Python DPM and the end of life of Python 2 support on January 1st, 2020, we are no longer adding features to Python 2 as of September 25th, 2019. The master-py2 repo will be maintained in its current state for those already using the API. https://www.python.org/dev/peps/pep-0373/
The ACNET and DPM dependencies can be downloaded from Redmine directly or you can clone the repositories using:
git clone https://cdcvs.fnal.gov/projects/acnetd-python
git clone https://cdcvs.fnal.gov/projects/acsys-dpm-py-client
You will need to switch to the branch corresponding to your desired version of Python e.g.,
git checkout master-py3.
The DPM module supports two styles of programming, blocking and polling. If your script is driven by the arrival of data, you probably want to use the blocking API.
The DPM class supports Python's context manager protocol, so it can be used in a
with-statement. The benefit of this is, when the scope of the
with-statement is exited, the DPM object immediately closes its connection with DPM.
This is a simple, blocking script to print
M:OUTTMP on the 8F event:
#!/usr/bin/env python3 import DPM with DPM.Blocking() as dpm: dpm.add_entry(0, 'M:OUTTMP@E,8F') for event_response in dpm.process(): print(event_response)
Refer to the Data Request Format (DRF) specification for specific data request formats.
If your script does other things and occasionally wants to check if data has arrived, use the polling API. Each call to
DPM.pending() returns all the items that have arrived since the previous call. In this example, after processing all the pending data, the script sleeps before looping back and handling more data that has arrived.
#!/usr/bin/env python3 import time import DPM dpm = DPM.Polling() dpm.add_entry(0, "M:OUTTMP@E,8F") dpm.start() while True: for request_response in dpm.pending(): print(request_response) print('sleeping...') time.sleep(1.5)
Alternative Data Sources¶
DPM can provide data other than live data. Alternative data sources can be specified for individual requests in the DRF string or for all requests when passed to the DPM start method. When a data source is specified in the DRF string, it supersedes the data source passed to the start method.
#!/usr/bin/env python3 import DPM dpm = DPM.Blocking() dpm.add_entry(0, 'M:OUTTMP@E,8F') for ii in dpm.process("LOGGERDURATION:60000"): print(ii)
#!/usr/bin/env python3 import time import DPM dpm = DPM.Polling() dpm.add_entry(0, "M:OUTTMP@E,8F") dpm.start("LOGGERDURATION:60000") while True: for ii in dpm.pending(): print(ii) print('sleeping...') time.sleep(1.5)
DPM provides an
apply_settings interface that uses the active Kerberos credentials to authenticate the user. When making a single setting a single value may be passed to
apply_setttings. When settings need to be made to multiple devices, the reference id or tag is implied by its position in the
apply_settings argument list. Otherwise, a tag can be referred to explicitly by passing
apply_settings a list of tuples where the first position of the tuple is tag and the second position is the value to be set.
This example reads the current setting and applies a setting 1 greater than the setting on the 8F event:
#!/usr/bin/env python3 import DPM with DPM.Blocking() as dpm: dpm.add_entry(0, 'M:OUTTMP.SETTING@E,8F') for event_response in dpm.process(): dpm.apply_settings(event_response.data + 1)
The simple example above works if everything goes well but it ignores the possibility of error responses from the control system. This example also shows how to set multiple devices at a time, a more practical scenario.
#!/usr/bin/env python3 import DPM import acnet import time request_list = ['G:BEAU.SETTING@E,8F', 'G:AMANDA.SETTING@E,8F'] def exceptHandler(status): print("EXCEPTION: " + str(status)) time.sleep(5) try: settings = [None] * len(request_list) with DPM.Blocking() as dpm: for index, request in enumerate(request_list): dpm.add_entry(index, request) for event_response in dpm.process(): if hasattr(event_response, 'data'): print(event_response) # event_response.tag is our index from above settings[event_response.tag] = event_response.data + 1 else: # This is a status # status responses for settings come back as list if isinstance(event_response, list): print(event_response) for status in event_response: print(status) else: # status responses for readings of setting print(event_response) # We have a setting value for each device if settings.count(None) == 0: # apply_settings can throw exception if not authorized dpm.apply_settings(settings) settings = [None] * len(request_list) except acnet.Status as status: exceptHandler(status)
Use Resources Responsibly¶
The DPM object creates an underlying
acnet.Connection object. For a script that simply uses one DPM object, this is fine. However, if you want to create multiple DPM objects, or if your script uses another (not yet written) Acnet service, you should reduce your resource usage. Specifically, all Acnet services should share one Acnet connection object. The DPM constructors take an optional parameter which specifies an Acnet connection to use rather than create its own. For instance:
#!/usr/bin/env python3 import DPM import acnet acnet_connection = acnet.Connection() reading_dpm = DPM.Polling(acnet_connection) datalogger_dpm = DPM.Blocking(acnet_connection) reading_dpm.add_entry(0, "M:OUTTMP@P,15H") datalogger_dpm.add_entry(0, "M:OUTTMP@P,15H") reading_dpm.start() data_since =  gen = reading_dpm.pending() request_response = gen.next() # Get datalogger data ending at the first data point we get for logger_response in datalogger_dpm.process("LOGGERDURATION:60000"): # Datalogger data is complete when we get an empty array if logger_response.data == : break for datum in logger_response.data: data_since.append(datum) while True: for request_response in gen: data_since.append(request_response) print('sleeping...') time.sleep(1.5)
Using a Test Node¶
For debugging or beta testing new features we may ask that you ask for a specific DPM instead of getting one from the pool. The DPM object will accept a string as the second argument that defines what "task@node" or "node" should be used in the transaction. This allows users to request that their request be routed through test code.
#!/usr/bin/env python3 import DPM with DPM.Blocking(None, "DPMJ@VIRT01") as dpm: dpm.add_entry(0, 'M:OUTTMP@E,8F') for event_response in dpm.process(): print(event_response)
A caveat for running Python 3 on clx nodes¶
Note: This is currently only enabled on clx39. If you need this on a particular clx, please contact Beau Harrison.
Include the following in your `.bashrc` file to change to Python 3.
source /opt/rh/rh-python36/enable export X_SCLX="`scl enable rh-python36 'echo $X_SCLS'`"
Simple blocking example
#!/usr/bin/env python2 import DPM dpm = DPM.Blocking() dpm.addEntry(0, 'M:OUTTMP@E,8F') for ii in dpm.process(): print ii
Simple polling example
#!/usr/bin/env python2 import time import DPM dpm = DPM.Polling() dpm.addEntry(0, "M:OUTTMP@E,8F") dpm.start() while True: for ii in dpm.pending(): print ii print 'sleeping...' time.sleep(1.5)
Context manager example
#!/usr/bin/env python2 import time import DPM with DPM.Blocking() as dpm: dpm.addEntry(0, "M:OUTTMP@E,8F") for ii in dpm.process(): print ii