Notify users of real-time race events


Subscriptions are a great way to notify users in real time, but when the volume of ingested data is large and the ingested data is heterogeneous, it is important to decouple the ingestion of data from the field (the cars racing on the track and their timing data) from the mutation operations on the AppSync API. This lets us manage the ingestion and notification rate and frequency, aggregate data, and use a single mutation to ingest multiple data points. You can also add a processing layer between the ingestion components and the mutation components to process and transform incoming data. Let’s tackle that next!
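On the frontend side, clients receive these notifications through the subscription that AppSync auto-generates for the createRaceEvent mutation. As a sketch (the subscription name assumes the default @model naming convention):

```javascript
// Subscription document the frontend would use to listen for new race events.
// AppSync generates onCreateRaceEvent automatically for the RaceEvent @model.
const onCreateRaceEvent = /* GraphQL */ `
  subscription OnCreateRaceEvent {
    onCreateRaceEvent {
      id
      eventId
      type
      competitor
      lap
      time
      position
    }
  }
`

// With the Amplify client library, this would typically be wired up as:
//   API.graphql(graphqlOperation(onCreateRaceEvent)).subscribe({
//     next: ({ value }) => console.log(value.data.onCreateRaceEvent),
//   })
```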

1. Create a Kinesis Data Stream

Let’s start by creating a new Kinesis Data Stream. This component can handle a large volume of ingested data, which makes it a perfect fit for our use case. Imagine multiple drivers and cars racing on the track, all sending data about lap times, standings, geolocation, and telemetry. This data can easily reach a large volume and high throughput. Kinesis Data Streams lets you partition data across multiple shards according to the throughput you need, and acts as a buffer between the ingestion throughput and the real-time notification throughput. For the sake of the demo, we can use a single shard. When dealing with a large volume of ingested data, you need to carefully choose the number of shards based on the throughput you want to achieve.
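To make the producer side concrete, here is a hypothetical sketch of how a timing system might shape a race event before putting it on the stream. The field names match the RaceEvent model used later in this section, and the stream name is the one created by the Amplify analytics category:

```javascript
// Build the parameters for a Kinesis putRecord call from a race event.
// Partitioning by competitor keeps each car's events ordered within a shard.
const buildKinesisRecord = (raceEvent) => ({
  StreamName: 'graphqlrealtimeraceKinesis',
  // Kinesis delivers this payload base64-encoded to the Lambda consumer
  Data: JSON.stringify(raceEvent),
  PartitionKey: String(raceEvent.competitor),
})

const record = buildKinesisRecord({
  eventId: 'race-1',
  type: 'LAP',
  competitor: 'car-44',
  lap: 12,
  time: '1:31.447',
})

// With the AWS SDK v2 used elsewhere in this workshop, you would send it with:
//   new AWS.Kinesis().putRecord(record).promise()
console.log(record.PartitionKey) // 'car-44'
```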

Head to your Cloud9 terminal and run amplify analytics add.

amplify analytics add

? Select an Analytics provider Amazon Kinesis Streams
? Enter a Stream name graphqlrealtimeraceKinesis
? Enter number of shards 1
Successfully added resource graphqlrealtimeraceKinesis locally

Then, push changes by running amplify push.

amplify push --yes

2. Configure a Lambda function that consumes the stream

Configure a Lambda function that will be triggered every time a new real-time event is ingested into the Kinesis Data Stream. This Lambda function consumes the data stream and could also aggregate or transform data. For the sake of this workshop, the function will consume the data stream and invoke a mutation operation on the AppSync API.

From your Cloud9 terminal:

amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: processLiveEvents
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Lambda trigger
? What event source do you want to associate with Lambda trigger? Amazon Kinesis Stream
? Choose a Kinesis event source option Use Analytics category kinesis stream in the current Amplify project
Selected resource graphqlrealtimeraceKinesis

Available advanced settings:
- Resource access permissions
- Scheduled recurring invocation
- Lambda layers configuration

? Do you want to configure advanced settings? Yes
? Do you want to access other resources in this project from your Lambda function? Yes
? Select the categories you want this function to have access to. api, analytics
? Select the operations you want to permit on graphqlrealtimerace Mutation
? Analytics has 2 resources in this project. Select the one you would like your Lambda to access graphqlrealtimeraceKinesis
? Select the operations you want to permit on graphqlrealtimeraceKinesis read

You can access the following resource attributes as environment variables from your Lambda function
        ANALYTICS_GRAPHQLREALTIMERACEKINESIS_KINESISSTREAMARN
        API_GRAPHQLREALTIMERACE_GRAPHQLAPIENDPOINTOUTPUT
        API_GRAPHQLREALTIMERACE_GRAPHQLAPIIDOUTPUT
        ENV
        REGION
? Do you want to invoke this function on a recurring schedule? No
? Do you want to configure Lambda layers for this function? No
? Do you want to edit the local lambda function now? No

Next, edit the function code located at amplify/backend/function/processLiveEvents/src/index.js.

/* Amplify Params - DO NOT EDIT
	ANALYTICS_GRAPHQLREALTIMERACEKINESIS_KINESISSTREAMARN
  API_GRAPHQLREALTIMERACE_GRAPHQLAPIENDPOINTOUTPUT
  API_GRAPHQLREALTIMERACE_GRAPHQLAPIIDOUTPUT
  ENV
  REGION
Amplify Params - DO NOT EDIT */

const https = require('https')
const AWS = require('aws-sdk')
const urlParse = require('url').URL
const region = process.env.REGION
const appsyncUrl = process.env.API_GRAPHQLREALTIMERACE_GRAPHQLAPIENDPOINTOUTPUT

const request = (queryDetails, appsyncUrl, apiKey) => {
  const req = new AWS.HttpRequest(appsyncUrl, region)
  const endpoint = new urlParse(appsyncUrl).hostname.toString()

  req.method = 'POST'
  req.path = '/graphql'
  req.headers.host = endpoint
  req.headers['Content-Type'] = 'application/json'
  req.body = JSON.stringify(queryDetails)

  if (apiKey) {
    req.headers['x-api-key'] = apiKey
  } else {
    const signer = new AWS.Signers.V4(req, 'appsync', true)
    signer.addAuthorization(AWS.config.credentials, AWS.util.date.getDate())
  }

  return new Promise((resolve, reject) => {
    const httpRequest = https.request({ ...req, host: endpoint }, (result) => {
      // Accumulate the response body: it may arrive in multiple chunks
      let data = ''
      result.on('data', (chunk) => {
        data += chunk
      })
      result.on('end', () => {
        resolve(JSON.parse(data))
      })
    })

    httpRequest.on('error', reject)
    httpRequest.write(req.body)
    httpRequest.end()
  })
}

const createRaceEventMutation = /* GraphQL */ `
  mutation CreateRaceEvent(
    $input: CreateRaceEventInput!
    $condition: ModelRaceEventConditionInput
  ) {
    createRaceEvent(input: $input, condition: $condition) {
      id
      eventId
      type
      competitor
      lap
      time
      position
      speed
      gear
      longitude
      latitude
      createdAt
      updatedAt
      _version
      _deleted
      _lastChangedAt
      event {
        id
        title
        date
        description
        heart
        thumbsup
        happy
        _version
        _deleted
        _lastChangedAt
        createdAt
        updatedAt
      }
    }
  }
`


exports.handler = async (event) => {
  console.log('new race events received:', event)

  try {
    await Promise.all(
      event.Records.map(async(record) => {
        const payload = Buffer.from(record.kinesis.data, 'base64').toString()
        const recordJSON = JSON.parse(payload)
        const inputEvent = {
          eventId: recordJSON.eventId,
          type: recordJSON.type,
          competitor: recordJSON.competitor,
          lap: recordJSON.lap
        }
      
        if (recordJSON.time != null && recordJSON.time !== '') {
          inputEvent.time = recordJSON.time
        }
        if (recordJSON.position != null) {
          inputEvent.position = recordJSON.position
        }
        if (recordJSON.longitude != null) {
          inputEvent.longitude = recordJSON.longitude
        }
        if (recordJSON.latitude != null) {
          inputEvent.latitude = recordJSON.latitude
        }
        if (recordJSON.speed != null) {
          inputEvent.speed = recordJSON.speed
        }
        if (recordJSON.gear != null) {
          inputEvent.gear = recordJSON.gear
        }
      
        const result = await request(
          {
            query: createRaceEventMutation,
            variables: {
              input: inputEvent,
            },
          },
          appsyncUrl
        )
        console.log('appsync result', result)
      })
    )
  } catch (e) {
    console.log('error caught:', e)
    return {
      statusCode: 400,
      body: JSON.stringify({
        payload: e
      }),
      headers: {
          "Access-Control-Allow-Origin": "*",
      }
    }
  }
}


The Lambda function consumes the Kinesis stream records and, for each record, invokes a mutation on AppSync to notify subscribers in real time about a new race event. The function may receive more than one record per execution, depending on the configuration and the volume of data ingested; therefore, you need to iterate over the Records collection in the event object to handle all incoming race events. The function also performs some checks on the incoming record payload so that only valid data is ingested into AppSync. Note the selection set of the mutation: it requests the whole list of available fields so that the frontend can select whichever fields it needs when receiving subscription data.
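The record-shaping logic above can be checked locally without AWS. This standalone sketch mirrors what the handler does: decode the base64 payload Kinesis delivers and copy only the optional telemetry fields that actually carry a value (field names and sample values are illustrative):

```javascript
// Decode a Kinesis record payload and build the mutation input,
// skipping optional fields that are null or empty.
const decodeRecord = (kinesisData) => {
  const payload = JSON.parse(Buffer.from(kinesisData, 'base64').toString())
  const input = {
    eventId: payload.eventId,
    type: payload.type,
    competitor: payload.competitor,
    lap: payload.lap,
  }
  // Copy optional telemetry fields only when present
  for (const field of ['time', 'position', 'longitude', 'latitude', 'speed', 'gear']) {
    if (payload[field] != null && payload[field] !== '') {
      input[field] = payload[field]
    }
  }
  return input
}

// Simulate what Kinesis would deliver for a lap-time event
const sample = Buffer.from(
  JSON.stringify({ eventId: 'race-1', type: 'LAP', competitor: 'car-44', lap: 3, time: '1:32.910', speed: null })
).toString('base64')

const input = decodeRecord(sample)
console.log(input) // speed stays absent because it was null in the payload
```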