"""
import asyncio, logging, json, datetime, os, uuid
from common.constants import Constants
from common.utils import (
decode_base64_to_binary,
encode_binary_to_base64,
get_user_chat_history,
get_message_object_by_id,
get_or_create_latest_conversation,
get_or_create_user_by_email,
insert_message_record,
postprocess_and_translate_query_response,
save_message_obj,
send_request,
fetch_multilingual_texts_for_static_text_messages,
get_user_by_email,
fetch_corresponding_multilingual_text,
create_or_update_user_by_email,
)
from database.models import User
from database.database_config import db_conn
from database.db_operations import update_record
from django_core.config import Config
from intent_classification.intent import process_user_intent
from rag_service.execute_rag import execute_rag_pipeline
from language_service.translation import detect_language_and_translate_to_english, a_translate_to
from language_service.asr import transcribe_and_translate
from language_service.tts import synthesize_speech
from language_service.utils import get_language_by_id
logger = logging.getLogger(__name__)
"""
def authenticate_user_based_on_email(email_id, authentication_url=None):
    """
    Authenticate the user against the content authentication site.

    :param email_id: email address of the user to authenticate; sent as ``{"email": email_id}``
        in a JSON POST request (retried up to 3 times by ``send_request``).
    :param authentication_url: optional override of the authentication endpoint. When omitted it is
        built from ``Config.CONTENT_DOMAIN_URL`` and ``Config.CONTENT_AUTHENTICATE_ENDPOINT`` at call time.
    :return: the JSON-decoded response body when the service answers with HTTP 200, otherwise ``None``.
        Errors are logged, not raised.
    """
    authenticated_user = None
    try:
        if authentication_url is None:
            # Built here rather than as a default argument so the signature does not
            # evaluate Config when the module is imported.
            authentication_url = f"{Config.CONTENT_DOMAIN_URL}{Config.CONTENT_AUTHENTICATE_ENDPOINT}"
        response = send_request(
            authentication_url, data={"email": email_id}, content_type="JSON", request_type="POST", total_retry=3
        )
        if response and response.status_code == 200:
            authenticated_user = json.loads(response.text)
    except Exception as error:
        logger.error(error, exc_info=True)
    return authenticated_user
def preprocess_user_data(
    original_query,
    email_id,
    authenticated_user=None,
    with_db_config=None,
    message_input_type=None,
):
    """
    Save or update the user profile fetched from the content authentication site and record the
    incoming query as a new message.

    ``with_db_config`` and ``message_input_type`` default to ``None`` and are resolved at call time to
    ``Config.WITH_DB_CONFIG`` and ``Constants.MESSAGE_INPUT_TYPE_TEXT`` respectively, so the signature
    does not evaluate Config/Constants when the module is imported.

    :param original_query: the user's query as received; stored as the message's ``original_message``
        and passed as the conversation ``title``.
    :param email_id: email address used to create or update the user record.
    :param authenticated_user: profile returned by the authentication site; the ``first_name``,
        ``last_name`` and ``phone_number`` keys are read. ``None`` is treated as an empty profile.
    :param with_db_config: when truthy (and a profile is present) the user, conversation and message
        records are created/updated.
    :param message_input_type: value saved as the message's ``input_type``.
    :return: tuple ``(user_data, message_obj)``. ``user_data`` holds ``user_name`` when a profile is
        present and, once records are persisted, ``user_id`` and ``message_id`` too. ``message_obj``
        is the inserted message record or ``None``. Errors are logged, not raised.
    """
    user_data, message_data_to_insert_or_update = {}, {}
    message_id, message_obj = None, None
    authenticated_user = authenticated_user or {}
    try:
        if with_db_config is None:
            with_db_config = Config.WITH_DB_CONFIG
        if message_input_type is None:
            message_input_type = Constants.MESSAGE_INPUT_TYPE_TEXT

        if authenticated_user:
            user_data["user_name"] = authenticated_user.get("first_name")

        if with_db_config and authenticated_user:
            # create the user if it does not exist yet, otherwise update the stored profile
            user_obj = create_or_update_user_by_email(
                {
                    "email": email_id,
                    "phone": authenticated_user.get("phone_number"),
                    "first_name": authenticated_user.get("first_name"),
                    "last_name": authenticated_user.get("last_name"),
                }
            )
            user_id = user_obj.id
            user_name = user_obj.first_name
            conversation_obj = get_or_create_latest_conversation({"user_id": user_id, "title": original_query})
            message_obj = insert_message_record(
                {"original_message": original_query, "conversation_id": conversation_obj}
            )
            message_id = message_obj.id
            message_data_to_insert_or_update["input_type"] = message_input_type
            message_data_to_insert_or_update["message_input_time"] = datetime.datetime.now()
            user_data.update({"user_id": user_id, "user_name": user_name, "message_id": message_id})
    except Exception as error:
        logger.error(error, exc_info=True)
    finally:
        # persist whatever message metadata was collected, even if a later step failed
        if message_obj and message_id:
            save_message_obj(message_id, message_data_to_insert_or_update)
    return user_data, message_obj
