412 lines
13 KiB
Python
412 lines
13 KiB
Python
"""
|
|
"""
|
|
|
|
__author__ = "Lukas Mahler"
|
|
__copyright__ = "Copyright 2020-2021"
|
|
__version__ = "0.1.4"
|
|
__date__ = "15.01.2021"
|
|
__email__ = "lm@ankerlab.de"
|
|
__status__ = "Development"
|
|
|
|
import os
|
|
import sys
|
|
import tinydb
|
|
import requests
|
|
import subprocess
|
|
from math import ceil
|
|
from time import sleep
|
|
from dateutil import parser
|
|
from datetime import datetime, timedelta
|
|
from requests.utils import requote_uri
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.common.exceptions import NoSuchElementException
|
|
|
|
from PyQt5 import QtWidgets
|
|
from PyQt5.QtCore import Qt
|
|
from UX import Ui_Main
|
|
import st_rc
|
|
|
|
|
|
# CONSTANTS #
# Sent as the `appid` query parameter of the Steam price-overview request.
APPID = 730
# Sent as the `currency` query parameter of the Steam price-overview request.
# NOTE(review): presumably 3 selects EUR, since prices are parsed with a
# trailing currency sign and printed with "€" - confirm.
CURRENCY = 3
# Horizontal rule printed above and below each evaluation report.
SPACER = 65 * "-"
# CONSTANTS #
|
|
|
|
|
|
class STInstance(QtWidgets.QMainWindow, Ui_Main):
    """Frameless, translucent main window built from the ``Ui_Main`` layout.

    On construction the window sets up its widgets, disables the online
    database frame and looks for a chromedriver binary (see ``find_driver``).
    """

    def __init__(self, parent=None):
        """Build the UI, initialise instance state and locate chromedriver.

        Args:
            parent: Optional parent widget handed to the Qt base class.
        """
        super().__init__(parent)
        self.setWindowFlags(Qt.FramelessWindowHint)
        self.setAttribute(Qt.WA_TranslucentBackground)
        self.setupUi(self)
        self.frameOnline.setDisabled(True)

        # Vars
        self.chrome_driver = None  # path of the chromedriver binary, set by find_driver()
        self.dbflag = None  # "offline" or "online", set by local()

        self.find_driver()

    def btnexit(self):
        """Destroy the window and stop the application's event loop."""
        self.destroy()
        # This handler runs while the event loop is already active, so ask the
        # loop to stop instead of starting a second app.exec_() from in here.
        # The script's own sys.exit(app.exec_()) then terminates the process.
        QtWidgets.QApplication.quit()

    def dbconnect(self):
        """Switch the UI into the "database connected" state.

        Only widget state and the status bar are updated here; no database
        handle is opened by this method.
        """
        self.frameDatabase.setDisabled(True)
        self.frameGeneral.setEnabled(True)
        self.rbDatabaseStatus.setChecked(True)
        self.status("Connected to Database")

    def evaluate(self):
        """Placeholder action: only writes "Placeholder" to the status bar."""
        self.status("Placeholder")

    def local(self):
        """ Toggles Offline or Online Database """
        use_local = self.cbUseLocalDatabase.isChecked()
        # Exactly one of the two input areas is editable at any time.
        self.frameOnline.setDisabled(use_local)
        self.leLocalDBName.setEnabled(use_local)
        self.dbflag = "offline" if use_local else "online"

    def find_driver(self):
        """ Check for chromedriver

        Candidates are probed in order: ``chromedriver.exe`` in the current
        working directory, then
        ``%PROGRAMFILES(x86)%\\Google\\Chrome\\driver\\chromedriver.exe``.
        The first existing file is stored in ``self.chrome_driver`` and the
        chromedriver status radio button is checked.

        Raises:
            SystemExit: If no candidate exists.
        """
        program_files = os.environ.get("PROGRAMFILES(x86)")
        candidates = (
            (
                os.path.join(os.getcwd(), "chromedriver.exe"),
                "Chromedriver wasn't found in current working directory",
            ),
            (
                # The variable is absent outside Windows; skip that location then.
                os.path.join(program_files, "Google", "Chrome", "driver", "chromedriver.exe")
                if program_files else None,
                r"Chromedriver wasn't found under %PROGRAMFILES(x86)%\Google\Chrome\driver",
            ),
        )

        for path, missing_msg in candidates:
            # Act on the isfile() result so an earlier hit is not overwritten
            # by a later candidate.
            if path and os.path.isfile(path):
                self.status("Chromedriver found")
                self.chrome_driver = path
                self.rbChromedriverStatus.setChecked(True)
                return
            self.status(missing_msg)

        self.status("Couldn't find a Chromedriver, exiting...")
        sys.exit()

    def status(self, txt):
        """Show *txt* in the window's status bar."""
        self.statusbar.showMessage(txt)
