Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ oauth2-api-client = [
"oauthlib>=3.2.2",
"requests_oauthlib>=1.3.1"
]
opensearch-client = [
"boto3>=1.26.5",
"botocore>=1.29.5",
"opensearch-py>=2.0.0"
]
postgresql-client = [
"psycopg[binary]>=3.1.6"
]
Expand Down Expand Up @@ -81,7 +86,7 @@ research-catalog-identifier-helper = [
"requests>=2.28.1"
]
development = [
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper,log_helper]",
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,opensearch-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper,log_helper]",
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
Expand Down
153 changes: 153 additions & 0 deletions src/nypl_py_utils/classes/opensearch_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import boto3
import os

from botocore.exceptions import ClientError
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from nypl_py_utils.functions.log_helper import create_log


class OpenSearchClient:
    """
    Client for interacting with an AWS OpenSearch Service domain.

    Takes as input the OpenSearch domain endpoint (without the https:// scheme)
    and an optional AWS region. Authentication is performed via AWS IAM using
    SigV4 request signing.
    """

    def __init__(self, host, region=None):
        """
        Builds the signed OpenSearch connection.

        Parameters
        ----------
        host: str
            The OpenSearch domain endpoint, without the https:// scheme
        region: str, optional
            The AWS region used for request signing. Falls back to the
            AWS_REGION environment variable and then to 'us-east-1'.

        Raises
        ------
        OpenSearchClientError
            If AWS credentials cannot be found or the client cannot be
            constructed
        """
        self.logger = create_log('opensearch_client')
        self.host = host
        self.region = region or os.environ.get('AWS_REGION', 'us-east-1')

        try:
            credentials = boto3.Session().get_credentials()
            if credentials is None:
                # boto3 reports missing credentials by returning None
                # rather than raising, so fail here with a clear message
                raise ValueError('No AWS credentials found')
            auth = AWSV4SignerAuth(credentials, self.region, 'es')
            self.client = OpenSearch(
                hosts=[{'host': self.host, 'port': 443}],
                http_auth=auth,
                use_ssl=True,
                verify_certs=True,
                connection_class=RequestsHttpConnection,
                pool_maxsize=10
            )
        except Exception as e:
            # Construction failures are not limited to ClientError (e.g.
            # missing credentials), so wrap everything like the other
            # methods of this class do
            message = 'Could not create OpenSearch client: {err}'.format(
                err=e)
            self.logger.error(message)
            raise OpenSearchClientError(message) from None

    def create_index(self, index, body=None):
        """
        Creates an OpenSearch index with optional mappings and settings.

        Parameters
        ----------
        index: str
            The name of the index to create
        body: dict, optional
            The index settings and/or mappings

        Returns
        -------
        The response returned by the underlying client's indices.create call

        Raises
        ------
        OpenSearchClientError
            If the index cannot be created
        """
        self.logger.info('Creating OpenSearch index {}'.format(index))
        try:
            return self.client.indices.create(index=index, body=body)
        except Exception as e:
            message = (
                'Error creating OpenSearch index {name}: {error}'.format(
                    name=index, error=e))
            self.logger.error(message)
            raise OpenSearchClientError(message) from None

    def index_document(self, index, document, document_id=None):
        """
        Indexes a document in the given OpenSearch index.

        Parameters
        ----------
        index: str
            The name of the index
        document: dict
            The document to index
        document_id: str, optional
            The ID to assign to the document. If not provided, OpenSearch
            will auto-generate one.

        Returns
        -------
        The response returned by the underlying client's index call

        Raises
        ------
        OpenSearchClientError
            If the document cannot be indexed
        """
        self.logger.info(
            'Indexing document in OpenSearch index {}'.format(index))
        try:
            return self.client.index(
                index=index, body=document, id=document_id)
        except Exception as e:
            message = (
                'Error indexing document in OpenSearch index {name}: '
                '{error}'.format(name=index, error=e))
            self.logger.error(message)
            raise OpenSearchClientError(message) from None

    def delete_document(self, index, document_id):
        """
        Deletes a document from the given OpenSearch index by ID.

        Parameters
        ----------
        index: str
            The name of the index
        document_id: str
            The ID of the document to delete

        Returns
        -------
        The response returned by the underlying client's delete call

        Raises
        ------
        OpenSearchClientError
            If the document cannot be deleted
        """
        self.logger.info(
            'Deleting document {id} from OpenSearch index {index}'.format(
                id=document_id, index=index))
        try:
            return self.client.delete(index=index, id=document_id)
        except Exception as e:
            message = (
                'Error deleting document {id} from OpenSearch index '
                '{name}: {error}'.format(
                    id=document_id, name=index, error=e))
            self.logger.error(message)
            raise OpenSearchClientError(message) from None

    def search(self, index, query):
        """
        Executes a search query against the given OpenSearch index.

        Parameters
        ----------
        index: str
            The name of the index to search
        query: dict
            The OpenSearch query body

        Returns
        -------
        dict
            The OpenSearch response containing hits and metadata

        Raises
        ------
        OpenSearchClientError
            If the search fails
        """
        self.logger.info('Searching OpenSearch index {}'.format(index))
        self.logger.debug('Executing query {}'.format(query))
        try:
            return self.client.search(index=index, body=query)
        except Exception as e:
            message = (
                'Error searching OpenSearch index {name}: {error}'.format(
                    name=index, error=e))
            self.logger.error(message)
            raise OpenSearchClientError(message) from None

    def close_connection(self):
        """Closes the OpenSearch connection"""
        self.logger.debug(
            'Closing OpenSearch connection to {}'.format(self.host))
        self.client.close()


