Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package it.dlms.connectors.mongoDb.manager;
- import static com.mongodb.client.model.Filters.eq;
- import java.io.Serializable;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.HashMap;
- import java.util.LinkedHashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.regex.Pattern;
- import java.util.Set;
- import org.bson.Document;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.mongodb.BasicDBObject;
- import com.mongodb.DBCursor;
- import com.mongodb.DBObject;
- import com.mongodb.MongoClient;
- import com.mongodb.client.FindIterable;
- import com.mongodb.client.MongoCollection;
- import com.mongodb.client.MongoCursor;
- import com.mongodb.client.MongoDatabase;
- import it.dlms.metadataCatalog.MetadataManager;
- import it.dlms.metadataCatalog.metadata.DatasetProfile;
- import it.dlms.metadataCatalog.metadata.GlobalDatasetMetadata;
- import scala.Tuple2;
- public class MongoDBGlobalMetadataConnector implements Serializable {
- private final static Logger log = LoggerFactory.getLogger(MetadataManager.class);
- private static final long serialVersionUID = 1L;
- private static MongoDBGlobalMetadataConnector instance;
- private static MongoClient MONGOCLIENT;
- private static MongoDatabase MONGODATABASE;
- private static MongoDatabase GDMENTITYDATABASE;
- private static MongoDatabase GDMONTOLOGYDATABASE;
- private static MongoCollection<Document> MONGOCOLLECTION;
- private static MongoCollection<Document> ENTITIESMONGOCOLLECTION;
- private static MongoCollection<Document> ONTOLOGIESMONGOCOLLECTION;
- public static synchronized MongoDBGlobalMetadataConnector getInstance() {
- if(instance == null) {
- instance = new MongoDBGlobalMetadataConnector();
- }
- return instance;
- }
- private MongoDBGlobalMetadataConnector (){
- if(MONGOCLIENT == null) {
- MONGOCLIENT = new MongoClient( "localhost" , 27017 );
- }
- MONGODATABASE = MONGOCLIENT.getDatabase("MetaLakeTest");//.getDatabase("MetaLake");
- GDMENTITYDATABASE = MONGOCLIENT.getDatabase("entities");
- GDMONTOLOGYDATABASE = MONGOCLIENT.getDatabase("ontologies");
- MONGOCOLLECTION = MONGODATABASE.getCollection("GlobalMetadata");
- ENTITIESMONGOCOLLECTION = MONGODATABASE.getCollection("entities");
- ONTOLOGIESMONGOCOLLECTION = MONGODATABASE.getCollection("ontologies");
- }
- public static MongoCollection<Document> getMONGOCOLLECTION() {
- return MONGOCOLLECTION;
- }
- public static MongoCollection<Document> getENTITIESMONGOCOLLECTION() {
- return ENTITIESMONGOCOLLECTION;
- }
- public static MongoCollection<Document> getONTOLOGIESMONGOCOLLECTION() {
- return ONTOLOGIESMONGOCOLLECTION;
- }
- public static MongoDatabase getGDMENTITYDATABASE() {
- return GDMENTITYDATABASE;
- }
- public static MongoDatabase getGDMONTOLOGYDATABASE() {
- return GDMONTOLOGYDATABASE;
- }
- public static void resetConnection() {
- MONGOCLIENT.close();
- MONGOCLIENT = new MongoClient( "localhost" , 27017 );
- MONGODATABASE = MONGOCLIENT.getDatabase("MetaLakeTest");//.getDatabase("MetaLake");
- GDMENTITYDATABASE = MONGOCLIENT.getDatabase("entities");
- GDMONTOLOGYDATABASE = MONGOCLIENT.getDatabase("ontologies");
- MONGOCOLLECTION = MONGODATABASE.getCollection("GlobalMetadata");
- ENTITIESMONGOCOLLECTION = MONGODATABASE.getCollection("entities");
- ONTOLOGIESMONGOCOLLECTION = MONGODATABASE.getCollection("ontologies");
- }
- public void resetGlobalMetadataMongoDBCollection() {
- MongoDBGlobalMetadataConnector.getMONGOCOLLECTION().drop();
- MongoDBGlobalMetadataConnector.getENTITIESMONGOCOLLECTION().drop();
- MongoDBGlobalMetadataConnector.getONTOLOGIESMONGOCOLLECTION().drop();
- MongoDBGlobalMetadataConnector.getGDMENTITYDATABASE().drop();
- MongoDBGlobalMetadataConnector.getGDMONTOLOGYDATABASE().drop();
- }
- public void removeDocumentFromEntities(String fileName, Map<String, Map<String, Long>> frequencyOfTopFrequentItems) {
- MongoCollection<Document> document = getENTITIESMONGOCOLLECTION();
- log.info("filename: " + fileName);
- for (Entry<String, Map<String, Long>> entry : frequencyOfTopFrequentItems.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace(".", " ") + "|%#-#%|" + columnName;
- Set<String> entriesToRemove = entry.getValue().keySet();
- for (String entryToRemove : entriesToRemove) {
- BasicDBObject searchQuery = new BasicDBObject();
- searchQuery.put("entity", entryToRemove);
- FindIterable<Document> cursor = document.find(searchQuery);
- if (cursor.first() != null) {
- Map<String, Integer> datasetColumnsEntries = (Map<String, Integer>) cursor.first().get("map");
- log.info("dck: " + datasetColumnKey);
- datasetColumnsEntries.remove(datasetColumnKey);
- if (datasetColumnsEntries.size() == 0) {
- document.findOneAndDelete(searchQuery);
- }
- else {
- Document newDocument = new Document();
- newDocument.put("entity", entryToRemove);
- newDocument.put("map", datasetColumnsEntries);
- document.findOneAndReplace(searchQuery, newDocument);
- }
- }
- }
- }
- }
- public void removeElementsFromEntitiesMap(String fileName, Map<String, Map<String, Long>> frequencyOfTopFrequentItems) {
- MongoCollection<Document> doc = getMONGOCOLLECTION();
- log.info("filename: " + fileName);
- GlobalDatasetMetadata gdm = new GlobalDatasetMetadata(doc.find().first());
- Map<String, Map<String, Integer>> entitiesMap = gdm.getEntitiesMap();
- for (Entry<String, Map<String, Long>> entry : frequencyOfTopFrequentItems.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace(".", " ") + "|%#-#%|" + columnName;
- Set<String> entriesToRemove = entry.getValue().keySet();
- for (String entryToRemove : entriesToRemove) {
- if (entitiesMap.containsKey(entryToRemove)) {
- Map<String, Integer> datasetColumnsEntries = entitiesMap.get(entryToRemove);
- log.info("dck: " + datasetColumnKey);
- if (datasetColumnsEntries.containsKey(datasetColumnKey)) {
- datasetColumnsEntries.remove(datasetColumnKey);
- if (datasetColumnsEntries.size() == 0) {
- entitiesMap.remove(entryToRemove);
- }
- else {
- entitiesMap.put(entryToRemove, datasetColumnsEntries);
- }
- }
- }
- }
- }
- gdm.setEntitiesMap(entitiesMap);
- Document updatedDoc = gdm.toMongoParser();
- MongoDBGlobalMetadataConnector.replaceGlobalMetadata(updatedDoc, doc);
- }
- public void removeElementsFromOntologiesMap(String fileName,
- Map<String, Map<String, Integer>> ontologiesToRemoveMap) {
- MongoCollection<Document> doc = getMONGOCOLLECTION();
- GlobalDatasetMetadata gdm = new GlobalDatasetMetadata(doc.find().first());
- Map<String, Map<String, Integer>> ontologiesMap = gdm.getOntologiesMap();
- for (Entry<String, Map<String, Integer>> entry : ontologiesToRemoveMap.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace("."," ") + "|%#-#%|" + columnName;
- Set<String> ontologiesToRemove = entry.getValue().keySet();
- for (String entryToRemove : ontologiesToRemove) {
- if (ontologiesMap.containsKey(entryToRemove)) {
- Map<String, Integer> datasetColumnsEntries = ontologiesMap.get(entryToRemove);
- log.info("dck: " + datasetColumnKey);
- if (datasetColumnsEntries.containsKey(datasetColumnKey)) {
- datasetColumnsEntries.remove(datasetColumnKey);
- if (datasetColumnsEntries.size() == 0) {
- ontologiesMap.remove(entryToRemove);
- }
- else {
- ontologiesMap.put(entryToRemove, datasetColumnsEntries);
- }
- }
- }
- }
- }
- gdm.setOntologiesMap(ontologiesMap);
- Document updatedDoc = gdm.toMongoParser();
- MongoDBGlobalMetadataConnector.replaceGlobalMetadata(updatedDoc, doc);
- }
- public static void replaceGlobalMetadata(Document globalProfileDoc,
- MongoCollection<Document> globalMetadataCollection) {
- globalMetadataCollection.replaceOne(globalMetadataCollection.find().first(), globalProfileDoc);
- log.info("Global document replaced successfully");
- }
- public static void insertEntitiesInGDM(Map<String, Map<String, Long>> frequencyOfTopFrequentItems,
- String fileName, long rowsCount) {
- int count = 0;
- Map<String, List<Document>> docMap = new HashMap<>();
- //List<Tuple2<String, Document>> docList = new LinkedList<Tuple2<String, Document>>();
- for (Entry<String, Map<String, Long>> entry : frequencyOfTopFrequentItems.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace(".",",") + "|%#-#%|" + columnName;
- Map<String, Long> topFrequentMap = entry.getValue();
- count += topFrequentMap.size();
- List<Document> docList = new LinkedList<>();
- for (Entry<String, Long> topFrequentEntry : topFrequentMap.entrySet()) {
- Integer topFrequentCount = topFrequentMap.entrySet().size();
- String entityName = topFrequentEntry.getKey();
- Long entityOccurrencies = topFrequentEntry.getValue();
- Document datasetAndColumnDocument = new Document("datasetAndColumn", datasetColumnKey);
- Document details = new Document("occurrencies", entityOccurrencies)
- .append("topFrequentCount", topFrequentCount);
- datasetAndColumnDocument.append("details", details);
- docMap.putIfAbsent(entityName, new LinkedList<>());
- docMap.get(entityName).add(details);
- //docList.add(new Tuple2<>(entityName, datasetAndColumnDocument));
- }
- }
- log.info("Inserimento in mongo per : " + fileName + " di " + Integer.toString(docMap.size()) + " entita " + count);
- int i = 0;
- for (Entry<String, List<Document>> entry : docMap.entrySet()) {
- i++;
- log.info("Entita numero : " + Integer.toString(i));
- if (i > 200) {
- resetConnection();
- i = 0;
- }
- String entityName = entry.getKey();
- List<Document> listDocs = entry.getValue();
- GDMENTITYDATABASE.getCollection(entityName).insertMany(listDocs);
- //MongoCollection<Document> entityCollection = GDMENTITYDATABASE.getCollection(element._1());
- //entityCollection.insertMany(element._2());
- }
- }
- public static void insertOntologiesInGDM(Map<String, Map<String, Integer>> ontologyMap,
- String fileName) {
- for (Entry<String, Map<String, Integer>> entry : ontologyMap.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace("."," ") + "|%#-#%|" + columnName;
- Map<String, Integer> datasetOntologiesMap = entry.getValue();
- for (Entry<String, Integer> ontologyEntry : datasetOntologiesMap.entrySet()) {
- String ontologyName = ontologyEntry.getKey();
- Integer ontologyOccurrencies = ontologyEntry.getValue();
- MongoCollection<Document> ontologyCollection = GDMONTOLOGYDATABASE.getCollection(ontologyName);
- Document datasetAndColumnDocument = new Document("datasetAndColumn", datasetColumnKey);
- Document details = new Document("occurrencies", ontologyOccurrencies);
- datasetAndColumnDocument.append("details", details);
- ontologyCollection.insertOne(datasetAndColumnDocument);
- }
- }
- }
- public void removeEntitiesFromGDM(String fileName, Map<String, Map<String, Long>> frequencyOfTopFrequentItems) {
- for (Entry<String, Map<String, Long>> entry : frequencyOfTopFrequentItems.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace(".", " ") + "|%#-#%|" + columnName;
- Set<String> entriesToRemove = entry.getValue().keySet();
- for (String entryToRemove : entriesToRemove) {
- MongoCollection<Document> entityCollection = MONGOCLIENT.getDatabase("entities").getCollection(entryToRemove);
- entityCollection.findOneAndDelete(eq("datasetAndColumn", datasetColumnKey));
- }
- }
- }
- public void removeOntologiesFromGDM(String fileName, Map<String, Map<String, Integer>> ontologiesToRemoveMap) {
- for (Entry<String, Map<String, Integer>> entry : ontologiesToRemoveMap.entrySet()) {
- String columnName = entry.getKey();
- String datasetColumnKey = fileName.replace("."," ") + "|%#-#%|" + columnName;
- Set<String> ontologiesToRemove = entry.getValue().keySet();
- for (String entryToRemove : ontologiesToRemove) {
- MongoCollection<Document> ontologyCollection = GDMONTOLOGYDATABASE.getCollection(entryToRemove);
- ontologyCollection.findOneAndDelete(eq("datasetAndColumn", datasetColumnKey));
- }
- }
- }
- public static Map<String, Map<String, Double>> findEntitiesMatchingInGDM(DatasetProfile profile) {
- String fileName = profile.getFileName().replace(".csv","");
- Map<String, Map<String, Double>> matchingDatasets = new LinkedHashMap<>();
- Map<String, Map<String, Long>> frequencyOfTopFrequentItems = profile.getContentMetadata().getFrequencyOfTopFrequentItems();
- for (Entry<String, Map<String, Long>> frequencyOfTopFrequentItemsEntry : frequencyOfTopFrequentItems.entrySet()) {
- String columnName = frequencyOfTopFrequentItemsEntry.getKey();
- matchingDatasets.put(columnName, new LinkedHashMap<>());
- for (Entry<String, Long> singleItemEntry : frequencyOfTopFrequentItemsEntry.getValue().entrySet()) {
- Double topFrequentCount = (double) frequencyOfTopFrequentItemsEntry.getValue().size();
- MongoCollection<Document> entityCollection = MONGOCLIENT.getDatabase("entities").getCollection(singleItemEntry.getKey());
- MongoCursor<Document> cursor = entityCollection.find().iterator();
- try {
- while (cursor.hasNext()) {
- Document doc = cursor.next();
- String key = (String) doc.get("datasetAndColumn");
- Document datasetDetails = (Document) doc.get("details");
- Long occurrencies = (Long) datasetDetails.get("occurrencies");
- Integer occurrencyTopFrequentCount = (Integer) datasetDetails.get("topFrequentCount");
- if (!key.split(Pattern.quote("|%#-#%|"))[0].equals(fileName)) {
- Double value = 2*1/(topFrequentCount + (double) occurrencyTopFrequentCount);
- Map<String, Double> matchingMap = matchingDatasets.get(columnName);
- if (matchingMap.containsKey(key)) {
- Double prevValue = matchingMap.get(key);
- prevValue = prevValue + value;
- matchingMap.put(key, prevValue);
- }
- else {
- matchingMap.put(key, value);
- }
- matchingDatasets.put(columnName, matchingMap);
- }
- }
- } finally {
- cursor.close();
- }
- }
- }
- //sorting column's map
- for (Map.Entry<String, Map<String, Double>> entry : matchingDatasets.entrySet()) {
- Map<String, Double> unsortedMap = entry.getValue();
- List<Map.Entry<String, Double>> freqList = new ArrayList<>(unsortedMap.entrySet());
- Comparator<Map.Entry<String, Double>> freqComparator =
- (entry1, entry2) -> entry1.getValue().compareTo(entry2.getValue());
- Collections.sort(freqList, freqComparator.reversed());
- Map<String, Double> sortedMap = new LinkedHashMap<>();
- for(Entry<String, Double> sortedEntry : freqList) {
- sortedMap.put(sortedEntry.getKey(), sortedEntry.getValue());
- }
- matchingDatasets.put(entry.getKey(), sortedMap);
- }
- return matchingDatasets;
- }
- public static Map<String, Map<String, Map<String, Long>>> findOntologiesMatchingInGDM(DatasetProfile profile) {
- String fileName = profile.getFileName().replace(".csv","");
- Map<String, Map<String, Map<String, Long>>> matchingDatasets = new LinkedHashMap<>();
- if (profile.getOntologyMetadata() == null) {
- log.info("Couldn't find ontology metadata for the given dataset " + fileName);
- return null;
- } else {
- Map<String, Map<String, Integer>> ontologiesMap = profile.getOntologyMetadata().getOntologies();
- for (Entry<String, Map<String, Integer>> ontologiesMapEntry : ontologiesMap.entrySet()) {
- String columnName = ontologiesMapEntry.getKey();
- matchingDatasets.put(columnName, new LinkedHashMap<>());
- for (Entry<String, Integer> singleOntologyEntry : ontologiesMapEntry.getValue().entrySet()) {
- MongoCollection<Document> ontologyCollection = GDMONTOLOGYDATABASE.getCollection(singleOntologyEntry.getKey());
- MongoCursor<Document> cursor = ontologyCollection.find().iterator();
- try {
- while (cursor.hasNext()) {
- Document doc = cursor.next();
- String key = (String) doc.get("datasetAndColumn");
- Document datasetDetails = (Document) doc.get("details");
- Long occurrencies = (Long) datasetDetails.get("occurrencies");
- if (!key.split(Pattern.quote("|%#-#%|"))[0].equals(fileName)) {
- Long value = (long) occurrencies + (long) singleOntologyEntry.getValue();
- Map<String, Map<String, Long>> matchingMap = matchingDatasets.get(columnName);
- if (matchingMap.containsKey(key)) {
- if (matchingMap.get(key).containsKey(singleOntologyEntry.getKey())) {
- Long prevValue = matchingMap.get(key).get(singleOntologyEntry.getKey());
- prevValue = prevValue + value;
- matchingMap.get(key).put(singleOntologyEntry.getKey(), prevValue);
- }
- else {
- matchingMap.get(key).put(singleOntologyEntry.getKey(), value);
- }
- }
- else {
- Map<String, Long> newMapOfEntologies = new LinkedHashMap<>();
- newMapOfEntologies.put(singleOntologyEntry.getKey(), value);
- matchingMap.put(key, newMapOfEntologies);
- }
- matchingDatasets.put(columnName, matchingMap);
- }
- }
- } finally {
- cursor.close();
- }
- }
- }
- }
- //sorting column's map
- for (Entry<String, Map<String, Map<String, Long>>> entry : matchingDatasets.entrySet()) {
- Map<String, Map<String, Long>> columnMatchings = entry.getValue();
- for (Entry<String, Map<String, Long>> matchings : columnMatchings.entrySet()) {
- Map<String, Long> unsortedMap = matchings.getValue();
- List<Map.Entry<String, Long>> ontologiesList = new ArrayList<>(unsortedMap.entrySet());
- Comparator<Map.Entry<String, Long>> freqComparator =
- (entry1, entry2) -> entry1.getValue().compareTo(entry2.getValue());
- Collections.sort(ontologiesList, freqComparator.reversed());
- Map<String, Long> sortedMap = new LinkedHashMap<>();
- for(Entry<String, Long> sortedEntry : ontologiesList) {
- sortedMap.put(sortedEntry.getKey(), sortedEntry.getValue());
- }
- columnMatchings.put(matchings.getKey(), sortedMap);
- }
- matchingDatasets.put(entry.getKey(), columnMatchings);
- }
- return matchingDatasets;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement