StarPU Internal Handbook
workers.h
1/* StarPU --- Runtime system for heterogeneous multicore architectures.
2 *
3 * Copyright (C) 2008-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
4 * Copyright (C) 2013 Thibaut Lambert
5 * Copyright (C) 2016 Uppsala University
6 *
7 * StarPU is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation; either version 2.1 of the License, or (at
10 * your option) any later version.
11 *
12 * StarPU is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 *
16 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
17 */
18
19#ifndef __WORKERS_H__
20#define __WORKERS_H__
21
23/* @{ */
24
25#include <limits.h>
26
27#include <starpu.h>
28#include <common/config.h>
29#include <common/timing.h>
30#include <common/fxt.h>
31#include <common/thread.h>
32#include <common/utils.h>
33#include <core/jobs.h>
35#include <core/sched_policy.h>
36#include <core/topology.h>
37#include <core/errorcheck.h>
38#include <core/sched_ctx.h>
39#include <core/sched_ctx_list.h>
40#include <core/simgrid.h>
41#ifdef STARPU_HAVE_HWLOC
42#include <hwloc.h>
43#endif
44
45#include <core/drivers.h>
48
49#ifdef STARPU_USE_MIC
51#endif /* STARPU_USE_MIC */
52
53#ifdef STARPU_USE_MPI_MASTER_SLAVE
55#endif
56
58
60
61#include <starpu_parameters.h>
62
/** Maximum number of tasks a worker can hold in its pipeline; this is the
 * size of the current_tasks[] array of struct _starpu_worker. */
#define STARPU_MAX_PIPELINE 4

/** Three-valued initialization state. */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING,
	INITIALIZED
};

/** Forward declaration; the type is used for the deferred context change
 * list of struct _starpu_worker. */
