May 27, 2015 - 3 min read

Run jobs in parallel with python

How do you efficiently manage multiple jobs across the cores of a modern computer?

This is a real problem that had me stuck for a while.

I am working on a framework to analyze next-generation sequencing data, so I have numerous files to manage and different types of data that each have to undergo a specific analysis. On the other hand, I have a few cores (24), some RAM (32 GB) and decent hard drives. Let's combine everything so that many files can be analyzed on multiple cores at the same time. Of note, I mostly work with bacterial genomes, so my computing times may seem light compared to the human genome. A good example (not too simple, not too complex?) would be: how do I convert 10 SAM files to 10 BAM files IN PARALLEL?

To split jobs across your CPUs, most of what you will read describes the nice Python multiprocessing library. Many of the examples you will find across the web are correct, but they won't be stable for everything you might want to share across your CPUs. A game changer for me has been the following blog post, and I can't thank the author enough for writing about this (yes, read it first):

http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

As far as I understand it, to start, for example, a file conversion operation on multiple files, the method you provide is first serialized and then one serialized object is sent to each CPU to do its work. The problem with Python is that this serialization is done by pickle, and in certain circumstances it does a poor job (see the link above), which led me to countless script crashes before I started to understand what was happening.
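To make this concrete, here is a minimal sketch of my own (not taken from that post; the names square and restored are just for illustration) showing pickle choking on a lambda while dill, the serializer behind pathos, handles it without complaint:

import pickle
import dill

square = lambda x: x ** 2

# dill serializes the lambda and brings it back to life
restored = dill.loads(dill.dumps(square))
print(restored(4))  # 16

# pickle cannot serialize a lambda and raises an error instead
try:
    pickle.dumps(square)
except Exception as error:
    print("pickle failed: %s" % error)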

Fortunately, you can install a module called pathos, which relies on a much more stable serializer (dill). See the code below to get an idea of how it can be used.

Remarks, questions and smart comments are welcome below!

# -*- coding: utf-8 -*-

import os
import subprocess
import glob

from multiprocessing import cpu_count
import pathos.multiprocessing as mp

def shellCommand(command):
    '''
    subprocess method to start shell commands
    '''
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    return stdout, stderr, p.returncode

def sortIndexSamToBam(samfile):
    '''
    method to convert sam files to sorted and indexed bam files
    requires to have samtools installed
    '''
    # define root filename without the extension
    sortedbamfile = os.path.splitext(str(samfile))[0]
    # convert the sam file to sorted bam, see davetang.org if questions
    shellCommand('samtools view -bhS %s | samtools sort - %s' % (samfile, sortedbamfile))
    # index the bam file
    shellCommand('samtools index %s %s 2> /dev/null' % (sortedbamfile + '.bam', sortedbamfile + '.bai'))
    # delete the sam file as it is much larger than the binary bam
    os.remove(samfile)
    print("Conversion achieved for: %s" % samfile)

def spawner(method, data):
    '''
    Dispatch jobs on all your CPUs - 1 
    method: method you want to be applied on multiple elements
    data: list of objects, every element of the list is one file, one set of parameters required by the method
    '''
    maxjobs = cpu_count() - 1
    p = mp.Pool(maxjobs)
    p.map(method, data)
    # cleanly shut down the workers once all jobs are done
    p.close()
    p.join()

if __name__ == '__main__':
    # List all the sam files of interest.
    my_sam_files = glob.glob("/my/path/*.sam")
    # Run file conversion on the list of files
    spawner(method=sortIndexSamToBam, data=my_sam_files)
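
One remark on the design: spawner discards whatever the jobs return. If you want to check that every conversion actually went through, a small variation works; the following is only a sketch reusing shellCommand, mp and cpu_count from the script above, and checkedConversion and spawnerWithResults are names I made up for the example.

def checkedConversion(samfile):
    '''
    run the sam to bam conversion step and report the samtools exit code
    instead of printing a message
    '''
    sortedbamfile = os.path.splitext(str(samfile))[0]
    stdout, stderr, returncode = shellCommand('samtools view -bhS %s | samtools sort - %s' % (samfile, sortedbamfile))
    return samfile, returncode

def spawnerWithResults(method, data):
    '''
    same dispatch as spawner, but keep what every job returned
    '''
    p = mp.Pool(cpu_count() - 1)
    results = p.map(method, data)
    p.close()
    p.join()
    return results

# in the __main__ block above, this would replace the spawner call
for samfile, returncode in spawnerWithResults(checkedConversion, my_sam_files):
    if returncode != 0:
        print("Conversion failed for: %s" % samfile)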

Some might suggest looking at libraries such as ruffus, but I found it confusing and lacking the flexibility I needed, so I'd rather have a tool that I coded and understand myself. Maybe I did not try hard enough to use it.
