双队列调度系统提升数据处理效率
基础适配器Adapter核心功能解析
轻易云数据集成平台的基础适配器Adapter作为数据处理中枢,通过标准化方法实现跨系统数据交互。以下是关键功能的深度解析:
日志记录标准化操作
平台采用统一日志管理机制,通过$this->getLogStorage()->insertOne([$content], $status)
实现分级记录。错误日志需配合LogStatus
枚举类,例如:
$this->getLogStorage()->insertOne(
[$content],
LogStatus::RECORD
);
智能数据获取机制
平台通过元数据驱动模式动态获取数据,自动识别metaData
中的operation
参数:
$operation = $this->metaData['operation'] ?? null;
$data = $this->getDataStorage()->fetch($operation);
该设计支持动态配置变更,无需修改代码即可适配不同业务场景。
参数自动生成引擎
平台内置智能参数转换器,通过generateRequestParams
方法自动适配不同接口规范:
try {
$request = $this->generateRequestParams($data);
} catch (\Throwable $th) {
$this->logError(LogMessage::DISPATCH_SOURCE_FAIL, $th, $data);
return $this->dispatch();
}
异常处理机制自动触发日志记录和状态更新,确保流程连续性。
双队列调度系统
平台采用源平台/目标平台双队列架构:
-
源任务队列:
$jobId = $this->getAsynSourceJobStorage()->insertOne( $this->metaData['api'], $request ); $this->asynSourceJob($time, $jobId);
-
目标任务队列:
$jobId = $this->getAsynTargetJobStorage()->insertOne( $this->metaData['api'], $request, $this->getDataStorage()->ids, $this->getDataStorage()->dataRange ); $this->asynTargetJob(round($this->asynTimes), $jobId);
状态全生命周期管理
平台提供完整的状态控制链:
// 标记队列状态
$this->getDataStorage()->setFetchStatus(
DataStatus::QUEUE,
null,
null,
new \MongoDB\BSON\ObjectId($jobId)
);
// 数据持久化
$this->getDataStorage()->insertOne(
$id,
$number,
$response,
false,
$jobId
);
智能容错机制
平台内置三级容错策略:
- 自动重试:
$this->reQueue()
- 异常捕获:
public function handleError($response, $jobId = null) { $throw = new HuidinhuoThrowable($this); $throw->handle($jobId, $response); $this->updateJobStatus($jobId, DataStatus::ERROR, $response); $this->logError(LogMessage::INVOKE_FAIL, $response); }
- 状态回滚:自动重置失败任务状态
该架构设计充分体现轻易云平台在数据处理领域的三大优势:
- 标准化接口:统一的操作方法降低学习成本
- 自动化流程:智能状态管理减少人工干预
- 高可靠性:多层容错保障数据完整性