This article shows how to process data that has been pushed to an AWS SQS queue. A small Lambda function acts as a data shipper: it pulls messages from SQS and pushes them to Elasticsearch. The Lambda first checks whether the target index exists. If it does, the data is pushed straight away; otherwise the Lambda creates the index and then pushes the data. This process is useful for log analytics and sales tracking, because it avoids putting the indexing load directly on AWS ES.
The following flow ships your data to ES, with a highly available service sitting between SQS and ES.
Flow Diagram Explanation
First, a CloudWatch rule triggers the Lambda, which starts reading its assigned SQS queue. Once the Lambda detects records in the queue, it starts processing them.
- Case 1: If no records are found in the SQS queue, the Lambda terminates.
- Case 2: If records are found in SQS, the Lambda pushes them to ES and deletes them from the queue.
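The two cases above can be sketched without any AWS dependency. This is a minimal illustration, not the article's implementation: the class QueueDrainSketch, the in-memory Deque standing in for SQS, and the Predicate standing in for the Elasticsearch call are all hypothetical names chosen for the example. Unlike real SQS, where an undeleted message becomes visible again later, the sketch simply returns the messages that failed to index.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for the SQS queue and the Lambda's drain loop.
class QueueDrainSketch {
    // SQS returns at most 10 messages per receive call.
    static final int MAX_BATCH = 10;

    // Takes up to MAX_BATCH messages off the front of the queue.
    static List<String> receive(Deque<String> queue) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < MAX_BATCH && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    // Reads batches until the queue is empty; returns the messages that failed to index.
    static List<String> drain(Deque<String> queue, Predicate<String> indexer) {
        List<String> failed = new ArrayList<>();
        List<String> batch = receive(queue);
        while (!batch.isEmpty()) {           // Case 1: an empty batch ends the run
            for (String message : batch) {   // Case 2: index every message found
                if (!indexer.test(message)) {
                    failed.add(message);     // in real SQS this message would not be deleted
                }
            }
            batch = receive(queue);
        }
        return failed;
    }
}
```

In the real Lambda, receive corresponds to the SQS receive call and the indexer to the Jest index request shown later in the article.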
Lambda implementation
Tested environment:
Java : 8
Jest API : 2.4
Elasticsearch : 5.1
Code Explanation:
The index existence check is in the file below.
Environment variables configured in the Lambda:
String indexName=System.getenv("indexName");
String mapping=System.getenv("indexMapping");
String datePattern=System.getenv("datePattern");
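The handler below uses a monthlyIndexName whose construction is not shown. One plausible way to derive it from indexName and datePattern is sketched here. This is an assumption, not the article's code: the class IndexNameSketch, the "-" separator, and the example pattern yyyy.MM are all hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a date-suffixed index name from the two env variables.
class IndexNameSketch {
    // Appends the formatted date to the base name, e.g. base + "-" + "2017.03".
    // The "-" separator is an assumption made for this example.
    static String monthlyIndexName(String indexName, String datePattern, LocalDate date) {
        String suffix = date.format(DateTimeFormatter.ofPattern(datePattern));
        return indexName + "-" + suffix;
    }
}
```

With a monthly pattern, all documents indexed in the same month land in the same index, so the index existence check only leads to index creation on the first run of each month.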
Main Handler responsible for lambda execution:
public List<String> handleRequest(String input, Context context) {
    try {
        // Check whether the index already exists
        IndicesExists indicesExists = new IndicesExists.Builder(monthlyIndexName).build();
        boolean indexExists = jestConnection.execute(indicesExists).isSucceeded();
        if (indexExists) {
            context.getLogger().log("Index already exists in Elasticsearch");
            rsQueue.getQueueMessagesTOES(jestConnection, monthlyIndexName, mapping, parser, context);
        } else {
            // Create the index
            jestConnection.execute(new CreateIndex.Builder(monthlyIndexName).build());
            // Read the JSON mapping from file
            Object mappingObject = parser.parse(new InputStreamReader(new FileInputStream("xmlxyz.json")));
            context.getLogger().log("JSON mapping for Elasticsearch is " + mappingObject);
            // Create the mapping, since the index did not exist
            PutMapping putMapping = new PutMapping.Builder(monthlyIndexName, mapping, mappingObject).build();
            JestResult resultJest = jestConnection.execute(putMapping);
            context.getLogger().log("Response code when creating mapping: " + resultJest.getResponseCode());
            if (resultJest.getResponseCode() == 200) {
                context.getLogger().log("Mapping creation was successful");
                // Read the queue
                rsQueue.getQueueMessagesTOES(jestConnection, monthlyIndexName, mapping, parser, context);
            } else {
                context.getLogger().log("Mapping creation failed!");
                context.getLogger().log("Reason for mapping creation failure: " + resultJest.getErrorMessage());
            }
        }
    } catch (Exception e) {
        // Minimal handling (assumed): log the failure and still return the dummy list
        context.getLogger().log("Error while shipping data to Elasticsearch: " + e.getMessage());
    }
    larrayObject.add(input);
    return larrayObject;
}
Note:
larrayObject.add(input);
In the snippet above, the handler receives a dummy string input and adds it to a list (larrayObject) so that the Lambda function has something to return. The input string must be added to the list and returned in list form; otherwise Lambda throws a fasterxml error, as the comment in the code notes:
/* The dummy input has to be returned (this still needs checking); if the return value is not an ArrayList, Lambda throws a fasterxml error. */
ReadSqsQueue.java
This class reads the queue iteratively and pushes the messages to Elasticsearch.
SQS API call to read the queue:
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(sqsUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
List<Message> messages =awsConnection.getSqsConnection().receiveMessage(receiveMessageRequest).getMessages();
Note: a single receive call returns at most 10 messages. This is an AWS SQS limit.
Each record is read from the queue as a JSON string and parsed into a JSON object so that it can be pushed to Elasticsearch.
JSONObject jsonResponseObjectMain = (JSONObject) parser.parse(message.getBody());
// Grab the header object
Object headerObj = parser.parse(jsonResponseObjectMain.get("header").toString());
JSONObject jsonObjectHeader = (JSONObject) headerObj;
String headerId = jsonObjectHeader.get("id").toString();
// Index the document into AWS ES
Index index = new Index.Builder(jsonResponseObjectMain).index(indexNameWithDate).type(indexMapping).id(headerId)
        .setParameter("pipeline", "timestamp").build();
DocumentResult responseFromES = client.execute(index);
if (responseFromES.isSucceeded()) {
    // Delete the message only after Elasticsearch has accepted it
    awsConnection.getSqsConnection().deleteMessage(new DeleteMessageRequest().withQueueUrl(sqsUrl).withReceiptHandle(message.getReceiptHandle()));
} else {
    /* Nothing is done here: the message remains in the queue and is retried after some time.
     * If it still has not been processed after, say, 5 attempts, it is added to the dead letter queue.
     * Logic still to be implemented here: if the dead letter queue has records,
     * process them and send them to ES again. */
}
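The retry behaviour described in the comment above depends on the source queue having a redrive policy, which tells SQS to move a message to a dead letter queue once it has been received a given number of times without being deleted. The sketch below only builds the JSON value for the queue's RedrivePolicy attribute. The class name RedrivePolicySketch and the ARN used in the usage line are hypothetical, and the value 5 simply mirrors the "5 times" mentioned in the comment.

```java
// Hypothetical helper: builds the JSON value for the SQS "RedrivePolicy" queue attribute.
class RedrivePolicySketch {
    // maxReceiveCount: number of receives after which SQS moves the message to the dead letter queue.
    // deadLetterQueueArn: ARN of the dead letter queue (a placeholder in this example).
    static String redrivePolicy(String deadLetterQueueArn, int maxReceiveCount) {
        return "{\"maxReceiveCount\":\"" + maxReceiveCount + "\","
                + "\"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}";
    }
}
```

For example, redrivePolicy("arn:aws:sqs:us-east-1:123456789012:my-dlq", 5) produces a policy that could be set on the source queue as its RedrivePolicy attribute, either in the console or through the SDK's set-queue-attributes call.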
This job can then be scheduled using CloudWatch rules.