I am looking into using DPF to drive the results in extensions instead of using the legacy results reader.
While DPF is much faster than the legacy reader for getting things like nodal von Mises stress, I am finding that the extension's performance is still substantially worse than the hard-coded von Mises result available in Workbench. For large models containing multiple result steps, the code performs around 10x worse and can take hours to evaluate results.
I'm assuming this is because the workbench von mises code is written in C# and is optimized for performing this single task.
I figured I'd ask to see if there are any tricks to speeding up the use of DPF / python in extensions? Otherwise, it looks like I should rewrite my extension in C#.
Here is an example XML definition for an extension that gets the von Mises nodal stresses:
<!-- Extension definition: loads main.py and declares one toolbar button and one custom result for the Mechanical context. -->
<extension version="1" name="Mises_DPF_Performance_Testing">
<guid shortid="DPF_Performance"></guid>
<script src="main.py" />
<interface context="Mechanical">
<images>images</images>
<!-- Toolbar entry: its onclick callback is Create_Mises_DPF_Performance_Test (defined in main.py). -->
<toolbar name="Mises_Stress" caption="Mises DPF Performance Test">
<entry name="Mises DPF Performance Test" icon="result">
<callbacks>
<onclick>Create_Mises_DPF_Performance_Test</onclick>
</callbacks>
</entry>
</toolbar>
</interface>
<simdata context="Mechanical">
<!-- Custom scalar result at node location with unit category "Stress". -->
<result name="Mises DPF Performance Test" version="1" caption="Mises DPF Performance Test" unit="Stress" icon="result" location="node" type="scalar">
<!-- Callbacks wired to main.py: SetStartTime / ReturnEndTime bracket the evaluation, Evaluate_Stress supplies the values. -->
<callbacks>
<onstarteval>SetStartTime</onstarteval>
<onendeval>ReturnEndTime</onendeval>
<evaluate>Evaluate_Stress</evaluate>
</callbacks>
<property name="Geometry" caption="Geometry" control="scoping"></property>
<!-- Read-only float property; ReturnEndTime in main.py writes the elapsed time into it. -->
<property name="RunTime" caption="Run Time" control="float" default="0.0" readonly="true"/>
</result>
</simdata>
</extension>
And here is an example of the main.py file contents:
import units
import math
import os
import time
def Create_Mises_DPF_Performance_Test(analysis):
    """Add a "Mises DPF Performance Test" result object to ``analysis``.

    The result object is created on behalf of the currently running
    extension, as reported by ``ExtAPI.ExtensionManager``.
    """
    current_extension = ExtAPI.ExtensionManager.CurrentExtension
    analysis.CreateResultObject("Mises DPF Performance Test", current_extension)
# Timestamp (seconds, from time.time()) stored by SetStartTime and read by
# ReturnEndTime. A module-level name is already global, so no ``global``
# statement is needed here; declaring one after the assignment is rejected
# by CPython 3 as a syntax error.
Start_Time = 0
def SetStartTime(result, step):
    """Store the current wall-clock time in ``Start_Time`` for step 1.

    For any other step the stored start time is left untouched.
    """
    global Start_Time
    if step != 1:
        return
    Start_Time = time.time()
def ReturnEndTime(result, step):
    """Write the seconds elapsed since ``Start_Time`` to the "RunTime" property.

    ``Start_Time`` is only read here, so no ``global`` declaration is needed.
    """
    elapsed_seconds = time.time() - Start_Time
    result.Properties["RunTime"].Value = elapsed_seconds
# Evaluate all nodes using collector
def Evaluate_Stress(object, stepInfo, collector):
    """Fill ``collector`` with nodal von Mises stress in psi for one result set.

    object: result object; its ``Analysis`` is passed to ``DPF_Controller``.
        (The name shadows the builtin but is kept as part of the signature.)
    stepInfo: step information; ``stepInfo.Set`` selects the result set.
    collector: provides the ids to evaluate (``collector.Ids``) and receives
        the values, in the same order, through ``SetAllValues``.

    Raises KeyError if DPF returns no value for one of the collector ids.
    """
    # Read the id list once: it is used both to scope the DPF request and to
    # order the output values.
    node_ids = collector.Ids
    dpfc = DPF_Controller(ExtAPI, object.Analysis, node_ids)
    stress_by_node = dpfc.Get_Nodal_Von_Mises_Stress(stepInfo.Set, "psi")
    collector.SetAllValues([stress_by_node[node_id] for node_id in node_ids])
