# -*- coding: utf-8 -*-
"""
Created on Mon Apr 12 09:36:25 2021
@author: Pedram Tavadze
used PyChemia Database class as a guide
https://github.com/MaterialsDiscovery/PyChemia/blob/master/pychemia/db/db.py
"""
import subprocess
import pymongo
import gridfs
from pathlib import Path
from random import choice
from bson.objectid import ObjectId
from collections.abc import Mapping
from typing import Callable, List, Optional, Union
from ..core import Tape, TapeAnalyzer, Image, Metadata
from ..utils.array_tools import read_bytes_io, write_bytes_io
class ClassMap(Mapping):
    """
    Read-only mapping from collection names to the classes stored in them.

    The keys ``'material'``, ``'analysis'`` and ``'any'`` map to the classes
    `Tape`, `TapeAnalyzer` and `Image` respectively. Looking up any other key
    does not raise ``KeyError``; it falls back to the class registered under
    ``'any'``.

    Attributes
    ----------
    mapping : dict
        The internal dictionary that stores the mapping between keys and
        classes.
    """

    def __init__(self):
        """
        Initialize a new instance of ClassMap.
        """
        self.mapping = {'material': Tape,
                        'analysis': TapeAnalyzer,
                        'any': Image}

    def __contains__(self, x):
        """
        Check if `x` is a key in the mapping.

        Parameters
        ----------
        x : str
            The key to check.

        Returns
        -------
        bool
            True if `x` is an explicitly registered key, False otherwise
            (even though ``self[x]`` would still return the fallback class).
        """
        return x in self.mapping

    def __getitem__(self, x):
        """
        Get the class associated with the key `x`.

        Parameters
        ----------
        x : str
            The key to get the associated class for.

        Returns
        -------
        type
            The class registered under `x` (`Tape`, `TapeAnalyzer` or
            `Image`), or the class registered under ``'any'`` when `x` is not
            a key in the mapping.
        """
        # Unknown collections are treated as plain images instead of failing.
        return self.mapping.get(x, self.mapping['any'])

    def __iter__(self):
        """
        Return an iterator over the keys in the mapping.

        Returns
        -------
        Iterator
            An iterator over the registered keys.
        """
        return iter(self.mapping)

    def __len__(self):
        """
        Return the number of key-value pairs in the mapping.

        Returns
        -------
        int
            The number of registered keys.
        """
        return len(self.mapping)