class OpenSearchClientError(Exception):
    """
    Error carrying a human-readable message about a failed OpenSearch
    client operation.

    Parameters
    ----------
    message: str, optional
        Description of the failure. Also exposed as the `message` attribute.
    """

    def __init__(self, message=None):
        # Forward the message to Exception so str(error) and tracebacks
        # show it even when it is passed by keyword. The None case is left
        # untouched so argument-less construction keeps its existing args.
        if message is not None:
            super().__init__(message)
        self.message = message
84 changes: 84 additions & 0 deletions tests/test_opensearch_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pytest

from nypl_py_utils.classes.opensearch_client import (
OpenSearchClient, OpenSearchClientError)

# Identifiers shared by every test in this module
_TEST_HOST = 'test-domain.us-east-1.es.amazonaws.com'
_TEST_INDEX = 'test-index'
_TEST_DOC_ID = 'test-doc-id'

# Sample document and the query body used by the search tests
_TEST_DOCUMENT = {
    'title': 'Test Document',
    'body': 'Test body content',
}
_TEST_QUERY = {
    'query': {
        'match': {'title': 'test'},
    },
}

# Search response containing a single hit for the sample document
_TEST_RESPONSE = {
    'hits': {
        'total': {'value': 1, 'relation': 'eq'},
        'hits': [
            {
                '_index': _TEST_INDEX,
                '_id': _TEST_DOC_ID,
                '_source': _TEST_DOCUMENT,
            },
        ],
    },
}


class TestOpenSearchClient:
    """Tests for OpenSearchClient with AWS and OpenSearch patched out."""

    @pytest.fixture
    def test_instance(self, mocker):
        """Returns an OpenSearchClient built on mocked dependencies."""
        module_path = 'nypl_py_utils.classes.opensearch_client'
        mocker.patch('boto3.Session')
        for patched_name in ('AWSV4SignerAuth', 'OpenSearch'):
            mocker.patch('{}.{}'.format(module_path, patched_name))
        return OpenSearchClient(_TEST_HOST)

    def test_create_index(self, test_instance):
        """Index creation without a body forwards body=None."""
        test_instance.create_index(_TEST_INDEX)

        create_mock = test_instance.client.indices.create
        create_mock.assert_called_once_with(index=_TEST_INDEX, body=None)

    def test_create_index_with_body(self, test_instance):
        """Index creation forwards the supplied body unchanged."""
        index_body = {
            'mappings': {'properties': {'title': {'type': 'text'}}}}
        test_instance.create_index(_TEST_INDEX, body=index_body)

        create_mock = test_instance.client.indices.create
        create_mock.assert_called_once_with(
            index=_TEST_INDEX, body=index_body)

    def test_create_index_error(self, test_instance):
        """Failures while creating an index raise OpenSearchClientError."""
        create_mock = test_instance.client.indices.create
        create_mock.side_effect = Exception('error')

        with pytest.raises(OpenSearchClientError):
            test_instance.create_index(_TEST_INDEX)

    def test_index_document(self, test_instance):
        """Indexing with an explicit ID forwards that ID."""
        test_instance.index_document(
            _TEST_INDEX, _TEST_DOCUMENT, _TEST_DOC_ID)

        index_mock = test_instance.client.index
        index_mock.assert_called_once_with(
            index=_TEST_INDEX, body=_TEST_DOCUMENT, id=_TEST_DOC_ID)

    def test_index_document_without_id(self, test_instance):
        """Indexing without an ID forwards id=None."""
        test_instance.index_document(_TEST_INDEX, _TEST_DOCUMENT)

        index_mock = test_instance.client.index
        index_mock.assert_called_once_with(
            index=_TEST_INDEX, body=_TEST_DOCUMENT, id=None)

    def test_index_document_error(self, test_instance):
        """Failures while indexing raise OpenSearchClientError."""
        test_instance.client.index.side_effect = Exception('error')

        with pytest.raises(OpenSearchClientError):
            test_instance.index_document(_TEST_INDEX, _TEST_DOCUMENT)

    def test_delete_document(self, test_instance):
        """Deleting forwards the index name and document ID."""
        test_instance.delete_document(_TEST_INDEX, _TEST_DOC_ID)

        delete_mock = test_instance.client.delete
        delete_mock.assert_called_once_with(
            index=_TEST_INDEX, id=_TEST_DOC_ID)

    def test_delete_document_error(self, test_instance):
        """Failures while deleting raise OpenSearchClientError."""
        test_instance.client.delete.side_effect = Exception('error')

        with pytest.raises(OpenSearchClientError):
            test_instance.delete_document(_TEST_INDEX, _TEST_DOC_ID)

    def test_search(self, test_instance):
        """Searching forwards the query and returns the raw response."""
        search_mock = test_instance.client.search
        search_mock.return_value = _TEST_RESPONSE

        response = test_instance.search(_TEST_INDEX, _TEST_QUERY)

        search_mock.assert_called_once_with(
            index=_TEST_INDEX, body=_TEST_QUERY)
        assert response == _TEST_RESPONSE

    def test_search_error(self, test_instance):
        """Failures while searching raise OpenSearchClientError."""
        test_instance.client.search.side_effect = Exception('error')

        with pytest.raises(OpenSearchClientError):
            test_instance.search(_TEST_INDEX, _TEST_QUERY)

    def test_close_connection(self, test_instance):
        """Closing the client closes the underlying connection once."""
        test_instance.close_connection()

        test_instance.client.close.assert_called_once()