WorkspaceRunner - Identify when all child processes are complete

Related products: FME Flow

When using the WorkspaceRunner you often want to do something downstream of that transformer with the results of the child processes it spawned. Sometimes this involves launching another workspace. At the moment there is no easy way to tell whether all the child processes from that WorkspaceRunner have fully completed.

This is the crux of the problem, from the help:

If the Wait for Job to Complete parameter is set to No, the initiating feature is output through this port if the request was successfully submitted, though whether or not the workspace completes is unknown in this case.

One option might be a new port that releases a feature when the child processes have all been closed off, or some new parameter/mode. At the moment you need a workaround or you need to throttle the process and use 'Wait for job to complete'.

Totally agree with @1spatialdave. Just in the past few days I was working on a workspace that spins off up to 32 sub-processes. After they all started I was excited that it finished so quickly, only to find out that the last 'FME.exe' process disappeared from Task Manager two hours later.


Absolutely, totally agreed! I have a workspace with 8 parallel FME WorkspaceRunners, and about half of them take up to 30 minutes to complete.

 

Why not differentiate between the "Succeeded" and "Summary" output ports of the WorkspaceRunner? "Succeeded" vs "Failed" are output as they are today, whereas "Summary" waits for the job to actually complete.


Yeah, this is a nice idea, but given the number of votes on this, it seems like this might be tricky...


As I keep running into this problem myself I decided to try finding a workaround using Python. I put the below code into a PythonCaller and fed the Summary port (from the WorkspaceRunner) into it.

 

 

import fme
import fmeobjects
import subprocess
import re
import time

# Match FME worker processes (e.g. fme.exe) in the tasklist output and capture the process ID
pattern = re.compile(r"fme[a-zA-Z]*\.exe {1,}([0-9]{1,})")
wait_secs = 30  # seconds to wait between checks

class FeatureProcessor(object):
    def __init__(self):
        pass
    def check_tasks_running(self, processes):
        # Scan the Windows task list for FME processes whose IDs are in the Summary feature's list
        tasks_running = False
        for task in subprocess.check_output("tasklist").splitlines():
            match = pattern.search(str(task))
            if match:
                if match.group(1) in processes:
                    tasks_running = True
        if tasks_running:
            # At least one child job is still running: wait before reporting back
            time.sleep(wait_secs)
            return False
        else:
            return True
    def input(self, feature):
        # The Summary feature carries the child process IDs in the '_processes' attribute
        processes = feature.getAttribute('_processes')
        jobs_finished = False
        while not jobs_finished:
            jobs_finished = self.check_tasks_running(processes)
        # All child processes have finished, so release the feature downstream
        self.pyoutput(feature)
    def close(self):
        pass

 

The Summary feature appears to be sent once the last batch of jobs has been kicked off, and it includes a list of process IDs. I cross-reference this list against what's running on the system; if there is a match, that means one or more jobs are still running, so I keep checking until that's no longer the case. The wait was added to make it a bit more sensible: if you know your jobs will take hours, there's no point checking multiple times a second. In my case the jobs took minutes each, so I opted to check every 30 seconds.

 

 

I'm very much a beginner in Python so I'm sure this could have been done in a much nicer way...
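
A possible alternative, assuming the psutil package is available to FME's Python interpreter (it is not bundled by default, so treat this as an assumption), would be to check the process IDs directly rather than parsing tasklist output. A minimal sketch, with a hypothetical wait_for_fme_children helper:

import time

import psutil  # assumption: psutil has been installed for FME's Python interpreter

wait_secs = 30

def wait_for_fme_children(pids):
    # Poll until none of the given process IDs belongs to a running fme*.exe process.
    remaining = {int(p) for p in pids}
    while remaining:
        still_running = set()
        for pid in remaining:
            try:
                proc = psutil.Process(pid)
                # Check the image name so a recycled PID from an unrelated process doesn't stall us
                if proc.name().lower().startswith("fme"):
                    still_running.add(pid)
            except psutil.NoSuchProcess:
                pass  # this child process has already finished
        if not still_running:
            return
        remaining = still_running
        time.sleep(wait_secs)

Inside the PythonCaller this would replace the tasklist loop: input() would call wait_for_fme_children(feature.getAttribute('_processes')) before self.pyoutput(feature).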