Dayalan Punniyamoorthy Blog

Thursday, May 23, 2024

Running EPCM Calculation Manager rules sequentially and Data Management rules as a child job using Groovy!

In this blog we will see how to run EPCM (Enterprise Profitability and Cost Management) Calculation Manager rules sequentially using Groovy, and how to invoke Data Management rules as child jobs using the JobFactory.

The Groovy script starts by fetching substitution variable values from the operation's application and printing them. Here is a detailed explanation along with the script:

The closure subVarValue is defined to fetch the value of a substitution variable from the application.

The script also sets the connection name. The connection must already be defined in the application for the REST API calls to work.

def connectionName = 'Localhost'

Closure subVarValue = { String subVar -> operation.application.getSubstitutionVariable(subVar).value }

String CurYr = subVarValue('CurYr')

String CurMth = subVarValue('CurMth')

println "The Rule is using the Sub Var Current Year : " +CurYr +" " + "Current Month : " +CurMth
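The year and month fetched above are later joined into the POV string used by the calculation job. A minimal sketch of that step is shown here, assuming a hypothetical helper named buildPov (it is not part of the original script) and sample values in place of the real substitution variables:

```groovy
// Hypothetical helper (not in the original script): joins POV members with a delimiter.
String buildPov(List<String> members, String delimiter = ':') {
    // Fail early if a substitution variable came back null or blank.
    if (members.any { !it?.trim() }) {
        throw new IllegalArgumentException("POV member missing in $members")
    }
    return members.join(delimiter)
}

// Sample values standing in for subVarValue('CurYr') and subVarValue('CurMth').
String curYr = 'FY24'
String curMth = 'Jan'
String pov = buildPov([curYr, curMth, 'Actual', 'Working'])
println pov
```

Checking for an empty member up front may make a missing substitution variable easier to spot than a failed job later on.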


The next part of the script monitors the status of a job running on the server and waits for its completion. It polls the server at increasing intervals until the job is completed.

Wait for Completion Variable:

boolean WaitforCompletion: This flag holds the value returned by awaitCompletion, which is true when the job finishes with status 0.

HttpResponse<String> jsonResponse: This holds the initial HTTP response returned by the server when the job is submitted.

Await Completion Function:

    • The function awaitCompletion takes the initial JSON response, a connection name, and the operation name as parameters.
    • It checks if the HTTP status code of the response is between 200 and 299. If not, it throws an exception.
    • It parses the JSON response to read the job status and job ID.
    • It keeps polling the server at exponentially increasing intervals (starting at 50 milliseconds and capped at 1000 milliseconds) until the job is no longer in progress.
    • It prints whether the operation was successful based on the final status.
Get Job Status Function:
    • The function getJobStatus sends a GET request to the server to fetch the current status of the job using the job ID.
    • It parses the response to read and return the job status.
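To illustrate the parsing step described above, here is a small standalone sketch. It uses groovy.json.JsonSlurper as a stand-in for JsonPath so that it can run outside EPM, and the response body is a made-up sample containing only the two fields the script reads (status and jobId):

```groovy
import groovy.json.JsonSlurper

// Made-up sample body; only the fields read by the script ($.status, $.jobId) are included.
String body = '{"jobId": 101, "status": -1}'

def parsed = new JsonSlurper().parseText(body)
int status = parsed.status as int      // -1 is treated as "in progress" by the script
String jobId = parsed.jobId as String  // converted to a String to build the status URL

println "jobId=$jobId status=$status"
```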

/* Wait for the Rules to complete */
boolean WaitforCompletion
HttpResponse<String> jsonResponse
def payload
def awaitCompletion(HttpResponse<String> jsonResponse, String connectionName, String operation) {
    final int IN_PROGRESS = -1
    if (!(200..299).contains(jsonResponse.status)) {
        throwVetoException("Error occurred: $jsonResponse.statusText")
    }
    // Parse the JSON response to get the status of the operation. Keep polling the  server until the operation completes.
    ReadContext ctx = JsonPath.parse(jsonResponse.body)
    int status = ctx.read('$.status')
    for(long delay = 50; status == IN_PROGRESS; delay = Math.min(1000, delay * 2)) {
        sleep(delay)
        status = getJobStatus(connectionName, (String)ctx.read('$.jobId'))
    }

    // The loop above only exits once status is no longer IN_PROGRESS, so only 0 counts as success.
    println("$operation ${status == 0 ? "successful" : "failed"}.\n")
    return status == 0
}

int getJobStatus(String connectionName, String jobId) {
    HttpResponse<String> pingResponse = operation.application.getConnection(connectionName).get("/rest/v3/applications/ICPCM/jobs/" + jobId).asString()
    return JsonPath.parse(pingResponse.body).read('$.status')
}
 
  • Polling Logic: The script uses exponential backoff for polling the job status, starting from 50 milliseconds and doubling the delay until a maximum of 1000 milliseconds.
  • Error Handling: The script throws an exception if the initial HTTP response is not successful (status code not in the range 200-299).
  • Job Status: It checks if the job status is IN_PROGRESS. If so, it continues polling until the job completes.
  • Print Result: It prints whether the operation was successful or failed based on the final job status.
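The polling schedule described above can be checked in isolation. The sketch below is only an illustration: backoffDelays is a hypothetical helper that reproduces the delay arithmetic of the for loop (start at 50, double, cap at 1000) without actually sleeping or calling the server.

```groovy
// Hypothetical helper: returns the delays (in ms) the loop would sleep for over a number of polls.
List<Long> backoffDelays(int polls, long start = 50, long cap = 1000) {
    List<Long> delays = []
    long delay = start
    polls.times {
        delays << delay
        delay = Math.min(cap, delay * 2)   // same update as in the script's for loop
    }
    return delays
}

println backoffDelays(7)   // [50, 100, 200, 400, 800, 1000, 1000]
```

After the fifth poll the delay stays at the 1000 millisecond cap, so a long-running job is checked roughly once a second.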

This script should be integrated within a larger application context where the operation object and the methods used here (application.getConnection and throwVetoException) are defined.

Run the EPCM Calculation using REST API & connection

/************************************************************************************************/

/*      1. Run Calculation                                                                                                         */

/************************************************************************************************/

    println "1  Run Calculation"

 jsonResponse = operation.application.getConnection("Localhost").post('/rest/v3/applications/PCM/jobs/')

    .header("Content-Type", "application/json")

    .body(json([

    "jobType":"Calculation",

    "jobName":"Run Calculation",

    "parameters":[

        "povDelimiter":":",

        "povName":"$CurYr:$CurMth:Actual:Working",

        "modelName":"IC Calcs",

        "executionType":"ALL_RULES",

        "clearCalculatedData":"true",

        "executeCalculations":"true",

        "optimizeForReporting":"true",

        "captureDebugScripts":"false"]

        ])

    )

    .asString();

          println 'Response Received'

          println jsonResponse.body

     WaitforCompletion = awaitCompletion(jsonResponse, "Localhost", "Run Calculation Calcs")

This step makes an HTTP POST request to the jobs endpoint (/rest/v3/applications/PCM/jobs/) through the predefined “Localhost” connection.

The request includes a JSON payload with various parameters for the job, such as the job type (“Calculation”), job name (“Run Calculation”), and other settings.

After sending the request, the script prints the response received from the server.

Finally, the call to awaitCompletion polls the job until it finishes, so the next step starts only after this one is done.
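To see the shape of the JSON that is sent, the payload map can be serialized outside EPM. This is only a sketch: groovy.json.JsonOutput stands in for the json(...) helper used in the rule, the year and month are sample values, and only some of the parameters from the request above are included.

```groovy
import groovy.json.JsonOutput

String curYr = 'FY24'   // sample value standing in for the CurYr substitution variable
String curMth = 'Jan'   // sample value standing in for the CurMth substitution variable

// Same structure as the map passed to json(...) in the rule, trimmed for brevity.
Map payload = [
    jobType   : 'Calculation',
    jobName   : 'Run Calculation',
    parameters: [
        povDelimiter : ':',
        povName      : "$curYr:$curMth:Actual:Working".toString(),
        modelName    : 'IC Calcs',
        executionType: 'ALL_RULES'
    ]
]

println JsonOutput.prettyPrint(JsonOutput.toJson(payload))
```

Printing the payload like this before submitting can help confirm that the POV members and the delimiter line up.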

 

Run the EPCM Clear job using REST API & connection

/***********************************************************************************************/

/*      2. Clear Job                                                                           */

/***********************************************************************************************/

    println "2a.          CLR_PCM_01"

jsonResponse = operation.application.getConnection("Localhost").post('/rest/v3/applications/PCM/jobs')

    .header("Content-Type", "application/json")

    .body(json([

        "jobType" : "Clear Cube",

        "jobName" : "CLR_PCM_01"

        ])

    )

    .asString();

    println 'Response Received'

    println jsonResponse.body

WaitforCompletion = awaitCompletion(jsonResponse, "Localhost", "CLR_PCM_01")

    Run the EPCM Merge jobs using REST API & connection  

   /**********************************************************************************************/

    /*      3. Merge Data Slices                                                                                              */

    /*      Rule Name:  Merge Data Slices PCM.PCMRPT                                                   */

/**********************************************************************************************/

    println "3.  Merge Data Slices"

          jsonResponse = operation.application.getConnection("Localhost").post('/rest/v3/applications/PCM/jobs')

    .header("Content-Type", "application/json")

    .body(json([

        "jobType" : "Merge Data Slices",

        "jobName" : "Merge Data Slices ICPCM.ICPCMRPT",

        "parameters": [

        "cubeName": "ICPCMRPT",

        "mergeSliceType": "allIncrementalSlicesInMain",

        "keepZeroCells": "false"

    ]

        ])

    )

    .asString();

    println 'Response Received'

    println jsonResponse.body

     WaitforCompletion = awaitCompletion(jsonResponse, "Localhost", "Merge Data Slices")
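Each step above stores the result of awaitCompletion in WaitforCompletion, but the script as shown does not check it before moving on. If later steps should be skipped when an earlier one fails, one possible pattern is sketched below. The runSequentially helper is hypothetical, and the stub closures stand in for the post(...) plus awaitCompletion(...) pairs.

```groovy
// Hypothetical runner: executes named steps in order and stops at the first failure.
List<String> runSequentially(Map<String, Closure<Boolean>> steps) {
    List<String> completed = []
    for (entry in steps) {
        if (!entry.value.call()) {
            println "${entry.key} failed; skipping remaining steps"
            break
        }
        completed << entry.key
    }
    return completed
}

// Stub closures; in the rule each one would submit a job and return awaitCompletion(...).
def done = runSequentially([
    'Run Calculation'  : { true },
    'CLR_PCM_01'       : { false },
    'Merge Data Slices': { true }
])
println done   // [Run Calculation]
```

In an actual rule, throwVetoException could be used instead of break if the whole rule should fail.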

Calling the JobFactory to invoke a rule as a child job

/***********************************************************************************************/

/*      4. Push to Reporting Cube                                                                                           */

 /***********************************************************************************************/

 JobFactory jf = operation.application.jobFactory

 JobDefinition jobDef = jf.job("1_Push to Reporting Cube", "Rules", [:])

 Job job = executeJob(jobDef)

 println job.status

 // Print a message indicating the user who executed the rule

println("Rule was executed by $operation.user.fullName")

  

/* Parameters to wait for DM Rules */

boolean pushDataToRPT

HttpResponse<String> jsonResponse

def payload

def awaitCompletion(HttpResponse<String> jsonResponse, String connectionName, String operation) {

    final int IN_PROGRESS = -1

    if (!(200..299).contains(jsonResponse.status)) {

        throwVetoException("Error occurred: $jsonResponse.statusText")

    }

    // Parse the JSON response to get the status of the operation. Keep polling the DM server until the operation completes.

    ReadContext ctx = JsonPath.parse(jsonResponse.body)

    int status = ctx.read('$.status')

    for(long delay = 50; status == IN_PROGRESS; delay = Math.min(1000, delay * 2)) {

        sleep(delay)

        status = getJobStatus(connectionName, (String)ctx.read('$.jobId'))

    }

    // The loop above only exits once status is no longer IN_PROGRESS, so only 0 counts as success.

    println("$operation ${status == 0 ? "successful" : "failed"}.\n")

    return status == 0

}

 int getJobStatus(String connectionName, String jobId) {

    HttpResponse<String> pingResponse = operation.application.getConnection(connectionName).get("/" + jobId).asString()

    return JsonPath.parse(pingResponse.body).read('$.status')

}

 

 Run the Data Management rules using REST API & connection  

     println "3a  Run PCM_CLC to RPT"

 jsonResponse = operation.application.getConnection("DM-Localhost").post()

    .header("Content-Type", "application/json")

    .body(json([

         "jobType":"INTEGRATION",

        "jobName":"PCM_CLC to RPT",

        "periodName":"{$CurMth#$CurYr}",

        "importMode":"REPLACE",

        "exportMode":"STORE_DATA"

        ])

    )

    .asString();

    pushDataToRPT = awaitCompletion(jsonResponse, "DM-Localhost", "Push Data to PCMRPT")

 

Job Console Output  







Hope this is useful, Happy days on the Cloud!
