Интерфейсы плагинов для импорта открытых данных

Данная статья описывает базовые интерфейсы плагинов, импортирующих открытые данные.

Термины

Задание (job) - последовательность действий по загрузке массива открытых данных через протокол http, их обработки и загрузки в бд одного из сервисов Geo2Tag.

Импорт - перенос данных из внешней бд в бд одного из сервисов Geo2Tag.

Статус задания - набор параметров, описывающий текущее состояние задания: затраченное время, источник импорта, флаг завершенности задания, прочие параметры импорта.

REST интерфейсы

  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job GET - список заданий импорта и их статусы.
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job POST - добавить задание импорта, параметры:
    • channelName, - название канала, куда будет произведен импорт данных;
    • openDataUrl,  - ссылка на скачивание набора данных;
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> GET - статус одного задания.
  • /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> DELETE - остановить задание.

Программные интерфейсы

Классы

Легенда:

  • @should_be_extended_in_descendents - метод/свойство нужно расширить/переопределить в классе-потомке
  • @abstract_method - абстрактный метод, метод который нужно определить самостоятельно в классе-потомке

Job (абстрактный класс)

class Job():

    def __init__(
            self,
            backgroundFunction,
            channelName,
            openDataUrl,
            importDataDict,
            serviceName):
        self.thread = None
        self._id = self.generateId()
        self.startTime = datetime.now()
        self.done = False
        self.timeElapsed = None
        self.backgroundFunction = backgroundFunction
        self.channelName = channelName
        self.openDataUrl = openDataUrl
        self.importDataDict = importDataDict
        self.serviceName = serviceName

    @abstract_method
    def internalStart(self):
    @abstract_method
    def internalStop(self):

    def start(self):
        self.startTime = datetime.now()
        self.internalStart()

    def stop(self):
        self.internalStop()
        self.done = True
        self.timeElapsed = datetime.now() - self.startTime

    def getTimeStatistics(self):
        if self.timeElapsed is None:
            return datetime.now() - self.startTime
        return self.timeElapsed

    @should_be_extended_in_descendents
    def describe(self):
        return {
            '_id': self._id,
            'time': str(
                self.getTimeStatistics()),
            'done': self.done,
            'channelName': self.channelName,
            'openDataUrl': self.openDataUrl,
            'serviceName': self.serviceName}

    @classmethod
    def generateId(cls):
        return ''.join(
            random.choice(
                string.ascii_uppercase +
                string.ascii_lowercase +
                string.digits) for x in range(12))

OpenDataObjectsLoader (абстрактный класс)

class OpenDataObjectsLoader:

    def __init__(self, loadUrl):
        self.loadUrl = loadUrl

    @abstract_method
    def load(self):      


 

OpenDataToPointTranslator

class OpenDataToPointTranslator:

    def __init__(
            self, importDataDict,
            objectRepresentation,
            version,
            importSource,
            channelId):
        self.objectRepresentation = objectRepresentation
        self.version = version
        self.importSource = importSource
        self.channelId = channelId
        self.importDataDict = importDataDict
 
    @should_be_extended_in_descendents
    def getPointJson(self):
        obj = {}
        obj['version'] = self.version
        obj['import_source'] = self.importSource
        return obj
    @should_be_extended_in_descendents
    def getPoint(self):
        point = {'json': self.getPointJson()}
        point['channel_id'] = self.channelId
        return point

OpenDataObjectsParser (абстрактный класс)

class OpenDataObjectsParser:

    def __init__(self, data):
        self.data = data
    @abstract_method
    def parse(self):

OpenDataToPointsLoader

class OpenKareliaDataToPointsLoader:
    pointsArray = []

    def __init__(self, serviceName, points):
        self.pointsArray = points
        self.serviceName = serviceName

    def loadPoints(self):
        collection = getDbObject(self.serviceName)[POINTS]
        for point in self.pointsArray:
            collection.save(point)

JobManager - класс для управления задачами, запускает и останавливает задачи, выводит информацию об их статусе.

class JobManager:
    jobs = {}

    @classmethod
    def startJob(cls, job):
        jobId = job.describe().get('_id', '')
        job.start()
        cls.jobs[jobId] = job
        return jobId

    @classmethod
    def getJob(cls, jobId):
        return cls.jobs.get(jobId).describe()

    @classmethod
    def stopJob(cls, jobId):
        cls.jobs.get(jobId).stop()

    @classmethod
    def getJobs(cls):
        result = []
        for job in cls.jobs:
            result.append(cls.jobs[job].describe())
        return result

JobResource

class ODImportParser(): 
     @should_be_extended_in_descendents
     MANDATORY_FIELDS = [CHANNEL_NAME, OPEN_DATA_URL]
       

    @staticmethod
    def parse(self):
        args = loads(request.get_data())
        return args
 
    @staticmethod
    def validate(self, args):    
        for key in MANDATORY_FIELDS:
            if key not in args:
                raise BadRequest('{0} parameter is missing'.format(key))
            elif not isinstance(args[key], unicode):
                raise BadRequest('{0} value is not unicode'.format(key)) 
    @staticmethod
    def parsePostParameters():
        args = self.parse()
        self.validate(args)
        return args

JobListResourceFactory - фабричный метод для создания таких классов JobListResource, какие нам нужны.

def JobListResourceFactory(parserClass, jobClass, importFunction)
    class JobListResource(Resource):

        @possibleException
        def get(self, serviceName):
            getServiceIdByName(serviceName)
            return JobManager.getJobs()

        @possibleException
        def post(self, serviceName):
            importDataDict = parserClass.parsePostParameters()
            channelName = importDataDict.get('channelName')
            getServiceIdByName(serviceName)
            getChannelByName(serviceName, channelName)
            job = jobClass(importFunction, importDataDict.get('channelName'),
                           importDataDict.get('openDataUrl'), importDataDict,
                           serviceName)

        return JobManager.startJob(job)
 
    return JobListResource

 

Метод performImportActions

performImportActions ( odLoaderClass, odParserClass, odToPointTranslatorClass, odToPointsLoaderClass, serviceName, channelName, openDataUrl, showObjectUrl, showImageUrl)

Функция, несущая в себе логику импорта из источника открытых данных. 

def performImportActions ( odLoaderClass, odParserClass, \
    odToPointTranslatorClass, odToPointsLoaderClass, \
    serviceName, channelName, openDataUrl, \
    importDataDict):
#showObjectUrl, showImageUrl
    
    channelId = getChannelIdByName(channelName)
    version = datetime.now()
    
    loader = odLoaderClass(openDataUrl)
    openData = loader.load()
    parser = odParserClass(openData)
    objects = parser.parse()
    
    points = [ ]
    
    for object in objects:
	    translator = odToPointTranslatorClass(importDataDict, object, version,openDataUrl, channelId)
	    points.append(translator.getPoint())
	
    pointsLoader = odToPointsLoaderClass(serviceName, points)
    pointsLoader.loadPoints()