||
- <?php
- namespace MailPoet\Tasks;
- if (!defined('ABSPATH')) exit;
- use MailPoet\Logging\LoggerFactory;
- use MailPoet\Models\ScheduledTask;
- use MailPoet\Models\ScheduledTaskSubscriber;
- use MailPoet\Models\SendingQueue;
- use MailPoet\Util\Helpers;
- use MailPoet\WP\Functions as WPFunctions;
- use MailPoetVendor\Carbon\Carbon;
- /**
- * A facade class containing all necessary models to work with a sending queue
- * @property string|null $status
- * @property int $taskId
- * @property int $id
- * @property int $newsletterId
- * @property string $newsletterRenderedSubject
- * @property string|array $newsletterRenderedBody
- * @property bool $nonExistentColumn
- * @property string $scheduledAt
- * @property int $priority
- */
- class Sending {
- const TASK_TYPE = 'sending';
- const RESULT_BATCH_SIZE = 5;
- /** @var ScheduledTask */
- private $task;
- /** @var SendingQueue */
- private $queue;
- /** @var Subscribers */
- private $taskSubscribers;
- private $queueFields = [
- 'id',
- 'task_id',
- 'newsletter_id',
- 'newsletter_rendered_subject',
- 'newsletter_rendered_body',
- 'count_total',
- 'count_processed',
- 'count_to_process',
- 'meta',
- ];
- private $commonFields = [
- 'created_at',
- 'updated_at',
- 'deleted_at',
- ];
- private function __construct(
- ScheduledTask $task = null,
- SendingQueue $queue = null
- ) {
- if (!$task instanceof ScheduledTask) {
- $task = ScheduledTask::create();
- $task->type = self::TASK_TYPE;
- $task->save();
- }
- if (!$queue instanceof SendingQueue) {
- $queue = SendingQueue::create();
- $queue->newsletterId = 0;
- $queue->taskId = $task->id;
- $queue->save();
- }
- if ($task->type !== self::TASK_TYPE) {
- throw new \Exception('Only tasks of type "' . self::TASK_TYPE . '" are accepted by this class');
- }
- $this->task = $task;
- $this->queue = $queue;
- $this->taskSubscribers = new Subscribers($task);
- }
- public static function create(ScheduledTask $task = null, SendingQueue $queue = null) {
- return new self($task, $queue);
- }
- public static function createManyFromTasks($tasks) {
- if (empty($tasks)) {
- return [];
- }
- $tasksIds = array_map(function($task) {
- return $task->id;
- }, $tasks);
- $queues = SendingQueue::whereIn('task_id', $tasksIds)->findMany();
- $queuesIndex = [];
- foreach ($queues as $queue) {
- $queuesIndex[$queue->taskId] = $queue;
- }
- $result = [];
- foreach ($tasks as $task) {
- if (!empty($queuesIndex[$task->id])) {
- $result[] = self::create($task, $queuesIndex[$task->id]);
- } else {
- static::handleInvalidTask($task);
- }
- }
- return $result;
- }
- public static function handleInvalidTask(ScheduledTask $task) {
- $loggerFactory = LoggerFactory::getInstance();
- $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addError(
- 'invalid sending task found',
- ['task_id' => $task->id]
- );
- $task->status = ScheduledTask::STATUS_INVALID;
- $task->save();
- }
- public static function createFromScheduledTask(ScheduledTask $task) {
- $queue = SendingQueue::where('task_id', $task->id)->findOne();
- if (!$queue) {
- return false;
- }
- return self::create($task, $queue);
- }
- public static function createFromQueue(SendingQueue $queue) {
- $task = $queue->task()->findOne();
- if (!$task) {
- return false;
- }
- return self::create($task, $queue);
- }
- public static function getByNewsletterId($newsletterId) {
- $queue = SendingQueue::where('newsletter_id', $newsletterId)
- ->orderByDesc('updated_at')
- ->findOne();
- if (!$queue instanceof SendingQueue) {
- return false;
- }
- return self::createFromQueue($queue);
- }
- public function asArray() {
- $queue = array_intersect_key(
- $this->queue->asArray(),
- array_flip($this->queueFields)
- );
- $task = $this->task->asArray();
- return array_merge($task, $queue);
- }
- public function getErrors() {
- $queueErrors = $this->queue->getErrors();
- $taskErrors = $this->task->getErrors();
- if (empty($queueErrors) && empty($taskErrors)) {
- return false;
- }
- return array_merge((array)$queueErrors, (array)$taskErrors);
- }
- public function save() {
- $this->task->save();
- $this->queue->save();
- $errors = $this->getErrors();
- if ($errors) {
- $loggerFactory = LoggerFactory::getInstance();
- $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addError(
- 'error saving sending task',
- ['task_id' => $this->task->id, 'queue_id' => $this->queue->id, 'errors' => $errors]
- );
- }
- return $this;
- }
- public function delete() {
- $this->taskSubscribers->removeAllSubscribers();
- $this->task->delete();
- $this->queue->delete();
- }
- public function queue() {
- return $this->queue;
- }
- public function task() {
- return $this->task;
- }
- public function taskSubscribers() {
- return $this->taskSubscribers;
- }
- public function getSubscribers($processed = null) {
- $subscribers = $this->taskSubscribers->getSubscribers();
- if (!is_null($processed)) {
- $status = ($processed) ? ScheduledTaskSubscriber::STATUS_PROCESSED : ScheduledTaskSubscriber::STATUS_UNPROCESSED;
- $subscribers->where('processed', $status);
- }
- $subscribers = $subscribers->findArray();
- return array_column($subscribers, 'subscriber_id');
- }
- public function setSubscribers(array $subscriberIds) {
- $this->taskSubscribers->setSubscribers($subscriberIds);
- $this->updateCount();
- }
- public function removeSubscribers(array $subscriberIds) {
- $this->taskSubscribers->removeSubscribers($subscriberIds);
- $this->updateCount();
- }
- public function removeAllSubscribers() {
- $this->taskSubscribers->removeAllSubscribers();
- $this->updateCount();
- }
- public function updateProcessedSubscribers(array $processedSubscribers) {
- $this->taskSubscribers->updateProcessedSubscribers($processedSubscribers);
- return $this->updateCount()->getErrors() === false;
- }
- public function saveSubscriberError($subcriberId, $errorMessage) {
- $this->taskSubscribers->saveSubscriberError($subcriberId, $errorMessage);
- return $this->updateCount()->getErrors() === false;
- }
- public function updateCount() {
- $this->queue->countProcessed = ScheduledTaskSubscriber::getProcessedCount($this->task->id);
- $this->queue->countToProcess = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id);
- $this->queue->countTotal = $this->queue->countProcessed + $this->queue->countToProcess;
- return $this->queue->save();
- }
- public function hydrate(array $data) {
- foreach ($data as $k => $v) {
- $this->__set($k, $v);
- }
- }
- public function validate() {
- return $this->queue->validate() && $this->task->validate();
- }
- public function getMeta() {
- return $this->queue->getMeta();
- }
- public function __isset($prop) {
- $prop = Helpers::camelCaseToUnderscore($prop);
- if ($this->isQueueProperty($prop)) {
- return isset($this->queue->$prop);
- } else {
- return isset($this->task->$prop);
- }
- }
- public function __get($prop) {
- $prop = Helpers::camelCaseToUnderscore($prop);
- if ($this->isQueueProperty($prop)) {
- return $this->queue->$prop;
- } else {
- return $this->task->$prop;
- }
- }
- public function __set($prop, $value) {
- $prop = Helpers::camelCaseToUnderscore($prop);
- if ($this->isCommonProperty($prop)) {
- $this->queue->$prop = $value;
- $this->task->$prop = $value;
- } elseif ($this->isQueueProperty($prop)) {
- $this->queue->$prop = $value;
- } else {
- $this->task->$prop = $value;
- }
- }
- public function __call($name, $args) {
- $obj = method_exists($this->queue, $name) ? $this->queue : $this->task;
- $callback = [$obj, $name];
- if (is_callable($callback)) {
- return call_user_func_array($callback, $args);
- }
- }
- private function isQueueProperty($prop) {
- return in_array($prop, $this->queueFields);
- }
- private function isCommonProperty($prop) {
- return in_array($prop, $this->commonFields);
- }
- public static function getScheduledQueues($amount = self::RESULT_BATCH_SIZE) {
- $wp = new WPFunctions();
- $tasks = ScheduledTask::tableAlias('tasks')
- ->select('tasks.*')
- ->join(SendingQueue::$_table, 'tasks.id = queues.task_id', 'queues')
- ->whereNull('tasks.deleted_at')
- ->where('tasks.status', ScheduledTask::STATUS_SCHEDULED)
- ->whereLte('tasks.scheduled_at', Carbon::createFromTimestamp($wp->currentTime('timestamp')))
- ->where('tasks.type', 'sending')
- ->orderByAsc('tasks.updated_at')
- ->limit($amount)
- ->findMany();
- return static::createManyFromTasks($tasks);
- }
- public static function getRunningQueues($amount = self::RESULT_BATCH_SIZE) {
- $tasks = ScheduledTask::orderByAsc('priority')
- ->orderByAsc('updated_at')
- ->whereNull('deleted_at')
- ->whereNull('status')
- ->where('type', 'sending')
- ->limit($amount)
- ->findMany();
- return static::createManyFromTasks($tasks);
- }
- }
|