struct _starpu_ctx_change_list;
68
71 struct _starpu_machine_config *config;
72 starpu_pthread_mutex_t mutex;
73 enum starpu_worker_archtype arch;
74 uint32_t worker_mask;
75 struct starpu_perfmodel_arch perf_arch;
76 starpu_pthread_t worker_thread;
77 unsigned devid;
78 unsigned subworkerid;
79 int bindid;
84 starpu_pthread_cond_t started_cond;
85 starpu_pthread_cond_t ready_cond;
86 unsigned memory_node;
92 starpu_pthread_cond_t sched_cond;
93 starpu_pthread_mutex_t sched_mutex;
95#ifdef STARPU_SPINLOCK_CHECK
96 const char *relax_on_file;
97 int relax_on_line;
98 const char *relax_on_func;
99 const char *relax_off_file;
100 int relax_off_line;
101 const char *relax_off_func;
102#endif
120 starpu_pthread_t thread_changing_ctx;
128 struct _starpu_ctx_change_list ctx_change_list;
129 struct starpu_task_list local_tasks;
130 struct starpu_task **local_ordered_tasks;
134 struct starpu_task *current_task;
135 struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
136#ifdef STARPU_SIMGRID
137 starpu_pthread_wait_t wait;
138#endif
139
140 struct timespec cl_start;
141 struct timespec cl_expend;
142 struct timespec cl_end;
143 unsigned char first_task;
144 unsigned char ntasks;
145 unsigned char pipeline_length;
146 unsigned char pipeline_stuck;
148 unsigned worker_is_running;
149 unsigned worker_is_initialized;
152 char name[128];
153 char short_name[32];
154 unsigned run_by_starpu;
155 struct _starpu_driver_ops *driver_ops;
156
157 struct _starpu_sched_ctx_list *sched_ctx_list;
158 int tmp_sched_ctx;
159 unsigned nsched_ctxs;
160 struct _starpu_barrier_counter tasks_barrier;
162 unsigned has_prev_init;
164 unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
165
170 struct starpu_task *task_transferring;
176 unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
177
178 unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
184 unsigned reverse_phase[2];
185
189 struct _starpu_sched_ctx *stream_ctx;
190
191#ifdef __GLIBC__
192 cpu_set_t cpu_set;
193#endif /* __GLIBC__ */
194#ifdef STARPU_HAVE_HWLOC
195 hwloc_bitmap_t hwloc_cpu_set;
196 hwloc_obj_t hwloc_obj;
197#endif
198
200 char padding[STARPU_CACHELINE_SIZE];
201);
202
204{
205 struct starpu_perfmodel_arch perf_arch;
206 uint32_t worker_mask;
207 int worker_size;
208 unsigned memory_node;
209 int combined_workerid[STARPU_NMAXWORKERS];
210#ifdef STARPU_USE_MP
211 int count;
212 starpu_pthread_mutex_t count_mutex;
213#endif
214
215#ifdef __GLIBC__
216 cpu_set_t cpu_set;
217#endif /* __GLIBC__ */
218#ifdef STARPU_HAVE_HWLOC
219 hwloc_bitmap_t hwloc_cpu_set;
220#endif
221
224 char padding[STARPU_CACHELINE_SIZE];
225};
226
232{
233 starpu_pthread_mutex_t mutex;
234 starpu_pthread_t worker_thread;
235 unsigned nworkers;
236 unsigned started;
237 void *retval;
238 struct _starpu_worker *workers;
239 starpu_pthread_cond_t ready_cond;
240 unsigned set_is_initialized;
241};
242
#if defined(STARPU_USE_MPI_MASTER_SLAVE)
/** Worker sets of the MPI master-slave devices (STARPU_MAXMPIDEVS entries). */
extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
#endif /* STARPU_USE_MPI_MASTER_SLAVE */
246
248{
250 unsigned nworkers;
251
254
255 unsigned nsched_ctxs;
256
257#ifdef STARPU_HAVE_HWLOC
259 hwloc_topology_t hwtopology;
260#endif
262 struct starpu_tree *tree;
263
267 unsigned nhwcpus;
268
272 unsigned nhwpus;
273
277 unsigned nhwcudagpus;
278
283
287 unsigned nhwmpi;
288
290 unsigned ncpus;
291
293 unsigned ncudagpus;
294 unsigned nworkerpercuda;
295 int cuda_th_per_stream;
296 int cuda_th_per_dev;
297
299 unsigned nopenclgpus;
300
302 unsigned nmpidevices;
303 unsigned nhwmpidevices;
304
305 unsigned nhwmpicores[STARPU_MAXMPIDEVS];
306 unsigned nmpicores[STARPU_MAXMPIDEVS];
307
311 unsigned nmicdevices;
312
313 unsigned nhwmiccores[STARPU_MAXMICDEVS];
314 unsigned nmiccores[STARPU_MAXMICDEVS];
315
323 unsigned workers_bindid[STARPU_NMAXWORKERS];
324
330
337 unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS];
338
345 unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS];
346
347 /*** Indicates the successive MIC devices that should be used
348 * by the MIC driver. It is either filled according to the
349 * user's explicit parameters (from starpu_conf) or according
350 * to the STARPU_WORKERS_MICID env. variable. Otherwise, they
351 * are taken in ID order. */
355 unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS];
356};
357
359{
360 struct _starpu_machine_topology topology;
361
362#ifdef STARPU_HAVE_HWLOC
363 int cpu_depth;
364 int pu_depth;
365#endif
366
369 char currently_bound[STARPU_NMAXWORKERS];
370 char currently_shared[STARPU_NMAXWORKERS];
371
374
377
380
383
394
396 char padding1[STARPU_CACHELINE_SIZE];
397
400 struct _starpu_worker workers[STARPU_NMAXWORKERS];
401
404 struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS];
405
406 starpu_pthread_mutex_t submitted_mutex;
407
409 char padding2[STARPU_CACHELINE_SIZE];
410
412 struct
413 {
414 int *workerids;
415 unsigned nworkers;
417 unsigned nbindid;
422 uint32_t worker_mask;
423
425 struct starpu_conf conf;
426
428 unsigned running;
429
430 int disable_kernels;
431
435
437 struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
438
440 unsigned submitting;
441
442 int watchdog_ok;
443};
444
445extern int _starpu_worker_parallel_blocks;
446
447extern struct _starpu_machine_config _starpu_config STARPU_ATTRIBUTE_INTERNAL;
448extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
449extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
450extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
451
/** Record the argc/argv pointers given by the application, and retrieve them
 * later. The getters take no argument: declare them with (void) so that they
 * are real prototypes (an empty parameter list is an old-style, unchecked
 * declarator in C). */
