[Source code analysis] Horovod, a distributed training framework for deep learning (3) --- What is done behind Horovodrun


0x00 summary

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely adopted in industry.

This series walks through Horovod via source code analysis. This article, the third in the series, enters the Horovod world from the Python side and examines what horovodrun does behind the scenes.

The links to the first two articles are as follows:

[Source code analysis] Horovod, a distributed training framework for deep learning (1) --- basic knowledge

[Source code analysis] Horovod, a distributed training framework for deep learning (2) --- From the perspective of users

0x01 background knowledge

First, let's introduce some relevant background knowledge.

1.1 Distributed system

When designing a parallel computer, the most direct approach is to have multiple computing units share one memory. Shared-memory programming makes data exchange and access straightforward and is easier to program, but it runs into a serious scalability bottleneck.

The other approach is distributed memory: each computing unit has its own memory, and data is exchanged between units over the network. This architecture is much better in portability and scalability, but message passing becomes the hard part of program design.

Combining the two gives the distributed shared-memory parallel architecture, which is also the most commonly used architecture today.

1.2 Parallel task communication

Parallel task communication is generally divided into P2P (Point-to-point communication) and Collective communication.

  • P2P (point-to-point) communication has exactly one sender and one receiver.
  • Collective communication involves multiple senders and/or multiple receivers.

Collective communication includes some common primitives:

  • broadcast
  • reduce, allreduce
  • scatter, scatter reduce
  • gather, allgather
  • ring-based collectives
  • ring-allreduce

Traditional collective communication assumes that the communication nodes form a fat-tree topology, which gives the highest communication efficiency. Real communication topologies, however, are often more complicated and are not fat trees, so ring-based collective communication is generally used instead.
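To make the ring idea concrete, here is a small, self-contained Python simulation of ring-allreduce (illustrative only; this is neither Horovod nor Gloo code). Each worker reduces one chunk during the scatter-reduce phase and then circulates the fully reduced chunks during the allgather phase, so every worker ends up with the element-wise sum of all vectors.

```python
# Minimal in-process simulation of ring-allreduce (illustrative, not Horovod/Gloo code).
def ring_allreduce(tensors):
    """tensors: one equal-length list per simulated worker; returns the allreduced copies."""
    n = len(tensors)
    size = len(tensors[0])
    data = [list(t) for t in tensors]
    # Split indices into n contiguous chunks.
    bounds = [(i * size // n, (i + 1) * size // n) for i in range(n)]

    # Phase 1: scatter-reduce. At step s, rank r sends chunk (r - s) % n to rank (r + 1) % n.
    for s in range(n - 1):
        msgs = []
        for r in range(n):
            c = (r - s) % n
            lo, hi = bounds[c]
            msgs.append((c, data[r][lo:hi]))          # snapshot before anyone applies
        for r in range(n):
            c, payload = msgs[r]
            dst = (r + 1) % n
            lo, _ = bounds[c]
            for k, v in enumerate(payload):
                data[dst][lo + k] += v                # receiver accumulates the chunk

    # Phase 2: allgather. At step s, rank r forwards its reduced chunk (r + 1 - s) % n.
    for s in range(n - 1):
        msgs = []
        for r in range(n):
            c = (r + 1 - s) % n
            lo, hi = bounds[c]
            msgs.append((c, data[r][lo:hi]))
        for r in range(n):
            c, payload = msgs[r]
            dst = (r + 1) % n
            lo, hi = bounds[c]
            data[dst][lo:hi] = payload                # receiver overwrites with the reduced chunk

    return data

print(ring_allreduce([[1, 1, 1], [2, 2, 2], [3, 3, 3]]))
# Every worker ends up with [6, 6, 6].
```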

1.3 MPI

MPI (Message Passing Interface) is a communication protocol that supports both point-to-point and collective (e.g., broadcast) communication. There are many concrete implementations; popular ones include Open MPI, Intel MPI, and so on.

MPI is a message-passing programming model. Message passing means that users must explicitly send and receive messages to exchange data between processors. In this model, each control flow has its own independent address space; different control flows cannot access each other's address space directly and must communicate through explicit message passing. This programming style is the main one adopted by massively parallel processors (MPP) and clusters. Because message-passing programs require users to decompose the problem well and organize the data exchange between control flows, and because the parallel granularity is large, this model is especially suitable for large-scale, scalable parallel algorithms.
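As a concrete, purely illustrative example of explicit message passing, here is a minimal sketch using mpi4py (an independent Python binding for MPI; it is not part of Horovod). Rank 0 explicitly sends an object, and rank 1 explicitly receives it:

```python
# Illustrative mpi4py point-to-point example (not Horovod code).
# Run with, e.g.: mpirun -np 2 python p2p_example.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {"step": 1, "gradients": [0.1, 0.2, 0.3]}
    comm.send(data, dest=1, tag=0)        # explicit send
elif rank == 1:
    data = comm.recv(source=0, tag=0)     # explicit, blocking receive
    print(f"rank 1 received: {data}")
```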

MPI is a process-based parallel environment. Processes have independent virtual address spaces and processor scheduling, and they execute independently of each other. MPI is designed to support cluster systems connected through a network; communication through message passing is its most fundamental feature.

1.4 Open-MPI

Open MPI is a high-performance message-passing library. It originated as a merger of technology and resources from several other projects (FT-MPI, LA-MPI, LAM/MPI, and PACX-MPI). It is an open-source implementation of the MPI-2 standard, developed and maintained by a consortium of research institutions and companies. Open MPI can therefore draw on expertise, technology, and resources from across the high-performance computing community to build the best MPI library available. It offers many conveniences to system and software vendors, application developers, and researchers; it is easy to use and runs on a wide variety of operating systems, network interconnects, and batch/scheduling systems.

1.5 MPI usage issues

Because MPI is a distributed-memory programming model, development inevitably involves passing information between nodes. Data and programs usually live on multiple nodes, so information must be exchanged between nodes when commands are executed.

In specific use, there are two problems:

  • How does Open MPI discover the machines and establish connections with them?
  • During multi-machine, multi-GPU training, how is the transmission ring established? This also determines training efficiency, so how does Open MPI handle it?

Regarding the first question:

Setting up passwordless SSH login eliminates password prompts at runtime. Each node generates a key pair; every worker node's public key is sent to the master node and appended to the master's authorized_keys file, which enables passwordless login between the workers and the master. Finally, the combined authorized_keys file is copied back to every worker node, so that all nodes can log in to each other without a password.

When Open MPI starts, you can pass --hostfile or --host to specify the IPs or hostnames on which the task should run. Open MPI then connects to those machines via passwordless SSH and executes a series of commands, mainly to synchronize environment variables and the current working directory and to issue the startup command.

Of course, users can also issue commands to remote machines in other ways, which can be specified through the OMPI_MCA_plm_rsh_agent environment variable.

Regarding the second question:

Once all the machines are connected and ready to compute, Open MPI relies on an integrated component, hwloc, to communicate as efficiently as possible. This component builds a topology of each machine's hardware resources, which is then used to choose the shortest communication paths.

0x02 entry point

Many machine learning frameworks follow the same layered pattern: a shell script (optional), a Python layer, and a C++ layer.

  • The shell script is the launch entry point, responsible for parsing parameters, validating them, and invoking the training program;
  • The Python layer is the user-facing interface; it loads the C++ library, wraps the API, and handles runtime interaction with the underlying C++;
  • The C++ layer implements the underlying training logic.

So let's take a look at the horovodrun script first.

2.1 How to run

One of the official Horovod running examples is as follows:

```
horovodrun -np 2 -H localhost:4 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist.py
```

Here -np refers to the number of processes, localhost:4 means 4 GPUs on the localhost node.

Note that even if a virtual machine has only one core, you can still force a parallel effect with the -np parameter: it splits that core into multiple logical processors for you, and each distributed process occupies one slot.

Therefore, we can start with the horovodrun command.

2.2 horovodrun

The entry point can be seen in setup.py: horovodrun is mapped to horovod.runner.launch:run_commandline.

```python
entry_points={
    'console_scripts': [
        'horovodrun = horovod.runner.launch:run_commandline'
    ]
}
```

So let's take a look at run_commandline

2.3 run_commandline

This function is located in horovod-master/horovod/runner/launch.py; we excerpt the important part.

```python
def run_commandline():
    args = parse_args()
    _run(args)
```

Now enter the _run function. We can see that Horovod chooses different paths depending on whether elastic training is used. In this series we first analyze non-elastic training, i.e. _run_static.

```python
def _run(args):
    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args)
    else:
        return _run_static(args)  # we look at this branch first
```
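For example (following the default branch in the code above), running horovodrun -np 4 python train.py with no -H option and no hostfile leaves args.hosts at its default of 'localhost:4', i.e. four slots on the local machine.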

2.4 Non-elastic training: _run_static

_run_static does the following:

  • First, parse the various parameters to obtain settings;
  • Call driver_service.get_common_interfaces to obtain the network interfaces and other host information; slot allocation is performed based on this information. This part is quite involved, and we will explain the details in a dedicated article (the next one).
  • A question arises here: why obtain the host, slot, and rank relationship information? For engineering reasons, rank roles are distinguished in the underlying C++ world: rank 0 is the master and rank n is a worker, so this information must be determined and passed down to the C++ world;
  • Decide which path to take depending on whether a run function was passed in the arguments. Usually there is no run_func argument, so _launch_job is executed to start the training job;

The specific code is as follows:

```python
def _run_static(args):
    # First parse the various parameters to obtain settings
    settings = hvd_settings.Settings(verbose=2 if args.verbose else 0,
                                     ssh_port=args.ssh_port,
                                     ssh_identity_file=args.ssh_identity_file,
                                     extra_mpi_args=args.mpi_args,
                                     tcp_flag=args.tcp_flag,
                                     binding_args=args.binding_args,
                                     key=secret.make_secret_key(),
                                     start_timeout=tmout,
                                     num_proc=args.np,
                                     hosts=args.hosts,
                                     output_filename=args.output_filename,
                                     run_func_mode=args.run_func is not None,
                                     nics=args.nics,
                                     ...)

    fn_cache = None
    if not args.disable_cache:
        params = ''
        if args.np:
            params += str(args.np) + ' '
        if args.hosts:
            params += str(args.hosts) + ' '
        if args.ssh_port:
            params += str(args.ssh_port)
        if args.ssh_identity_file:
            params += args.ssh_identity_file
        parameters_hash = hashlib.md5(params.encode('utf-8')).hexdigest()
        fn_cache = cache.Cache(CACHE_FOLDER, CACHE_STALENESS_THRESHOLD_MINUTES,
                               parameters_hash)

    # Get the network interfaces and other host information;
    # slot allocation will be made based on this information
    all_host_names, _ = hosts.parse_hosts_and_slots(args.hosts)
    remote_host_names = network.filter_local_addresses(all_host_names)
    nics = driver_service.get_common_interfaces(settings, all_host_names,
                                                remote_host_names, fn_cache)

    if args.run_func:
        # get the driver IPv4 address
        driver_ip = network.get_driver_ip(nics)
        run_func_server = KVStoreServer(verbose=settings.verbose)  # start the internal KV server
        run_func_server_port = run_func_server.start_server()
        # store 'func' -> args.run_func as a key/value pair
        put_data_into_kvstore(driver_ip, run_func_server_port,
                              'runfunc', 'func', args.run_func)

        command = [sys.executable, '-m', 'horovod.runner.run_task',
                   str(driver_ip), str(run_func_server_port)]

        try:
            _launch_job(args, settings, nics, command)
            results = [None] * args.np
            for i in range(args.np):
                results[i] = read_data_from_kvstore(driver_ip, run_func_server_port,
                                                    'runfunc_result', str(i))
            return results
        finally:
            run_func_server.shutdown_server()
    else:
        command = args.command
        _launch_job(args, settings, nics, command)  # we focus on this branch
        return None
```

The current logic is as follows:

```
        +-----------+
        |horovodrun |
        +-----+-----+
              |
              v
     +--------+--------+
     | run_commandline |
     +----+-------+----+
          |       |
          v       v
+---------+----+ +--+----------+
| _run_elastic | | _run_static |
+--------------+ +-------------+
```

So far we have analyzed the entry of horovodrun; next we analyze how the job is launched.

0x03 Run training job

3.1 _launch_job

_launch_job makes the concrete call based on configuration and on what is installed. There are three possibilities: gloo, mpi, and jsrun.

The information of jsrun is hard to find, so we will focus on gloo and mpi.

```python
def _launch_job(args, settings, nics, command):
    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)

    def gloo_run_fn():
        driver_ip = network.get_driver_ip(nics)
        gloo_run(settings, nics, env, driver_ip, command)

    def mpi_run_fn():
        mpi_run(settings, nics, env, command)

    def js_run_fn():
        js_run(settings, nics, env, command)

    run_controller(args.use_gloo, gloo_run_fn,
                   args.use_mpi, mpi_run_fn,
                   args.use_jsrun, js_run_fn,
                   args.verbose)
```

3.2 run_controller

run_controller is still an intermediary function; it dispatches to gloo, mpi, or jsrun.

```python
def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    if use_gloo:
        gloo_run()
    elif use_mpi:
        mpi_run()
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()
        elif gloo_built(verbose=verbose):
            gloo_run()
```

The current logic is as follows:

```
        +-----------+
        |horovodrun |
        +-----+-----+
              |
              v
     +--------+--------+
     | run_commandline |
     +----+-------+----+
          |       |
          v       v
+---------+----+ +--+----------+
| _run_elastic | | _run_static |
+--------------+ +------+------+
                        |
                        v
                 +------+------+
                 | _launch_job |
                 +------+------+
                        |
                        v
                +-------+---------+
                | run_controller  |
                +--+------+-----+-+
                   |      |     |
                   v      v     v
          +--------+-+ +--+------+ +-+------+
          | gloo_run | | mpi_run | | js_run |
          +----------+ +---------+ +--------+
```

So we are divided into two branches below: gloo & mpi.

0x04 Gloo implementation

4.1 Introduction to Gloo

Gloo is a collective communication library similar to MPI, produced by Facebook (github.com/facebookinc...).

The main characteristic of a collective communication library is that, in general, it follows the interface conventions defined by MPI and implements the corresponding interfaces, including point-to-point communication (SEND, RECV, etc.) and collective communication (REDUCE, BROADCAST, ALLREDUCE, etc.), and then adapts the lower layer to the needs of specific hardware or systems to guarantee the stability and performance of those interfaces.

Gloo provides optimized implementations of collective communication routines for both CPU and GPU. It is especially useful for GPUs because it can communicate without first copying the data to CPU memory (for example via GPUDirect). It can also use NCCL for fast intra-node communication while implementing its own inter-node routines. You do not need to worry about copying memory data; you only need to implement the algorithm logic.

Gloo supports and optimizes collective communication. Since data can be exchanged directly between GPUs without going through the CPU and host memory, using the gloo backend on GPUs is faster.

Why did Horovod choose Gloo? Personally, I think that beyond the completeness and performance of its features, a highlight is that it can be extended and built upon. For example, the Rendezvous functionality discussed below is used by Horovod to implement elastic training (we will have a dedicated explanation later).

Gloo and MPI both play similar roles:

  • On the one hand, Horovod integrates a Gloo-based AllReduce which, like NCCL, is used for gradient reduction;

  • On the other hand, Gloo can be used to launch multiple processes (identified by rank in Horovod) to achieve parallel computing;

The details are as follows:

```
+-----------------------+   +-----------------------+   +-----------------------+
|   gloo_run  slot 1    |   |   gloo_run  slot 2    |   |   gloo_run  slot 3    |
|                       |   |                       |   |                       |
|  +-----------------+  |   |  +-----------------+  |   |  +-----------------+  |
|  | python train.py |  |   |  | python train.py |  |   |  | python train.py |  |
|  +--------+--------+  |   |  +--------+--------+  |   |  +--------+--------+  |
|           |           |   |           |           |   |           |           |
+-----------+-----------+   +-----------+-----------+   +-----------+-----------+
            |                           |                           |
            v                           v                           v
<--------------------------- Ring AllReduce on Gloo --------------------------->
```

4.2 Rendezvous function

4.2.1 Rendezvous concept

In Gloo's documentation, it says:

The rendezvous process needs to happen exactly once per Gloo context. It makes participating Gloo processes exchange details for setting up their communication channels. For example, when the TCP transport is used, processes exchange IP address and port number details of listening sockets.

Rendezvous can be executed by accessing a key/value store that is accessible by all participating processes. Every process is responsible for setting a number of keys and will wait until their peers have set their keys. The values stored against these keys hold the information that is passed to the transport layer.

It roughly means:

Each Gloo context goes through the rendezvous process exactly once; Gloo uses it to exchange the details needed to set up communication channels.

Rendezvous is implemented by accessing a key/value store (KVStore): the participating processes exchange their details through the KVStore.

Take Horovod as an example:

  • When Horovod runs fault-tolerant AllReduce training, in addition to the worker processes it also starts a driver process. This driver process helps the workers call gloo to construct the AllReduce communication ring.
  • The driver process creates a RendezvousServer with a KVStore, and the driver stores the IP addresses and other information of the workers participating in the communication into the KVStore.
  • Each worker can then call gloo, which accesses the RendezvousServer to construct the communication ring (a minimal sketch of this key/value exchange follows below).
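To make the key/value exchange concrete, here is a minimal illustrative sketch (not Horovod code; an in-process dict stands in for the RendezvousServer's KVStore): each worker publishes its own address under its rank and then waits until it can read every peer's address.

```python
# Illustrative rendezvous-through-a-KV-store sketch (not Horovod/Gloo code).
import threading
import time

kv_store = {}
kv_lock = threading.Lock()

def kv_put(scope, key, value):
    with kv_lock:
        kv_store[(scope, key)] = value

def kv_get(scope, key, timeout=10.0):
    deadline = time.time() + timeout
    while time.time() < deadline:
        with kv_lock:
            if (scope, key) in kv_store:
                return kv_store[(scope, key)]
        time.sleep(0.01)                  # peer may not have published yet
    raise TimeoutError(f"key {key} not set in scope {scope}")

def rendezvous(rank, size, my_addr):
    kv_put("global", str(rank), my_addr)                         # 1. publish my own details
    peers = {r: kv_get("global", str(r)) for r in range(size)}   # 2. wait for every peer
    print(f"rank {rank} sees peers: {peers}")

# Simulate two workers joining the same rendezvous concurrently.
workers = [threading.Thread(target=rendezvous, args=(0, 2, "10.0.0.1:4000")),
           threading.Thread(target=rendezvous, args=(1, 2, "10.0.0.2:4000"))]
for w in workers: w.start()
for w in workers: w.join()
```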

4.2.2 RendezvousServer

The specific code is as follows; you can see that a RendezvousHTTPServer (an extension of HTTPServer) is started:

```python
class RendezvousServer:
    def __init__(self, verbose=0):
        self._httpd = None
        self._listen_thread = None
        self._verbose = verbose

    # Rendezvous function finds an available port, creates an http socket,
    # and starts a listening loop to handle requests.
    # self._httpd.init needs to be called after the server starts
    def start(self, handler_cls=RendezvousHandler):  # introduced shortly
        self._httpd, port = find_port(
            lambda addr: RendezvousHTTPServer(
                addr, handler_cls, self._verbose))

        # start the listening loop
        self._listen_thread = in_thread(target=self._httpd.serve_forever)

        return port

    def init(self, host_alloc_plan):
        self._httpd.init(host_alloc_plan)

    def stop(self):
        self._httpd.shutdown()
        self._listen_thread.join()
```

4.2.3 KVStore

The KVStore is embodied by KVStoreHandler; RendezvousHandler inherits from KVStoreHandler and is used by RendezvousServer as its request handler.

A simplified version of the KVStoreHandler code is as follows:

```python
class KVStoreHandler(SimpleHTTPRequestHandler):
    # Override PUT handler
    def do_PUT(self):
        paths = self.path.split('/')
        _, scope, key = paths

        # Get body length
        content_length = int(self.headers['Content-Length'])
        value = self.rfile.read(content_length)
        self._put_value(scope, key, value)
        self.send_status_code(OK)

    def _put_value(self, scope, key, value):
        with self.server.cache_lock:
            scope_dict = self.server.cache.setdefault(scope, {})
            scope_dict[key] = value
```

4.2.4 Low-level use

How to use Rendezvous? In brief:

  • The Python world builds a RendezvousServer, whose address is put into environment variables (or passed in some other way).
  • The C++ world (for example horovod/common/gloo/gloo_context.h and horovod/common/gloo/gloo_context.cc) reads the address and port of the RendezvousServer configured by Python, and then constructs the context that gloo needs.
```cpp
#define HOROVOD_HOSTNAME "HOROVOD_HOSTNAME"
#define HOROVOD_RANK "HOROVOD_RANK"
#define HOROVOD_SIZE "HOROVOD_SIZE"
#define HOROVOD_LOCAL_RANK "HOROVOD_LOCAL_RANK"
#define HOROVOD_LOCAL_SIZE "HOROVOD_LOCAL_SIZE"
#define HOROVOD_CROSS_RANK "HOROVOD_CROSS_RANK"
#define HOROVOD_CROSS_SIZE "HOROVOD_CROSS_SIZE"
#define HOROVOD_ELASTIC "HOROVOD_ELASTIC"

ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
                 rendezvous_addr_env, rendezvous_port,
                 rank, size, dev, timeout);

local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
                       rendezvous_addr_env, rendezvous_port,
                       local_rank, local_size, dev, timeout);

cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
                       rendezvous_addr_env, rendezvous_port,
                       cross_rank, cross_size, dev, timeout);
```

The logic is as follows; the C++ world obtains the IP and port of the RendezvousServer from the Python world:

```
        Python                          |                  C++
                                        |
+-------------------------+             |        +---------------------+
|    RendezvousServer     |             |        |     GlooContext     |
|                         | addr, port  |        |                     |
|    RendezvousHandler    +--> System Env +----> |     Rendezvous      |
|                         |             |        |                     |
+-------------------------+             |        +---------------------+
```

4.3 Horovod's gloo entry point

gloo_run is the entry point of the gloo path in horovod.

The comment is very clear: each thread uses an ssh command to launch a training job on a remote host.

```python
def gloo_run(settings, nics, env, server_ip, command):
    # Each thread will use ssh command to launch the job on each remote host. If an
    # error occurs in one thread, entire process will be terminated. Otherwise,
    # threads will keep running and ssh session.
    exec_command = _exec_command_fn(settings)
    launch_gloo(command, exec_command, settings, nics, env, server_ip)
```

It simply calls launch_gloo with exec_command.

At this point, the command parameter looks like ['python', 'train.py'].

4.4 Build an executable environment

The first part of gloo_run is exec_command = _exec_command_fn(settings), which builds an executable command environment based on the various configurations. For remote hosts, it must generate the corresponding remotely runnable command environment (including switching directories, remote execution, and so on).

4.4.1 _exec_command_fn

Specifically, it can be divided into two parts:

  • Use get_remote_command to generate the remote runnable environment, for example by prefixing the training script with 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no';
  • Adjust stdin/stdout/stderr, and use safe_shell_exec.execute to run the command safely;

The details are as follows:

```python
def _exec_command_fn(settings):
    """
    executes the jobs defined by run command on hosts.
    :param hosts_alloc: list of dict indicating the allocating info.
        For example,
        [{'Hostname':'worker-0', 'Rank': 0, 'Local_rank': 0, 'Cross_rank': 0,
          'Size': 2, 'Local_size': 1, 'Cross_size': 2},
         {'Hostname':'worker-1', 'Rank': 1, 'Local_rank': 0, 'Cross_rank': 1,
          'Size': 2, 'Local_size': 1, 'Cross_size': 2}]
    :type hosts_alloc: list(dict)
    :param remote_host_names: names that are resolved to one of the addresses
        of remote hosts interfaces.
    :param _run_command: command to execute
    """
    def _exec_command(command, slot_info, events):
        index = slot_info.rank
        host_name = slot_info.hostname

        host_address = network.resolve_host_address(host_name)
        local_addresses = network.get_local_host_addresses()
        # Need to build a remote command
        if host_address not in local_addresses:
            local_command = quote('cd {pwd} >/dev/null 2>&1; {command}'
                                  .format(pwd=os.getcwd(), command=command))
            command = get_remote_command(local_command,
                                         host=host_name,
                                         port=settings.ssh_port,
                                         identity_file=settings.ssh_identity_file)

        # Redirect output if requested
        # Adjust input and output, use safe_shell_exec.execute for safe execution
        stdout = stderr = None
        stdout_file = stderr_file = None
        if settings.output_filename:
            padded_rank = _pad_rank(index, settings.num_proc)
            output_dir_rank = os.path.join(settings.output_filename,
                                           'rank.{rank}'.format(rank=padded_rank))
            if not os.path.exists(output_dir_rank):
                os.mkdir(output_dir_rank)

            stdout_file = open(os.path.join(output_dir_rank, 'stdout'), 'w')
            stderr_file = open(os.path.join(output_dir_rank, 'stderr'), 'w')

            stdout = MultiFile([sys.stdout, stdout_file])
            stderr = MultiFile([sys.stderr, stderr_file])

        # Achieve safe execution capabilities
        exit_code = safe_shell_exec.execute(command,
                                            index=index,
                                            stdout=stdout,
                                            stderr=stderr,
                                            events=events,
                                            ...)
        return exit_code, time.time()

    return _exec_command
```

4.4.2 get_remote_command

This function targets remote hosts and builds the command used to run on them. It is relatively new and is also related to the Kubeflow MPI operator; we may analyze that later.

```python
SSH_COMMAND_PREFIX = 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'


def get_ssh_command(local_command, host, port=None, identity_file=None, timeout_s=None):
    port_arg = f'-p {port}' if port is not None else ''
    identity_file_arg = f'-i {identity_file}' if identity_file is not None else ''
    timeout_arg = f'-o ConnectTimeout={timeout_s}' if timeout_s is not None else ''
    return f'{SSH_COMMAND_PREFIX} {host} {port_arg} {identity_file_arg} {timeout_arg} {local_command}'


def get_remote_command(local_command, host, port=None, identity_file=None, timeout_s=None):
    return f'{env_util.KUBEFLOW_MPI_EXEC} {host} {local_command}' if env_util.is_kubeflow_mpi() \
        else get_ssh_command(local_command, host, port, identity_file, timeout_s)
```
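As an illustration (a hypothetical call against the functions excerpted above; the host, port, and directory values are made up), wrapping a training command for a remote host yields an ssh-prefixed command:

```python
# Illustrative usage of get_ssh_command as excerpted above (values are made up).
cmd = get_ssh_command('cd /workspace >/dev/null 2>&1; python train.py',
                      host='1.1.1.1', port=22)
print(cmd)
# roughly: ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no 1.1.1.1 -p 22 ... python train.py
```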

The general logic is as follows:

```
 command: python train.py
            +
            |
            v
 +----------+------------+
 |  get_remote_command   |
 +----------+------------+
            |
            v
 ssh -o ... python train.py
            +
            |
            v
 +----------+--------------+
 | safe_shell_exec.execute |
 +-------------------------+
```

4.5 Use gloo to execute commands

After obtaining the executable environment exec_command and the execution command command, you can use gloo to execute the command.

Each command is executed by exec_command.

launch_gloo receives the command, the various configuration settings, the network interface information (nics, such as {'lo'}), the host information, and so on, and then starts running, i.e. starts running our training code. Specifically:

  • Create a RendezvousServer, which will be used by the underlying Gloo C++ environment;
  • host_alloc_plan = get_host_assignments allocates slots per host, i.e. it decides which Horovod rank runs on which slot of which host;
  • get_run_command obtains the executable command;
  • slot_info_to_command_fn obtains the command executable on each slot;
  • Build args_list from slot_info_to_command_fn; each arg in this list is one slot command;
  • Execute in multiple threads, running each arg (slot command) on top of exec_command;

The code is as follows:

```python
def launch_gloo(command, exec_command, settings, nics, env, server_ip):
    """
    Launches the given command multiple times using gloo.
    Each command is launched via exec_command.

    :param command: command to launch
    :param exec_command: means to execute a single command
    :param settings: settings for the distribution
    :param nics: common interfaces
    :param env: environment to use
    :param server_ip: ip to use for rendezvous server
    """
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    # start global rendezvous server and get port that it is listening on
    # Build the RendezvousServer; it will be used by the underlying Gloo C++ environment
    rendezvous = RendezvousServer(settings.verbose)

    # allocate processes into slots
    # i.e. which rank of horovod should run on which slot of which host
    hosts = parse_hosts(settings.hosts)
    host_alloc_plan = get_host_assignments(hosts, settings.num_proc)

    # start global rendezvous server and get port that it is listening on
    global_rendezv_port = rendezvous.start()
    rendezvous.init(host_alloc_plan)

    # Get the executable command
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port)

    # Get the command executable on each slot
    slot_info_to_command = _slot_info_to_command_fn(run_command, env)
    event = register_shutdown_event()
    # Build args_list from slot_info_to_command_fn; each arg is one slot command
    args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
                 for slot_info in host_alloc_plan]

    # If an error occurs in one thread, entire process will be terminated.
    # Otherwise, threads will keep running.
    # Multi-threaded execution: run each arg (slot command) on top of exec_command
    res = threads.execute_function_multithreaded(exec_command,
                                                 args_list,
                                                 block_until_all_done=True)

    for name, value in sorted(res.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value
```

4.5.1 Slot allocation plan

As mentioned above, Horovod runs its tasks on slots, so we need to see how the slots are allocated.

4.5.1.1 Parse from input parameters

As can be seen from the following code, slots are parsed automatically by parse_hosts.

```python
def parse_hosts(hosts_string):
    """Parse a string of comma-separated hostname:slots mappings into a list of HostItem objects.

    :param hosts_string: list of addresses and number of processes on each host.
        For example:
            - 'worker-0:2,worker-1:2'
            - '10.11.11.11:4,10.11.11.12:4'
    :return: a list of HostInfo objects describing host to slot mappings
    :rtype: list[HostInfo]
    """
    return [HostInfo.from_string(host_string) for host_string in hosts_string.split(',')]
```

The HostInfo.from_string details are as follows:

```python
class HostInfo:
    def __init__(self, hostname, slots):
        self.hostname = hostname
        self.slots = slots

    @staticmethod
    def from_string(host_string):
        hostname, slots = host_string.strip().split(':')
        return HostInfo(hostname, int(slots))
```
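As a quick illustration (using the two excerpts above; the addresses are made up), parsing a host string yields one HostInfo per host:

```python
# Illustrative usage of parse_hosts / HostInfo as excerpted above.
infos = parse_hosts('10.11.11.11:4,10.11.11.12:4')
for info in infos:
    print(info.hostname, info.slots)
# 10.11.11.11 4
# 10.11.11.12 4
```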
4.5.1.2 Allocation plan

get_host_assignments assigns Horovod processes according to the hosts and their process capacities (slots); that is, it establishes the mapping between Horovod ranks and slots. If np is set to N, N slots are assigned.

The allocation scheme given is similar to the following, so that we know which rank corresponds to which slot on which host:

```python
[
  SlotInfo(hostname='h1', rank=0, local_rank=0, cross_rank=0, size=2, local_size=2, cross_size=1),
  SlotInfo(hostname='h2', rank=1, local_rank=0, cross_rank=0, size=2, local_size=2, cross_size=1),
]
```

The code is as follows:

```python
def get_host_assignments(hosts, min_np, max_np=None):
    """Assign hosts with process capacities (slots) to ranks in the Horovod process.

    This function will try to allocate as many as possible processes on the same host to leverage
    local network.

    :param hosts: list of HostInfo objects describing host and slot capacity
    :type hosts: list[HostInfo]
    :param min_np: minimum number of processes to be allocated
    :param max_np: (optional) maximum number of processes to be allocated
    :return: a list of the allocation of process on hosts in a `SlotInfo` object.
    :rtype: list[SlotInfo]
    """
    host_ranks = []
    cross_ranks = collections.defaultdict(dict)
    rank = 0

    # Build rank, local rank, and cross rank (required by hierarchical allreduce)
    # based on the hosts information
    for host_info in hosts:
        ranks = []
        for local_rank in range(host_info.slots):
            if rank == max_np:
                break

            ranks.append(rank)
            rank += 1

            cross_ranks_at_local = cross_ranks[local_rank]
            cross_ranks_at_local[host_info.hostname] = len(cross_ranks_at_local)

        host_ranks.append((host_info, ranks))

    world_size = rank

    # Produce the mapping between horovod rank and slot: return an alloc_list
    # in which each SlotInfo contains the various rank values
    alloc_list = []
    for host_info, ranks in host_ranks:
        local_size = len(ranks)
        for local_rank, rank in enumerate(ranks):
            cross_ranks_at_local = cross_ranks[local_rank]
            cross_rank = cross_ranks_at_local[host_info.hostname]
            cross_size = len(cross_ranks_at_local)

            alloc_list.append(
                SlotInfo(
                    hostname=host_info.hostname,
                    rank=rank,
                    local_rank=local_rank,
                    cross_rank=cross_rank,
                    size=world_size,
                    local_size=local_size,
                    cross_size=cross_size))

    return alloc_list
```
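As a worked example (derived by tracing the code above, with hypothetical hosts h1 and h2 having 2 slots each and np=4): ranks 0 and 1 land on h1, ranks 2 and 3 land on h2, and processes with the same local_rank on different hosts share a cross rank/communicator:

```python
# Hypothetical walk-through of get_host_assignments using the excerpts above.
hosts = parse_hosts('h1:2,h2:2')
for s in get_host_assignments(hosts, 4):
    print(s.hostname, s.rank, s.local_rank, s.cross_rank, s.size, s.local_size, s.cross_size)
# h1 0 0 0 4 2 2
# h1 1 1 0 4 2 2
# h2 2 0 1 4 2 2
# h2 3 1 1 4 2 2
```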

4.5.2 Get the run command

get_run_command builds the Gloo-related environment variables and prepends them to the command. After this step you get a command similar to the following:

```
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
```

This format can be abbreviated as: {horovod_gloo_env} command.

The code is:

```python
def create_run_env_vars(server_ip, nics, port, elastic=False):
    # Gloo-related variables to put into the environment
    run_envs = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': "gloo",
        'HOROVOD_CPU_OPERATIONS': "gloo",
        'HOROVOD_GLOO_IFACE': list(nics)[0],  # TODO: add multiple ifaces in future
        'NCCL_SOCKET_IFNAME': ','.join(nics),
    }
    if elastic:
        run_envs["HOROVOD_ELASTIC"] = "1"
    return run_envs


def get_run_command(command, server_ip, nics, port, elastic=False):
    env_vars = create_run_env_vars(server_ip, nics, port, elastic)
    env_string = " ".join(
        [f"{k}={str(v)}" for k, v in env_vars.items()])
    run_command = (
        '{env_string} '
        '{command}'  # expect a lot of environment variables
        .format(env_string=env_string,
                command=' '.join(quote(par) for par in command)))
    return run_command
```

4.5.3 Get the slot run command

After getting the run command, we combine the horovod env and env, together with the slot allocation, and further adapt it into a form suitable for gloo, i.e. a command that can run on each specific slot.

This format can be abbreviated as: {horovod_gloo_env} {horovod_rendez_env} {env} run_command.

After this step is completed, you get something like the following:

```
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=xxxx USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
```

The specific code is as follows:

```python
def _slot_info_to_command_fn(run_command, env):
    # TODO: Workaround for over-buffered outputs. Investigate how mpirun avoids this problem.
    env = copy.copy(env)  # copy env so we do not leak env modifications
    env['PYTHONUNBUFFERED'] = '1'

    def slot_info_to_command(slot_info):
        """
        Given a slot_info, creates a command used by gloo to launch a single job.

        :param slot_info: host and slot to execute the run command on
        :return:
        """
        env_vars = create_slot_env_vars(slot_info)
        horovod_rendez_env = " ".join(
            [f"{k}={str(v)}" for k, v in env_vars.items()])

        return '{horovod_env} {env} {run_command}'.format(
            horovod_env=horovod_rendez_env,
            env=' '.join(['%s=%s' % (key, quote(value)) for key, value in env.items()
                          if env_util.is_exportable(key)]),
            run_command=run_command)

    return slot_info_to_command
```

4.5.4 Multi-threaded call command

This is where multiple threads are started. The comment in gloo_run is very clear: when execute_function_multithreaded is called, each thread uses an ssh command to start a training job on a remote host.

Recall what we mentioned earlier in "Build an executable environment": get_remote_command generates the remote runnable environment, for example by prefixing the training script with 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'. This is how the command gets executed remotely.

For local execution, the command looks roughly as follows:

```
cd /code_directory >/dev/null 2>&1
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=xxxx USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
```

For remote execution, the command is additionally prefixed with ssh information, roughly as follows:

```
ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no 1.1.1.1
cd /code_directory >/dev/null 2>&1
HOROVOD_HOSTNAME=1.1.1.1 HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1
SHELL=/bin/bash PATH=xxxx USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
```

The specific code of execute_function_multithreaded is as follows, where:

  • fn is the program execution environment (capability) mentioned earlier, i.e. exec_command.
  • fn(*arg[:-1]) runs slot_info_to_command inside exec_command.
```python
def execute_function_multithreaded(fn,
                                   args_list,
                                   block_until_all_done=True,
                                   max_concurrent_executions=1000):
    """
    Executes fn in multiple threads each with one set of the args in the args_list.

    :param fn: function to be executed
    :type fn:
    :param args_list:
    :type args_list: list(list)
    :param block_until_all_done: if is True, function will block until all the threads are done
        and will return the results of each thread's execution.
    :type block_until_all_done: bool
    :param max_concurrent_executions:
    :type max_concurrent_executions: int
    :return:
        If block_until_all_done is False, returns None.
        If block_until_all_done is True, function returns the dict of results.
            {
                index: execution result of fn with args_list[index]
            }
    :rtype: dict
    """
    result_queue = queue.Queue()
    worker_queue = queue.Queue()

    for i, arg in enumerate(args_list):
        arg.append(i)
        worker_queue.put(arg)

    def fn_execute():
        while True:
            try:
                arg = worker_queue.get(block=False)
            except queue.Empty:
                return
            exec_index = arg[-1]
            # fn is the program execution environment (capability) mentioned earlier: exec_command
            # fn(*arg[:-1]) runs slot_info_to_command inside exec_command
            res = fn(*arg[:-1])
            result_queue.put((exec_index, res))

    threads = []
    number_of_threads = min(max_concurrent_executions, len(args_list))

    # Execute fn_execute in multiple threads
    for _ in range(number_of_threads):
        thread = in_thread(target=fn_execute, daemon=not block_until_all_done)
        threads.append(thread)

    # Returns the results only if block_until_all_done is set.
    # If set, block and wait for the results
    results = None
    if block_until_all_done:
        # Because join() cannot be interrupted by signal, a single join()
        # needs to be separated into join()s with timeout in a while loop.
        have_alive_child = True
        while have_alive_child:
            have_alive_child = False
            for t in threads:
                t.join(0.1)
                if t.is_alive():
                    have_alive_child = True

        results = {}
        while not result_queue.empty():
            item = result_queue.get()
            results[item[0]] = item[1]
    return results
```

python train.py will enter our training code.

The overall logic is shown in the figure below. After combining the various pieces of information, an executable command is constructed and then executed on multiple hosts:

  • On the left of the figure, the host and other information are obtained from the arguments, and the slot information is then parsed;
  • On the right of the figure, an executable command environment is generated from the various configurations for python train.py to run in; for remote hosts, the corresponding remotely runnable command environment is generated (including switching directories, remote execution, and so on);
  • In the middle of the figure, the python train.py command is augmented with env information and gloo information. Combining this with the slot information on the left and the executable command environment on the right yields a command that can be run on multiple threads and thus on multiple slots.
```
 args: '10.11.11.11:4,10.11.11.12:4'      python train.py                command: python train.py
              +                                  +                                  +
              |                                  |                                  |
              v                                  v                                  v
     +--------+--------+             +-----------+---------+            +-----------+-----------+
     |   parse_hosts   |             |   get_run_command   |            |   get_remote_command  |
     +--------+--------+             +-----------+---------+            +-----------+-----------+
              |                                  |                                  |
              v                                  v                                  v
 +------------+-----------+            gloo python train.py              ssh -o ... python train.py
 |  get_host_assignments  |                      +                                  +
 +------------+-----------+                      |                                  |
              |                                  v                                  v
              v                    +-------------+-------------+        +-----------+-------------+
 SlotInfo(hostname='h2', rank=1)-->| _slot_info_to_command_fn  |        | safe_shell_exec.execute |
                                   +-------------+-------------+        +-----------+-------------+
                                                 |                                  ^
                                                 v                                  |
                            HOROVOD_CONTROLLER=gloo python train.py                 |
                                                 +                                  |
                                                 |                                  |
                                                 v                                  |
                              +------------------+--------------+                   |
                              | execute_function_multithreaded  +-------------------+
                              +---------------------------------+
```


4.6 C++ example

We give a piece of low-level code so that everyone can better appreciate what role Gloo plays.

This is from Horovod: rank 0 finally broadcasts the constructed response (describing the tensors to reduce) to the other ranks.

```cpp
void GlooController::SendFinalTensors(ResponseList& response_list) {
  // Notify all nodes which tensors we'd like to reduce at this step.
  std::string encoded_response;
  ResponseList::SerializeToString(response_list, encoded_response);

  // Broadcast the response length
  int encoded_response_length = (int)encoded_response.length() + 1;
  {
    gloo::BroadcastOptions opts(gloo_context_.ctx);
    opts.setOutput(&encoded_response_length, 1);
    opts.setRoot(RANK_ZERO);
    gloo::broadcast(opts);  // broadcast to other ranks
  }

  // Broadcast the response
  {
    gloo::BroadcastOptions opts(gloo_context_.ctx);
    opts.setOutput((uint8_t*)(encoded_response.c_str()), encoded_response_length);
    opts.setRoot(RANK_ZERO);
    gloo::broadcast(opts);  // broadcast to other ranks
  }
}
```

0x05 MPI implementation

5.1 openmpi library

Here Horovod mainly relies on Open MPI.

  • MPI: short for Message Passing Interface. MPI is a cross-language communication protocol used to write parallel programs; it supports point-to-point and broadcast communication. MPI is a message-passing application programming interface that includes protocol and semantic specifications, which define how its features must behave in any implementation. The goals of MPI are high performance, scalability, and portability.
  • Open MPI: short for Open Message Passing Interface. Open MPI is one implementation of MPI, in the form of a library project.

MPI plays a special role in Horovod:

  • On the one hand, Horovod integrates an MPI-based AllReduce which, like NCCL, is used for gradient reduction;

  • On the other hand, MPI can be used to start multiple processes (identified by rank in Horovod) on all machines to achieve parallel computing;

5.2 mpi_run function

This part of the code is located at: horovod/runner/mpi_run.py.

First we extract the key code; you can see that its core is running the mpirun command.

```python
# This is the key code in the code block below!
mpirun_command = (
    'mpirun {basic_args} '
    '-np {num_proc} {ppn_arg} {hosts_arg} '
    '{binding_args} '
    '{mpi_args} '
    '{mpi_ssh_args} '
    '{tcp_intf_arg} '
    '{nccl_socket_intf_arg} '
    '{output_filename_arg} '
    '{env} {extra_mpi_args} {command}'
    .format(basic_args=basic_args,
            num_proc=settings.num_proc,
            ppn_arg=ppn_arg,
            hosts_arg=hosts_arg,
            binding_args=binding_args,
            mpi_args=' '.join(mpi_impl_flags),
            tcp_intf_arg=tcp_intf_arg,
            nccl_socket_intf_arg=nccl_socket_intf_arg,
            mpi_ssh_args=mpi_ssh_args,
            output_filename_arg=' '.join(output),
            env=env_list,
            extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else '',
            command=' '.join(quote(par) for par in command))
)

# Execute the mpirun command.
if settings.run_func_mode:
    exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
else:
    os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
```

All the parameters of the mpirun command are constructed based on various configurations and parameters, such as ssh parameters, mpi parameters, nccl parameters, and so on.

An example of the resulting mpirun command is as follows:

```
mpirun --allow-run-as-root --np 2 -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py
```

The specific code is as follows:

```python
# The code above is a fragment of this function
def mpi_run(settings, nics, env, command, stdout=None, stderr=None):
    """
    Runs mpi_run.

    Args:
        settings: Settings for running MPI.
                  Note: settings.num_proc and settings.hosts must not be None.
        nics: Interfaces to include by MPI.
        env: Environment dictionary to use for running command.
        command: Command and arguments to run as a list of string.
        stdout: Stdout of the mpi process.
                Only used when settings.run_func_mode is True.
        stderr: Stderr of the mpi process.
                Only used when settings.run_func_mode is True.
    """
    # Get the various configurations
    mpi_impl_flags, impl_binding_args, mpi = _get_mpi_implementation_flags(settings.tcp_flag, env=env)
    impi = _IMPI_IMPL == mpi

    # Handle ssh arguments
    ssh_args = []
    if settings.ssh_port:
        ssh_args += [f'-p {settings.ssh_port}']
    if settings.ssh_identity_file:
        ssh_args += [f'-i {settings.ssh_identity_file}']

    mpi_ssh_args = ''
    if ssh_args:
        joined_ssh_args = ' '.join(ssh_args)
        mpi_ssh_args = f'-bootstrap=ssh -bootstrap-exec-args \"{joined_ssh_args}\"' if impi \
            else f'-mca plm_rsh_args \"{joined_ssh_args}\"'

    # Handle network configuration, NIC information, etc.
    tcp_intf_arg = '-mca btl_tcp_if_include {nics}'.format(
        nics=','.join(nics)) if nics and not impi else ''
    nccl_socket_intf_arg = '-{opt} NCCL_SOCKET_IFNAME={nics}'.format(
        opt='genv' if impi else 'x',
        nics=','.join(nics)) if nics else ''

    # Hosts
    # On large cluster runs (e.g. Summit), we need extra settings to work around OpenMPI issues
    host_names, host_to_slots = hosts.parse_hosts_and_slots(settings.hosts)
    if not impi and host_names and len(host_names) >= _LARGE_CLUSTER_THRESHOLD:
        mpi_impl_flags.append('-mca plm_rsh_no_tree_spawn true')
        mpi_impl_flags.append('-mca plm_rsh_num_concurrent {}'.format(len(host_names)))

    # if user does not specify any hosts, mpirun by default uses local host.
    # There is no need to specify localhost.
    hosts_arg = '-{opt} {hosts}'.format(
        opt='hosts' if impi else 'H',
        hosts=','.join(host_names) if host_names and impi else settings.hosts)

    # Handle the ppn configuration
    ppn_arg = ' '
    if host_to_slots and impi:
        ppn = host_to_slots[host_names[0]]
        for h_name in host_names[1:]:
            ...
        ppn_arg = ' -ppn {} '.format(ppn)

    # Handle the timestamp configuration
    if settings.prefix_output_with_timestamp and not impi:
        mpi_impl_flags.append('--timestamp-output')

    binding_args = settings.binding_args if settings.binding_args and not impi else ' '.join(impl_binding_args)

    # May need to run as root
    basic_args = '-l' if impi else '--allow-run-as-root --tag-output'

    output = []
    if settings.output_filename:
        output.append('-outfile-pattern' if impi else '--output-filename')
        output.append(settings.output_filename)

    # Build the list of environment variables to pass through
    env_list = '' if impi else ' '.join(
        '-x %s' % key for key in sorted(env.keys()) if env_util.is_exportable(key))

    # Build the final MPI command
    # Pass all the env variables to the mpirun command.
    mpirun_command = (
        'mpirun {basic_args} '
        '-np {num_proc} {ppn_arg} {hosts_arg} '
        '{binding_args} '
        '{mpi_args} '
        '{mpi_ssh_args} '
        '{tcp_intf_arg} '
        '{nccl_socket_intf_arg} '
        '{output_filename_arg} '
        '{env} {extra_mpi_args} {command}'  # expect a lot of environment variables
        .format(basic_args=basic_args,
                num_proc=settings.num_proc,
                ppn_arg=ppn_arg,
                hosts_arg=hosts_arg,
                binding_args=binding_args,
                mpi_args=' '.join(mpi_impl_flags),
                tcp_intf_arg=tcp_intf_arg,
                nccl_socket_intf_arg=nccl_socket_intf_arg,
                mpi_ssh_args=mpi_ssh_args,
                output_filename_arg=' '.join(output),
                env=env_list,
                extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else '',
                command=' '.join(quote(par) for par in command))
    )

    # we need the driver's PATH and PYTHONPATH in env to run mpirun,
    # env for mpirun is different to env encoded in mpirun_command
    for var in ['PATH', 'PYTHONPATH']:
        if var not in env and var in os.environ:
            # copy env so we do not leak env modifications
            env = copy.copy(env)
            # copy var over from os.environ
            env[var] = os.environ[var]

    # Execute the mpirun command.
    if settings.run_func_mode:
        exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
    else:
        os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
```

5.3 mpirun command

Because mpi_run runs via the mpirun command, let's introduce it.

mpirun is the startup script for MPI programs. It simplifies starting the parallel processes and hides the underlying implementation details as much as possible, providing users with a uniform MPI parallel launch mechanism.

When a parallel program is executed with the mpirun command, the -np parameter specifies how many processes to run in parallel. mpirun first starts a process on the local node, and then starts one process per host listed in the /usr/local/share/machines.LINUX file. If the number of processes exceeds the number of available parallel nodes, the extra processes are assigned again following the same rules. Once processes are allocated by this mechanism, each one receives a fixed label, similar to an ID card, which is later used in message passing.

It should be noted that what actually runs is orterun (the Open MPI SPMD/MPMD launcher); mpirun/mpiexec are just symbolic links to it.

Examples of commands are as follows:

```
mpirun -np 4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py
```

0x06 summary

Comparing the gloo and mpi implementations, we can clearly see the differences.

6.1 gloo

Gloo is just a library; Horovod itself must implement command distribution.

With gloo, Horovod has to implement local and remote execution itself, i.e. the get_remote_command function that prefixes commands with 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'.

With gloo, Horovod also needs to implement a RendezvousServer, which the bottom layer uses for communication.

6.2 mpi

MPI is much more powerful. As long as the command is wrapped by mpirun, Open MPI itself takes care of command distribution and execution. After all, a Horovod job is an mpirun program; even though it runs TensorFlow, it is still an MPI program, so the processes can interact with each other.

0xEE personal information

Thinking about life and technology

WeChat public account: Rossi's thinking

If you want timely notifications of my articles, or want to see the technical material I recommend, please stay tuned.

0xFF reference

Horovod elastic training

MPI, OpenMPI and deep learning

Two hours introductory MPI and parallel computing (1): Preliminary knowledge of parallel computing

MPI cluster environment construction

Horovod-distributed deep learning framework based on TensorFlow

Horovod source code analysis

How to understand Nvidia's Multi-GPU multi-card communication framework NCCL?