Week 3: Attempting to create a trajectory streaming propagator in WESTPA

25 Jun 2025    

As mentioned in my previous blog post, I have started to explore integrating trajectory streaming into the WESTPA framework. The goal is to edit the executable.py file in WESTPA to create a new propagator that is started by the WESTPA work manager and runs analysis processes. These changes are being tracked in pull request #495.

Modifications to the WESTPA ExecutablePropagator

ExecutablePropagator is the class in WESTPA that defines the executables run by the WESTPA work manager. There are currently five: propagator, pre_iteration, post_iteration, get_pcoord and gen_istate. I added a sixth, called stream_trajectory, which can be activated by adding the following lines to the west.cfg file:

stream_trajectory:
    enabled:    true
    executable: $WEST_SIM_ROOT/westpa_scripts/analysis_streaming.sh
    stderr:     stdout
    stdout:     $WEST_SIM_ROOT/seg_logs/{segment.n_iter:06d}-{segment.seg_id:06d}_analysis.log

The executable line specifies the path to the script that will be run. The stderr and stdout lines specify where the standard error and standard output of the script will be written. The {segment.n_iter:06d} and {segment.seg_id:06d} placeholders are replaced with the current iteration number and segment ID, respectively. This allows the script to write its output to a log file that is specific to the current segment and iteration.
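The placeholder expansion can be illustrated with plain Python string formatting. This is a minimal sketch that assumes the stdout template is expanded with str.format and that the segment object exposes n_iter and seg_id attributes; the SimpleNamespace stand-in and the values 12 and 3 are made up for illustration.

```python
from types import SimpleNamespace

# Stand-in for a WESTPA segment (hypothetical values, for illustration only)
segment = SimpleNamespace(n_iter=12, seg_id=3)

# Same placeholder syntax as the stdout entry above, without the $WEST_SIM_ROOT prefix
template = 'seg_logs/{segment.n_iter:06d}-{segment.seg_id:06d}_analysis.log'

# The :06d format spec zero-pads each integer to six digits
log_path = template.format(segment=segment)
print(log_path)  # seg_logs/000012-000003_analysis.log
```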

Addition of a function to run the streaming executable

In the ExecutablePropagator class, I added a new function called stream_trajectory. This function is called by the WESTPA work manager and launches the stream_trajectory executable. The function is defined as follows:

def stream_trajectory(self, segments):
    child_info = self.exe_info['stream_trajectory']
    log.debug('trajectory streaming executable: %s' % child_info['executable'])
    for segment in segments:
        addtl_env = {}
        addtl_env.update(self.port_env_vars(segment.seg_id))
        # Pass the additional environment variables to the trajectory streaming executable
        rc_stream, rusage_stream = self.exec_for_trajectory_streaming(child_info, segment, addtl_env)
        if rc_stream == 0:
            log.debug('trajectory streaming child process for segment %d completed successfully' % segment.seg_id)
        elif rc_stream < 0:
            log.error(
                'trajectory streaming child process for segment %d exited on signal %d (%s)'
                % (segment.seg_id, -rc_stream, SIGNAL_NAMES[-rc_stream])
            )
        else:
            log.error('trajectory streaming child process for segment %d exited with code %d' % (segment.seg_id, rc_stream))

This function receives a list of segments to run analysis for and adds the port to the environment variables that are passed to the executable (I will discuss port assignment later). It then calls the exec_for_trajectory_streaming function, which adds further environment variables and runs the configured trajectory streaming executable. WESTPA already has an exec_for_segment function, which is used to execute the propagator executable. I had hoped to reuse it, but it requires istate and bstate information, which the trajectory streaming executable does not need.
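The return-code checks above match the convention of Python's subprocess module, where 0 means success, a positive value is the exit code, and a negative value -N means the child was terminated by signal N (on POSIX). The following is a minimal sketch of launching a child process with extra environment variables. It makes no claim about how exec_for_trajectory_streaming is actually written; run_child and the variable name STREAM_PORT are hypothetical.

```python
import os
import subprocess
import sys


def run_child(cmd, addtl_env):
    """Run cmd with addtl_env merged into a copy of the current environment."""
    env = dict(os.environ)
    env.update(addtl_env)
    proc = subprocess.run(cmd, env=env, capture_output=True, text=True)
    # returncode: 0 on success, > 0 is an exit code, < 0 means killed by a signal
    return proc.returncode, proc.stdout


# Hypothetical variable name; the child process simply echoes it back
child_cmd = [sys.executable, '-c', "import os; print(os.environ['STREAM_PORT'])"]
rc, out = run_child(child_cmd, {'STREAM_PORT': '8888'})
print(rc, out.strip())
```

Copying os.environ before updating it keeps the parent's environment unchanged, so each segment can receive its own port value.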

Modifications to the WESTPA Sim and Work Managers

For WESTPA to run the stream_trajectory executable, the WESTPA Sim and Work Managers need to be modified. The Sim Manager manages the simulation and sends tasks to the Work Manager, which executes them.

I added the following block of code to the propagate function in the Sim Manager:

# If trajectory streaming is enabled submit a streaming process
if self.do_trajectory_streaming:
    log.debug('streaming trajectory for segment block of length: {:d}'.format(len(segment_block)))
    future = self.work_manager.submit(wm_ops.stream_trajectory, args=(segment_block,))
    futures.add(future)
    stream_futures.add(future)

This code runs alongside the section of code that submits the propagator executable to the Work Manager. The wm_ops.stream_trajectory operation is defined in the Work Manager and calls the stream_trajectory function in the ExecutablePropagator class.
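The submission pattern can be sketched with the standard library. Here ThreadPoolExecutor is only a stand-in for the WESTPA work manager (it is not the WESTPA API), and the two task functions are hypothetical placeholders; the point is that the streaming future is tracked both in the overall set and in its own set.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def propagate(segment_block):
    # Placeholder for the task that runs the propagator executable
    return ('propagate', len(segment_block))


def stream_trajectory(segment_block):
    # Placeholder for the task that runs the streaming executable
    return ('stream', len(segment_block))


segment_block = [0, 1, 2]  # hypothetical segment IDs
do_trajectory_streaming = True
futures = set()
stream_futures = set()

with ThreadPoolExecutor(max_workers=2) as pool:
    futures.add(pool.submit(propagate, segment_block))
    if do_trajectory_streaming:
        future = pool.submit(stream_trajectory, segment_block)
        futures.add(future)         # waited on together with propagation
        stream_futures.add(future)  # also tracked separately
    done, not_done = wait(futures)

results = sorted(f.result() for f in done)
print(results)
```

Keeping a separate stream_futures set makes it possible to tell streaming results apart from propagation results when the futures complete.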

Issues with Port Assignment

With this code, the work manager can successfully run the stream_trajectory executable for each segment. However, as I elaborated in my previous blog post, both the propagator and stream_trajectory executables need access to the same port. The current implementation successfully sends a port number to both executables using environment variables, but the approach has some issues. First, I have not yet implemented code to check whether the port is free before assigning it. This is trickier than I initially thought, but there is likely a method to do so. A more critical issue is that the port may be free when WESTPA assigns it, but be taken by another process before the molecular dynamics code starts running. If this happens, the molecular dynamics code will fail to start.
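One common way to check for a free port, sketched here with Python's socket module, is to try binding to it, or to bind to port 0 and let the operating system pick an unused one. The helper names find_free_port and is_port_free are hypothetical and not part of WESTPA.

```python
import socket


def find_free_port(host='127.0.0.1'):
    """Ask the OS for a currently unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def is_port_free(port, host='127.0.0.1'):
    """Return True if binding to (host, port) succeeds at this moment."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
        return True


port = find_free_port()
print(port, is_port_free(port))
```

Both helpers close their socket before returning, so the result is only valid at the moment of the check. This does not remove the race described above: another process can still claim the port before the molecular dynamics code binds to it.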

Multinode setups represent a further challenge, as there is nothing stopping the work manager from running the stream_trajectory executable on a different node to the propagator executable. In that case, the two executables would not be able to communicate with each other over localhost. Solving this may require forcing the work manager to run the stream_trajectory executable on the same node as the propagator executable, which needs some investigation.

Next Steps

Over the next week, I am going to focus on implementing the method suggested in the previous post, where a single Python executable runs both the simulation and the analysis. This avoids many of the issues with port assignment and ensures that both processes run on the same node. I will also continue to work on the implementation within WESTPA and think about how to handle the port assignment issues.