I’m curious how people have managed to create thread-safe connections to Shotgun. Does anyone have an easy way of doing this?
In the help docs it says the following…
Multi-threading
The Shotgun API is not thread-safe. If you want to do threading we strongly suggest that you use one connection object per thread and not share the connection.
I’m not sure how one would assign a connection per thread and ensure the methods being run use that thread’s connection.
You just use a local variable for the instance. In your code you create a single instance shared among all workers, which is the opposite of what you want. Instead, call getShotgunClient inside worker.
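For example, a minimal sketch of what that looks like — getShotgunClient here is a stand-in for whatever factory function you already have, returning a plain object instead of a real Shotgun instance:

```python
import threading

created = []  # record each thread's client, just to demonstrate they differ

def getShotgunClient():
    # Stand-in for your real factory; in practice this would return
    # shotgun_api3.Shotgun(url, script_name=..., api_key=...).
    return object()

def worker():
    # Each thread builds its own client as a local variable,
    # so no connection object is ever shared between threads.
    sg = getShotgunClient()
    created.append(sg)
    # ... all Shotgun calls made by this thread use this sg ...

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Since each worker call creates its own instance, the four threads end up with four distinct clients.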
Is there any easier way of automating this, where the thread could create its own clone or instance for the life of the thread? I ask because some of my methods call other Shotgun methods, and that would require passing the connection object around to every method that needs it.
It sounds like you are using a global SG instance in your methods. I think that’s problematic for the exact issue you are running into. One solution would be to make your functions methods on a class, so they can share an instance of Shotgun. Here is one possible pattern:
from shotgun_api3 import Shotgun

class SGExtended(object):
    '''Wraps shotgun_api3.Shotgun, providing additional convenience methods.'''

    def __init__(self, sg=None):
        if sg is None:
            self._sg = Shotgun(...)
        else:
            self._sg = sg

    def __getattr__(self, attr):
        if hasattr(self._sg, attr):
            return getattr(self._sg, attr)
        raise AttributeError(
            "%r object has no attribute %r" % (self.__class__.__name__, attr)
        )

    def find_tasks(self, entity):
        return self._sg.find('Task', [['entity', 'is', entity]], ['content'])

def worker():
    sg = SGExtended()
    # Use your custom method
    tasks = sg.find_tasks(...)
    # Or use a standard Shotgun method
    tasks = sg.find('Task', ...)
In our solutions, the sg instance is indeed passed around to every function that needs it. It’s not so bad: some functions end up with fat signatures, but one more argument is acceptable.
This way the lifetime of sg is limited to the worker’s scope - once worker returns, the instance will be garbage collected.
There are several ways to make this “simpler” if you wish.
One is Dan’s example with a class that remembers an instance and uses it in its own methods via self._sg. This is sort of the standard OOP way of structuring things.
A related idea is that this object can itself be a callable and passed as worker. e.g.
import threading

class SGWorker(object):
    def __init__(self, sg=None):
        # pretty much everything from Dan's example
        ...

    def __call__(self):
        tasks = self.find_tasks(...)
        # do something with tasks

thread = threading.Thread(target=SGWorker())
...
Another option is partial application: the functions still take sg as their first argument, but you pass it just once to functools.partial.

import functools

find_tasks_with_sg = functools.partial(find_tasks, sg)
# now use it multiple times
find_tasks_with_sg(entity)
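A self-contained illustration of the same idea — find_tasks here is a dummy stand-in, not the real API, and the string sg stands in for a per-thread connection:

```python
import functools

def find_tasks(sg, entity):
    # Stand-in for a real function whose first argument is the connection.
    return "%s found tasks for %s" % (sg, entity)

sg = "connection-A"  # imagine this is a per-thread Shotgun instance
find_tasks_with_sg = functools.partial(find_tasks, sg)

# The bound function no longer needs sg at each call site:
print(find_tasks_with_sg("Asset_01"))  # connection-A found tasks for Asset_01
```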
Here is how I deal with thread-safe Shotgun calls, using threading.local.
I create a get_sg_connection() which makes sure to create one connection per thread, and I make sure never to store the instance at module level in any API I build on top of shotgun_api3.
If you want to create an API that can be reused across your pipeline without worrying about threads, you must call get_sg_connection() at the beginning of every function that needs it (see the list_assets(project) function).
import threading
import os
from shotgun_api3 import Shotgun
from queue import Queue, Empty

_local = threading.local()

def get_sg_connection():
    if not hasattr(_local, "instance"):
        script_name = os.environ["SHOTGUN_SCRIPT_NAME"]
        api_key = os.environ["SHOTGUN_API_KEY"]
        url = os.environ["SHOTGUN_URL"]
        _local.instance = Shotgun(
            url,
            script_name=script_name,
            api_key=api_key,
        )
    return _local.instance

def list_assets(project):
    sg = get_sg_connection()
    return sg.find("Asset", [
        ["project", "is", project]
    ], ["code"])

def worker(i, q):
    sg = get_sg_connection()
    while True:
        try:
            project = q.get(False)
        except Empty:
            break
        assets = list_assets(project)
        print(f"#{i} project {project['name']} contains {len(assets)} assets.")
        q.task_done()
    print(f"End worker #{i}")

def main():
    sg = get_sg_connection()
    projects = sg.find("Project", [], ["name"])
    q = Queue()
    for p in projects:
        q.put(p)
    num_threads = 4
    threads = []
    for i in range(num_threads):
        # Start 4 workers
        t = threading.Thread(target=worker, args=[i, q])
        threads.append(t)
        t.start()
    q.join()

if __name__ == "__main__":
    main()
Main difference being that I decided to use an instance pool and a pool manager to handle (and grow) the connections, instead of binding them directly to a thread-id. Reason for this being that two different threads can successfully share a connection instance, as long as they don’t concurrently call the API.
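A minimal sketch of that pool idea, under my own assumptions: ConnectionPool and its factory argument are hypothetical names, and object() stands in for a real Shotgun(...) constructor. The real pool manager presumably does more bookkeeping (limits, health checks):

```python
import queue
import threading

class ConnectionPool(object):
    """Grow-on-demand pool: a thread checks a connection out, uses it
    exclusively, then returns it so another thread can reuse it."""

    def __init__(self, factory):
        self._factory = factory      # callable returning a new connection
        self._idle = queue.Queue()   # connections not currently in use

    def acquire(self):
        try:
            return self._idle.get_nowait()
        except queue.Empty:
            return self._factory()   # grow the pool when none are idle

    def release(self, conn):
        self._idle.put(conn)

# Usage sketch: two threads never use the same connection at the same time,
# but a connection is reused by whichever thread acquires it next.
pool = ConnectionPool(factory=object)  # object stands in for Shotgun(...)

def worker():
    conn = pool.acquire()
    try:
        pass  # make API calls with conn here
    finally:
        pool.release(conn)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

queue.Queue is itself thread-safe, so acquire/release need no extra locking; the only invariant the callers must keep is the try/finally release.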