Week 3: Attempting to create a trajectory streaming propagator in WESTPA
As mentioned in my previous blog post, I have started to explore the integration of trajectory streaming into the WESTPA framework. The goal is to edit the executable.py file in WESTPA to create a new propagator that is started by the WESTPA work manager and runs analysis processes. These changes are being tracked in pull request #495.
Modifications to the WESTPA ExecutablePropagator
ExecutablePropagator is a class in WESTPA in which executables are defined to be run by the WESTPA work manager. In WESTPA, there are currently five executables defined: propagator, pre_iteration, post_iteration, get_pcoord and gen_istate. I added a new executable called stream_trajectory. This can be activated by adding the following lines to the west.cfg file:
stream_trajectory:
  enabled: true
  executable: $WEST_SIM_ROOT/westpa_scripts/analysis_streaming.sh
  stderr: stdout
  stdout: $WEST_SIM_ROOT/seg_logs/{segment.n_iter:06d}-{segment.seg_id:06d}_analysis.log
The executable line specifies the path to the script that will be run. The stderr and stdout lines specify where the standard error and standard output of the script will be written. The {segment.n_iter:06d} and {segment.seg_id:06d} placeholders are replaced with the current iteration number and segment ID, respectively, each zero-padded to six digits. This allows the script to write its output to a log file that is specific to the current segment and iteration.
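To make the placeholder expansion concrete, here is a minimal sketch using Python's built-in str.format. It assumes the template is filled in with a segment object in roughly this way; the segment below is a stand-in built with SimpleNamespace, not WESTPA's own Segment class, and the iteration and segment numbers are made up.

```python
from types import SimpleNamespace

# Stand-in for a WESTPA segment; only the two attributes used by the template.
segment = SimpleNamespace(n_iter=12, seg_id=3)

# Same placeholder syntax as the stdout line in west.cfg (path prefix omitted).
template = "seg_logs/{segment.n_iter:06d}-{segment.seg_id:06d}_analysis.log"

# ":06d" formats each integer as a decimal padded with zeros to six digits.
log_path = template.format(segment=segment)
print(log_path)  # seg_logs/000012-000003_analysis.log
```

Because both numbers are part of the file name, two segments in the same iteration never share a log file.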
Addition of a function to run the streaming executable
In the ExecutablePropagator class, I added a new function called stream_trajectory. This function is called by the WESTPA work manager and launches the stream_trajectory executable. The function is defined as follows:
def stream_trajectory(self, segments):
    child_info = self.exe_info['stream_trajectory']
    log.debug('trajectory streaming executable: %s' % child_info['executable'])

    for segment in segments:
        addtl_env = {}
        addtl_env.update(self.port_env_vars(segment.seg_id))

        # Pass the additional environment variables to the trajectory streaming executable
        rc_stream, rusage_stream = self.exec_for_trajectory_streaming(child_info, segment, addtl_env)

        if rc_stream == 0:
            log.debug('trajectory streaming child process for segment %d completed successfully' % segment.seg_id)
        elif rc_stream < 0:
            log.error(
                'trajectory streaming child process for segment %d exited on signal %d (%s)'
                % (segment.seg_id, -rc_stream, SIGNAL_NAMES[-rc_stream])
            )
        else:
            log.error('trajectory streaming child process for segment %d exited with code %d' % (segment.seg_id, rc_stream))
This function receives a list of segments to run analysis for and, for each segment, adds the port to the environment variables that are passed to the executable (I will discuss port assignment later). It then calls the exec_for_trajectory_streaming function, which adds additional environment variables and executes the predefined executable for trajectory streaming. WESTPA already has an exec_for_segment function, which is used to execute the propagator executable. I had hoped to reuse this function, but it requires istate and bstate information, which is not needed for the trajectory streaming executable.
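The three return-code branches in stream_trajectory follow the convention used by Python's subprocess module on POSIX systems: zero means success, a negative value -N means the child was killed by signal N, and a positive value is the child's own exit code. The sketch below illustrates that convention in isolation. The describe_return_code helper is hypothetical and not part of WESTPA, and it uses the standard library's signal.Signals in place of WESTPA's SIGNAL_NAMES lookup.

```python
import signal
import subprocess
import sys


def describe_return_code(rc):
    """Turn a subprocess-style return code into a short message."""
    if rc == 0:
        return "completed successfully"
    if rc < 0:
        # On POSIX, a negative return code means the child was killed by signal -rc.
        try:
            name = signal.Signals(-rc).name
        except ValueError:
            name = "unknown signal"
        return "exited on signal {:d} ({})".format(-rc, name)
    return "exited with code {:d}".format(rc)


# Run a trivial child process that exits with status 3.
proc = subprocess.run([sys.executable, "-c", "import sys; sys.exit(3)"])
print(describe_return_code(proc.returncode))
```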
Modifications to the WESTPA Sim and Work Managers
For WESTPA to run the stream_trajectory executable, the WESTPA Sim and Work Managers need to be modified. The Sim Manager is responsible for managing the simulation and sending tasks to the Work Manager, which is responsible for executing the tasks.
I added the following block of code to the propagate function in the Sim Manager:
# If trajectory streaming is enabled, submit a streaming process
if self.do_trajectory_streaming:
    log.debug('streaming trajectory for segment block of length: {:d}'.format(len(segment_block)))
    future = self.work_manager.submit(wm_ops.stream_trajectory, args=(segment_block,))
    futures.add(future)
    stream_futures.add(future)
This code runs alongside the section of code that submits the propagator executable to the Work Manager. The wm_ops.stream_trajectory operation is defined in the Work Manager and calls the stream_trajectory function in the ExecutablePropagator class.
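The submission pattern above can be sketched with the standard library's concurrent.futures, used here purely as a stand-in for the WESTPA work manager. The two task functions are placeholders. The idea that the separate stream_futures set exists so that streaming futures can be told apart from propagation futures when results are collected is an assumption, not something taken from the WESTPA source.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def propagate(segments):
    # Placeholder for the dynamics task; returns the segments it "ran".
    return ("propagate", list(segments))


def stream_trajectory(segments):
    # Placeholder for the streaming analysis task.
    return ("stream", list(segments))


segment_block = [0, 1, 2]
futures = set()
stream_futures = set()
results = []

with ThreadPoolExecutor(max_workers=2) as pool:
    # Both tasks are submitted for the same block so they can run side by side.
    futures.add(pool.submit(propagate, segment_block))
    future = pool.submit(stream_trajectory, segment_block)
    futures.add(future)
    stream_futures.add(future)

    for done in as_completed(futures):
        outcome = done.result()  # re-raises any exception from the task
        if done in stream_futures:
            continue  # streaming tasks return nothing that needs recording
        results.append(outcome)

print(results)
```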
Issues with Port Assignment
With this code, the work manager can successfully run the stream_trajectory executable for each segment. However, as I elaborated in my previous blog post, both the propagator and stream_trajectory executables need access to the same port. Whilst the current implementation successfully sends a port number to both executables using environment variables, the approach has some issues. At the moment, I have not implemented code to check whether the port is free before assigning it. This is trickier than I initially thought, but there is likely a way to do it. A more critical issue is that the port may be free when WESTPA assigns it, but be taken by another process before the molecular dynamics code starts running. If this happens, the molecular dynamics code will fail to start.
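One common way to obtain a port that is free at the time of asking is to bind a socket to port 0 and let the operating system choose. The sketch below shows the idea. The find_free_port helper is hypothetical and not part of WESTPA or the pull request, and it does not remove the race described above: the port is released when the socket closes, so another process can still claim it before the molecular dynamics code binds to it.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for a TCP port on `host` that is unused right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the OS pick a free port
        return sock.getsockname()[1]


port = find_free_port()
print(port)
```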
Multinode setups represent a further challenge, as there is nothing stopping the work manager from running the stream_trajectory executable on a different node from the propagator executable. In that case, the two executables would not be able to communicate with each other over localhost. Solving this may require forcing the work manager to run the stream_trajectory executable on the same node as the propagator executable, which needs some investigation.
Next Steps
In the next week, I am going to focus on implementing the method suggested in the previous post, where a single Python executable runs both the simulation and the analysis. This avoids many of the issues with port assignment and ensures that both processes run on the same node. I will also continue to work on the implementation within WESTPA, and think about how to handle the port assignment issues.