You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
memtest: fix on desktop
This commit is contained in:
@@ -26,10 +26,9 @@
|
||||
# 2097152 | 0
|
||||
#=====================================
|
||||
|
||||
appscreen = lv.screen_active()
|
||||
|
||||
import gc
|
||||
import time
|
||||
import _thread
|
||||
|
||||
# Configuration
|
||||
ALLOCATION_TIMEOUT_MS = 100 # Timeout for a single allocation (in milliseconds)
|
||||
@@ -59,56 +58,58 @@ def test_allocation(buffer_size, n):
|
||||
# Print progress every 100 allocations to avoid flooding serial
|
||||
if count % 100 == 0:
|
||||
print(f"Allocated {count} buffers of {buffer_size} bytes", end="\r")
|
||||
except MemoryError:
|
||||
except MemoryError as e:
|
||||
buffers.clear()
|
||||
print(f"\nStopped after allocating {count} buffers of {buffer_size} bytes: MemoryError")
|
||||
except Exception as e:
|
||||
buffers.clear()
|
||||
print(f"\nStopped after allocating {count} buffers of {buffer_size} bytes: {e}")
|
||||
|
||||
# Free allocated buffers
|
||||
buffers.clear()
|
||||
#buffers.clear()
|
||||
gc.collect()
|
||||
return count
|
||||
|
||||
|
||||
# Test buffer sizes of 2^n, starting from n=1 (2 bytes)
|
||||
n = 1
|
||||
|
||||
|
||||
def stress_test_thread():
|
||||
summary = "Running RAM memory tests...\n\n"
|
||||
summary += "Buffer Size (bytes) | Max Allocated\n"
|
||||
summary += "-----------------------------------\n"
|
||||
lv.async_call(lambda l: status.set_text(summary), None)
|
||||
# Test buffer sizes of 2^n, starting from n=1 (2 bytes)
|
||||
n = 1
|
||||
while appscreen == lv.screen_active():
|
||||
buffer_size = 2 ** n
|
||||
summary += f"{buffer_size:>12} | "
|
||||
#lv.async_call(lambda l: status.set_text(summary), None)
|
||||
# Run allocation test
|
||||
gc.collect()
|
||||
max_buffers = test_allocation(buffer_size, n)
|
||||
# Check if we allocated 0 buffers (indicates we can't allocate this size)
|
||||
if max_buffers == 0:
|
||||
print(f"Cannot allocate buffers of size {buffer_size} bytes. Stopping test.")
|
||||
summary += f"{max_buffers:>14}\n"
|
||||
#lv.async_call(lambda l: status.set_text(summary), None)
|
||||
break
|
||||
# Clean up memory before next test
|
||||
gc.collect()
|
||||
n += 1
|
||||
summary += f"{max_buffers:>14}\n"
|
||||
lv.async_call(lambda l: status.set_text(summary), None)
|
||||
time.sleep_ms(200) # Brief delay to stabilize system
|
||||
# Print summary report
|
||||
summary += "\n"
|
||||
summary += "===================================\n"
|
||||
summary += "Test completed.\n"
|
||||
lv.async_call(lambda l: status.set_text(summary), None)
|
||||
|
||||
appscreen = lv.screen_active()
|
||||
status = lv.label(appscreen)
|
||||
status.align(lv.ALIGN.TOP_LEFT, 5, 10)
|
||||
status.set_style_text_color(lv.color_hex(0xFFFFFF), 0)
|
||||
status.set_style_text_font(lv.font_unscii_8, 0)
|
||||
|
||||
summary = "Running RAM memory tests...\n\n"
|
||||
summary += "Buffer Size (bytes) | Max Allocated\n"
|
||||
summary += "-----------------------------------\n"
|
||||
status.set_text(summary)
|
||||
|
||||
while appscreen == lv.screen_active():
|
||||
buffer_size = 2 ** n
|
||||
summary += f"{buffer_size:>12} | "
|
||||
status.set_text(summary)
|
||||
# Run allocation test
|
||||
gc.collect()
|
||||
max_buffers = test_allocation(buffer_size, n)
|
||||
# Check if we allocated 0 buffers (indicates we can't allocate this size)
|
||||
if max_buffers == 0:
|
||||
print(f"Cannot allocate buffers of size {buffer_size} bytes. Stopping test.")
|
||||
summary += f"{max_buffers:>14}\n"
|
||||
status.set_text(summary)
|
||||
break
|
||||
# Clean up memory before next test
|
||||
gc.collect()
|
||||
time.sleep_ms(100) # Brief delay to stabilize system
|
||||
n += 1
|
||||
summary += f"{max_buffers:>14}\n"
|
||||
status.set_text(summary)
|
||||
|
||||
# Print summary report
|
||||
summary += "\n"
|
||||
summary += "===================================\n"
|
||||
summary += "Test completed.\n"
|
||||
status.set_text(summary)
|
||||
|
||||
# Wait until the user stops the app
|
||||
while appscreen == lv.screen_active():
|
||||
time.sleep(1)
|
||||
_thread.stack_size(12*1024)
|
||||
_thread.start_new_thread(stress_test_thread, ())
|
||||
|
||||
Reference in New Issue
Block a user