Keine Beschreibung

Sending.php 8.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. <?php
  2. namespace MailPoet\Tasks;
  3. if (!defined('ABSPATH')) exit;
  4. use MailPoet\Logging\LoggerFactory;
  5. use MailPoet\Models\ScheduledTask;
  6. use MailPoet\Models\ScheduledTaskSubscriber;
  7. use MailPoet\Models\SendingQueue;
  8. use MailPoet\Util\Helpers;
  9. use MailPoet\WP\Functions as WPFunctions;
  10. use MailPoetVendor\Carbon\Carbon;
  /**
   * A facade class containing all necessary models to work with a sending queue.
   *
   * It bundles a ScheduledTask of type "sending", the SendingQueue that belongs
   * to it and a Subscribers helper for the task. Property access is routed by the
   * magic accessors: names listed in $queueFields go to the queue, every other
   * name goes to the task, and names listed in $commonFields are written to both.
   * Property names may be given in camelCase; they are converted with
   * Helpers::camelCaseToUnderscore() before being routed.
   *
   * @property string|null $status
   * @property int $taskId
   * @property int $id
   * @property int $newsletterId
   * @property string $newsletterRenderedSubject
   * @property string|array $newsletterRenderedBody
   * @property bool $nonExistentColumn
   * @property string $scheduledAt
   * @property int $priority
   */
  class Sending {
    /** Task type handled by this facade; tasks of any other type are rejected by the constructor. */
    const TASK_TYPE = 'sending';

    /** Default number of tasks fetched by getScheduledQueues() and getRunningQueues(). */
    const RESULT_BATCH_SIZE = 5;

    /** @var ScheduledTask */
    private $task;

    /** @var SendingQueue */
    private $queue;

    /** @var Subscribers */
    private $taskSubscribers;

    /** @var string[] Underscored property names that are routed to the queue model. */
    private $queueFields = [
      'id',
      'task_id',
      'newsletter_id',
      'newsletter_rendered_subject',
      'newsletter_rendered_body',
      'count_total',
      'count_processed',
      'count_to_process',
      'meta',
    ];

    /** @var string[] Underscored property names that __set() writes to both the queue and the task. */
    private $commonFields = [
      'created_at',
      'updated_at',
      'deleted_at',
    ];

    /**
     * Builds the facade, creating and saving any model that was not supplied.
     *
     * @param ScheduledTask|null $task Existing task, or null to create a new task of type TASK_TYPE.
     * @param SendingQueue|null $queue Existing queue, or null to create a new queue linked to the task.
     * @throws \Exception When the task is not of type TASK_TYPE.
     */
    private function __construct(
      ScheduledTask $task = null,
      SendingQueue $queue = null
    ) {
      if (!$task instanceof ScheduledTask) {
        $task = ScheduledTask::create();
        $task->type = self::TASK_TYPE;
        $task->save();
      }
      // Reject foreign task types before touching the queue, so that a rejected
      // task never causes a new queue to be created and saved as a side effect.
      if ($task->type !== self::TASK_TYPE) {
        throw new \Exception('Only tasks of type "' . self::TASK_TYPE . '" are accepted by this class');
      }
      if (!$queue instanceof SendingQueue) {
        $queue = SendingQueue::create();
        $queue->newsletterId = 0;
        $queue->taskId = $task->id;
        $queue->save();
      }
      $this->task = $task;
      $this->queue = $queue;
      $this->taskSubscribers = new Subscribers($task);
    }

    /**
     * Public factory around the private constructor.
     *
     * @param ScheduledTask|null $task
     * @param SendingQueue|null $queue
     * @return self
     * @throws \Exception When the task is not of type TASK_TYPE.
     */
    public static function create(ScheduledTask $task = null, SendingQueue $queue = null) {
      return new self($task, $queue);
    }

    /**
     * Builds one facade per task, loading all matching queues with a single query.
     *
     * Tasks for which no queue is found are passed to handleInvalidTask() and
     * are left out of the result.
     *
     * @param ScheduledTask[] $tasks
     * @return self[]
     */
    public static function createManyFromTasks($tasks) {
      if (empty($tasks)) {
        return [];
      }
      $tasksIds = array_map(function($task) {
        return $task->id;
      }, $tasks);
      $queues = SendingQueue::whereIn('task_id', $tasksIds)->findMany();
      // Index the queues by task id so each task can be matched without another lookup.
      $queuesIndex = [];
      foreach ($queues as $queue) {
        $queuesIndex[$queue->taskId] = $queue;
      }
      $result = [];
      foreach ($tasks as $task) {
        if (!empty($queuesIndex[$task->id])) {
          $result[] = self::create($task, $queuesIndex[$task->id]);
        } else {
          static::handleInvalidTask($task);
        }
      }
      return $result;
    }

    /**
     * Logs an error for a task that has no queue and saves it with the invalid status.
     *
     * @param ScheduledTask $task
     */
    public static function handleInvalidTask(ScheduledTask $task) {
      $loggerFactory = LoggerFactory::getInstance();
      $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addError(
        'invalid sending task found',
        ['task_id' => $task->id]
      );
      $task->status = ScheduledTask::STATUS_INVALID;
      $task->save();
    }

    /**
     * Builds the facade for a task by looking up the queue whose task_id matches.
     *
     * @param ScheduledTask $task
     * @return self|false False when no queue is found for the task.
     */
    public static function createFromScheduledTask(ScheduledTask $task) {
      $queue = SendingQueue::where('task_id', $task->id)->findOne();
      if (!$queue) {
        return false;
      }
      return self::create($task, $queue);
    }

    /**
     * Builds the facade for a queue by loading the task it refers to.
     *
     * @param SendingQueue $queue
     * @return self|false False when the queue's task cannot be found.
     */
    public static function createFromQueue(SendingQueue $queue) {
      $task = $queue->task()->findOne();
      if (!$task) {
        return false;
      }
      return self::create($task, $queue);
    }

    /**
     * Builds the facade for the most recently updated queue of a newsletter.
     *
     * @param int $newsletterId
     * @return self|false False when the newsletter has no queue or the queue has no task.
     */
    public static function getByNewsletterId($newsletterId) {
      $queue = SendingQueue::where('newsletter_id', $newsletterId)
        ->orderByDesc('updated_at')
        ->findOne();
      if (!$queue instanceof SendingQueue) {
        return false;
      }
      return self::createFromQueue($queue);
    }

    /**
     * Returns the task data merged with the queue fields listed in $queueFields.
     *
     * The queue values are merged last, so on a key collision (for example "id")
     * the queue value is the one returned.
     *
     * @return array
     */
    public function asArray() {
      $queue = array_intersect_key(
        $this->queue->asArray(),
        array_flip($this->queueFields)
      );
      $task = $this->task->asArray();
      return array_merge($task, $queue);
    }

    /**
     * Collects the errors reported by the queue and the task.
     *
     * @return array|false Merged errors (queue first, then task), or false when neither model reports any.
     */
    public function getErrors() {
      $queueErrors = $this->queue->getErrors();
      $taskErrors = $this->task->getErrors();
      if (empty($queueErrors) && empty($taskErrors)) {
        return false;
      }
      return array_merge((array)$queueErrors, (array)$taskErrors);
    }

    /**
     * Saves the task and then the queue, logging any errors they report afterwards.
     *
     * @return $this
     */
    public function save() {
      $this->task->save();
      $this->queue->save();
      $errors = $this->getErrors();
      if ($errors) {
        $loggerFactory = LoggerFactory::getInstance();
        $loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->addError(
          'error saving sending task',
          ['task_id' => $this->task->id, 'queue_id' => $this->queue->id, 'errors' => $errors]
        );
      }
      return $this;
    }

    /**
     * Removes all task subscribers, then deletes the task and the queue.
     */
    public function delete() {
      $this->taskSubscribers->removeAllSubscribers();
      $this->task->delete();
      $this->queue->delete();
    }

    /**
     * @return SendingQueue The wrapped queue model.
     */
    public function queue() {
      return $this->queue;
    }

    /**
     * @return ScheduledTask The wrapped task model.
     */
    public function task() {
      return $this->task;
    }

    /**
     * @return Subscribers The subscribers helper created for the wrapped task.
     */
    public function taskSubscribers() {
      return $this->taskSubscribers;
    }

    /**
     * Returns the subscriber ids attached to the task.
     *
     * @param bool|null $processed Null returns every subscriber; a truthy value
     *   restricts the result to processed subscribers, a falsy non-null value to
     *   unprocessed ones.
     * @return array Values of the "subscriber_id" column.
     */
    public function getSubscribers($processed = null) {
      $subscribers = $this->taskSubscribers->getSubscribers();
      if (!is_null($processed)) {
        $status = ($processed) ? ScheduledTaskSubscriber::STATUS_PROCESSED : ScheduledTaskSubscriber::STATUS_UNPROCESSED;
        // The return value is not reassigned: the filter is expected to be
        // applied to the $subscribers query object itself.
        $subscribers->where('processed', $status);
      }
      $subscribers = $subscribers->findArray();
      return array_column($subscribers, 'subscriber_id');
    }

    /**
     * Sets the task's subscribers and refreshes the queue counters.
     *
     * @param array $subscriberIds
     */
    public function setSubscribers(array $subscriberIds) {
      $this->taskSubscribers->setSubscribers($subscriberIds);
      $this->updateCount();
    }

    /**
     * Removes the given subscribers from the task and refreshes the queue counters.
     *
     * @param array $subscriberIds
     */
    public function removeSubscribers(array $subscriberIds) {
      $this->taskSubscribers->removeSubscribers($subscriberIds);
      $this->updateCount();
    }

    /**
     * Removes every subscriber from the task and refreshes the queue counters.
     */
    public function removeAllSubscribers() {
      $this->taskSubscribers->removeAllSubscribers();
      $this->updateCount();
    }

    /**
     * Passes the processed subscribers to the subscribers helper and refreshes the queue counters.
     *
     * @param array $processedSubscribers
     * @return bool True when the saved queue reports no errors.
     */
    public function updateProcessedSubscribers(array $processedSubscribers) {
      $this->taskSubscribers->updateProcessedSubscribers($processedSubscribers);
      return $this->updateCount()->getErrors() === false;
    }

    /**
     * Records an error message for a subscriber and refreshes the queue counters.
     *
     * @param int $subcriberId
     * @param string $errorMessage
     * @return bool True when the saved queue reports no errors.
     */
    public function saveSubscriberError($subcriberId, $errorMessage) {
      $this->taskSubscribers->saveSubscriberError($subcriberId, $errorMessage);
      return $this->updateCount()->getErrors() === false;
    }

    /**
     * Recomputes the processed, to-process and total counters on the queue and saves it.
     *
     * @return mixed Whatever SendingQueue::save() returns; callers in this class
     *   chain getErrors() on it, so it is presumably the queue model itself.
     */
    public function updateCount() {
      $this->queue->countProcessed = ScheduledTaskSubscriber::getProcessedCount($this->task->id);
      $this->queue->countToProcess = ScheduledTaskSubscriber::getUnprocessedCount($this->task->id);
      $this->queue->countTotal = $this->queue->countProcessed + $this->queue->countToProcess;
      return $this->queue->save();
    }

    /**
     * Assigns every key/value pair through __set(), so each value is routed to the right model.
     *
     * @param array $data Property name => value.
     */
    public function hydrate(array $data) {
      foreach ($data as $k => $v) {
        $this->__set($k, $v);
      }
    }

    /**
     * Validates the queue and, only if that succeeds, the task.
     *
     * @return bool
     */
    public function validate() {
      return $this->queue->validate() && $this->task->validate();
    }

    /**
     * @return mixed The result of the queue's getMeta().
     */
    public function getMeta() {
      return $this->queue->getMeta();
    }

    /**
     * Checks a property on the queue when it is a queue field, otherwise on the task.
     *
     * @param string $prop Property name; converted to underscore form before routing.
     * @return bool
     */
    public function __isset($prop) {
      $prop = Helpers::camelCaseToUnderscore($prop);
      if ($this->isQueueProperty($prop)) {
        return isset($this->queue->$prop);
      } else {
        return isset($this->task->$prop);
      }
    }

    /**
     * Reads a property from the queue when it is a queue field, otherwise from the task.
     *
     * @param string $prop Property name; converted to underscore form before routing.
     * @return mixed
     */
    public function __get($prop) {
      $prop = Helpers::camelCaseToUnderscore($prop);
      if ($this->isQueueProperty($prop)) {
        return $this->queue->$prop;
      } else {
        return $this->task->$prop;
      }
    }

    /**
     * Writes a property to both models for common fields, to the queue for queue
     * fields and to the task for everything else.
     *
     * @param string $prop Property name; converted to underscore form before routing.
     * @param mixed $value
     */
    public function __set($prop, $value) {
      $prop = Helpers::camelCaseToUnderscore($prop);
      if ($this->isCommonProperty($prop)) {
        $this->queue->$prop = $value;
        $this->task->$prop = $value;
      } elseif ($this->isQueueProperty($prop)) {
        $this->queue->$prop = $value;
      } else {
        $this->task->$prop = $value;
      }
    }

    /**
     * Forwards unknown method calls to the queue when it declares the method,
     * otherwise to the task.
     *
     * @param string $name
     * @param array $args
     * @return mixed The forwarded call's result, or null when the target is not callable.
     */
    public function __call($name, $args) {
      $obj = method_exists($this->queue, $name) ? $this->queue : $this->task;
      $callback = [$obj, $name];
      if (is_callable($callback)) {
        return call_user_func_array($callback, $args);
      }
    }

    /**
     * @param string $prop Underscored property name.
     * @return bool Whether the property is listed in $queueFields.
     */
    private function isQueueProperty($prop) {
      return in_array($prop, $this->queueFields, true);
    }

    /**
     * @param string $prop Underscored property name.
     * @return bool Whether the property is listed in $commonFields.
     */
    private function isCommonProperty($prop) {
      return in_array($prop, $this->commonFields, true);
    }

    /**
     * Fetches non-deleted sending tasks with the scheduled status whose
     * scheduled_at is not later than the current WordPress time, least recently
     * updated first.
     *
     * @param int $amount Maximum number of tasks to fetch.
     * @return self[]
     */
    public static function getScheduledQueues($amount = self::RESULT_BATCH_SIZE) {
      $wp = new WPFunctions();
      $tasks = ScheduledTask::tableAlias('tasks')
        ->select('tasks.*')
        ->join(SendingQueue::$_table, 'tasks.id = queues.task_id', 'queues')
        ->whereNull('tasks.deleted_at')
        ->where('tasks.status', ScheduledTask::STATUS_SCHEDULED)
        ->whereLte('tasks.scheduled_at', Carbon::createFromTimestamp($wp->currentTime('timestamp')))
        ->where('tasks.type', self::TASK_TYPE)
        ->orderByAsc('tasks.updated_at')
        ->limit($amount)
        ->findMany();
      return static::createManyFromTasks($tasks);
    }

    /**
     * Fetches non-deleted sending tasks whose status is NULL, ordered by
     * priority and then by least recently updated.
     *
     * @param int $amount Maximum number of tasks to fetch.
     * @return self[]
     */
    public static function getRunningQueues($amount = self::RESULT_BATCH_SIZE) {
      $tasks = ScheduledTask::orderByAsc('priority')
        ->orderByAsc('updated_at')
        ->whereNull('deleted_at')
        ->whereNull('status')
        ->where('type', self::TASK_TYPE)
        ->limit($amount)
        ->findMany();
      return static::createManyFromTasks($tasks);
    }
  }