class Database:
    """
    .. _Database:

    Database
    ========

    Interface to a MongoDB database in which objects are stored through
    GridFS.

    The class wraps a ``pymongo.MongoClient`` and keeps one ``gridfs.GridFS``
    handle per collection. Stored files are converted back to Python objects
    with the class that `ClassMap` associates with the collection name.

    Parameters
    ----------
    name : str, optional
        The name of the database, defaults to 'forensicfit'.
    host : str, optional
        The host IP address or hostname where MongoDB is running, defaults to
        "localhost".
    port : int, optional
        The port number of the MongoDB server, defaults to 27017.
    username : str, optional
        The username used for authentication, defaults to an empty string
        (no authentication).
    password : str, optional
        The password used for authentication, defaults to an empty string.
    verbose : bool, optional
        Whether to print progress information, defaults to False.
    **kwargs
        Accepted for backward compatibility and ignored.

    Attributes
    ----------
    uri : str
        The URI used to connect to the MongoDB server (contains the
        percent-escaped credentials when authentication is used).
    client : pymongo.MongoClient
        The MongoDB client instance.
    db : pymongo.database.Database
        The pymongo database object.
    entries : pymongo.collection.Collection
        The ``forensicfit_entries`` collection of the database.
    fs : dict
        GridFS handles keyed by collection name.
    class_mapping : ClassMap
        Mapping from collection names to the classes stored in them.
    db_info : dict
        Connection details shown by ``str(self)``.

    Raises
    ------
    ValueError
        If a password is given without a username.
    """

    def __init__(self,
                 name: str = 'forensicfit',
                 host: str = "localhost",
                 port: int = 27017,
                 username: str = "",
                 password: str = "",
                 verbose: bool = False,
                 **kwargs):
        # Local import keeps the block self-contained.
        from urllib.parse import quote_plus

        self.name = name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.verbose = verbose

        if password and not username:
            raise ValueError("A password was provided without a username")
        # Credentials have to be percent-escaped inside a MongoDB URI and
        # separated from the host by ':' and '@'.
        credentials = ""
        if username:
            credentials = quote_plus(username)
            if password:
                credentials += ":" + quote_plus(password)
            credentials += "@"
        self.uri = "mongodb://%s%s:%d" % (credentials, host, port)

        self.client = pymongo.MongoClient(self.uri)
        self.db = self.client[name]
        self.entries = self.db.forensicfit_entries
        self.fs = {}
        self.fs['material'] = gridfs.GridFS(self.db, "material")
        self.fs['analysis'] = gridfs.GridFS(self.db, "analysis")
        self.class_mapping = ClassMap()
        # Register a handle for every GridFS collection already present.
        for x in self.collection_names:
            if x not in self.fs:
                self.add_collection(x)
        self.db_info = {"Database Name": self.name,
                        "Host": self.host,
                        "Port": self.port,
                        "User": self.username}
        if self.verbose:
            print("----------------")
            print("connected to:")
            print(self)

    def disconnect(self):
        """
        .. _disconnect:

        Disconnect
        ==========

        Close the pymongo client connection.

        Returns
        -------
        None
        """
        self.client.close()

    def __str__(self):
        """
        Return the connection details, one ``name: value`` pair per line.

        Returns
        -------
        str
            A header followed by every entry of `db_info`.
        """
        lines = ["MongoDB\n", "----------------\n"]
        lines.extend("{:<15}: {}\n".format(key, value)
                     for key, value in self.db_info.items())
        return "".join(lines)

    def add_collection(self, collection: str):
        """
        Create a GridFS handle for `collection` and store it in `fs`.

        Parameters
        ----------
        collection : str
            The name of the collection to add.

        Returns
        -------
        None
        """
        self.fs[collection] = gridfs.GridFS(self.db, collection)

    def exists(self,
               filter: dict = None,
               collection: str = None,
               metadata: Metadata = None) -> Union[ObjectId, bool]:
        """
        Check whether a matching file exists in a collection.

        Parameters
        ----------
        filter : dict, optional
            The MongoDB filter to search with. Ignored when `metadata` is
            given.
        collection : str, optional
            The collection to search in. When omitted, ``metadata['mode']``
            is used.
        metadata : Metadata, optional
            Metadata to search with. Its ``to_mongodb_filter()`` output is
            combined with ``$and`` to build the filter.

        Returns
        -------
        ObjectId or bool
            The ``_id`` of the first matching file, or False when nothing
            matches or the collection is unknown to this instance.

        Raises
        ------
        ValueError
            If neither `metadata` nor both `filter` and `collection` are
            provided.
        """
        # Resolve the collection first; it may only be known via the metadata.
        if metadata is not None:
            collection = collection or metadata['mode']
            filter = {"$and": metadata.to_mongodb_filter()}
        elif filter is None or collection is None:
            raise ValueError("Provide metadata or filter and collection")
        if collection not in self.fs:
            return False
        ret = self.fs[collection].find_one(filter=filter)
        if ret is None:
            return False
        return ret._id

    def insert(self,
               obj: Union[Image, Tape, TapeAnalyzer],
               ext: str = '.png',
               overwrite: bool = False,
               skip: bool = False,
               collection: str = None) -> ObjectId:
        """
        Store an object in a GridFS collection.

        When a file with the same metadata already exists, `overwrite`
        deletes the existing file(s) before storing the new one and `skip`
        returns the existing ``_id`` without storing anything. With neither
        flag set the object is stored in addition to the existing one(s).

        Parameters
        ----------
        obj : Union[Image, Tape, TapeAnalyzer]
            The object to store.
        ext : str, optional
            The file extension passed to ``obj.to_buffer`` and recorded in
            the stored metadata, defaults to '.png'.
        overwrite : bool, optional
            Replace an already existing entry, defaults to False.
        skip : bool, optional
            Leave an already existing entry untouched, defaults to False.
            `overwrite` takes precedence when both are True.
        collection : str, optional
            The target collection. When omitted, ``obj.metadata['mode']`` is
            used.

        Returns
        -------
        ObjectId
            The ``_id`` of the stored file, or of the existing file when the
            insertion was skipped.
        """
        collection = collection or obj.metadata['mode']
        filter = {"$and": obj.metadata.to_mongodb_filter()}
        exists = self.exists(filter=filter,
                             collection=collection,
                             )
        if collection not in self.fs:
            self.add_collection(collection)
        fs = self.fs[collection]
        if overwrite and exists:
            if self.verbose:
                print(
                    f"{obj.metadata.filename} {collection} already exists, overwriting!")
            self.delete(filter, collection)
        elif skip and exists:
            if self.verbose:
                print(
                    f"{obj.metadata.filename} {collection} already exists, skipping!")
            return exists
        metadata = obj.metadata.to_serial_dict
        # The extension is stored so the buffer can be decoded on retrieval.
        metadata['ext'] = ext
        filename = obj.metadata.filename
        _id = fs.put(obj.to_buffer(ext),
                     filename=filename,
                     metadata=metadata)
        return _id

    def find(self,
             filter: dict,
             collection: str = 'analysis',
             ext: str = '.png',
             version: int = -1,
             no_cursor_timeout: bool = False) -> list:
        """
        Return every object of a collection that matches `filter`.

        Parameters
        ----------
        filter : dict
            The MongoDB filter to search with.
        collection : str, optional
            The collection to search in, defaults to 'analysis'.
        ext : str, optional
            Not used by this method; kept for interface compatibility.
        version : int, optional
            Sort direction on ``uploadDate``: -1 for newest first, 1 for
            oldest first. Defaults to -1.
        no_cursor_timeout : bool, optional
            Forwarded to ``GridFS.find``, defaults to False.

        Returns
        -------
        list
            The matching objects, built with the class that `class_mapping`
            associates with `collection`. Empty when nothing matches.
        """
        Class = self.class_mapping[collection]
        fs = self.fs[collection]
        queries = fs.find(filter=filter,
                          no_cursor_timeout=no_cursor_timeout
                          ).sort("uploadDate", version)
        # An empty cursor simply yields an empty list.
        return [Class.from_buffer(iq.read(), iq.metadata) for iq in queries]

    def map_to(self,
               func: Callable,
               filter: dict,
               collection_source: str,
               collection_target: str,
               verbose: bool = True,
               no_cursor_timeout: bool = False):
        """
        Apply `func` to matching objects and store the results.

        Every object of `collection_source` that matches `filter` is passed
        to `func`, and the returned object is inserted into
        `collection_target` with the extension recorded in the source
        object's metadata.

        Parameters
        ----------
        func : Callable
            Takes one object and returns the object to store.
        filter : dict
            The MongoDB filter selecting the source objects.
        collection_source : str
            The collection to read from.
        collection_target : str
            The collection to insert the results into.
        verbose : bool, optional
            Whether to print the filename of each processed object, defaults
            to True.
        no_cursor_timeout : bool, optional
            Forwarded to ``GridFS.find``, defaults to False.

        Returns
        -------
        None
        """
        Class = self.class_mapping[collection_source]
        fs = self.fs[collection_source]
        queries = fs.find(filter=filter,
                          no_cursor_timeout=no_cursor_timeout)
        processed = 0
        for iq in queries:
            obj = Class.from_buffer(iq.read(), iq.metadata)
            ext = obj.metadata['ext']
            if verbose:
                print(iq.filename)
            self.insert(func(obj), ext=ext, collection=collection_target)
            processed += 1
        # GridFS.find always returns a cursor, so emptiness can only be
        # detected after iterating.
        if processed == 0:
            print('There are no matching entries to the provided filter')

    def find_one(self, filter: Optional[dict] = None, collection: Optional[str] = None) -> object:
        """
        Return one object of a collection that matches `filter`.

        Parameters
        ----------
        filter : dict, optional
            The MongoDB filter to search with. When omitted, any file of the
            collection may be returned.
        collection : str, optional
            The collection to search in. When omitted, one of the known
            collections is picked at random.

        Returns
        -------
        object
            The matching object, built with the class that `class_mapping`
            associates with the collection.

        Raises
        ------
        ValueError
            If no file matches the filter.
        """
        if collection is None:
            collection = choice(list(self.fs))
        Class = self.class_mapping[collection]
        fs = self.fs[collection]
        iq = fs.find_one(filter)
        if iq is None:
            raise ValueError(f'No entry found with the filter: {str(filter)}')
        return Class.from_buffer(iq.read(), iq.metadata)

    def find_with_id(self, _id: str, collection: str) -> object:
        """
        Return the object stored under a given MongoDB ``_id``.

        Parameters
        ----------
        _id : str or ObjectId
            The ``_id`` of the file; strings are converted to ``ObjectId``.
        collection : str
            The collection to search in.

        Returns
        -------
        object
            The object, built with the class that `class_mapping` associates
            with `collection`.

        Raises
        ------
        ValueError
            If the collection holds no file with this ``_id``.
        """
        Class = self.class_mapping[collection]
        fs = self.fs[collection]
        object_id = _id if isinstance(_id, ObjectId) else ObjectId(_id)
        iq = fs.find_one({'_id': object_id})
        if iq is None:
            raise ValueError(f'No entry found with the _id: {_id}')
        return Class.from_buffer(iq.read(), iq.metadata)

    def count_documents(self, filter: dict, collection: str) -> int:
        """
        Count the files of a collection that match `filter`.

        Parameters
        ----------
        filter : dict
            The MongoDB filter to count with.
        collection : str
            The collection to count in.

        Returns
        -------
        int
            The number of matching files; 0 when the collection is unknown
            to this instance.
        """
        if collection not in self.fs:
            return 0
        # GridFS keeps one document per stored file in '<collection>.files'.
        return self.db[f'{collection}.files'].count_documents(filter)

    def export_to_files(self,
                        destination: str,
                        filter: dict,
                        collection: str,
                        ext: str = '.png',
                        verbose: bool = True,
                        no_cursor_timeout: bool = False):
        """
        Write the matching objects of a collection to files.

        Parameters
        ----------
        destination : str
            The directory to write to; created (with parents) if missing.
        filter : dict
            The MongoDB filter selecting the objects.
        collection : str
            The collection to read from.
        ext : str, optional
            The extension used for objects whose metadata has no 'ext'
            entry, defaults to '.png'. An extension recorded in the metadata
            takes precedence.
        verbose : bool, optional
            Whether to print the path of each exported object, defaults to
            True.
        no_cursor_timeout : bool, optional
            Forwarded to ``GridFS.find``, defaults to False.

        Returns
        -------
        None
        """
        Class = self.class_mapping[collection]
        fs = self.fs[collection]
        queries = fs.find(filter=filter,
                          no_cursor_timeout=no_cursor_timeout)
        dst = Path(destination)
        dst.mkdir(parents=True, exist_ok=True)
        for iq in queries:
            obj = Class.from_buffer(iq.read(), iq.metadata)
            try:
                suffix = obj.metadata['ext']
            except KeyError:
                # NOTE(review): assumes Metadata raises KeyError for a
                # missing entry, like a regular mapping - confirm.
                suffix = ext
            path = dst / iq.filename
            if verbose:
                print(path.as_posix())
            obj.to_file(path.with_suffix(suffix))

    def drop_collection(self, collection: str):
        """
        Drop the GridFS 'files' and 'chunks' collections of `collection`.

        Parameters
        ----------
        collection : str
            The name of the collection to drop.

        Returns
        -------
        None
        """
        for x in ['files', 'chunks']:
            print(f'drop {collection}.{x}')
            self.db.drop_collection(f'{collection}.{x}')

    def delete(self, filter: dict, collection: str):
        """
        Delete every file of a collection that matches `filter`.

        Parameters
        ----------
        filter : dict
            The MongoDB filter selecting the files to delete.
        collection : str
            The collection to delete from.

        Returns
        -------
        None
        """
        fs = self.fs[collection]
        for iq in fs.find(filter):
            fs.delete(iq._id)

    def delete_database(self):
        """
        Drop the whole MongoDB database this instance is connected to.

        Returns
        -------
        None
        """
        self.client.drop_database(self.name)

    @property
    def collection_names(self) -> List[str]:
        """
        Names of the GridFS collections present in the database.

        Returns
        -------
        List[str]
            The name of every collection ending in '.files', with that
            suffix removed.
        """
        suffix = ".files"
        return [x[:-len(suffix)]
                for x in self.db.list_collection_names()
                if x.endswith(suffix)]

    @property
    def connected(self) -> bool:
        """
        Whether the MongoDB server can currently be reached.

        Returns
        -------
        bool
            True if the server information could be retrieved. On a
            ``ServerSelectionTimeoutError`` the error is printed and False is
            returned.
        """
        try:
            # Force a request so that an unreachable server surfaces here.
            self.client.server_info()
            return True
        except pymongo.errors.ServerSelectionTimeoutError as err:
            print(err)
            return False

    @property
    def server_info(self) -> dict:
        """
        Server information as reported by ``MongoClient.server_info()``.

        Returns
        -------
        dict
            The server information.
        """
        return self.client.server_info()