|
|
|
|
# ==============================================================================================================
|
|
# General Steam API Calls
|
|
# ==============================================================================================================
|
|
|
|
def steamapicall(hashname):
    """Return the Steam price-overview record for *hashname*.

    A record stored in the global TinyDB ``db`` is reused when its
    ``lastupdated`` stamp is less than one hour old. Otherwise the Steam
    ``priceoverview`` endpoint is queried, the answer is stamped with
    ``hashname`` and ``lastupdated``, upserted into ``db`` and returned.

    The global counter ``bucket`` throttles requests: when it equals 20 the
    function waits 60 seconds and resets it to 1; every successful request
    increments it.

    Args:
        hashname: Market hash name of the item.

    Returns:
        The cached record or the freshly fetched response dict.

    Raises:
        Exception: ``"APIERROR: <status>"`` when the HTTP status is not 200.
    """
    global bucket

    # Reuse cached values for already made calls
    item = tinydb.Query()
    same_item = item.hashname == hashname
    cached = db.search(same_item)
    if cached:
        last_update = parser.parse(cached[0]['lastupdated'])
        if last_update > (datetime.now() - timedelta(hours=1)):
            return cached[0]

    # Stop possible rate limiting
    if bucket == 20:
        for remaining in reversed(range(0, 60)):
            print("Bucket is full, waiting... {0:2d}".format(remaining), end="\r")
            sleep(1)
        print("")
        bucket = 1

    url = requote_uri(
        "https://steamcommunity.com/market/priceoverview/?currency={0}&appid={1}&market_hash_name={2}".format(
            CURRENCY, APPID, hashname))
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("APIERROR: {0}".format(response.status_code))

    bucket += 1
    record = response.json()
    record.pop('success')
    record['hashname'] = hashname
    record['lastupdated'] = str(datetime.now())
    db.upsert(record, same_item)
    return record
|
|
|
|
|
|
# ==============================================================================================================
|
|
# Item Evaluation
|
|
# ==============================================================================================================
|
|
|
|
def getbaseprice(given):
    """Return the market base price of an item as a float.

    Args:
        given: Either a dict carrying ``'full_item_name'`` or a market
            listing URL; for a URL the part after ``"730/"`` is used as the
            item name.

    Returns:
        The median price reported by ``steamapicall`` (or the lowest price
        when no median is present), rounded to two decimals.
    """
    name = given['full_item_name'] if isinstance(given, dict) else given.split("730/")[1]

    overview = steamapicall(name)
    # Using lowest price if there is no median price provided
    field = 'median_price' if 'median_price' in overview else 'lowest_price'
    raw = overview[field].replace("-", "0")

    # Drop the trailing currency sign and switch to a decimal point.
    amount = raw[:-1].replace(",", ".")
    return round(float(amount), 2)
|
|
|
|
|
|
def getfloatval(idic):
    """Return the price bonus an item earns for a very low float value.

    The item's float is ranked inside its wear bracket, whose lower bound is
    raised to the item's own minimum float when that is higher. A rank of 99
    or more earns 5 % of the base price.

    Args:
        idic: Item info dict providing ``'wear_name'``, ``'min'``,
            ``'floatvalue'`` and ``'baseprice'``.

    Returns:
        ``idic['baseprice'] * 0.05`` for a rank of at least 99, otherwise 0.
        0 is also returned when the usable bracket is empty.

    Raises:
        KeyError: If ``idic['wear_name']`` is not a known wear bracket.
    """
    wear_ranges = {
        'Factory New': (0.0, 0.07),
        'Minimal Wear': (0.07, 0.15),
        'Field-Tested': (0.15, 0.38),
        'Well-Worn': (0.38, 0.45),
        'Battle-Scarred': (0.45, 1.0),
    }
    fmin, fmax = wear_ranges[idic['wear_name']]
    if idic['min'] > fmin:
        fmin = idic['min']

    span = fmax - fmin
    if span <= 0:
        # The item's minimum float sits at or above the bracket's upper bound:
        # there is nothing to rank within, and dividing would fail.
        return 0

    rankval = 100 - (((idic['floatvalue'] - fmin) / span) * 100)
    if rankval >= 99:
        return idic['baseprice'] * 0.05
    return 0
|
|
|
|
|
|
def getstickerprice(sticker):
    """Return the value an intact sticker adds to an item.

    The sticker is looked up as ``"Sticker | <name>"`` through
    ``steamapicall``. The cheaper of its median and lowest price is used,
    and 5 % of that price is returned.

    Args:
        sticker: Dict with at least a ``'name'`` entry.

    Returns:
        5 % of the cheapest available price, rounded to two decimals, or
        ``0.0`` when the response carries neither price.
    """
    name = "Sticker | {0}".format(sticker['name'])
    overview = steamapicall(name)

    # Only prices that are really present take part in min(); a stand-in value
    # for a missing price must never leak into the result.
    prices = [
        float(overview[key].replace("-", "0")[:-1].replace(",", "."))
        for key in ('median_price', 'lowest_price')
        if key in overview
    ]
    if not prices:
        return 0.0

    return round(min(prices) * 0.05, 2)
