Best questions about Python on Stack Overflow.

Catch a thread's exception in the caller thread in Python

The problem is that thread_obj.start() returns immediately. The child thread you spawned executes in its own context, with its own stack, so any exception it raises stays in the child thread's context. One way to communicate this information to the parent thread is some form of message passing, so you might look into that.

Try this on for size:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occurred here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

Daemon Threads Explanation

In the Python documentation it says:

A thread can be flagged as a "daemon thread". The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread.

Does anyone have a clearer explanation of what that means, or a practical example showing where you would set threads as daemonic? Clarify it for me: so the only situation in which you wouldn't set threads as daemonic is when you want them to continue running after the main thread exits?


Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
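
For example, here is a minimal sketch (keepalive is just a stand-in for a real background task): with the daemon flag the thread dies with the program, whereas the same thread without it would keep the process alive forever.

import threading
import time

def keepalive():
    # Stand-in for a background task that runs forever.
    while True:
        print("sending keepalive")
        time.sleep(1)

t = threading.Thread(target=keepalive)
t.daemon = True   # without this line, the program below would never exit
t.start()

time.sleep(3)     # the main thread does its real work here
print("main thread done; the daemon thread is killed automatically")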

How to get the return value from a thread in python?

FWIW, the multiprocessing module has a nice interface for this using the Pool class. And if you want to stick with threads rather than processes, you can just use the multiprocessing.pool.ThreadPool class as a drop-in replacement.

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo'))  # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

Is there any way to kill a Thread?

It is generally a bad pattern to kill a thread abruptly, in Python and in any language. Think of the following cases:

  • The thread is holding a critical resource that must be closed properly
  • The thread has created several other threads that must be killed as well.

The nice way of handling this, if you can afford it (if you are managing your own threads), is to have an exit_request flag that each thread checks at regular intervals to see if it is time for it to exit.

For example:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self):
        super(StoppableThread, self).__init__()
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

In this code, you should call stop() on the thread when you want it to exit, and wait for the thread to exit properly using join(). The thread should check the stop flag at regular intervals.

There are cases, however, when you really need to kill a thread. An example is when you are wrapping an external library that is busy with long calls and you want to interrupt it. The following code allows you (with some restrictions) to raise an exception in a Python thread:

import ctypes
import inspect
import threading


def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL : this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do : self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL : this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(Based on Killable Threads by Tomer Filiba. The quote about the return value of PyThreadState_SetAsyncExc appears to be from an old version of Python.) As noted in the documentation, this is not a magic bullet because if the thread is busy outside the Python interpreter, it will not catch the interruption. A good usage pattern of this code is to have the thread catch a specific exception and perform the cleanup. That way, you can interrupt a task and still have proper cleanup.
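
For illustration, here is a hedged usage sketch of that cleanup pattern, assuming the ThreadWithExc class above and Python 2 (to match the snippets); the CleanupRequested exception and the worker loop are invented for the example:

import time

class CleanupRequested(Exception):
    """Illustrative exception type used to interrupt the worker."""
    pass

class Worker(ThreadWithExc):
    def run(self):
        try:
            while True:
                # The exception is delivered only while the thread is
                # executing Python bytecode, so keep the sleeps short.
                time.sleep(0.1)
        except CleanupRequested:
            print("worker: releasing resources and exiting")

w = Worker()
w.start()
time.sleep(1)
w.raiseExc(CleanupRequested)
w.join()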

multiprocessing.Pool: When to use apply, apply_async or map?

Back in the old days of Python, to call a function with arbitrary arguments, you would use apply():

apply(function, args, kwargs)

apply() still exists in Python 2.7, though not in Python 3, and is generally not used anymore. Nowadays,

function(*args, **kwargs)

is preferred. The multiprocessing.Pool module tries to provide a similar interface.

Pool.apply() is like Python's built-in apply(), except that the function call is performed in a separate process. Pool.apply() blocks until the function is completed.

Pool.apply_async() is also like the built-in apply(), except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwds) is equivalent to pool.apply_async(func, args, kwds).get().

In contrast to Pool.apply(), the Pool.apply_async() method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().

For example:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

may yield a result such as

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

Notice that, unlike with pool.map(), the order of the results may not correspond to the order in which the pool.apply_async() calls were made.


So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.

If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.

Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function).

In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.
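
As a small sketch of that contrast (the square and cube functions are just stand-ins):

import multiprocessing as mp

def square(x):
    return x * x

def cube(x):
    return x ** 3

if __name__ == '__main__':
    pool = mp.Pool(processes=4)

    # Pool.map: one function, many arguments; blocks until every result
    # is ready and returns them in argument order.
    print(pool.map(square, [1, 2, 3, 4]))      # [1, 4, 9, 16]

    # Pool.apply_async: returns immediately, and different calls may use
    # different functions; get() blocks for each individual result.
    r1 = pool.apply_async(square, (5,))
    r2 = pool.apply_async(cube, (5,))
    print([r1.get(), r2.get()])                # [25, 125]

    pool.close()
    pool.join()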

Multiprocessing vs Threading Python

The threading module uses threads; the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Since threads share the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is part of what the global interpreter lock is for: it protects the interpreter's internals, but your own shared data still needs explicit synchronization.

Spawning processes is a bit slower than spawning threads. Once they are running, there is not much difference.

Multiprocessing

Pros

  • Separate memory space
  • Code is usually straightforward
  • Takes advantage of multiple CPUs & cores
  • Avoids GIL limitations for CPython
  • Eliminates most needs for synchronization primitives unless you use shared memory (instead, it's more of a communication model for IPC)
  • Child processes are interruptible/killable
  • Python multiprocessing module includes useful abstractions with an interface much like threading.Thread
  • A must with CPython for CPU-bound processing

Cons

  • IPC a little more complicated with more overhead (communication model vs. shared memory/objects)
  • Larger memory footprint

Threading

Pros

  • Lightweight - low memory footprint
  • Shared memory - makes access to state from another context easier
  • Allows you to easily make responsive UIs
  • CPython C extension modules that properly release the GIL will run in parallel
  • Great option for I/O-bound applications

Cons

  • CPython - subject to the GIL
  • Not interruptible/killable
  • If not following a command queue/message pump model (using the Queue module), then manual use of synchronization primitives becomes a necessity (decisions are needed for the granularity of locking)
  • Code is usually harder to understand and to get right - the potential for race conditions increases dramatically
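
As a rough illustration of the CPU-bound point, swapping Pool for ThreadPool is enough to compare the two models; the timings are indicative only and depend on the machine:

import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

def cpu_bound(n):
    # Pure-Python busy work: holds the GIL the whole time.
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    work = [2 * 10 ** 6] * 8

    for label, make_pool in [('processes', Pool), ('threads', ThreadPool)]:
        pool = make_pool(4)
        start = time.time()
        pool.map(cpu_bound, work)
        pool.close()
        pool.join()
        # On CPython, the process pool usually wins here because the
        # threads contend for the GIL on pure-Python CPU work.
        print('%s: %.2fs' % (label, time.time() - start))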

Run certain code every n seconds [duplicate]

import threading

def printit():
  threading.Timer(5.0, printit).start()
  print "Hello, World!"

printit()

# continue with the rest of your code

Alternatively, a small helper class that re-arms a Timer on each interval and can be started and stopped cleanly:
from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False
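
Usage might look like this; note that the class starts itself in __init__, so there is no need to call start() explicitly:

from time import sleep

def hello(name):
    print("Hello, %s!" % name)

rt = RepeatedTimer(1, hello, "World")  # auto-starts, no need for rt.start()
try:
    sleep(5)  # your long-running job goes here instead
finally:
    rt.stop()  # better in a try/finally so the pending Timer is always cancelled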

Threading pool similar to the multiprocessing Pool?

I just found out that there actually is a thread-based Pool interface in the multiprocessing module; however, it is somewhat hidden and not properly documented.

It can be imported via:

from multiprocessing.pool import ThreadPool

It is implemented using a dummy Process class wrapping a Python thread. This thread-based Process class can be found in multiprocessing.dummy, which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.
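
A minimal sketch of the thread-backed pool; fetch_length is just a stand-in for an I/O-bound task such as an HTTP request:

from multiprocessing.dummy import Pool as ThreadPool  # same API as multiprocessing.Pool

def fetch_length(text):
    # Stand-in for an I/O-bound call such as urllib/requests.
    return len(text)

pool = ThreadPool(4)
results = pool.map(fetch_length, ["one", "two", "three"])
pool.close()
pool.join()
print(results)  # [3, 3, 5]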

time.sleep — sleeps thread or process?

It blocks the thread. If you look in Modules/timemodule.c in the Python source, you'll see that in the call to floatsleep(), the substantive part of the sleep operation is wrapped in a Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS block, allowing other threads to continue to execute while the current one sleeps. You can also test this with a simple Python program:

import time
from threading import Thread

class worker(Thread):
    def run(self):
        for x in xrange(0,11):
            print x
            time.sleep(1)

class waiter(Thread):
    def run(self):
        for x in xrange(100,103):
            print x
            time.sleep(5)

def run():
    worker().start()
    waiter().start()

Result:

0
100
1
2
3
4
5
101
6
7
8
9
10
102

What is the use of join() in python threading

Some (somewhat clumsy) ASCII art to demonstrate the mechanism: join() is presumably called by the main thread. It could also be called by another thread, but that would needlessly complicate the diagram. The join() call should be placed in the track of the main thread, but to express the thread relation and keep it as simple as possible, I chose to place it in the child thread instead.

without join:
+---+---+------------------                     main-thread
    |   |
    |   +...........                            child-thread(short)
    +..................................         child-thread(long)

with join
+---+---+------------------***********+###      main-thread
    |   |                             |
    |   +...........join()            |         child-thread(short)
    +......................join()......         child-thread(long)

with join and daemon thread
+-+--+---+------------------***********+###     parent-thread
  |  |   |                             |
  |  |   +...........join()            |        child-thread(short)
  |  +......................join()......        child-thread(long)
  +,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,     child-thread(long + daemonized)

'-' main-thread/parent-thread/main-program execution
'.' child-thread execution
'#' optional parent-thread execution after join()-blocked parent-thread could 
    continue
'*' main-thread 'sleeping' in join-method, waiting for child-thread to finish
',' daemonized thread - 'ignores' lifetime of other threads;
    terminates when main-programs exits; is normally meant for 
    join-independent tasks

So the reason you don't see any change is that your main thread does nothing after your join. You could say join is (only) relevant for the execution flow of the main thread. If, for example, you want to download a bunch of pages concurrently and concatenate them into a single large page, you may start the downloads using threads, but you need to wait until the last page/thread has finished before you start assembling a single page out of many. That's when you use join().
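
A sketch of that download-then-assemble pattern; the download here is simulated with sleep, and a real fetch would go in its place:

import threading
import time

pages = {}

def download(url):
    time.sleep(1)                     # placeholder for the real network call
    pages[url] = "<html>%s</html>" % url

urls = ["page1", "page2", "page3"]
threads = [threading.Thread(target=download, args=(u,)) for u in urls]

for t in threads:
    t.start()
for t in threads:
    t.join()                          # block until every download has finished

big_page = "".join(pages[u] for u in urls)   # safe: all child threads are done
print(big_page)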

What is “thread local storage” in Python, and why do I need it?

In Python, everything is shared, except for function-local variables (because each function call gets its own set of locals, and threads are always separate function calls). And even then, only the variables themselves (the names that refer to objects) are local to the function; objects themselves are always global, and anything can refer to them.

The Thread object for a particular thread is not a special object in this regard. If you store the Thread object somewhere all threads can access (like a global variable), then all threads can access that one Thread object. If you want to atomically modify anything that you didn't just create in this very same thread, and did not store anywhere another thread can get at it, you have to protect it with a lock. And all threads must of course share this very same lock, or it wouldn't be very effective.

If you want actual thread-local storage, that's where threading.local comes in. Attributes of threading.local are not shared between threads; each thread sees only the attributes it itself placed in there. If you're curious about its implementation, the source is in _threading_local.py in the standard library.
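
A short sketch of the difference; the attribute name value is arbitrary:

import threading

shared = []                    # one list object, visible to every thread
local = threading.local()      # each thread sees only its own attributes

def work(n):
    shared.append(n)           # every thread appends to the same list
    local.value = n            # invisible to the other threads
    print("thread %d sees local.value = %d" % (n, local.value))

threads = [threading.Thread(target=work, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared)                   # [0, 1, 2] in some order
print(hasattr(local, "value"))  # False: the main thread never set it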