def dict2mongo_query(inp: dict, previous_key: str = '') -> List[dict]:
    """
    Flatten a nested dictionary into a list of MongoDB dot-notation conditions.

    Every leaf value of `inp` becomes a single-entry dictionary whose key is
    the dot-joined path to that value, prefixed with `previous_key` when it is
    not empty. Empty nested dictionaries are kept as leaf values.

    Parameters
    ----------
    inp : dict
        The (possibly nested) dictionary to convert. Keys must be strings.
    previous_key : str, optional
        The path prepended to every key, defaults to an empty string (no
        prefix).

    Returns
    -------
    List[dict]
        One ``{path: value}`` dictionary per leaf, in the iteration order of
        `inp`; suitable as the operand of ``$and``.
    """
    # Without a prefix the path must not start with a dot.
    prefix = previous_key + '.' if previous_key else ''
    ret = []
    for key, value in inp.items():
        path = prefix + key
        if isinstance(value, dict) and value:
            # Recurse into non-empty sub-dictionaries and splice the result.
            ret.extend(dict2mongo_query(value, previous_key=path))
        else:
            ret.append({path: value})
    return ret
def list_databases(host: str = 'localhost',
                   port: int = 27017,
                   username: str = '',
                   password: str = '') -> List[str]:
    """
    List the names of all databases on a MongoDB server.

    Parameters
    ----------
    host : str, optional
        The host IP address or hostname where the MongoDB server is running,
        defaults to 'localhost'.
    port : int, optional
        The port number of the MongoDB server, defaults to 27017.
    username : str, optional
        The username used for authentication, defaults to an empty string
        (no authentication).
    password : str, optional
        The password used for authentication, defaults to an empty string.

    Returns
    -------
    List[str]
        The names of all databases on the server.

    Raises
    ------
    ValueError
        If a password is given without a username.
    """
    # Local import keeps the block self-contained.
    from urllib.parse import quote_plus

    if password and not username:
        raise ValueError("A password was provided without a username")
    # Credentials have to be percent-escaped inside a MongoDB URI and
    # separated from the host by ':' and '@'.
    credentials = ''
    if username:
        credentials = quote_plus(username)
        if password:
            credentials += ':' + quote_plus(password)
        credentials += '@'
    uri = "mongodb://%s%s:%d" % (credentials, host, port)
    client = pymongo.MongoClient(uri)
    try:
        return client.list_database_names()
    finally:
        # The client is only needed for this one request.
        client.close()
