Read & Write data into AWS SQS using Java

Amazon Simple Queue Service (Amazon SQS) is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. It moves data between distributed application components, letting you keep those components loosely coupled.
Before starting development, install the AWS SDK for Java.
For the basic theory, check my page AWS SQS.
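If you build with Maven, the SQS module of the AWS SDK for Java v2 can be declared as below; the version shown is only an example, so pick a current 2.x release:

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
    <version>2.17.100</version>
</dependency>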
In application.yml:

sqs:
  region: ap-south-1
  accessKeyId: arunsinghgujjar
  secretAccessKey: jainpurwalearunsingh/saharanpursepauchepuna
cloud:
  aws:
    end-point:
      uri: https://arun-learningsubway-1.amazonaws.com/9876974864/learningsubway_SQS.fifo
    queue:
      max-poll-time: 20
      max-messages: 10
      fetch-wait-on-error: 60
      enabled: true
      content: sqs
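Somewhere in your Spring configuration you also need an SqsClient bean built from these properties. Here is a minimal sketch, assuming the AWS SDK for Java v2; static credentials are shown only to match the YAML above, and in production you would normally prefer the default credentials provider chain:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
public class SqsClientConfig {

    @Value("${sqs.region}")
    private String region;
    @Value("${sqs.accessKeyId}")
    private String accessKeyId;
    @Value("${sqs.secretAccessKey}")
    private String secretAccessKey;

    @Bean
    public SqsClient sqsClient() {
        // Build a client for the configured region using the static credentials from application.yml.
        return SqsClient.builder()
                .region(Region.of(region))
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(accessKeyId, secretAccessKey)))
                .build();
    }
}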
In Java, we need to create two classes: one to send messages and another to read and delete messages from the queue.
To Send an SQS Message
// Maps the sqs.* block from application.yml (getters/setters omitted for brevity).
public class SqsConfig {
    private String region;
    private String accessKeyId;
    private String secretAccessKey;
}

import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

@Slf4j // Lombok provides the 'log' field
@Component
public class SendMessageByLearningsubway {

    @Value("${cloud.aws.queue.enabled}")
    private Boolean enabled;
    @Value("${cloud.aws.end-point.uri}")
    private String url;
    @Value("${cloud.aws.queue.content}")
    private String queueType;
    @Value("${cloud.aws.queue.max-poll-time}")
    private Integer maxPollTime;
    @Value("${cloud.aws.queue.max-messages}")
    private Integer maxMessages;
    @Value("${cloud.aws.queue.fetch-wait-on-error}")
    private Integer fetchWaitOnError;

    @Autowired
    public SqsClient sqsClient;

    public String sendMessage(MessageDistributionEvent messageDistributionEvent) {
        SendMessageResponse sendMessage = null;
        try {
            Map<String, MessageAttributeValue> attributes = new HashMap<>();

            // Build the recipient part of the body as "_id1_id2_...".
            StringBuilder recepList = new StringBuilder();
            for (Integer myInt : messageDistributionEvent.getRecipients()) {
                recepList.append("_").append(myInt);
            }
            // Body format: "<channelId>_<messageId>_<recipient1>_<recipient2>..."
            SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                    .queueUrl(url)
                    .messageBody(messageDistributionEvent.getChannelId() + "_"
                            + messageDistributionEvent.getMessageId() + recepList)
                    .messageGroupId("1") // required for FIFO queues
                    .messageAttributes(attributes)
                    .build();
            sendMessage = sqsClient.sendMessage(sendMsgRequest);
        } catch (Exception ex) {
            log.error("failed to send message: ", ex);
        }
        // Guard against the NPE the original code had when sending failed.
        return sendMessage == null ? null : sendMessage.sequenceNumber();
    }
}
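For illustration, here is how a caller might use the class above. The MessageDistributionEvent constructor and field order are assumptions for this sketch, not from the original post:

// 'sender' is an injected SendMessageByLearningsubway bean.
// Hypothetical event: channelId 42, messageId 1001, recipients 7, 8 and 9.
MessageDistributionEvent event = new MessageDistributionEvent(42, 1001, List.of(7, 8, 9));
String sequenceNumber = sender.sendMessage(event);
// The queue receives the body "42_1001_7_8_9", which the listener below splits on "_".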
To Read and Delete a Message
If a message is not deleted after processing, it stays in the queue and becomes visible again once its visibility timeout expires, so it may be delivered a second time.
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

@Slf4j
@Component
public class ReceiveMessageLearningsubway {

    @Value("${cloud.aws.queue.enabled}")
    private Boolean enabled;
    @Value("${cloud.aws.end-point.uri}")
    private String url;
    @Value("${cloud.aws.queue.content}")
    private String queueType;
    @Value("${cloud.aws.queue.max-poll-time}")
    private Integer maxPollTime;
    @Value("${cloud.aws.queue.max-messages}")
    private Integer maxMessages;
    @Value("${cloud.aws.queue.fetch-wait-on-error}")
    private Integer fetchWaitOnError;

    private volatile boolean running = false;

    @Autowired
    public SqsClient sqsClient;

    @PostConstruct
    public void start() {
        if (enabled && queueType.equals("sqs")) {
            running = true;
            new Thread(this::startListener, "sqs-listener").start();
        }
    }

    @PreDestroy
    public void stop() {
        running = false;
    }

    private void startListener() {
        while (running) {
            try {
                // Long-poll for up to maxPollTime seconds and fetch up to maxMessages at once.
                ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(url)
                        .waitTimeSeconds(maxPollTime)
                        .maxNumberOfMessages(maxMessages)
                        .messageAttributeNames("MessageLabel")
                        .build();
                List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest).messages();
                for (Message message : sqsMessages) {
                    try {
                        // Body format: "<channelId>_<messageId>_<recipient1>_<recipient2>..."
                        String body = message.body();
                        String[] data = body.split("_");
                        List<Integer> listRecipient = new LinkedList<>();
                        for (int i = 2; i < data.length; i++) {
                            log.info("RecepId: " + data[i]);
                            listRecipient.add(Integer.parseInt(data[i]));
                        }
                        // Data read from the queue by learningsubway.com.
                        System.out.println(Integer.parseInt(data[0]) + " " + data[1] + " " + listRecipient);
                        // Delete the message so it is not redelivered after the visibility timeout.
                        sqsClient.deleteMessage(DeleteMessageRequest.builder()
                                .queueUrl(url)
                                .receiptHandle(message.receiptHandle())
                                .build());
                    } catch (Exception e) {
                        log.error("Failed to process {}", message.messageId(), e);
                    }
                }
            } catch (Exception e) {
                log.warn("Error in fetching messages from SQS Queue. Will sleep and retry again.", e);
                try {
                    Thread.sleep(fetchWaitOnError * 1000L);
                } catch (InterruptedException ie) {
                    log.error("Unable to sleep the sqs-listener", ie);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public String sendMessage(QueueMessage message) {
        SendMessageResponse sendMessage = null;
        try {
            log.info("send message on SQS: " + message);
            Map<String, MessageAttributeValue> attributes = new HashMap<>();
            // Note: content-based deduplication is normally a queue-level setting;
            // here it is passed as a plain message attribute, as in the original post.
            attributes.put("ContentBasedDeduplication", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue("true")
                    .build());
            attributes.put("MessageLabel", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue(message.getMsgId())
                    .build());
            SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                    .queueUrl(url)
                    .messageBody(message.getMsgId())
                    .messageGroupId("1") // required for FIFO queues
                    .messageAttributes(attributes)
                    .build();
            sendMessage = sqsClient.sendMessage(sendMsgRequest);
            log.info("data sent to queue: " + sendMessage.sequenceNumber());
        } catch (Exception ex) {
            log.error("failed to send message: ", ex);
        }
        return sendMessage == null ? null : sendMessage.sequenceNumber();
    }
}
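A side note on deduplication: with FIFO queues, deduplication is a queue-level feature, not a message attribute. If content-based deduplication is disabled on the queue, the SDK v2 builder requires an explicit deduplication ID per message. A minimal sketch, reusing the url field from the class above:

import java.util.UUID;

SendMessageRequest request = SendMessageRequest.builder()
        .queueUrl(url)
        .messageBody(body)
        .messageGroupId("1")
        // Without content-based deduplication enabled on the queue,
        // every send to a FIFO queue must carry a deduplication ID.
        .messageDeduplicationId(UUID.randomUUID().toString())
        .build();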

FileBeat 401 Unauthorized Error with AWS Elasticsearch

Overview

Filebeat is a lightweight shipper for forwarding and centralizing log data. Installed as an agent on your servers, Filebeat monitors the log files or locations that you specify, collects log events, and forwards them either to Elasticsearch or Logstash for indexing.

Here’s how Filebeat works: When you start Filebeat, it starts one or more inputs that look in the locations you’ve specified for log data. For each log that Filebeat locates, Filebeat starts a harvester. Each harvester reads a single log for new content and sends the new log data to libbeat, which aggregates the events and sends the aggregated data to the output that you’ve configured for Filebeat.

If you are getting the error below while importing data into AWS Elasticsearch directly from Filebeat, then this post is for you!

Exiting: 1 error: error loading index pattern: returned 401 to import file: . Response: {"statusCode":401,"error":"Unauthorized","message":"Authentication required"}

This issue occurs when you point Filebeat directly at AWS Elasticsearch using username/password security, as in:

setup.kibana:
  host: "https://arun-learningsubway-abxybalglzl3zmkmiq4.ap-south-1.es.amazonaws.com:443/_plugin/kibana/"

output.elasticsearch:
  protocol: https
  hosts: ["arun-learningsubway-workapps-abxybalglzl3zmkmiq4.ap-south-1.es.amazonaws.com:9200"]
  username: "myUsername"
  password: "myPassword"
  index: "nginx_index_by_arun"

Solution

In AWS, while configuring your Elasticsearch service, configure it for IP whitelisting instead of a master user.
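For reference, an IP-based domain access policy looks roughly like this; the account ID, domain name, and CIDR range below are placeholders, not values from this setup:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "*" },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-south-1:111122223333:domain/arun-learningsubway/*",
      "Condition": {
        "IpAddress": { "aws:SourceIp": ["203.0.113.0/24"] }
      }
    }
  ]
}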

or

Configure Filebeat -> Logstash -> Elasticsearch; with the master username/password this also works.

FileBeat and Logstash to insert Data into AWS Elasticsearch

Filebeat to insert data into Logstash, and Logstash to insert data into Elasticsearch

* An important point: the latest Elasticsearch version supported on AWS is 7.10, so Logstash and Filebeat must also be on the same version.

If they are not, you may run into version incompatibility issues.

* Even if you are not on the cloud, if the latest available ES version is x, keep production on at least version (x-1). Staying one release behind keeps production safe from fresh product bugs to a large extent.

Click and download Filebeat 7.10 and Logstash 7.10.

Configuration of FileBeat to insert nginx logs into Logstash

Open filebeat.yml in any editor of your choice from:

/etc/filebeat/ on Linux, or

C:\Program Files\filebeat-7.10.0 on Windows

filebeat:
  inputs:
    - paths:
        - E:/nginx-1.20.1/logs/*.log
      input_type: log

filebeat.config.modules:
  enabled: true
  path: ${path.config}/modules.d/*.yml

output:
  logstash:
    hosts: ["localhost:5044"]
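Before starting the shipper, you can sanity-check the configuration file and the connection to Logstash with Filebeat's built-in test subcommands:

.\filebeat.exe test config
.\filebeat.exe test output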

Logstash Configuration

input {
  beats {
    port => 5044
    ssl => false
  }
}

filter {
  grok {
    match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}" ]
    overwrite => [ "message" ]
  }
  mutate {
    convert => ["response", "integer"]
    convert => ["bytes", "integer"]
    convert => ["responsetime", "float"]
  }
  geoip {
    source => "clientip"
    target => "geoip"
    add_tag => [ "nginx-geoip" ]
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    remove_field => [ "timestamp" ]
  }
  useragent {
    source => "agent"
  }
}

output {
  elasticsearch {
    hosts => ["https://arun-learningsubway-ybalglooophuhyjmik3zmkmiq4.ap-south-1.es.amazonaws.com:443"]
    index => "arun_nginx"
    document_type => "%{[@metadata][type]}"
    user => "myusername"
    password => "mypassword"
    manage_template => false
    template_overwrite => false
    ilm_enabled => false
  }
}
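Before running the pipeline, Logstash can validate the configuration file and exit, using its standard --config.test_and_exit flag:

C:\logstash> .\bin\logstash.bat -f .\config\logstash.conf --config.test_and_exit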

Commands to Run on Windows

To run Nginx:
cd D:\nginx
start nginx

To kill the nginx process:
taskkill /IM "nginx.exe" /F

To run Filebeat

To enable the nginx module:
.\filebeat.exe modules enable nginx

C:\Program Files\filebeat-7.10.0> .\filebeat.exe -e

To run Logstash

C:\logstash> .\bin\logstash.bat -f .\config\logstash.conf