Nessuna descrizione

CronWorkerRunner.php 5.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. <?php
  2. namespace MailPoet\Cron;
  3. if (!defined('ABSPATH')) exit;
  4. use MailPoet\Entities\ScheduledTaskEntity;
  5. use MailPoet\Models\ScheduledTask;
  6. use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
  7. use MailPoet\WP\Functions as WPFunctions;
  8. use MailPoetVendor\Carbon\Carbon;
  /**
   * Runs the scheduled tasks belonging to a single cron worker.
   *
   * For the worker passed to run(), due tasks are handed to the worker's prepare
   * strategy and running tasks to its process strategy, while this class maintains
   * each task's in-progress flag, completion status and rescheduling.
   */
  class CronWorkerRunner {
    /** Batch size passed to the repository when fetching due tasks and running tasks. */
    const TASK_BATCH_SIZE = 5;

    /** Number of minutes since the last update after which a task is considered stuck. */
    const TASK_RUN_TIMEOUT = 120;

    /** Timeout passed to the scheduler when a stuck task is rescheduled (presumably minutes - confirm). */
    const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5;

    /** @var float microtime(true) captured at construction; handed to every execution-limit check */
    private $timer;

    /** @var CronHelper */
    private $cronHelper;

    /** @var CronWorkerScheduler */
    private $cronWorkerScheduler;

    /** @var WPFunctions */
    private $wp;

    /** @var ScheduledTasksRepository */
    private $scheduledTasksRepository;

    public function __construct(
      CronHelper $cronHelper,
      CronWorkerScheduler $cronWorkerScheduler,
      WPFunctions $wp,
      ScheduledTasksRepository $scheduledTasksRepository
    ) {
      $this->timer = microtime(true);
      $this->cronHelper = $cronHelper;
      $this->cronWorkerScheduler = $cronWorkerScheduler;
      $this->wp = $wp;
      $this->scheduledTasksRepository = $scheduledTasksRepository;
    }

    /**
     * Prepares the worker's due tasks and processes its running tasks.
     *
     * @param CronWorkerInterface $worker Worker whose tasks should be handled.
     * @return bool False when the worker's processing requirements are not met (all
     *   fetched tasks are removed) or when there is nothing to do (the next run is
     *   scheduled if the worker schedules automatically); true once all fetched
     *   tasks have been handled.
     * @throws \Exception Rethrows any exception raised while handling a task. Unless
     *   the exception signals that the daemon execution limit was reached, the most
     *   recently loaded task is rescheduled progressively first.
     */
    public function run(CronWorkerInterface $worker) {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($this->timer);

      $dueTasks = $this->getDueTasks($worker);
      $runningTasks = $this->getRunningTasks($worker);

      if (!$worker->checkProcessingRequirements()) {
        // The worker cannot run at all: drop every task that was fetched for it.
        foreach (array_merge($dueTasks, $runningTasks) as $task) {
          $this->scheduledTasksRepository->remove($task);
          $this->scheduledTasksRepository->flush();
        }
        return false;
      }

      $worker->init();

      if (!$dueTasks && !$runningTasks) {
        if ($worker->scheduleAutomatically()) {
          $this->cronWorkerScheduler->schedule($worker->getTaskType(), $worker->getNextRunDate());
        }
        return false;
      }

      // Kept outside the try block so the catch handler can see the last loaded task.
      $parisTask = null;
      try {
        foreach ($dueTasks as $task) {
          $parisTask = ScheduledTask::getFromDoctrineEntity($task);
          if ($parisTask) {
            $this->prepareTask($worker, $parisTask);
          }
        }
        foreach ($runningTasks as $task) {
          $parisTask = ScheduledTask::getFromDoctrineEntity($task);
          if ($parisTask) {
            $this->processTask($worker, $parisTask);
          }
        }
      } catch (\Exception $e) {
        // Hitting the execution limit is not the task's fault, so it is not rescheduled.
        if ($parisTask && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) {
          $parisTask->rescheduleProgressively();
        }
        throw $e;
      }

      return true;
    }

    /**
     * Fetches one batch of due tasks of the worker's task type.
     */
    private function getDueTasks(CronWorkerInterface $worker) {
      return $this->scheduledTasksRepository->findDueByType($worker->getTaskType(), self::TASK_BATCH_SIZE);
    }

    /**
     * Fetches one batch of running tasks of the worker's task type.
     */
    private function getRunningTasks(CronWorkerInterface $worker) {
      return $this->scheduledTasksRepository->findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE);
    }

    /**
     * Runs the worker's prepare strategy for a due task and, when the strategy
     * reports completion, clears the task's status and saves it.
     */
    private function prepareTask(CronWorkerInterface $worker, ScheduledTask $task): void {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($this->timer);

      // The prepare strategy takes the Doctrine entity, so look it up by the task's id.
      $doctrineTask = $this->convertTaskClassToDoctrine($task);
      if ($doctrineTask === null) {
        return;
      }

      $prepareCompleted = $worker->prepareTaskStrategy($doctrineTask, $this->timer);
      if ($prepareCompleted) {
        $task->status = null;
        $task->save();
      }
    }

    /**
     * Runs the worker's process strategy for a running task.
     *
     * For workers that do not support multiple instances, the task is skipped when
     * it was just rescheduled as stuck or when it is already flagged as in progress.
     *
     * @return bool True when the strategy reported the task as completed; false when
     *   the task was skipped or is not finished yet.
     */
    private function processTask(CronWorkerInterface $worker, ScheduledTask $task): bool {
      // abort if execution limit is reached
      $this->cronHelper->enforceExecutionLimit($this->timer);

      if (!$worker->supportsMultipleInstances()) {
        if ($this->rescheduleOutdated($task)) {
          return false;
        }
        if ($this->isInProgress($task)) {
          return false;
        }
      }

      $this->startProgress($task);
      try {
        $completed = $worker->processTaskStrategy($task, $this->timer);
        if ($completed) {
          $this->complete($task);
        }
      } finally {
        // Release the in-progress flag on every exit path, including \Error throwables
        // that a catch (\Exception) would miss; otherwise the task stays flagged until
        // the stuck-task timeout reschedules it.
        $this->stopProgress($task);
      }

      return (bool)$completed;
    }

    /**
     * Reschedules a task that has not been updated for longer than TASK_RUN_TIMEOUT minutes.
     *
     * @return bool True when the task was considered stuck and has been rescheduled.
     */
    private function rescheduleOutdated(ScheduledTask $task): bool {
      // A task with no updatedAt value at all is never treated as outdated.
      if (empty($task->updatedAt)) {
        return false;
      }

      $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
      $updated = strtotime((string)$task->updatedAt);
      // An updatedAt value that cannot be parsed is treated as very old (year 2000),
      // which makes the task count as outdated below.
      $updatedAt = $updated === false
        ? Carbon::createFromDate(2000)
        : Carbon::createFromTimestamp($updated);

      // If the task is running for too long consider it stuck and reschedule
      if ($updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) {
        $this->stopProgress($task);
        $this->cronWorkerScheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT);
        return true;
      }

      return false;
    }

    /**
     * Tells whether the task is flagged as in progress, so that a second instance is not started.
     */
    private function isInProgress(ScheduledTask $task): bool {
      return !empty($task->inProgress);
    }

    /**
     * Flags the task as in progress and saves it.
     */
    private function startProgress(ScheduledTask $task): void {
      $task->inProgress = true;
      $task->save();
    }

    /**
     * Clears the task's in-progress flag and saves it.
     */
    private function stopProgress(ScheduledTask $task): void {
      $task->inProgress = false;
      $task->save();
    }

    /**
     * Marks the task as completed, stamps the processing time and saves it.
     */
    private function complete(ScheduledTask $task): void {
      $task->processedAt = $this->wp->currentTime('mysql');
      $task->status = ScheduledTask::STATUS_COMPLETED;
      $task->save();
    }

    /**
     * Temporary helper that looks up the ScheduledTaskEntity matching a ScheduledTask
     * model, needed until the rest of this class is migrated to Doctrine entities.
     *
     * @return ScheduledTaskEntity|null Null when the repository has no entity with the task's id.
     */
    private function convertTaskClassToDoctrine(ScheduledTask $parisTask): ?ScheduledTaskEntity {
      $doctrineTask = $this->scheduledTasksRepository->findOneById($parisTask->id);
      return $doctrineTask instanceof ScheduledTaskEntity ? $doctrineTask : null;
    }
  }