get_beacon_gas_prices

Function

def get_beacon_gas_prices(speed=None, cache_interval_seconds=10):
    speeds = ('rapid', 'fast', 'standard', 'slow',)
    os.makedirs(cache_folder := './data/cache/', exist_ok=True)
    gas = {}
    try:
        gas = json.load(open(gasnow_file := "{}/gasnow.json".format(cache_folder)))
    except (JSONDecodeError, FileNotFoundError):
        pass
    if not gas or not gas['data'] or (gas['data']['timestamp'] / 1000) + cache_interval_seconds < time.time():
        try:
            r = requests.get('https://beacon.pulsechain.com/api/v1/execution/gasnow')
            _gas = r.json()
        except Exception as e:
            if not gas or not gas['data']:
                logging.debug(e)
                return 5555 * 10 ** 369
        else:
            if not _gas and not gas:
                logging.debug("No gas data returned from GasNow API endpoint")
                return 5555 * 10 ** 369
            elif _gas['data']:
                gas = _gas
                open(gasnow_file, 'w').write(json.dumps(gas, indent=4))
    if type(speed) is str:
        try:
            return float(web3.from_wei(gas['data'][speed], 'gwei'))
        except KeyError:
            raise KeyError("No such speed as '{}' in gas price data {}".format(speed, list(speeds)))
    return {speed: float(web3.from_wei(price, 'gwei')) for speed, price in gas['data'].items() if speed in speeds}

Description

  • Create the cache folder if it doesn’t exist

  • Load the last gas prices from cache if it exists

  • Check if the timestamp is old or if no gas data was found

  • Pull the most recent gas prices from Beacon

  • Cache it if the server responded with data

  • Return absurd gas if the function yields no data

  • Check if a speed level was specified

  • Returns the price in gwei for one or all of the speed tiers