Server: appserver-7f0f8755-nginx-15961cad18524ec5a9db05f2a6a7e440
Current directory: /usr/lib/python3.11/multiprocessing
Software: nginx/1.27.5
Shell Command
Create a new file
Upload file
File: resource_sharer.py
# # We use a background thread for sharing fds on Unix, and for sharing sockets on # Windows. # # A client which wants to pickle a resource registers it with the resource # sharer and gets an identifier in return. The unpickling process will connect # to the resource sharer, sends the identifier and its pid, and then receives # the resource. # import os import signal import socket import sys import threading from . import process from .context import reduction from . import util __all__ = ['stop'] if sys.platform == 'win32': __all__ += ['DupSocket'] class DupSocket(object): '''Picklable wrapper for a socket.''' def __init__(self, sock): new_sock = sock.dup() def send(conn, pid): share = new_sock.share(pid) conn.send_bytes(share) self._id = _resource_sharer.register(send, new_sock.close) def detach(self): '''Get the socket. This should only be called once.''' with _resource_sharer.get_connection(self._id) as conn: share = conn.recv_bytes() return socket.fromshare(share) else: __all__ += ['DupFd'] class DupFd(object): '''Wrapper for fd which can be used at any time.''' def __init__(self, fd): new_fd = os.dup(fd) def send(conn, pid): reduction.send_handle(conn, new_fd, pid) def close(): os.close(new_fd) self._id = _resource_sharer.register(send, close) def detach(self): '''Get the fd. This should only be called once.''' with _resource_sharer.get_connection(self._id) as conn: return reduction.recv_handle(conn) class _ResourceSharer(object): '''Manager for resources using background thread.''' def __init__(self): self._key = 0 self._cache = {} self._lock = threading.Lock() self._listener = None self._address = None self._thread = None util.register_after_fork(self, _ResourceSharer._afterfork) def register(self, send, close): '''Register resource, returning an identifier.''' with self._lock: if self._address is None: self._start() self._key += 1 self._cache[self._key] = (send, close) return (self._address, self._key) @staticmethod def get_connection(ident): '''Return connection from which to receive identified resource.''' from .connection import Client address, key = ident c = Client(address, authkey=process.current_process().authkey) c.send((key, os.getpid())) return c def stop(self, timeout=None): '''Stop the background thread and clear registered resources.''' from .connection import Client with self._lock: if self._address is not None: c = Client(self._address, authkey=process.current_process().authkey) c.send(None) c.close() self._thread.join(timeout) if self._thread.is_alive(): util.sub_warning('_ResourceSharer thread did ' 'not stop when asked') self._listener.close() self._thread = None self._address = None self._listener = None for key, (send, close) in self._cache.items(): close() self._cache.clear() def _afterfork(self): for key, (send, close) in self._cache.items(): close() self._cache.clear() self._lock._at_fork_reinit() if self._listener is not None: self._listener.close() self._listener = None self._address = None self._thread = None def _start(self): from .connection import Listener assert self._listener is None, "Already have Listener" util.debug('starting listener and thread for sending handles') self._listener = Listener(authkey=process.current_process().authkey) self._address = self._listener.address t = threading.Thread(target=self._serve) t.daemon = True t.start() self._thread = t def _serve(self): if hasattr(signal, 'pthread_sigmask'): signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) while 1: try: with self._listener.accept() as conn: msg = conn.recv() if msg is None: break key, destination_pid = msg send, close = self._cache.pop(key) try: send(conn, destination_pid) finally: close() except: if not util.is_exiting(): sys.excepthook(*sys.exc_info()) _resource_sharer = _ResourceSharer() stop = _resource_sharer.stop
.
23 Items
Change directory
Remove directory
Rename directory
..
204 Items
Change directory
Remove directory
Rename directory
__init__.py
0.89 KB
Edit
Delete
Copy
Move
Remame
__pycache__
21 Items
Change directory
Remove directory
Rename directory
connection.py
30.86 KB
Edit
Delete
Copy
Move
Remame
context.py
11.33 KB
Edit
Delete
Copy
Move
Remame
dummy
3 Items
Change directory
Remove directory
Rename directory
forkserver.py
11.86 KB
Edit
Delete
Copy
Move
Remame
heap.py
11.35 KB
Edit
Delete
Copy
Move
Remame
managers.py
46.57 KB
Edit
Delete
Copy
Move
Remame
pool.py
31.99 KB
Edit
Delete
Copy
Move
Remame
popen_fork.py
2.32 KB
Edit
Delete
Copy
Move
Remame
popen_forkserver.py
2.18 KB
Edit
Delete
Copy
Move
Remame
popen_spawn_posix.py
1.98 KB
Edit
Delete
Copy
Move
Remame
popen_spawn_win32.py
3.93 KB
Edit
Delete
Copy
Move
Remame
process.py
11.82 KB
Edit
Delete
Copy
Move
Remame
queues.py
11.74 KB
Edit
Delete
Copy
Move
Remame
reduction.py
9.29 KB
Edit
Delete
Copy
Move
Remame
resource_sharer.py
5.01 KB
Edit
Delete
Copy
Move
Remame
resource_tracker.py
8.76 KB
Edit
Delete
Copy
Move
Remame
shared_memory.py
18.03 KB
Edit
Delete
Copy
Move
Remame
sharedctypes.py
6.16 KB
Edit
Delete
Copy
Move
Remame
spawn.py
9.2 KB
Edit
Delete
Copy
Move
Remame
synchronize.py
11.5 KB
Edit
Delete
Copy
Move
Remame
util.py
13.74 KB
Edit
Delete
Copy
Move
Remame