void _starpu_set_argc_argv(int *argc, char ***argv);
int *_starpu_get_argc(void);
char ***_starpu_get_argv(void);
456
/** Check the environment and update conf accordingly
 * (presumably from STARPU_* variables — TODO confirm). */
void _starpu_conf_check_environment(struct starpu_conf *conf);

/** Presumably blocks the caller while StarPU is paused
 * (see pause_depth in struct _starpu_machine_config) — TODO confirm. */
void _starpu_may_pause(void);
462
464static inline unsigned _starpu_machine_is_running(void)
465{
466 unsigned ret;
467 /* running is just protected by a memory barrier */
468 STARPU_RMB();
469
470 ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
471 ret = _starpu_config.running;
472 ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
473 return ret;
474}
475
476
478void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig);
479
481uint32_t _starpu_worker_exists(struct starpu_task *);
482
484uint32_t _starpu_can_submit_cuda_task(void);
485
487uint32_t _starpu_can_submit_cpu_task(void);
488
490uint32_t _starpu_can_submit_opencl_task(void);
491
494unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker);
495
499void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
500
502void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
504void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
505
506static inline unsigned _starpu_worker_get_count(void)
507{
508 return _starpu_config.topology.nworkers;
509}
510#define starpu_worker_get_count _starpu_worker_get_count
511
515static inline void _starpu_set_local_worker_key(struct _starpu_worker *worker)
516{
517 STARPU_ASSERT(_starpu_keys_initialized);
518 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
519}
520
523static inline struct _starpu_worker *_starpu_get_local_worker_key(void)
524{
525 if (!_starpu_keys_initialized)
526 return NULL;
527 return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
528}
529
533static inline void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
534{
535 STARPU_ASSERT(_starpu_keys_initialized);
536 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
537}
538
541static inline struct _starpu_worker_set *_starpu_get_local_worker_set_key(void)
542{
543 if (!_starpu_keys_initialized)
544 return NULL;
545 return (struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
546}
547
550static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
551{
552 STARPU_ASSERT(id < starpu_worker_get_count());
553 return &_starpu_config.workers[id];
554}
555
558static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
559{
560 return (id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
561}
562
/** Return the structure of the combined worker of identifier id. */
struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id);
564
567static inline struct _starpu_machine_config *_starpu_get_machine_config(void)
568{
569 return &_starpu_config;
570}
571
573static inline int _starpu_get_disable_kernels(void)
574{
575 return _starpu_config.disable_kernels;
576}
577
579static inline enum _starpu_worker_status _starpu_worker_get_status(int workerid)
580{
581 return _starpu_config.workers[workerid].status;
582}
583
586static inline void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
587{
588 _starpu_config.workers[workerid].status = status;
589}
590
592static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
593{
594 return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
595}
596
/** Fill workerids (at most maxsize entries) with the ids of the workers of
 * type `type`. */
int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);

/** Same as starpu_worker_get_nids_by_type(), presumably restricted to
 * workers that belong to no scheduling context — TODO confirm. */
int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
602
603static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
604{
605 struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
606 return &w->sched_mutex == mutex;
607}
608
609static inline int _starpu_worker_get_nsched_ctxs(int workerid)
610{
611 return _starpu_config.workers[workerid].nsched_ctxs;
612}
613
615static inline unsigned _starpu_get_nsched_ctxs(void)
616{
617 /* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
618 STARPU_RMB();
619 return _starpu_config.topology.nsched_ctxs;
620}
621
623static inline int _starpu_worker_get_id(void)
624{
625 struct _starpu_worker * worker;
626
627 worker = _starpu_get_local_worker_key();
628 if (worker)
629 {
630 return worker->workerid;
631 }
632 else
633 {
634 /* there is no worker associated to that thread, perhaps it is
635 * a thread from the application or this is some SPU worker */
636 return -1;
637 }
638}
639#define starpu_worker_get_id _starpu_worker_get_id
640
/** Like starpu_worker_get_id(), but asserts that the caller is a worker;
 * f and l (caller file and line) are only used in the assertion message. */
