""" """ __author__ = "Lukas Mahler" __copyright__ = "Copyright 2020-2021" __version__ = "0.1.4" __date__ = "15.01.2021" __email__ = "lm@ankerlab.de" __status__ = "Development" import os import sys import tinydb import requests import subprocess from math import ceil from time import sleep from dateutil import parser from datetime import datetime, timedelta from requests.utils import requote_uri from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.common.exceptions import NoSuchElementException from PyQt5 import QtWidgets from PyQt5.QtCore import Qt from UX import Ui_Main import st_rc # CONSTANTS # APPID = 730 CURRENCY = 3 SPACER = "-" * 65 # CONSTANTS # class STInstance(QtWidgets.QMainWindow, Ui_Main): def __init__(self, parent=None): QtWidgets.QWidget.__init__(self, parent) self.setWindowFlags(Qt.FramelessWindowHint) self.setAttribute(Qt.WA_TranslucentBackground) self.setupUi(self) self.frameOnline.setDisabled(True) # Vars self.chrome_driver = None self.dbflag = None self.find_driver() def btnexit(self): self.destroy() sys.exit(app.exec_()) def dbconnect(self): self.frameDatabase.setDisabled(True) self.frameGeneral.setEnabled(True) self.rbDatabaseStatus.setChecked(True) self.status("Connected to Database") def evaluate(self): self.status("Placeholder") def local(self): """ Toggles Offline or Online Database """ if self.cbUseLocalDatabase.isChecked(): self.frameOnline.setDisabled(True) self.leLocalDBName.setEnabled(True) self.dbflag = "offline" else: self.frameOnline.setEnabled(True) self.leLocalDBName.setDisabled(True) self.dbflag = "online" def find_driver(self): """ Check for chromedriver """ chrome_driver = None try: chrome_driver = os.getcwd() + "\\chromedriver.exe" # chrome_driver = chrome_driver.resolve() os.path.isfile(chrome_driver) except Exception as e: print(e) self.status("Chromedriver wasn't found in current working directory") pass try: chrome_driver = os.environ["PROGRAMFILES(x86)"] + r"\Google\Chrome\driver\chromedriver.exe" os.path.isfile(chrome_driver) except Exception as e: print(e) self.status(r"Chromedriver wasn't found under %PROGRAMFILES(x86)%\Google\Chrome\driver") pass if not os.path.isfile(chrome_driver): self.status("Couldn't find a Chromedriver, exiting...") exit() else: self.status("Chromedriver found") self.chrome_driver = chrome_driver self.rbChromedriverStatus.setChecked(True) def status(self, txt): self.statusbar.showMessage(txt) # ============================================================================================================== # General Steam API Calls # ============================================================================================================== def steamapicall(hashname): global db global bucket ''' Reuse cached values for already made calls ''' Item = tinydb.Query() query = Item.hashname == hashname dbitem = db.search(query) if len(dbitem) > 0: timestamp = parser.parse(dbitem[0]['lastupdated']) # print(timestamp, (datetime.now() - timedelta(hours=1))) if timestamp > (datetime.now() - timedelta(hours=1)): return dbitem[0] ''' Stop possible rate limiting ''' # print(bucket) if bucket == 20: for i in reversed(range(0, 60)): print("Bucket is full, waiting... {0:2d}".format(i), end="\r") sleep(1) print("") bucket = 1 url = "https://steamcommunity.com/market/priceoverview/?currency={0}&appid={1}&market_hash_name={2}".format( CURRENCY, APPID, hashname) url = requote_uri(url) r = requests.get(url) if r.status_code == 200: bucket += 1 json = r.json() json.pop('success') json['hashname'] = hashname json['lastupdated'] = str(datetime.now()) db.upsert(json, query) return json else: raise Exception("APIERROR: {0}".format(r.status_code)) # print("APIERROR: {0}".format(r.status_code)) # sleep(60) # ============================================================================================================== # Item Evaluation # ============================================================================================================== def getbaseprice(given): if isinstance(given, dict): name = given['full_item_name'] else: name = given.split("730/")[1] json = steamapicall(name) try: price = json['median_price'].replace("-", "0") except KeyError: # Using lowest price if there is no median price provided price = json['lowest_price'].replace("-", "0") baseprice = round(float(price[:-1].replace(",", ".")), 2) return baseprice def getfloatval(idic): wear_ranges = { 'Factory New': "0-0.07", 'Minimal Wear': "0.07-0.15", 'Field-Tested': "0.15-0.38", 'Well-Worn': "0.38-0.45", 'Battle-Scarred': "0.45-1" } fmin, fmax = map(float, wear_ranges[idic['wear_name']].split("-")) if idic['min'] > fmin: fmin = idic['min'] cur = idic['floatvalue'] rankval = 100 - (((cur - fmin) / (fmax - fmin)) * 100) if rankval >= 99: valueadd = idic['baseprice'] * 0.05 else: valueadd = 0 # print("{0} rankval is {1}".format(o,round(rankval,2))) return valueadd def getstickerprice(sticker): name = "Sticker | {0}".format(sticker['name']) json = steamapicall(name) try: mprice = json['median_price'].replace("-", "0") except KeyError: mprice = "9999€" mprice = float(mprice[:-1].replace(",", ".")) try: lprice = json['lowest_price'].replace("-", "0") except KeyError: lprice = "9999€" lprice = float(lprice[:-1].replace(",", ".")) price = min(mprice, lprice) valueadd = round(price * 0.05, 2) return valueadd # ============================================================================================================== # Tools # ============================================================================================================== def getexchangerate(symbol="USD"): url = "https://api.exchangeratesapi.io/latest?symbols={0}".format(symbol) r = requests.get(url) rate = r.json()['rates'][symbol] date = r.json()['date'] return rate, date def usdtoeur(usd): global rate usd = float(usd[1:].split(" ")[0]) eur = round((usd / rate), 2) return eur def timer(msg, time): subprocess.call(["python", "xtimer.py", msg, time], creationflags=subprocess.CREATE_NEW_CONSOLE) # ============================================================================================================== # Evaluate Steam Market Listing # ============================================================================================================== def evaluate(baseprice, listing): ins = listing['ins'] api = "https://api.csgofloat.com/?url=" call = api + ins r = requests.get(call) idic = r.json()['iteminfo'] # print(idic) iname = idic['full_item_name'] ifloat = idic['floatvalue'] istickers = idic['stickers'] price = usdtoeur(listing['price']) # value, = .split(" ") # print(price) # price = round(float(price[:-1].replace(",", ".")), 2) print("Evaluating {0} - [{1} float]\n{2:9.2f}€\n{3}".format(iname, round(ifloat, 6), price, SPACER)) # Evaluate Base Price idic['baseprice'] = baseprice print(" {0:7}€ Base Price".format(baseprice)) # Evaluate Stickers stickeradd = 0 for sticker in istickers: if 'wear' in sticker: # print("- Scraped Sticker: {0} ({1})".format(sticker['name'],round(sticker['wear'],2))) pass else: valueadd = getstickerprice(sticker) stickeradd += valueadd print("+ {0:7}€ Intact Sticker: {1}".format(valueadd, sticker['name'])) # Evalute Floats valueadd = getfloatval(idic) if valueadd != 0: print("+ {0:7}€ Float".format(valueadd)) # print("= Float is: {0} ({1})".format(round(ifloat, 3), idic['wear_name'])) # Evaluate And print possible profit profit = round((baseprice + stickeradd + valueadd) - price, 2) print("{0}\n{1:9.2f}€\n".format(SPACER, profit)) # ============================================================================================================== # Get listings from Steam market # ============================================================================================================== def getmarketlistings(marketurl, n=10): marketlistings = {} pages = n / 10 i = 1 chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--window-size=1920x1080") chrome_options.add_argument("--log-level=3") # Fatal chrome_driver = find_driver() driver = webdriver.Chrome(chrome_options=chrome_options, executable_path=chrome_driver) os.system("CLS") driver.get(marketurl) for page in range(1, ceil(pages)+1): # https://stackoverflow.com/questions/62313034/how-does-the-steam-marketplace-change-values-without-a-url-change-webscraping # There is this weird bug where the first page has all the random currencys and the following pages are all USD # when going back from Page 2 to Page 1 all currencys are USD aswell, so going back and forth on first page # gives us good values if page == 1: driver.find_element_by_css_selector("#searchResults_btn_next").click() sleep(1) driver.find_element_by_css_selector("#searchResults_btn_prev").click() sleep(1) if (page % 10) == 0: for i in reversed(range(0, 60)): print("Waiting to not get rate limited... {0:2d}".format(i), end="\r") sleep(1) print("") print("Query Page", page) try: listings = driver.find_elements_by_css_selector("#searchResultsRows > div") listings.pop(0) # delete ratetable header div except IndexError: return marketlistings for listing in listings: lid = listing.get_attribute('id') try: price = driver.find_element_by_css_selector("#{0} > div.market_listing_price_listings_block > div.market_listing_right_cell.market_listing_their_price > span > span.market_listing_price.market_listing_price_with_fee".format(lid)).text except NoSuchElementException: sleep(0.2) print("Unknown Price not Found happend again, skipping...") continue btn = driver.find_element_by_id("{0}_actionmenu_button".format(lid)) driver.execute_script("arguments[0].click();", btn) popup = driver.find_element_by_css_selector("#market_action_popup_itemactions > a") href = popup.get_attribute('href') # Skip Sold Item if price == "Verkauft": continue marketlistings[i] = { "id": lid, "ins": href, "price": price } i += 1 if i > n: break # Next page driver.find_element_by_css_selector("#searchResults_btn_next").click() # driver.refresh() sleep(1) print("\n") return marketlistings # ============================================================================================================== # Main # ============================================================================================================== if __name__ == '__main__': app = QtWidgets.QApplication(sys.argv) window = STInstance() window.show() ''' bucket = 1 db = tinydb.TinyDB('tiny.db', sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=False) db.default_ratetable_name = 'Memory' ratetable = db.table("Rate") try: if ratetable.get(doc_id=1)['date'] != datetime.today().strftime('%Y-%m-%d'): rate, date = getexchangerate() ratetable.update({"rate": rate, "date": date}) else: # Reuse Rate from today rate = ratetable.get(doc_id=1)['rate'] except TypeError: # happens on nonexisting rate data in db rate, date = getexchangerate() ratetable.update({"rate": rate, "date": date}) url = "https://steamcommunity.com/market/listings/730/AK-47%20%7C%20Redline%20%28Field-Tested%29" baseprice = getbaseprice(url) marketlistings = getmarketlistings(url, 115) for index in marketlistings: print("{:3d}.".format(index), end=" ") evaluate(baseprice, marketlistings[index]) ''' sys.exit(app.exec_())