Интерфейсы плагинов для импорта открытых данных
Данная статья описывает базовые интерфейсы плагинов, импортирующих открытые данные.
Термины
Задание (job) - последовательность действий по загрузке массива открытых данных через протокол http, их обработки и загрузки в бд одного из сервисов Geo2Tag.
Импорт - перенос данных из внешней бд в бд одного из сервисов Geo2Tag.
Статус задания - набор параметров, описывающий текущее состояние задания: затраченное время, источник импорта, флаг завершенности задания, прочие параметры импорта.
REST интерфейсы
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job GET - список заданий импорта и их статусы.
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job POST - добавить задание импорта, параметры:
- channelName, - название канала, куда будет произведен импорт данных;
- openDataUrl, - ссылка на скачивание набора данных;
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> GET - статус одного задания.
- /<instance_prefix>/plugin/<plugin_name>/service/<serviceName>/job/<job_id> DELETE - остановить задание.
Программные интерфейсы
Классы
Легенда:
- @should_be_extended_in_descendents - метод/свойство нужно расширить/переопределить в классе-потомке
@abstract_method - абстрактный метод, метод который нужно определить самостоятельно в классе-потомке
Job (абстрактный класс)
class Job(): def __init__( self, backgroundFunction, channelName, openDataUrl, importDataDict, serviceName): self.thread = None self._id = self.generateId() self.startTime = datetime.now() self.done = False self.timeElapsed = None self.backgroundFunction = backgroundFunction self.channelName = channelName self.openDataUrl = openDataUrl self.importDataDict = importDataDict self.serviceName = serviceName @abstract_method def internalStart(self): @abstract_method def internalStop(self): def start(self): self.startTime = datetime.now() self.internalStart() def stop(self): self.internalStop() self.done = True self.timeElapsed = datetime.now() - self.startTime def getTimeStatistics(self): if self.timeElapsed is None: return datetime.now() - self.startTime return self.timeElapsed @should_be_extended_in_descendents def describe(self): return { '_id': self._id, 'time': str( self.getTimeStatistics()), 'done': self.done, 'channelName': self.channelName, 'openDataUrl': self.openDataUrl, 'serviceName': self.serviceName} @classmethod def generateId(cls): return ''.join( random.choice( string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(12))
OpenDataObjectsLoader (абстрактный класс)
class OpenDataObjectsLoader: def __init__(self, loadUrl): self.loadUrl = loadUrl @abstract_method def load(self):
OpenDataToPointTranslator
class OpenDataToPointTranslator: def __init__( self, importDataDict, objectRepresentation, version, importSource, channelId): self.objectRepresentation = objectRepresentation self.version = version self.importSource = importSource self.channelId = channelId self.importDataDict = importDataDict @should_be_extended_in_descendents def getPointJson(self): obj = {} obj['version'] = self.version obj['import_source'] = self.importSource return obj @should_be_extended_in_descendents def getPoint(self): point = {'json': self.getPointJson()} point['channel_id'] = self.channelId return point
OpenDataObjectsParser (абстрактный класс)
class OpenDataObjectsParser: def __init__(self, data): self.data = data @abstract_method def parse(self):
OpenDataToPointsLoader
class OpenKareliaDataToPointsLoader: pointsArray = [] def __init__(self, serviceName, points): self.pointsArray = points self.serviceName = serviceName def loadPoints(self): collection = getDbObject(self.serviceName)[POINTS] for point in self.pointsArray: collection.save(point)
JobManager - класс для управления задачами, запускает и останавливает задачи, выводит информацию об их статусе.
class JobManager: jobs = {} @classmethod def startJob(cls, job): jobId = job.describe().get('_id', '') job.start() cls.jobs[jobId] = job return jobId @classmethod def getJob(cls, jobId): return cls.jobs.get(jobId).describe() @classmethod def stopJob(cls, jobId): cls.jobs.get(jobId).stop() @classmethod def getJobs(cls): result = [] for job in cls.jobs: result.append(cls.jobs[job].describe()) return result
JobResource
class ODImportParser(): @should_be_extended_in_descendents MANDATORY_FIELDS = [CHANNEL_NAME, OPEN_DATA_URL] @staticmethod def parse(self): args = loads(request.get_data()) return args @staticmethod def validate(self, args): for key in MANDATORY_FIELDS: if key not in args: raise BadRequest('{0} parameter is missing'.format(key)) elif not isinstance(args[key], unicode): raise BadRequest('{0} value is not unicode'.format(key)) @staticmethod def parsePostParameters(): args = self.parse() self.validate(args) return args
JobListResourceFactory - фабричный метод для создания таких классов JobListResource, какие нам нужны.
def JobListResourceFactory(parserClass, jobClass, importFunction) class JobListResource(Resource): @possibleException def get(self, serviceName): getServiceIdByName(serviceName) return JobManager.getJobs() @possibleException def post(self, serviceName): importDataDict = parserClass.parsePostParameters() channelName = importDataDict.get('channelName') getServiceIdByName(serviceName) getChannelByName(serviceName, channelName) job = jobClass(importFunction, importDataDict.get('channelName'), importDataDict.get('openDataUrl'), importDataDict, serviceName) return JobManager.startJob(job) return JobListResource
Метод performImportActions
performImportActions ( odLoaderClass, odParserClass, odToPointTranslatorClass, odToPointsLoaderClass, serviceName, channelName, openDataUrl, showObjectUrl, showImageUrl)
Функция, несущая в себе логику импорта из источника открытых данных.
def performImportActions ( odLoaderClass, odParserClass, \ odToPointTranslatorClass, odToPointsLoaderClass, \ serviceName, channelName, openDataUrl, \ importDataDict): #showObjectUrl, showImageUrl channelId = getChannelIdByName(channelName) version = datetime.now() loader = odLoaderClass(openDataUrl) openData = loader.load() parser = odParserClass(openData) objects = parser.parse() points = [ ] for object in objects: translator = odToPointTranslatorClass(importDataDict, object, version,openDataUrl, channelId) points.append(translator.getPoint()) pointsLoader = odToPointsLoaderClass(serviceName, points) pointsLoader.loadPoints()