def dump(db: Optional[str] = None,
         host: Optional[str] = None,
         port: Optional[int] = None,
         username: Optional[str] = None,
         password: Optional[str] = None,
         out: Optional[str] = None,
         collection: Optional[str] = None,
         ):
    """
    Dump a MongoDB database or collection with the ``mongodump`` utility.

    Every argument that is not None is passed to ``mongodump`` as
    ``--<name>=<value>``; omitted arguments leave the utility's own defaults
    in effect. The command and the captured output of ``mongodump`` are
    printed.

    Parameters
    ----------
    db : str, optional
        The name of the database to dump.
    host : str, optional
        The host IP address or hostname of the MongoDB server.
    port : int, optional
        The port number of the MongoDB server.
    username : str, optional
        The username used for authentication.
    password : str, optional
        The password used for authentication.
    out : str, optional
        The directory the dump is written to.
    collection : str, optional
        The name of the collection to dump.

    Returns
    -------
    None

    Raises
    ------
    subprocess.CalledProcessError
        If ``mongodump`` exits with a non-zero status.
    """
    options = {'db': db,
               'host': host,
               'port': port,
               'username': username,
               'password': password,
               'out': out,
               'collection': collection}
    # An argument list (no shell) keeps values with spaces or shell
    # metacharacters from being split or interpreted.
    command = ['mongodump']
    command.extend(f'--{flag}={value}'
                   for flag, value in options.items()
                   if value is not None)
    print(' '.join(command))
    print(subprocess.check_output(command))
