Source code for pybel_tools.summary.provenance

# -*- coding: utf-8 -*-

"""This module contains functions to summarize the provenance (citations, evidences, and authors) in a BEL graph"""

import itertools as itt
import logging
from collections import Counter, defaultdict
from datetime import datetime

from pybel.constants import *
from pybel.struct.filters import filter_edges
from pybel.struct.filters.edge_predicates import edge_has_annotation
from pybel.struct.summary import iterate_pubmed_identifiers
from ..filters import build_edge_data_filter, build_pmid_inclusion_filter
from ..utils import count_defaultdict, count_dict_values, graph_edge_data_iter

__all__ = [
    'count_pmids',
    'get_pmid_by_keyword',
    'count_citations',
    'count_citations_by_annotation',
    'count_authors',
    'count_unique_authors',
    'count_author_publications',
    'count_unique_citations',
    'get_authors',
    'get_authors_by_keyword',
    'count_authors_by_annotation',
    'get_evidences_by_pmid',
    'count_citation_years',
    'create_timeline',
    'get_citation_years'
]

log = logging.getLogger(__name__)


def _generate_citation_dict(graph):
    """Prepares a citation data dictionary from a graph. This is a helper function

    :param pybel.BELGraph graph: A BEL graph
    :return: A dictionary of {citation type: {(reference, name) -> {set of (source node, target node)}}}
    :rtype: dict[str,dict[tuple[tuple,tuple],str]]
    """
    results = defaultdict(lambda: defaultdict(set))

    for u, v, data in graph.edges_iter(data=True):
        if CITATION not in data:
            continue
        results[data[CITATION][CITATION_TYPE]][u, v].add(data[CITATION][CITATION_REFERENCE].strip())

    return dict(results)