def process_query(original_query, email_id, authenticated_user=None):
    """
    Pre-process the user profile and query, classify the intent, run the RAG pipeline when the
    intent requires it, and return the response translated back to the detected input language.

    :param original_query: the user's query as received.
    :param email_id: email address of the user asking the query.
    :param authenticated_user: profile returned by the authentication site; ``None`` is treated as
        an empty profile.
    :return: dict with ``generated_final_response`` (from the intent handler, or from the keys
        ``execute_rag_pipeline`` returns when the RAG pipeline runs) plus ``translated_response``,
        ``final_response`` and ``follow_up_questions``. On error the exception is logged and whatever
        was collected so far is returned.
    """
    message_obj, message_id, chat_history = None, None, None
    response_map, message_data_to_insert_or_update, message_data_update_post_rag_pipeline = {}, {}, {}
    try:
        user_data, message_obj = preprocess_user_data(
            original_query, email_id, authenticated_user if authenticated_user is not None else {}
        )
        user_id = user_data.get("user_id")
        user_name = user_data.get("user_name")
        message_id = user_data.get("message_id")
        # chat history is only available for users persisted by preprocess_user_data
        chat_history = get_user_chat_history(user_id) if user_id else None

        # translate the original query to English
        message_data_to_insert_or_update["input_translation_start_time"] = datetime.datetime.now()
        query_in_english, input_language_detected = asyncio.run(
            detect_language_and_translate_to_english(original_query)
        )
        message_data_to_insert_or_update["translated_message"] = query_in_english
        message_data_to_insert_or_update["input_translation_end_time"] = datetime.datetime.now()
        message_data_to_insert_or_update["input_language_detected"] = input_language_detected

        # NOTE(review): intent classification receives the untranslated query while the RAG
        # pipeline receives the English translation -- confirm this is intended.
        intent_response, _user_intent, proceed_to_rag = asyncio.run(process_user_intent(original_query, user_name))
        if proceed_to_rag:
            response_map, message_data_update_post_rag_pipeline = execute_rag_pipeline(
                query_in_english,
                input_language_detected,
                email_id,
                user_name=user_name,
                message_id=message_id,
                chat_history=chat_history,
            )
        else:
            # the intent handler already produced the answer; the RAG pipeline is skipped
            response_map.update({"generated_final_response": intent_response})

        # translate the generated response back to the detected input language
        (
            translated_response,
            final_response,
            follow_up_question_options,
            _follow_up_question_data_to_insert,  # returned by the helper but not persisted here
        ) = asyncio.run(
            postprocess_and_translate_query_response(
                response_map.get("generated_final_response"), input_language_detected, str(message_id)
            )
        )
        response_map.update(
            {
                "translated_response": translated_response,
                "final_response": final_response,
                "follow_up_questions": follow_up_question_options,
            }
        )
        message_data_to_insert_or_update["message_response"] = final_response
        message_data_to_insert_or_update["message_translated_response"] = translated_response
        message_data_to_insert_or_update.update(message_data_update_post_rag_pipeline)
    except Exception as error:
        logger.error(error, exc_info=True)
    finally:
        # persist whatever message metadata was collected, even if a later step failed
        if message_obj and message_id:
            save_message_obj(message_id, message_data_to_insert_or_update)
    return response_map
def process_output_audio(original_text, message_id=None, with_db_config=None):
    """
    Synthesise the given text (e.g. a generated response) to speech and return the audio encoded as
    a base64 string.

    The speech language is the ``input_language_detected`` stored on the message record when DB
    access is enabled and the record for ``message_id`` provides one. Otherwise the language is
    detected from ``original_text``.

    :param original_text: text to synthesise; converted with ``str()`` before synthesis.
    :param message_id: id of the message the audio belongs to; forwarded to ``synthesize_speech``.
        When its record is found, the text-to-speech start/end times are saved on it.
    :param with_db_config: whether the message record may be read/updated. Defaults to
        ``Config.WITH_DB_CONFIG``, looked up at call time.
    :return: the base64-encoded audio, or ``None`` if any step failed. Errors are logged.
    """
    response_audio, response_audio_file, message_obj = None, None, None
    message_data_to_insert_or_update = {}
    try:
        if with_db_config is None:
            with_db_config = Config.WITH_DB_CONFIG
        input_language_detected = None
        if with_db_config and message_id:
            message_obj = get_message_object_by_id(message_id)
            if message_obj:
                input_language_detected = message_obj.input_language_detected
        if not input_language_detected:
            # no stored language available: detect it from the text itself
            _, input_language_detected = asyncio.run(detect_language_and_translate_to_english(original_text))
        message_data_to_insert_or_update["response_text_to_speech_start_time"] = datetime.datetime.now()
        response_audio_file = asyncio.run(synthesize_speech(str(original_text), input_language_detected, message_id))
        message_data_to_insert_or_update["response_text_to_speech_end_time"] = datetime.datetime.now()
        response_audio = encode_binary_to_base64(response_audio_file)
    except Exception as error:
        logger.error(error, exc_info=True)
    finally:
        if message_obj:
            save_message_obj(message_id, message_data_to_insert_or_update)
        if response_audio_file:
            # cleanup of the synthesised file is best-effort; a failure must not escape the function
            try:
                os.remove(response_audio_file)
            except OSError as error:
                logger.error(error, exc_info=True)
    return response_audio
