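# SCMautoeval/SteamTrace.py
#
# 412 lines
# 13 KiB
# Python

"""
SteamTrace - evaluate Steam Community Market listings.

Contains a PyQt5 main window (STInstance) and helper functions that query the
Steam market price overview, fetch item details from the csgofloat API, scrape
market listings with Selenium/Chrome and print the possible profit of a
listing based on its base price, intact stickers and float value.
"""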
__author__ = "Lukas Mahler"
__copyright__ = "Copyright 2020-2021"
__version__ = "0.1.4"
__date__ = "15.01.2021"
__email__ = "lm@ankerlab.de"
__status__ = "Development"
import os
import sys
import tinydb
import requests
import subprocess
from math import ceil
from time import sleep
from dateutil import parser
from datetime import datetime, timedelta
from requests.utils import requote_uri
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt
from UX import Ui_Main
import st_rc
# CONSTANTS #
# Steam application id sent as "appid" to the price-overview endpoint.
APPID = 730
# Steam currency code sent as "currency" to the price-overview endpoint.
# NOTE(review): presumably 3 == EUR (prices are printed with a euro sign) - confirm.
CURRENCY = 3
# Horizontal rule printed above and below each listing evaluation.
SPACER = "-" * 65
# CONSTANTS #
class STInstance(QtWidgets.QMainWindow, Ui_Main):
    """Frameless, translucent main window built on the generated ``Ui_Main`` layout."""

    def __init__(self, parent=None):
        """Build the UI, disable the online frame and look for a chromedriver.

        :param parent: optional parent widget handed to Qt.
        """
        QtWidgets.QMainWindow.__init__(self, parent)
        self.setWindowFlags(Qt.FramelessWindowHint)
        self.setAttribute(Qt.WA_TranslucentBackground)
        self.setupUi(self)
        self.frameOnline.setDisabled(True)

        # Vars
        self.chrome_driver = None  # path of the chromedriver found by find_driver()
        self.dbflag = None  # "offline" or "online", set by local()

        self.find_driver()

    def btnexit(self):
        """Close the window and ask the running Qt event loop to quit.

        The event loop started in the main section returns afterwards, so no
        second ``exec_()`` call (and no global ``app``) is needed here.
        """
        self.close()
        QtWidgets.QApplication.quit()

    def dbconnect(self):
        """Lock the database frame, unlock the general frame and flag the connection."""
        self.frameDatabase.setDisabled(True)
        self.frameGeneral.setEnabled(True)
        self.rbDatabaseStatus.setChecked(True)
        self.status("Connected to Database")

    def evaluate(self):
        """Placeholder; only writes a status message."""
        self.status("Placeholder")

    def local(self):
        """ Toggles Offline or Online Database """
        if self.cbUseLocalDatabase.isChecked():
            self.frameOnline.setDisabled(True)
            self.leLocalDBName.setEnabled(True)
            self.dbflag = "offline"
        else:
            self.frameOnline.setEnabled(True)
            self.leLocalDBName.setDisabled(True)
            self.dbflag = "online"

    def find_driver(self):
        """ Check for chromedriver

        Looks for ``chromedriver.exe`` in the current working directory first
        and under ``%PROGRAMFILES(x86)%\\Google\\Chrome\\driver`` second. The
        first existing file is stored in ``self.chrome_driver``. When neither
        exists the process is terminated via ``sys.exit()``.
        """
        # (candidate path, status message shown when that candidate is missing)
        candidates = [(
            os.path.join(os.getcwd(), "chromedriver.exe"),
            "Chromedriver wasn't found in current working directory",
        )]
        program_files = os.environ.get("PROGRAMFILES(x86)")
        missing_pf_msg = r"Chromedriver wasn't found under %PROGRAMFILES(x86)%\Google\Chrome\driver"
        if program_files:
            candidates.append((
                program_files + r"\Google\Chrome\driver\chromedriver.exe",
                missing_pf_msg,
            ))
        else:
            # The environment variable itself is missing, nothing to probe.
            print("PROGRAMFILES(x86) is not set")
            self.status(missing_pf_msg)

        chrome_driver = None
        for path, missing_msg in candidates:
            if os.path.isfile(path):
                chrome_driver = path
                break
            self.status(missing_msg)

        if chrome_driver is None:
            self.status("Couldn't find a Chromedriver, exiting...")
            sys.exit()

        self.status("Chromedriver found")
        self.chrome_driver = chrome_driver
        self.rbChromedriverStatus.setChecked(True)

    def status(self, txt):
        """Show ``txt`` in the window's status bar."""
        self.statusbar.showMessage(txt)
# ==============================================================================================================
# General Steam API Calls
# ==============================================================================================================
def steamapicall(hashname):
    """Return the Steam market price overview for ``hashname``, using a cache.

    A cached database entry is reused when its ``lastupdated`` timestamp is
    younger than one hour. Otherwise the price-overview endpoint is queried,
    the result is stored (upserted) in the module-level ``db`` and returned.
    After 20 counted requests the function waits 60 seconds before continuing.

    :param hashname: market hash name of the item (may already be URL-quoted).
    :return: dict-like price overview including ``hashname`` and ``lastupdated``.
    :raises Exception: ``APIERROR: <status>`` when the endpoint does not answer with 200.
    """
    global bucket

    # Reuse cached values for already made calls
    Item = tinydb.Query()
    query = Item.hashname == hashname
    cached = db.search(query)
    if cached:
        timestamp = parser.parse(cached[0]['lastupdated'])
        if timestamp > (datetime.now() - timedelta(hours=1)):
            return cached[0]

    # Stop possible rate limiting (">=" so an overshoot can never skip the wait)
    if bucket >= 20:
        for remaining in reversed(range(0, 60)):
            print("Bucket is full, waiting... {0:2d}".format(remaining), end="\r")
            sleep(1)
        print("")
        bucket = 1

    url = "https://steamcommunity.com/market/priceoverview/?currency={0}&appid={1}&market_hash_name={2}".format(
        CURRENCY, APPID, hashname)
    url = requote_uri(url)
    # Without a timeout a stalled connection would block forever.
    r = requests.get(url, timeout=30)
    if r.status_code != 200:
        raise Exception("APIERROR: {0}".format(r.status_code))

    bucket += 1
    payload = r.json()
    # The flag is not worth caching; tolerate answers that do not carry it.
    payload.pop('success', None)
    payload['hashname'] = hashname
    payload['lastupdated'] = str(datetime.now())
    db.upsert(payload, query)
    return payload
# ==============================================================================================================
# Item Evaluation
# ==============================================================================================================
def getbaseprice(given):
    """Look up the base price of an item as a float rounded to two decimals.

    :param given: either an item-info dict carrying ``full_item_name`` or a
        market listing URL containing ``730/`` followed by the item name.
    :return: the median price, or the lowest price when no median is provided.
    """
    name = given['full_item_name'] if isinstance(given, dict) else given.split("730/")[1]
    overview = steamapicall(name)

    # Using lowest price if there is no median price provided
    key = 'median_price' if 'median_price' in overview else 'lowest_price'
    text = overview[key].replace("-", "0")

    # Drop the trailing currency symbol and switch to a decimal point.
    number = text[:-1].replace(",", ".")
    return round(float(number), 2)
def getfloatval(idic):
    """Return the value bonus an item earns for a very low float.

    The float is ranked inside the range of its wear tier, where the lower
    bound is raised to the item's own minimum float when that is higher. A
    rank of 99 or more yields a bonus of 5% of the base price, anything else 0.

    :param idic: item info with ``wear_name``, ``min``, ``floatvalue`` and ``baseprice``.
    :return: the bonus value (0 when the item does not qualify).
    :raises KeyError: for an unknown wear name or a missing key.
    """
    wear_ranges = {
        'Factory New': (0.0, 0.07),
        'Minimal Wear': (0.07, 0.15),
        'Field-Tested': (0.15, 0.38),
        'Well-Worn': (0.38, 0.45),
        'Battle-Scarred': (0.45, 1.0),
    }
    fmin, fmax = wear_ranges[idic['wear_name']]
    fmin = max(fmin, idic['min'])

    span = fmax - fmin
    if span <= 0:
        # Degenerate range: there is nothing to rank, so no bonus either.
        return 0

    rankval = 100 - (((idic['floatvalue'] - fmin) / span) * 100)
    if rankval >= 99:
        return idic['baseprice'] * 0.05
    return 0
def getstickerprice(sticker):
    """Return the value an intact sticker adds: 5% of its cheapest known price.

    The cheaper of the median and the lowest market price is used. When the
    price overview carries neither, the sticker adds nothing (previously a
    9999 sentinel leaked through and produced a bonus of 499.95).

    :param sticker: sticker info dict with a ``name`` entry.
    :return: added value rounded to two decimals.
    """
    name = "Sticker | {0}".format(sticker['name'])
    overview = steamapicall(name)

    prices = []
    for key in ('median_price', 'lowest_price'):
        if key not in overview:
            continue
        text = overview[key].replace("-", "0")
        # Drop the trailing currency symbol and switch to a decimal point.
        prices.append(float(text[:-1].replace(",", ".")))

    if not prices:
        return 0.0
    return round(min(prices) * 0.05, 2)
# ==============================================================================================================
# Tools
# ==============================================================================================================
def getexchangerate(symbol="USD"):
    """Fetch the latest exchange rate for ``symbol`` from exchangeratesapi.io.

    :param symbol: currency symbol to request, "USD" by default.
    :return: tuple ``(rate, date)`` taken from the ``rates`` and ``date``
        fields of the response.
    :raises requests.HTTPError: when the service answers with an error status.
    """
    # NOTE(review): the base currency of the returned rate is not visible here;
    # usdtoeur() divides USD by it, so it is presumably USD per EUR - confirm.
    url = "https://api.exchangeratesapi.io/latest?symbols={0}".format(symbol)
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    payload = r.json()  # parse the body once instead of once per field
    return payload['rates'][symbol], payload['date']
def usdtoeur(usd, exchange_rate=None):
    """Convert a price string such as ``"$1,234.56 USD"`` into euros.

    The first character (currency sign) and everything after the first space
    are dropped; thousands separators are removed before parsing.

    :param usd: price text starting with a one-character currency sign.
    :param exchange_rate: rate to divide by; defaults to the module-level ``rate``.
    :return: the converted amount rounded to two decimals.
    """
    if exchange_rate is None:
        exchange_rate = rate  # module-level rate prepared by the main section
    amount = float(usd[1:].split(" ")[0].replace(",", ""))
    return round(amount / exchange_rate, 2)
def timer(msg, time):
    """Run ``xtimer.py`` in a new console window with ``msg`` and ``time`` as arguments.

    Both values are converted to text first, because command-line arguments
    must be strings (an integer ``time`` would make ``subprocess`` raise).

    :param msg: message handed to the timer script.
    :param time: duration handed to the timer script.
    """
    command = ["python", "xtimer.py", str(msg), str(time)]
    subprocess.call(command, creationflags=subprocess.CREATE_NEW_CONSOLE)
# ==============================================================================================================
# Evaluate Steam Market Listing
# ==============================================================================================================
def evaluate(baseprice, listing):
    """Print a value breakdown for one market listing and return the profit.

    Item details are fetched from the csgofloat API using the listing's
    inspect link. The estimated value is the base price plus the bonus of
    every intact sticker plus the float bonus; the profit is that value minus
    the listing price converted to euros.

    :param baseprice: base price of the item.
    :param listing: dict with ``ins`` (inspect link) and ``price`` (price text).
    :return: the possible profit rounded to two decimals.
    :raises requests.HTTPError: when the csgofloat API answers with an error status.
    """
    r = requests.get("https://api.csgofloat.com/?url=" + listing['ins'], timeout=30)
    r.raise_for_status()
    idic = r.json()['iteminfo']

    iname = idic['full_item_name']
    ifloat = idic['floatvalue']
    price = usdtoeur(listing['price'])
    print("Evaluating {0} - [{1} float]\n{2:9.2f}\n{3}".format(iname, round(ifloat, 6), price, SPACER))

    # Evaluate Base Price
    idic['baseprice'] = baseprice
    print(" {0:7}€ Base Price".format(baseprice))

    # Evaluate Stickers
    stickeradd = 0
    for sticker in idic['stickers']:
        if 'wear' in sticker:
            # Scraped stickers (the ones reporting a wear) add no value.
            continue
        sticker_value = getstickerprice(sticker)
        stickeradd += sticker_value
        print("+ {0:7}€ Intact Sticker: {1}".format(sticker_value, sticker['name']))

    # Evaluate Floats
    float_value = getfloatval(idic)
    if float_value != 0:
        print("+ {0:7}€ Float".format(float_value))

    # Evaluate And print possible profit
    profit = round((baseprice + stickeradd + float_value) - price, 2)
    print("{0}\n{1:9.2f}\n".format(SPACER, profit))
    return profit
# ==============================================================================================================
# Get listings from Steam market
# ==============================================================================================================
def _locate_chromedriver():
    """Return the first existing chromedriver.exe from the known locations, else None."""
    candidates = [os.path.join(os.getcwd(), "chromedriver.exe")]
    program_files = os.environ.get("PROGRAMFILES(x86)")
    if program_files:
        candidates.append(program_files + r"\Google\Chrome\driver\chromedriver.exe")
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate
    return None


def getmarketlistings(marketurl, n=10, chrome_driver=None):
    """Scrape up to ``n`` listings from a Steam market page with headless Chrome.

    :param marketurl: URL of the market listings page.
    :param n: maximum number of listings to collect (ten listings per page).
    :param chrome_driver: path to the chromedriver executable; when omitted
        the working directory and the Chrome driver folder are searched.
    :return: dict mapping 1, 2, ... to ``{"id", "ins", "price"}`` entries.
    :raises FileNotFoundError: when no chromedriver can be located.
    """
    marketlistings = {}
    total_pages = ceil(n / 10)
    index = 1  # key of the next collected listing

    if chrome_driver is None:
        chrome_driver = _locate_chromedriver()
    if chrome_driver is None:
        raise FileNotFoundError("Couldn't find a Chromedriver")

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920x1080")
    chrome_options.add_argument("--log-level=3")  # Fatal

    driver = webdriver.Chrome(chrome_options=chrome_options, executable_path=chrome_driver)
    try:
        os.system("CLS")
        driver.get(marketurl)

        for page in range(1, total_pages + 1):
            # https://stackoverflow.com/questions/62313034/how-does-the-steam-marketplace-change-values-without-a-url-change-webscraping
            # There is this weird bug where the first page has all the random currencies and the
            # following pages are all USD. When going back from page 2 to page 1 all currencies are
            # USD as well, so going back and forth on the first page gives us good values.
            if page == 1:
                driver.find_element_by_css_selector("#searchResults_btn_next").click()
                sleep(1)
                driver.find_element_by_css_selector("#searchResults_btn_prev").click()
                sleep(1)

            if (page % 10) == 0:
                # Own loop variable: the countdown must not touch the listing index.
                for remaining in reversed(range(0, 60)):
                    print("Waiting to not get rate limited... {0:2d}".format(remaining), end="\r")
                    sleep(1)
                print("")

            print("Query Page", page)
            rows = driver.find_elements_by_css_selector("#searchResultsRows > div")
            if not rows:
                return marketlistings
            rows = rows[1:]  # delete table header div

            for row in rows:
                lid = row.get_attribute('id')
                try:
                    price = driver.find_element_by_css_selector("#{0} > div.market_listing_price_listings_block > div.market_listing_right_cell.market_listing_their_price > span > span.market_listing_price.market_listing_price_with_fee".format(lid)).text
                except NoSuchElementException:
                    sleep(0.2)
                    print("Unknown Price not Found happend again, skipping...")
                    continue

                # Skip Sold Item (before opening its action menu)
                if price == "Verkauft":
                    continue

                btn = driver.find_element_by_id("{0}_actionmenu_button".format(lid))
                driver.execute_script("arguments[0].click();", btn)
                popup = driver.find_element_by_css_selector("#market_action_popup_itemactions > a")
                href = popup.get_attribute('href')

                marketlistings[index] = {
                    "id": lid,
                    "ins": href,
                    "price": price
                }
                index += 1
                if index > n:
                    break

            if index > n:
                # Enough listings collected, no need to load further pages.
                break

            # Next page
            driver.find_element_by_css_selector("#searchResults_btn_next").click()
            sleep(1)
    finally:
        # Always shut the browser down, otherwise headless Chrome keeps running.
        driver.quit()

    print("\n")
    return marketlistings
# ==============================================================================================================
# Main
# ==============================================================================================================
if __name__ == '__main__':
    # Start the Qt application and show the main window.
    app = QtWidgets.QApplication(sys.argv)
    window = STInstance()
    window.show()
    # The string literal below is a disabled command-line workflow. It is
    # never executed; it is kept verbatim for reference.
    '''
bucket = 1
db = tinydb.TinyDB('tiny.db',
sort_keys=True,
indent=4,
separators=(',', ': '),
ensure_ascii=False)
db.default_ratetable_name = 'Memory'
ratetable = db.table("Rate")
try:
if ratetable.get(doc_id=1)['date'] != datetime.today().strftime('%Y-%m-%d'):
rate, date = getexchangerate()
ratetable.update({"rate": rate, "date": date})
else:
# Reuse Rate from today
rate = ratetable.get(doc_id=1)['rate']
except TypeError:
# happens on nonexisting rate data in db
rate, date = getexchangerate()
ratetable.update({"rate": rate, "date": date})
url = "https://steamcommunity.com/market/listings/730/AK-47%20%7C%20Redline%20%28Field-Tested%29"
baseprice = getbaseprice(url)
marketlistings = getmarketlistings(url, 115)
for index in marketlistings:
print("{:3d}.".format(index), end=" ")
evaluate(baseprice, marketlistings[index])
'''
    # Run the Qt event loop and use its result as the process exit code.
    sys.exit(app.exec_())