static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
{
	/* silence unused-parameter warnings in case the assertion is compiled out */
	(void) f;
	(void) l;
	const int id = starpu_worker_get_id();
	STARPU_ASSERT_MSG(id>=0, "%s:%d Cannot be called from outside a worker\n", f, l);
	return id;
}
#define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
652
/** Kind of memory node associated with workers of type `type`. */
enum starpu_node_kind _starpu_worker_get_node_kind(enum starpu_worker_archtype type);

/** Set the stream context of worker workerid to sched_ctx. */
void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *sched_ctx);

/** Return the stream context of worker stream_workerid. */
struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
658
664static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
665{
666 _starpu_worker_parallel_blocks = 1;
667 /* flush pending requests to start on a fresh transaction epoch */
668 while (worker->state_unblock_in_parallel_req)
669 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
670
671 /* announce blocking intent */
672 STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
674
675 if (worker->block_in_parallel_ref_count == 1)
676 {
677 /* only the transition from 0 to 1 triggers the block_in_parallel_req */
678
679 STARPU_ASSERT(!worker->state_blocked_in_parallel);
680 STARPU_ASSERT(!worker->state_block_in_parallel_req);
681 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
682 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
683 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
684
685 /* trigger the block_in_parallel_req */
686 worker->state_block_in_parallel_req = 1;
687 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
688#ifdef STARPU_SIMGRID
689 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
690#endif
691
692 /* wait for block_in_parallel_req to be processed */
693 while (!worker->state_block_in_parallel_ack)
694 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
695
696 STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
697 STARPU_ASSERT(worker->state_block_in_parallel_req);
698 STARPU_ASSERT(worker->state_blocked_in_parallel);
699
700 /* reset block_in_parallel_req state flags */
701 worker->state_block_in_parallel_req = 0;
702 worker->state_block_in_parallel_ack = 0;
703
704 /* broadcast block_in_parallel_req state flags reset */
705 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
706 }
707}
708
713static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
714{
715 /* flush pending requests to start on a fresh transaction epoch */
716 while (worker->state_block_in_parallel_req)
717 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
718
719 /* unblocking may be requested unconditionnally
720 * thus, check is unblocking is really needed */
721 if (worker->state_blocked_in_parallel)
722 {
723 if (worker->block_in_parallel_ref_count == 1)
724 {
725 /* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
726
727 STARPU_ASSERT(!worker->state_block_in_parallel_req);
728 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
729 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
730 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
731
732 /* trigger the unblock_in_parallel_req */
734 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
735
736 /* wait for the unblock_in_parallel_req to be processed */
737 while (!worker->state_unblock_in_parallel_ack)
738 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
739
740 STARPU_ASSERT(worker->state_unblock_in_parallel_req);
741 STARPU_ASSERT(!worker->state_blocked_in_parallel);
742
743 /* reset unblock_in_parallel_req state flags */
746
747 /* broadcast unblock_in_parallel_req state flags reset */
748 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
749 }
750
751 /* announce unblocking complete */
752 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
754 }
755}
756
762static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
763{
764 while (worker->state_block_in_parallel_req)
765 {
766 STARPU_ASSERT(!worker->state_blocked_in_parallel);
767 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
768 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
769 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
770 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
771
772 /* enter effective blocked state */
773 worker->state_blocked_in_parallel = 1;
774
775 /* notify block_in_parallel_req processing */
776 worker->state_block_in_parallel_ack = 1;
777 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
778
779 /* block */
780 while (!worker->state_unblock_in_parallel_req)
781 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
782
783 STARPU_ASSERT(worker->state_blocked_in_parallel);
784 STARPU_ASSERT(!worker->state_block_in_parallel_req);
785 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
786 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
787 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
788
789 /* leave effective blocked state */
790 worker->state_blocked_in_parallel = 0;
791
792 /* notify unblock_in_parallel_req processing */
794 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
795 }
796}
797
814#ifdef STARPU_SPINLOCK_CHECK
815static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
816#else
817static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
818#endif
819{
820 STARPU_ASSERT(!worker->state_sched_op_pending);
822 {
823 /* process pending block requests before entering a sched_op region */
824 _starpu_worker_process_block_in_parallel_requests(worker);
825 while (worker->state_changing_ctx_notice)
826 {
827 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
828
829 /* new block requests may have been triggered during the wait,
830 * need to check again */
831 _starpu_worker_process_block_in_parallel_requests(worker);
832 }
833 }
834 else
835 {
836 /* if someone observed the worker state since the last call, postpone block request
837 * processing for one sched_op turn more, because the observer will not have seen
838 * new block requests between its observation and now.
839 *
840 * however, the worker still has to wait for context change operations to complete
841 * before entering sched_op again*/
842 while (worker->state_changing_ctx_notice)
843 {
844 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
845 }
846 }
847
848 /* no block request and no ctx change ahead,
849 * enter sched_op */
850 worker->state_sched_op_pending = 1;
852 worker->state_relax_refcnt = 0;
853#ifdef STARPU_SPINLOCK_CHECK
854 worker->relax_on_file = file;
855 worker->relax_on_line = line;
856 worker->relax_on_func = func;
857#endif
858}
859#ifdef STARPU_SPINLOCK_CHECK
860#define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
861#endif
862
868#ifdef STARPU_SPINLOCK_CHECK
869static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
870#else
871static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
872#endif
873{
874 STARPU_ASSERT(worker->state_sched_op_pending);
875 worker->state_relax_refcnt = 1;
876#ifdef STARPU_SPINLOCK_CHECK
877 worker->relax_off_file = file;
878 worker->relax_off_line = line;
879 worker->relax_off_func = func;
880#endif
881 worker->state_sched_op_pending = 0;
882 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
884}
885#ifdef STARPU_SPINLOCK_CHECK
886#define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
887#endif
888
889static inline int _starpu_worker_sched_op_pending(void)
890{
891 int workerid = starpu_worker_get_id();
892 if (workerid == -1)
893 return 0;
894 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
895 STARPU_ASSERT(worker != NULL);
896 return worker->state_sched_op_pending;
897}
898
/** Must be called with worker's sched_mutex held (it is used with
 * STARPU_PTHREAD_COND_WAIT below), by a thread that is not already the one
 * changing this worker's contexts (asserted).
 *
 * Announce a context change on worker: set state_changing_ctx_notice so
 * that no new sched_op may start, record the calling thread in
 * thread_changing_ctx, then wait for a sched_op already in progress to
 * complete. End with _starpu_worker_leave_changing_ctx_op(), which clears
 * the notice. */
