Code samples to help you build your Lambda Function. The pieces of code show what an AWS Lambda function implementation looks like in different programming languages.
We have included two sample functions for you to use to accelerate your function development:
-
A sample function request and the generated response to help you understand Fivetran’s request and response format and how to use the different nodes to write your cloud function.
-
A real-world sample function to retrieve data from the Transport API.
Sample function requestlink
Select your language to view the sample function request:
Gopackage main
import (
"github.com/aws/aws-lambda-go/lambda"
)
type Request struct {
State map[string]string `json:"state"`
Secrets map[string]string `json:"secrets"`
}
type Response struct {
State map[string]string `json:"state"`
Insert map[string][]Record `json:"insert"`
Delete map[string][]Record `json:"delete"`
Schema map[string]Key `json:"schema"`
HasMore bool `json:"hasMore"`
}
type Key struct {
PrimaryKey []string `json:"primary_key"`
}
type Record struct {
Date string `json:"date"`
OrderId int `json:"order_id"`
Amount string `json:"amount"`
Discount string `json:"discount"`
}
func handleLambdaEvent(request Request) (Response, error) {
insertTransactions, deleteTransactions, newTransactionsCursor := apiResponse(request.State, request.Secrets)
newState := make(map[string]string)
newState["transactionsCursor"] = newTransactionsCursor
primary_key := make([]string, 0)
primary_key = append(primary_key, "order_id")
primary_key = append(primary_key, "date")
transactionsSchema := make(map[string]Key)
transactionsSchema["transactions"] = Key{PrimaryKey: primary_key}
return Response{State: newState, Insert: insertTransactions, Delete: deleteTransactions, Schema: transactionsSchema, HasMore: false}, nil
}
func apiResponse(state map[string]string, secrets map[string]string) (map[string][]Record, map[string][]Record, string) {
insertTransactions := make(map[string][]Record)
insertTransactions["transactions"] = append(insertTransactions["transactions"], Record{Date: "2017-12-31T05:12:05Z", OrderId: 1001, Amount: "$1200", Discount: "$12"})
insertTransactions["transactions"] = append(insertTransactions["transactions"], Record{Date: "2017-12-31T05:12:05Z", OrderId: 1002, Amount: "$1345", Discount: "$14"})
deleteTransactions := make(map[string][]Record)
deleteTransactions["transactions"] = append(deleteTransactions["transactions"], Record{Date: "2017-12-31T05:12:05Z", OrderId: 1001, Amount: "$1200", Discount: "$12"})
return insertTransactions, deleteTransactions, "2018-01-01T00:00:00Z"
}
func main() {
lambda.Start(handleLambdaEvent)
}
Java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class LambdaFunctionHandler implements RequestHandler<Request, Response> {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public Response handleRequest(Request request, Context context) {
return apiResponse(request.state, request.secrets);
}
Response apiResponse(Object state, Object secrets) {
Record record1 = new Record(Instant.parse("2017-12-31T05:12:05Z"), 1001L, "$1200", "$12");
Record record2 = new Record(Instant.parse("2017-12-31T06:12:04Z"), 1001L, "$1200", "$12");
Record record3 = new Record(Instant.parse("2017-12-31T05:12:05Z"), 1000L, "$1200", "$12");
Record record4 = new Record(Instant.parse("2017-12-31T06:12:04Z"), 1000L, "$1200", "$12");
HashMap<String, Instant> newState = new HashMap<>();
newState.put("transactionCursor", Instant.parse("2018-01-01T00:00:00Z"));
state = newState;
List<Record> records = new ArrayList<>();
records.add(record1);
records.add(record2);
Map<String, Object> insert = new HashMap<>();
insert.put("transactions", records);
records.clear();
records.add(record3);
records.add(record4);
Map<String, Object> delete = new HashMap<>();
delete.put("transactions", records);
List<String> primaryKey = new ArrayList<>();
primaryKey.add("order_id");
primaryKey.add("date");
Map<String, List<String>> transactionsSchema = new HashMap<>();
transactionsSchema.put("primary_key", primaryKey);
Map<String, Object> schema = new HashMap<>();
schema.put("transactions", transactionsSchema);
return new Response(state, insert, delete, schema, true);
}
}
public class Request {
Object state;
Object secrets;
public Object getState() {
return state;
}
public void setState(Object state) {
this.state = state;
}
public Object getSecrets() {
return secrets;
}
public void setSecrets(Object secrets) {
this.secrets = secrets;
}
public Request(Object state, Object secrets) {
this.state = state;
this.secrets = secrets;
}
public Request() {
}
}
import java.util.Map;
public class Response {
Object state;
Map<String, Object> insert;
Map<String, Object> delete;
Map<String, Object> schema;
boolean hasMore;
public Object getState() {
return state;
}
public void setState(Object state) {
this.state = state;
}
public Map<String, Object> getInsert() {
return insert;
}
public void setDelete(Map<String, Object> insert) {
this.insert = insert;
}
public Map<String, Object> getDelete() {
return delete;
}
public void setSchema(Map<String, Object> delete) {
this.delete = delete;
}
public Map<String, Object> getSchema() {
return schema;
}
public void setInsert(Map<String, Object> schema) {
this.schema = schema;
}
public boolean getHasMore() {
return hasMore;
}
public void setHasMore(boolean hasMore) {
this.hasMore = hasMore;
}
public Response(
Object state,
Map<String, Object> insert,
Map<String, Object> delete,
Map<String, Object> schema,
boolean hasMore) {
this.state = state;
this.insert = insert;
this.delete = delete;
this.schema = schema;
this.hasMore = hasMore;
}
public Response() {
}
}
import java.time.Instant;
public class Record {
public Instant date;
public Long order_id;
public String amount;
public String discount;
public Record(Instant date, Long order_id, String amount, String discount) {
this.date = date;
this.order_id = order_id;
this.amount = amount;
this.discount = discount;
}
}
Node.js
exports.handler = (request, context, callback) => {
callback(null, update(request.state, request.secrets));
};
function update(state, secrets) {
// Fetch records using api calls
let [insertTransactions, deleteTransactions, newTransactionsCursor] = apiResponse(state, secrets);
// Populate records and state
return ({
state: {
transactionsCursor: newTransactionsCursor
},
insert: {
transactions: insertTransactions
},
delete: {
transactions: deleteTransactions
},
schema : {
transactions : {
primary_key : ['order_id', 'date']
}
},
hasMore : false
});
}
function apiResponse(state, secrets) {
var insertTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
];
var deleteTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
];
return [insertTransactions, deleteTransactions, '2018-01-01T00:00:00Z'];
}
Python
import json
def lambda_handler(request, context):
# Fetch records using api calls
(insertTransactions, deleteTransactions, newTransactionCursor) = api_response(request['state'], request['secrets'])
# Populate records in insert
insert = {}
insert['transactions'] = insertTransactions
delete = {}
delete['transactions'] = deleteTransactions
state = {}
state['transactionsCursor'] = newTransactionCursor
transactionsSchema = {}
transactionsSchema['primary_key'] = ['order_id', 'date']
schema = {}
schema['transactions'] = transactionsSchema
response = {}
# Add updated state to response
response['state'] = state
# Add all the records to be inserted in response
response['insert'] = insert
# Add all the records to be marked as deleted in response
response['delete'] = delete
# Add schema defintion in response
response['schema'] = schema
# Add hasMore flag
response['hasMore'] = 'false'
return response
def api_response(state, secrets):
# your api call goes here
insertTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
]
deleteTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
]
return (insertTransactions, deleteTransactions, '2018-01-01T00:00:00Z')
For more information about the fields in the request, see our request format documentation.
Generated responselink
The sample function request generates the following JSON response:
{
"state": {
"transactionsCursor": "2018-01-01T00:00:00Z"
},
"insert": {
"transactions": [
{
"date": "2017-12-31T05:12:05Z",
"order_id": 1001,
"amount": "$1200",
"discount": "$12"
},
{
"date": "2017-12-31T06:12:04Z",
"order_id": 1001,
"amount": "$1200",
"discount": "$12"
}
]
},
"delete": {
"transactions": [
{
"date": "2017-12-31T05:12:05Z",
"order_id": 1000,
"amount": "$1200",
"discount": "$12"
},
{
"date": "2017-12-31T06:12:04Z",
"order_id": 1000,
"amount": "$1200",
"discount": "$12"
}
]
},
"schema": {
"transactions": {
"primary_key": [
"order_id",
"date"
]
}
},
"hasMore": "true"
}
For more information about the fields in the response, see our response format documentation.
Real-world sample functionlink
Download our sample Transport - API (London Tube) function. Select your language to download the sample function (.zip file):
Upload the downloaded .zip file to the Lambda console. For more information, see Lambda deployment packages.