Amazon Simple Queue Service (Amazon SQS) is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. It moves data between distributed application components so that those components do not have to call each other directly.

Before starting development, install the AWS SDK. For the basic theory, see my page on AWS SQS.

In application.yml

    sqs:
      region: ap-south-1
      accessKeyId: arunsinghgujjar
      secretAccessKey: jainpurwalearunsingh/saharanpursepauchepuna
    cloud:
      aws:
        end-point:
          uri: https://arun-learningsubway-1.amazonaws.com/9876974864/learningsubway_SQS.fifo
        queue:
          max-poll-time: 20
          max-messages: 10
          fetch-wait-on-error: 60
          enabled: true
          content: sqs

In Java we need to create two classes: one to send messages, and another to read and delete messages from the queue. Both classes use the AWS SDK for Java v2 and these imports:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.*;

To Send SQS Message

    // Holds the sqs.* settings from application.yml.
    public static class SqsConfig {
        private String region;
        private String accessKeyId;
        private String secretAccessKey;
    }

    // "log" is assumed to be an SLF4J logger declared on the class.
    public class SendMessageByLearningsubway {
        @Value("${cloud.aws.queue.enabled}")
        private Boolean enabled;
        @Value("${cloud.aws.end-point.uri}")
        private String url;
        @Value("${cloud.aws.queue.content}")
        private String queueType;
        @Value("${cloud.aws.queue.max-poll-time}")
        private Integer maxPollTime;
        @Value("${cloud.aws.queue.max-messages}")
        private Integer maxMessages;
        @Value("${cloud.aws.queue.fetch-wait-on-error}")
        private Integer fetchWaitOnError;
        @Autowired
        public SqsClient sqsClient;

        // Returns the FIFO sequence number, or null if the send failed.
        public String sendMessage(MessageDistributionEvent messageDistributionEvent) {
            try {
                Map<String, MessageAttributeValue> attributes = new HashMap<>();
                // Body format: channelId_messageId_recipient1_recipient2_...
                StringBuilder recepList = new StringBuilder();
                for (Integer myInt : messageDistributionEvent.getRecipients()) {
                    recepList.append("_").append(myInt);
                }
                SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                        .queueUrl(url)
                        .messageBody(messageDistributionEvent.getChannelId() + "_"
                                + messageDistributionEvent.getMessageId() + recepList)
                        .messageGroupId("1")
                        .messageAttributes(attributes)
                        .build();
                SendMessageResponse sendMessage = sqsClient.sendMessage(sendMsgRequest);
                return sendMessage.sequenceNumber();
            } catch (Exception ex) {
                log.error("Failed to send message", ex);
                return null;
            }
        }
    }

To Read and Delete Message

If a message is not deleted after it is processed, it stays in the queue and may be delivered again.

    public class ReceiveMessageLearningsubway {
        @Value("${cloud.aws.queue.enabled}")
        private Boolean enabled;
        @Value("${cloud.aws.end-point.uri}")
        private String url;
        @Value("${cloud.aws.queue.content}")
        private String queueType;
        @Value("${cloud.aws.queue.max-poll-time}")
        private Integer maxPollTime;
        @Value("${cloud.aws.queue.max-messages}")
        private Integer maxMessages;
        @Value("${cloud.aws.queue.fetch-wait-on-error}")
        private Integer fetchWaitOnError;
        // volatile so that stop() is seen by the listener thread
        private volatile boolean running = false;
        @Autowired
        public SqsClient sqsClient;

        @PostConstruct
        public void start() {
            if (enabled && queueType.equals("sqs")) {
                running = true;
                new Thread(this::startListener, "sqs-listener").start();
            }
        }

        @PreDestroy
        public void stop() {
            running = false;
        }

        private void startListener() {
            while (running) {
                try {
                    // Long polling: wait up to maxPollTime seconds for messages.
                    ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                            .queueUrl(url)
                            .waitTimeSeconds(maxPollTime)
                            .maxNumberOfMessages(maxMessages)
                            .messageAttributeNames("MessageLabel")
                            .build();
                    List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest).messages();
                    for (Message message : sqsMessages) {
                        try {
                            String body = message.body();
                            // data[0] = channel id, data[1] = message id, the rest = recipient ids
                            String[] data = body.split("_");
                            List<Integer> listRecipient = new LinkedList<>();
                            for (int i = 2; i < data.length; i++) {
                                log.info("RecepId:" + data[i]);
                                listRecipient.add(Integer.parseInt(data[i]));
                            }
                            // data read from queue by learningsubway.com
                            System.out.println(Integer.parseInt(data[0]) + " " + data[1] + " " + listRecipient);
                            // Delete only after successful processing.
                            sqsClient.deleteMessage(DeleteMessageRequest.builder()
                                    .queueUrl(url)
                                    .receiptHandle(message.receiptHandle())
                                    .build());
                        } catch (Exception e) {
                            log.error("Failed to process message {}", message.messageId(), e);
                        }
                    }
                } catch (Exception e) {
                    log.warn("Error in fetching messages from SQS Queue. Will sleep and retry again.", e);
                    try {
                        Thread.sleep(fetchWaitOnError * 1000L);
                    } catch (InterruptedException ie) {
                        log.error("Unable to sleep the sqs-listener", ie);
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        // Returns the FIFO sequence number, or null if the send failed.
        public String sendMessage(QueueMessage message) {
            try {
                log.info("send message on SQS: " + message);
                Map<String, MessageAttributeValue> attributes = new HashMap<>();
                // Note: this only attaches a custom attribute to the message. Content-based
                // deduplication itself is a setting on the FIFO queue, not on the message.
                attributes.put("ContentBasedDeduplication", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue("true")
                        .build());
                attributes.put("MessageLabel", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(message.getMsgId())
                        .build());
                SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                        .queueUrl(url)
                        .messageBody(message.getMsgId())
                        .messageGroupId("1")
                        .messageAttributes(attributes)
                        .build();
                SendMessageResponse sendMessage = sqsClient.sendMessage(sendMsgRequest);
                log.info("data sent to queue: " + sendMessage.sequenceNumber());
                return sendMessage.sequenceNumber();
            } catch (Exception ex) {
                log.error("Failed to send message", ex);
                return null;
            }
        }
    }
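The sender and the listener above agree on an underscore-separated body: channel id, message id, then the recipient ids. The following is a minimal sketch of that format on its own, without any SQS calls, so it can be checked in isolation. The class name MessageBodyCodec and its method names are made up for this illustration, and it assumes that the message id itself never contains an underscore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper: mirrors the body format used by the classes above.
final class MessageBodyCodec {

    // Builds "channelId_messageId_r1_r2_..." from its parts.
    static String encode(int channelId, String messageId, List<Integer> recipients) {
        StringJoiner joiner = new StringJoiner("_");
        joiner.add(Integer.toString(channelId));
        joiner.add(messageId);
        for (Integer recipient : recipients) {
            joiner.add(recipient.toString());
        }
        return joiner.toString();
    }

    // Reads the recipient ids back; they start at index 2 after splitting.
    static List<Integer> decodeRecipients(String body) {
        String[] parts = body.split("_");
        List<Integer> recipients = new ArrayList<>();
        for (int i = 2; i < parts.length; i++) {
            recipients.add(Integer.parseInt(parts[i]));
        }
        return recipients;
    }

    public static void main(String[] args) {
        String body = encode(7, "m42", List.of(1, 2, 3));
        System.out.println(body);
        System.out.println(decodeRecipients(body));
    }
}
```

If a message id can contain an underscore, the split on the listener side shifts every field by one, so a different separator or a JSON body would be safer in that case.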
Month: July 2021
FileBeat 401 Unauthorized Error with AWS Elasticsearch
Overview
Filebeat is a lightweight shipper for forwarding and centralizing log data. Installed as an agent on your servers, Filebeat monitors the log files or locations that you specify, collects log events, and forwards them either to Elasticsearch or Logstash for indexing.
Here’s how Filebeat works: When you start Filebeat, it starts one or more inputs that look in the locations you’ve specified for log data. For each log that Filebeat locates, Filebeat starts a harvester. Each harvester reads a single log for new content and sends the new log data to libbeat, which aggregates the events and sends the aggregated data to the output that you’ve configured for Filebeat.
If you are getting the error below while importing data into AWS Elasticsearch directly from Filebeat, then this post is for you!
Exiting: 1 error: error loading index pattern: returned 401 to import file: . Response: {"statusCode":401,"error":"Unauthorized","message":"Authentication required"}
This issue occurs when Filebeat connects to AWS Elasticsearch with username/password security, using a configuration such as:
setup.kibana:
  host: "https://arun-learningsubway-abxybalglzl3zmkmiq4.ap-south-1.es.amazonaws.com:443/_plugin/kibana/"
output.elasticsearch:
  protocol: https
  hosts: ["arun-learningsubway-workapps-abxybalglzl3zmkmiq4.ap-south-1.es.amazonaws.com:9200"]
  username: "myUsername"
  password: "myPassword"
  index: "nginx_index_by_arun"
Solution
In AWS, while configuring your Elasticsearch service, set it up for IP whitelisting instead of a master user.
or
Configure Filebeat --> Logstash --> Elasticsearch. With this chain, the master username/password also works.
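Before changing the setup, it can help to check whether the domain accepts the username and password at all, independently of Filebeat. The sketch below sends a single GET with an HTTP Basic Authorization header using the JDK's own HttpClient (Java 11 or later) and returns the status code. The class name EsAuthCheck and its method names are made up for this illustration, and the endpoint you pass in is your own domain URL.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for checking credentials against an HTTPS endpoint.
final class EsAuthCheck {

    // Builds the value of the Authorization header for HTTP Basic auth.
    static String basicAuthHeader(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    // Sends one GET request and returns the HTTP status code (for example 200 or 401).
    static int statusFor(String endpoint, String user, String password) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
                .header("Authorization", basicAuthHeader(user, password))
                .GET()
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java EsAuthCheck.java <endpoint> <user> <password>
        System.out.println(statusFor(args[0], args[1], args[2]));
    }
}
```

A 401 from this check points at the credentials or the domain's access policy rather than at the Filebeat configuration.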
FileBeat and Logstash to insert Data into AWS Elasticsearch
FileBeat to insert Data into Logstash, and Logstash to insert Data into Elasticsearch
* An important point here: the latest Elasticsearch version supported on AWS is 7.10, so Logstash and Filebeat must be on the same version.
If they are not, you may run into version compatibility issues.
* If the latest available version of ES is x and you are not on the cloud, it is still better to stay on version (x-1) or older in production. This keeps production away from the bugs of a fresh release to a large extent.
Download Filebeat 7.10 and Logstash 7.10.
Configuration of FileBeat to insert nginx logs into Logstash
Open filebeat.yml in any editor of your choice. It is located in
/etc/filebeat/ on Linux or
C:\Program Files\filebeat-7.10.0 on Windows
filebeat:
  inputs:
    - paths:
        - E:/nginx-1.20.1/logs/.log
      input_type: log
filebeat.config.modules:
  enabled: true
  path: ${path.config}/modules.d/*.yml
output:
  logstash:
    hosts: ["localhost:5044"]
Logstash Configuration
input {
  beats {
    port => 5044
    ssl => false
  }
}
filter {
  grok {
    match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
    overwrite => [ "message" ]
  }
  mutate {
    convert => ["response", "integer"]
    convert => ["bytes", "integer"]
    convert => ["responsetime", "float"]
  }
  geoip {
    source => "clientip"
    target => "geoip"
    add_tag => [ "nginx-geoip" ]
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    remove_field => [ "timestamp" ]
  }
  useragent {
    source => "agent"
  }
}
output {
  elasticsearch {
    hosts => ["https://arun-learningsubway-ybalglooophuhyjmik3zmkmiq4.ap-south-1.es.amazonaws.com:443"]
    index => "arun_nginx"
    document_type => "%{[@metadata][type]}"
    user => "myusername"
    password => "mypassword"
    manage_template => false
    template_overwrite => false
    ilm_enabled => false
  }
}
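The date filter above turns the nginx access-log timestamp into the event time. To see what that pattern matches, here is a minimal Java sketch that parses the same kind of value with java.time. The class name NginxTimestamp and the sample timestamp are made up for this illustration. Note one difference: Logstash uses Joda-Time style patterns, where YYYY is the year, while in java.time YYYY means the week-based year, so the sketch uses yyyy.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: parses timestamps like "10/Jul/2021:13:55:36 +0530".
final class NginxTimestamp {

    // java.time equivalent of the Logstash pattern "dd/MMM/YYYY:HH:mm:ss Z".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    static OffsetDateTime parse(String raw) {
        return OffsetDateTime.parse(raw, FORMAT);
    }

    public static void main(String[] args) {
        OffsetDateTime parsed = parse("10/Jul/2021:13:55:36 +0530");
        // Print the same moment in UTC.
        System.out.println(parsed.toInstant());
    }
}
```

A log line whose timestamp does not fit this shape fails to parse here with a DateTimeParseException, which is a quick way to test sample lines before sending them through Logstash.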
Commands to Run on Windows
To run Nginx
cd D:\nginx
start nginx
To kill the nginx process
taskkill /IM "nginx.exe" /F
To run Filebeat
To enable module
.\filebeat.exe modules enable nginx
C:\Program Files\filebeat-7.10.0> .\filebeat.exe -e
To run Logstash
C:\logstash> .\bin\logstash.bat -f .\config\logstash.conf