Source code for pudu.utils

import subprocess
import time
from typing import Optional

colors = [
    "#4040BF",   # Blue
    "#BF4040",   # Red
    "#40BF40",   # Green
    "#A640BF",   # Purple
    "#BFBF40",   # Yellow
    "#BF7340",   # Orange
    "#40BFBF",   # Cyan
    "#BF40A6",   # Magenta
    "#73BF40",   # Lime green
    "#4073BF",   # Blue-cyan
    "#BF8C40",   # Orange-yellow
    "#40BF73",   # Green-cyan
    "#7340BF",   # Blue-purple
    "#A6BF40",   # Yellow-green
    "#BF5940",   # Red-orange
    "#40A6BF",   # Cyan-blue
    "#BF4073",   # Red-purple
    "#59BF40",   # Green
    "#BFA640",   # Orange-yellow
    "#40BFA6",   # Cyan-green
    "#8CBF40",   # Yellow-green
    "#40BF59",   # Green
    "#40BF8C",   # Green-cyan
    "#BF40A6"    # Purple-magenta
]

[docs] class Camera: """ Camera class for handling picture and video capture during Opentrons protocols. This class encapsulates all camera functionality including: - Taking snapshots at specific protocol steps - Recording video during protocol execution - Handling simulation mode gracefully - Managing ffmpeg processes """
[docs] def __init__(self, video_size: str = "320x240", picture_size: str = "640x480", video_device: str = "/dev/video0", storage_path: str = "/data/user_storage"): """ Initialize Camera with configuration options. Args: video_size: Resolution for video recording (default: "320x240") picture_size: Resolution for picture capture (default: "640x480") video_device: Video device path (default: "/dev/video0") storage_path: Path where media files will be saved (default: "/data/user_storage") """ self.video_size = video_size self.picture_size = picture_size self.video_device = video_device self.storage_path = storage_path self._active_video_process: Optional[subprocess.Popen] = None
[docs] def cleanup_ffmpeg_processes(self) -> None: """Clean up any running ffmpeg processes using killall.""" try: subprocess.run(['killall', 'ffmpeg'], capture_output=True, check=False) except Exception: pass # Fail silently if cleanup doesn't work
[docs] def capture_picture(self, protocol, when: str = "image") -> None: """ Take a picture at a specific protocol step. Args: protocol: Opentrons protocol context when: Description of when the picture was taken (used in filename) Returns: Filename of captured image if successful, None if simulation or failed """ if protocol.is_simulating(): protocol.comment(f'[SIMULATION] Taking picture at protocol step: {when}') return protocol.comment(f'Taking picture at protocol step: {when}') timestamp = int(time.time()) filename = f'{when}_image_{timestamp}.jpg' filepath = f'{self.storage_path}/{filename}' try: subprocess.check_call([ 'ffmpeg', '-loglevel', 'error', '-y', '-f', 'video4linux2', '-s', self.picture_size, '-i', self.video_device, '-ss', '0:0:1', '-frames', '1', filepath ]) protocol.comment(f'{when.title()} picture captured: {filename}') except subprocess.CalledProcessError as e: protocol.comment(f'Warning: Picture capture failed: {e}, continuing protocol')
[docs] def start_video(self, protocol) -> None: """ Start video recording. Args: protocol: Opentrons protocol context Returns: Video process handle if successful, None if simulation or failed """ if protocol.is_simulating(): protocol.comment('[SIMULATION] Starting video recording') return # Clean up any existing processes self.cleanup_ffmpeg_processes() time.sleep(0.5) # Brief pause for cleanup timestamp = int(time.time()) filename = f'video_image_{timestamp}.mp4' filepath = f'{self.storage_path}/{filename}' try: video_process = subprocess.Popen([ 'ffmpeg', '-loglevel', 'error', '-y', '-video_size', self.video_size, '-i', self.video_device, filepath ]) self._active_video_process = video_process protocol.comment(f"Video recording started: {filename}") except Exception as e: protocol.comment(f"Warning: Video recording failed: {e}")
[docs] def stop_video(self, protocol) -> None: """ Stop video recording. Args: protocol: Opentrons protocol context video_process: Video process to stop (uses active process if None) """ if protocol.is_simulating(): protocol.comment('[SIMULATION] Stopping video recording') return # Use provided process or the active one process_to_stop = self._active_video_process if process_to_stop is None: protocol.comment("No video recording process to stop") return if process_to_stop.poll() is None: # Process is still running try: process_to_stop.terminate() process_to_stop.wait(timeout=5) protocol.comment("Video recording stopped") except subprocess.TimeoutExpired: process_to_stop.kill() process_to_stop.wait() protocol.comment("Video recording force-stopped") except Exception as e: protocol.comment(f"Warning: Error stopping video: {e}") else: protocol.comment("Video recording already stopped") # Clear active process if it was the one we stopped self._active_video_process = None
[docs] class SmartPipette: """ Opentrons pipette wrapper that uses the API's liquid-tracking system to compute safe aspiration heights for conical tubes. For standard flat-bottom or round-bottom wells, ``SmartPipette`` behaves identically to the underlying pipette. For conical tubes (detected by labware name or the ``use`` flag), it queries the current liquid volume via ``well.current_liquid_volume()`` and converts that to a millimetre height, keeping the tip above the meniscus and away from the narrow tip of the cone. This prevents the pipette tip from plunging into an empty tube or aspirating air when a tube is nearly empty — a common failure mode in protocols that dispense large total volumes from a single stock tube (e.g. LB distribution during plating). """
[docs] def __init__(self, pipette, protocol): """ Initialize SmartPipette. Args: pipette: A loaded Opentrons pipette instrument object. protocol: The active ``ProtocolContext``. Must support ``define_liquid`` (API level ≥ 2.14). Raises: RuntimeError: If the protocol context does not expose liquid tracking (API level too old). """ self.pipette = pipette self.protocol = protocol if not hasattr(protocol, 'define_liquid'): raise RuntimeError("This class requires API with liquid tracking support")
[docs] def is_conical_tube(self, well, use: bool = False) -> bool: """Check if the well is from a conical tube labware or manually set as true""" return 'conical' in well.parent.load_name.lower() or use
[docs] def get_well_volume(self, well) -> Optional[float]: """Get current volume in well using pure API method""" try: return well.current_liquid_volume() except Exception as e: self.protocol.comment(f"ERROR reading volume from {well.well_name}: {e}") return None
[docs] def get_well_height(self, well) -> Optional[float]: """Get current liquid height using pure API method (if available)""" try: if hasattr(well, 'current_liquid_height'): return well.current_liquid_height() else: self.protocol.comment("Liquid height method not available in this API version") return None except Exception as e: self.protocol.comment(f"ERROR reading height from {well.well_name}: {e}") return None
[docs] def get_conical_tube_aspiration_height(self, well) -> float: """ Calculate safe aspiration height for conical tubes using proven method Uses API liquid tracking to get current volume """ # Get current volume from API try: current_volume = well.current_liquid_volume() if current_volume is None: raise ValueError("API returned None for liquid volume") except Exception as e: self.protocol.comment(f"ERROR: Could not get liquid volume from API: {e}") return 10.0 # Safe fallback height max_volume = well.max_volume tube_depth = well.depth - 10 # Account for threads min_safe_height = 3 # mm minimum to prevent tip damage meniscus_offset = 10 # mm below liquid surface # Calculate liquid height based on current volume liquid_height = (current_volume / max_volume) * tube_depth aspiration_height = max(liquid_height - meniscus_offset, min_safe_height) self.protocol.comment( f"Conical calculation: {current_volume:.0f}µL remaining = {aspiration_height:.1f}mm height") return aspiration_height
[docs] def get_aspiration_location(self, well, use: bool = False) -> float: """ Get intelligent aspiration location using API volume data and proven height calculation """ if not self.is_conical_tube(well, use=use): return well try: current_volume = well.current_liquid_volume() if current_volume is None or current_volume < well.max_volume * 0.2: # Less than 20% remaining - use standard aspiration self.protocol.comment("Low volume detected - using standard aspiration") return well # Use conical tube calculation safe_height = self.get_conical_tube_aspiration_height(well) return well.bottom(safe_height) except Exception as e: self.protocol.comment(f"ERROR getting volume from API: {e}") return well # Fallback to standard aspiration
[docs] def liquid_transfer(self, volume: float, source, destination, asp_rate: float = 0.5, disp_rate: float = 1.0, blow_out: bool = True, touch_tip: bool = False, mix_before: float = 0.0, mix_after: float = 0.0, mix_reps: int = 3, new_tip: bool = True, drop_tip: bool = True, use:bool = False) -> bool: """ Transfer liquid using pure API liquid tracking for volume management Returns: bool: True if transfer was successful, False if insufficient volume """ # Check volume using API methods only try: current_volume = source.current_liquid_volume() if current_volume is None: self.protocol.comment("WARNING: API returned None for source volume") return False if current_volume < volume: self.protocol.comment(f"WARNING: Insufficient volume. " f"Requested: {volume}µL, Available: {current_volume:.0f}µL") return False except Exception as e: self.protocol.comment(f"ERROR: Could not check source volume: {e}") return False if new_tip: self.pipette.pick_up_tip() # Get aspiration location using API data + proven calculation aspiration_location = self.get_aspiration_location(source,use) # Mix before if requested if mix_before > 0: # Use current volume to limit mixing try: safe_mix_volume = min(mix_before, current_volume * 0.8) self.pipette.mix(mix_reps, safe_mix_volume, aspiration_location) except: self.protocol.comment("Skipping mix_before due to API error") # Aspirate self.pipette.aspirate(volume, aspiration_location, rate=asp_rate) # Dispense self.pipette.dispense(volume, destination.center(), rate=disp_rate) # Mix after if requested if mix_after > 0: self.pipette.mix(mix_reps, mix_after, destination) if blow_out: self.pipette.blow_out() if touch_tip: self.pipette.touch_tip() if drop_tip: self.pipette.drop_tip() return True