static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
{
	STARPU_ASSERT(!starpu_pthread_equal(worker->thread_changing_ctx, starpu_pthread_self()));
	/* flush pending requests to start on a fresh transaction epoch */
	while (worker->state_changing_ctx_notice)
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);

	/* announce changing_ctx intent
	 *
	 * - an already started sched_op is allowed to complete
	 * - no new sched_op may be started
	 */
	worker->state_changing_ctx_notice = 1;

	worker->thread_changing_ctx = starpu_pthread_self();

	/* allow for an already started sched_op to complete */
	if (worker->state_sched_op_pending)
	{
		/* request sched_op to broadcast when way is cleared */
		worker->state_changing_ctx_waiting = 1;

		/* wake up the worker, then wait for sched_op completion; the
		 * worker broadcasts sched_cond when it leaves the sched_op */
		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
#ifdef STARPU_SIMGRID
		starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
#endif
		do
		{
			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
		}
		while (worker->state_sched_op_pending);

		/* reset flag so other sched_ops won't have to broadcast state */
		worker->state_changing_ctx_waiting = 0;
	}
}
945
950static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
951{
952 worker->thread_changing_ctx = (starpu_pthread_t)0;
953 worker->state_changing_ctx_notice = 0;
954 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
955}
956
959#ifdef STARPU_SPINLOCK_CHECK
960static inline void __starpu_worker_relax_on(const char*file, int line, const char* func)
961#else
962static inline void _starpu_worker_relax_on(void)
963#endif
964{
965 struct _starpu_worker *worker = _starpu_get_local_worker_key();
966 if (worker == NULL)
967 return;
968 if (!worker->state_sched_op_pending)
969 return;
970 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
971#ifdef STARPU_SPINLOCK_CHECK
972 STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
973#else
974 STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
975#endif
976 worker->state_relax_refcnt++;
977#ifdef STARPU_SPINLOCK_CHECK
978 worker->relax_on_file = file;
979 worker->relax_on_line = line;
980 worker->relax_on_func = func;
981#endif
982 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
983 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
984}
985#ifdef STARPU_SPINLOCK_CHECK
986#define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
987#endif
988#define starpu_worker_relax_on _starpu_worker_relax_on
989
991#ifdef STARPU_SPINLOCK_CHECK
992static inline void __starpu_worker_relax_on_locked(struct _starpu_worker *worker, const char*file, int line, const char* func)
993#else
994static inline void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
995#endif
996{
997 if (!worker->state_sched_op_pending)
998 return;
999#ifdef STARPU_SPINLOCK_CHECK
1000 STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1001#else
1002 STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
1003#endif
1004 worker->state_relax_refcnt++;
1005#ifdef STARPU_SPINLOCK_CHECK
1006 worker->relax_on_file = file;
1007 worker->relax_on_line = line;
1008 worker->relax_on_func = func;
1009#endif
1010 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
1011}
1012#ifdef STARPU_SPINLOCK_CHECK
1013#define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
1014#endif
1015
1016#ifdef STARPU_SPINLOCK_CHECK
1017static inline void __starpu_worker_relax_off(const char*file, int line, const char* func)
1018#else
1019static inline void _starpu_worker_relax_off(void)
1020#endif
1021{
1022 int workerid = starpu_worker_get_id();
1023 if (workerid == -1)
1024 return;
1025 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1026 STARPU_ASSERT(worker != NULL);
1027 if (!worker->state_sched_op_pending)
1028 return;
1029 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1030#ifdef STARPU_SPINLOCK_CHECK
1031 STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1032#else
1033 STARPU_ASSERT(worker->state_relax_refcnt>0);
1034#endif
1035 worker->state_relax_refcnt--;
1036#ifdef STARPU_SPINLOCK_CHECK
1037 worker->relax_off_file = file;
1038 worker->relax_off_line = line;
1039 worker->relax_off_func = func;
1040#endif
1041 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1042}
1043#ifdef STARPU_SPINLOCK_CHECK
1044#define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1045#endif
1046#define starpu_worker_relax_off _starpu_worker_relax_off
1047
1048#ifdef STARPU_SPINLOCK_CHECK
1049static inline void __starpu_worker_relax_off_locked(const char*file, int line, const char* func)
1050#else
1051static inline void _starpu_worker_relax_off_locked(void)
1052#endif
1053{
1054 int workerid = starpu_worker_get_id();
1055 if (workerid == -1)
1056 return;
1057 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1058 STARPU_ASSERT(worker != NULL);
1059 if (!worker->state_sched_op_pending)
1060 return;
1061#ifdef STARPU_SPINLOCK_CHECK
1062 STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1063#else
1064 STARPU_ASSERT(worker->state_relax_refcnt>0);
1065#endif
1066 worker->state_relax_refcnt--;
1067#ifdef STARPU_SPINLOCK_CHECK
1068 worker->relax_off_file = file;
1069 worker->relax_off_line = line;
1070 worker->relax_off_func = func;
1071#endif
1072}
1073#ifdef STARPU_SPINLOCK_CHECK
1074#define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1075#endif
1076
1077static inline int _starpu_worker_get_relax_state(void)
1078{
1079 int workerid = starpu_worker_get_id();
1080 if (workerid < 0)
1081 return 1;
1082 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1083 STARPU_ASSERT(worker != NULL);
1084 return worker->state_relax_refcnt != 0;
1085}
1086#define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1087
1092static inline void _starpu_worker_lock(int workerid)
1093{
1094 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1095 STARPU_ASSERT(worker != NULL);
1096 int cur_workerid = starpu_worker_get_id();
1097 if (workerid != cur_workerid)
1098 {
1099 starpu_worker_relax_on();
1100
1101 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1102 while (!worker->state_relax_refcnt)
1103 {
1104 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
1105 }
1106 }
1107 else
1108 {
1109 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1110 }
1111}
1112
1113static inline int _starpu_worker_trylock(int workerid)
1114{
1115 struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1116 int cur_workerid = cur_worker->workerid;
1117 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1118 STARPU_ASSERT(worker != NULL);
1119
1120 /* Start with ourself */
1121 int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->sched_mutex);
1122 if (ret)
1123 return ret;
1124 if (workerid == cur_workerid)
1125 /* We only needed to lock ourself */
1126 return 0;
1127
1128 /* Now try to lock the other worker */
1129 ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
1130 if (!ret)
1131 {
1132 /* Good, check that it is relaxed */
1133 ret = !worker->state_relax_refcnt;
1134 if (ret)
1135 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1136 }
1137 if (!ret)
1138 _starpu_worker_relax_on_locked(cur_worker);
1139 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
1140 return ret;
1141}
1142
1143static inline void _starpu_worker_unlock(int workerid)
1144{
1145 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1146 STARPU_ASSERT(worker != NULL);
1147 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1148 int cur_workerid = starpu_worker_get_id();
1149 if (workerid != cur_workerid)
1150 {
1151 starpu_worker_relax_off();
1152 }
1153}
1154
1155static inline void _starpu_worker_lock_self(void)
1156{
1157 int workerid = starpu_worker_get_id_check();
1158 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1159 STARPU_ASSERT(worker != NULL);
1160 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1161}
1162
1163static inline void _starpu_worker_unlock_self(void)
1164{
1165 int workerid = starpu_worker_get_id_check();
1166 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1167 STARPU_ASSERT(worker != NULL);
1168 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1169}
1170
/** Wake up worker workerid, taking its sched_mutex with _starpu_worker_lock()
 * around starpu_wake_worker_locked(); returns the latter's result. */