def process_transcriptions(
    voice_file,
    email_id,
    authenticated_user=None,
    language_code=None,
    language_bcp_code=None,
    message_input_type=None,
    with_db_config=None,
):
    """
    Transcribe a voice file and record the transcription as a new user message.

    The transcription comes from ``transcribe_and_translate``. When its confidence score is below
    ``Constants.ASR_DEFAULT_CONFIDENCE_SCORE``, the returned transcription is replaced by the
    localized "could not understand" message, which is also saved as the message's response.
    The voice file is deleted before returning.

    Defaults that are ``None`` are resolved at call time (not in the signature) to
    ``Constants.LANGUAGE_SHORT_CODE_NATIVE``, ``Constants.LANGUAGE_BCP_CODE_NATIVE``,
    ``Constants.MESSAGE_INPUT_TYPE_VOICE`` and ``Config.WITH_DB_CONFIG`` respectively.

    :param voice_file: path of the recorded audio; removed once processing finishes.
    :param email_id: email address of the speaking user.
    :param authenticated_user: profile forwarded to ``preprocess_user_data``; ``None`` is treated as an
        empty profile.
    :param language_code: language used to fetch the localized fallback message. When DB access is
        enabled, the ``code`` of the user's preferred language takes precedence if one is found.
    :param language_bcp_code: language code passed to ``transcribe_and_translate``.
    :param message_input_type: ``input_type`` saved for low-confidence transcriptions.
    :param with_db_config: whether the user's preferred language is looked up.
    :return: dict with ``confidence_score``, ``transcriptions`` and ``message_id``. Keys may be missing
        if a step failed; errors are logged.
    """
    message_id, message_obj = None, None
    response_map, message_data_to_insert_or_update = {}, {}
    try:
        if language_code is None:
            language_code = Constants.LANGUAGE_SHORT_CODE_NATIVE
        if language_bcp_code is None:
            language_bcp_code = Constants.LANGUAGE_BCP_CODE_NATIVE
        if message_input_type is None:
            message_input_type = Constants.MESSAGE_INPUT_TYPE_VOICE
        if with_db_config is None:
            with_db_config = Config.WITH_DB_CONFIG

        if with_db_config:
            # prefer the user's stored language; fall back to language_code when nothing is found
            user = get_user_by_email(email_id) or {}
            language_dict = get_language_by_id(user.get("preferred_language_id")) or {}
            language_code = language_dict.get("code", language_code)

        text_codes_list_with_multilingual_texts = fetch_multilingual_texts_for_static_text_messages(
            [Constants.COULD_NOT_UNDERSTAND_MESSAGE], language_code
        )
        could_not_understand_message = fetch_corresponding_multilingual_text(
            Constants.COULD_NOT_UNDERSTAND_MESSAGE, text_codes_list_with_multilingual_texts
        )

        message_data_to_insert_or_update["message_input_time"] = datetime.datetime.now()
        message_data_to_insert_or_update["input_speech_to_text_start_time"] = datetime.datetime.now()
        transcriptions, _detected_language, confidence_score = asyncio.run(
            transcribe_and_translate(voice_file, language_bcp_code)
        )
        message_data_to_insert_or_update["input_speech_to_text_end_time"] = datetime.datetime.now()
        response_map["confidence_score"] = confidence_score
        response_map["transcriptions"] = transcriptions

        if confidence_score < Constants.ASR_DEFAULT_CONFIDENCE_SCORE:
            # low confidence: the transcription may be wrong, so answer with the localized fallback
            message_data_to_insert_or_update["message_response"] = could_not_understand_message
            message_data_to_insert_or_update["message_translated_response"] = could_not_understand_message
            message_data_to_insert_or_update["message_response_time"] = datetime.datetime.now()
            # NOTE(review): input_type is only set here for low-confidence transcriptions; otherwise the
            # input type recorded is whatever preprocess_user_data saves -- confirm this is intended.
            message_data_to_insert_or_update["input_type"] = message_input_type
            response_map["transcriptions"] = could_not_understand_message

        # the message record stores the raw transcription, not the fallback text
        user_data, message_obj = preprocess_user_data(
            transcriptions, email_id, authenticated_user if authenticated_user is not None else {}
        )
        message_id = user_data.get("message_id")
        response_map["message_id"] = message_id
    except Exception as error:
        logger.error(error, exc_info=True)
    finally:
        if message_obj and message_id:
            save_message_obj(message_id, message_data_to_insert_or_update)
        if voice_file:
            # cleanup of the uploaded file is best-effort; a failure must not escape the function
            try:
                os.remove(voice_file)
            except OSError as error:
                logger.error(error, exc_info=True)
    return response_map