FLOURESCENCE MEASUREMENT CHAMBER
Bacteria are moved into a UV-cuvette inside the measurement chamber of the fluorescence detector. The chamber is a 3D-printed
enclosure (see Fig. Xy) with two walls facing each other that house a row of three LED sockets each. In the current
setup, we only placed three [XXX UV?] LEDs in one wall. The other two opposing walls host camera sockets, to
one one of which we have attached a RPi-camera, that is controlled by the RPi server. Two optical filters are
placed inside. A [XXX] filter [do we show their spectral characteristics?] between the UV-LEDs and the cuvette,
to filter out wavelengths from [XX] to [XX] nm (peak activation of eGFP at ~ XX nm). Between the cuvette and
the camera a [XX] filter is inserted to allow only light emitted from activated bacteria through while holding
back the rest of the LED-light.
The camera is controlled via the Python picamera library and calibrated to maximize the difference between idle and activated
bacteria @allFromHof: (see table “camera-settings” or Supplemental Stuff, Zenodo, or nowhere?). The raw RGB-data
is recorded, the green channel is kept and cropped to our region of interest---the bacterial suspension. The
median green intensity of this region is then compared to a threshold [XX] to decipher the bacterial command.
Casing
3D-printed bla..
LEDs and Optical Filters
because of the expressed fluorescent protein’s spectral characteristics [explain shortly and link to molbio parts], we employ
[XXX] nm LEDs to excite [XXX/proteins/chromophores?] and filter… images of LED’s emission spectrum and filter’s
absorption curves for temperature experiment… ???
Camera
… see “colicam_nu.py” on github/daniel_moser/colibot/server/cam/ … 1 second shutterspeed..
Software
Camera Master:
Camera Slave:
#!/usr/bin/env python """ Code for the master-camera. It is connected to the slave-camera via 2 GPIO pins and outputs the total detected bacterial state (aggregated from both cameras). @author: Daniel Hofstadler, 2017 """ import os import csv import time import argparse import gpiozero from datetime import datetime from fractions import Fraction import numpy as np import picamera import picamera.array from matplotlib import pyplot as plt # import warnings # warnings.filterwarnings('error', category=DeprecationWarning) # globals # LOCAL_TZ = pytz.timezone("Etc/UTC") # read from cams.csv OUT_APPEND = "_coli" CSV_NAME = "colicam" CSV_HEADER = ["framerate", "exposure_time_us", "iso", "g1", "g2", "g3", "g4", "timestamp", "time_utc", "tag", "red", "green", "blue"] # RESOLUTION = (2592, 1944) RESOLUTION = (1280, 720) # REGION_OF_INTEREST = (0.35, 0.4, 0.5, 0.7) ROI = [200, 520, 480, 800] # Y0, Y1, X0, X1 DETECTION_THRESHOLD = 150 PIN_RX = 17 PIN_TX = 18 SETTINGS_FIXED = { 'name': "fixed", 'ss': 1000000, # 1000 milliseconds = 1 sec 'fps': Fraction(1, 2), # (1, 2) 'iso': 800, # 800 'g': (Fraction(2, 1), Fraction(1, 1)), # 'g': (Fraction(345, 256), Fraction(167, 128)), 'sleep': 15} # 10 SETTINGS_SENSITIVE = { 'name': "sensitive", 'ss': 6000000, # 6 seconds 'fps': Fraction(1, 6), 'iso': 800, 'g': None, 'sleep': 30} SETTINGS_AUTO = { 'name': "auto", 'ss': None, 'fps': 30, 'iso': 0, 'g': None, 'sleep': 2} SETTINGS_DEFAULT = "fixed" # argument parsing parser = argparse.ArgumentParser( description="Colibot Photometer Camera.") parser.add_argument("-d", "--debug", action="store_true", help="debug mode") parser.add_argument("-m", "--mode", type=str, default="fixed", choices=["f", "fixed", "s", "sensitive", "a", "auto"], help="set the camera mode") parser.add_argument("-p", "--postfix", type=str, default="", help="add string to filename") # args = vars(ap.parse_args()) args = parser.parse_args() if not args.postfix == "" and not args.postfix.startswith("_"): args.postfix = "_" + args.postfix def now_str(pattern="_%y%m%d-%H%M%S_utc"): # return time.strftime(pattern, time.gmtime()) return datetime.utcnow().strftime(pattern) def from_timestamp(ts, pattern="%y%m%d-%H%M%S_utc"): return datetime.utcfromtimestamp(ts).strftime(pattern) def safename(s, s_type): """Check whether a given file or folder 's' exists, return a non-existing filename. s ........ (full) filename or directory s_type ... 'file' or 'f' for files, 'directory' or 'dir' or 'd' for folders Returns a file- or pathname that is supposedly safe to save without overwriting data. """ low_type = str.lower(s_type) if low_type == 'file' or low_type == 'f': if os.path.isfile(s): s2 = s.split('.') s_base = s2[0] s_ext = s2[-1] counter = 0 while os.path.isfile(s): s = s_base + "-{:02d}.".format(counter) + s_ext counter += 1 elif low_type == 'directory' or low_type == 'dir' or low_type == 'd': if os.path.isdir(s): s_base = s counter = 0 while os.path.isdir(s): s = s_base + "-{:02d}".format(counter) counter += 1 return s class ColiCamMaster: global RESOLUTION global ROI # REGION_OF_INTEREST global DETECTION_THRESHOLD global SETTINGS_FIXED global SETTINGS_SENSITIVE global SETTINGS_AUTO global DEF_SETTINGS_NAME global PIN_RX global PIN_TX def __init__(self, resolution=RESOLUTION, mode=DEF_SETTINGS_NAME): try: self.cam = picamera.PiCamera(resolution=resolution) self.set_cam(mode) except Exception as err: self.cam = "" print("couldn't initialize camera, error: {}".format(err)) self.rx = gpiozero.InputDevice(PIN_RX) self.tx = gpiozero.OutputDevice(PIN_TX) def set_cam(self, mode="fixed"): if mode.lower().startswith("f"): settings = SETTINGS_FIXED elif mode.lower().startswith("s"): settings = SETTINGS_SENSITIVE elif mode.lower().startswith("a"): settings = SETTINGS_AUTO else: print("wrong camera mode") # switch on auto mode self.cam.awb_mode = 'auto' self.cam.exposure_mode = 'auto' # settings = dict with fields: name, ss, fps, iso, g, sleep self.cam.framerate = settings['fps'] if settings['ss']: self.cam.shutter_speed = settings['ss'] self.cam.iso = settings['iso'] # Wait for the automatic gain control to settle time.sleep(settings['sleep']) # Now fix the values if not settings['ss']: self.cam.shutter_speed = self.cam.exposure_speed self.cam.exposure_mode = 'off' if settings['g']: g = settings['g'] else: g = self.cam.awb_gains self.cam.awb_mode = 'off' self.cam.awb_gains = g def measure(self, roi=ROI, channel_name="green", threshold=DETECTION_THRESHOLD, dir_out=None, csv_out=None, postfix=""): # notify other camera to take a picture self.tx.on() # check whether channel is valid if channel_name.lower().startswith("r"): channel = 0 channel_name = "red" elif channel_name.lower().startswith("g"): channel = 1 channel_name = "green" elif channel_name.lower().startswith("b"): channel = 2 channel_name = "blue" else: print("wrong channel_name: '{}'".format(channel_name)) # get camera settings time.sleep(1) fps = float(self.cam.framerate) ss = self.cam.exposure_speed g = self.cam.awb_gains g1 = g[0].numerator g2 = g[0].denominator g3 = g[1].numerator g4 = g[1].denominator iso = self.cam.iso settings_str = "ss{}_g{}-{}_{}-{}_iso{}_".format( ss, g1, g2, g3, g4, iso) csv_row = [fps, ss, iso, g1, g2, g3, g4] print("taking photos with settings: {}".format( settings_str)) # with picamera.PiCamera(resolution=RESOLUTION) as cam: # https://picamera.readthedocs.io/en/latest/ # api_array.html#module-picamera.array with picamera.array.PiRGBArray(self.cam) as output: # cam.zoom = REGION_OF_INTEREST # cam.start_preview() # take picture t0 = time.time() self.cam.capture(output, format="rgb") a = output.array t1 = time.time() print("recorded for {} sec".format(round(t1-t0, 2))) t_pic = int(round(np.mean([t0, t1]))) # re-empty output (to reuse next iteration) # output.truncate(0) # crop to region of interest roi = a[ROI[0]:ROI[1], ROI[2]:ROI[3], :] vals = [] vals.append(np.median(roi[:, :, 0])) vals.append(np.median(roi[:, :, 1])) vals.append(np.median(roi[:, :, 2])) print(("roi.shape={},\n" + # "sum(red)={}, sum(green)={}, sum(blue)={},\n" + "med(red)={}, med(green)={}, med(blue)={}").format( roi.shape, vals[0], vals[1], vals[2])) val = vals[channel] print("reading the {} channel...\nvalue: {}".format( channel_name, val)) if val >= threshold: activated = True else: activated = False print("active={} (threshold: {})".format(activated, threshold)) # debug mode: # output images and csv if output directory exists if dir_out: names = ["1-red", "2-green", "3-blue"] # whole images fn = "full_{}{}_0-rgb{}.png".format( settings_str, postfix, now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, a) for i in range(3): fn = "full_{}{}_{}{}.png".format( settings_str, postfix, names[i], now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, a[:, :, i], vmin=0, vmax=255) # cropped to ROI fn = "roi_{}{}_0-rgb{}.png".format( settings_str, postfix, now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, roi) for i in range(3): fn = "roi_{}{}_{}{}.png".format( settings_str, postfix, names[i], now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, roi[:, :, i], vmin=0, vmax=255) csv_row.extend([t_pic, from_timestamp(t_pic), postfix, vals[0], vals[1], vals[2]]) with open(csv_out, "at") as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerow(csv_row) print("wrote csv_row: {}".format(csv_row)) # read state of the slave-camera other_activated = False # impossible that both are active! if not activated: timeout = time.time() + 2 # seconds while not other_activated and time.time() < timeout: other_activated = self.rx.is_active self.tx.off() # determine final state if activated: output = 1 elif other_activated: output = -1 else: output = 0 return output def exit_clean(self): if not self.cam == "": self.cam.close() def main(args): global CSV_NAME global CSV_HEADER # t0 = time.time() cam = ColiCam(mode=args.mode) # t_cam = time.time() - t0 # select mode (camera settings) current_mode = args.mode # optionally create output directory for debugging if args.debug: # setup output directory (and copy over the code?) dir_out = "/home/pi/ColiCam_{}".format(now_str("%y%m%d")) dir_out = safename(dir_out, "dir") if not os.path.isdir(dir_out): os.mkdir(dir_out) print("outputting to '{}'".format(dir_out)) # initialize csv-log csv_fn = "{}{}{}.csv".format(CSV_NAME, args.postfix, now_str()) csv_out = safename(os.path.join(dir_out, csv_fn), 'file') # write header with open(csv_out, "wt") as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerow(CSV_HEADER) print("initialized {} with header {}".format( csv_out, CSV_HEADER)) else: dir_out = None csv_out = None # it = 0 try: while True: # it += 1 # update mode new_mode = args.mode if not current_mode == new_mode: cam.set_cam(mode=new_mode) # cam.set_cam(mode=args.mode) print("measuring in {} mode (default):".format(args.mode)) # measure with both cams val = cam.measure(dir_out=dir_out, csv_out=csv_out, postfix=args.postfix) if val == -1: print("wall on the left\n") elif val == 0: print("no wall detected\n") elif val == 1: print("wall on the right\n") except KeyboardInterrupt: print("manually interrupted program") finally: cam.exit_clean() print("camera closed, done!") if __name__ == "__main__": main(args)
Camera Slave:
#!/usr/bin/env python """ Code for the slave-camera It is connected to the master-camera via 2 GPIO pins and outputs the detected bacterial state via the GPIO-connection to the master-camera. In order to constantly listen for commands from the master camera, it needs to start a background- thread. @author: Daniel Hofstadler, 2017 """ import os import csv import time import argparse import gpiozero import threading from datetime import datetime from fractions import Fraction import numpy as np import picamera import picamera.array from matplotlib import pyplot as plt # import warnings # warnings.filterwarnings('error', category=DeprecationWarning) # globals # LOCAL_TZ = pytz.timezone("Etc/UTC") # read from cams.csv OUT_APPEND = "_coli" CSV_NAME = "colicam" CSV_HEADER = ["framerate", "exposure_time_us", "iso", "g1", "g2", "g3", "g4", "timestamp", "time_utc", "tag", "red", "green", "blue"] # RESOLUTION = (2592, 1944) RESOLUTION = (1280, 720) # REGION_OF_INTEREST = (0.35, 0.4, 0.5, 0.7) ROI = [200, 520, 480, 800] # Y0, Y1, X0, X1 DETECTION_THRESHOLD = 150 PIN_RX = 17 PIN_TX = 18 SETTINGS_FIXED = { 'name': "fixed", 'ss': 1000000, # 1000 milliseconds = 1 sec 'fps': Fraction(1, 2), # (1, 2) 'iso': 800, # 800 'g': (Fraction(2, 1), Fraction(1, 1)), # 'g': (Fraction(345, 256), Fraction(167, 128)), 'sleep': 15} # 10 SETTINGS_SENSITIVE = { 'name': "sensitive", 'ss': 6000000, # 6 seconds 'fps': Fraction(1, 6), 'iso': 800, 'g': None, 'sleep': 30} SETTINGS_AUTO = { 'name': "auto", 'ss': None, 'fps': 30, 'iso': 0, 'g': None, 'sleep': 2} SETTINGS_DEFAULT = "fixed" # argument parsing parser = argparse.ArgumentParser( description="Colibot Photometer Camera.") parser.add_argument("-d", "--debug", action="store_true", help="debug mode") parser.add_argument("-m", "--mode", type=str, default="fixed", choices=["f", "fixed", "s", "sensitive", "a", "auto"], help="set the camera mode") parser.add_argument("-p", "--postfix", type=str, default="", help="add string to filename") # args = vars(ap.parse_args()) args = parser.parse_args() if not args.postfix == "" and not args.postfix.startswith("_"): args.postfix = "_" + args.postfix def now_str(pattern="_%y%m%d-%H%M%S_utc"): # return time.strftime(pattern, time.gmtime()) return datetime.utcnow().strftime(pattern) def from_timestamp(ts, pattern="%y%m%d-%H%M%S_utc"): return datetime.utcfromtimestamp(ts).strftime(pattern) def safename(s, s_type): """Check whether a given file or folder 's' exists, return a non-existing filename. s ........ (full) filename or directory s_type ... 'file' or 'f' for files, 'directory' or 'dir' or 'd' for folders Returns a file- or pathname that is supposedly safe to save without overwriting data. """ low_type = str.lower(s_type) if low_type == 'file' or low_type == 'f': if os.path.isfile(s): s2 = s.split('.') s_base = s2[0] s_ext = s2[-1] counter = 0 while os.path.isfile(s): s = s_base + "-{:02d}.".format(counter) + s_ext counter += 1 elif low_type == 'directory' or low_type == 'dir' or low_type == 'd': if os.path.isdir(s): s_base = s counter = 0 while os.path.isdir(s): s = s_base + "-{:02d}".format(counter) counter += 1 return s def thread_listener(cam): """Background thread listening on the receive-pin whether to make a measurement.""" try: while cam.active: if cam.rx.is_active: cam.measure_slave() time.sleep(0.5) finally: print("listener thread shutting down") class ColiCamSlave: global RESOLUTION global ROI # REGION_OF_INTEREST global DETECTION_THRESHOLD global SETTINGS_FIXED global SETTINGS_SENSITIVE global SETTINGS_AUTO global DEF_SETTINGS_NAME global PIN_RX global PIN_TX def __init__(self, resolution=RESOLUTION, mode=DEF_SETTINGS_NAME): try: self.cam = picamera.PiCamera(resolution=resolution) self.set_cam(mode) except Exception as err: self.cam = "" print("couldn't initialize camera, error: {}".format(err)) self.rx = gpiozero.InputDevice(PIN_RX) self.tx = gpiozero.OutputDevice(PIN_TX) self.active = True # start the listening thread listener = threading.Thread( target=thread_listener, args=(self)) listener.daemon = True listener.start() self.listener = listener def set_cam(self, mode="fixed"): if mode.lower().startswith("f"): settings = SETTINGS_FIXED elif mode.lower().startswith("s"): settings = SETTINGS_SENSITIVE elif mode.lower().startswith("a"): settings = SETTINGS_AUTO else: print("wrong camera mode") # switch on auto mode self.cam.awb_mode = 'auto' self.cam.exposure_mode = 'auto' # settings = dict with fields: name, ss, fps, iso, g, sleep self.cam.framerate = settings['fps'] if settings['ss']: self.cam.shutter_speed = settings['ss'] self.cam.iso = settings['iso'] # Wait for the automatic gain control to settle time.sleep(settings['sleep']) # Now fix the values if not settings['ss']: self.cam.shutter_speed = self.cam.exposure_speed self.cam.exposure_mode = 'off' if settings['g']: g = settings['g'] else: g = self.cam.awb_gains self.cam.awb_mode = 'off' self.cam.awb_gains = g def measure_slave(self, roi=ROI, channel_name="green", threshold=DETECTION_THRESHOLD, dir_out=None, csv_out=None, postfix=""): # check whether channel is valid if channel_name.lower().startswith("r"): channel = 0 channel_name = "red" elif channel_name.lower().startswith("g"): channel = 1 channel_name = "green" elif channel_name.lower().startswith("b"): channel = 2 channel_name = "blue" else: print("wrong channel_name: '{}'".format(channel_name)) # get camera settings time.sleep(1) fps = float(self.cam.framerate) ss = self.cam.exposure_speed g = self.cam.awb_gains g1 = g[0].numerator g2 = g[0].denominator g3 = g[1].numerator g4 = g[1].denominator iso = self.cam.iso settings_str = "ss{}_g{}-{}_{}-{}_iso{}_".format( ss, g1, g2, g3, g4, iso) csv_row = [fps, ss, iso, g1, g2, g3, g4] print("taking photos with settings: {}".format( settings_str)) # with picamera.PiCamera(resolution=RESOLUTION) as cam: # https://picamera.readthedocs.io/en/latest/ # api_array.html#module-picamera.array with picamera.array.PiRGBArray(self.cam) as output: # cam.zoom = REGION_OF_INTEREST # cam.start_preview() # take picture t0 = time.time() self.cam.capture(output, format="rgb") a = output.array t1 = time.time() print("recorded for {} sec".format(round(t1-t0, 2))) t_pic = int(round(np.mean([t0, t1]))) # re-empty output (to reuse next iteration) # output.truncate(0) # crop to region of interest roi = a[ROI[0]:ROI[1], ROI[2]:ROI[3], :] vals = [] vals.append(np.median(roi[:, :, 0])) vals.append(np.median(roi[:, :, 1])) vals.append(np.median(roi[:, :, 2])) print(("roi.shape={},\n" + # "sum(red)={}, sum(green)={}, sum(blue)={},\n" + "med(red)={}, med(green)={}, med(blue)={}").format( roi.shape, vals[0], vals[1], vals[2])) val = vals[channel] print("reading the {} channel...\nvalue: {}".format( channel_name, val)) # checking whether threshold is crossed and # signalling the master-camera if val >= threshold: activated = True self.tx.on() else: activated = False self.tx.off() print("active={} (threshold: {})".format(activated, threshold)) # debug mode: # output images and csv if output directory exists if dir_out: names = ["1-red", "2-green", "3-blue"] # whole images fn = "full_{}{}_0-rgb{}.png".format( settings_str, postfix, now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, a) for i in range(3): fn = "full_{}{}_{}{}.png".format( settings_str, postfix, names[i], now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, a[:, :, i], vmin=0, vmax=255) # cropped to ROI fn = "roi_{}{}_0-rgb{}.png".format( settings_str, postfix, now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, roi) for i in range(3): fn = "roi_{}{}_{}{}.png".format( settings_str, postfix, names[i], now_str()) ffn = safename(os.path.join(dir_out, fn), "file") plt.imsave(ffn, roi[:, :, i], vmin=0, vmax=255) csv_row.extend([t_pic, from_timestamp(t_pic), postfix, vals[0], vals[1], vals[2]]) with open(csv_out, "at") as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerow(csv_row) print("wrote csv_row: {}".format(csv_row)) return None def exit_clean(self): if not self.cam == "": self.cam.close() def main(args): global CSV_NAME global CSV_HEADER # t0 = time.time() cam = ColiCam(mode=args.mode) # t_cam = time.time() - t0 # select mode (camera settings) current_mode = args.mode # optionally create output directory for debugging if args.debug: # setup output directory (and copy over the code?) dir_out = "/home/pi/ColiCam_{}".format(now_str("%y%m%d")) dir_out = safename(dir_out, "dir") if not os.path.isdir(dir_out): os.mkdir(dir_out) print("outputting to '{}'".format(dir_out)) # initialize csv-log csv_fn = "{}{}{}.csv".format(CSV_NAME, args.postfix, now_str()) csv_out = safename(os.path.join(dir_out, csv_fn), 'file') # write header with open(csv_out, "wt") as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerow(CSV_HEADER) print("initialized {} with header {}".format( csv_out, CSV_HEADER)) else: dir_out = None csv_out = None # it = 0 try: while True: # it += 1 # update mode new_mode = args.mode if not current_mode == new_mode: cam.set_cam(mode=new_mode) # cam.set_cam(mode=args.mode) print("measuring in {} mode (default):".format(args.mode)) # measure with both cams val = cam.measure(dir_out=dir_out, csv_out=csv_out, postfix=args.postfix) if val == -1: print("wall on the left\n") elif val == 0: print("no wall detected\n") elif val == 1: print("wall on the right\n") except KeyboardInterrupt: print("manually interrupted program") finally: cam.exit_clean() print("camera closed, done!") if __name__ == "__main__": main(args)