챗봇의 다양한 기능 추가(채팅갯수 제한/DB연결)
openai api를 활용하면 토큰이 소모가 되는데 이게 사용량이 많으면 돈이 많이 깨진다
잘못하면 누가 마음 먹고 쓸모없는 채팅으로 내 토큰을 소모되게 할 수 있다
이를 해결하기 위해 여러 방법을 사용하여 보안을 구축해보았다.
1.검색어에 나의 대한 질문만 할 수 있게 설정
# 허용되는 키워드 목록
allowed_keywords = ["서찬", "찬", "너", "운영자"]
if any(keyword in user_input for keyword in allowed_keywords):
#생략
else:
return jsonify({"description": "입력값에 '서찬', '찬', '너', '운영자' 중 하나라도 포함되어야만 실행됩니다."})
2.각 ip마다 질문의 갯수를 제한하여 관리(+하루가 지나면 다시 초기화)
# 제한할 요청 수
REQUEST_LIMIT = 1 # 테스트 목적으로 1로 설정 (원래는 10개)
# IP 주소별 요청 횟수를 저장하는 사전
request_counts = defaultdict(lambda: {'count': 0, 'last_request': None})
def reset_request_count(ip):
request_counts[ip]['count'] = 0
request_counts[ip]['last_request'] = datetime.now()
def is_new_day(last_request):
return datetime.now() - last_request > timedelta(seconds=10) # 테스트 목적으로 10초로 설정 (원래는 days=1)
@app.route('/sendMessage', methods=['POST'])
def send_message():
data = request.get_json()
user_input = data.get('user_input')
#사용자 ip 가져오기
ip = request.remote_addr
# 요청 횟수 가져오기
request_data = request_counts[ip]
count = request_data['count']
last_request = request_data['last_request']
# 하루가 지났는지 확인하여 요청 횟수 초기화
if last_request is None or is_new_day(last_request):
reset_request_count(ip)
if count >= REQUEST_LIMIT:
return jsonify({"description": "질문이 초과했습니다ㅠㅠ"})
else:
request_counts[ip]['count'] += 1
request_counts[ip]['last_request'] = datetime.now()
-중간에 질문이 데이터 베이스에 저장하는데 쓰레드에 여러 질문이 들어와 저장이 안되고 또한 질문의 답 출력하는데 오류가 나길래 쓰레드를 각ip마다 주었다
# IP 주소별 스레드 ID를 저장하는 사전
thread_ids = {}
# 새로운 스레드 생성 함수
def create_new_thread():
response = client.beta.threads.create()
# response의 구조를 출력해보자
print(response)
# response 객체의 속성에 맞게 접근
return response.id
# sendMessage 라우터
@app.route('/sendMessage', methods=['POST'])
def send_message():
data = request.get_json()
user_input = data.get('user_input')
ip = request.remote_addr
# 요청 횟수 가져오기
request_data = request_counts[ip]
# IP 주소별 스레드 ID 가져오기
if ip not in thread_ids:
thread_id = create_new_thread()
thread_ids[ip] = thread_id
else:
thread_id = thread_ids[ip]
# AI 응답 가져오기
ai_response = get_ai_response(user_input, thread_id)
3. 잘못된 질문을 계속하면 차단하기
인스트럭션에 "서찬에 대한 질문이 아니면 "false" 라고만 대답해줘"라는 명령을 주어 false가 3번 이상 나오면 더 이상 opanai api가 실행되지 않게 설정해준다
request_counts = defaultdict(lambda: {'count': 0, 'last_request': None, 'false_count': 0})
def reset_request_count(ip):
request_counts[ip]['count'] = 0
request_counts[ip]['last_request'] = datetime.now()
request_counts[ip]['false_count'] = 0
@app.route('/sendMessage', methods=['POST'])
def send_message():
data = request.get_json()
user_input = data.get('user_input')
ip = request.remote_addr
# 요청 횟수 가져오기
request_data = request_counts[ip]
count = request_data['count']
last_request = request_data['last_request']
false_count = request_data['false_count']
# false_count가 3 이상이면 차단
if false_count >= 3:
return jsonify({"description": "너무 많은 잘못된 응답으로 인해 차단되었습니다."})
else:
# AI 응답 가져오기
ai_response = get_ai_response(user_input, thread_id)
# 'False' 응답 횟수 증가
if ai_response.strip().lower() == 'false':
request_counts[ip]['false_count'] += 1
4.데이터베이스에 사용자가 어떤 질문을 하는지와 어떤 사용자가 어떤 질문을 하는지 확인하게 사용자 ip를 저장하기
from openai import OpenAI
import time
from flask import Flask, request, jsonify
from flask_cors import CORS
import pymysql
from collections import defaultdict
from datetime import datetime, timedelta
# MySQL 데이터베이스 연결 설정 함수
def get_database_connection():
return pymysql.connect(host="localhost", user="root", password="", db="", charset="")
# 데이터베이스에 저장
try:
db_connection = get_database_connection()
cursor = db_connection.cursor()
query = "INSERT INTO portpolio.portpolio_input (ip, comments, answer) VALUES (%s, %s, %s)"
cursor.execute(query, (str(ip), user_input, ai_response))
db_connection.commit()
return jsonify({"description": ai_response})
except Exception as e:
return jsonify({"description": "데이터 저장 중 오류가 발생하였습니다: " + str(e)})
finally:
cursor.close()
db_connection.close()
-전체 코드
(검색어를 미리 설정 하는 건 비효율적이라는 피드백을 받아 수정->새로운 로직)
from openai import OpenAI
import time
from flask import Flask, request, jsonify
from flask_cors import CORS
import pymysql
from collections import defaultdict
from datetime import datetime, timedelta
app = Flask(__name__)
CORS(app) # 모든 출처에서의 요청을 허용합니다.
# OpenAI API 키 설정
client = OpenAI(api_key="")
# 제한할 요청 수
REQUEST_LIMIT = 5 # 테스트 목적으로 1로 설정 (원래는 10개)
# IP 주소별 요청 횟수를 저장하는 사전
request_counts = defaultdict(lambda: {'count': 0, 'last_request': None, 'false_count': 0})
# IP 주소별 스레드 ID를 저장하는 사전
thread_ids = {}
def reset_request_count(ip):
request_counts[ip]['count'] = 0
request_counts[ip]['last_request'] = datetime.now()
request_counts[ip]['false_count'] = 0
def is_new_day(last_request):
return datetime.now() - last_request > timedelta(seconds=50) # 테스트 목적으로 5초로 설정 (원래는 days=1)
# MySQL 데이터베이스 연결 설정 함수
def get_database_connection():
return pymysql.connect(host="", user="t", password="", db="", charset="")
# 새로운 스레드 생성 함수
def create_new_thread():
response = client.beta.threads.create()
# response의 구조를 출력해보자
print(response)
# response 객체의 속성에 맞게 접근
return response.id
# AI 응답을 가져오는 함수
def get_ai_response(user_input, thread_id):
try:
assistant_id = ""
message = client.beta.threads.messages.create(
thread_id,
role="user",
content=user_input,
)
run = client.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=assistant_id,
)
run_id = run.id
while True:
run = client.beta.threads.runs.retrieve(
thread_id=thread_id,
run_id=run_id,
)
if run.status == "completed":
break
else:
time.sleep(2)
thread_messages = client.beta.threads.messages.list(thread_id)
ai_response = thread_messages.data[0].content[0].text.value
return ai_response
except Exception as e:
return str(e)
# sendMessage 라우터
@app.route('/sendMessage', methods=['POST'])
def send_message():
data = request.get_json()
user_input = data.get('user_input')
ip = request.remote_addr
# 요청 횟수 가져오기
request_data = request_counts[ip]
count = request_data['count']
last_request = request_data['last_request']
false_count = request_data['false_count']
# 하루가 지났는지 확인하여 요청 횟수 초기화
if last_request is None or is_new_day(last_request):
reset_request_count(ip)
# false_count가 3 이상이면 차단
if false_count >= 3:
return jsonify({"description": "너무 많은 잘못된 응답으로 인해 차단되었습니다."})
# 요청 횟수 확인
if count >= REQUEST_LIMIT:
return jsonify({"description": "질문이 초과했습니다ㅠㅠ"})
# 요청 횟수 증가
request_counts[ip]['count'] += 1
request_counts[ip]['last_request'] = datetime.now()
# IP 주소별 스레드 ID 가져오기
if ip not in thread_ids:
thread_id = create_new_thread()
thread_ids[ip] = thread_id
else:
thread_id = thread_ids[ip]
# AI 응답 가져오기
ai_response = get_ai_response(user_input, thread_id)
# 'False' 응답 횟수 증가
if ai_response.strip().lower() == 'false':
request_counts[ip]['false_count'] += 1
# 데이터베이스에 저장
try:
db_connection = get_database_connection()
cursor = db_connection.cursor()
query = "INSERT INTO portpolio.portpolio_input (ip, comments, answer) VALUES (%s, %s, %s)"
cursor.execute(query, (str(ip), user_input, ai_response))
db_connection.commit()
return jsonify({"description": ai_response})
except Exception as e:
return jsonify({"description": "데이터 저장 중 오류가 발생하였습니다: " + str(e)})
finally:
cursor.close()
db_connection.close()
if __name__ == '__main__':
print("Flask 애플리케이션이 실행되었습니다.")
app.run(debug=True)
로직의 순서
하루마다 검색어 초기화->false 3번 했는지->ip마다 요청횟수->ip마다 쓰레드 만들기->ai에게 질문
이번 과제를 하며 느낀점
-사실 코딩 할 때 chatgpt한테 물어보는 경우가 많은데 아무리 편하다해도 조금 복잡한 로직은 오류가 많이 나서 시간이 오래 걸렸다. 그러나 한번 천천히 코드 읽어가면서 로직?을 이해하며 수정하니 30분이면 끝나는 경우도 많았다. 이를 통해 chatgpt에게 물어보기 전 먼저 내가 그 로직을 구상 + 코드가 나오면 로직을 읽는 버릇이 필요하다 느낌