In Oracle EPM, pipelines are essential for automating data workflows, integrating data sources, and performing transformations. To ensure smooth execution, it's crucial to monitor pipeline runs and detect job failures in real time.

When running automated data processing or calculations in Oracle EPM Cloud, monitoring the execution status of pipelines is critical. You don’t want to just trigger a pipeline and hope for the best; you need real-time insights into whether jobs succeed or fail.

This blog post explains how a Groovy script fetches pipeline execution details using Oracle EPM’s REST API and processes job statuses to determine if any failures have occurred.

The script retrieves execution details for a specific pipeline and checks the status of individual jobs within the pipeline. If any job fails, the script aborts execution and raises an error.
Fetch Pipeline Execution Details from REST API

The script initiates an HTTP GET request to Oracle EPM’s REST API to fetch pipeline execution details.
Making the API Call - https://yourhost.oraclecloud.com/aif/rest/V1/pipeline?pipelineName=PipelineCode

For your information, this is an undocumented feature.

More info on this and the idea created is here -
Imagine you’re running a forecast data load pipeline called SFCLCFCSTLOAD. This pipeline consists of multiple stages and jobs that need to execute in sequence (or in parallel). But what if one of those jobs fails?
- You need to detect failures early.
- You need to log the details of every job in the pipeline.
- You need to abort execution if something goes wrong.
This script does exactly that! Let’s break it down step by step.

The script starts by making an HTTP GET request to the Oracle EPM pipeline API to retrieve details of the SFCLCFCSTLOAD pipeline.
try {
    HttpResponse<String> PipeResponse = operation.application.getConnection("localURL")
        .get('/aif/rest/V1/pipeline?pipelineName=PipelineName')
        .asString()
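The request path above hard-codes the pipeline name. As a minimal sketch, the path could be built from a variable instead. The helper name `pipelinePath` is hypothetical, and the sketch assumes that `java.net.URLEncoder` is allowed in your Groovy rule environment and that the endpoint accepts a URL-encoded name.

```groovy
import java.net.URLEncoder

// Hypothetical helper (not part of the original script): builds the request
// path for a given pipeline code.
String pipelinePath(String pipelineName) {
    // URL-encode the name so spaces or special characters do not break the query string
    String encoded = URLEncoder.encode(pipelineName, 'UTF-8')
    return "/aif/rest/V1/pipeline?pipelineName=${encoded}"
}

println pipelinePath('SFCLCFCSTLOAD')
```

The returned string could then be passed to `.get(...)` in place of the literal path.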
Handling the API Response

Before processing the response, we check for null values to avoid unexpected errors.
    if (PipeResponse == null || PipeResponse.body == null) {
        throwVetoException("Error: Received null response for pipeline execution.")
    }
Parsing the JSON Response

Now that we have a response, we use Groovy’s JsonSlurper to convert the JSON string into a usable object.
    Map jsonResponse1 = (Map) new groovy.json.JsonSlurper().parseText(PipeResponse.body)
    Map responseDetails = (Map) jsonResponse1.get("response")
    List<Map> stages = (List<Map>) responseDetails.get("stages")
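To try the parsing step outside EPM, here is a small standalone sketch. The JSON payload is made up for illustration. Its field names match the ones the script reads (`response`, `stages`, `jobs`, and so on), but the values and overall shape are assumptions, not captured API output.

```groovy
import groovy.json.JsonSlurper

// Illustrative payload only: field names follow what the script reads,
// values are invented and the real response may contain more fields.
String sampleBody = '''
{
  "status": 0,
  "response": {
    "name": "SAMPLE_PIPELINE",
    "stages": [
      { "stageName": "Data Import",
        "jobs": [ { "jobName": "Import Batch A", "status": "SUCCESS" } ] }
    ]
  }
}
'''

// Same casts as in the script: top-level map, then "response", then "stages"
Map parsed = (Map) new JsonSlurper().parseText(sampleBody)
Map details = (Map) parsed.get('response')
List<Map> stageList = (List<Map>) details.get('stages')

println details.get('name')
println stageList[0].get('stageName')
```

Running this in any Groovy console is a quick way to check the map and list navigation before pointing the script at a real pipeline.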
Logging High-Level Pipeline Details

We now print key details about the pipeline execution.
    println "Pipeline Name: ${responseDetails.get("name")}"
    println "Display Name: ${responseDetails.get("displayName")}"
    println "Pipeline ID: ${responseDetails.get("id")}"
    println "Status: ${jsonResponse1.get("status")}"
    println "Parallel Jobs: ${responseDetails.get("parallelJobs")}"
Example Output in Logs:

Pipeline Overview
Pipeline Name: SAMPLE_PIPELINE
Display Name: Sample Data Processing Pipeline
Pipeline ID: 9999
Status: 0
Parallel Jobs: 3
Looping Through Stages and Jobs

Now, we loop through each stage in the pipeline and log details of every job inside it.
    boolean failureDetected = false
    stages.each { Map stage ->
        println " Stage Name: ${stage.get("stageName")}"
        println " Display Name: ${stage.get("stageDisplayName")}"
        println " Stage ID: ${stage.get("stageID")}"
        List<Map> jobs = (List<Map>) stage.get("jobs")
        jobs?.each { Map job ->
            String jobStatus = job.get("status") ?: 'Not started'
            println " Job Name: ${job.get("jobName")}"
            println " Job ID: ${job.get("processId") ?: 'null'}"
            println " Job Type: ${job.get("jobType")}"
            println " Status: ${jobStatus}"
            if (jobStatus in ["FAILED", "ERROR", "CANCELLED"]) {
                println "❌ Job ${job.get("jobName")} failed! Aborting pipeline execution."
                failureDetected = true
            }
        }
    }
Example Output in Logs:

Stages & Jobs
Stage Name: Initialize Process
- Display Name: Set Initial Parameters
- Stage ID: 1001
- Job Name: Disable Admin Mode
- Job ID: 20001
- Job Type: epmPlatformJob
- Status: SUCCESS
Stage Name: Data Preparation
- Display Name: Clear Old Data
- Stage ID: 1002
- Job Name: Clear Previous Records - Batch 1
- Job ID: 20002
- Job Type: clearCube
- Status: SUCCESS
- Job Name: Clear Previous Records - Batch 2
- Job ID: 20003
- Job Type: clearCube
- Status: SUCCESS
Stage Name: Data Import
- Display Name: Load New Data
- Stage ID: 1003
- Job Name: Import Batch A
- Job ID: 20004
- Job Type: integration
- Status: SUCCESS
- Job Name: Import Batch B
- Job ID: 20005
- Job Type: integration
- Status: SUCCESS
Stage Name: Processing & Analysis
- Display Name: Run Analytics
- Stage ID: 1004
- Job Name: Process Data - Step 1
- Job ID: 20006
- Job Type: integration
- Status: WARNING
- Job Name: Process Data - Step 2
- Job ID: 20007
- Job Type: integration
- Status: WARNING
Stage Name: Data Export
- Display Name: Generate Reports
- Stage ID: 1005
- Job Name: Export Processed Data - Batch A
- Job ID: 20008
- Job Type: integration
- Status: SUCCESS
- Job Name: Export Processed Data - Batch B
- Job ID: 20009
- Job Type: integration
- Status: SUCCESS
Stage Name: File Operations
- Display Name: Move Processed Files
- Stage ID: 1006
- Job Name: Transfer File - Batch A
- Job ID: 20010
- Job Type: fileOperations
- Status: SUCCESS
- Job Name: Transfer File - Batch B
- Job ID: 20011
- Job Type: fileOperations
- Status: SUCCESS
Stage Name: Finalize Process
- Display Name: Enable Admin Mode
- Stage ID: 1007
- Job Name: Re-enable Admin Mode
- Job ID: 20012
- Job Type: epmPlatformJob
- Status: SUCCESS
Stage Name: Variable Updates
- Display Name: Update Process Variables
- Stage ID: 1008
- Job Name: Set Processing Month
- Job ID: 20013
- Job Type: setSubVar
- Status: SUCCESS
- Job Name: Set Processing Year
- Job ID: 20014
- Job Type: setSubVar
- Status: SUCCESS
Handling Job Failures

If any job failed, we abort the rule once all stages and jobs have been logged.
    if (failureDetected) {
        throwVetoException("❌ One or more child jobs failed in the pipeline. Rule execution aborted.")
    }
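The boolean flag tells you that something failed, but not what. As a possible variation, the sketch below collects the names of the failed jobs so they can be included in the abort message. The helper `findFailedJobs` and the sample data are hypothetical and not part of the original script. The failure statuses are the same three the script checks.

```groovy
// Hypothetical helper: returns the names of all jobs whose status counts as a failure.
List<String> findFailedJobs(List<Map> stages) {
    List<String> failed = []
    stages.each { Map stage ->
        List<Map> jobs = (List<Map>) stage.get('jobs')
        // Null-safe: a stage without jobs is simply skipped
        jobs?.each { Map job ->
            if (job.get('status') in ['FAILED', 'ERROR', 'CANCELLED']) {
                failed << (String) job.get('jobName')
            }
        }
    }
    return failed
}

// Made-up sample data for illustration
List<Map> sampleStages = [
    [stageName: 'Data Import', jobs: [
        [jobName: 'Import Batch A', status: 'SUCCESS'],
        [jobName: 'Import Batch B', status: 'FAILED']
    ]],
    [stageName: 'Data Export', jobs: null]
]

List<String> failedJobs = findFailedJobs(sampleStages)
println "Failed jobs: ${failedJobs.join(', ')}"
```

Inside a rule, a non-empty list could replace the `failureDetected` check, with the joined names appended to the text passed to `throwVetoException`.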
Catching Unexpected Errors

Finally, we handle any unexpected exceptions that might occur.
} catch (Exception e) {
    throwVetoException("Error fetching pipeline execution details: ${e.message}")
}
This script is critical for automating and monitoring pipeline execution in Oracle EPM Cloud.

- Real-time monitoring ensures you know exactly what’s happening inside the pipeline.
- Early failure detection prevents incorrect data from being processed.
- Detailed logging helps troubleshoot issues quickly.
- Exception handling ensures that the rule does not fail silently.
By integrating this script into your EPM Groovy rules, you can track and validate every step of the pipeline execution, ensuring accuracy, efficiency, and reliability. 🚀

Hope this helps, happy days on the cloud!!!
Hi Dayalan. Thanks for this!
I had a doubt - so when we say automation, are we saying that we would be scheduling this Groovy rule rather than the Pipeline itself, so that the Forecast data load process can be automated?