Brak opisu

ScheduledTask.php 6.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. <?php
  2. namespace MailPoet\Models;
  3. if (!defined('ABSPATH')) exit;
  4. use MailPoet\Entities\ScheduledTaskEntity;
  5. use MailPoet\Util\Helpers;
  6. use MailPoet\WP\Functions as WPFunctions;
  7. use MailPoetVendor\Carbon\Carbon;
  8. use MailPoetVendor\Idiorm\ORM;
  9. /**
  10. * @property int $id
  11. * @property string $processedAt
  12. * @property string|null $status
  13. * @property string|null $type
  14. * @property int $priority
  15. * @property string|null $scheduledAt
  16. * @property bool|null $inProgress
  17. * @property int $rescheduleCount
  18. * @property string|array|null $meta
  19. */
  20. class ScheduledTask extends Model {
  21. public static $_table = MP_SCHEDULED_TASKS_TABLE; // phpcs:ignore PSR2.Classes.PropertyDeclaration
  22. const STATUS_COMPLETED = ScheduledTaskEntity::STATUS_COMPLETED;
  23. const STATUS_SCHEDULED = ScheduledTaskEntity::STATUS_SCHEDULED;
  24. const STATUS_PAUSED = ScheduledTaskEntity::STATUS_PAUSED;
  25. const STATUS_INVALID = ScheduledTaskEntity::STATUS_INVALID;
  26. const VIRTUAL_STATUS_RUNNING = ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING; // For historical reasons this is stored as null in DB
  27. const PRIORITY_HIGH = ScheduledTaskEntity::PRIORITY_HIGH;
  28. const PRIORITY_MEDIUM = ScheduledTaskEntity::PRIORITY_MEDIUM;
  29. const PRIORITY_LOW = ScheduledTaskEntity::PRIORITY_LOW;
  30. const BASIC_RESCHEDULE_TIMEOUT = 5; //minutes
  31. const MAX_RESCHEDULE_TIMEOUT = 1440; //minutes
  32. private $wp;
  33. public function __construct() {
  34. parent::__construct();
  35. $this->wp = WPFunctions::get();
  36. }
  37. public function subscribers() {
  38. return $this->hasManyThrough(
  39. __NAMESPACE__ . '\Subscriber',
  40. __NAMESPACE__ . '\ScheduledTaskSubscriber',
  41. 'task_id',
  42. 'subscriber_id'
  43. );
  44. }
  45. public function pause() {
  46. $this->set('status', self::STATUS_PAUSED);
  47. $this->save();
  48. return ($this->getErrors() === false && $this->id() > 0);
  49. }
  50. public static function pauseAllByNewsletter(Newsletter $newsletter) {
  51. ScheduledTask::rawExecute(
  52. 'UPDATE `' . ScheduledTask::$_table . '` t ' .
  53. 'JOIN `' . SendingQueue::$_table . '` q ON t.`id` = q.`task_id` ' .
  54. 'SET t.`status` = "' . self::STATUS_PAUSED . '" ' .
  55. 'WHERE ' .
  56. 'q.`newsletter_id` = ' . $newsletter->id() .
  57. ' AND t.`status` = "' . self::STATUS_SCHEDULED . '" '
  58. );
  59. }
  60. public function resume() {
  61. $this->setExpr('status', 'NULL');
  62. $this->save();
  63. return ($this->getErrors() === false && $this->id() > 0);
  64. }
  65. public static function setScheduledAllByNewsletter(Newsletter $newsletter) {
  66. ScheduledTask::rawExecute(
  67. 'UPDATE `' . ScheduledTask::$_table . '` t ' .
  68. 'JOIN `' . SendingQueue::$_table . '` q ON t.`id` = q.`task_id` ' .
  69. 'SET t.`status` = "' . self::STATUS_SCHEDULED . '" ' .
  70. 'WHERE ' .
  71. 'q.`newsletter_id` = ' . $newsletter->id() .
  72. ' AND t.`status` = "' . self::STATUS_PAUSED . '" ' .
  73. ' AND t.`scheduled_at` > CURDATE() - INTERVAL 30 DAY'
  74. );
  75. }
  76. public function complete() {
  77. $this->processedAt = $this->wp->currentTime('mysql');
  78. $this->set('status', self::STATUS_COMPLETED);
  79. $this->save();
  80. return ($this->getErrors() === false && $this->id() > 0);
  81. }
  82. public function save() {
  83. // set the default priority to medium
  84. if (!$this->priority) {
  85. $this->priority = self::PRIORITY_MEDIUM;
  86. }
  87. if (!is_null($this->meta) && !Helpers::isJson($this->meta)) {
  88. $this->set(
  89. 'meta',
  90. (string)json_encode($this->meta)
  91. );
  92. }
  93. parent::save();
  94. return $this;
  95. }
  96. public function asArray() {
  97. $model = parent::asArray();
  98. $model['meta'] = $this->getMeta();
  99. return $model;
  100. }
  101. public function getMeta() {
  102. $meta = (Helpers::isJson($this->meta) && is_string($this->meta)) ? json_decode($this->meta, true) : $this->meta;
  103. return !empty($meta) ? (array)$meta : [];
  104. }
  105. public function delete() {
  106. try {
  107. ORM::get_db()->beginTransaction();
  108. ScheduledTaskSubscriber::where('task_id', $this->id)->deleteMany();
  109. parent::delete();
  110. ORM::get_db()->commit();
  111. } catch (\Exception $error) {
  112. ORM::get_db()->rollBack();
  113. throw $error;
  114. }
  115. return null;
  116. }
  117. public function rescheduleProgressively() {
  118. $scheduledAt = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
  119. $timeout = (int)min(self::BASIC_RESCHEDULE_TIMEOUT * pow(2, $this->rescheduleCount), self::MAX_RESCHEDULE_TIMEOUT);
  120. $this->scheduledAt = $scheduledAt->addMinutes($timeout);
  121. $this->rescheduleCount++;
  122. $this->status = ScheduledTask::STATUS_SCHEDULED;
  123. $this->save();
  124. return $timeout;
  125. }
  126. public static function touchAllByIds(array $ids) {
  127. ScheduledTask::rawExecute(
  128. 'UPDATE `' . ScheduledTask::$_table . '`' .
  129. 'SET `updated_at` = NOW() ' .
  130. 'WHERE `id` IN (' . join(',', $ids) . ')'
  131. );
  132. }
  133. /**
  134. * @return ScheduledTask|null
  135. */
  136. public static function findOneScheduledByNewsletterIdAndSubscriberId($newsletterId, $subscriberId) {
  137. return ScheduledTask::tableAlias('tasks')
  138. ->select('tasks.*')
  139. ->innerJoin(SendingQueue::$_table, 'queues.task_id = tasks.id', 'queues')
  140. ->innerJoin(ScheduledTaskSubscriber::$_table, 'task_subscribers.task_id = tasks.id', 'task_subscribers')
  141. ->where('queues.newsletter_id', $newsletterId)
  142. ->where('tasks.status', ScheduledTask::STATUS_SCHEDULED)
  143. ->where('task_subscribers.subscriber_id', $subscriberId)
  144. ->whereNull('queues.deleted_at')
  145. ->whereNull('tasks.deleted_at')
  146. ->findOne() ?: null;
  147. }
  148. public static function findFutureScheduledByType($type, $limit = null) {
  149. return self::findByTypeAndStatus($type, ScheduledTask::STATUS_SCHEDULED, $limit, true);
  150. }
  151. private static function findByTypeAndStatus($type, $status, $limit = null, $future = false) {
  152. $query = ScheduledTask::where('type', $type)
  153. ->whereNull('deleted_at');
  154. $now = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
  155. if ($future) {
  156. $query->whereGt('scheduled_at', $now);
  157. } else {
  158. $query->whereLte('scheduled_at', $now);
  159. }
  160. if ($status === null) {
  161. $query->whereNull('status');
  162. } else {
  163. $query->where('status', $status);
  164. }
  165. if ($limit !== null) {
  166. $query->limit($limit);
  167. }
  168. return $query->findMany();
  169. }
  170. // temporary function to convert an ScheduledTaskEntity object to ScheduledTask while we don't migrate the rest of
  171. // the code in this class to use Doctrine entities
  172. public static function getFromDoctrineEntity(ScheduledTaskEntity $doctrineTask): ?ScheduledTask {
  173. $parisTask = self::findOne($doctrineTask->getId());
  174. if (!$parisTask instanceof ScheduledTask) {
  175. return null;
  176. }
  177. return $parisTask;
  178. }
  179. }