|
|
|
|
|
|
# ==============================================================================================================
|
|
# Tools
|
|
# ==============================================================================================================
|
|
|
|
def getexchangerate(symbol="USD"):
    """Fetch the latest exchange rate for *symbol* from exchangeratesapi.io.

    Args:
        symbol: Currency code requested through the ``symbols`` parameter.

    Returns:
        A ``(rate, date)`` tuple taken from the response's ``rates[symbol]``
        and ``date`` fields.
    """
    endpoint = "https://api.exchangeratesapi.io/latest?symbols={0}".format(symbol)
    payload = requests.get(endpoint).json()
    return payload['rates'][symbol], payload['date']
|
|
|
|
|
|
def usdtoeur(usd):
    """Convert a price string such as ``"$12.34 USD"`` to euros.

    The first character (currency sign) and everything after the first space
    are discarded; thousands separators (``,``) are removed before parsing.
    The amount is divided by the module-level ``rate``.

    Args:
        usd: Price text starting with a one-character currency sign.

    Returns:
        The amount divided by ``rate``, rounded to two decimals.
    """
    amount = usd[1:].split(" ")[0]
    # "1,234.50" is not accepted by float(); strip the grouping commas first.
    amount = amount.replace(",", "")
    return round(float(amount) / rate, 2)
|
|
|
|
|
|
def timer(msg, time):
    """Run ``xtimer.py`` with *msg* and *time* and wait for it to finish.

    On Windows the script is started in a new console window; on platforms
    without ``subprocess.CREATE_NEW_CONSOLE`` no creation flags are passed.

    Args:
        msg: Message handed to ``xtimer.py`` as first argument.
        time: Value handed to ``xtimer.py`` as second argument.
    """
    subprocess.call(
        # argv entries must be strings; coerce so numeric values work too.
        ["python", "xtimer.py", str(msg), str(time)],
        # The constant only exists on Windows; 0 means "no special flags".
        creationflags=getattr(subprocess, "CREATE_NEW_CONSOLE", 0),
    )
|
|
|
|
|
|
# ==============================================================================================================
|
|
# Evaluate Steam Market Listing
|
|
# ==============================================================================================================
|
|
|
|
def evaluate(baseprice, listing):
    """Print a profit estimate for one market listing.

    The listing's inspect link is sent to the csgofloat API; the returned
    item info is combined with sticker and float bonuses and compared with
    the listing price converted to euros.

    Args:
        baseprice: Base price of the item, as printed in the report.
        listing: Dict with ``'ins'`` (inspect link) and ``'price'`` (price
            text accepted by ``usdtoeur``).
    """
    response = requests.get("https://api.csgofloat.com/?url=" + listing['ins'])
    idic = response.json()['iteminfo']

    iname = idic['full_item_name']
    ifloat = idic['floatvalue']
    istickers = idic['stickers']

    price = usdtoeur(listing['price'])

    print("Evaluating {0} - [{1} float]\n{2:9.2f}€\n{3}".format(iname, round(ifloat, 6), price, SPACER))

    # Evaluate Base Price
    idic['baseprice'] = baseprice
    print(" {0:7}€ Base Price".format(baseprice))

    # Evaluate Stickers
    stickeradd = 0
    for sticker in istickers:
        if 'wear' in sticker:
            # Stickers carrying a 'wear' entry are not priced.
            continue
        sticker_bonus = getstickerprice(sticker)
        stickeradd += sticker_bonus
        print("+ {0:7}€ Intact Sticker: {1}".format(sticker_bonus, sticker['name']))

    # Evaluate Floats
    float_bonus = getfloatval(idic)
    if float_bonus != 0:
        print("+ {0:7}€ Float".format(float_bonus))

    # Evaluate and print possible profit
    profit = round((baseprice + stickeradd + float_bonus) - price, 2)
    print("{0}\n{1:9.2f}€\n".format(SPACER, profit))
