Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { Observable } from 'rxjs/Observable';
- import { Subscriber } from 'rxjs/Subscriber';
- import { readFile } from 'fs';
- import * as program from 'commander';
- import {
- createConnection,
- Connection,
- MysqlError,
- // format as mysqlFormat
- } from 'mysql';
- // var Rx = require('rxjs');
- // var request = require('request');
- // var progress = require('request-progress');
- declare var __VERSION__: string;
- export function generateObservableOperator<I, O>(callback: (subscriber: Subscriber<O>, input: I) => void): (source: Observable<I>) => Observable<O> {
- return (source: Observable<I>): Observable<O> => {
- return new Observable<O>((subscriber: Subscriber<O>) => {
- source.subscribe({
- next: input => callback(subscriber, input),
- error: error => subscriber.error(error),
- complete: () => subscriber.complete(),
- });
- });
- };
- }
- export function parseCliArgs(subscriber: Subscriber<string>, argv: string[]) {
- program
- .version(__VERSION__)
- .option('-f, --file <required>', 'le nom du fichier de config.json à importer')
- .parse(argv);
- if (program['file'] === undefined) {
- program.outputHelp();
- subscriber.error('Please specify the config file');
- }
- else {
- subscriber.next(program.file);
- }
- }
- export function loadConfigFile(subscriber: Subscriber<any>, filename: string) {
- readFile(filename, 'utf8', function (err, contents) {
- if (err || !contents) {
- console.log('Error during reading of config file' + err);
- console.log('There\'s a problem with your config file : ' + program.file);
- subscriber.error();
- }
- else {
- try {
- let config = JSON.parse(contents);
- subscriber.next(config);
- }
- catch (error) {
- subscriber.error();
- }
- }
- });
- }
- // Connect to the mySQL server.
- export function connectToMySQL(subscriber: Subscriber<Connection>, host: string, user: string, password: string, database: string) {
- let connection: Connection = createConnection({
- host: host,
- user: user,
- password: password,
- database: database,
- });
- connection.connect(function (err: MysqlError) {
- if (err) {
- console.error('Error connecting to the database : ' + err.stack);
- subscriber.error();
- }
- else {
- subscriber.next(connection);
- }
- });
- }
- // Get the last update timestamp.
- export function getLastUpdateDate(subscriber: Subscriber<number>, connection: Connection) {
- var query = 'SELECT * FROM file_log ORDER BY date DESC LIMIT 1';
- connection.query(query, (error: MysqlError, results: any) => {
- if (error) {
- subscriber.error();
- return connection.rollback(function () {
- throw error;
- });
- }
- else {
- if (results && results.date) {
- return subscriber.next(results.date);
- }
- }
- });
- }
- // Get the list of urls from the opendata.gouv.fr webservice.
- // Register those urls in the database.
- // export function getRessourcesUrls(connection: Connection, config: any, timestamp: number) {
- // var configDB = config.dev;
- // var query: string = '';
- // connection.beginTransaction((err: MysqlError) => {
- // if (err) {
- // console.log('Error during transaction' + err);
- // throw err;
- // }
- // // OBSERVABLE CREATE.
- // var observable = Rx.Observable.create((subscriber: any) => {
- // // Connexion to the webservice.
- // var options = {
- // url: config.openData.webServiceUrl + config.openData.datasetId,
- // followAllRedirects: true,
- // headers: {
- // 'User-Agent': 'request',
- // 'X-API-KEY': config.openData.apiKey,
- // accept: '*/*'
- // }
- // };
- // request(options, (error: any, response: any, body: any) => {
- // console.log('Getting Urls : START');
- // if (!error && response.statusCode == 200) {
- // var info = JSON.parse(body);
- // info.resources.forEach((item: any) => {
- // let date = new Date(item.published);
- // let published = date.getTime();
- // // We only process the file that are labeled update and are more recent that the timestamp given in parameters.
- // if (published > timestamp
- // && item.format == 'zip'
- // && item.title.match(/Sirene : mise à jour quotidienne du/g)) {
- // item.date = published / 1000;
- // subscriber.next(item);
- // }
- // });
- // subscriber.complete();
- // }
- // });
- // });
- // let urls: any[] = [];
- // // We process the files in bulk.
- // var bufferedObservable = observable.bufferCount(configDB.size);
- // bufferedObservable.subscribe({
- // next: (items: any) => {
- // var values: any[] = [];
- // var count = 0;
- // for (let item of items) {
- // values[count++] = [item.date, item.url];
- // urls[count] = item.url;
- // }
- // // we fill the database with the fetched data.
- // query = queryInitiateInsert(configDB.tableFile);
- // query = mysqlFormat(query, [['date', 'url'], values]);
- // connection.query(query, (error: MysqlError) => {
- // if (error) {
- // return connection.rollback(function () {
- // throw error;
- // });
- // }
- // });
- // },
- // error: (err: any) => {
- // return connection.rollback(function () {
- // console.log('Transaction error');
- // throw err;
- // });
- // },
- // complete: () => {
- // console.log('Getting Urls : FINISH');
- // connection.commit((err: MysqlError) => {
- // if (err) {
- // return connection.rollback(function () {
- // console.log('Transaction error');
- // throw err;
- // });
- // }
- // // Now we get the files referenced by the urls.
- // getExternalFiles(connection, config, timestamp);
- // });
- // }
- // });
- // });
- // }
Add Comment
Please, Sign In to add comment