Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
Sudhanshu Sane
VTK-m
Commits
5fa086f7
Commit
5fa086f7
authored
Jan 08, 2019
by
Robert Maynard
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'upstream-diy' into fix_diy2_unused_call_result_warning
* upstream-diy: diy 2019-01-08 (839fd11e)
parents
1ba38c59
c445f969
Changes
25
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
25 changed files
with
1408 additions
and
693 deletions
+1408
-693
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/assigner.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/assigner.hpp
+3
-3
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/critical-resource.hpp
...rdparty/diy/vtkmdiy/include/vtkmdiy/critical-resource.hpp
+2
-8
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/collectives.hpp
...dparty/diy/vtkmdiy/include/vtkmdiy/detail/collectives.hpp
+0
-54
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/collectives.hpp
...diy/vtkmdiy/include/vtkmdiy/detail/master/collectives.hpp
+131
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/commands.hpp
...ty/diy/vtkmdiy/include/vtkmdiy/detail/master/commands.hpp
+22
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/communication.hpp
...y/vtkmdiy/include/vtkmdiy/detail/master/communication.hpp
+202
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/execution.hpp
...y/diy/vtkmdiy/include/vtkmdiy/detail/master/execution.hpp
+157
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/grid.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/grid.hpp
+46
-1
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/io/shared.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/io/shared.hpp
+49
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/io/utils.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/io/utils.hpp
+5
-4
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/link.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/link.hpp
+8
-1
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/master.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/master.hpp
+516
-591
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/mpi/communicator.hpp
...irdparty/diy/vtkmdiy/include/vtkmdiy/mpi/communicator.hpp
+25
-5
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/mpi/datatypes.hpp
.../thirdparty/diy/vtkmdiy/include/vtkmdiy/mpi/datatypes.hpp
+3
-3
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/mpi/io.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/mpi/io.hpp
+3
-1
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/no-thread.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/no-thread.hpp
+11
-7
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/proxy.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/proxy.hpp
+71
-6
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/resolve.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/resolve.hpp
+72
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/serialization.hpp
.../thirdparty/diy/vtkmdiy/include/vtkmdiy/serialization.hpp
+44
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/stats.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/stats.hpp
+7
-3
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/storage.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/storage.hpp
+8
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/time.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/time.hpp
+3
-3
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/types.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/types.hpp
+10
-1
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/version.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/version.hpp
+8
-0
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/vertices.hpp
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/vertices.hpp
+2
-2
No files found.
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/assigner.hpp
View file @
5fa086f7
...
...
@@ -121,7 +121,7 @@ namespace diy
inline
std
::
tuple
<
bool
,
int
>
get_rank
(
int
&
rk
,
int
gid
)
const
;
inline
void
set_rank
(
int
rk
,
int
gid
,
bool
flush
=
true
);
inline
void
set_rank
(
const
int
&
rk
,
int
gid
,
bool
flush
=
true
);
inline
void
set_ranks
(
const
std
::
vector
<
std
::
tuple
<
int
,
int
>>&
rank_gids
);
std
::
tuple
<
int
,
int
>
...
...
@@ -223,7 +223,7 @@ diy::DynamicAssigner::
ranks
(
const
std
::
vector
<
int
>&
gids
)
const
{
bool
all_cached
=
true
;
std
::
vector
<
int
>
result
(
gids
.
size
());
std
::
vector
<
int
>
result
(
gids
.
size
()
,
-
1
);
for
(
size_t
i
=
0
;
i
<
gids
.
size
();
++
i
)
{
auto
cached_gidrk
=
get_rank
(
result
[
i
],
gids
[
i
]);
...
...
@@ -239,7 +239,7 @@ ranks(const std::vector<int>& gids) const
void
diy
::
DynamicAssigner
::
set_rank
(
int
rk
,
int
gid
,
bool
flush
)
set_rank
(
const
int
&
rk
,
int
gid
,
bool
flush
)
{
// TODO: update cache
...
...
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/critical-resource.hpp
View file @
5fa086f7
...
...
@@ -3,20 +3,14 @@
namespace
diy
{
// TODO: when not running under C++11, i.e., when lock_guard is TinyThread's
// lock_guard, and not C++11's unique_lock, this implementation might
// be buggy since the copy constructor is invoked when
// critical_resource::access() returns an instance of this class. Once
// the temporary is destroyed the mutex is unlocked. I'm not 100%
// certain of this because I'd expect a deadlock on copy constructor,
// but it's clearly not happening -- so I may be missing something.
// (This issue will take care of itself in DIY3 once we switch to C++11 completely.)
template
<
class
T
,
class
Mutex
>
class
resource_accessor
{
public:
resource_accessor
(
T
&
x
,
Mutex
&
m
)
:
x_
(
x
),
lock_
(
m
)
{}
resource_accessor
(
resource_accessor
&&
)
=
default
;
resource_accessor
(
const
resource_accessor
&
)
=
delete
;
T
&
operator
*
()
{
return
x_
;
}
T
*
operator
->
()
{
return
&
x_
;
}
...
...
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/collectives.hpp
deleted
100644 → 0
View file @
1ba38c59
#ifndef DIY_COLLECTIVES_HPP
#define DIY_COLLECTIVES_HPP
namespace
diy
{
namespace
detail
{
struct
CollectiveOp
{
virtual
void
init
()
=
0
;
virtual
void
update
(
const
CollectiveOp
&
other
)
=
0
;
virtual
void
global
(
const
mpi
::
communicator
&
comm
)
=
0
;
virtual
void
copy_from
(
const
CollectiveOp
&
other
)
=
0
;
virtual
void
result_out
(
void
*
dest
)
const
=
0
;
virtual
~
CollectiveOp
()
{}
};
template
<
class
T
,
class
Op
>
struct
AllReduceOp
:
public
CollectiveOp
{
AllReduceOp
(
const
T
&
x
,
Op
op
)
:
in_
(
x
),
op_
(
op
)
{}
void
init
()
{
out_
=
in_
;
}
void
update
(
const
CollectiveOp
&
other
)
{
out_
=
op_
(
out_
,
static_cast
<
const
AllReduceOp
&>
(
other
).
in_
);
}
void
global
(
const
mpi
::
communicator
&
comm
)
{
T
res
;
mpi
::
all_reduce
(
comm
,
out_
,
res
,
op_
);
out_
=
res
;
}
void
copy_from
(
const
CollectiveOp
&
other
)
{
out_
=
static_cast
<
const
AllReduceOp
&>
(
other
).
out_
;
}
void
result_out
(
void
*
dest
)
const
{
*
reinterpret_cast
<
T
*>
(
dest
)
=
out_
;
}
private:
T
in_
,
out_
;
Op
op_
;
};
template
<
class
T
>
struct
Scratch
:
public
CollectiveOp
{
Scratch
(
const
T
&
x
)
:
x_
(
x
)
{}
void
init
()
{}
void
update
(
const
CollectiveOp
&
)
{}
void
global
(
const
mpi
::
communicator
&
)
{}
void
copy_from
(
const
CollectiveOp
&
)
{}
void
result_out
(
void
*
dest
)
const
{
*
reinterpret_cast
<
T
*>
(
dest
)
=
x_
;
}
private:
T
x_
;
};
}
}
#endif
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/collectives.hpp
0 → 100644
View file @
5fa086f7
namespace
diy
{
namespace
detail
{
struct
CollectiveOp
{
virtual
void
init
()
=
0
;
virtual
void
update
(
const
CollectiveOp
&
other
)
=
0
;
virtual
void
global
(
const
mpi
::
communicator
&
comm
)
=
0
;
virtual
void
copy_from
(
const
CollectiveOp
&
other
)
=
0
;
virtual
void
result_out
(
void
*
dest
)
const
=
0
;
virtual
~
CollectiveOp
()
{}
};
template
<
class
T
,
class
Op
>
struct
AllReduceOp
:
public
CollectiveOp
{
AllReduceOp
(
const
T
&
x
,
Op
op
)
:
in_
(
x
),
op_
(
op
)
{}
void
init
()
{
out_
=
in_
;
}
void
update
(
const
CollectiveOp
&
other
)
{
out_
=
op_
(
out_
,
static_cast
<
const
AllReduceOp
&>
(
other
).
in_
);
}
void
global
(
const
mpi
::
communicator
&
comm
)
{
T
res
;
mpi
::
all_reduce
(
comm
,
out_
,
res
,
op_
);
out_
=
res
;
}
void
copy_from
(
const
CollectiveOp
&
other
)
{
out_
=
static_cast
<
const
AllReduceOp
&>
(
other
).
out_
;
}
void
result_out
(
void
*
dest
)
const
{
*
reinterpret_cast
<
T
*>
(
dest
)
=
out_
;
}
private:
T
in_
,
out_
;
Op
op_
;
};
template
<
class
T
>
struct
Scratch
:
public
CollectiveOp
{
Scratch
(
const
T
&
x
)
:
x_
(
x
)
{}
void
init
()
{}
void
update
(
const
CollectiveOp
&
)
{}
void
global
(
const
mpi
::
communicator
&
)
{}
void
copy_from
(
const
CollectiveOp
&
)
{}
void
result_out
(
void
*
dest
)
const
{
*
reinterpret_cast
<
T
*>
(
dest
)
=
x_
;
}
private:
T
x_
;
};
}
struct
Master
::
Collective
{
Collective
()
:
cop_
(
0
)
{}
Collective
(
detail
::
CollectiveOp
*
cop
)
:
cop_
(
cop
)
{}
Collective
(
Collective
&&
other
)
:
cop_
(
0
)
{
swap
(
const_cast
<
Collective
&>
(
other
));
}
~
Collective
()
{
delete
cop_
;
}
Collective
&
operator
=
(
const
Collective
&
other
)
=
delete
;
Collective
(
Collective
&
other
)
=
delete
;
void
init
()
{
cop_
->
init
();
}
void
swap
(
Collective
&
other
)
{
std
::
swap
(
cop_
,
other
.
cop_
);
}
void
update
(
const
Collective
&
other
)
{
cop_
->
update
(
*
other
.
cop_
);
}
void
global
(
const
mpi
::
communicator
&
c
)
{
cop_
->
global
(
c
);
}
void
copy_from
(
Collective
&
other
)
const
{
cop_
->
copy_from
(
*
other
.
cop_
);
}
void
result_out
(
void
*
x
)
const
{
cop_
->
result_out
(
x
);
}
detail
::
CollectiveOp
*
cop_
;
};
struct
Master
::
CollectivesList
:
public
std
::
list
<
Collective
>
{};
struct
Master
::
CollectivesMap
:
public
std
::
map
<
int
,
CollectivesList
>
{};
}
diy
::
Master
::
CollectivesMap
&
diy
::
Master
::
collectives
()
{
return
*
collectives_
;
}
diy
::
Master
::
CollectivesList
&
diy
::
Master
::
collectives
(
int
gid__
)
{
return
(
*
collectives_
)[
gid__
];
}
void
diy
::
Master
::
process_collectives
()
{
auto
scoped
=
prof
.
scoped
(
"collectives"
);
DIY_UNUSED
(
scoped
);
if
(
collectives
().
empty
())
return
;
using
CollectivesIterator
=
CollectivesList
::
iterator
;
std
::
vector
<
CollectivesIterator
>
iters
;
std
::
vector
<
int
>
gids
;
for
(
auto
&
x
:
collectives
())
{
gids
.
push_back
(
x
.
first
);
iters
.
push_back
(
x
.
second
.
begin
());
}
while
(
iters
[
0
]
!=
collectives
().
begin
()
->
second
.
end
())
{
iters
[
0
]
->
init
();
for
(
unsigned
j
=
1
;
j
<
iters
.
size
();
++
j
)
{
// NB: this assumes that the operations are commutative
iters
[
0
]
->
update
(
*
iters
[
j
]);
}
iters
[
0
]
->
global
(
comm_
);
// do the mpi collective
for
(
unsigned
j
=
1
;
j
<
iters
.
size
();
++
j
)
{
iters
[
j
]
->
copy_from
(
*
iters
[
0
]);
++
iters
[
j
];
}
++
iters
[
0
];
}
}
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/commands.hpp
0 → 100644
View file @
5fa086f7
namespace
diy
{
struct
Master
::
BaseCommand
{
virtual
~
BaseCommand
()
{}
// to delete derived classes
virtual
void
execute
(
void
*
b
,
const
ProxyWithLink
&
cp
)
const
=
0
;
virtual
bool
skip
(
int
i
,
const
Master
&
master
)
const
=
0
;
};
template
<
class
Block
>
struct
Master
::
Command
:
public
BaseCommand
{
Command
(
Callback
<
Block
>
f_
,
const
Skip
&
s_
)
:
f
(
f_
),
s
(
s_
)
{}
void
execute
(
void
*
b
,
const
ProxyWithLink
&
cp
)
const
override
{
f
(
static_cast
<
Block
*>
(
b
),
cp
);
}
bool
skip
(
int
i
,
const
Master
&
m
)
const
override
{
return
s
(
i
,
m
);
}
Callback
<
Block
>
f
;
Skip
s
;
};
}
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/communication.hpp
0 → 100644
View file @
5fa086f7
namespace
diy
{
struct
Master
::
tags
{
enum
{
queue
,
piece
};
};
struct
Master
::
MessageInfo
{
int
from
,
to
;
int
round
;
};
struct
Master
::
InFlightSend
{
std
::
shared_ptr
<
MemoryBuffer
>
message
;
mpi
::
request
request
;
MessageInfo
info
;
// for debug purposes
};
struct
Master
::
InFlightRecv
{
MemoryBuffer
message
;
MessageInfo
info
{
-
1
,
-
1
,
-
1
};
bool
done
=
false
;
inline
void
recv
(
mpi
::
communicator
&
comm
,
const
mpi
::
status
&
status
);
inline
void
place
(
IncomingRound
*
in
,
bool
unload
,
ExternalStorage
*
storage
,
IExchangeInfo
*
iexchange
);
void
reset
()
{
*
this
=
InFlightRecv
();
}
};
struct
Master
::
InFlightRecvsMap
:
public
std
::
map
<
int
,
InFlightRecv
>
{};
struct
Master
::
InFlightSendsList
:
public
std
::
list
<
InFlightSend
>
{};
struct
Master
::
GidSendOrder
{
size_t
size
()
const
{
return
list
.
size
();
}
bool
empty
()
const
{
return
list
.
empty
();
}
int
pop
()
{
int
x
=
list
.
front
();
list
.
pop_front
();
return
x
;
}
std
::
list
<
int
>
list
;
size_t
limit
=
0
;
};
struct
Master
::
IExchangeInfo
{
IExchangeInfo
()
:
n
(
0
)
{}
IExchangeInfo
(
size_t
n_
,
mpi
::
communicator
comm_
)
:
n
(
n_
),
comm
(
comm_
),
global_work_
(
new
mpi
::
window
<
int
>
(
comm
,
1
))
{
global_work_
->
lock_all
(
MPI_MODE_NOCHECK
);
}
~
IExchangeInfo
()
{
global_work_
->
unlock_all
();
}
inline
void
not_done
(
int
gid
);
inline
int
global_work
();
// get global work status (for debugging)
inline
bool
all_done
();
// get global all done status
inline
void
reset_work
();
// reset global work counter
inline
int
add_work
(
int
work
);
// add work to global work counter
int
inc_work
()
{
return
add_work
(
1
);
}
// increment global work counter
int
dec_work
()
{
return
add_work
(
-
1
);
}
// decremnent global work counter
size_t
n
;
mpi
::
communicator
comm
;
std
::
unordered_map
<
int
,
bool
>
done
;
// gid -> done
std
::
unique_ptr
<
mpi
::
window
<
int
>>
global_work_
;
// global work to do
std
::
shared_ptr
<
spd
::
logger
>
log
=
get_logger
();
};
// VectorWindow is used to send and receive subsets of a contiguous array in-place
namespace
detail
{
template
<
typename
T
>
struct
VectorWindow
{
T
*
begin
;
size_t
count
;
};
}
// namespace detail
namespace
mpi
{
namespace
detail
{
template
<
typename
T
>
struct
is_mpi_datatype
<
diy
::
detail
::
VectorWindow
<
T
>
>
{
typedef
true_type
type
;
};
template
<
typename
T
>
struct
mpi_datatype
<
diy
::
detail
::
VectorWindow
<
T
>
>
{
using
VecWin
=
diy
::
detail
::
VectorWindow
<
T
>
;
static
MPI_Datatype
datatype
()
{
return
get_mpi_datatype
<
T
>
();
}
static
const
void
*
address
(
const
VecWin
&
x
)
{
return
x
.
begin
;
}
static
void
*
address
(
VecWin
&
x
)
{
return
x
.
begin
;
}
static
int
count
(
const
VecWin
&
x
)
{
return
static_cast
<
int
>
(
x
.
count
);
}
};
}
}
// namespace mpi::detail
}
// namespace diy
void
diy
::
Master
::
IExchangeInfo
::
not_done
(
int
gid
)
{
if
(
done
[
gid
])
{
done
[
gid
]
=
false
;
int
work
=
inc_work
();
log
->
debug
(
"[{}] Incrementing work when switching done (on receipt): work = {}
\n
"
,
gid
,
work
);
}
else
log
->
debug
(
"[{}] Not done, no need to increment work
\n
"
,
gid
);
}
diy
::
Master
::
InFlightRecv
&
diy
::
Master
::
inflight_recv
(
int
proc
)
{
return
(
*
inflight_recvs_
)[
proc
];
}
diy
::
Master
::
InFlightSendsList
&
diy
::
Master
::
inflight_sends
()
{
return
*
inflight_sends_
;
}
// receive message described by status
void
diy
::
Master
::
InFlightRecv
::
recv
(
mpi
::
communicator
&
comm
,
const
mpi
::
status
&
status
)
{
if
(
info
.
from
==
-
1
)
// uninitialized
{
MemoryBuffer
bb
;
comm
.
recv
(
status
.
source
(),
status
.
tag
(),
bb
.
buffer
);
if
(
status
.
tag
()
==
tags
::
piece
)
// first piece is the header
{
size_t
msg_size
;
diy
::
load
(
bb
,
msg_size
);
diy
::
load
(
bb
,
info
);
message
.
buffer
.
reserve
(
msg_size
);
}
else
// tags::queue
{
diy
::
load_back
(
bb
,
info
);
message
.
swap
(
bb
);
}
}
else
{
size_t
start_idx
=
message
.
buffer
.
size
();
size_t
count
=
status
.
count
<
char
>
();
message
.
buffer
.
resize
(
start_idx
+
count
);
detail
::
VectorWindow
<
char
>
window
;
window
.
begin
=
&
message
.
buffer
[
start_idx
];
window
.
count
=
count
;
comm
.
recv
(
status
.
source
(),
status
.
tag
(),
window
);
}
if
(
status
.
tag
()
==
tags
::
queue
)
done
=
true
;
}
// once the InFlightRecv is done, place it either out of core or in the appropriate incoming queue
void
diy
::
Master
::
InFlightRecv
::
place
(
IncomingRound
*
in
,
bool
unload
,
ExternalStorage
*
storage
,
IExchangeInfo
*
iexchange
)
{
size_t
size
=
message
.
size
();
int
from
=
info
.
from
;
int
to
=
info
.
to
;
int
external
=
-
1
;
if
(
unload
)
{
get_logger
()
->
debug
(
"Directly unloading queue {} <- {}"
,
to
,
from
);
external
=
storage
->
put
(
message
);
// unload directly
}
else
if
(
!
iexchange
)
{
in
->
map
[
to
].
queues
[
from
].
swap
(
message
);
in
->
map
[
to
].
queues
[
from
].
reset
();
// buffer position = 0
}
else
// iexchange
{
auto
log
=
get_logger
();
iexchange
->
not_done
(
to
);
in
->
map
[
to
].
queues
[
from
].
append_binary
(
&
message
.
buffer
[
0
],
message
.
size
());
// append insted of overwrite
int
work
=
iexchange
->
dec_work
();
log
->
debug
(
"[{}] Decrementing work after receiving: work = {}
\n
"
,
to
,
work
);
}
in
->
map
[
to
].
records
[
from
]
=
QueueRecord
(
size
,
external
);
++
(
in
->
received
);
}
vtkm/thirdparty/diy/vtkmdiy/include/vtkmdiy/detail/master/execution.hpp
0 → 100644
View file @
5fa086f7
struct
diy
::
Master
::
ProcessBlock
{
ProcessBlock
(
Master
&
master_
,
const
std
::
deque
<
int
>&
blocks__
,
int
local_limit_
,
critical_resource
<
int
>&
idx_
)
:
master
(
master_
),
blocks
(
blocks__
),
local_limit
(
local_limit_
),
idx
(
idx_
)
{}
ProcessBlock
(
const
ProcessBlock
&
)
=
delete
;
ProcessBlock
(
ProcessBlock
&&
)
=
default
;
void
operator
()()
{
master
.
log
->
debug
(
"Processing with thread: {}"
,
this_thread
::
get_id
());
std
::
vector
<
int
>
local
;
do
{
int
cur
=
(
*
idx
.
access
())
++
;
if
((
size_t
)
cur
>=
blocks
.
size
())
return
;
int
i
=
blocks
[
cur
];
if
(
master
.
block
(
i
))
{
if
(
local
.
size
()
==
(
size_t
)
local_limit
)
master
.
unload
(
local
);
local
.
push_back
(
i
);
}
master
.
log
->
debug
(
"Processing block: {}"
,
master
.
gid
(
i
));
bool
skip
=
all_skip
(
i
);
IncomingQueuesMap
&
current_incoming
=
master
.
incoming_
[
master
.
exchange_round_
].
map
;
if
(
master
.
block
(
i
)
==
0
)
// block unloaded
{
if
(
skip
)
master
.
load_queues
(
i
);
// even though we are skipping the block, the queues might be necessary
else
{
if
(
local
.
size
()
==
(
size_t
)
local_limit
)
// reached the local limit
master
.
unload
(
local
);
master
.
load
(
i
);
local
.
push_back
(
i
);
}
}
for
(
auto
&
cmd
:
master
.
commands_
)
{
cmd
->
execute
(
skip
?
0
:
master
.
block
(
i
),
master
.
proxy
(
i
));
// no longer need them, so get rid of them
current_incoming
[
master
.
gid
(
i
)].
queues
.
clear
();
current_incoming
[
master
.
gid
(
i
)].
records
.
clear
();
}
if
(
skip
&&
master
.
block
(
i
)
==
0
)
master
.
unload_queues
(
i
);
// even though we are skipping the block, the queues might be necessary
}
while
(
true
);
}
bool
all_skip
(
int
i
)
const
{
bool
skip
=
true
;
for
(
auto
&
cmd
:
master
.
commands_
)
{
if
(
!
cmd
->
skip
(
i
,
master
))
{
skip
=
false
;
break
;
}
}
return
skip
;
}
Master
&
master
;
const
std
::
deque
<
int
>&
blocks
;
int
local_limit
;
critical_resource
<
int
>&
idx
;
};
void
diy
::
Master
::
execute
()
{
log
->
debug
(
"Entered execute()"
);
auto
scoped
=
prof
.
scoped
(
"execute"
);
DIY_UNUSED
(
scoped
);
//show_incoming_records();
// touch the outgoing and incoming queues as well as collectives to make sure they exist
for
(
unsigned
i
=
0
;
i
<
size
();
++
i
)
{