Ingest real-time data into your app

(image: realtime)

Let’s simulate some real-time data for the race event: a car racing on the track that sends telemetry data such as the current gear and speed.

1. Add Lambda function to simulate race events

First of all, let’s create an AWS Lambda function that simulates telemetry data events for a racing car moving along the race track. We generate the geolocation position as well as the current gear and speed. This Lambda function simulates data points for a single lap and sends the data to the Kinesis Data Stream for ingestion.

In the Cloud9 terminal, execute:

amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: simulateRaceEvents
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Hello World

Available advanced settings:
- Resource access permissions
- Scheduled recurring invocation
- Lambda layers configuration

? Do you want to configure advanced settings? Yes
? Do you want to access other resources in this project from your Lambda function? Yes
? Select the categories you want this function to have access to. analytics
? Analytics has 2 resources in this project. Select the one you would like your Lambda to access graphqlrealtimeraceKinesis
? Select the operations you want to permit on graphqlrealtimeraceKinesis create, read, update

You can access the following resource attributes as environment variables from your Lambda function
        ANALYTICS_GRAPHQLREALTIMERACEKINESIS_KINESISSTREAMARN
        ENV
        REGION
? Do you want to invoke this function on a recurring schedule? No
? Do you want to configure Lambda layers for this function? No
? Do you want to edit the local lambda function now? No
Successfully added resource simulateRaceEvents locally.

Next, edit the function code located at amplify/backend/function/simulateRaceEvents/src/index.js:

/* Amplify Params - DO NOT EDIT
	ANALYTICS_GRAPHQLREALTIMERACEKINESIS_KINESISSTREAMARN
	ENV
	REGION
Amplify Params - DO NOT EDIT */

const streamARN = process.env.ANALYTICS_GRAPHQLREALTIMERACEKINESIS_KINESISSTREAMARN; 
const streamName = streamARN.match(/([^\/]*)\/*$/)[1]
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({
    region: process.env.REGION
});

var telemetryData = require('./telemetry');
var lapData = require('./events');


function wait(milliseconds) {
  return new Promise(resolve => setTimeout(resolve, milliseconds));
}

exports.handler = async (event,context, callback) => {
    
    console.log('SIMULATION FUNCTION GOT EVENT: ');
    console.log(JSON.stringify(event,null,2));
    
    var simulatedRaceLastEventIndex = event.status != null ? event.status : 0;
    if(simulatedRaceLastEventIndex < lapData.events.length) {
        var record = lapData.events[simulatedRaceLastEventIndex];
        record.eventId = event.race.eventID;
        
        for (var telemetry in telemetryData.events) {
            var type = '_telemetry';
            console.log('KINESIS PUT RECORD as: ');
            var telemetryRecord = telemetryData.events[telemetry];
            telemetryRecord.type = type
            telemetryRecord.lap = record.lap;
            telemetryRecord.eventId = record.eventId;
            telemetryRecord.competitor = record.competitor;
            console.log(JSON.stringify(telemetryRecord,null,2));
            const data = await kinesis.putRecord({
                Data: JSON.stringify(telemetryRecord),
                PartitionKey: record.eventId + type,
                StreamName: streamName
            }).promise();
            await wait(760);
        }
        
        console.log('KINESIS PUT RECORD as: ');
        console.log(JSON.stringify(record,null,2));
        const data = await kinesis.putRecord({
            Data: JSON.stringify(record),
            PartitionKey: record.eventId + record.type,
            StreamName: streamName
        }).promise();
        console.log('KINESIS PUT RECORD DATA IS: ');
        console.log(JSON.stringify(data,null,2));
        simulatedRaceLastEventIndex++;
        console.log('returning simulatedRaceLastEventIndex: ' + simulatedRaceLastEventIndex);    
    } else {
        simulatedRaceLastEventIndex = "completed";
    }
    
    callback(null, simulatedRaceLastEventIndex);
};

The Lambda function code reads two files with mocked data:

  1. telemetry.js, which contains track positions together with the car’s gear and speed at each position;
  2. events.js, which contains the lap time for a driver for every lap.

For every event generated, the code then calls kinesis.putRecord to ingest the event data into the Kinesis Data Stream.
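The real data files come from the archive linked below; as a rough illustration only, here is a hypothetical sketch of what the two modules could look like. The field names (gear, speed, lap, competitor, laptime, and so on) are inferred from how index.js uses them, so treat the exact shape and values as assumptions:

```javascript
// Hypothetical sketch of the two data modules; the real contents come from
// the events_telemetry.zip archive referenced below.

// telemetry.js: per-position samples (geolocation, gear, speed)
const telemetryData = {
  events: [
    { latitude: 45.618, longitude: 9.281, gear: 3, speed: 182 },
    { latitude: 45.619, longitude: 9.284, gear: 4, speed: 214 },
  ],
};
// in telemetry.js: module.exports = telemetryData;

// events.js: one lap-time record per lap for a driver
const lapData = {
  events: [
    { type: '_laptime', lap: 1, competitor: 'driver-1', laptime: 92.431 },
    { type: '_laptime', lap: 2, competitor: 'driver-1', laptime: 91.874 },
  ],
};
// in events.js: module.exports = lapData;
```

The handler enriches each telemetry sample with the lap, competitor, and eventId of the current lap record before putting it on the stream.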

Let’s change the default timeout, since this Lambda function must simulate one track lap for each execution. Edit the file located at /amplify/backend/function/simulateRaceEvents/simulateRaceEvents-cloudformation-template.json and change the Timeout property for the Lambda function resource to this value:

"Runtime": "nodejs14.x",
"Layers": [],
"Timeout": "90"

In addition, under the src folder of this Lambda function, add the two expected files: telemetry.js and events.js.

Right-click the events_telemetry.zip link above and copy its URL; depending on your browser, the menu item could be “Copy Link Address”, “Copy Link Location”, etc. You can use the files in the archive to populate telemetry.js and events.js: just copy and paste their contents.

Now let’s deploy your changes from the CLI:

amplify push --yes

2. Create a custom resource for Step Functions

Step Functions is not natively supported by the Amplify CLI, so we must extend the app using a custom resource. To extend an Amplify application with custom resources, we start by declaring a custom category within the Amplify toolchain. Amplify categories are declared in the /amplify/backend/backend-config.json file, so we give our custom category a name and add resources under it the same way the Amplify CLI does for the other categories.

Since we need to reference outputs from other resources, we declare them as parameters in our template and add them to the dependsOn array in backend-config.json. In our case we need the name and ARN of the Lambda function we just added with the Amplify CLI, so we list those output attributes in the backend-config.json file as well.

Edit the /amplify/backend/backend-config.json file and add the custom category:

"stepFunction": {
        "processWorkflow": {
            "service": "Step Function",
            "providerPlugin": "awscloudformation",
            "dependsOn": [
                {
                    "category": "function",
                    "resourceName": "simulateRaceEvents",
                    "attributes": [
                        "Name",
                        "Arn"
                    ]
                }                
            ]
        }
}

The custom category name is stepFunction and our first resource is named processWorkflow. Amplify expects the same naming convention for your custom resources as for other categories, so keep category and resource names short and non-conflicting.

Next, we replicate the directory structure Amplify uses for the other categories and their backend resources.

/amplify
  |- backend
  |   |- stepFunction
  |   |   |- processWorkflow

Under the newly created resource directory, add the CloudFormation template and parameters JSON files that describe the custom resources for Amplify to manage. These file names must be unique within our custom category, so it is a good idea to prefix them with the resource name, as follows.

/amplify
  |- backend
  |   |- stepFunction
  |   |   |- processWorkflow
  |   |   |   - processWorkflow-cloudformation-template.yml
  |   |   |   - processWorkflow-parameters.json

Now, let’s edit the CloudFormation template processWorkflow-cloudformation-template.yml:

AWSTemplateFormatVersion: 2010-09-09
Description: AWS StepFunctions for ingestion of RealTime Race Update
Metadata: {}

Parameters: 
  env:
    Type: String
    Description: The environment name. e.g. Dev, Test, or Production
    Default: NONE
  WaitIntervalInSec:
    Type: Number
    Description: periodically call race lap events (1 to 600) seconds
    MinValue: 1
    MaxValue: 600
    Default: 1
  functionsimulateRaceEventsName:
    Type: String
  functionsimulateRaceEventsArn:
    Type: String

Resources:
  ProcessWorkflowExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument: 
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Action: "sts:AssumeRole"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
      Policies:
        - 
         PolicyName: !Sub "GraphQL-race-${env}-statemachine-service-role"
         PolicyDocument:
           Version: "2012-10-17"
           Statement:
              - 
                Effect: "Allow"
                Action: "lambda:InvokeFunction"
                Resource: !Ref functionsimulateRaceEventsArn
  ProcessWorkflowStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub "GraphQL-race-processWorkflow-${env}"
      RoleArn: !GetAtt ProcessWorkflowExecutionRole.Arn
      DefinitionString:
        !Sub 
          - |-
              {
                "Comment": "Workflow that runs a polling job of a race data and monitors the job until it completes.",
                "StartAt": "WaitForDueDate",
                "States": {
                  "WaitForDueDate": {
                    "Type": "Wait",
                    "TimestampPath": "$.planned_race_start",
                    "Next": "Get Race Lap Data"
                  },
                  "Wait X Seconds": {
                    "Type": "Wait",
                    "Seconds": 1,
                    "Next": "Get Race Lap Data"
                  },
                  "Get Race Lap Data": {
                    "Type": "Task",
                    "Resource": "${x1}",
                    "Next": "Race Complete?",
                    "InputPath": "$",
                    "ResultPath": "$.status",
                    "Retry": [
                      {
                        "ErrorEquals": ["States.ALL"],
                        "IntervalSeconds": 1,
                        "MaxAttempts": 3,
                        "BackoffRate": 2
                      }
                    ]
                  },
                  "Race Complete?": {
                    "Type": "Choice",
                    "Choices": [
                      {
                        "Variable": "$.status",
                        "StringEquals": "error",
                        "Next": "Race Failed"
                      },
                      {
                        "Variable": "$.status",
                        "StringEquals": "completed",
                        "Next": "Race Ended"
                      }
                    ],
                    "Default": "Wait X Seconds"
                  },
                  "Race Failed": {
                    "Type": "Fail",
                    "Cause": "Race Data Ingestion Failed",
                    "Error": "Get Race Lap Data returned FAILED"
                  },
                  "Race Ended": {
                    "Type": "Succeed"
                  }
                }
              }
          -
            {
              x0: !Ref WaitIntervalInSec,
              x1: !Ref functionsimulateRaceEventsArn
            }
Outputs:
  ProcessWorkflowExecutionRole:
    Value:
      Ref: ProcessWorkflowExecutionRole
  
  Arn:
    Value:
      Ref: ProcessWorkflowStateMachine

In this CloudFormation template we provide the state machine definition as well. The last step is to let the CLI know about our custom category and resource by checking out the current environment.

amplify env checkout staging

Let’s check the CLI status to confirm the custom resource has been picked up.

amplify status

Current Environment: staging

| Category     | Resource name              | Operation | Provider plugin   |
| ------------ | -------------------------- | --------- | ----------------- |
| Stepfunction | processWorkflow            | Create    | awscloudformation |
| Api          | graphqlrealtimerace        | No Change | awscloudformation |
| Auth         | graphqlrealtimerace        | No Change | awscloudformation |
| Hosting      | amplifyhosting             | No Change | awscloudformation |
| Function     | updateLocation             | No Change | awscloudformation |
| Function     | S3Triggercbb7674a          | No Change | awscloudformation |
| Function     | sharp                      | No Change | awscloudformation |
| Function     | pinpointInterface          | No Change | awscloudformation |
| Function     | processLiveEvents          | No Change | awscloudformation |
| Function     | simulateRaceEvents         | No Change | awscloudformation |
| Storage      | chatImages                 | No Change | awscloudformation |
| Analytics    | graphqlrealtimerace        | No Change | awscloudformation |
| Analytics    | graphqlrealtimeraceKinesis | No Change | awscloudformation |

Let’s push our changes:

amplify push --yes

Now, let’s open the Step Functions console in the AWS Management Console and check that the new state machine has been deployed and its definition is correct.

(image: realtime)

The state machine stays in the WaitForDueDate state as long as the current date is before the event’s date. Then the Get Race Lap Data state executes the simulateRaceEvents Lambda function to generate telemetry and geolocation data for a single lap. When all data for the lap has been ingested, the lap-time record is sent. Then, if there are more laps to simulate, the state machine waits for a second and executes the simulateRaceEvents function again; otherwise, the execution ends.
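The polling contract between the state machine and the Lambda can be sketched in plain JavaScript. This is an illustrative model (getRaceLapData and runWorkflow are hypothetical stand-ins, not deployed code): the Lambda returns the next event index on each call, stored at $.status, and the Race Complete? Choice state keeps looping through Wait X Seconds until the status becomes "completed" or "error".

```javascript
// Minimal local model of the state machine's polling loop. getRaceLapData
// stands in for the simulateRaceEvents Lambda: it returns the next lap index,
// or the string "completed" once every lap has been sent.
const totalLaps = 3;

function getRaceLapData(input) {
  const lastIndex = input.status != null ? input.status : 0;
  if (lastIndex < totalLaps) return lastIndex + 1; // more laps to simulate
  return 'completed'; // terminal status checked by the Choice state
}

function runWorkflow(input) {
  const state = { ...input };
  // "Race Complete?" Choice: loop via "Wait X Seconds" until a terminal status
  while (state.status !== 'completed' && state.status !== 'error') {
    state.status = getRaceLapData(state); // ResultPath: "$.status"
  }
  return state.status;
}

console.log(runWorkflow({ race: { eventID: 'evt-1' } })); // → completed
```

In the real workflow the wait between iterations is handled by the Wait state, and the retry on failure by the Task state’s Retry configuration.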

3. Add a Lambda trigger for race simulation

The last component you need to add is a new Lambda function that is triggered any time a new event is created. Its job is to create a new Step Functions workflow execution that starts at the event’s date, in order to simulate race laps for the event itself. This Lambda function is triggered by the DynamoDB stream of the table backing the Event @model in our GraphQL API.

amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: raceSimulationScheduler
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Lambda trigger
? What event source do you want to associate with Lambda trigger? Amazon DynamoDB Stream
? Choose a DynamoDB event source option Use API category graphql @model backed DynamoDB table(s) in the current Amplify project
Selected resource graphqlrealtimerace
? Choose the graphql @model(s) Event

Available advanced settings:
- Resource access permissions
- Scheduled recurring invocation
- Lambda layers configuration

? Do you want to configure advanced settings? No

Since we created a custom stepFunction category, we must provide a custom policy so the Lambda function can create a new workflow execution, as well as expose the workflow ARN as an environment variable.

Let’s open the CloudFormation template located at amplify/backend/function/raceSimulationScheduler/raceSimulationScheduler-cloudformation-template.json and add an additional parameter in the Parameters section:

"stepFunctionprocessWorkflowArn": {
      "Type": "String"
    }

Now that the parameter is defined in the CloudFormation template, let’s make it available as an environment variable in the Lambda function. Locate the Environment section and add the STATEMACHINE_ARN variable as described here:

"Environment": {
        "Variables": {
                "ENV": {
                        "Ref": "env"
                },
                "REGION": {
                        "Ref": "AWS::Region"
                },
                "STATEMACHINE_ARN": {
                        "Ref": "stepFunctionprocessWorkflowArn"
                }
        }
}

Then, add the following custom IAM policy as the last resource:

"LambdaCustomResourcesPolicy": {
      "DependsOn": [
        "LambdaExecutionRole"
      ],
      "Type": "AWS::IAM::Policy",
      "Properties": {
        "PolicyName": "amplify-lambda-custom-execution-policy",
        "Roles": [
          {
            "Ref": "LambdaExecutionRole"
          }
        ],
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [
                "states:StartExecution"
              ],
              "Resource": {
                "Fn::Sub": "${stepFunctionprocessWorkflowArn}"
              }
            }
          ]
        }
      }
    }

Finally, open the parameters file located at amplify/backend/function/raceSimulationScheduler/parameters.json and change its content to this:

{
    "stepFunctionprocessWorkflowArn": {
    "Fn::GetAtt": [
      "stepFunctionprocessWorkflow",
      "Outputs.Arn"
    ]
  }
}

This is how you ingest input parameters coming from the outputs of custom resources.

Now, locate the Lambda function code at /amplify/backend/function/raceSimulationScheduler/src/index.js and replace the file content with this:

const stateMachineArn = process.env.STATEMACHINE_ARN;
const region = process.env.REGION;
const AWS = require('aws-sdk');
const stepfunctions = new AWS.StepFunctions();
const dynamoDb = new AWS.DynamoDB.DocumentClient();

exports.handler = async(event) => {
  //eslint-disable-line
  console.log(JSON.stringify(event, null, 2));
  var results = [];
  await Promise.all(
    event.Records.map(async(record) => {
      console.log(record.eventID);
      console.log(record.eventName);
      console.log('DynamoDB Record: %j', record.dynamodb);
      var unmarshalledNewImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
      console.log('unmarshalledNewImage: %j', unmarshalledNewImage);
      console.log('stepfunctions');
      console.log(stepfunctions);
      if(record.eventName == "INSERT") {
        console.log("inseriting new execution");
        const result = await stepfunctions.startExecution({
                  stateMachineArn,
                  input: JSON.stringify({
                      planned_race_start: unmarshalledNewImage.date, 
                      race: {
                        eventID: unmarshalledNewImage.id
                      }
                  }),
              }).promise();
        console.log('result: %j', result);
        results.push(result);  
      }
    })
  )
  return {
      results: results
    };
};

The Lambda function processes the DynamoDB stream of the Event table, and for every new INSERT record it starts a new Step Functions execution, passing input parameters such as the race start date and the event ID. From now on, any time a new Event is created, a new execution starts as well.
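The mapping from an unmarshalled Event record to the Step Functions input can be isolated into a small pure function. The buildExecutionInput helper below is hypothetical (the deployed code builds the payload inline), but it mirrors the exact shape passed to startExecution:

```javascript
// Hypothetical helper mirroring the payload the Lambda passes to
// stepfunctions.startExecution: the event's date becomes the wait-until
// timestamp, and its id becomes the eventID carried through the workflow.
function buildExecutionInput(unmarshalledNewImage) {
  return {
    planned_race_start: unmarshalledNewImage.date,
    race: { eventID: unmarshalledNewImage.id },
  };
}

// Example: an Event item freshly unmarshalled from the DynamoDB stream
// (the id, date, and name values here are made up for illustration).
const input = buildExecutionInput({
  id: 'evt-123',
  date: '2024-06-01T14:00:00Z',
  name: 'Monza GP',
});
console.log(JSON.stringify(input));
```

The planned_race_start field is what the WaitForDueDate state reads through TimestampPath, so the execution sleeps until that instant before the first lap is simulated.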