개발 낙서장

[TIL] 내일배움캠프 77일차 - AWS SQS FIFO 본문

Java/Sparta

[TIL] 내일배움캠프 77일차 - AWS SQS FIFO

권승준 2024. 4. 17. 22:50

 

 

오늘의 학습 키워드📚

AWS SQS

Amazon Simple Queue Service(Amazon SQS)는 내구력 있고 가용성이 뛰어난 보안 호스팅 대기열을 제공하며 이를 통해 분산 소프트웨어 시스템과 구성 요소를 통합 및 분리할 수 있습니다. Amazon SQS는 배달 못한 편지 대기열 및 비용 할당 태그와 같은 공용 구성을 제공합니다. 또한 AWS SDK가 지원하는 모든 프로그래밍 언어로 액세스할 수 있는 일반 웹 서비스 API를 제공합니다.

아마존에서 제공하는 메세지 큐 서비스이다. 클라이언트에서 메세지를 전송하면 큐에 대기열에 저장되고 메세지 폴링을 통해 메세지를 수신 받아 처리할 수 있는 방식이다.

AWS SQS를 프로젝트에 도입하게 된 이유는 기존에 거래 시스템에서 재고 관련 동시성 제어를 Redis의 Single Thread 방식으로 제어하고 있었는데 Redis로 동시성 제어를 하는 방식이 흔하지 않고 안전성 문제 등 단점이 꽤 많다고 한다.
따라서 차라리 DB에 낙관적 락을 걸어 사용하던가 SQS를 활용하는 방법을 고민하다 아무래도 SQS가 데이터 전송에 있어 안정성도 뛰어나고 FIFO 방식으로 확실하게 하나씩 처리할 수 있다는 점 때문에 선택했다.

여러 사용자가 동시에 주문을 요청하면 주문 관련 정보를 메세지 처리해 SQS로 보내고 서버에서는 메세지를 하나씩 수신하여 재고를 확인하고 주문을 처리하는 흐름이다.

SQS 생성

이 글을 참고하여 작성했다.

https://ap-northeast-2.console.aws.amazon.com/sqs/v3/home?region=ap-northeast-2#/homepage

 

https://ap-northeast-2.console.aws.amazon.com/sqs/v3/home?region=ap-northeast-2#/homepage

 

ap-northeast-2.console.aws.amazon.com

위의 사이트로 들어가 대기열 생성을 누른다.

표준 유형은 빠르지만 순서가 보장되지 않고 중복 메세지 수신의 가능성이 있는 반면 FIFO 유형은 느리지만 선입 선출 방식으로 순서와 중복 제어를 보장한다.
본 프로젝트에서는 결제 요청을 하나씩 처리해야 하기에 FIFO 유형을 선택했다.
구성에 대한 설명은 정보를 누르면 AWS에서 친절히 설명해주니 잘 읽어보고 값을 설정하면 된다.

데이터 암호화 부분인데 본 프로젝트에서 SQS로 메세지를 보낼 때 중요한 정보는 보내지 않게 설계돼서 비활성화했다.

이후 나머지 값들은 기본 값으로 설정했다.

IAM 설정

IAM 사용자를 생성하고 AmazonSQSFullAccess 권한을 추가해야 한다.
방법은 권한 추가를 누르고 직접 정책 연결을 눌러 검색하면 된다.

이후 엑세스 키를 만들어 잘 보관하자. 엑세스 키를 생성할 때 확인하는 부분에서 키 값을 따로 저장하지 않으면 다시 확인할 수 없으니 주의해야 한다.

properties 설정

# AWS S3 configuration
cloud.aws.region.static=ap-northeast-2
# Disable automatic detection of Spring Cloud AWS stack
cloud.aws.stack.auto-=false
# AWS SQS Name
cloud.aws.sqs.queue.name=QueueName
# AWS SQS Url
cloud.aws.sqs.queue.url=QueueUrl

# AWS S3 credentials(Key)
cloud.aws.s3.credentials.accessKey=AccessKey
cloud.aws.s3.credentials.secretKey=SecretKey

본인이 설정한 AWS 지역 및 SQS 정보, 그리고 키 값을 설정한다.
Access Key와 Secret Key는 절대 공개돼선 안되니 Git에 올리거나 하드코딩하는 일은 절대로 없어야 한다.

의존성

    // AWS SQS
    // implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '2.2.6.RELEASE'
    // implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-autoconfigure', version: '2.2.6.RELEASE'
    implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
    implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'

원래는 위의 2줄로 의존성을 추가했었는데 스프링 부트 3.0 이상 버전 부터는 아래 2줄로 의존성을 추가한다고 한다.

Config

@Configuration
public class AwsSqsConfig {

    @Value("${cloud.aws.s3.credentials.accessKey}")
    private String awsAccessKey;

    @Value("${cloud.aws.s3.credentials.secretKey}")
    private String awsSecretKey;

    @Value("${cloud.aws.region.static}")
    private String awsRegion;

    // 클라이언트 설정: region과 자격증명
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
            .credentialsProvider(() -> new AwsCredentials() {
                @Override
                public String accessKeyId() {
                    return awsAccessKey;
                }

                @Override
                public String secretAccessKey() {
                    return awsSecretKey;
                }
            })
            .region(Region.of(awsRegion))
            .build();
    }

    // Listener Factory 설정 (Listener 쪽)
    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory.builder()
            .sqsAsyncClient(sqsAsyncClient())
            .build();
    }

    // 메시지 발송을 위한 SQS 템플릿 설정 (Sender 쪽)
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }
}

SQSService

@Slf4j
@Service
public class SqsService {

    private final SqsTemplate sqsTemplate;

    @Value("${cloud.aws.sqs.queue.url}")
    private String queueUrl;

    @Value("${cloud.aws.sqs.queue.name}")
    private String queueName;

    public SqsService(SqsAsyncClient sqsAsyncClient) {
        this.sqsTemplate = SqsTemplate.newTemplate(sqsAsyncClient);
    }

    @SqsListener(value = "MyQueue.fifo")
    public void sqsListener(@Payload Message payload) {
        log.info("Receiver: " + payload.getMessage());
    }

    public SendResult<Message> sendMessage(Message message) {
        log.info("Sender: " + message.getMessage());
        return sqsTemplate.send(to -> to
            .queue(queueName)
            .payload(message));
    }
}

SQSController

@RestController
@RequiredArgsConstructor
public class SqsController {

    private final SqsService sqsService;

    @PostMapping("/message")
    public void sendMessage(@RequestBody Message message) {
        sqsService.sendMessage(message);
    }
}

메세지 전송 API를 호출하면 서비스 로직에 의해 SQS Queue로 메세지가 보내지고 @SqsListener를 통해 서버에서 메세지를 수신한다.

포스트맨으로 테스트한 결과 메세지가 보내지고 바로 수신하는 것을 볼 수 있다.
다음에는 JMeter로 동시성 테스트를 진행해보려고 한다.


오늘의 회고💬

SQS에 대한 개념은 어느정도 잡힌 것 같은데 아직 활용법이 좀 의문이다. JMeter로 동시성 테스트를 해봤는데 재고 관리가 제대로 되지 않는 현상이 생겨서 이 부분은 좀 더 고민을 해야 할 것 같다.

 

내일의 계획📜

내일 SQS 동시성 부분을 좀 더 알아봐야 할 것 같다. SQS에서 메세지를 처리하는 속도와 DB에서 처리되는 속도가 달라 생기는 문제점 같은데 조금 더 찾아봐야 할 것 같다.

Comments