Threading Shotgun connection

I was curious to know how people have managed to create thread safe connections to shotgun? Does anyone have an easy way of doing this?

In the help docs it says the following…

Multi-threading

The Shotgun API is not thread-safe. If you want to do threading we strongly suggest that you use one connection object per thread and not share the connection.

I’m not sure how one would assign a connection per thread and ensure the methods running use that that threads connection.

Any ideas or info would be greatly appreciated.

Thank you

1 Like

The idea would be to instantiate and authenticate an api object in each thread.

So inside each thread handler you have something like

sg = shotgun_api3.Shotgun("https://piedpiper.shotgunstudio.com",
                          login="rhendriks",
                          password="c0mPre$Hi0n")

and then use this connection only in the given thread.

1 Like

The trick would be ensuring each thread uses its own connection to the api. Let me see if i can put a simple test case together and ill report back.

Is there a way to assign the sg variable inside of each thread so it knows to use that instance rather than another one?

sample from another thread where it shows the problem…

import threading
import shotgun_api3


def getShotgunClient():
    sgHost = '...'
    sgScriptName = '...'
    sgApiKey = '...'
    sg = shotgun_api3.Shotgun(sgHost, script_name=sgScriptName, api_key=sgApiKey)
    return sg

num_threads = 5

sg = getShotgunClient()
sgProject = sg.find_one("Project", [["name", "is", 'Test']])

def worker():
    for i in range(5):
        try:
            category = sg.find_one('Task', [['code','is', 'F_FaceOnly']], ['code'])
            print(i, category)
        except Exception as exc:
            print("Exception thrown during SGX worker thread: %s", exc)

threads = [ threading.Thread(target=worker) for _i in range(num_threads) ]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print("All Done")

You just use a local variable for the instance. Here you create an instance shared among all workers, but that is the opposite.
Instead you should call getShotgunClient inside worker.

Is there any easier way of automating this where the thread could create its only clone or instance for the life of the thread being used? I ask because some of my methods call other shotgun methods and that would require that i pass around the connection object to every method it’s needed.

It sounds like you are using a global SG instance in your methods. I think that’s problematic for the exact issue you are running into. One solution would be to make your functions class methods, that way they can share an instance of Shotgun. Here is one possible pattern:

from shotgun_api3 import Shotgun

class SGExtended(object):
    '''Wraps shotgun_api3.Shotgun providing additional convenient methods.'''

    def __init__(self, sg=None):
        if sg is None:
            self._sg = Shotgun(...)
        else:
            self._sg = sg

    def __getattr__(self, attr):
        if hasattr(self._sg, attr):
            return getattr(self._sg, attr)
        raise AttributeError("%r object has no attribute %r" % (self.__class__.__name__, attr))

    def find_tasks(self, entity):
        return self._sg.find('Task', [['entity', 'is', entity]], ['content'])


def worker():
    sg = SGExtended()

    # Use your custom method
    tasks = sg.find_tasks(...)

    # Or use a standard Shotgun method
    tasks = sg.find('Task', ...)
1 Like

In our solutions, the sg instance is indeed being passed around in every function that needs it. It is not so bad, there are some fat functions but one more argument is okay.

This way the lifetime of sg is limited to the worker’s scope - once worker returns, the instance will be garbage collected.

There are several ways to make this “simpler” if you wish.

One is Dan’s example with a class that remembers an instance and uses it in its own methods with self.sg. This is sort of the standard OOP way of structuring things.
A related idea is that this object can itself be a callable and passed as worker. e.g.

class SGWorker(object):
  def __init__(self, sg=None):
    # pretty much everything from Dan's example
  def __call__(self):
    tasks = self.find_tasks(...)
    # do something with tasks

thread = threading.Thread(target=SGWorker())
...

Another option is partial application (the functions still have sg as their first argument, but you pass it just once to partial)

find_tasks_with_sg = functools.partial(find_tasks, sg)
# now use multiple times
find_tasks_with_sg(entity)

This is more powerful than it seems at first.

1 Like

Hi all,

Here is how I deal with thread safe shotgun calls using threading.local
I create a get_sg_connection() which make sure to create one connection per thread.
And I make sure to never store the instance at the module level in all api I build on top of shotgun_api3.

I you want to create api that can be reuse across your pipeline without worrying about threads: you must call get_sg_connection() in the beginning of every fonction that need it (see list_assets(project) function).

import threading
import os
from shotgun_api3 import Shotgun
from queue import Queue, Empty

_local = threading.local()
def get_sg_connection():
    if not hasattr(_local, "instance"):
        script_name = os.environ["SHOTGUN_SCRIPT_NAME"]
        api_key = os.environ["SHOTGUN_API_KEY"]
        url = os.environ["SHOTGUN_URL"]
        _local.instance = Shotgun(
            url,
            script_name=script_name,
            api_key=api_key,
        )
    return _local.instance

def list_assets(project):
    sg = get_sg_connection()
    return sg.find("Asset", [
        ["project", "is", project]
    ], ["code"])

def worker(i, q):
    sg = get_sg_connection()
    while True:
        try:
            project = q.get(False)
        except Empty:
            break
        assets = list_assets(project)
        print(f"#{i} project {project['name']} contains {len(assets)} assets.")
        q.task_done()
    print(f"End worker #{i}")

def main():
    sg = get_sg_connection()
    projects = sg.find("Project", [], ["name"])
    q = Queue()
    for p in projects:
        q.put(p)
    num_threads = 4
    threads = []
    for i in range(num_threads):
        # Start 4 workers
        t = threading.Thread(target=worker, args=[i, q])
        threads.append(t)
        t.start()
    q.join()

if __name__ == "__main__":
    main()
1 Like

I replied to @JokerMartini in another similar thread, with an implementation that slightly resembles @michael.delaporte’s answer above: SSLError: wrong version number - #13 by danielskovli

Main difference being that I decided to use an instance pool and a pool manager to handle (and grow) the connections, instead of binding them directly to a thread-id. Reason for this being that two different threads can successfully share a connection instance, as long as they don’t concurrently call the API.

3 Likes