|
|
|
|
|
|
# ==============================================================================================================
|
|
# Get listings from Steam market
|
|
# ==============================================================================================================
|
|
|
|
def getmarketlistings(marketurl, n=10, chrome_driver=None):
    """Scrape up to *n* listings from a Steam market page with headless Chrome.

    Args:
        marketurl: URL of the market listing page to open.
        n: Maximum number of listings to collect; ``ceil(n / 10)`` result
            pages are visited.
        chrome_driver: Optional path to the chromedriver executable. When
            omitted, Selenium's own default lookup is used.

    Returns:
        Dict mapping a 1-based running index to
        ``{"id": <row id>, "ins": <action link href>, "price": <price text>}``.
        Listings whose price cannot be found or reads "Verkauft" are skipped.
    """
    marketlistings = {}
    pages = ceil(n / 10)
    index = 1  # key of the next listing; must not be reused as a loop variable

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920x1080")
    chrome_options.add_argument("--log-level=3")  # Fatal

    driver_kwargs = {"chrome_options": chrome_options}
    if chrome_driver is not None:
        driver_kwargs["executable_path"] = chrome_driver
    driver = webdriver.Chrome(**driver_kwargs)

    # Always shut the browser down, whatever happens while scraping.
    try:
        os.system("CLS")

        driver.get(marketurl)
        for page in range(1, pages + 1):

            # https://stackoverflow.com/questions/62313034/how-does-the-steam-marketplace-change-values-without-a-url-change-webscraping
            # There is this weird bug where the first page shows mixed currencies and the following pages are all USD.
            # When going back from page 2 to page 1 all currencies are USD as well, so going back and forth on the
            # first page gives us good values.
            if page == 1:
                driver.find_element_by_css_selector("#searchResults_btn_next").click()
                sleep(1)
                driver.find_element_by_css_selector("#searchResults_btn_prev").click()
                sleep(1)

            if (page % 10) == 0:
                for remaining in reversed(range(0, 60)):
                    print("Waiting to not get rate limited... {0:2d}".format(remaining), end="\r")
                    sleep(1)
                print("")
            print("Query Page", page)

            listings = driver.find_elements_by_css_selector("#searchResultsRows > div")
            if not listings:
                return marketlistings
            listings.pop(0)  # delete table header div

            for listing in listings:
                lid = listing.get_attribute('id')
                try:
                    price = driver.find_element_by_css_selector(
                        "#{0} > div.market_listing_price_listings_block > div.market_listing_right_cell.market_listing_their_price > span > span.market_listing_price.market_listing_price_with_fee".format(lid)
                    ).text
                except NoSuchElementException:
                    sleep(0.2)
                    print("Unknown Price not Found happend again, skipping...")
                    continue
                btn = driver.find_element_by_id("{0}_actionmenu_button".format(lid))
                driver.execute_script("arguments[0].click();", btn)
                popup = driver.find_element_by_css_selector("#market_action_popup_itemactions > a")
                href = popup.get_attribute('href')

                # Skip Sold Item
                if price == "Verkauft":
                    continue

                marketlistings[index] = {
                    "id": lid,
                    "ins": href,
                    "price": price
                }
                index += 1

                if index > n:
                    break

            # Enough listings collected, or nothing left to visit.
            if index > n or page == pages:
                break

            # Next page
            driver.find_element_by_css_selector("#searchResults_btn_next").click()
            sleep(1)
    finally:
        driver.quit()

    print("\n")
    return marketlistings
|
|
|
|
|
|
# ==============================================================================================================
|
|
# Main
|
|
# ==============================================================================================================
|
|
|
|
if __name__ == '__main__':

    # Start the GUI: one application object, one main window.
    app = QtWidgets.QApplication(sys.argv)
    window = STInstance()
    window.show()

    # The string below is the disabled command-line flow. It is the only place
    # that defines the globals `bucket`, `db` and `rate` read by steamapicall()
    # and usdtoeur().
    # NOTE(review): `default_ratetable_name` looks like a rename slip for
    # TinyDB's `default_table_name` - confirm before re-enabling.
    '''
    bucket = 1
    db = tinydb.TinyDB('tiny.db',
                       sort_keys=True,
                       indent=4,
                       separators=(',', ': '),
                       ensure_ascii=False)
    db.default_ratetable_name = 'Memory'

    ratetable = db.table("Rate")
    try:
        if ratetable.get(doc_id=1)['date'] != datetime.today().strftime('%Y-%m-%d'):
            rate, date = getexchangerate()
            ratetable.update({"rate": rate, "date": date})
        else:
            # Reuse Rate from today
            rate = ratetable.get(doc_id=1)['rate']
    except TypeError:
        # happens on nonexisting rate data in db
        rate, date = getexchangerate()
        ratetable.update({"rate": rate, "date": date})

    url = "https://steamcommunity.com/market/listings/730/AK-47%20%7C%20Redline%20%28Field-Tested%29"
    baseprice = getbaseprice(url)
    marketlistings = getmarketlistings(url, 115)
    for index in marketlistings:
        print("{:3d}.".format(index), end=" ")
        evaluate(baseprice, marketlistings[index])
    '''

    # Run the event loop and hand its result to the interpreter as exit status.
    exit_code = app.exec_()
    sys.exit(exit_code)
|