static inline int _starpu_wake_worker_relax(int workerid)
{
	int woken;

	_starpu_worker_lock(workerid);
	woken = starpu_wake_worker_locked(workerid);
	_starpu_worker_unlock(workerid);

	return woken;
}
1178
/** Lighter variant of _starpu_wake_worker_relax() (see its implementation). */
int starpu_wake_worker_relax_light(int workerid);

/** Make worker refuse task (presumably handing it back to the scheduler —
 * TODO confirm against the implementation). */
void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
1186
1187/* @}*/
1188
1189#endif // __WORKERS_H__
Definition: barrier_counter.h:27
_starpu_worker_status
Definition: errorcheck.h:26
void _starpu_worker_apply_deferred_ctx_changes(void)
Definition: sched_ctx_list.h:25
Definition: workers.h:204
char padding[STARPU_CACHELINE_SIZE]
Definition: workers.h:224
unsigned memory_node
Definition: workers.h:208
struct starpu_perfmodel_arch perf_arch
Definition: workers.h:205
uint32_t worker_mask
Definition: workers.h:206
Definition: drivers.h:24
Definition: workers.h:359
int mpi_nodeid
Definition: workers.h:393
struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS]
Definition: workers.h:404
int opencl_nodeid
Definition: workers.h:389
unsigned running
Definition: workers.h:428
int current_cuda_gpuid
Definition: workers.h:373
unsigned nbindid
Definition: workers.h:417
int pause_depth
Definition: workers.h:434
int current_opencl_gpuid
Definition: workers.h:376
int cpus_nodeid
Definition: workers.h:385
int mic_nodeid
Definition: workers.h:391
struct _starpu_machine_config::@4 * bindid_workers
struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1]
Definition: workers.h:437
int current_bindid
Definition: workers.h:368
struct _starpu_worker workers[STARPU_NMAXWORKERS]
Definition: workers.h:400
uint32_t worker_mask
Definition: workers.h:422
int current_mic_deviceid
Definition: workers.h:379
int cuda_nodeid
Definition: workers.h:387
char padding2[STARPU_CACHELINE_SIZE]
Definition: workers.h:409
unsigned submitting
Definition: workers.h:440
char padding1[STARPU_CACHELINE_SIZE]
Definition: workers.h:396
int current_mpi_deviceid
Definition: workers.h:382
struct starpu_conf conf
Definition: workers.h:425
Definition: workers.h:248
struct starpu_tree * tree
Definition: workers.h:262
unsigned nmpidevices
Definition: workers.h:302
unsigned nhwmicdevices
Definition: workers.h:310
hwloc_topology_t hwtopology
Definition: workers.h:259
unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS]
Definition: workers.h:355
unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:337
unsigned nhwopenclgpus
Definition: workers.h:282
unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:345
unsigned nworkers
Definition: workers.h:250
unsigned ncudagpus
Definition: workers.h:293
unsigned nhwmpi
Definition: workers.h:287
unsigned ncpus
Definition: workers.h:290
unsigned ncombinedworkers
Definition: workers.h:253
unsigned nhwcudagpus
Definition: workers.h:277
unsigned nopenclgpus
Definition: workers.h:299
unsigned workers_nbindid
Definition: workers.h:329
unsigned nhwmpicores[STARPU_MAXMPIDEVS]
Definition: workers.h:305
unsigned nhwpus
Definition: workers.h:272
unsigned workers_bindid[STARPU_NMAXWORKERS]
Definition: workers.h:323
unsigned nhwmiccores[STARPU_MAXMICDEVS]
Definition: workers.h:313
unsigned nhwcpus
Definition: workers.h:267
Definition: sched_ctx.h:46
unsigned id
Definition: sched_ctx.h:48
Definition: workers.h:232
unsigned started
Definition: workers.h:236
starpu_pthread_t worker_thread
Definition: workers.h:234
starpu_pthread_cond_t ready_cond
Definition: workers.h:239
Definition: workers.h:70
unsigned state_blocked_in_parallel_observed
Definition: workers.h:107
struct starpu_task * current_task
Definition: workers.h:134
unsigned state_block_in_parallel_ack
Definition: workers.h:109
unsigned state_relax_refcnt
Definition: workers.h:94
struct starpu_task * task_transferring
Definition: workers.h:170
unsigned state_changing_ctx_waiting
Definition: workers.h:104
unsigned subworkerid
Definition: workers.h:78
unsigned devid
Definition: workers.h:77
starpu_pthread_t thread_changing_ctx
Definition: workers.h:120
int current_rank
Definition: workers.h:82
unsigned state_unblock_in_parallel_req
Definition: workers.h:110
unsigned nb_buffers_transferred
Definition: workers.h:168
starpu_pthread_t worker_thread
Definition: workers.h:76
starpu_pthread_cond_t ready_cond
Definition: workers.h:85
unsigned memory_node
Definition: workers.h:86
int workerid
Definition: workers.h:80
unsigned nb_buffers_totransfer
Definition: workers.h:169
unsigned state_keep_awake
Definition: workers.h:151
struct _starpu_worker_set * set
Definition: workers.h:147
int worker_size
Definition: workers.h:83
unsigned pop_ctx_priority
Definition: workers.h:186
unsigned state_unblock_in_parallel_ack
Definition: workers.h:111
unsigned numa_memory_node
Definition: workers.h:87
unsigned char pipeline_stuck
Definition: workers.h:146
unsigned char pipeline_length
Definition: workers.h:145
unsigned nsched_ctxs
Definition: workers.h:159
unsigned spinning_backoff
Definition: workers.h:166
unsigned state_blocked_in_parallel
Definition: workers.h:106
unsigned current_ordered_task_order
Definition: workers.h:133
starpu_pthread_cond_t started_cond
Definition: workers.h:84
starpu_pthread_mutex_t sched_mutex
Definition: workers.h:93
enum starpu_worker_archtype arch
Definition: workers.h:73
unsigned run_by_starpu
Definition: workers.h:154
unsigned local_ordered_tasks_size
Definition: workers.h:131
struct starpu_task ** local_ordered_tasks
Definition: workers.h:130
unsigned is_slave_somewhere
Definition: workers.h:187
starpu_pthread_cond_t sched_cond
Definition: workers.h:92
unsigned has_prev_init
Definition: workers.h:162
unsigned char ntasks
Definition: workers.h:144
int bindid
Definition: workers.h:79
unsigned state_block_in_parallel_req
Definition: workers.h:108
unsigned state_changing_ctx_notice
Definition: workers.h:105
unsigned state_sched_op_pending
Definition: workers.h:103
int combined_workerid
Definition: workers.h:81
unsigned char first_task
Definition: workers.h:143
unsigned current_ordered_task
Definition: workers.h:132
unsigned block_in_parallel_ref_count
Definition: workers.h:119
uint32_t worker_mask
Definition: workers.h:74
enum _starpu_worker_status status
Definition: workers.h:150