#!/usr/bin/env python
# coding: utf-8

# In[1]:

# Copyright 2014 Brett Slatkin, Pearson Education Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Examples of starting, polling, piping and timing out child processes
# with the subprocess module.

# Preamble to mimic book environment
import logging
from pprint import pprint
from sys import stdout as STDOUT


# In[2]:

# Example 1
import subprocess

# Capture the child's standard output through a pipe and print it.
proc = subprocess.Popen(
    ['echo', 'Hello from the child!'],
    stdout=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode('utf-8'))


# In[3]:

# Example 2
from time import sleep, time

# poll() returns None for as long as the child is still running, so the
# parent can keep doing its own work in the meantime.
proc = subprocess.Popen(['sleep', '0.3'])
while proc.poll() is None:
    print('Working...')
    # Some time consuming work here
    sleep(0.2)

print('Exit status', proc.poll())


# In[4]:

# Example 3
def run_sleep(period):
    """Start a ``sleep`` child process and return it without waiting.

    Args:
        period: Sleep duration; converted with ``str()`` and passed as
            the single argument of the ``sleep`` command.

    Returns:
        The ``subprocess.Popen`` object of the started child.
    """
    proc = subprocess.Popen(['sleep', str(period)])
    return proc


# Start all children first so that they run at the same time.
start = time()
procs = []
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)


# In[5]:

# Example 4
# Only now wait for each child to terminate.
for proc in procs:
    proc.communicate()
end = time()
print('Finished in %.3f seconds' % (end - start))


# In[6]:

# Example 5
import os


def run_openssl(data):
    """Start an ``openssl enc -des3`` child and feed it ``data``.

    The password is handed to the child through the ``password``
    environment variable (``-pass env:password``) of a copied
    environment, so ``os.environ`` of the parent is left untouched.

    Args:
        data: Bytes written to the child's standard input.

    Returns:
        The ``subprocess.Popen`` object. Its stdin is still open and its
        stdout is a pipe that the caller has to consume or pass on.
    """
    env = os.environ.copy()
    env['password'] = b'\xe24U\n\xd0Ql3S\x11'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()  # Ensure the child gets input
    return proc


# In[7]:

# Example 6
import os

procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)


# In[8]:

# Example 7
# communicate() closes stdin, reads the child's output and waits for it.
for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])


# In[9]:

# Example 8
def run_md5(input_stdin):
    """Start an ``md5`` child that reads its input from ``input_stdin``.

    Args:
        input_stdin: Value passed as ``stdin=`` to ``subprocess.Popen``,
            e.g. the ``stdout`` pipe of another child process.

    Returns:
        The ``subprocess.Popen`` object; its stdout is a pipe.
    """
    # NOTE(review): ``md5`` is presumably the BSD/macOS tool name; other
    # systems may only ship ``md5sum`` -- confirm for the target platform.
    proc = subprocess.Popen(
        ['md5'],
        stdin=input_stdin,
        stdout=subprocess.PIPE)
    return proc


# In[10]:

# Example 9
input_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    input_procs.append(proc)
    hash_proc = run_md5(proc.stdout)
    hash_procs.append(hash_proc)
    # The hash child now owns the read end of the pipe. Drop the parent's
    # copy so that the later communicate() call on the upstream process
    # cannot read (and thereby steal) data meant for the hash child, and
    # so that the upstream process gets SIGPIPE if the hash child dies.
    proc.stdout.close()
    proc.stdout = None


# In[11]:

# Example 10
# Closes each upstream stdin (signalling end of input) and waits for it.
for proc in input_procs:
    proc.communicate()

for proc in hash_procs:
    out, err = proc.communicate()
    print(out.strip())


# In[12]:

# Example 11
proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    # The child did not finish in time: stop it and reap it.
    proc.terminate()
    proc.wait()

print('Exit status', proc.poll())