Source code for CoREMOF.calculation.mof_collection

import os
import glob
import json
import time
import pickle
import shutil
import random
import warnings
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
from multiprocessing import Process, cpu_count, Array
from CoREMOF.calculation.mof import Helper
from CoREMOF.calculation.mof import MofStructure
from CoREMOF.calculation.atomic_parameters import Atom
from sys import exit
pd.options.display.max_rows = 1000


class MofCollection:
    """A collection to hold and analyse MOF structures from CIF files.

    Create a MofCollection from a list of path names.

    Args:
        path_list: List of paths to MOF CIF files to be added to the
            collection.
        analysis_folder: Path to the folder where the results will be stored.
    """

    separator = "".join(['-'] * 50)

    def __init__(self, path_list, analysis_folder='analysis_folder'):
        """Initialise the collection state and load the given CIF files."""
        self._analysis_folder = analysis_folder
        self.path_list = path_list
        self.mof_coll = []
        self.batches = []
        self._metal_site_df = None
        self._mof_oms_df = None
        self._properties = {}
        self.load_balance_index = {}
        self.analysis_limit = None
        # Maps each supported filter name to the function that evaluates it.
        self.filter_functions = {
            "density": self._apply_filter_range,
            "oms_density": self._apply_filter_range,
            "uc_volume": self._apply_filter_range,
            "metal_species": self._apply_filter_in_value,
            "non_metal_species": self._apply_filter_in_value,
            "cif_okay": self._apply_filter_value,
            "has_oms": self._apply_filter_value,
            "mof_name": self._apply_value_in_filter
        }
        self._load_mofs()

    def __len__(self):
        """Return the number of MOFs held by the collection."""
        return len(self.mof_coll)

    def __repr__(self):
        """Return a printable summary listing the CIF files."""
        parts = [self.separator,
                 "\nThis collection holds information for ",
                 "{} MOFs.\n".format(len(self))]
        # Check the raw attribute: the property would try to create the
        # folder before the None test could be evaluated.
        if self._analysis_folder is None:
            parts.append("Analysis folder is not set.\n")
        else:
            f = os.path.abspath(self.analysis_folder)
            parts.append("Analysis folder is: {}\n\n".format(f))
        parts.append("List of cif files in collection:\n\n")
        parts.extend("{}\n".format(mc['mof_file']) for mc in self.mof_coll)
        parts.append(self.separator)
        return "".join(parts)

    @property
    def analysis_folder(self):
        """Get value of the analysis folder (created on access)."""
        Helper.make_folder(self._analysis_folder)
        return self._analysis_folder

    @analysis_folder.setter
    def analysis_folder(self, analysis_folder):
        """Set value of the analysis folder."""
        self._analysis_folder = analysis_folder

    @property
    def oms_results_folder(self):
        """Get value of the OMS results folder (created on access)."""
        orf = self.analysis_folder + '/oms_results'
        Helper.make_folder(orf)
        return orf

    @property
    def summary_folder(self):
        """Get value of the summary folder (created on access)."""
        sf = self.analysis_folder + '/summary'
        Helper.make_folder(sf)
        return sf

    @property
    def _properties_filename(self):
        """Get value of the properties pickle file."""
        return self.analysis_folder + '/properties.pickle'

    @property
    def properties(self):
        """Get value for the MOF properties.

        If the in-memory dictionary is empty and the pickle file exists,
        the file is loaded first.
        """
        if not self._properties and os.path.isfile(self._properties_filename):
            # NOTE(review): pickle.load executes arbitrary code; only point
            # the analysis folder at locations you trust.
            with open(self._properties_filename, 'rb') as properties_file:
                self._properties = pickle.load(properties_file)
        return self._properties

    @property
    def mof_oms_df(self):
        """Get a pandas DataFrame that lists for each MOF whether it has an
        OMS or not and if it has an OMS what metal types it is.

        Returns:
            The DataFrame, or False if the OMS properties could not be
            validated for every MOF in the collection.
        """
        if self._mof_oms_df is not None:
            return self._mof_oms_df
        if not self._validate_properties(['has_oms'])[1]:
            print('OMS analysis not finished for all MOFs in collection.')
            return False
        mof_info = {}
        for mi in self.mof_coll:
            mp = self.properties[mi['checksum']]
            if 'metal_sites' not in mp:
                continue
            metal_sites = mp['metal_sites']
            if len(metal_sites) == 0:
                print('No Metal Found in {}'.format(mp['name']))
            # Sorted so that the joined strings are deterministic.
            oms_types = sorted({ms["metal"] for ms in metal_sites
                                if ms["is_open"] and ms["unique"]})
            oms_types = ",".join(oms_types) if oms_types else "N/A"
            has_oms = 'Yes' if mp['has_oms'] else 'No'
            all_metal_species = ",".join(sorted(set(mp['metal_species'])))
            mof_info[mp['name']] = {'Metal Types': all_metal_species,
                                    'Has OMS': has_oms,
                                    'OMS Types': oms_types}
        # Cache under this property's own attribute; the previous code wrote
        # to the metal-site cache and so returned the wrong table later.
        self._mof_oms_df = pd.DataFrame.from_dict(mof_info, orient='index')
        return self._mof_oms_df

    @property
    def metal_site_df(self):
        """Get a pandas DataFrame that lists the OMS results for each metal
        type.

        Returns:
            The DataFrame, or False if the OMS properties could not be
            validated for every MOF in the collection.
        """
        if self._metal_site_df is not None:
            return self._metal_site_df
        if not self._validate_properties(['has_oms'])[1]:
            print('OMS analysis not finished for all MOFs in collection.')
            return False
        site_info = {}
        for mi in self.mof_coll:
            mp = self.properties[mi['checksum']]
            if 'metal_sites' not in mp:
                continue
            metal_sites = mp['metal_sites']
            if len(metal_sites) == 0:
                print('No Metal Found in {}'.format(mp['name']))
            for i, ms in enumerate(metal_sites):
                key = mp['name'] + '_' + str(i)
                # Work on a copy so the stored properties are not altered.
                site = dict(ms)
                site.pop('all_dihedrals', None)
                site.pop('min_dihedral', None)
                site['mof_name'] = mp['name']
                site_info[key] = site
        self._metal_site_df = pd.DataFrame.from_dict(site_info, orient='index')
        return self._metal_site_df

    @classmethod
    def from_folder(cls, collection_folder, analysis_folder='analysis_folder',
                    name_list=None):
        """Create a MofCollection from the CIF files in a folder.

        Args:
            collection_folder: Path to the folder containing the CIF files to
                be added to the collection.
            analysis_folder: Path to the folder where the results will be
                stored.
            name_list: List of MOF names to include in the collection. If
                set, all the other CIF files in the folder will be excluded.

        Returns:
            A MofCollection object holding the specified MOF structures.
        """
        if name_list:
            print(cls.separator)
            print('Using only MOFs in the name list.')
            print(cls.separator)
            d = collection_folder
            path_list = [d+'/'+name for name in name_list]
        else:
            path_list = glob.glob(collection_folder + "/*.cif")
        return cls(path_list, analysis_folder)

    def analyse_mofs(self, overwrite=False, num_batches=1,
                     analysis_limit=None):
        """Run OMS analysis for the MOFs in the collection.

        Args:
            overwrite: Controls if the results will be overwritten or not.
            num_batches: Sets the number of batches the structures will be
                split in and analyzed on a separate process.
            analysis_limit: Analyze only up to the number of MOFs set by
                analysis_limit, if set to None all MOFs will be analyzed.
        """
        print(self.separator)
        print("Running OMS Analysis...")
        self.analysis_limit = analysis_limit

        t0 = time.time()

        self._make_batches(num_batches, overwrite)

        # One shared slot per batch: index of the MOF being analysed, or -1
        # once the batch has finished.
        status = Array('i', [0 for i in range(num_batches)])
        processes = []
        for i, batch in enumerate(self.batches):
            p = Process(target=self._run_batch,
                        args=(i, batch, overwrite, status))
            p.start()
            processes.append(p)

        lbs = [len(batch)/100.0 for batch in self.batches]
        wait_time = 0.0
        status_prev = [0 for i in range(num_batches)]
        while True:
            # Create a list from the shared array to make sure it doesnt
            # change during the iteration
            status_ = list(status)
            if all([sp == s for sp, s in zip(status_prev, status_)]):
                wait_time = min(25, 0.1+wait_time)
                time.sleep(wait_time)
            status_prev = status_

            sout = ["Batch {} Finished.".format(b + 1)
                    if len(self.batches[b]) == 0 or s < 0 else
                    "Batch {} {:.2f} % : Analysing {:}"
                    "".format(b+1, (s+1)/lbs[b], self.batches[b][s]['mof_name'])
                    for b, s in enumerate(status_)]
            print("|**| ".join(sout) + 100 * " ", end='\r', flush=True)

            if all([s < 0 for s in status_]):
                break
            # A worker that died without reporting would otherwise keep this
            # loop spinning forever.
            if not any(p.is_alive() for p in processes):
                break

        # Reap the worker processes.
        for p in processes:
            p.join()

        if overwrite:
            for mi in self.mof_coll:
                self._update_property_from_oms_result(mi)
        self._validate_properties(['has_oms'])

        t1 = time.time()
        print('\nAnalysis Finished. Time required:{:.2f} sec'.format(t1 - t0))
        print(self.separator)

    def check_structures(self):
        """Iterate over all the MOFs in the collection and validate that they
        can be read and a MofStructure can be created.
        """
        self._validate_properties(['cif_okay'])
        not_read = [mi for mi in self.mof_coll
                    if not self.properties[mi['checksum']]['cif_okay']]
        read_len = len(self.mof_coll) - len(not_read)
        print('\nChecked {} structures.'.format(len(self.mof_coll)))
        # Message tables keyed by 0 / 1 / "2 or more" for correct plurals.
        msg1 = {0: '\r',
                1: '{} was read.'.format(read_len),
                2: '{} were read.'.format(read_len)}
        msg2 = {0: '\r',
                1: '{} was NOT read.'.format(len(not_read)),
                2: '{} were NOT read.'.format(len(not_read))}
        print(msg1[min(2, read_len)])
        print(msg2[min(2, len(not_read))])

        msg = {0: "\r", 1: "\nThe following structures could not be read:"}
        print(msg[min(1, len(not_read))])
        for mi in not_read:
            print("{}".format(mi['mof_name']))

        mofs_no_metal = [mi for mi in self.mof_coll
                         if self.properties[mi['checksum']]['cif_okay']
                         and not
                         self.properties[mi['checksum']]['metal_species']]
        msg = {0: "\r", 1: "The following structures contain no metal:"}
        print(msg[min(1, len(mofs_no_metal))])
        for mi in mofs_no_metal:
            p = self.properties[mi['checksum']]
            print("{}.cif {}".format(p['name'],
                                     p['metal_species']+p['non_metal_species']))

        print('\nFinished checking structures.')

    def check_analysis_status(self):
        """Iterate over all the MOFs in the collection and check if the
        results from the OMS analysis exist.
        """
        print(self.separator)
        not_done = [mi['mof_file'] for mi in self.mof_coll
                    if not self._check_if_results_exist(mi['mof_name'])]
        done = len(self.mof_coll) - len(not_done)
        msg1 = {0: '\nAnalysis for no structures has been completed.',
                1: '\nAnalysis for {} out of {} structures have been completed.'
                   .format(done, len(self.mof_coll))}
        msg2 = {0: "\r", 1: "\nThe following structures are missing:"}

        print(msg1[min(1, done)])
        print(msg2[min(1, len(not_done))])
        for nd in not_done:
            print(nd)
        print(self.separator)

    def sample_collection(self, sample_size=50):
        """Randomly select a sample of MOFs in the collection and return a
        new collection with the MOFs in the sample.

        Args:
            sample_size: Number of MOFs to be selected. Default value is 50.

        Returns:
            A MofCollection holding the sampled MOFs.
        """
        ll = len(self.mof_coll)
        if sample_size > ll:
            sample_size = ll
            print(f"Can only sample up to the number of MOFs "
                  f"in the collection ({ll}).")
        mof_list = [mi['mof_file'] for mi in self.mof_coll]
        sampled_list = random.sample(mof_list, sample_size)
        return MofCollection(sampled_list, analysis_folder=self.analysis_folder)

    def filter_collection(self, using_filter=None,
                          new_collection_folder=None,
                          new_analysis_folder=None):
        """Filter a collection given a number of filters.

        Calling this method of a MofCollection applies the filter and creates
        a new collection for the MOFs that match the filter. The cif files
        that match the filter are copied to the new_collection_folder.
        The filters can be one or more of the following:

        'density': [min, max] (range of values),
        'oms_density': [min, max] (range of values),
        'uc_volume': [min, max] (range of values),
        'metal_species': ["Cu", "Zn", ...] (list of metal species),
        'non_metal_species': ["C", "N", ...] (list of non metal species),
        'cif_okay': True (boolean value),
        'has_oms': True (boolean value),
        'mof_name': [mof_name1, mof_name2] (string values)

        Args:
            using_filter: Filter used to identify MOFs with certain
                characteristics. Has to be a python dictionary.
            new_collection_folder: Path to the folder where the CIF files of
                the filtered collection will be stored. If set to None the
                CIF files will not be copied.
            new_analysis_folder: Path to the folder where the OMS result
                files of the filtered collection will be stored. If set to
                None the result files will not be copied.

        Returns:
            A MofCollection with only the filtered MOFs, or None if the
            filter is missing/unknown, cannot be validated, or matches
            nothing. If new_collection_folder or new_analysis_folder is not
            set then the collection will point to the original location of
            these files.
        """
        print(self.separator)
        # A missing filter is reported like an unknown one instead of
        # failing with a TypeError when iterating None.
        if using_filter is None or \
                any([f not in self.filter_functions for f in using_filter]):
            print('Unknown filter. Try again using one of the following '
                  'filters:\n\"{}\"'.format(", ".join(self.filter_functions)))
            print(self.separator)
            return

        validation_level, cf = self._validate_properties(using_filter)
        if validation_level == 1 and not cf:
            print('Properties from CIF files could not be validated. '
                  'Check that all CIF files can be read')
            return
        elif validation_level == 2 and not cf:
            print('Requested a filter that needs OMS information but the '
                  'OMS analysis does not appear to be complete.\n'
                  'Run it first and try again.')
            return

        print(self.separator)
        print('Filtering collection.')
        filtered_list = []
        fun = self._apply_filter
        for mi in self.mof_coll:
            mp = self.properties[mi['checksum']]
            if all([fun(f, mp[f], using_filter[f]) for f in using_filter]):
                filtered_list.append(mi['mof_file'])

        found_s = {0: "No", 1: len(filtered_list)}[min(1, len(filtered_list))]
        print('\n{} MOFs were matched using the provided'
              ' filter.'.format(found_s))
        if len(filtered_list) == 0:
            print('No collection returned.')
            return None
        print('Returning a new collection using the matched MOFs.')
        sub_collection = MofCollection(filtered_list,
                                       analysis_folder=self.analysis_folder)
        print(self.separator)

        sub_collection.copy_cifs(new_collection_folder)
        sub_collection.copy_results(new_analysis_folder)

        return sub_collection

    def read_cif_files(self):
        """Iterate over all MOF files in the collection, load each CIF and
        store MOF properties such as density, unit cell volume etc.
        """
        print(self.separator)
        print('Reading CIF files and updating properties...')
        self._loop_over_collection(self._update_property_from_cif_file)
        self._store_properties()
        print('Done')
        print(self.separator)

    def read_oms_results(self):
        """Iterate over all MOF files in the collection, load each OMS result
        file and store OMS information to the MOF properties.
        """
        print(self.separator)
        print('Adding results to properties.')
        self._loop_over_collection(self._update_property_from_oms_result)
        print('Done')
        self._store_properties()
        print(self.separator)

    def copy_cifs(self, target_folder):
        """Copy cif files from their existing location to the specified
        target_folder.

        Args:
            target_folder: Path of folder to copy collection CIF files to.
                Nothing is done when it is None.
        """
        if target_folder is None:
            return
        tf_abspath = os.path.abspath(target_folder)
        Helper.make_folder(tf_abspath)
        print(self.separator)
        print('The cif files for this collection will be copied to'
              ' the specified folder:\n\"{}\"'.format(tf_abspath))
        print('The cif paths will be updated.')

        # Iterate over a snapshot because entries are replaced in the loop.
        for i, mi in enumerate(list(self.mof_coll)):
            destination_path = "{}/{}.cif".format(tf_abspath, mi['mof_name'])
            self.mof_coll[i] = {"mof_name": mi['mof_name'],
                                "mof_file": destination_path,
                                "checksum": mi['checksum']}
            if not os.path.isfile(destination_path):
                shutil.copyfile(mi['mof_file'], destination_path)
        print(self.separator)

    def copy_results(self, target_folder):
        """Copy OMS result files from their existing location to the
        specified target_folder.

        Args:
            target_folder: Path of folder to copy collection OMS result files
                to. Nothing is done when it is None.
        """
        if target_folder is None:
            return

        print(self.separator)
        tf_abspath = os.path.abspath(target_folder)
        destination_path = tf_abspath + '/oms_results'

        print('The result files for this collection will be copied to the '
              'specified folder:\n{}\nThe analysis folder will be updated.'
              ''.format(tf_abspath))

        Helper.make_folder(tf_abspath)
        Helper.make_folder(destination_path)

        for mi in self.mof_coll:
            mof_name = mi['mof_name']
            if self._check_if_results_exist(mof_name):
                source_path = "{}/{}".format(self.oms_results_folder, mof_name)
                Helper.copy_folder(destination_path, source_path)
        self.analysis_folder = tf_abspath
        self._validate_properties(['has_oms'])
        print(self.separator)

    def summarize_results(self, max_atomic_number=None):
        """Create a summary table for the OMS results of the collection,
        group results by metal type.

        Args:
            max_atomic_number: Maximum atomic number to be included in
                summary table. If not defined all metal atoms will be
                considered.
        """
        site_df = self.metal_site_df
        # metal_site_df returns False when the OMS analysis is incomplete.
        if not isinstance(site_df, pd.DataFrame):
            return
        df = site_df.copy()
        site_df_u = df.loc[df['unique']]
        site_df_o = site_df_u.loc[site_df_u['is_open']]

        all_sites = self._group_and_summarize(site_df_u, ['MOFs',
                                                          'Metal Sites'])
        open_sites = self._group_and_summarize(site_df_o, ['MOFs_with_OMS',
                                                           'OMS'])

        s_df = pd.concat([all_sites, open_sites], axis=1)
        s_df.fillna(0.0, inplace=True)
        s_df = s_df.astype(int)

        s_df['MOFs_with_OMS(%)'] = 100.0 * s_df['MOFs_with_OMS']/s_df['MOFs']
        s_df['OMS (%)'] = 100.0 * s_df['OMS'] / s_df['Metal Sites']
        cols = ['MOFs', 'MOFs_with_OMS', 'Metal Sites', 'OMS',
                'MOFs_with_OMS(%)', 'OMS (%)']
        s_df = s_df[cols]

        s_df['MOFs_with_OMS(%)'] = s_df['MOFs_with_OMS(%)'].apply('{:.2f} %'
                                                                  ''.format)
        s_df['OMS (%)'] = s_df['OMS (%)'].apply('{:.2f} %'.format)
        s_df.sort_values("MOFs", inplace=True, ascending=False)

        num_mofs = df['mof_name'].nunique()
        num_oms_mofs = df[df['is_open']]['mof_name'].nunique()
        num_sites = len(site_df_u)
        num_oms_sites = len(site_df_u[site_df_u['is_open']])

        print(self.separator)
        print('Number of total MOFs: {}'.format(num_mofs))
        print('Number of total MOFs with open metal sites: {}'
              ''.format(num_oms_mofs))
        print('Number of total unique sites: {}'.format(num_sites))
        print('Number of total unique open metal sites: {}'
              ''.format(num_oms_sites))
        print(self.separator)

        msg = "Summary Table\n"
        fname = "{0}/stats.out".format(self.summary_folder)
        if max_atomic_number:
            subset = pd.Series(s_df.index).apply(
                lambda x: Atom(x).atomic_number <= max_atomic_number)
            s_df = s_df.loc[subset.values]
            fname = "{0}/stats_less_{1}.out".format(self.summary_folder,
                                                    max_atomic_number)
            msg = "Summary Table for metal atoms with atomic number smaller " \
                  "than {}.\n".format(max_atomic_number)
        print(msg)
        print(s_df)
        s_df.to_csv(fname, sep=' ')

    def summarize_tfactors(self):
        """Summarize the t-factor information and make histograms for all the
        MOFs in the collection.
        """
        tfac_analysis_folder = self.summary_folder + '/tfac_analysis'
        Helper.make_folder(self.summary_folder)
        Helper.make_folder(tfac_analysis_folder)

        site_df = self.metal_site_df
        # metal_site_df returns False when the OMS analysis is incomplete.
        if not isinstance(site_df, pd.DataFrame):
            return
        df = site_df.copy()
        sites_u = df[df['unique']]

        # One set of outputs per linker count from 4 to 6.
        for n in range(4, 7):
            self._write_t_factors(sites_u, n, tfac_analysis_folder)

    def _load_mofs(self):
        """Add MOfs to collection, use CIF file checksum as an identifier.

        Exits the interpreter if a checksum is already registered under a
        different MOF name, or if existing results carry another checksum.
        """
        print('Loading CIF files...')
        li = max(int(len(self.path_list) / 1000), 1)
        lm = len(self.path_list) / 100.0
        for i, mof_file in enumerate(self.path_list):
            if i % li == 0:
                print("{:4.1f} %".format((i+1) / lm), end="\r", flush=True)
            checksum = Helper.get_checksum(mof_file)
            mof_name = os.path.splitext(os.path.basename(mof_file))[0]
            mof_info = {"mof_name": mof_name,
                        "mof_file": mof_file,
                        "checksum": checksum}
            self.mof_coll.append(mof_info)
            if checksum not in self.properties:
                self.properties[checksum] = {"mof_name": mof_name}
            else:
                if self.properties[checksum]["mof_name"] != mof_name:
                    exit("MOF name and CIF checksum mismatch for {}.cif "
                         "{}.cif. Either the CIF files has already been "
                         "processed with a different name, or the CIF file "
                         "has changed since it was processed."
                         "".format(mof_name,
                                   self.properties[checksum]['mof_name']))
            if self._check_if_results_exist(mof_name):
                self._compare_checksums(mof_file, mof_name, checksum)
        print("\nAll Done.")
        self._store_properties()

    def _compare_checksums(self, mof_file, mof_name, checksum):
        """If OMS results exist for one of the CIF names in the collection
        then ensure that the CIF checksum matches the one in the result file.

        Exits the interpreter with status 1 on a mismatch.
        """
        mof_folder = "{0}/{1}/".format(self.oms_results_folder, mof_name)
        results_file = "{0}/{1}.json".format(mof_folder, mof_name)
        with open(results_file, 'r') as f:
            results_dict = json.load(f)
        if results_dict['checksum'] != checksum:
            print("Results for a MOF named {0} appear to already exist"
                  " in the analysis folder \n\"{1}\".\nHowever the "
                  "file checksum in the result file does not match the "
                  "checksum of \n\"{2}\".\n\nHave the CIF files in the "
                  "collection changed since the results were computed?"
                  "\nClear results and try again.".format(mof_name,
                                                          mof_folder,
                                                          mof_file))
            exit(1)

    def _run_batch(self, b, batch, overwrite, status):
        """Run OMS analysis for each of the batches.

        Args:
            b: Index of the batch in the shared status array.
            batch: List of MOF entries to analyse.
            overwrite: Controls if the results will be overwritten or not.
            status: Shared array used to report progress to the parent.
        """
        try:
            for i, mi in enumerate(batch):
                status[b] = i
                self._analyse(mi, overwrite)
        finally:
            # Always mark the batch as finished so the parent stops waiting,
            # even if an analysis raised.
            status[b] = -1

    def _analyse(self, mi, overwrite):
        """For a given CIF file, create MofStructure object and run OMS
        analysis. If overwrite is false check if results already exist first.
        """
        mof_folder = "{}/{}".format(self.oms_results_folder, mi['mof_name'])
        results_exist = self._check_if_results_exist(mi['mof_name'])
        if not overwrite and results_exist:
            print("Skipping {}. Results already exist and overwrite is set "
                  "to False.".format(mi['mof_name']))
            return
        mof = self._create_mof_from_cif_file(mi['mof_file'])
        if mof.summary['cif_okay']:
            mof.analyze_metals(output_folder=mof_folder)

    def _make_batches(self, num_batches=1, overwrite=False):
        """Split collection into number of batches.

        Args:
            num_batches: Number of batches (default: 1)
            overwrite: Controls if the results will be overwritten or not
                (default: False)
        """
        print(self.separator)
        if cpu_count() < num_batches:
            warnings.warn('You requested {} batches but there are only {}'
                          ' CPUs available.'.format(num_batches, cpu_count()))
        b_s = {1: 'batch', 2: 'batches'}[min(num_batches, 2)]
        print('{} {} requested. '.format(num_batches, b_s))
        print('Overwrite is set to {}. '.format(overwrite))
        print('Storing results in {}. '.format(self.oms_results_folder))
        print(self.separator)
        self._validate_properties(['load_balancing_index'])
        print(self.separator)
        lbi = {}
        for mi in self.mof_coll:
            mp = self.properties[mi['checksum']]
            # Structures that could not be read have no index; leave them
            # out so they are skipped below instead of raising KeyError.
            if 'load_balancing_index' in mp:
                lbi[mi['mof_name']] = mp['load_balancing_index']
        # Remove any structures not in load balancing index.
        subset = [mc for mc in self.mof_coll if mc['mof_name'] in lbi]

        # If there is no balancing info for a MOF at this point it means
        # that it could not be read.
        if len(self.mof_coll) != len(subset):
            print('\nSkipping {} structures that could not be read.'
                  ' '.format(len(self.mof_coll)-len(subset)))

        # Remove any structures already completed
        if not overwrite:
            print('Checking if results for any of the MOFs exist...')
            all_ = len(subset)
            subset = [mc for mc in subset if not
                      self._check_if_results_exist(mc['mof_name'])]
            msg = {0: "Will not skip any MOFs",
                   1: "Skipping {} MOFs because results were found. "
                      "".format(all_ - len(subset))}
            print(msg[min(1, all_ - len(subset))])

        # Sort mof list using the load balancing index
        subset.sort(key=lambda x: lbi[x['mof_name']])

        # Select only up to analysis_limit to work with. This happens before
        # the split so the load is balanced over the MOFs actually analysed.
        if self.analysis_limit and len(subset) > self.analysis_limit:
            subset = subset[0:self.analysis_limit]

        self.batches = self._split_by_load(subset, lbi, num_batches)

        print(self.separator)
        for i, batch in enumerate(self.batches):
            print("Batch {0} has {1} MOFs".format(i+1, len(batch)))
        print(self.separator)

    @staticmethod
    def _split_by_load(subset, lbi, num_batches):
        """Distribute MOF entries over batches of roughly equal total load.

        Args:
            subset: MOF entries (dicts with a 'mof_name' key), in the order
                they should be assigned.
            lbi: Mapping of MOF name to its load balancing index.
            num_batches: Number of batches to create.

        Returns:
            A list with num_batches lists of MOF entries.
        """
        batches = [[] for _ in range(num_batches)]
        total = sum(lbi[mi['mof_name']] for mi in subset)
        per_batch = total / num_batches
        running = 0  # load of all entries placed so far
        for mi in subset:
            if per_batch > 0:
                # Clamp to guard against rounding past the last batch.
                idx = min(int(running / per_batch), num_batches - 1)
            else:
                idx = 0
            batches[idx].append(mi)
            running += lbi[mi['mof_name']]
        return batches

    def _check_if_results_exist(self, mof_name):
        """Check if OMS results already exist for a MOF.

        Results count as present when the JSON result file exists and no
        'analysis_running' marker file is found next to it.
        """
        mof_folder = "{}/{}".format(self.oms_results_folder, mof_name)
        if os.path.isfile(mof_folder+'/'+mof_name+'.json'):
            if not os.path.isfile(mof_folder + '/' + 'analysis_running'):
                return True
        return False

    def _loop_over_collection(self, func):
        """Iterate over all the MOFs in the collection and run the specified
        function.

        Args:
            func: Function to use; called with each MOF entry.
        """
        li = max(int(len(self.mof_coll) / 1000), 1)
        lm = len(self.mof_coll) / 100
        for i, mi in enumerate(self.mof_coll):
            if i % li == 0:
                print("{:4.1f} % {} {:100}".format((i+1)/lm, mi['mof_name'],
                                                   " "), end="\r", flush=True)
            func(mi)
        print()

    def _apply_filter(self, filter_, v, f):
        """Apply the proper filter_function for the given filter"""
        return self.filter_functions[filter_](v, f)

    @staticmethod
    def _apply_filter_value(v, f):
        """Filter function to match a value. Returns False if the value is
        None.

        A stored False is a real value, so filtering on False works.
        """
        if v is None:
            return False
        return v == f

    @staticmethod
    def _apply_filter_in_value(v, f):
        """Filter function to match all values of a list"""
        if not v:
            return False
        return all([f_ in v for f_ in f])

    @staticmethod
    def _apply_value_in_filter(v, f):
        """Filter function to match any of the values of a list"""
        if not v:
            return False
        return v in f

    @staticmethod
    def _apply_filter_range(v, f):
        """Filter function to match a range of values.

        Returns False if the value is None or cannot be compared with the
        range limits (for example the 'N/A' placeholder). A value of zero is
        a legitimate number and is compared like any other.
        """
        if v is None:
            return False
        try:
            return min(f) <= v <= max(f)
        except TypeError:
            return False

    def _validate_properties(self, keys):
        """Check if a given property can be found in the properties
        dictionary. If not try to read the CIF file and check again. If the
        check fails again try to read the OMS results and check again. If the
        check fails a third time return False, the property cannot be
        validated.

        Returns:
            A tuple (validation_level, validated). The level is 0 if nothing
            had to be read, 1 if a CIF file was read and 2 if an OMS result
            file was read.
        """
        msg = {1: "Validating property", 2: "Validating properties"}
        print('\n{} : '.format(msg[min(2, len(keys))]), end='')
        print("\"{}\"".format(", ".join([k for k in keys])))
        validation_level = 0
        li = max(int(len(self.mof_coll)/1000), 1)
        lm = len(self.mof_coll) / 100
        for i, mi in enumerate(self.mof_coll):
            if i % li == 0:
                print("{:4.1f} % {} {:100}".format((i+1) / lm, mi['mof_name'],
                                                   " "), end="\r", flush=True)
            mp = self.properties[mi['checksum']]
            if not self._validate_property(mp, keys):
                self._update_property_from_cif_file(mi)
                validation_level = 1
            if not self._validate_property(mp, keys):
                self._update_property_from_oms_result(mi)
                validation_level = 2
            if not self._validate_property(mp, keys):
                self._store_properties()
                print('\nProperty Missing\n{}'.format(self.separator))
                return validation_level, False
        self._store_properties()
        print("Validated 100 % "+100*" ", end="\r")
        print()
        return validation_level, True

    @staticmethod
    def _validate_property(mp, keys):
        """Check if property exists.

        A property is valid when every key is present and none is 'N/A', or
        when every key is present and the CIF is flagged as unreadable.
        """
        test1 = all([f in mp for f in keys])
        if test1 and all([mp[f] != 'N/A' for f in keys]):
            return True
        # 'cif_okay' may not have been populated yet; treat that as unknown
        # rather than raising KeyError.
        if test1 and not mp.get('cif_okay', True):
            return True
        return False

    def _update_property_from_cif_file(self, mi):
        """Update properties dictionary from a CIF file."""
        mp = self.properties[mi['checksum']]
        mof = self._create_mof_from_cif_file(mi['mof_file'])
        if mof:
            mp.update(mof.summary)
            # The squared structure length serves as the load estimate.
            self.load_balance_index[mi['mof_name']] = len(mof) * len(mof)
            mp['load_balancing_index'] = self.load_balance_index[mi['mof_name']]

    def _update_property_from_oms_result(self, mi):
        """Update properties dictionary from an OMS result file."""
        mp = self.properties[mi['checksum']]
        mof_name = mp["mof_name"]
        mof_folder = "{0}/{1}/".format(self.oms_results_folder, mof_name)
        results_file = "{0}/{1}.json".format(mof_folder, mof_name)
        results_dict = None
        if os.path.isfile(results_file):
            # Close the file deterministically instead of leaking the handle.
            with open(results_file) as results_fh:
                results_dict = json.load(results_fh)
        if isinstance(results_dict, dict):
            results_dict['source_name'] = mof_folder
            mp.update(results_dict)

    def _store_properties(self):
        """Store properties dictionary as a python pickle file."""
        with open(self._properties_filename, 'wb') as properties_file:
            pickle.dump(self._properties, properties_file)

    @staticmethod
    def _create_mof_from_cif_file(path_to_mof):
        """Create and return a MofStructure object from a path to a CIF
        file."""
        mof = MofStructure.from_file(path_to_mof, primitive=False)
        return mof

    def _write_t_factors(self, sites, n, target):
        """Summarize the findings in table form and histograms for a given
        t-factor.

        Args:
            sites: DataFrame of metal sites.
            n: Number of linkers selecting the sites to summarise.
            target: Folder the output files are written to.
        """
        s_n = sites.loc[sites['number_of_linkers'] == n].copy()
        s_n['is_open_yn'] = np.where(s_n['is_open'], 'yes', 'no')
        s_n = s_n[['mof_name', 'is_open_yn', 't_factor']]
        for flag in ['yes', 'no']:
            outpath = "{}/{}_{}.out".format(target, flag, str(n))
            s = s_n[s_n['is_open_yn'] == flag]
            s.to_csv(outpath, index=False)
            fout = "{}/{}_{}_hist.out".format(target, flag, n)
            self._write_histogram(s['t_factor'], True, fout)
            fout = "{}/{}_{}_hist_abs.out".format(target, flag, n)
            self._write_histogram(s['t_factor'], False, fout)

        plt.figure(figsize=(10, 5))
        plt.title('t-{} factor'.format(n))
        # 'density' replaces the 'normed' keyword, which current matplotlib
        # no longer accepts.
        s_yes = s_n[s_n['is_open_yn'] == 'yes']
        s_yes['t_factor'].hist(bins=50, range=(0, 1), density=False)
        s_no = s_n[s_n['is_open_yn'] == 'no']
        s_no['t_factor'].hist(bins=50, range=(0, 1), density=False)
        plt.show()

    @staticmethod
    def _write_histogram(sites, dens, target):
        """Generate histograms to be used for summarizing the t-factor
        results.

        Args:
            sites: Values to histogram over 50 bins in the range (0, 1).
            dens: Passed to numpy.histogram as the density flag.
            target: Path of the output file; one "centre count" pair per
                line.
        """
        hist, edges = np.histogram(sites, bins=50, range=(0, 1), density=dens)
        with open(target, 'w') as hist_file:
            w = (edges[1] - edges[0]) / 2
            # zip stops at the shorter sequence, so the final edge is unused.
            for e, h in zip(edges, hist):
                print(e + w, h, file=hist_file)

    @staticmethod
    def _group_and_summarize(df, names=None):
        """Group the DataFrame holding the OMS results by metal type and
        rename its columns.

        Args:
            df: DataFrame with 'metal', 'mof_name' and 'is_open' columns.
            names: New names for the 'mof_name' and 'is_open' columns, in
                that order. If None the original names are kept.
        """
        if names is None:
            names = ["mof_name", "is_open"]
        rename = {"mof_name": names[0], "is_open": names[1]}
        agg_dict = {"mof_name": pd.Series.nunique, "is_open": "count"}
        return df.groupby('metal').agg(agg_dict).rename(columns=rename)