def restore(path: Optional[str] = None,
            db: Optional[str] = None,
            host: Optional[str] = None,
            port: Optional[int] = None,
            username: Optional[str] = None,
            password: Optional[str] = None,
            collection: Optional[str] = None,
            ):
    """
    Restore a MongoDB database or collection with the ``mongorestore`` utility.

    Every option that is not None is passed to ``mongorestore`` as
    ``--<name>=<value>``; omitted options leave the utility's own defaults in
    effect. `path`, when given, is appended as the final positional argument.
    The command is printed before it is run.

    Parameters
    ----------
    path : str, optional
        The dump directory or BSON file to restore from. When omitted, no
        path is passed and ``mongorestore`` uses its default location.
    db : str, optional
        The name of the database to restore.
    host : str, optional
        The host IP address or hostname of the MongoDB server.
    port : int, optional
        The port number of the MongoDB server.
    username : str, optional
        The username used for authentication.
    password : str, optional
        The password used for authentication.
    collection : str, optional
        The name of the collection to restore.

    Returns
    -------
    None

    Raises
    ------
    subprocess.CalledProcessError
        If ``mongorestore`` exits with a non-zero status.
    """
    options = {'db': db,
               'host': host,
               'port': port,
               'username': username,
               'password': password,
               'collection': collection}
    # An argument list (no shell) keeps values with spaces or shell
    # metacharacters from being split or interpreted.
    command = ['mongorestore']
    command.extend(f'--{flag}={value}'
                   for flag, value in options.items()
                   if value is not None)
    # The path is optional; appending None used to raise a TypeError.
    if path is not None:
        command.append(str(path))
    print(' '.join(command))
    subprocess.check_output(command)