[docs]def get_pmid_by_keyword(keyword, graph=None, pubmed_identifiers=None): """Gets the set of PubMed identifiers beginning with the given keyword string :param pybel.BELGraph graph: A BEL graph :param str keyword: The beginning of a PubMed identifier :param set[str] pubmed_identifiers: A set of pre-cached PubMed identifiers :return: A set of PubMed identifiers starting with the given string :rtype: set[str] """ if pubmed_identifiers is not None: return { pubmed_identifier for pubmed_identifier in pubmed_identifiers if pubmed_identifier.startswith(keyword) } if graph is None: raise ValueError('Graph not supplied') return { pubmed_identifier for pubmed_identifier in iterate_pubmed_identifiers(graph) if pubmed_identifier.startswith(keyword) }
[docs]def count_pmids(graph): """Counts the frequency of PubMed documents in a graph :param pybel.BELGraph graph: A BEL graph :return: A Counter from {(pmid, name): frequency} :rtype: collections.Counter """ return Counter(iterate_pubmed_identifiers(graph))
def get_citation_pair(data): return data[CITATION][CITATION_TYPE], data[CITATION][CITATION_REFERENCE].strip()
[docs]def count_unique_citations(graph): """Returns the number of unique citations :param pybel.BELGraph graph: A BEL graph :return: The number of unique citations in the graph. :rtype: int """ return len({ get_citation_pair(data) for data in graph_edge_data_iter(graph) if CITATION in data })
[docs]def count_citations(graph, **annotations): """Counts the citations in a graph based on a given filter :param pybel.BELGraph graph: A BEL graph :param dict annotations: The annotation filters to use :return: A counter from {(citation type, citation reference): frequency} :rtype: collections.Counter """ citations = defaultdict(set) annotation_dict_filter = build_edge_data_filter(annotations) for u, v, _, d in filter_edges(graph, annotation_dict_filter): if CITATION not in d: continue citations[u, v].add(get_citation_pair(d)) counter = Counter(itt.chain.from_iterable(citations.values())) return counter
[docs]def count_citations_by_annotation(graph, annotation): """Groups the citation counters by subgraphs induced by the annotation :param pybel.BELGraph graph: A BEL graph :param str annotation: The annotation to use to group the graph :return: A dictionary of Counters {subgraph name: Counter from {citation: frequency}} """ citations = defaultdict(lambda: defaultdict(set)) for u, v, data in graph.edges_iter(data=True): if not edge_has_annotation(data, annotation) or CITATION not in data: continue k = data[ANNOTATIONS][annotation] citations[k][u, v].add((data[CITATION][CITATION_TYPE], data[CITATION][CITATION_REFERENCE].strip())) return {k: Counter(itt.chain.from_iterable(v.values())) for k, v in citations.items()}
def check_authors_in_data(data): return CITATION not in data or CITATION_AUTHORS not in data[CITATION] def raise_for_unparsed_authors(data): authors = data[CITATION][CITATION_AUTHORS] if isinstance(authors, str): raise ValueError('Graph should be converted with pbt.mutation.parse_authors first: {}'.format(authors))
[docs]def count_authors(graph): """Counts the contributions of each author to the given graph :param pybel.BELGraph graph: A BEL graph :return: A Counter from {author name: frequency} :rtype: collections.Counter """ authors = [] for data in graph_edge_data_iter(graph): if check_authors_in_data(data): continue raise_for_unparsed_authors(data) for author in data[CITATION][CITATION_AUTHORS]: authors.append(author) return Counter(authors)
[docs]def count_author_publications(graph): """Counts the number of publications of each author to the given graph :param pybel.BELGraph graph: A BEL graph :return: A Counter from {author name: frequency} :rtype: collections.Counter """ authors = defaultdict(list) for data in graph_edge_data_iter(graph): if check_authors_in_data(data): continue raise_for_unparsed_authors(data) for author in data[CITATION][CITATION_AUTHORS]: authors[author].append(data[CITATION][CITATION_REFERENCE].strip()) return Counter(count_dict_values(count_defaultdict(authors)))
# TODO switch to use node filters
[docs]def get_authors(graph): """Gets the set of all authors in the given graph :param pybel.BELGraph graph: A BEL graph :return: A set of author names :rtype: set[str] """ result = set() for data in graph_edge_data_iter(graph): if check_authors_in_data(data): continue authors = data[CITATION][CITATION_AUTHORS] result.update( authors.strip().split('|') if isinstance(authors, str) else authors ) return result
[docs]def count_unique_authors(graph): """Counts all authors in the given graph :param pybel.BELGraph graph: A BEL graph :return: The number of unique authors whose publications contributed to the graph :rtype: int """ return len(get_authors(graph))
[docs]def get_authors_by_keyword(keyword, graph=None, authors=None): """Gets authors for whom the search term is a substring :param pybel.BELGraph graph: A BEL graph :param str keyword: The keyword to search the author strings for :param set[str] authors: An optional set of pre-cached authors calculated from the graph :return: A set of authors with the keyword as a substring :rtype: set[str] """ keyword_lower = keyword.lower() if authors is not None: return { author for author in authors if keyword_lower in author.lower() } if graph is None: raise ValueError('Graph not supplied') return { author for author in get_authors(graph) if keyword_lower in author.lower() }
[docs]def count_authors_by_annotation(graph, annotation='Subgraph'): """Groups the author counters by subgraphs induced by the annotation :param pybel.BELGraph graph: A BEL graph :param str annotation: The annotation to use to group the graph :return: A dictionary of Counters {subgraph name: Counter from {author: frequency}} :rtype: dict """ authors = defaultdict(list) for data in graph_edge_data_iter(graph): if not edge_has_annotation(data, annotation) or CITATION not in data or CITATION_AUTHORS not in data[CITATION]: continue if isinstance(data[CITATION][CITATION_AUTHORS], str): raise ValueError('Graph should be converted with pybel.mutation.parse_authors first') for author in data[CITATION][CITATION_AUTHORS]: authors[data[ANNOTATIONS][annotation]].append(author) return count_defaultdict(authors)
[docs]def get_evidences_by_pmid(graph, pmids): """Gets a dictionary from the given PubMed identifiers to the sets of all evidence strings associated with each in the graph :param pybel.BELGraph graph: A BEL graph :param str or iter[str] pmids: An iterable of PubMed identifiers, as strings. Is consumed and converted to a set. :return: A dictionary of {pmid: set of all evidence strings} :rtype: dict """ result = defaultdict(set) for _, _, _, data in filter_edges(graph, build_pmid_inclusion_filter(pmids)): result[data[CITATION][CITATION_REFERENCE]].add(data[EVIDENCE]) return dict(result)
# TODO date parsing should be handled during either pybel parse-time or during graph loading.
[docs]def count_citation_years(graph): """Counts the number of citations in each year :param pybel.BELGraph graph: A BEL graph :return: A Counter of {int year: int frequency} :rtype: collections.Counter """ result = defaultdict(set) for data in graph_edge_data_iter(graph): if CITATION not in data or CITATION_DATE not in data[CITATION]: continue try: dt = _ensure_datetime(data[CITATION][CITATION_DATE]) result[dt.year].add((data[CITATION][CITATION_TYPE], data[CITATION][CITATION_REFERENCE])) except: continue return count_dict_values(result)
def _ensure_datetime(s): if isinstance(s, datetime): return s elif isinstance(s, str): return datetime.strptime(s, '%Y-%m-%d') raise TypeError
[docs]def create_timeline(year_counter): """Completes the Counter timeline :param Counter year_counter: counter dict for each year :return: complete timeline :rtype: list[tuple[int,int]] """ if not year_counter: return [] until_year = datetime.now().year from_year = min(year_counter) timeline = [ (year, year_counter.get(year, 0)) for year in range(from_year, until_year) ] return timeline
[docs]def get_citation_years(graph): """Creates a citation timeline counter :param pybel.BELGraph graph: A BEL graph :rtype: list[tuple[int,int]] """ return create_timeline(count_citation_years(graph))