Read and Write Data in AWS SQS Using Java

Amazon Simple Queue Service (Amazon SQS) is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Amazon SQS moves data between distributed application components and helps you decouple these components.
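Before you start development, add the AWS SDK for Java to your project. The code below uses the SDK v2 API (SqsClient and the builder-style request classes).
For the basic theory, see my page on AWS SQS.
In application.yml: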
sqs:
  region: ap-south-1
  accessKeyId: arunsinghgujjar
  secretAccessKey: jainpurwalearunsingh/saharanpursepauchepuna
cloud:
  aws:
    end-point:
      uri: https://arun-learningsubway-1.amazonaws.com/9876974864/learningsubway_SQS.fifo
    queue:
      max-poll-time: 20
      max-messages: 10
      fetch-wait-on-error: 60
      enabled: true
      content: sqs
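The queue settings above end up as SQS request parameters, and SQS enforces limits on them: long polling waits at most 20 seconds, a single receive call returns at most 10 messages, and FIFO queue names end in .fifo. Here is a minimal sketch of a startup check for those values. QueueSettingsCheck is a hypothetical helper that is not part of the original project, and it uses only the JDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project: checks the
// application.yml values against limits documented for SQS.
final class QueueSettingsCheck {

    private QueueSettingsCheck() {
    }

    // FIFO queue names (and therefore their URLs) end with ".fifo".
    static boolean isFifo(String queueUrl) {
        return queueUrl != null && queueUrl.endsWith(".fifo");
    }

    // Returns a list of problems; an empty list means the values are within range.
    static List<String> validate(int maxPollTimeSeconds, int maxMessages) {
        List<String> problems = new ArrayList<>();
        // Long polling (WaitTimeSeconds) accepts 0 to 20 seconds.
        if (maxPollTimeSeconds < 0 || maxPollTimeSeconds > 20) {
            problems.add("max-poll-time must be between 0 and 20, got " + maxPollTimeSeconds);
        }
        // A single receive call (MaxNumberOfMessages) returns 1 to 10 messages.
        if (maxMessages < 1 || maxMessages > 10) {
            problems.add("max-messages must be between 1 and 10, got " + maxMessages);
        }
        return problems;
    }

    public static void main(String[] args) {
        String url = "https://arun-learningsubway-1.amazonaws.com/9876974864/learningsubway_SQS.fifo";
        System.out.println("FIFO queue: " + isFifo(url));
        System.out.println("Problems: " + validate(20, 10));
    }
}
```

Calling validate(maxPollTime, maxMessages) once at startup would surface a bad configuration before the first request is sent, instead of as a rejected request at runtime.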
In Java, you need two classes: one to send messages and another to read and delete messages from the queue.
To Send SQS Message
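// Holds the sqs.* properties from application.yml (region and credentials).
// The static modifier implies this class is nested inside another class, which is not shown here.
public static class SqsConfig {
    private String region;
    private String accessKeyId;
    private String secretAccessKey;
}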
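import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

// Must be a Spring bean so that @Value and @Autowired are resolved (assumes SLF4J for logging).
@Component
public class SendMessageByLearningsubway {
    private static final Logger log = LoggerFactory.getLogger(SendMessageByLearningsubway.class);
    @Value("${cloud.aws.queue.enabled}")
    private Boolean enabled;
    @Value("${cloud.aws.end-point.uri}")
    private String url;
    @Value("${cloud.aws.queue.content}")
    private String queueType;
    @Value("${cloud.aws.queue.max-poll-time}")
    private Integer maxPollTime;
    @Value("${cloud.aws.queue.max-messages}")
    private Integer maxMessages;
    @Value("${cloud.aws.queue.fetch-wait-on-error}")
    private Integer fetchWaitOnError;
    @Autowired
    public SqsClient sqsClient;

    // Returns the FIFO sequence number, or null if the message could not be sent.
    public String sendMessage(MessageDistributionEvent messageDistributionEvent) {
        try {
            Map<String, MessageAttributeValue> attributes = new HashMap<>();

            // Body format: <channelId>_<messageId>_<recipient1>_<recipient2>...
            StringBuilder body = new StringBuilder()
                    .append(messageDistributionEvent.getChannelId())
                    .append("_")
                    .append(messageDistributionEvent.getMessageId());
            for (Integer recipient : messageDistributionEvent.getRecipients()) {
                body.append("_").append(recipient);
            }
            SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                    .queueUrl(url)
                    .messageBody(body.toString())
                    // FIFO queues require a message group ID; a single group keeps strict ordering.
                    .messageGroupId("1")
                    .messageAttributes(attributes)
                    .build();
            SendMessageResponse sendMessage = sqsClient.sendMessage(sendMsgRequest);
            return sendMessage.sequenceNumber();
        } catch (Exception ex) {
            log.error("Failed to send message", ex);
            // Return null here rather than dereferencing a response that was never received.
            return null;
        }
    }
}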
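The sender packs everything into one underscore-separated body: channel ID, message ID, then the recipient IDs. Here is a minimal, JDK-only sketch of that encoding, pulled out so it can be tested without an SQS connection. MessageBodyEncoder is a hypothetical name, and the int type for the channel ID and String type for the message ID are assumptions based on how the receiver parses the body.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper, not part of the original project: builds the
// "<channelId>_<messageId>_<recipient1>_<recipient2>..." message body.
final class MessageBodyEncoder {

    private MessageBodyEncoder() {
    }

    static String encode(int channelId, String messageId, List<Integer> recipients) {
        // An underscore inside the message ID would shift every field on the reading side.
        if (messageId == null || messageId.contains("_")) {
            throw new IllegalArgumentException("messageId must be non-null and must not contain '_'");
        }
        StringJoiner body = new StringJoiner("_");
        body.add(Integer.toString(channelId));
        body.add(messageId);
        for (Integer recipient : recipients) {
            body.add(recipient.toString());
        }
        return body.toString();
    }

    public static void main(String[] args) {
        System.out.println(encode(7, "m42", Arrays.asList(101, 102))); // prints 7_m42_101_102
    }
}
```

The guard on the message ID is a design choice of this sketch: because the separator is a plain underscore, an ID that contains one cannot be told apart from a field boundary when the body is split again.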
To Read and Delete Message
If a message is not deleted after it has been processed, it stays in the queue and is delivered again once its visibility timeout expires.
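To make the redelivery behaviour concrete, here is a toy in-memory model of it. It is not the SQS API: VisibilityTimeoutModel is a hypothetical class, messages are identified by their body instead of a receipt handle, and time is passed in as a plain number of seconds. It only illustrates that a received message is hidden for the visibility timeout (30 seconds by default in SQS) and comes back unless it is deleted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of receive/delete semantics; a simplification, NOT the SQS API.
final class VisibilityTimeoutModel {

    private final long visibilityTimeoutSeconds;
    // Message body -> time (in seconds) at which the message becomes visible again.
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    VisibilityTimeoutModel(long visibilityTimeoutSeconds) {
        this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }

    void send(String body) {
        visibleAt.put(body, 0L); // visible immediately
    }

    // Returns every message visible at time 'now' and hides each one for the timeout.
    List<String> receive(long now) {
        List<String> received = new ArrayList<>();
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (entry.getValue() <= now) {
                received.add(entry.getKey());
                entry.setValue(now + visibilityTimeoutSeconds);
            }
        }
        return received;
    }

    // Deleting is the only way a message leaves the queue in this model.
    void delete(String body) {
        visibleAt.remove(body);
    }

    public static void main(String[] args) {
        VisibilityTimeoutModel queue = new VisibilityTimeoutModel(30);
        queue.send("a");
        queue.send("b");
        System.out.println(queue.receive(0));  // both messages are delivered
        queue.delete("a");                     // only "a" is acknowledged
        System.out.println(queue.receive(10)); // "b" is still hidden
        System.out.println(queue.receive(30)); // "b" is delivered again
    }
}
```

This is why the listener below deletes each message only after it has been processed successfully: a message that fails processing is left alone and shows up again later.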
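import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct; // use jakarta.annotation.* on Spring Boot 3 and later
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

@Component
public class ReceiveMessageLearningsubway {
    private static final Logger log = LoggerFactory.getLogger(ReceiveMessageLearningsubway.class);
    @Value("${cloud.aws.queue.enabled}")
    private Boolean enabled;
    @Value("${cloud.aws.end-point.uri}")
    private String url;
    @Value("${cloud.aws.queue.content}")
    private String queueType;
    @Value("${cloud.aws.queue.max-poll-time}")
    private Integer maxPollTime;
    @Value("${cloud.aws.queue.max-messages}")
    private Integer maxMessages;
    @Value("${cloud.aws.queue.fetch-wait-on-error}")
    private Integer fetchWaitOnError;
    // volatile: written by the Spring lifecycle thread, read by the listener thread.
    private volatile boolean running = false;
    @Autowired
    public SqsClient sqsClient;
    @PostConstruct
    public void start() {
        // Null-safe checks in case a property is missing.
        if (Boolean.TRUE.equals(enabled) && "sqs".equals(queueType)) {
            running = true;
            new Thread(this::startListener, "sqs-listener").start();
        }
    }
    @PreDestroy
    public void stop() {
        running = false;
    }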
    private void startListener() {
        while (running) {
            try {
                ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(url)
                        // Long polling: wait up to maxPollTime seconds for messages to arrive.
                        .waitTimeSeconds(maxPollTime)
                        .maxNumberOfMessages(maxMessages)
                        .messageAttributeNames("MessageLabel")
                        .build();
                List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest).messages();
                for (Message message : sqsMessages) {
                    try {
                        // Body format: <channelId>_<messageId>_<recipient1>_<recipient2>...
                        String body = message.body();
                        String[] data = body.split("_");
                        List<Integer> listRecipient = new LinkedList<>();
                        for (int i = 2; i < data.length; i++) {
                            log.info("RecepId:" + data[i]);
                            listRecipient.add(Integer.parseInt(data[i]));
                        }
                        //data read from queue by learningsubway.com
                        System.out.println(Integer.parseInt(data[0]) + " " + data[1] + " " + listRecipient);
                        // Delete only after successful processing, so a failed message is redelivered.
                        sqsClient.deleteMessage(DeleteMessageRequest.builder()
                                .queueUrl(url)
                                .receiptHandle(message.receiptHandle())
                                .build());
                    } catch (Exception e) {
                        log.error("Failed to process message {}", message.messageId(), e);
                    }
                }
            } catch (Exception e) {
                log.warn("Error in fetching messages from SQS Queue. Will sleep and retry again.", e);
                try {
                    Thread.sleep(fetchWaitOnError * 1000L);
                } catch (InterruptedException ie) {
                    log.error("Interrupted while the sqs-listener was sleeping", ie);
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    // Returns the FIFO sequence number, or null if the message could not be sent.
    public String sendMessage(QueueMessage message) {
        try {
            log.info("send message on SQS: " + message);
            Map<String, MessageAttributeValue> attributes = new HashMap<>();
            // Note: ContentBasedDeduplication is a queue attribute. Sent as a message attribute it is
            // only metadata; deduplication must be enabled on the queue itself, or a
            // messageDeduplicationId must be set on the request.
            attributes.put("ContentBasedDeduplication", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue("true")
                    .build());
            attributes.put("MessageLabel", MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue(message.getMsgId())
                    .build());
            SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                    .queueUrl(url)
                    .messageBody(message.getMsgId())
                    .messageGroupId("1")
                    .messageAttributes(attributes)
                    .build();
            SendMessageResponse sendMessage = sqsClient.sendMessage(sendMsgRequest);
            log.info("data sent to queue: " + sendMessage.sequenceNumber());
            return sendMessage.sequenceNumber();
        } catch (Exception ex) {
            log.error("Failed to send message", ex);
            // Return null here rather than dereferencing a response that was never received.
            return null;
        }
    }
}
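The listener splits each body on underscores: the first field is the channel ID, the second is the message ID, and the rest are recipient IDs. Here is a minimal, JDK-only sketch of that parsing as a standalone helper, so malformed bodies can be tested without a queue. MessageBodyDecoder and its Decoded holder are hypothetical names that do not appear in the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project: parses the
// "<channelId>_<messageId>_<recipient1>_<recipient2>..." message body.
final class MessageBodyDecoder {

    // Simple immutable holder for the parsed fields.
    static final class Decoded {
        final int channelId;
        final String messageId;
        final List<Integer> recipients;

        Decoded(int channelId, String messageId, List<Integer> recipients) {
            this.channelId = channelId;
            this.messageId = messageId;
            this.recipients = recipients;
        }
    }

    private MessageBodyDecoder() {
    }

    static Decoded decode(String body) {
        String[] parts = body.split("_");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected at least <channelId>_<messageId>, got: " + body);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad numbers.
        int channelId = Integer.parseInt(parts[0]);
        List<Integer> recipients = new ArrayList<>();
        for (int i = 2; i < parts.length; i++) {
            recipients.add(Integer.parseInt(parts[i]));
        }
        return new Decoded(channelId, parts[1], recipients);
    }

    public static void main(String[] args) {
        Decoded decoded = decode("7_m42_101_102");
        System.out.println(decoded.channelId + " " + decoded.messageId + " " + decoded.recipients);
    }
}
```

Because every failure surfaces as an IllegalArgumentException, the listener's per-message catch block would log a malformed body and skip the delete, so the message is not silently lost.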

Author: Arun Singh

Learning is a Habit.