class DPF_Controller:
    """Small wrapper around the Mechanical DPF API for a single analysis.

    Construction imports the DPF modules, builds a data source for
    ``file.rst`` in the analysis working directory, obtains the streams
    container for the analysis, and prepares the node and time scopings
    reused by the result getters.
    """

    def __init__(self, api, analysis, scopingIds):
        """Prepare data sources, streams and scopings.

        api: the ACT API object; handed to ``mech_dpf.setExtAPI`` and used to
            look up ``DataModel.AnalysisList``.
        analysis: analysis whose ``WorkingDir`` contains the result file.
        scopingIds: ids placed in the nodal mesh scoping.
        """
        self.api = api
        self.Import_DPF()
        self.analysis = analysis
        self.data_sources = self.GetDataSources(analysis)
        # GetStreams is given the position of the analysis in AnalysisList.
        self.streams_container = self.mech_dpf.GetStreams(self.GetAnalysisIndex(analysis))
        self.node_scoping = self.GetNodeScoping(scopingIds)

        # Set up Time Scoping
        op = self.dpf.operators.metadata.time_freq_provider()
        op.inputs.data_sources.Connect(self.data_sources)
        time_freq_support = op.outputs.time_freq_support.GetData()
        self.time_scoping = time_freq_support.TimeFreqs.Scoping

    # Get Results
    def Get_Nodal_Von_Mises_Stress(self, step, unitString):
        """Return a dict mapping scoping id to von Mises stress for one set.

        step: time/frequency set id to evaluate; also written into the shared
            ``self.time_scoping``.
        unitString: unit name to convert the result to (e.g. "psi", "Pa").
        """
        self.time_scoping.Ids = [step]
        op = self.dpf.operators.result.stress_von_mises()
        op.inputs.time_scoping.Connect(self.time_scoping)
        op.inputs.mesh_scoping.Connect(self.node_scoping)
        op.inputs.data_sources.Connect(self.data_sources)
        op.inputs.streams_container.Connect(self.streams_container)
        von_mises_field_container = op.outputs.fields_container.GetData()
        converted_container = self.ConvertFieldUnits(von_mises_field_container, unitString)
        # Fetch the field for this set once and read both data and ids from it.
        field = converted_container.GetFieldByTimeId(step)
        von_mises_stresses = field.Data
        field_ids = field.ScopingIds
        return dict(zip(field_ids, von_mises_stresses))

    # Generic Helpers
    def GetDataSources(self, analysis):
        """Return a DPF ``DataSources`` for ``file.rst`` in ``analysis.WorkingDir``."""
        result_file = os.path.join(analysis.WorkingDir, "file.rst")
        return self.dpf.DataSources(result_file)

    def GetNodeScoping(self, nodeIds):
        """Return a DPF ``Scoping`` holding ``nodeIds`` with location "Nodal"."""
        node_scoping = self.dpf.Scoping()
        node_scoping.Ids = nodeIds
        node_scoping.Location = "Nodal"
        return node_scoping

    def ConvertFieldUnits(self, originalField, convertToString):
        """Return ``originalField`` (a fields container) converted to the named unit."""
        op = self.dpf.operators.math.unit_convert_fc()
        op.inputs.fields_container.Connect(originalField)
        op.inputs.unit_name.Connect(convertToString)
        return op.outputs.fields_container.GetData()

    def GetAnalysisIndex(self, analysis):
        """Return the 0-based position of ``analysis`` in ``DataModel.AnalysisList``.

        The comparison uses the ``analysis`` argument itself, so the result no
        longer depends on ``self.analysis`` having been set beforehand.
        Returns None when the analysis is not present in the list.
        """
        for index, candidate in enumerate(self.api.DataModel.AnalysisList):
            if candidate == analysis:
                return index
        return None

    def Import_DPF(self):
        """Import the DPF modules, keep them on the instance, and register the API object."""
        # Imported here rather than at file level, so the modules are only
        # required once a controller is actually created.
        import mech_dpf as mech_dpf
        import Ans.DataProcessing as dpf
        self.dpf = dpf
        self.mech_dpf = mech_dpf
        self.mech_dpf.setExtAPI(self.api)